diff --git a/src/YunDa.Application/YunDa.SOMS.MongoDB.Application/YunDa.SOMS.MongoDB.Application.xml b/src/YunDa.Application/YunDa.SOMS.MongoDB.Application/YunDa.SOMS.MongoDB.Application.xml index 007af48..d7807cb 100644 --- a/src/YunDa.Application/YunDa.SOMS.MongoDB.Application/YunDa.SOMS.MongoDB.Application.xml +++ b/src/YunDa.Application/YunDa.SOMS.MongoDB.Application/YunDa.SOMS.MongoDB.Application.xml @@ -1236,12 +1236,6 @@ - - - 测试报警api - - - 获取报警信息 diff --git a/src/YunDa.Server/YunDa.Server.ISMSTcp/Configuration/ApiEndpointsConfiguration.cs b/src/YunDa.Server/YunDa.Server.ISMSTcp/Configuration/ApiEndpointsConfiguration.cs index 2ebf3bb..db78372 100644 --- a/src/YunDa.Server/YunDa.Server.ISMSTcp/Configuration/ApiEndpointsConfiguration.cs +++ b/src/YunDa.Server/YunDa.Server.ISMSTcp/Configuration/ApiEndpointsConfiguration.cs @@ -167,6 +167,9 @@ namespace YunDa.Server.ISMSTcp.Configuration //获取网关信息 public string GetGateWayBaseInfoUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/SecondaryCircuitInspectionItem/GetGateWayBaseInfo"; + //获取没有调用ai的巡检结果记录 + public string GetFailedAiInspectionListUri => $"{BaseUrl.TrimEnd('/')}/api/services/SOMS/SecondaryCircuitInspectionItem/GetGateWayBaseInfo"; + } } diff --git a/src/YunDa.Server/YunDa.Server.ISMSTcp/Domain/ApiEndpoints.cs b/src/YunDa.Server/YunDa.Server.ISMSTcp/Domain/ApiEndpoints.cs index ab00ff0..1145ef5 100644 --- a/src/YunDa.Server/YunDa.Server.ISMSTcp/Domain/ApiEndpoints.cs +++ b/src/YunDa.Server/YunDa.Server.ISMSTcp/Domain/ApiEndpoints.cs @@ -125,6 +125,9 @@ namespace YunDa.Server.ISMSTcp.Domain //获取网关信息 string GetGateWayBaseInfoUri { get; } + //获取没有调用ai的巡检结果记录 + string GetFailedAiInspectionListUri { get; } + } /// @@ -265,5 +268,8 @@ namespace YunDa.Server.ISMSTcp.Domain //获取网关信息 public string GetGateWayBaseInfoUri => _config.GetGateWayBaseInfoUri; + //获取没有调用ai的巡检结果记录 + public string GetFailedAiInspectionListUri => _config.GetFailedAiInspectionListUri; + } } diff --git a/src/YunDa.Server/YunDa.Server.ISMSTcp/Domain/WebApiRequest.cs b/src/YunDa.Server/YunDa.Server.ISMSTcp/Domain/WebApiRequest.cs index c309ee4..ae08df9 100644 --- a/src/YunDa.Server/YunDa.Server.ISMSTcp/Domain/WebApiRequest.cs +++ b/src/YunDa.Server/YunDa.Server.ISMSTcp/Domain/WebApiRequest.cs @@ -1118,5 +1118,28 @@ namespace YunDa.Server.ISMSTcp.Domain return null; } + + //获取没有调用ai的巡检结果记录 + public async Task> GetFailedAiInspectionListAsync() + { + try + { + var response = await Task.Run(() => ToolLibrary.HttpHelper.HttpGetRequest(_apiEndpoints.GetFailedAiInspectionListUri)); + + if (response != null) + { + + var result = ExtractDataFromAbpResponse>(response); + + return result; + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error Call GetOpticalCableConfigAsync Api"); + } + + return null; + } } } diff --git a/src/YunDa.Server/YunDa.Server.ISMSTcp/Helpers/ChannelEx.cs b/src/YunDa.Server/YunDa.Server.ISMSTcp/Helpers/ChannelEx.cs new file mode 100644 index 0000000..cb1908d --- /dev/null +++ b/src/YunDa.Server/YunDa.Server.ISMSTcp/Helpers/ChannelEx.cs @@ -0,0 +1,69 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace YunDa.Server.ISMSTcp.Helpers +{ + public class ChannelEx where TKey : notnull + { + private readonly Channel _channel; + private readonly ConcurrentDictionary _exists = new ConcurrentDictionary(); + private readonly Func _keySelector; + + public ChannelEx(Func keySelector) + { + _channel = Channel.CreateUnbounded(); + _keySelector = keySelector ?? throw new ArgumentNullException(nameof(keySelector)); + } + + // 判断是否有内容 + public bool HasItems => !_exists.IsEmpty; + + // 判断某条记录是否存在 + public bool Contains(T item) => _exists.ContainsKey(_keySelector(item)); + + // 异步添加 + public async ValueTask WriteAsync(T item) + { + if (!TryAdd(item)) + return false; + + await _channel.Writer.WriteAsync(item); + return true; + } + + // 消费单个元素 + public async ValueTask ReadAsync() + { + var item = await _channel.Reader.ReadAsync(); + _exists.TryRemove(_keySelector(item), out _); + return item; + } + + // 封装成 IAsyncEnumerable,可直接 await foreach + public async IAsyncEnumerable ReadAllAsync([System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) + { + while (await _channel.Reader.WaitToReadAsync(cancellationToken)) + { + while (_channel.Reader.TryRead(out var item)) + { + _exists.TryRemove(_keySelector(item), out _); + yield return item; + } + } + } + + + // 内部去重逻辑 + private bool TryAdd(T item) + { + var key = _keySelector(item); + return _exists.TryAdd(key, true); + } + } + + +} diff --git a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/GwErrorRatioService.cs b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/GwErrorRatioService.cs index f5a5ebd..758d8ca 100644 --- a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/GwErrorRatioService.cs +++ b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/GwErrorRatioService.cs @@ -24,7 +24,7 @@ namespace YunDa.Server.ISMSTcp.Services //数据缓冲队列 - private ZzDataCacheContainer _cacheContainer = new ZzDataCacheContainer(2 * 60 + 1);//保留2小时的数据,多1分钟,兼容发送送数据发送延迟情况(每1小时发送一次) + private ZzDataCacheContainer _cacheContainer = new ZzDataCacheContainer(ZzDataCacheContainerDataType.eGW, 2 * 60 + 1);//保留2小时的数据,多1分钟,兼容发送送数据发送延迟情况(每1小时发送一次) public GwErrorRatioService(ILogger logger, ZzDataCacheContainerInit zzDataCacheContainerInit) { diff --git a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/SecondaryCircuitInspectionPlanService.cs b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/SecondaryCircuitInspectionPlanService.cs index 4961452..43f8f11 100644 --- a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/SecondaryCircuitInspectionPlanService.cs +++ b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/SecondaryCircuitInspectionPlanService.cs @@ -31,6 +31,8 @@ using YunDa.SOMS.DataTransferObject.DataMonitoring.SecondaryCircuitInspection; using YunDa.SOMS.DataTransferObject.DataMonitoring.SecondaryCircuitInspection.Configurations; using YunDa.SOMS.DataTransferObject.MaintenanceAndOperations.SecondaryEquipment; using YunDa.SOMS.Entities.DataMonitoring.SecondaryCircuitInspection; +using YunDa.Server.ISMSTcp.Helpers; +using Abp.AutoMapper; namespace YunDa.Server.ISMSTcp.Services { @@ -155,8 +157,14 @@ namespace YunDa.Server.ISMSTcp.Services public class SecondaryCircuitInspectionAiParamModel { public string InspectionResultId { get; set; } = string.Empty; - public string PlanName { get; set; } = string.Empty; - public SecondaryCircuitInspectionResultSaveModel SaveModel { get; set; } + public string InspectionName { get; set; } = string.Empty; + public string InspectionDescription { get; set; } = string.Empty; + public string PlanName => string.Concat(InspectionName, " ", InspectionDescription); + public string Status { get; set; } = string.Empty; //"异常|正常|故障", + public string InspectionResult { get; set; } = string.Empty; //巡检结果 inspectionResult不能超过15个字符 + public string CalculationProcess { get; set; } = string.Empty; //计算过程 + public string VerificationResult { get; set; } = string.Empty; //校验结果 + } public class SecondaryCircuitInspectionAiSaveModel @@ -184,7 +192,7 @@ namespace YunDa.Server.ISMSTcp.Services private int _planCheckDay = 0; private readonly Channel _singlePlanChannel; //巡检计划 - private readonly Channel _aiChannel; //Ai调用 + private readonly ChannelEx _aiChannel; //Ai调用 //巡检事件 private Queue _eventPlanList; @@ -208,7 +216,7 @@ namespace YunDa.Server.ISMSTcp.Services _singlePlanChannel = Channel.CreateUnbounded(); - _aiChannel = Channel.CreateUnbounded(); + _aiChannel = new ChannelEx( item => item.InspectionResultId); _eventPlanList = new Queue(); @@ -226,11 +234,13 @@ namespace YunDa.Server.ISMSTcp.Services { while (true) {//每30秒更新一下配置 - + await UpdatePlans(); await UpdateEventPlans(); + //await CheckAiChannel(); + await Task.Delay(30000); } }); @@ -302,7 +312,7 @@ namespace YunDa.Server.ISMSTcp.Services var rand = new Random(Guid.NewGuid().GetHashCode()); - await foreach (var item in _aiChannel.Reader.ReadAllAsync()) + await foreach (var item in _aiChannel.ReadAllAsync()) { // 让每个线程在执行之间有不同的节奏 await Task.Delay(rand.Next(0, 300)); @@ -683,11 +693,18 @@ namespace YunDa.Server.ISMSTcp.Services if (!string.IsNullOrWhiteSpace(inspectionResultId) && saveData.Status != "正常") { SecondaryCircuitInspectionAiParamModel aiParamModel = new SecondaryCircuitInspectionAiParamModel(); - aiParamModel.InspectionResultId = inspectionResultId; - aiParamModel.PlanName = $"{itemEx.Item.Name} {itemEx.Item.Description}"; - aiParamModel.SaveModel = saveData; - _aiChannel.Writer.WriteAsync(aiParamModel); + aiParamModel.InspectionResultId = inspectionResultId; + + aiParamModel.InspectionName = item.Name; + aiParamModel.InspectionDescription = item.Description; + + aiParamModel.Status = saveData.Status; + aiParamModel.InspectionResult = saveData.InspectionResult; + aiParamModel.CalculationProcess = saveData.CalculationProcess; + aiParamModel.VerificationResult = saveData.VerificationResult; + + _aiChannel.WriteAsync(aiParamModel); } } @@ -700,6 +717,20 @@ namespace YunDa.Server.ISMSTcp.Services } } + //获取没有调用Ai接口或调用失败的巡检记录结果 + private async Task CheckAiChannel() + { + if (!_aiChannel.HasItems) + { + var list = await _webApiRequest.GetFailedAiInspectionListAsync(); + if (list != null) + { + foreach (var item in list) + await _aiChannel.WriteAsync(item); + } + } + } + private async Task CallAiAndSave(SecondaryCircuitInspectionAiParamModel aiParamModel) { try @@ -765,10 +796,10 @@ namespace YunDa.Server.ISMSTcp.Services sb.AppendLine(aiParamModel.PlanName); sb.AppendLine(); sb.AppendLine("巡检结果为:"); - sb.AppendLine($"巡检状态: {aiParamModel.SaveModel.Status}"); - sb.AppendLine($"计算过程: {aiParamModel.SaveModel.CalculationProcess}"); - sb.AppendLine($"巡检结果: {aiParamModel.SaveModel.InspectionResult}"); - sb.AppendLine($"校验结果: {aiParamModel.SaveModel.VerificationResult}"); + sb.AppendLine($"巡检状态: {aiParamModel.Status}"); + sb.AppendLine($"计算过程: {aiParamModel.CalculationProcess}"); + sb.AppendLine($"巡检结果: {aiParamModel.InspectionResult}"); + sb.AppendLine($"校验结果: {aiParamModel.VerificationResult}"); sb.AppendLine(); sb.AppendLine(); @@ -797,10 +828,10 @@ namespace YunDa.Server.ISMSTcp.Services // 巡检结果 sb.AppendLine("巡检结果为:"); - sb.AppendLine($"巡检状态: {aiParamModel.SaveModel.Status}"); - sb.AppendLine($"计算过程: {aiParamModel.SaveModel.CalculationProcess}"); - sb.AppendLine($"巡检结果: {aiParamModel.SaveModel.InspectionResult}"); - sb.AppendLine($"校验结果: {aiParamModel.SaveModel.VerificationResult}"); + sb.AppendLine($"巡检状态: {aiParamModel.Status}"); + sb.AppendLine($"计算过程: {aiParamModel.CalculationProcess}"); + sb.AppendLine($"巡检结果: {aiParamModel.InspectionResult}"); + sb.AppendLine($"校验结果: {aiParamModel.VerificationResult}"); sb.AppendLine(); sb.AppendLine(); @@ -1004,10 +1035,10 @@ namespace YunDa.Server.ISMSTcp.Services } - + ////获取遥测数据 - //public async Task> CallYCByDataIdAsync(CallYCByDataIdRequest request, CancellationToken cancellationToken = default) + //public async Task> CallYCByDataIdAsync(ZzDataRequestModel request, CancellationToken cancellationToken = default) //{ // try // { @@ -1023,13 +1054,13 @@ namespace YunDa.Server.ISMSTcp.Services // JArray jArray = obj.Value("data"); // if (jArray != null) // { - // List list = jArray.ToObject>(); + // List list = jArray.ToObject>(); // return list; // } // } // } - // catch(Exception ex) + // catch (Exception ex) // { // _logger.LogError(ex, "SecondaryCircuitInspectionPlanService - CallYCByDataIdAsync:发生错误"); // } diff --git a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/TelemeteringHandle.cs b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/TelemeteringHandle.cs index 1b44521..a0926f3 100644 --- a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/TelemeteringHandle.cs +++ b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/TelemeteringHandle.cs @@ -67,7 +67,7 @@ namespace YunDa.Server.ISMSTcp.Services //private readonly Timer _cleanupTimer; //private const int CLEANUP_INTERVAL_MS = 60000; // 1分钟清理一次 //private const int DATA_EXPIRATION_MINUTES = 5; // 数据保留5分钟 - private ZzDataCacheContainer _zzDataCacheContainer = new ZzDataCacheContainer(3); // 数据保留3分钟 + private ZzDataCacheContainer _zzDataCacheContainer = new ZzDataCacheContainer(ZzDataCacheContainerDataType.eYC, 3); // 数据保留3分钟 // 初始化状态 public volatile bool _isInitialized = false; @@ -101,6 +101,8 @@ namespace YunDa.Server.ISMSTcp.Services // 🔧 新增:初始化数据清理定时器 //_cleanupTimer = new Timer(CleanupExpiredData, null, CLEANUP_INTERVAL_MS, CLEANUP_INTERVAL_MS); + + _zzDataCacheContainer.SetTelemeteringHandle(this); } /// @@ -420,11 +422,6 @@ namespace YunDa.Server.ISMSTcp.Services // 保存到Redis await _telemeteringModelListRedis.HashSetUpdateOneAsync(redisKey, haskey, telemeteringModel); - //if(ycDataModel.YC_ID == "YCB001101001") - //{ - // Debug.WriteLine("YCB001101001 : ", ycDataModel.V.ToString(), telemeteringModel.ResultValue.ToString(), telemeteringModel.Coefficient.ToString()); - //} - //保存到缓存 _zzDataCacheContainer.Write(ycDataModel.YC_ID, (float)ycDataModel.V, telemeteringModel.ResultTime, telemeteringModel.Name, $"{(float)ycDataModel.V} {telemeteringModel.Unit}", telemeteringModel.DispatcherAddress); @@ -461,6 +458,10 @@ namespace YunDa.Server.ISMSTcp.Services // 简化的批量推送处理 CollectTelemeteringDataForBatchPush(batchTelemeteringModels); + + + //将状态推送到孪生体 + _thingService.UpdateThingYCStatus(batchTelemeteringModels); } // 等待所有数据变位信号API调用完成(不阻塞主流程) @@ -673,7 +674,32 @@ namespace YunDa.Server.ISMSTcp.Services return result; } - + public async Task GetDataFromRedis(string ycId) + { + string redisKey = $"{_telemeteringModelListRediskey}_Zongzi"; + + if (!_ycIdToHashKeysMapping.TryGetValue(ycId, out List haskeys)) + { + return null; + } + + if (haskeys.Count >= 0) + { + string haskey = haskeys.First(); + + var telemeteringModel = await _telemeteringModelListRedis.HashSetGetOneAsync(redisKey, haskey); + + return telemeteringModel; + } + + return null; + } + + public void LogWarning(string msg) + { + _logger.LogWarning(msg); + } + /// /// 🔧 新增:释放资源 diff --git a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/TelesignalisationHandle.cs b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/TelesignalisationHandle.cs index 2e0ad24..62d6cf0 100644 --- a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/TelesignalisationHandle.cs +++ b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/TelesignalisationHandle.cs @@ -53,7 +53,7 @@ namespace YunDa.Server.ISMSTcp.Services private readonly object _expressionInitLock = new object(); //遥信数据缓冲队列 - private ZzDataCacheContainer _zzDataCacheContainer = new ZzDataCacheContainer(30); //只保留5分钟的数据 + private ZzDataCacheContainer _zzDataCacheContainer = new ZzDataCacheContainer(ZzDataCacheContainerDataType.eYX, 30); //只保留5分钟的数据 diff --git a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/ThingService.cs b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/ThingService.cs index 3767c79..b0162a7 100644 --- a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/ThingService.cs +++ b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/ThingService.cs @@ -632,6 +632,7 @@ namespace YunDa.Server.ISMSTcp.Services await UpdateOpticalCableAlarmStatus(null, telesignalisationModels); } + //推送全体性能数据 private int _updateAllSimDatasCnt = 0; @@ -682,11 +683,6 @@ namespace YunDa.Server.ISMSTcp.Services { if (_simDatasConfig.TryGetValue(equipmentInfoId, out var bindingItem)) { - //if (bindingItem.TwinId == "2F-kzs-024") - //{ - // int k = 0; - //} - if (bindingItem.IsActive) { var status = new ThingDeviceStatusModel { TwinID = bindingItem.TwinId.Trim(), Metric = item.Name.Trim(), Val = $"{item.ResultValue} {item.Unit}" }; @@ -860,8 +856,8 @@ namespace YunDa.Server.ISMSTcp.Services { if (!string.IsNullOrWhiteSpace(item.ismsbaseYXId)) { - var configItem = _networkCableConfig.FirstOrDefault(e => e.LogicalExpressions.Contains(item.ismsbaseYXId)); - if (configItem != null) + var configItems = _networkCableConfig.Where(e => e.LogicalExpressions.Contains(item.ismsbaseYXId)); + foreach (var configItem in configItems) { if (configItem.IsActive) { @@ -956,8 +952,8 @@ namespace YunDa.Server.ISMSTcp.Services { if (!string.IsNullOrWhiteSpace(item.VA_ID)) { - var configItem = configs.FirstOrDefault(e => e.VirtualPointCodes.Contains(item.VA_ID)); - if (configItem != null) + var configItems = configs.Where(e => e.VirtualPointCodes.Contains(item.VA_ID)); + foreach (var configItem in configItems) { if (configItem.IsActive) { @@ -997,10 +993,10 @@ namespace YunDa.Server.ISMSTcp.Services if (!string.IsNullOrWhiteSpace(item.ismsbaseYCId)) { //性能数据 - var configItem = configs.FirstOrDefault(e => e.LinkageDatas.Contains(item.ismsbaseYCId)); - if (configItem != null) + var configItems = configs.Where(e => e.LinkageDatas.Contains(item.ismsbaseYCId)); + foreach (var configItem in configItems) { - if(configItem.IsActive) + if (configItem.IsActive) { var status = new ThingDeviceStatusModel { TwinID = configItem.TwinId.Trim(), Metric = item.Name.Trim(), Val = $"{item.ResultValue} {item.Unit}" }; @@ -1017,8 +1013,8 @@ namespace YunDa.Server.ISMSTcp.Services foreach (var item in telesignalisationModels) { //报警 - var configItem = configs.FirstOrDefault(e => e.VirtualPointCodes.Contains(item.ismsbaseYXId)); - if (configItem != null) + var configItems = configs.Where(e => e.VirtualPointCodes.Contains(item.ismsbaseYXId)); + foreach (var configItem in configItems) { if (configItem.IsActive) { @@ -1038,8 +1034,8 @@ namespace YunDa.Server.ISMSTcp.Services } //性能数据 - configItem = configs.FirstOrDefault(e => e.LinkageDatas.Contains(item.ismsbaseYXId)); - if (configItem != null) + configItems = configs.Where(e => e.LinkageDatas.Contains(item.ismsbaseYXId)); + foreach (var configItem in configItems) { if (configItem.IsActive) { @@ -1143,8 +1139,8 @@ namespace YunDa.Server.ISMSTcp.Services { if (!string.IsNullOrWhiteSpace(item.VA_ID)) { - var configItem = configs.FirstOrDefault(e => e.LogicalExpressions.Contains(item.VA_ID)); - if (configItem != null) + var configItems = configs.Where(e => e.LogicalExpressions.Contains(item.VA_ID)); + foreach (var configItem in configItems) { if (configItem.IsActive) { @@ -1185,8 +1181,8 @@ namespace YunDa.Server.ISMSTcp.Services if (!string.IsNullOrWhiteSpace(item.ismsbaseYXId)) { //报警 - var configItem = configs.FirstOrDefault(e => e.LogicalExpressions.Contains(item.ismsbaseYXId)); - if (configItem != null) + var configItems = configs.Where(e => e.LogicalExpressions.Contains(item.ismsbaseYXId)); + foreach (var configItem in configItems) { if (configItem.IsActive) { diff --git a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/VirtualTerminalHandler.cs b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/VirtualTerminalHandler.cs index 2949424..5d1d3fc 100644 --- a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/VirtualTerminalHandler.cs +++ b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/VirtualTerminalHandler.cs @@ -34,7 +34,7 @@ namespace YunDa.Server.ISMSTcp.Services //数据缓冲队列 - private ZzDataCacheContainer _zzDataCacheContainer = new ZzDataCacheContainer(5); + private ZzDataCacheContainer _zzDataCacheContainer = new ZzDataCacheContainer(ZzDataCacheContainerDataType.eVA, 5); public VirtualTerminalHandler(ILogger logger, WebApiRequest webApiRequest, ZzDataCacheContainerInit zzDataCacheContainerInit) diff --git a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/ZzDataCacheContainer.cs b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/ZzDataCacheContainer.cs index 85b676e..5e84428 100644 --- a/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/ZzDataCacheContainer.cs +++ b/src/YunDa.Server/YunDa.Server.ISMSTcp/Services/ZzDataCacheContainer.cs @@ -1,8 +1,11 @@ -using Microsoft.Extensions.Logging; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging; +using Microsoft.Identity.Client; using Org.BouncyCastle.Utilities; using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.ComponentModel.DataAnnotations; using System.Diagnostics; using System.Linq; using System.Threading; @@ -37,11 +40,14 @@ namespace YunDa.Server.ISMSTcp.Services public double Value { get; set; } public string ValueStr { get; set; } = string.Empty; public DateTime TimeStamp { get; set; } + public uint ValCount { get; set; } = 0; + public DateTime StartTimeStamp { get; set; } = DateTime.MinValue; ///////////////////////////////////////////////////////////////////////////////////// private readonly ConcurrentQueue<(DateTime time, ZzDataPoint point)> _queue = new ConcurrentQueue<(DateTime time, ZzDataPoint point)>(); + public ConcurrentQueue<(DateTime time, ZzDataPoint point)> Datas => _queue; public ZzData(int minutes, string id, string name, int dispatcherAddress) @@ -57,7 +63,7 @@ namespace YunDa.Server.ISMSTcp.Services { DateTime now = DateTime.Now; - point.TimeStamp = now; //2025-11-18 用当前时间(临时修改) + point.TimeStamp = now; //2025-11-18 用当前时间 _queue.Enqueue((now, point)); @@ -65,6 +71,13 @@ namespace YunDa.Server.ISMSTcp.Services Value = point.Value; ValueStr = point.ValueStr; + ////////////////////////////// + ValCount++; + if(StartTimeStamp == DateTime.MinValue) + StartTimeStamp = now; + + + CleanupOldData(); } @@ -125,17 +138,35 @@ namespace YunDa.Server.ISMSTcp.Services public string Id { get; set; } = string.Empty; public string Name { get; set; } = string.Empty; } + + public enum ZzDataCacheContainerDataType + { + eYC = 0, //遥测 + eYX, //遥信 + eVA, //虚点 + eGW //网关 + } public class ZzDataCacheContainer { private readonly ConcurrentDictionary _datas = new ConcurrentDictionary(); private readonly int _cleanupMinutes = 5; - public ZzDataCacheContainer(int cleanupMinutes) + private ZzDataCacheContainerDataType _dataType = ZzDataCacheContainerDataType.eYC; + + private TelemeteringHandle? _telemeteringHandle = null; + + public ZzDataCacheContainer(ZzDataCacheContainerDataType dataType, int cleanupMinutes) { + _dataType = dataType; _cleanupMinutes = cleanupMinutes; } + public void SetTelemeteringHandle(TelemeteringHandle handle) + { + _telemeteringHandle = handle; + } + public void Write(string id, double val, DateTime time, string name) { var data = _datas.GetOrAdd(id, _ => new ZzData(_cleanupMinutes, id, name, 0)); @@ -161,7 +192,7 @@ namespace YunDa.Server.ISMSTcp.Services data.AddData(new ZzDataPoint(time, val, valStr)); } - public Dictionary Read(List ids, DateTime start, DateTime end) + public async Task> Read(List ids, DateTime start, DateTime end) { var result = new Dictionary(); @@ -178,13 +209,35 @@ namespace YunDa.Server.ISMSTcp.Services ZzDataResultModel data = new ZzDataResultModel(channel); data.Data = channel.GetData(start, end); - if(data.Data.Count == 0 && data.TimeStamp != DateTime.MinValue) + bool isFind = data.Data.Count > 0; + + if (data.Data.Count == 0 && data.TimeStamp != DateTime.MinValue) { - ZzDataPoint zzDataPoint = new ZzDataPoint(DateTime.Now, data.Value, data.ValueStr); + ZzDataPoint zzDataPoint = new ZzDataPoint(DateTime.Now, data.Value, data.ValueStr.Replace(" ", "")); data.Data.Add(zzDataPoint); } + if (_telemeteringHandle != null &&_dataType == ZzDataCacheContainerDataType.eYC && id == "YCB001103003") + { + if (data.Data.Where(e => Math.Abs(e.Value) < 0.0001).Count() == data.Data.Count) + { + string time = DateTime.Now.ToString("yyy/MM/dd HH:mm:ss"); + _telemeteringHandle.LogWarning($"【{time}】: {id}所有值为0[{data.Data.Count}]:状态:{isFind}, Cache长度:{channel.Datas.Count}, Cache时间:{channel.StartTimeStamp.ToString("yyy/MM/dd HH:mm:ss")} ~ {channel.TimeStamp.ToString("yyy/MM/dd HH:mm:ss")}, 最后一个值:{channel.ValueStr}"); + + var redisData = await _telemeteringHandle.GetDataFromRedis(id); + if (redisData != null) + { + _telemeteringHandle.LogWarning($"【{time}】: {id}在Redis中的值为:{redisData.ResultValue} - {redisData.ResultTime }"); + } + else + { + _telemeteringHandle.LogWarning($"【{time}】: {id}在Redis中没有找到"); + } + + } + } + result[id] = data; } @@ -206,7 +259,7 @@ namespace YunDa.Server.ISMSTcp.Services if (timeWindowType == 0 || timeWindowType == 2) { - matched1 = Read(ids, now.AddSeconds(-seconds), now); + matched1 = await Read(ids, now.AddSeconds(-seconds), now); } if (timeWindowType == 1 || timeWindowType == 2) @@ -215,7 +268,7 @@ namespace YunDa.Server.ISMSTcp.Services if(span > 0) await Task.Delay(span, cancellationToken); - matched2 = Read(ids, now, DateTime.Now); + matched2 = await Read(ids, now, DateTime.Now); } foreach (var kv in matched2)