2025-10-30 增加断线推送和事件巡检

This commit is contained in:
dk1st 2025-10-30 12:24:29 +08:00
parent 03ad3c1c8d
commit 9a6c967590
11 changed files with 2049 additions and 149 deletions

View File

@ -94,6 +94,11 @@ namespace YunDa.Server.ISMSTcp.Configuration
/// </summary> /// </summary>
public string SecondaryCircuitInspectionPlanUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/SecondaryCircuitInspectionPlan/GetList"; public string SecondaryCircuitInspectionPlanUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/SecondaryCircuitInspectionPlan/GetList";
/// <summary>
/// 二次回路获取事件的列表
/// </summary>
public string GetCircuitEventDrivenConfigUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/SecondaryCircuitEventDrivenConfig/FindDatasNoPageList";
/// <summary> /// <summary>
/// 获取孪生体与遥测数据绑定关系 URI /// 获取孪生体与遥测数据绑定关系 URI
/// </summary> /// </summary>
@ -103,5 +108,47 @@ namespace YunDa.Server.ISMSTcp.Configuration
/// 获取设备和孪生体的关联 URI /// 获取设备和孪生体的关联 URI
/// </summary> /// </summary>
public string ThingSimDatasUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/RackEquipment/GetSimDatas"; public string ThingSimDatasUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/RackEquipment/GetSimDatas";
/// <summary>
/// 查询遥测 URI
/// </summary>
public string GetTelemetryInfoDataUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/SecondaryCircuitInspectionItem/GetTelemetryInfoData";
/// <summary>
/// 查询遥信 URI
/// </summary>
public string GetTeleSignalDataUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/SecondaryCircuitInspectionItem/GetTeleSignalData";
/// <summary>
/// 查询定值 URI
/// </summary>
public string GetDeviceDzDataUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/SecondaryCircuitInspectionItem/GetDeviceDzData";
/// <summary>
/// 查询预置位的巡检结果 URI
/// </summary>
public string GetInspectionResultDataUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/OriginalInspectionStoreResult/GetInspectionResultData";
/// <summary>
/// 查询网线配置
/// </summary>
public string GetNetworkCableConfigUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/NetworkCable/GetList";
/// <summary>
/// 查询光纤配置
/// </summary>
public string GetOpticalFiberConfigUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/OpticalFiber/GetList";
/// <summary>
/// 查询光缆配置
/// </summary>
public string GetOpticalCableConfigUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/OpticalCable/GetList";
/// <summary>
/// 保存巡检计划执行结果
/// </summary>
public string SaveSecondaryCircuitInspectionPlanResultUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/OpticalCable/GetList";
} }
} }

View File

@ -34,6 +34,7 @@ namespace YunDa.Server.ISMSTcp.Controllers
private readonly WebApiRequest _webApiRequest; private readonly WebApiRequest _webApiRequest;
private readonly ITcpClient _tcpClient; private readonly ITcpClient _tcpClient;
private readonly TelemeteringHandle _telemeteringHandle; private readonly TelemeteringHandle _telemeteringHandle;
private readonly TelesignalisationHandle _telesignalisationHandle;
/// <summary> /// <summary>
/// 构造函数 /// 构造函数
@ -52,7 +53,8 @@ namespace YunDa.Server.ISMSTcp.Controllers
IWebSocketMessageBroadcaster webSocketBroadcaster, IWebSocketMessageBroadcaster webSocketBroadcaster,
WebApiRequest webApiRequest, WebApiRequest webApiRequest,
ITcpClient tcpClient, ITcpClient tcpClient,
TelemeteringHandle telemeteringHandle) TelemeteringHandle telemeteringHandle,
TelesignalisationHandle telesignalisationHandle)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_queryService = queryService ?? throw new ArgumentNullException(nameof(queryService)); _queryService = queryService ?? throw new ArgumentNullException(nameof(queryService));
@ -61,6 +63,7 @@ namespace YunDa.Server.ISMSTcp.Controllers
_webApiRequest = webApiRequest ?? throw new ArgumentNullException(nameof(webApiRequest)); _webApiRequest = webApiRequest ?? throw new ArgumentNullException(nameof(webApiRequest));
_tcpClient = tcpClient ?? throw new ArgumentNullException(nameof(tcpClient)); _tcpClient = tcpClient ?? throw new ArgumentNullException(nameof(tcpClient));
_telemeteringHandle = telemeteringHandle ?? throw new ArgumentNullException(nameof(telemeteringHandle)); _telemeteringHandle = telemeteringHandle ?? throw new ArgumentNullException(nameof(telemeteringHandle));
_telesignalisationHandle = telesignalisationHandle ?? throw new ArgumentNullException(nameof(telesignalisationHandle));
} }
/// <summary> /// <summary>
@ -249,11 +252,19 @@ namespace YunDa.Server.ISMSTcp.Controllers
/// <summary> /// <summary>
/// 根据数据ID召唤遥测数据 /// 根据数据ID召唤遥测数据
/// </summary> /// </summary>
/// <param name="cmd">数据ID字符串格式CallYCByDataID|DataID1#DataID2#DataID3#......</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>召唤结果</returns> /// <returns>召唤结果</returns>
/// <remarks> /// <remarks>
/// 示例请求GET /api/CallYCByDataId?dataIds=CallYCByDataID|001#002#003 /// 示例请求POST
/*
{
     "id":
[
"YCB001211132",
"YCB001177030",
],
     "times": 10
}
*/
/// </remarks> /// </remarks>
[HttpPost("CallYCByDataId")] [HttpPost("CallYCByDataId")]
public async Task<IActionResult> CallYCByDataId( public async Task<IActionResult> CallYCByDataId(
@ -367,6 +378,73 @@ namespace YunDa.Server.ISMSTcp.Controllers
} }
} }
/// <summary>
/// 根据数据ID召唤遥信数据
/// </summary>
/// <returns>召唤结果</returns>
/// <remarks>
/// 示例请求POST
/*
{
     "id":
[
"YCB001211132",
"YCB001177030",
],
     "times": 10
}
*/
/// </remarks>
[HttpPost("CallYXByDataId")]
public async Task<IActionResult> CallYXByDataId(
[FromBody] CallYCByDataIdRequest request,
CancellationToken cancellationToken = default)
{
var ids = request.Id;
var times = request.Times;
ids.RemoveAll(string.IsNullOrWhiteSpace);
try
{
// 参数验证
if (ids.Count == 0)
{
return BadRequest(new
{
success = false,
message = "未提供数据ID",
timestamp = DateTime.Now
});
}
var dataList = await _telesignalisationHandle.GetYXDataByDataIds(ids, times, cancellationToken);
return Ok(new
{
success = true,
message = "获取遥信数据成功",
data = dataList,
count = dataList.Count,
timestamp = DateTime.Now
});
}
catch(Exception ex)
{
return StatusCode(500, new
{
success = false,
message = $"获取遥信数据失败: {ex.Message}",
timestamp = DateTime.Now
});
}
}
/// <summary> /// <summary>
/// 发送TCP消息 /// 发送TCP消息
/// </summary> /// </summary>

