Compare commits

..

2 Commits

13 changed files with 1415 additions and 42 deletions

View File

@ -88,5 +88,20 @@ namespace YunDa.Server.ISMSTcp.Configuration
public string BeijingYounuoApiPushYounuoAlert => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/BeijingYounuoApi/PushYounuoAlert";
public string BeijingYounuoApiDeleteAlarmsByTwinId => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/BeijingYounuoApi/DeleteAlarmsByTwinId";
/// <summary>
/// 二次回路巡检计划 URI
/// </summary>
public string SecondaryCircuitInspectionPlanUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/SecondaryCircuitInspectionPlan/GetList";
/// <summary>
/// 获取孪生体与遥测数据绑定关系 URI
/// </summary>
public string ThingDeviceBindingConfigUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/TwinDataBinding/FindDatas";
/// <summary>
/// 获取设备和孪生体的关联 URI
/// </summary>
public string ThingSimDatasUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/RackEquipment/GetSimDatas";
}
}

View File

@ -2,9 +2,11 @@ using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Org.BouncyCastle.Crypto;
using Org.BouncyCastle.Utilities;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -253,22 +255,27 @@ namespace YunDa.Server.ISMSTcp.Controllers
/// <remarks>
/// 示例请求GET /api/CallYCByDataId?dataIds=CallYCByDataID|001#002#003
/// </remarks>
[HttpGet("CallYCByDataId")]
[HttpPost("CallYCByDataId")]
public async Task<IActionResult> CallYCByDataId(
[FromQuery] string cmd,
[FromQuery] int times,
[FromBody] CallYCByDataIdRequest request,
CancellationToken cancellationToken = default)
{
var id = request.Id;
var times = request.Times;
id.RemoveAll(string.IsNullOrWhiteSpace);
string cmd = string.Format("CallYCByDataID|{0}", string.Join("#", id));
try
{
// 参数验证
var validationResult = ValidateDataIdsParameter(cmd);
if (!validationResult.IsValid)
if (id.Count == 0)
{
return BadRequest(new
{
success = false,
message = validationResult.ErrorMessage,
message = "未提供数据ID",
timestamp = DateTime.Now
});
}
@ -276,6 +283,8 @@ namespace YunDa.Server.ISMSTcp.Controllers
_logger.LogInformation("收到遥测数据召唤请求 - cmd: {cmd}", cmd);
// 发送TCP消息
DateTime cmdTime = DateTime.Now;
int sucessCount = 0;
for (int i = 0; i < times; i++)
{
@ -287,15 +296,44 @@ namespace YunDa.Server.ISMSTcp.Controllers
await Task.Delay(1000);
}
if (times-sucessCount<3)
{
return Ok(new
List<YCResultData> ycDataList = null;
var sw = Stopwatch.StartNew();
while(sw.ElapsedMilliseconds < 10*1000)
{
success = true,
message = "遥测数据召唤命令已发送",
timestamp = DateTime.Now
});
ycDataList = await _telemeteringHandle.GetYCDataByDataIds(id, cmdTime, sucessCount);
if (ycDataList.Count == id.Count)
break;
await Task.Delay(100);
}
if (ycDataList?.Count == id.Count)
{
return Ok(new
{
success = true,
message = "获取遥测数据成功",
data = ycDataList,
count = ycDataList.Count,
timestamp = DateTime.Now
});
}
else
{
return StatusCode(500, new
{
success = false,
message = "发送命令成功,获取遥测数据超时",
timestamp = DateTime.Now
});
}
}
else
{
@ -329,34 +367,6 @@ namespace YunDa.Server.ISMSTcp.Controllers
}
}
/// <summary>
/// 验证数据ID参数
/// </summary>
/// <param name="dataIds">数据ID字符串</param>
/// <returns>验证结果</returns>
private (bool IsValid, string ErrorMessage) ValidateDataIdsParameter(string dataIds)
{
if (string.IsNullOrWhiteSpace(dataIds))
{
return (false, "参数 dataIds 不能为空");
}
// 检查格式是否符合 CallYCByDataID|DataID1#DataID2#...
if (!dataIds.StartsWith("CallYCByDataID|", StringComparison.OrdinalIgnoreCase))
{
return (false, "参数格式错误应为CallYCByDataID|DataID1#DataID2#DataID3#......");
}
// 检查是否包含数据ID
var parts = dataIds.Split('|');
if (parts.Length < 2 || string.IsNullOrWhiteSpace(parts[1]))
{
return (false, "未提供数据ID格式应为CallYCByDataID|DataID1#DataID2#DataID3#......");
}
return (true, string.Empty);
}
/// <summary>
/// 发送TCP消息
/// </summary>

View File

@ -46,6 +46,21 @@ namespace YunDa.Server.ISMSTcp.Domain
/// </summary>
string LoginUri { get; }
/// <summary>
/// 二次回路巡检计划 URI
/// </summary>
string SecondaryCircuitInspectionPlanUri { get; }
/// <summary>
/// 获取孪生体与遥测数据绑定关系 URI
/// </summary>
string ThingDeviceBindingConfigUri { get; }
/// <summary>
///
/// 获取设备和孪生体的关联 URI
/// </summary>
string ThingSimDatasUri { get; }
}
/// <summary>
@ -104,5 +119,20 @@ namespace YunDa.Server.ISMSTcp.Domain
/// 获取用户登录的URI
/// </summary>
public string LoginUri => _config.LoginUri;
/// <summary>
/// 二次回路巡检计划 URI
/// </summary>
public string SecondaryCircuitInspectionPlanUri => _config.SecondaryCircuitInspectionPlanUri;
/// <summary>
/// 获取孪生体与遥测数据绑定关系 URI
/// </summary>
public string ThingDeviceBindingConfigUri => _config.ThingDeviceBindingConfigUri;
/// <summary>
/// 获取设备和孪生体的关联 URI
/// </summary>
public string ThingSimDatasUri => _config.ThingSimDatasUri;
}
}