View File

@ -51,16 +51,61 @@ namespace YunDa.Server.ISMSTcp.Domain
/// </summary> /// </summary>
string SecondaryCircuitInspectionPlanUri { get; } string SecondaryCircuitInspectionPlanUri { get; }
/// <summary>
/// 二次回路获取事件的列表
/// </summary>
string GetCircuitEventDrivenConfigUri { get; }
/// <summary> /// <summary>
/// 获取孪生体与遥测数据绑定关系 URI /// 获取孪生体与遥测数据绑定关系 URI
/// </summary> /// </summary>
string ThingDeviceBindingConfigUri { get; } string ThingDeviceBindingConfigUri { get; }
/// <summary> /// <summary>
///
/// 获取设备和孪生体的关联 URI /// 获取设备和孪生体的关联 URI
/// </summary> /// </summary>
string ThingSimDatasUri { get; } string ThingSimDatasUri { get; }
/// <summary>
/// 查询遥测 URI
/// </summary>
string GetTelemetryInfoDataUri { get; }
/// <summary>
/// 查询遥信 URI
/// </summary>
string GetTeleSignalDataUri{ get; }
/// <summary>
/// 查询定值 URI
/// </summary>
string GetDeviceDzDataUri { get; }
/// <summary>
/// 查询预置位的巡检结果 URI
/// </summary>
string GetInspectionResultDataUri { get; }
/// <summary>
/// 查询网线配置
/// </summary>
string GetNetworkCableConfigUri { get; }
/// <summary>
/// 查询光纤配置
/// </summary>
string GetOpticalFiberConfigUri { get; }
/// <summary>
/// 查询光缆配置
/// </summary>
string GetOpticalCableConfigUri { get; }
/// <summary>
/// 保存巡检计划执行结果
/// </summary>
string SaveSecondaryCircuitInspectionPlanResultUri { get; }
} }
/// <summary> /// <summary>
@ -125,6 +170,12 @@ namespace YunDa.Server.ISMSTcp.Domain
/// </summary> /// </summary>
public string SecondaryCircuitInspectionPlanUri => _config.SecondaryCircuitInspectionPlanUri; public string SecondaryCircuitInspectionPlanUri => _config.SecondaryCircuitInspectionPlanUri;
/// <summary>
/// 二次回路获取事件的列表
/// </summary>
public string GetCircuitEventDrivenConfigUri => _config.GetCircuitEventDrivenConfigUri;
/// <summary> /// <summary>
/// 获取孪生体与遥测数据绑定关系 URI /// 获取孪生体与遥测数据绑定关系 URI
/// </summary> /// </summary>
@ -134,5 +185,47 @@ namespace YunDa.Server.ISMSTcp.Domain
/// 获取设备和孪生体的关联 URI /// 获取设备和孪生体的关联 URI
/// </summary> /// </summary>
public string ThingSimDatasUri => _config.ThingSimDatasUri; public string ThingSimDatasUri => _config.ThingSimDatasUri;
/// <summary>
/// 查询遥测 URI
/// </summary>
public string GetTelemetryInfoDataUri => _config.GetTelemetryInfoDataUri;
/// <summary>
/// 查询遥信 URI
/// </summary>
public string GetTeleSignalDataUri => _config.GetTeleSignalDataUri;
/// <summary>
/// 查询定值 URI
/// </summary>
public string GetDeviceDzDataUri => _config.GetDeviceDzDataUri;
/// <summary>
/// 查询预置位的巡检结果 URI
/// </summary>
public string GetInspectionResultDataUri => _config.GetInspectionResultDataUri;
/// <summary>
/// 查询网线配置
/// </summary>
public string GetNetworkCableConfigUri => _config.GetNetworkCableConfigUri;
/// <summary>
/// 查询光纤配置
/// </summary>
public string GetOpticalFiberConfigUri => _config.GetOpticalFiberConfigUri;
/// <summary>
/// 查询光缆配置
/// </summary>
public string GetOpticalCableConfigUri => _config.GetOpticalCableConfigUri;
/// <summary>
/// 保存巡检计划执行结果
/// </summary>
public string SaveSecondaryCircuitInspectionPlanResultUri => _config.SaveSecondaryCircuitInspectionPlanResultUri;
} }
} }

View File