View File

@ -8,8 +8,10 @@ using System.Text;
using System.Threading.Tasks;
using ToolLibrary;
using YunDa.Server.ISMSTcp.Models;
using YunDa.Server.ISMSTcp.Services;
using YunDa.SOMS.DataTransferObject;
using YunDa.SOMS.DataTransferObject.Account;
using YunDa.SOMS.DataTransferObject.DataMonitoring.SecondaryCircuitInspection;
using YunDa.SOMS.DataTransferObject.ExternalEntities.BeijingYounuo;
using YunDa.SOMS.DataTransferObject.GeneralInformation.ProtectionDeviceInfoDto;
using YunDa.SOMS.DataTransferObject.MaintenanceAndOperations.SecondaryEquipment;
@ -622,5 +624,100 @@ namespace YunDa.Server.ISMSTcp.Domain
}
}
public async Task UploadAlertMessageAsync(List<AlertData> alertDatas)
{
try
{
//告警上传
foreach (var alert in alertDatas)
{
await HttpHelper.HttpPostRequestAsync<object>(_apiEndpoints.AlarmUploadUri, alert);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error Call UploadAlertMessageAsync Api");
}
}
/// <summary>
/// 获取二次回路巡检计划
/// </summary>
/// <returns>二次回路巡检计划列表</returns>
public async Task<List<SecondaryCircuitInspectionPlanOutput>> GetSecondaryCircuitInspectionPlanListAsync()
{
try
{
var response = await HttpHelper.HttpPostRequestAsync<JObject>(_apiEndpoints.SecondaryCircuitInspectionPlanUri, new object());
if (response != null)
{
var result = ExtractDataFromAbpResponse<List<SecondaryCircuitInspectionPlanOutput>>(response);
return result;
}
}
catch(Exception ex)
{
}
return null;
}
/// <summary>
/// 获取孪生体与遥测数据绑定关系
/// </summary>
public async Task<List<ThingDeviceBindingModel>> GetThingDeviceBindingConfigAsync()
{
try
{
var response = await HttpHelper.HttpPostRequestAsync<JObject>(_apiEndpoints.ThingDeviceBindingConfigUri, new object());
if (response != null)
{
var result = ExtractDataFromAbpResponse<List<ThingDeviceBindingModel>>(response);
return result;
}
}
catch (Exception ex)
{
}
return null;
}
/// <summary>
/// 获取设备和孪生体的关联,使用这个关联关系,推送设备数据到是三维场景
/// </summary>
public async Task<List<ThingDeviceBindingModel>> GetThingSimDatasAsync()
{
try
{
var response = await HttpHelper.HttpPostRequestAsync<JObject>(_apiEndpoints.ThingSimDatasUri, new object());
if (response != null)
{
var result = ExtractDataFromAbpResponse<List<ThingDeviceBindingModel>>(response);
return result;
}
}
catch (Exception ex)
{
}
return null;
}
}
}

View File

@ -126,6 +126,9 @@ namespace YunDa.Server.ISMSTcp.Extensions
services.AddSingleton<TelemeteringHandle>();
services.AddSingleton<TelesignalisationHandle>();
services.AddSingleton<VirtualTerminalHandler>();
services.AddSingleton<SecondaryCircuitInspectionPlanService>();
services.AddSingleton<ThingService>();
services.AddSingleton<ThingApiService>();
// 日志过滤器配置
services.Configure<YunDa.Server.ISMSTcp.Filters.Configuration.LogFilterConfiguration>(

View File

@ -19,4 +19,51 @@ namespace YunDa.Server.ISMSTcp.Models
[JsonPropertyName("T")]
public string T { get; set; } = string.Empty;
}
public class CallYCByDataIdRequest
{
public List<string> Id { get; set; }
public int Times { get; set; }
}
public class YCResultValue
{
[JsonPropertyName("Value")]
public decimal Value { get; set; }
[JsonPropertyName("TimeStamp")]
public string TimeStamp { get; set; } = string.Empty;
}
public class YCResultData
{
[JsonPropertyName("Id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("Name")]
public string Name { get; set; } = string.Empty;
[JsonPropertyName("DispatcherAddress")]
public int? DispatcherAddress { get; set; } = null;
[JsonPropertyName("Data")]
public List<YCResultValue> Data { get; set; } = new List<YCResultValue>();
public void ParseValue(List<YCData> datas)
{
Data.Clear();
foreach (var data in datas)
{
Data.Add(new YCResultValue() { Value = data.V, TimeStamp = data.T });
}
}
}
}

View File

@ -94,6 +94,9 @@ namespace YunDa.Server.ISMSTcp
// 添加主机服务ConsoleInterface 已在 AddISMSServices 中注册)
services.AddHostedService<ConsoleInterface>();
// 注册 WebApi 配置类
services.Configure<WebApiSettings>(context.Configuration.GetSection("WebApi"));
})
.UseSerilog()
.UseConsoleLifetime();

View File

@ -26,6 +26,7 @@ using YunDa.Server.ISMSTcp.Models;
using YunDa.SOMS.DataTransferObject.GeneralInformation.ProtectionDeviceInfoDto;
using YunDa.SOMS.DataTransferObject.MaintenanceAndOperations.SecondaryEquipment;
using YunDa.SOMS.Redis.Repositories;
using static System.Runtime.InteropServices.JavaScript.JSType;
namespace YunDa.Server.ISMSTcp.Services
{
@ -54,6 +55,10 @@ namespace YunDa.Server.ISMSTcp.Services
private readonly IWebSocketPushService _webSocketPushService;
private readonly IRedisRepository<ProtectionDeviceCommInfoOutput, string> _protectionDeviceCommInfoRedis;
private readonly IApiEndpoints _apiEndpoints;
private readonly WebApiRequest _webApiRequest;
//孪生体服务
private readonly ThingService _thingService;
// 高频数据处理优化常量
private const int MaxDataSize = 50000; // 单次处理的最大数据大小50KB
@ -66,6 +71,9 @@ namespace YunDa.Server.ISMSTcp.Services
private int _totalProcessedPackets = 0;
private int _discardedDataCount = 0;
//二次回路巡检计划
private readonly SecondaryCircuitInspectionPlanService _secondaryCircuitInspectionPlanService;
public DataProcessor(
ILogger<DataProcessor> logger,
MessageParser messageParser,
@ -80,7 +88,10 @@ namespace YunDa.Server.ISMSTcp.Services
IQueryCoordinationService queryCoordinationService,
IWebSocketPushService webSocketPushService,
IRedisRepository<ProtectionDeviceCommInfoOutput, string> protectionDeviceCommInfoRedis,
IApiEndpoints apiEndpoints)
IApiEndpoints apiEndpoints,
WebApiRequest webApiRequest,
SecondaryCircuitInspectionPlanService secondaryCircuitInspectionPlanService,
ThingService thingService)
{
_logger = logger;
_messageParser = messageParser;
@ -96,6 +107,9 @@ namespace YunDa.Server.ISMSTcp.Services
_webSocketPushService = webSocketPushService ?? throw new ArgumentNullException(nameof(webSocketPushService));
_protectionDeviceCommInfoRedis = protectionDeviceCommInfoRedis ?? throw new ArgumentNullException(nameof(protectionDeviceCommInfoRedis));
_apiEndpoints = apiEndpoints ?? throw new ArgumentNullException(nameof(apiEndpoints));
_webApiRequest = webApiRequest ?? throw new ArgumentNullException(nameof(webApiRequest));
_secondaryCircuitInspectionPlanService = secondaryCircuitInspectionPlanService ?? throw new ArgumentNullException(nameof(secondaryCircuitInspectionPlanService));
_thingService = thingService ?? throw new ArgumentNullException(nameof(thingService));
}
/// <summary>
@ -235,8 +249,24 @@ namespace YunDa.Server.ISMSTcp.Services
{
await _telesignalisationHandle.InitAsync();
}
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
//2025-10-25 所有值加1
foreach (var item in contentArray)
{
if (item["V"] != null && item["V"].Type == JTokenType.String)
{
string val = item["V"].Value<string>();
if(int.TryParse(val, out int value))
{
item["V"] = (value + 1).ToString();
}
}
}
// 🔧 优化:根据数据量区分处理方式
// 大批量数据(>10仅更新Redis不触发转发逻辑命令响应场景
// 小批量数据(<=10完整处理包括转发主动变位场景
@ -260,6 +290,9 @@ namespace YunDa.Server.ISMSTcp.Services
await _telesignalisationHandle.ProcessYXDataAsync(item, skipForwarding: false);
}
}
//将状态推送到孪生体
_thingService.UpdateDeviceChangeStatus(contentArray.ToObject<List<YXData>>());
}
else
@ -571,6 +604,11 @@ namespace YunDa.Server.ISMSTcp.Services
// 处理故障报告类型的告警
//await ProcessFaultReportsAsync(alertDatas);
//报警上传
await _webApiRequest.UploadAlertMessageAsync(alertDatas);
// WebSocket实时推送 - 异步执行,不阻塞主流程
_ = Task.Run(async () =>
{

View File

@ -0,0 +1,288 @@
using Jint;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Json;
using System.Numerics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using YunDa.Server.ISMSTcp.Domain;
using YunDa.Server.ISMSTcp.Interfaces;
using YunDa.Server.ISMSTcp.Models;
using YunDa.SOMS.DataTransferObject.DataMonitoring.SecondaryCircuitInspection;
namespace YunDa.Server.ISMSTcp.Services
{
public class WebApiSettings
{
public int Port { get; set; }
}
public class SecondaryCircuitInspectionPlanStateModel
{
//执行时间
public DateTime ExecuteTime { get; set; } = DateTime.MinValue;
public SecondaryCircuitInspectionPlanOutput Plan { get; set; }
public SecondaryCircuitInspectionPlanStateModel()
{
Plan = new SecondaryCircuitInspectionPlanOutput();
}
}
//二次回路巡检计划
public class SecondaryCircuitInspectionPlanService
{
private readonly ILogger<SecondaryCircuitInspectionPlanService> _logger;
private readonly IApiEndpoints _apiEndpoints;
private readonly WebApiRequest _webApiRequest;
private readonly WebApiSettings _webApiSettings;
private bool _updatedPlanOk = false;
private Queue<SecondaryCircuitInspectionPlanStateModel> _planList;
private readonly object _planLock = new object();
private int _planCheckDay = 0;
public SecondaryCircuitInspectionPlanService(
ILogger<SecondaryCircuitInspectionPlanService> logger,
IApiEndpoints apiEndpoints,
WebApiRequest webApiRequest,
IOptions<WebApiSettings> webApiOptions
)
{
_apiEndpoints = apiEndpoints ?? throw new ArgumentNullException(nameof(apiEndpoints));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_webApiRequest = webApiRequest ?? throw new ArgumentNullException(nameof(webApiRequest));
_webApiSettings = webApiOptions.Value;
_planList = new Queue<SecondaryCircuitInspectionPlanStateModel>();
StartAsync();
}
private async Task StartAsync()
{
//更新计划
_ = Task.Run(async () =>
{
while (true)
{//每30秒更新一下配置
await UpdatePlans();
await Task.Delay(30000);
}
});
//执行计划
_ = Task.Run(async () =>
{
while (true)
{//每个小时,更新一下全体孪生状态
if (_updatedPlanOk)
{
await CheckPlan();
await Task.Delay(10 * 1000);
}
else
{
await Task.Delay(10000);
}
}
});
await Task.CompletedTask;
}
//更新计划
private async Task UpdatePlans()
{
try
{
var list = await _webApiRequest.GetSecondaryCircuitInspectionPlanListAsync();
if (list != null)
{
lock (_planLock)
{
var oldPlan = _planList.ToArray();
_planList.Clear();
List<SecondaryCircuitInspectionPlanStateModel> planlist = new List<SecondaryCircuitInspectionPlanStateModel>();
foreach (var item in list)
{
planlist.Add(new SecondaryCircuitInspectionPlanStateModel() { Plan = item });
}
foreach (var item in planlist)
{
var plan = oldPlan.FirstOrDefault(x => x.Plan.Name == item.Plan.Name && x.Plan.ScheduledTimeString == item.Plan.ScheduledTimeString);
if (plan != null)
{
item.ExecuteTime = plan.ExecuteTime;
}
_planList.Enqueue(item);
}
//获取到绑定信息,可以更新全部状态了
if (planlist.Count > 0)
_updatedPlanOk = true;
}
}
}
catch (Exception ex)
{
_updatedPlanOk = false;
}
await Task.CompletedTask;
}
//检测计划,判断计划是否该执行
private async Task<bool> CheckPlan()
{
bool ret = false;
if (_planList == null)
return ret;
try
{
SecondaryCircuitInspectionPlanStateModel[] planList;
lock (_planLock)
{
planList = _planList.ToArray();
}
DateTime now = DateTime.Now;
int week = DateTime.Now.DayOfWeek == DayOfWeek.Sunday ? 7 : (int)DateTime.Now.DayOfWeek;
if (now.Day != _planCheckDay)
{//翻天,清空已执行标记
lock (_planLock)
{
foreach (var item in _planList)
item.ExecuteTime = DateTime.MinValue;
}
}
_planCheckDay = now.Day;
foreach (var item in planList)
{
if (item.ExecuteTime == DateTime.MinValue && item.Plan.IsActive)
{//当天未执行
if (now.Hour == item.Plan.ScheduledHour && now.Minute == item.Plan.ScheduledMinute)
{
if (item.Plan.ScheduledWeekDaysList.IndexOf(week) != -1)
{
if (await ExecutePlan(item.Plan))
{//执行成功
lock (_planLock)
{
item.ExecuteTime = DateTime.Now;
}
ret = true;
}
}
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "SecondaryCircuitInspectionPlanService - CheckPlan发生错误");
}
return ret;
}
//执行计划
private async Task<bool> ExecutePlan(SecondaryCircuitInspectionPlanOutput plan)
{
try
{
bool ret = true;
var engine = new Jint.Engine();
foreach (var item in plan.InspectionItems)
{
if(!string.IsNullOrWhiteSpace(item.CalculationExpression))
{
engine.Execute(item.CalculationExpression);
var result = engine.Invoke("calculate", new List<YCResultData>()).AsNumber();
}
}
return ret;
}
catch (Exception ex)
{
_logger.LogError(ex, "SecondaryCircuitInspectionPlanService - ExecutePlan发生错误");
}
return false;
}
//获取遥测数据
public async Task<List<YCResultData>> CallYCByDataIdAsync(CallYCByDataIdRequest request, CancellationToken cancellationToken = default)
{
try
{
using var httpClient = new HttpClient();
string url = $"http://127.0.0.1:{_webApiSettings.Port}/api/CallYCByDataId";
var response = await httpClient.PostAsJsonAsync(url, request);
response.EnsureSuccessStatusCode();
var result = await response.Content.ReadAsStringAsync();
JObject obj = JObject.Parse(result);
if (obj.ContainsKey("data"))
{
JArray jArray = obj.Value<JArray>("data");
if (jArray != null)
{
List<YCResultData> list = jArray.ToObject<List<YCResultData>>();
return list;
}
}
}
catch(Exception ex)
{
_logger.LogError(ex, "SecondaryCircuitInspectionPlanService - CallYCByDataIdAsync发生错误");
}
return null;
}
}
}

View File

@ -1,6 +1,7 @@
using Microsoft.AspNetCore.Razor.Language;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
@ -390,6 +391,7 @@ namespace YunDa.Server.ISMSTcp.Services
}
}
// 处理每个haskey对应的TelemeteringModel
foreach (string haskey in haskeys)
{
@ -422,7 +424,8 @@ namespace YunDa.Server.ISMSTcp.Services
batchTelemeteringModels.Add(telemeteringModel);
// 异步调用数据变位信号API不阻塞主流程
}
processedCount++;
@ -690,6 +693,7 @@ namespace YunDa.Server.ISMSTcp.Services
if (_ycDataStorage.TryGetValue(dataId, out var dataList))
{
// 只返回未过期的数据5分钟内
var validData = dataList
.Where(d => (DateTime.Now - d.ReceivedTime).TotalMinutes <= DATA_EXPIRATION_MINUTES)
@ -717,6 +721,96 @@ namespace YunDa.Server.ISMSTcp.Services
return result;
}
/// <summary>
/// 🔧 新增根据DataId列表获取召唤的遥测数据
/// </summary>
/// <param name="dataIds">DataId列表</param>
/// <returns>遥测数据列表</returns>
public async Task<List<YCResultData>> GetYCDataByDataIds(List<string> dataIds, DateTime cmdTime, int times)
{
var result = new List<YCResultData>();
try
{
if (dataIds == null || dataIds.Count == 0)
{
_logger.LogWarning("DataId列表为空无法获取遥测数据");
return result;
}
foreach (var dataId in dataIds)
{
if (string.IsNullOrWhiteSpace(dataId))
{
continue;
}
if (_ycDataStorage.TryGetValue(dataId, out var dataList))
{
var validData = dataList
.Where(d => d.ReceivedTime >= cmdTime)
.Select(d => d.Data)
.ToList();
if (validData.Count >= times)
{
var item = new YCResultData();
item.Id = dataId;
item.ParseValue(validData[^times..]);
result.Add(item);
}
_logger.LogDebug("获取遥测数据 - DataId: {DataId}, Count: {Count}", dataId, validData.Count);
}
else
{
_logger.LogDebug("未找到遥测数据 - DataId: {DataId}", dataId);
}
}
_logger.LogInformation("获取遥测数据完成 - 请求DataId数: {RequestCount}, 返回数据数: {ResultCount}",
dataIds.Count, result.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, "获取遥测数据失败");
}
if(result.Count == dataIds.Count)
{
string redisKey = $"{_telemeteringModelListRediskey}_Zongzi";
foreach (var item in result)
{
if (!_ycIdToHashKeysMapping.TryGetValue(item.Id, out List<string> haskeys))
{
continue;
}
if(haskeys.Count >= 0)
{
string haskey = haskeys.First();
var telemeteringModel = await _telemeteringModelListRedis.HashSetGetOneAsync(redisKey, haskey);
if (telemeteringModel == null)
{
_logger.LogWarning("Redis中未找到遥测数据: RedisKey={RedisKey}, HasKey={HasKey}", redisKey, haskey);
continue; // 继续处理下一个haskey
}
item.Name = telemeteringModel.Name;
item.DispatcherAddress = telemeteringModel.DispatcherAddress;
}
}
}
return result;
}
/// <summary>
/// 🔧 新增:清理过期的遥测数据(定时器回调)
/// </summary>

View File

@ -62,7 +62,8 @@ namespace YunDa.Server.ISMSTcp.Services
IWebSocketPushService webSocketPushService,
IRedisRepository<TelesignalisationModel, string> telesignalisationModelListRedis,
IRedisRepository<ProtectionDeviceCommInfoOutput, string> protectionDeviceCommInfoRedis,
WebApiRequest webApiRequest)
WebApiRequest webApiRequest,
ThingService thingService)
{
_apiEndpoints = apiEndpoints ?? throw new ArgumentNullException(nameof(apiEndpoints));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
@ -344,6 +345,7 @@ namespace YunDa.Server.ISMSTcp.Services
_logger.LogWarning("无法解析遥信数据项: {Item}", yxItem?.ToString());
return;
}
// 使用映射字典查找对应的haskey
if (!_yxIdToHashKeyMapping.TryGetValue(yxDataModel.YX_ID, out string haskey))
{
@ -389,6 +391,7 @@ namespace YunDa.Server.ISMSTcp.Services
string channel = $"{_telesignalisationInflectionInflectionZZChannelRediskey}_Zongzi";
_telesignalisationModelListRedis.PublishAsync(channel, telesignalisationModel);
// 🔧 优化如果跳过转发仅更新Redis后直接返回
if (skipForwarding)
{

View File

@ -0,0 +1,744 @@
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Numerics;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using YunDa.Server.ISMSTcp.Domain;
using YunDa.Server.ISMSTcp.Models;
using YunDa.SOMS.Core.Helper;
using YunDa.SOMS.DataTransferObject.GeneralInformation.EquipmentLiveDataDto;
using YunDa.SOMS.Entities.DataMonitoring;
using YunDa.SOMS.Redis.Repositories;
using static System.Net.Mime.MediaTypeNames;
namespace YunDa.Server.ISMSTcp.Services
{
public class ThingDeviceStatusModel
{
public string TwinID { get; set; } = string.Empty;
public string Metric { get; set; } = string.Empty;
public string Val { get; set; } = string.Empty;
}
public class ThingDeviceBindingModel
{
public string Description { get; set; } = string.Empty;
public string TwinID { get; set; } = string.Empty;
public string Metric { get; set; } = string.Empty; //格式1:绿灯1|0:红灯1
public string Val { get; set; } = string.Empty;
public int TwinDataBindingType { get; set; }
public string Remark { get; set; } = string.Empty;
public bool IsActive { get; set; }
public string Id { get; set; } = string.Empty;
public TelesignalisationConfigurationModel TelesignalisationConfiguration { get; set; }
public List<ThingDevicePushDataModel> MetricList { get; set; }
public List<ThingDevicePushDataModel> ValList { get; set; }
}
public class TelesignalisationConfigurationModel
{
public string Name { get; set; } = string.Empty;
public int DispatcherAddress { get; set; }
public string IsmsbaseYXId { get; set; } = string.Empty;
public int RemoteType { get; set; }
public string Id { get; set; } = string.Empty;
}
public class ThingDevicePushDataModel
{
public string Val { get; set; } = string.Empty;
public string Cmd { get; set; } = string.Empty;
public string CloseCmd { get; set; } = string.Empty;
}
public class ThingService
{
private Dictionary<string, ThingDeviceBindingModel> _deviceBindingConfigs = new Dictionary<string, ThingDeviceBindingModel>();
private readonly object _configLock = new object();
//更新状态锁,防止全部与变化更新时有冲突,因为更新全体时,会清空当前所有状态
private readonly SemaphoreSlim _pushLock = new SemaphoreSlim(1, 1);
private readonly string _telesignalisationModelListRediskey = "telesignalisationModelList_Zongzi";
private bool _updatedDeviceBindingConfigOk = false;
private readonly ILogger<SecondaryCircuitInspectionPlanService> _logger;
private readonly WebApiRequest _webApiRequest;
private readonly ThingApiService _thingApiService;
private readonly IRedisRepository<TelesignalisationModel, string> _telesignalisationModelListRedis;
public ThingService(
ILogger<SecondaryCircuitInspectionPlanService> logger,
WebApiRequest webApiRequest,
ThingApiService thingApiService,
IRedisRepository<TelesignalisationModel, string> telesignalisationModelListRedis
)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_webApiRequest = webApiRequest ?? throw new ArgumentNullException(nameof(webApiRequest));
_thingApiService = thingApiService ?? throw new ArgumentNullException(nameof(thingApiService));
_telesignalisationModelListRedis = telesignalisationModelListRedis ?? throw new ArgumentNullException(nameof(telesignalisationModelListRedis));
StartAsync();
}
private async Task StartAsync()
{
//更新配置
_ = Task.Run(async () =>
{
while (true)
{//每30秒更新一下配置
//更新孪生体与遥测数据绑定关系
await UpdateDeviceBindingConfig();
//更新设备和孪生体的关联
await UpdateSimDatas();
await Task.Delay(30000);
}
});
//推送状态
_ = Task.Run(async () =>
{
while (true)
{//每个小时,更新一下全体孪生状态
if(_updatedDeviceBindingConfigOk)
{
await UpdateDeviceAllStatus();
//await Task.Delay(60000); //测试时用
await Task.Delay(60 * 60 * 1000);
}
else
{//没获取到配置时10秒再检测
await Task.Delay(10000);
}
}
});
await Task.CompletedTask;
}
//更新孪生体与遥测数据绑定关系
private async Task UpdateDeviceBindingConfig()
{
try
{
List<ThingDeviceBindingModel> list = await _webApiRequest.GetThingDeviceBindingConfigAsync();
if (list != null)
{
lock(_configLock)
{
_deviceBindingConfigs.Clear();
foreach (var item in list)
{
item.MetricList = GetMetricList(item.Metric);
item.ValList = GetMetricList(item.Val);
_deviceBindingConfigs[item.TelesignalisationConfiguration.IsmsbaseYXId] = item;
}
//获取到绑定信息,可以更新全部状态了
if (_deviceBindingConfigs.Count > 0)
_updatedDeviceBindingConfigOk = true;
}
}
}
catch (Exception ex)
{
_updatedDeviceBindingConfigOk = false;
_logger.LogError(ex, "ThingService 更新孪生体与遥测数据绑定关系出错");
}
await Task.CompletedTask;
}
//解析tMetric和Val配置
private List<ThingDevicePushDataModel> GetMetricList(string metric)
{
List<ThingDevicePushDataModel> list = new List<ThingDevicePushDataModel>();
var items = metric.Split('|');
foreach (var item in items)
{
var parts = item.Split(':');
if (parts.Length == 3)
{
list.Add(new ThingDevicePushDataModel
{
Val = parts[0],
Cmd = parts[1],
CloseCmd = parts[2],
});
}
}
return list;
}
//状态有变化时,更新孪生体
public async Task UpdateDeviceChangeStatus(List<YXData> yXDatas)
{
if (yXDatas == null)
return;
try
{
//整理好要发送的命令
List<ThingDeviceStatusModel> statusList = new List<ThingDeviceStatusModel>();
var twinIDs = new HashSet<string>();
lock(_configLock)
{
foreach (var item in yXDatas)
{
if (!string.IsNullOrWhiteSpace(item.YX_ID))
{
if (_deviceBindingConfigs.TryGetValue(item.YX_ID, out var bindingItem))
{
var metric = bindingItem.MetricList.Find(e => e.Val == item.T);
var val = bindingItem.ValList.Find(e => e.Val == item.T);
if (metric != null && val != null)
{
var status = new ThingDeviceStatusModel { TwinID = bindingItem.TwinID.Trim(), Metric = metric.Cmd.Trim(), Val = val.Cmd.Trim() };
statusList.Add(status);
//变化时,由于没有清空所有动画状态,所有要手动关闭之前的状态
if (!string.IsNullOrWhiteSpace(metric.CloseCmd))
{
statusList.Add(new ThingDeviceStatusModel { TwinID = bindingItem.TwinID.Trim(), Metric = metric.CloseCmd.Trim(), Val = val.CloseCmd.Trim() });
}
twinIDs.Add(bindingItem.TwinID.Trim());
}
}
}
}
}
//开始发送命令
if (statusList.Count > 0)
{
await _pushLock.WaitAsync();
try
{
//发送状态
await _thingApiService.PushDeviceStatusAsync(statusList);
}
finally
{
_pushLock.Release();
}
}
}
catch(Exception ex)
{
_logger.LogError(ex, "ThingService 状态有变化时,更新孪生体出错");
}
await Task.CompletedTask;
}
//实时更新全体孪生体
public async Task UpdateDeviceAllStatus()
{
await _pushLock.WaitAsync();
try
{
List<ThingDeviceStatusModel> statusList = new List<ThingDeviceStatusModel>();
var twinIDs = new HashSet<string>();
//先从Redis里读取出全体状态
var telesignalisationModels = await _telesignalisationModelListRedis.HashSetGetAllAsync(_telesignalisationModelListRediskey);
if(telesignalisationModels != null)
{
lock(_configLock)
{
foreach (var item in telesignalisationModels)
{
if (!string.IsNullOrWhiteSpace(item.ismsbaseYXId))
{
if (_deviceBindingConfigs.TryGetValue(item.ismsbaseYXId, out var bindingItem))
{
int yxValue = item.ResultValue;
var metric = bindingItem.MetricList.Find(e => e.Val == yxValue.ToString());
var val = bindingItem.ValList.Find(e => e.Val == yxValue.ToString());
if (metric != null && val != null)
{
statusList.Add(new ThingDeviceStatusModel { TwinID = bindingItem.TwinID.Trim(), Metric = metric.Cmd.Trim(), Val = val.Cmd.Trim() });
twinIDs.Add(bindingItem.TwinID.Trim());
}
else
{
int k = 0;
}
}
}
}
}
}
//开始发送命令
if (statusList.Count > 0)
{
//发送清除动画命令
bool pushOk = true;
foreach (var twinID in twinIDs)
{
pushOk &= await _thingApiService.ClearDeviceStatusAsync(twinID, "*灯*");
}
//发送状态
if (pushOk)
{
await Task.Delay(_thingApiService.CallApiDelay);
pushOk = await _thingApiService.PushDeviceStatusAsync(statusList);
}
}
}
catch(Exception ex)
{
_logger.LogError(ex, "ThingService 实时更新全体孪生体出错");
}
finally
{
_pushLock.Release();
}
await Task.CompletedTask;
}
//更新设备和孪生体的关联
private async Task UpdateSimDatas()
{
try
{
}
catch (Exception ex)
{
}
await Task.CompletedTask;
}
}
public class ThingApiService
{
private readonly ILogger<ThingApiService> _logger;
private readonly string _apiServerUrl = "http://192.168.81.22";
private readonly string _loginCode = "admin";
private readonly string _loginKey = "Thing@123";
private readonly string _esAuthorization = "admin:uino";
public int CallApiDelay => 1200; //api调用间隔时间 ms
private string _apiToken = string.Empty;
private DateTime _apiTokenTime = DateTime.MinValue;
public ThingApiService(
ILogger<ThingApiService> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
//获取Token
private async Task<string> GetTokenAsync()
{
try
{
if (string.IsNullOrWhiteSpace(_apiToken) || (DateTime.Now - _apiTokenTime).TotalHours > 6)
{//每6小时更新一次token
var http = new ThingWebClientHelper();
var json = await http.PostJsonAsync($"{_apiServerUrl}:1662/thing/provider/rest/getToken", new { loginCode = _loginCode, loginKey = _loginKey });
JObject obj = JObject.Parse(json);
_apiToken = obj["data"]?["token"]?.ToString();
if(!string.IsNullOrWhiteSpace(_apiToken))
_apiTokenTime = DateTime.Now;
}
}
catch (Exception ex)
{
}
if(string.IsNullOrWhiteSpace(_apiToken))
_logger.LogError("ThingApiService - 获取token失败");
return _apiToken;
}
//推送设备状态
public async Task<bool> PushDeviceStatusAsync(List<ThingDeviceStatusModel> deviceStatus, int retryCount = 3)
{
try
{
for (int i = 0; i < retryCount; i++)
{
var http = new ThingWebClientHelper();
http.SetHeader("Tk", await GetTokenAsync());
var json = await http.PostJsonAsync($"{_apiServerUrl}:1662/thing/provider/rest/monitor/dynamics", deviceStatus);
JObject obj = JObject.Parse(json);
bool success = obj.ContainsKey("success") ? obj.Value<bool>("success") : false;
if (!success)
{
string errMessage = obj.ContainsKey("message") ? obj.Value<string>("message") ?? "" : "";
_logger.LogError($"ThingApiService推送设备状态出错{errMessage}");
await Task.Delay(CallApiDelay);
}
else
{
return true;
}
}
}
catch(Exception ex)
{
}
return false;
}
//清空设备状态
public async Task<bool> ClearDeviceStatusAsync(string twinID, string metricKeyword, int retryCount = 3)
{
try
{
for (int i = 0; i < retryCount; i++)
{
var http = new ThingWebClientHelper();
http.SetHeader("Authorization", string.Format("Basic {0}", Convert.ToBase64String(Encoding.UTF8.GetBytes(_esAuthorization))));
string postData = string.Format(
@"{{
""query"": {{
""bool"": {{
""must"": [
{{
""term"": {{
""twinID"": ""{0}""
}}
}},
{{
""wildcard"": {{
""metric"": ""{1}""
}}
}}
]
}}
}}
}}",
twinID,
metricKeyword
);
var json = await http.PostJsonAsync($"{_apiServerUrl}:9200/performance/_delete_by_query", postData);
JObject obj = JObject.Parse(json);
//成功时没有success字段所有没有时设置为true
bool success = obj.ContainsKey("success") ? obj.Value<bool>("success") : true;
if (!success)
{
string errMessage = obj.ContainsKey("message") ? obj.Value<string>("message") ?? "" : "";
_logger.LogError($"ThingApiService 清空动画失败:{errMessage}");
await Task.Delay(CallApiDelay);
}
else
{
return true;
}
}
}
catch (Exception ex)
{
}
return false;
}
private async Task<bool> Test()
{
try
{
var http = new ThingWebClientHelper();
http.SetHeader("Tk", await GetTokenAsync());
string postData = @"{
""pageSize"": 30,
""pageNum"": 1,
""cdt"": {
""classId"": 6888439830657232
}
}";
var json = await http.PostJsonAsync($"{_apiServerUrl}/thing/twin/ci/queryPageBySearchBean", postData);
JObject obj = JObject.Parse(json);
}
catch (Exception ex)
{
}
return true;
}
private async Task Example()
{
await ClearDeviceStatusAsync("2号主变保护测控屏", "*灯*");
List<ThingDeviceStatusModel> deviceStatus = new List<ThingDeviceStatusModel>();
deviceStatus.Add(
new ThingDeviceStatusModel()
{
TwinID = "2号主变保护测控屏",
Metric = "绿灯3",
Val = "绿灯3"
}
);
await PushDeviceStatusAsync(deviceStatus);
}
}
public class ThingWebClientHelper
{
private readonly HttpClient _httpClient;
private readonly CookieContainer _cookieContainer;
public ThingWebClientHelper()
{
_cookieContainer = new CookieContainer();
var handler = new HttpClientHandler
{
CookieContainer = _cookieContainer,
UseCookies = true
};
_httpClient = new HttpClient(handler);
}
public void SetHeader(string key, string value, HttpContent content = null)
{
if (IsContentHeader(key))
{
if (content != null)
{
// 处理内容头
if (content.Headers.Contains(key))
{
content.Headers.Remove(key);
}
content.Headers.Add(key, value);
}
}
else
{
// 处理普通请求头
if (_httpClient.DefaultRequestHeaders.Contains(key))
{
_httpClient.DefaultRequestHeaders.Remove(key);
}
_httpClient.DefaultRequestHeaders.Add(key, value);
}
}
/// <summary>
/// 判断是否为内容头
/// </summary>
/// <param name="key">头部键名</param>
/// <returns>是否为内容头</returns>
private bool IsContentHeader(string key)
{
// 常见的内容头集合,可根据需要扩展
string[] contentHeaders =
{
"Content-Type",
"Content-Length",
"Content-Disposition",
"Content-Encoding",
"Content-Language",
"Content-Location",
"Content-Range",
"Content-MD5"
};
return contentHeaders.Contains(key, StringComparer.OrdinalIgnoreCase);
}
/// <summary>
/// 发送 GET 请求并获取响应内容。
/// </summary>
/// <param name="url">请求 URL。</param>
/// <returns>响应内容。</returns>
public async Task<string> GetAsync(string url)
{
try
{
var response = await _httpClient.GetAsync(url);
response.EnsureSuccessStatusCode();
return await GetResponseContent(response);
}
catch (Exception ex)
{
//return $"Error: {ex.Message}";
Console.WriteLine($"Error: {ex.Message}");
}
return string.Empty;
}
/// <summary>
/// 发送 POST 请求并获取响应内容。
/// </summary>
/// <param name="url">请求 URL。</param>
/// <param name="postData">POST 数据(键值对)。</param>
/// <returns>响应内容。</returns>
public async Task<string> PostAsync(string url, string postData, string contentType = "application/x-www-form-urlencoded")
{
try
{
var content = new StringContent(postData, Encoding.UTF8, contentType);
var response = await _httpClient.PostAsync(url, content);
response.EnsureSuccessStatusCode();
return await GetResponseContent(response);
}
catch (Exception ex)
{
return $"Error: {ex.Message}";
}
}
public async Task<string> PostJsonAsync(string url, object obj, string contentType = "application/json")
{
try
{
string jsonContent = obj is string ? (string)obj : JsonSerializer.Serialize(obj);
var content = new StringContent(jsonContent, Encoding.UTF8, contentType);
var response = await _httpClient.PostAsync(url, content);
response.EnsureSuccessStatusCode();
return await GetResponseContent(response);
}
catch (Exception ex)
{
}
return string.Empty;
}
private async Task<string> GetResponseContent(HttpResponseMessage response)
{
try
{
if (response.Content.Headers.ContentEncoding.Contains("gzip"))
{
// 获取压缩流
var compressedStream = await response.Content.ReadAsStreamAsync();
var gzipStream = new GZipStream(compressedStream, CompressionMode.Decompress);
var reader = new StreamReader(gzipStream, Encoding.UTF8);
return await reader.ReadToEndAsync();
}
else if (response.Content.Headers.ContentEncoding.Contains("deflate"))
{
var compressedStream = await response.Content.ReadAsStreamAsync();
var deflateStream = new DeflateStream(compressedStream, CompressionMode.Decompress);
var reader = new StreamReader(deflateStream, Encoding.UTF8);
return await reader.ReadToEndAsync();
}
else
{
return await response.Content.ReadAsStringAsync();
}
}
catch (Exception ex)
{
return $"Error: {ex.Message}";
}
}
}
}

View File

@ -22,6 +22,7 @@
<ItemGroup>
<PackageReference Include="DynamicExpresso.Core" Version="2.19.2" />
<PackageReference Include="Jint" Version="3.0.0" />
<PackageReference Include="Microsoft.ClearScript.Core" Version="7.5.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="8.0.0" />