@ -12,10 +12,12 @@ using YunDa.Server.ISMSTcp.Services;
using YunDa.SOMS.DataTransferObject; using YunDa.SOMS.DataTransferObject;
using YunDa.SOMS.DataTransferObject.Account; using YunDa.SOMS.DataTransferObject.Account;
using YunDa.SOMS.DataTransferObject.DataMonitoring.SecondaryCircuitInspection; using YunDa.SOMS.DataTransferObject.DataMonitoring.SecondaryCircuitInspection;
using YunDa.SOMS.DataTransferObject.DataMonitoring.SecondaryCircuitInspection.Configurations;
using YunDa.SOMS.DataTransferObject.ExternalEntities.BeijingYounuo; using YunDa.SOMS.DataTransferObject.ExternalEntities.BeijingYounuo;
using YunDa.SOMS.DataTransferObject.GeneralInformation.ProtectionDeviceInfoDto; using YunDa.SOMS.DataTransferObject.GeneralInformation.ProtectionDeviceInfoDto;
using YunDa.SOMS.DataTransferObject.MaintenanceAndOperations.SecondaryEquipment; using YunDa.SOMS.DataTransferObject.MaintenanceAndOperations.SecondaryEquipment;
using YunDa.SOMS.Entities.DataMonitoring; using YunDa.SOMS.Entities.DataMonitoring;
using YunDa.SOMS.Entities.ExternalEntities.BeijingYounuo;
namespace YunDa.Server.ISMSTcp.Domain namespace YunDa.Server.ISMSTcp.Domain
{ {
@ -625,6 +627,7 @@ namespace YunDa.Server.ISMSTcp.Domain
} }
} }
//上传报警
public async Task UploadAlertMessageAsync(List<AlertData> alertDatas) public async Task UploadAlertMessageAsync(List<AlertData> alertDatas)
{ {
try try
@ -662,12 +665,37 @@ namespace YunDa.Server.ISMSTcp.Domain
} }
catch(Exception ex) catch(Exception ex)
{ {
_logger.LogError(ex, "Error Call GetSecondaryCircuitInspectionPlanListAsync Api");
} }
return null; return null;
} }
/// <summary>
/// 获取事件的列表
/// </summary>
public async Task<List<SecondaryCircuitEventDrivenConfigOutput>> GetCircuitEventDrivenConfigAsync()
{
try
{
var response = await HttpHelper.HttpPostRequestAsync<JObject>(_apiEndpoints.GetCircuitEventDrivenConfigUri, new object());
if (response != null)
{
var result = ExtractDataFromAbpResponse<List<SecondaryCircuitEventDrivenConfigOutput>>(response);
return result;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error Call GetCircuitEventDrivenConfigAsync Api");
}
return null;
}
/// <summary> /// <summary>
/// 获取孪生体与遥测数据绑定关系 /// 获取孪生体与遥测数据绑定关系
/// </summary> /// </summary>
@ -688,7 +716,7 @@ namespace YunDa.Server.ISMSTcp.Domain
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error Call GetThingDeviceBindingConfigAsync Api");
} }
return null; return null;
@ -697,27 +725,232 @@ namespace YunDa.Server.ISMSTcp.Domain
/// <summary> /// <summary>
/// 获取设备和孪生体的关联,使用这个关联关系,推送设备数据到是三维场景 /// 获取设备和孪生体的关联,使用这个关联关系,推送设备数据到是三维场景
/// </summary> /// </summary>
public async Task<List<ThingDeviceBindingModel>> GetThingSimDatasAsync() public async Task<List<TingSimDataModel>> GetThingSimDatasConfigAsync()
{ {
try try
{ {
var response = await HttpHelper.HttpPostRequestAsync<JObject>(_apiEndpoints.ThingSimDatasUri, new object()); var response = await Task.Run(() => ToolLibrary.HttpHelper.HttpGetRequest<JObject>(_apiEndpoints.ThingSimDatasUri));
if (response != null) if (response != null)
{ {
var result = ExtractDataFromAbpResponse<List<ThingDeviceBindingModel>>(response); var result = ExtractDataFromAbpResponse<List<TingSimDataModel>>(response);
return result; return result;
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error Call GetThingSimDatasConfigAsync Api");
} }
return null; return null;
} }
/// <summary>
/// 查询遥测
/// </summary>
public async Task<List<YCResultData>?> GetTelemetryInfoData(string id)
{
try
{
var response = await Task.Run(() => ToolLibrary.HttpHelper.HttpGetRequest<JObject>($"{_apiEndpoints.GetTelemetryInfoDataUri}?id={id}"));
if (response != null)
{
var result = ExtractDataFromAbpResponse<List<YCResultData>>(response);
return result;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error Call GetTelemetryInfoData Api");
}
return null;
}
/// <summary>
/// 查询遥信
/// </summary>
public async Task<List<YXResultData>?> GetTeleSignalData(string id)
{
try
{
var response = await Task.Run(() => ToolLibrary.HttpHelper.HttpGetRequest<JObject>($"{_apiEndpoints.GetTeleSignalDataUri}?id={id}"));
if (response != null)
{
var result = ExtractDataFromAbpResponse<List<YXResultData>>(response);
return result;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error Call GetTeleSignalData Api");
}
return null;
}
/// <summary>
/// 查询定值
/// </summary>
public async Task<List<DeviceDzData>?> GetDeviceDzData(string id)
{
try
{
var response = await Task.Run(() => ToolLibrary.HttpHelper.HttpGetRequest<JObject>($"{_apiEndpoints.GetDeviceDzDataUri}?id={id}"));
if (response != null)
{
var result = ExtractDataFromAbpResponse<List<DeviceDzData>>(response);
return result;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error Call GetTeleSignalData Api");
}
return null;
}
/// <summary>
/// 查询定值
/// </summary>
public async Task<List<InspectionResultData>?> GetInspectionResultData(string id)
{
try
{
var response = await Task.Run(() => ToolLibrary.HttpHelper.HttpGetRequest<JObject>($"{_apiEndpoints.GetInspectionResultDataUri}?id={id}"));
if (response != null)
{
var result = ExtractDataFromAbpResponse<List<InspectionResultData>>(response);
return result;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error Call GetTeleSignalData Api");
}
return null;
}
/// <summary>
/// 获取网线配置
/// </summary>
public async Task<List<OpticalFiberConfigModel>> GetNetworkCableConfigAsync()
{
try
{
var response = await Task.Run(() => HttpHelper.HttpPostRequestAsync<JObject>(_apiEndpoints.GetNetworkCableConfigUri, new object()));
if (response != null)
{
var result = ExtractDataFromAbpResponse<List<OpticalFiberConfigModel>>(response);
return result;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error Call GetNetworkCableConfigAsync Api");
}
return null;
}
/// <summary>
/// 获取光纤配置
/// </summary>
public async Task<List<OpticalFiberConfigModel>> GetOpticalFiberConfigAsync()
{
try
{
var response = await Task.Run(() => HttpHelper.HttpPostRequestAsync<JObject>(_apiEndpoints.GetOpticalFiberConfigUri, new object()));
if (response != null)
{
var result = ExtractDataFromAbpResponse<List<OpticalFiberConfigModel>>(response);
return result;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error Call GetOpticalFiberConfigAsync Api");
}
return null;
}
/// <summary>
/// 获取光缆配置
/// </summary>
public async Task<List<OpticalFiberConfigModel>> GetOpticalCableConfigAsync()
{
try
{
var response = await Task.Run(() => HttpHelper.HttpPostRequestAsync<JObject>(_apiEndpoints.GetOpticalCableConfigUri, new object()));
if (response != null)
{
var result = ExtractDataFromAbpResponse<List<OpticalFiberConfigModel>>(response);
return result;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error Call GetOpticalCableConfigAsync Api");
}
return null;
}
/// <summary>
/// 保存巡检计划执行结果
/// </summary>
public async Task<bool> SaveSecondaryCircuitInspectionPlanResultAsync()
{
try
{
var response = await Task.Run(() => HttpHelper.HttpPostRequestAsync<JObject>(_apiEndpoints.SaveSecondaryCircuitInspectionPlanResultUri, new object()));
if (response != null)
{
return true;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error Call GetOpticalCableConfigAsync Api");
}
return false;
}
} }
} }

View File

@ -19,4 +19,73 @@ namespace YunDa.Server.ISMSTcp.Models
[JsonPropertyName("T")] [JsonPropertyName("T")]
public string T { get; set; } = string.Empty; public string T { get; set; } = string.Empty;
} }
public class YXCacheData
{
[JsonPropertyName("Id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("Value")]
public int Value { get; set; }
[JsonPropertyName("ValueStr")]
public string ValueStr { get; set; } = string.Empty;
[JsonPropertyName("TimeStamp")]
public string TimeStamp { get; set; } = string.Empty;
}
public class YXResultData
{
[JsonPropertyName("Id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("Name")]
public string Name { get; set; } = string.Empty;
[JsonPropertyName("DispatcherAddress")]
public int? DispatcherAddress { get; set; } = null;
[JsonPropertyName("Data")]
public List<YXCacheData> Data { get; set; } = new List<YXCacheData>();
[JsonPropertyName("Value")]
public int Value { get; set; }
[JsonPropertyName("ValueStr")]
public string ValueStr { get; set; } = string.Empty;
[JsonPropertyName("TimeStamp")]
public string TimeStamp { get; set; } = string.Empty;
public void ParseValue(List<YXCacheData> datas)
{
Data.Clear();
foreach (var data in datas)
{
Data.Add(data);
}
if (Data.Count > 0)
{
Value = Data.Last().Value;
ValueStr = Data.Last().ValueStr;
TimeStamp = Data.Last().TimeStamp;
}
}
}
public class YXResultValue
{
[JsonPropertyName("Value")]
public decimal Value { get; set; }
[JsonPropertyName("TimeStamp")]
public string TimeStamp { get; set; } = string.Empty;
}
} }

View File

@ -23,6 +23,7 @@ using YunDa.Server.ISMSTcp.Filters.Extensions;
using YunDa.Server.ISMSTcp.Filters.Interfaces; using YunDa.Server.ISMSTcp.Filters.Interfaces;
using YunDa.Server.ISMSTcp.Interfaces; using YunDa.Server.ISMSTcp.Interfaces;
using YunDa.Server.ISMSTcp.Models; using YunDa.Server.ISMSTcp.Models;
using YunDa.SOMS.DataTransferObject.GeneralInformation.EquipmentLiveDataDto;
using YunDa.SOMS.DataTransferObject.GeneralInformation.ProtectionDeviceInfoDto; using YunDa.SOMS.DataTransferObject.GeneralInformation.ProtectionDeviceInfoDto;
using YunDa.SOMS.DataTransferObject.MaintenanceAndOperations.SecondaryEquipment; using YunDa.SOMS.DataTransferObject.MaintenanceAndOperations.SecondaryEquipment;
using YunDa.SOMS.Redis.Repositories; using YunDa.SOMS.Redis.Repositories;
@ -267,6 +268,8 @@ namespace YunDa.Server.ISMSTcp.Services
} }
} }
List<TelesignalisationModel> telesignalisationModels = new List<TelesignalisationModel>();
// 🔧 优化:根据数据量区分处理方式 // 🔧 优化:根据数据量区分处理方式
// 大批量数据(>10仅更新Redis不触发转发逻辑命令响应场景 // 大批量数据(>10仅更新Redis不触发转发逻辑命令响应场景
// 小批量数据(<=10完整处理包括转发主动变位场景 // 小批量数据(<=10完整处理包括转发主动变位场景
@ -277,7 +280,9 @@ namespace YunDa.Server.ISMSTcp.Services
// 仅更新Redis不触发WebSocket转发和其他处理逻辑 // 仅更新Redis不触发WebSocket转发和其他处理逻辑
foreach (var item in contentArray) foreach (var item in contentArray)
{ {
await _telesignalisationHandle.ProcessYXDataAsync(item, skipForwarding: true); var model = await _telesignalisationHandle.ProcessYXDataAsync(item, skipForwarding: true);
if(model != null)
telesignalisationModels.Add(model);
} }
} }
else else
@ -287,12 +292,14 @@ namespace YunDa.Server.ISMSTcp.Services
// 完整处理更新Redis + WebSocket转发 + 告警处理等 // 完整处理更新Redis + WebSocket转发 + 告警处理等
foreach (var item in contentArray) foreach (var item in contentArray)
{ {
await _telesignalisationHandle.ProcessYXDataAsync(item, skipForwarding: false); var model = await _telesignalisationHandle.ProcessYXDataAsync(item, skipForwarding: false);
if (model != null)
telesignalisationModels.Add(model);
} }
} }
//将状态推送到孪生体 //将状态推送到孪生体
_thingService.UpdateDeviceChangeStatus(contentArray.ToObject<List<YXData>>()); _thingService.UpdateThingYXStatus(contentArray.ToObject<List<YXData>>(), telesignalisationModels);
} }
else else
@ -322,12 +329,15 @@ namespace YunDa.Server.ISMSTcp.Services
if (contentToken is JArray contentArray && contentArray.Count > 0) if (contentToken is JArray contentArray && contentArray.Count > 0)
{ {
List<VirtualTerminalData> virtualTerminalDatas = new List<VirtualTerminalData>();
foreach (var item in contentArray) foreach (var item in contentArray)
{ {
if (item is JObject itemObject) if (item is JObject itemObject)
{ {
if (_virtualTerminalHandler.TryParseVirtualTerminalData(item.ToString(), out var virtualTerminalData)) if (_virtualTerminalHandler.TryParseVirtualTerminalData(item.ToString(), out var virtualTerminalData))
{ {
virtualTerminalDatas.Add(virtualTerminalData);
await _virtualTerminalHandler.ProcessVirtualTerminalDataAsync(virtualTerminalData); await _virtualTerminalHandler.ProcessVirtualTerminalDataAsync(virtualTerminalData);
} }
else else
@ -336,6 +346,10 @@ namespace YunDa.Server.ISMSTcp.Services
} }
} }
} }
//向孪生体推送断线数据
_thingService.UpdateOpticalFiberAlarmStatus(virtualTerminalDatas, null, null);
_thingService.UpdateOpticalCableAlarmStatus(virtualTerminalDatas, null);
} }
else else
{ {
@ -607,6 +621,9 @@ namespace YunDa.Server.ISMSTcp.Services
//报警上传 //报警上传
await _webApiRequest.UploadAlertMessageAsync(alertDatas); await _webApiRequest.UploadAlertMessageAsync(alertDatas);
//报警推送到孪生体
await _thingService.UpdateAlarmDatas(alertDatas);
// WebSocket实时推送 - 异步执行,不阻塞主流程 // WebSocket实时推送 - 异步执行,不阻塞主流程

View File

@ -11,21 +11,27 @@ using System.Linq;
using System.Net.Http; using System.Net.Http;
using System.Net.Http.Json; using System.Net.Http.Json;
using System.Numerics; using System.Numerics;
using System.Security.Cryptography;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using YunDa.Server.ISMSTcp.Domain; using YunDa.Server.ISMSTcp.Domain;
using YunDa.Server.ISMSTcp.Interfaces; using YunDa.Server.ISMSTcp.Interfaces;
using YunDa.Server.ISMSTcp.Models; using YunDa.Server.ISMSTcp.Models;
using YunDa.SOMS.DataTransferObject.DataMonitoring.SecondaryCircuitInspection; using YunDa.SOMS.DataTransferObject.DataMonitoring.SecondaryCircuitInspection;
using YunDa.SOMS.DataTransferObject.DataMonitoring.SecondaryCircuitInspection.Configurations;
using YunDa.SOMS.DataTransferObject.MaintenanceAndOperations.SecondaryEquipment;
namespace YunDa.Server.ISMSTcp.Services namespace YunDa.Server.ISMSTcp.Services
{ {
//Web服务器设置
public class WebApiSettings public class WebApiSettings
{ {
public int Port { get; set; } public int Port { get; set; }
} }
//带执行状态的巡检计划结构体
public class SecondaryCircuitInspectionPlanStateModel public class SecondaryCircuitInspectionPlanStateModel
{ {
//执行时间 //执行时间
@ -37,6 +43,43 @@ namespace YunDa.Server.ISMSTcp.Services
{ {
Plan = new SecondaryCircuitInspectionPlanOutput(); Plan = new SecondaryCircuitInspectionPlanOutput();
} }
}
public class DeviceDzData
{
public List<DzInfo> UserDz { get; set; }
public List<DzInfo> SysDz { get; set; }
public string DeviceName { get; set; } = string.Empty;
}
public class InspectionResultData
{
public string PresetName { get; set; } = string.Empty;
public int PresetCode { get; set; }
public string TimeStamp { get; set; } = string.Empty;
public string Value { get; set; } = string.Empty;
}
//巡检结果
public class SecondaryCircuitInspectionResultModel
{
//遥测数据
public List<YCResultData> TelemetryInfoData { get; set; }
//遥信数据
public List<YXResultData> TeleSignalData { get; set; }
//定值
public List<DeviceDzData> DeviceDzData { get; set; }
//预置位的巡检结果
public List<InspectionResultData> InspectionResultData { get; set; }
} }
@ -53,8 +96,13 @@ namespace YunDa.Server.ISMSTcp.Services
private Queue<SecondaryCircuitInspectionPlanStateModel> _planList; private Queue<SecondaryCircuitInspectionPlanStateModel> _planList;
private readonly object _planLock = new object(); private readonly object _planLock = new object();
private int _planCheckDay = 0; private int _planCheckDay = 0;
private readonly Channel<SecondaryCircuitInspectionItemOutput> _singlePlanChannel;
//巡检事件
private Queue<SecondaryCircuitEventDrivenConfigOutput> _eventPlanList;
private readonly object _eventPlanLock = new object();
private bool _getEventPlanListOk = false;
public SecondaryCircuitInspectionPlanService( public SecondaryCircuitInspectionPlanService(
ILogger<SecondaryCircuitInspectionPlanService> logger, ILogger<SecondaryCircuitInspectionPlanService> logger,
@ -71,6 +119,10 @@ namespace YunDa.Server.ISMSTcp.Services
_planList = new Queue<SecondaryCircuitInspectionPlanStateModel>(); _planList = new Queue<SecondaryCircuitInspectionPlanStateModel>();
_singlePlanChannel = Channel.CreateUnbounded<SecondaryCircuitInspectionItemOutput>();
_eventPlanList = new Queue<SecondaryCircuitEventDrivenConfigOutput>();
StartAsync(); StartAsync();
} }
@ -86,22 +138,24 @@ namespace YunDa.Server.ISMSTcp.Services
await UpdatePlans(); await UpdatePlans();
await UpdateEventPlans();
await Task.Delay(30000); await Task.Delay(30000);
} }
}); });
//执行计划 //检测计划
_ = Task.Run(async () => _ = Task.Run(async () =>
{ {
while (true) while (true)
{//每个小时,更新一下全体孪生状态 {//每10秒检测一下计划
if (_updatedPlanOk) if (_updatedPlanOk)
{ {
await CheckPlan(); await CheckPlan();
await Task.Delay(10 * 1000); await Task.Delay(10000);
} }
else else
{ {
@ -111,6 +165,44 @@ namespace YunDa.Server.ISMSTcp.Services
} }
}); });
//事件计划
_ = Task.Run(async () =>
{
while (true)
{
if (_getEventPlanListOk)
{
await CheckEventPlan();
}
else
{
await Task.Delay(10000);
}
}
});
//执行计划(两个线程同时执行)
int threadNumber = 3;
for (int i = 0; i < threadNumber; ++i)
{
_ = Task.Run(async () => {
var rand = new Random(Guid.NewGuid().GetHashCode());
await foreach (var item in _singlePlanChannel.Reader.ReadAllAsync())
{
// 让每个线程在执行之间有不同的节奏
await Task.Delay(rand.Next(0, 300));
await ExecutePlan(item);
await Task.Delay(500);
}
});
}
await Task.CompletedTask; await Task.CompletedTask;
} }
@ -137,7 +229,7 @@ namespace YunDa.Server.ISMSTcp.Services
foreach (var item in planlist) foreach (var item in planlist)
{ {
var plan = oldPlan.FirstOrDefault(x => x.Plan.Name == item.Plan.Name && x.Plan.ScheduledTimeString == item.Plan.ScheduledTimeString); var plan = oldPlan.FirstOrDefault(x => x.Plan.Id == item.Plan.Id && x.Plan.ScheduledTimeString == item.Plan.ScheduledTimeString);
if (plan != null) if (plan != null)
{ {
item.ExecuteTime = plan.ExecuteTime; item.ExecuteTime = plan.ExecuteTime;
@ -162,6 +254,35 @@ namespace YunDa.Server.ISMSTcp.Services
await Task.CompletedTask; await Task.CompletedTask;
} }
//更新事件计划
private async Task UpdateEventPlans()
{
try
{
var list = await _webApiRequest.GetCircuitEventDrivenConfigAsync();
if (list != null)
{
lock (_eventPlanLock)
{
_eventPlanList.Clear();
foreach (var item in list)
_eventPlanList.Enqueue(item);
//获取到绑定信息,可以更新全部状态了
if (_eventPlanList.Count > 0)
_getEventPlanListOk = true;
}
}
}
catch (Exception ex)
{
_getEventPlanListOk = false;
}
await Task.CompletedTask;
}
//检测计划,判断计划是否该执行 //检测计划,判断计划是否该执行
private async Task<bool> CheckPlan() private async Task<bool> CheckPlan()
@ -201,19 +322,19 @@ namespace YunDa.Server.ISMSTcp.Services
{ {
if (item.Plan.ScheduledWeekDaysList.IndexOf(week) != -1) if (item.Plan.ScheduledWeekDaysList.IndexOf(week) != -1)
{ {
if (await ExecutePlan(item.Plan)) await PushPlanToChannel(item.Plan);
{//执行成功
lock (_planLock) lock (_planLock)
{ {
item.ExecuteTime = DateTime.Now; item.ExecuteTime = DateTime.Now;
} }
ret = true; ret = true;
} }
} }
} }
} }
} }
}
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "SecondaryCircuitInspectionPlanService - CheckPlan发生错误"); _logger.LogError(ex, "SecondaryCircuitInspectionPlanService - CheckPlan发生错误");
@ -223,22 +344,21 @@ namespace YunDa.Server.ISMSTcp.Services
} }
//执行计划 //将需要执行计划写到队列里
private async Task<bool> ExecutePlan(SecondaryCircuitInspectionPlanOutput plan) private async Task<bool> PushPlanToChannel(SecondaryCircuitInspectionPlanOutput plan)
{ {
try try
{ {
bool ret = true; bool ret = true;
var engine = new Jint.Engine();
foreach (var item in plan.InspectionItems) foreach (var item in plan.InspectionItems)
{ {
if(!string.IsNullOrWhiteSpace(item.CalculationExpression)) string id = item.Id.ToString();
if(item.IsActive && !string.IsNullOrWhiteSpace(item.CalculationExpression) && !string.IsNullOrWhiteSpace(id))
{ {
//写到队列里,由队列控制执行频率
engine.Execute(item.CalculationExpression); await _singlePlanChannel.Writer.WriteAsync(item);
var result = engine.Invoke("calculate", new List<YCResultData>()).AsNumber();
} }
} }
@ -252,37 +372,224 @@ namespace YunDa.Server.ISMSTcp.Services
return false; return false;
} }
//获取遥测数据 //执行计划
public async Task<List<YCResultData>> CallYCByDataIdAsync(CallYCByDataIdRequest request, CancellationToken cancellationToken = default) private async Task ExecutePlan(SecondaryCircuitInspectionItemOutput item)
{ {
try try
{ {
using var httpClient = new HttpClient(); string id = item.Id.ToString();
string url = $"http://127.0.0.1:{_webApiSettings.Port}/api/CallYCByDataId";
var response = await httpClient.PostAsJsonAsync(url, request);
response.EnsureSuccessStatusCode();
var result = await response.Content.ReadAsStringAsync();
JObject obj = JObject.Parse(result);
if (obj.ContainsKey("data"))
{
JArray jArray = obj.Value<JArray>("data");
if (jArray != null)
{
List<YCResultData> list = jArray.ToObject<List<YCResultData>>();
return list; if (!string.IsNullOrWhiteSpace(id))
{
var engine = new Jint.Engine();
SecondaryCircuitInspectionResultModel resultModel = new SecondaryCircuitInspectionResultModel();
var task1 = GetSourceDataAsync<List<YCResultData>>(id, 1);
var task2 = GetSourceDataAsync<List<YXResultData>>(id, 2);
var task3 = GetSourceDataAsync<List<DeviceDzData>>(id, 3);
var task4 = GetSourceDataAsync<List<InspectionResultData>>(id, 4);
// 等待全部完成
await Task.WhenAll(task1, task2, task3, task4);
resultModel.TelemetryInfoData = await task1;
resultModel.TeleSignalData = await task2;
resultModel.DeviceDzData = await task3;
resultModel.InspectionResultData = await task4;
engine.Execute(item.CalculationExpression);
var result = engine.Invoke("calculate", resultModel).AsObject();
}
}
catch(Exception ex)
{
_logger.LogError(ex, $"SecondaryCircuitInspectionPlanService 执行巡检项失败:{item.Id.ToString()}");
}
}
//获取计划数据
private async Task<T> GetSourceDataAsync<T>(string id, int type, int retryCount = 3)
{
object result = null;
for(int i = 0; i < retryCount; ++i)
{
switch (type)
{
case 1:
result = await _webApiRequest.GetTelemetryInfoData(id);
break;
case 2:
result = await _webApiRequest.GetTeleSignalData(id);
break;
case 3:
result = await _webApiRequest.GetDeviceDzData(id);
break;
case 4:
result = await _webApiRequest.GetInspectionResultData(id);
break;
default:
throw new ArgumentException("Invalid type");
}
if(result != null)
{
break;
}
else
{
await Task.Delay(200);
}
}
return (T)result;
}
//执行事件计划
private async Task<bool> CheckEventPlan()
{
bool ret = false;
if (_eventPlanList == null)
return ret;
try
{
SecondaryCircuitEventDrivenConfigOutput[] planList;
lock (_planLock)
{
planList = _eventPlanList.ToArray();
}
foreach (var item in planList)
{
await Task.Delay(item.DelayTriggerSeconds * 1000);
if (item.IsActive && item.SecondaryCircuitInspectionEventItems?.Count > 0 && !string.IsNullOrWhiteSpace(item.TriggerExpression))
{
string id = item.Id.ToString() ?? "";
if (!string.IsNullOrWhiteSpace(id))
{
var engine = new Jint.Engine();
SecondaryCircuitInspectionResultModel resultModel = new SecondaryCircuitInspectionResultModel();
var task1 = GetSourceDataAsync<List<YCResultData>>(id, 1);
var task2 = GetSourceDataAsync<List<YXResultData>>(id, 2);
// 等待全部完成
await Task.WhenAll(task1, task2);
resultModel.TelemetryInfoData = await task1;
resultModel.TeleSignalData = await task2;
engine.Execute(item.TriggerExpression);
var canCheck = engine.Invoke("calculate", resultModel).AsBoolean();
if(canCheck)
{
await Task.Delay(item.MandatoryWaitSeconds * 1000);
CheckPlanFormIds(item.SecondaryCircuitInspectionEventItems);
}
}
} }
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "SecondaryCircuitInspectionPlanService - CallYCByDataIdAsync发生错误"); _logger.LogError(ex, "SecondaryCircuitInspectionPlanService - CheckEventPlan:发生错误");
} }
return ret;
}
//检测巡检计划中的计划
private async Task CheckPlanFormIds(List<Guid> planIds)
{
if (_planList == null)
return;
try
{
SecondaryCircuitInspectionPlanStateModel[] planList;
lock (_planLock)
{
planList = _planList.ToArray();
}
foreach (var plan in planList)
{
if(plan.Plan.IsActive)
{
foreach (var item in plan.Plan.InspectionItems)
{
if (planIds.IndexOf(item.Id) > -1 && item.IsActive)
{
if (!string.IsNullOrWhiteSpace(item.CalculationExpression) && !string.IsNullOrWhiteSpace(item.Id.ToString()))
{
//写到队列里,由队列控制执行频率
await _singlePlanChannel.Writer.WriteAsync(item);
}
}
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "SecondaryCircuitInspectionPlanService - CheckPlanFormIds :发生错误");
}
return;
}
////获取遥测数据
//public async Task<List<YCResultData>> CallYCByDataIdAsync(CallYCByDataIdRequest request, CancellationToken cancellationToken = default)
//{
// try
// {
// using var httpClient = new HttpClient();
// string url = $"http://127.0.0.1:{_webApiSettings.Port}/api/CallYCByDataId";
// var response = await httpClient.PostAsJsonAsync(url, request);
// response.EnsureSuccessStatusCode();
// var result = await response.Content.ReadAsStringAsync();
// JObject obj = JObject.Parse(result);
// if (obj.ContainsKey("data"))
// {
// JArray jArray = obj.Value<JArray>("data");
// if (jArray != null)
// {
// List<YCResultData> list = jArray.ToObject<List<YCResultData>>();
// return list;
// }
// }
// }
// catch(Exception ex)
// {
// _logger.LogError(ex, "SecondaryCircuitInspectionPlanService - CallYCByDataIdAsync发生错误");
// }
return null; // return null;
} //}
} }
} }

View File

@ -71,6 +71,9 @@ namespace YunDa.Server.ISMSTcp.Services
public volatile bool _isInitialized = false; public volatile bool _isInitialized = false;
private readonly object _initLock = new object(); private readonly object _initLock = new object();
//孪生体服务
private readonly ThingService _thingService;
/// <summary> /// <summary>
/// 构造函数 /// 构造函数
/// </summary> /// </summary>
@ -81,13 +84,15 @@ namespace YunDa.Server.ISMSTcp.Services
IWebSocketPushService webSocketPushService, IWebSocketPushService webSocketPushService,
IApiEndpoints apiEndpoints, IApiEndpoints apiEndpoints,
IRedisRepository<TelemeteringModel, string> telemeteringModelListRedis, IRedisRepository<TelemeteringModel, string> telemeteringModelListRedis,
IRedisRepository<ProtectionDeviceCommInfoOutput, string> protectionDeviceCommInfoRedis) IRedisRepository<ProtectionDeviceCommInfoOutput, string> protectionDeviceCommInfoRedis,
ThingService thingService)
{ {
_apiEndpoints = apiEndpoints ?? throw new ArgumentNullException(nameof(apiEndpoints)); _apiEndpoints = apiEndpoints ?? throw new ArgumentNullException(nameof(apiEndpoints));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_telemeteringModelListRedis = telemeteringModelListRedis ?? throw new ArgumentNullException(nameof(telemeteringModelListRedis)); _telemeteringModelListRedis = telemeteringModelListRedis ?? throw new ArgumentNullException(nameof(telemeteringModelListRedis));
_webSocketPushService = webSocketPushService ?? throw new ArgumentNullException(nameof(webSocketPushService)); _webSocketPushService = webSocketPushService ?? throw new ArgumentNullException(nameof(webSocketPushService));
_protectionDeviceCommInfoRedis = protectionDeviceCommInfoRedis ?? throw new ArgumentNullException(nameof(protectionDeviceCommInfoRedis)); _protectionDeviceCommInfoRedis = protectionDeviceCommInfoRedis ?? throw new ArgumentNullException(nameof(protectionDeviceCommInfoRedis));
_thingService = thingService ?? throw new ArgumentNullException(nameof(thingService));
// 🔧 新增:初始化批量推送定时器 // 🔧 新增:初始化批量推送定时器
_batchPushTimer = new Timer(ProcessBatchPush, null, BATCH_PUSH_INTERVAL_MS, BATCH_PUSH_INTERVAL_MS); _batchPushTimer = new Timer(ProcessBatchPush, null, BATCH_PUSH_INTERVAL_MS, BATCH_PUSH_INTERVAL_MS);
@ -359,6 +364,7 @@ namespace YunDa.Server.ISMSTcp.Services
_logger.LogWarning("无法解析遥测数据项: {Items}", ycItems?.ToString()); _logger.LogWarning("无法解析遥测数据项: {Items}", ycItems?.ToString());
return (0, ycItems?.Count ?? 0); return (0, ycItems?.Count ?? 0);
} }
Guid equipmentId = default; Guid equipmentId = default;
foreach (var ycDataModel in ycDataModels) foreach (var ycDataModel in ycDataModels)
{ {

View File

@ -19,6 +19,7 @@ using YunDa.SOMS.DataTransferObject.GeneralInformation.EquipmentLiveDataDto;
using YunDa.SOMS.DataTransferObject.GeneralInformation.ProtectionDeviceInfoDto; using YunDa.SOMS.DataTransferObject.GeneralInformation.ProtectionDeviceInfoDto;
using YunDa.SOMS.Entities.GeneralInformation; using YunDa.SOMS.Entities.GeneralInformation;
using YunDa.SOMS.Redis.Repositories; using YunDa.SOMS.Redis.Repositories;
using static System.Runtime.InteropServices.JavaScript.JSType;
namespace YunDa.Server.ISMSTcp.Services namespace YunDa.Server.ISMSTcp.Services
{ {
@ -51,6 +52,13 @@ namespace YunDa.Server.ISMSTcp.Services
private readonly object _initLock = new object(); private readonly object _initLock = new object();
private readonly object _expressionInitLock = new object(); private readonly object _expressionInitLock = new object();
//遥信数据缓冲队列
private static readonly ConcurrentQueue<(DateTime time, YXCacheData yxData)> _yxDataBuffer = new ConcurrentQueue<(DateTime time, YXCacheData yxData)>();
private const int YXDATA_BUFFER_SECONDS = 60; //只保留60秒的数据
private static int _yxDataRecvCount = 0; //每100次清除一下超时数据用于计数
/// <summary> /// <summary>
/// 构造函数 /// 构造函数
/// </summary> /// </summary>
@ -304,27 +312,28 @@ namespace YunDa.Server.ISMSTcp.Services
/// <param name="yxData">遥信数据JSON令牌</param> /// <param name="yxData">遥信数据JSON令牌</param>
/// <param name="skipForwarding">是否跳过转发逻辑仅更新Redis</param> /// <param name="skipForwarding">是否跳过转发逻辑仅更新Redis</param>
/// <returns>处理任务</returns> /// <returns>处理任务</returns>
public async Task ProcessYXDataAsync(JToken yxData, bool skipForwarding = false) public async Task<TelesignalisationModel?> ProcessYXDataAsync(JToken yxData, bool skipForwarding = false)
{ {
try try
{ {
if (yxData == null) if (yxData == null)
{ {
_logger.LogWarning("遥信数据为空,跳过处理"); _logger.LogWarning("遥信数据为空,跳过处理");
return; return null;
} }
if (!_isInitialized) if (!_isInitialized)
{ {
_logger.LogWarning("遥信数据映射尚未初始化,跳过处理"); _logger.LogWarning("遥信数据映射尚未初始化,跳过处理");
return; return null;
} }
await ProcessSingleYXDataAsync(yxData, skipForwarding); return await ProcessSingleYXDataAsync(yxData, skipForwarding);
//_logger.LogInformation("遥信数据处理完成"); //_logger.LogInformation("遥信数据处理完成");
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "处理遥信数据时发生错误: {YXData}", yxData?.ToString()); _logger.LogError(ex, "处理遥信数据时发生错误: {YXData}", yxData?.ToString());
throw; throw;
} }
} }
@ -334,7 +343,7 @@ namespace YunDa.Server.ISMSTcp.Services
/// <param name="yxItem">单个遥信数据项</param> /// <param name="yxItem">单个遥信数据项</param>
/// <param name="skipForwarding">是否跳过转发逻辑仅更新Redis</param> /// <param name="skipForwarding">是否跳过转发逻辑仅更新Redis</param>
/// <returns>处理任务</returns> /// <returns>处理任务</returns>
private async Task ProcessSingleYXDataAsync(JToken yxItem, bool skipForwarding = false) private async Task<TelesignalisationModel?> ProcessSingleYXDataAsync(JToken yxItem, bool skipForwarding = false)
{ {
try try
{ {
@ -343,14 +352,14 @@ namespace YunDa.Server.ISMSTcp.Services
if (yxDataModel == null) if (yxDataModel == null)
{ {
_logger.LogWarning("无法解析遥信数据项: {Item}", yxItem?.ToString()); _logger.LogWarning("无法解析遥信数据项: {Item}", yxItem?.ToString());
return; return null;
} }
// 使用映射字典查找对应的haskey // 使用映射字典查找对应的haskey
if (!_yxIdToHashKeyMapping.TryGetValue(yxDataModel.YX_ID, out string haskey)) if (!_yxIdToHashKeyMapping.TryGetValue(yxDataModel.YX_ID, out string haskey))
{ {
// _logger.LogWarning("未找到YX_ID {YxId} 对应的映射", yxDataModel.YX_ID); // _logger.LogWarning("未找到YX_ID {YxId} 对应的映射", yxDataModel.YX_ID);
return; return null;
} }
// 构建Redis键 // 构建Redis键
@ -361,7 +370,7 @@ namespace YunDa.Server.ISMSTcp.Services
if (telesignalisationModel == null) if (telesignalisationModel == null)
{ {
_logger.LogWarning("Redis中未找到遥信数据: RedisKey={RedisKey}, HasKey={HasKey}", redisKey, haskey); _logger.LogWarning("Redis中未找到遥信数据: RedisKey={RedisKey}, HasKey={HasKey}", redisKey, haskey);
return; return null;
} }
// 解析时间 // 解析时间
@ -391,12 +400,23 @@ namespace YunDa.Server.ISMSTcp.Services
string channel = $"{_telesignalisationInflectionInflectionZZChannelRediskey}_Zongzi"; string channel = $"{_telesignalisationInflectionInflectionZZChannelRediskey}_Zongzi";
_telesignalisationModelListRedis.PublishAsync(channel, telesignalisationModel); _telesignalisationModelListRedis.PublishAsync(channel, telesignalisationModel);
//缓存遥信数据到内存
YXCacheData yXCacheData = new YXCacheData
{
Id = yxDataModel.YX_ID,
Value = yxDataModel.V,
ValueStr = telesignalisationModel.ResultValueStr,
TimeStamp = resultTime.ToString("yyyy-MM-dd HH:mm:ss")
};
SaveYxDataToCache(resultTime, yXCacheData);
// 🔧 优化如果跳过转发仅更新Redis后直接返回 // 🔧 优化如果跳过转发仅更新Redis后直接返回
if (skipForwarding) if (skipForwarding)
{ {
_logger.LogDebug("跳过转发逻辑 - YX_ID: {YxId}", yxDataModel.YX_ID); _logger.LogDebug("跳过转发逻辑 - YX_ID: {YxId}", yxDataModel.YX_ID);
return; return telesignalisationModel;
} }
// 提取设备ID并调用数据变位信号API // 提取设备ID并调用数据变位信号API
@ -409,7 +429,7 @@ namespace YunDa.Server.ISMSTcp.Services
{ {
_logger.LogDebug("设备处于检修状态,跳过遥信数据推送 - EquipmentInfoId: {EquipmentId}", equipmentId); _logger.LogDebug("设备处于检修状态,跳过遥信数据推送 - EquipmentInfoId: {EquipmentId}", equipmentId);
// 设备处于检修状态,跳过数据推送 // 设备处于检修状态,跳过数据推送
return; return telesignalisationModel;
} }
// WebSocket实时推送 // WebSocket实时推送
@ -429,12 +449,129 @@ namespace YunDa.Server.ISMSTcp.Services
{ {
await ProcessLogicExpressionsAsync(yxDataModel.YX_ID, yxDataModel.V); await ProcessLogicExpressionsAsync(yxDataModel.YX_ID, yxDataModel.V);
} }
return telesignalisationModel;
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "处理单个遥信数据项时发生错误: {Item}", yxItem?.ToString()); _logger.LogError(ex, "处理单个遥信数据项时发生错误: {Item}", yxItem?.ToString());
return null;
} }
} }
//缓存遥信数据到内存
private void SaveYxDataToCache(DateTime dateTime, YXCacheData yxData)
{
_yxDataBuffer.Enqueue((dateTime, yxData));
// 每100次接收清理一次过期数据
if (Interlocked.Increment(ref _yxDataRecvCount) % 100 == 0)
{
var now = DateTime.Now;
while (_yxDataBuffer.TryPeek(out var oldest) &&
(now - oldest.time).TotalSeconds > YXDATA_BUFFER_SECONDS)
{
_yxDataBuffer.TryDequeue(out _);
}
}
}
public async Task<List<YXResultData>> GetYXDataByDataIds(List<string> ids, int times, CancellationToken cancellationToken)
{
var startTime = DateTime.Now;
var resultDict = new Dictionary<string, List<YXCacheData>>(); // key = YX_ID
// 初始化每个ID的结果列表
foreach (var id in ids)
resultDict[id] = new List<YXCacheData>();
var tcs = new TaskCompletionSource();
// 异步监听任务
_ = Task.Run(async () =>
{
while ((DateTime.Now - startTime).TotalSeconds < times)
{
var matched = _yxDataBuffer
.Where(x => x.time >= startTime && ids.Contains(x.yxData.Id))
.ToList();
// 按 ID 分类
foreach (var id in ids)
{
var datas = matched.Where(x => x.yxData.Id == id)
.Select(x => x.yxData)
.ToList();
lock (resultDict[id])
{
resultDict[id].Clear();
resultDict[id].AddRange(datas);
}
}
await Task.Delay(500, cancellationToken);
}
tcs.TrySetResult();
}, cancellationToken);
await Task.WhenAny(tcs.Task, Task.Delay(times * 1000, cancellationToken));
// 构建最终返回数据
var finalResult = new List<YXResultData>();
foreach (var id in ids)
{
var yxResult = new YXResultData
{
Id = id,
};
lock (resultDict[id])
{
yxResult.ParseValue(resultDict[id]);
}
finalResult.Add(yxResult);
}
//设置Name和DispatcherAddress并且检查数据是否为空如果为空从Redis里取最新值
if(!_isInitialized)
{
await InitAsync();
}
string redisKey = $"{_telesignalisationModelListRediskey}_Zongzi";
foreach (var item in finalResult)
{
if (!_yxIdToHashKeyMapping.TryGetValue(item.Id, out string haskey))
{
continue;
}
var telesignalisationModel = await _telesignalisationModelListRedis.HashSetGetOneAsync(redisKey, haskey);
if (telesignalisationModel == null)
{
_logger.LogWarning("Redis中未找到遥信数据: RedisKey={RedisKey}, HasKey={HasKey}", redisKey, haskey);
continue; // 继续处理下一个haskey
}
item.Name = telesignalisationModel.Name;
item.DispatcherAddress = telesignalisationModel.DispatcherAddress;
if (item.Data.Count == 0)
{
item.Value = telesignalisationModel.ResultValue;
item.ValueStr = telesignalisationModel.ResultValueStr;
item.TimeStamp = telesignalisationModel.ResultTime.ToString("yyyy-MM-dd HH:mm:ss");
}
}
return finalResult;
}
private readonly ConcurrentQueue<DateTime> _callTimestamps = new ConcurrentQueue<DateTime>(); private readonly ConcurrentQueue<DateTime> _callTimestamps = new ConcurrentQueue<DateTime>();
private readonly SemaphoreSlim _rateLimitLock = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim _rateLimitLock = new SemaphoreSlim(1, 1);
private const int MaxCallsPerWindow = 300; private const int MaxCallsPerWindow = 300;

File diff suppressed because it is too large Load Diff

View File

@ -84,8 +84,12 @@ namespace ToolLibrary
var paramStr = ObjectToJsonStr(dicParams); var paramStr = ObjectToJsonStr(dicParams);
byte[] buffer = encoding.GetBytes(paramStr); byte[] buffer = encoding.GetBytes(paramStr);
request.ContentLength = buffer.Length; request.ContentLength = buffer.Length;
requestStream = request.GetRequestStream();
requestStream.Write(buffer, 0, buffer.Length); using (var stream = await request.GetRequestStreamAsync())
{
await stream.WriteAsync(buffer, 0, buffer.Length);
}
} }
return await GetResponseValueAsync<T>(request, timeout); return await GetResponseValueAsync<T>(request, timeout);
} }