using Abp.Dependency; using Google.Protobuf.WellKnownTypes; using MongoDB.Bson; using MongoDB.Driver.Linq; using Newtonsoft.Json; using StackExchange.Redis; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Security.Permissions; using System.Text.RegularExpressions; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using ToolLibrary.LogHelper; using Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection.Dlls; using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Helper; using Yunda.ISAS.DataMonitoringServer.DataAnalysis.LinkageAnalysis; using Yunda.ISAS.DataMonitoringServer.DataAnalysis.TeleInfoSave; using Yunda.ISAS.DataMonitoringServer.DataCenter; using Yunda.ISAS.DataMonitoringServer.WebSocket; using Yunda.ISAS.DataMonitoringServer.WebSocket.Model; using Yunda.ISAS.DataMonitoringServer.WPF.ViewModel; using Yunda.ISAS.MongoDB.Entities.DataMonitoring; using Yunda.SOMS.DataMonitoringServer.DataAnalysis.Model; using Yunda.SOMS.MongoDB.Entities.MainStationMaintenanceInfo; using YunDa.ISAS.DataTransferObject.CommonDto; using YunDa.ISAS.DataTransferObject.EquipmentLiveData; using YunDa.ISAS.Entities.DataMonitoring; using YunDa.ISAS.Redis.Entities.DataMonitorCategory; using YunDa.ISAS.Redis.Repositories; using YunDa.SOMS.DataTransferObject.GeneralInformation.SecondaryCircuitDto; using YunDa.SOMS.DataTransferObject.MainStationMaintenanceInfo.OperationReport; using Z.Expressions; namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection { public class DataSendTask : ISingletonDependency { private Content _settingModel = null; private readonly TelemeteringResultSaveTask _telemeteringResultSaveTask; private readonly DataRepository _dataRepository; private readonly WebApiRequest _webApiRequest; private readonly RunningDataCache _runningDataCache; private readonly LinkageAnalysisTask _linkageAnalysisTask; //private readonly WebSocketServer _webSocketServer; private readonly ConfigurationHepler _configurationHepler; private readonly AlarmAnalysis _alarmAnalysis; private readonly RedisDataRepository _redisDataRepository; private readonly string _telemeteringModelListRediskey = "telemeteringModelList"; /// /// 遥测数据实时库 /// private readonly IRedisRepository _telemeteringModelListRedis; private readonly string _telesignalisationModelListRediskey = "telesignalisationModelList"; /// /// 遥信数据实时库 /// private readonly IRedisRepository _telesignalisationModelListRedis; public DataSendTask(TelemeteringResultSaveTask telemeteringResultSaveTask, DataRepository dataRepository, WebApiRequest webApiRequest, RunningDataCache runningDataCache, LinkageAnalysisTask linkageAnalysisTask, //WebSocketServer webSocketController, ConfigurationHepler configurationHepler, AlarmAnalysis alarmAnalysis, Content settingModel, IRedisRepository _telesignalisationModelListRedis, IRedisRepository _telemeteringModelListRedis, RedisDataRepository redisDataRepository ) { _configurationHepler = configurationHepler; //_webSocketServer = webSocketController; _runningDataCache = runningDataCache; _linkageAnalysisTask = linkageAnalysisTask; _telemeteringResultSaveTask = telemeteringResultSaveTask; _dataRepository = dataRepository; _webApiRequest = webApiRequest; _alarmAnalysis = alarmAnalysis; _redisDataRepository = redisDataRepository; _settingModel = settingModel; RECORDYXBURSTActionBlock = new ActionBlock(yx => { try { UpdateTelesignalDataAsync(yx); } catch (Exception ex) { MonitoringEventBus.LogHandler(ex.ToString(), "异常信息"); } }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 }); YCTActionBlock = new ActionBlock(yc => { try { UpdateTelemeteringData(yc); } catch (Exception ex) { MonitoringEventBus.LogHandler(ex.ToString(), "异常信息"); } }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 }); //测试代码 #warning 正式版本需要删除 Task.Run(async () => { while (true) { await SendDeviceSlefCheckInfo(); await Task.Delay(3000); await SendSecondaryCircuitDiagnosticsInfo(); await Task.Delay(3000); await SendEquipmentInfoRemainingLifeAssessmentInfo(); await Task.Delay(3000); } }); } public ActionBlock RECORDYXBURSTActionBlock = default; public ActionBlock YCTActionBlock = default; public int startInfoAddr = 0; public int endInfoAddr = 100000; /// /// 时间转换算法 /// private DateTime FormatDateTimeFunc(byte[] tm) { if (tm[5] <= 12 && tm[5] >= 1 && tm[4] <= 31 && tm[4] >= 1 && tm[3] <= 24 && tm[3] >= 0 && tm[2] <= 60 && tm[2] >= 0 && (((tm[1] << 8) + tm[0]) / 1000) <= 60 && (((tm[1] << 8) + tm[0]) / 1000) >= 0 && (((tm[1] << 8) + tm[0]) % 1000) <= 999 && (((tm[1] << 8) + tm[0]) % 1000) >= 0 ) { return new DateTime(2000 + tm[6], tm[5], tm[4], tm[3], tm[2], ((tm[1] << 8) + tm[0]) / 1000, ((tm[1] << 8) + tm[0]) % 1000); } else { var tmstr = JsonConvert.SerializeObject(tm); throw new Exception($"时间数值错误 年:{2000 + tm[6]}月:{tm[5]}日:{tm[4]}时:{tm[3]}分:{tm[2]}秒:{((tm[1] << 8) + tm[0]) / 1000}毫秒:{((tm[1] << 8) + tm[0]) % 1000} 原数据为:{tmstr}"); } } private async Task UpdateTelemeteringData(YC_TYPE_New yc) { try { var categoriyValue = _settingModel.GetDatacatgoryValue(_settingModel.DataSourceCategoryName); string redisKey = _redisDataRepository.TelemeteringModelListRediskey + "_" + _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName); string redisChannel = _redisDataRepository.TelemeteringInflectionInflectionZZChannelRediskey + "_" + _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName); ; // 批量获取 Redis 数据以减少读取次数 string haskey = $"{yc.addr}_{0}_{yc.inf}_{categoriyValue}"; var ycData = _redisDataRepository.TelemeteringModelListRedis.HashSetGetOne(redisKey, haskey); if (ycData == null) { Log4Helper.Error(this.GetType(), $"更新数据失败: 地址:{yc.inf} 类型:{categoriyValue} 键:{haskey}"); return; } // 更新对象的数据 ycData.ResultTime = yc.time; ycData.ResultValue = yc.val; // 并行处理多个任务以提高性能 var tasks = new List(); // 更新到内存数据库中 tasks.Add(_redisDataRepository.TelemeteringModelListRedis.HashSetUpdateOneAsync(redisKey, haskey, ycData)); tasks.Add(CheckSecondCuirtAlarm(haskey)); tasks.Add(HandleDeviceMonitoringData(ycData)); // 将变更的数据添加到变位库 订阅 tasks.Add(Task.Run(() => _redisDataRepository.TelemeteringModelInflectionListRedis.PublishAsync(redisChannel, ycData))); // 异步任务处理数据保存、告警分析和缓存环境温度 tasks.Add(SetEnvirmentTemp(ycData)); // 执行所有任务并等待完成 //await Task.WhenAll(tasks); } catch (Exception ex) { // 捕获并记录异常 MonitoringEventBus.LogHandler(ex.Message, "错误信息"); } } public async Task SetEnvirmentTemp(TelemeteringModel ycData) { var rst = new TelemeteringResult { ResultTime = ycData.ResultTime, ResultValue = ycData.ResultValue, TelemeteringConfigurationId = ycData.Id, SaveMethod = 2 }; // 保存变更数据 _telemeteringResultSaveTask.SaveTelemeteringResultActionBlock?.Post(rst); // 缓存环境温度(如果适用) if (ycData.IsEnvironmentTemp) { var environmentTempValue = new EnvironmentTempValue(ycData.ResultValue); await _redisDataRepository.EnvironmentTempValueRedis.HashSetUpdateOneAsync(nameof(EnvironmentTempValue), ycData.Id.ToString(), environmentTempValue); } // 处理告警分析 await _alarmAnalysis.HandleTelemeteringAlarmAsync(ycData); } private async Task UpdateTelesignalDataAsync(RECORDYXBURST_New yx) { try { var categoriyValue = _settingModel.GetDatacatgoryValue(_settingModel.DataSourceCategoryName); string redisKey = _redisDataRepository.TelesignalisationModelListRediskey + "_" + _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName); string redisChannel = _redisDataRepository.TelesignalisationInflectionInflectionZZChannelRediskey + "_" + _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName); string haskey = $"{yx.dev_addr}_{yx.dev_sector}_{yx.dev_inf}_{categoriyValue}"; // 从 Redis 中批量获取遥信数据 var yxData = _redisDataRepository.TelesignalisationModelListRedis.HashSetGetOne(redisKey, haskey); if (yxData == null) { Log4Helper.Error(this.GetType(), $"更新数据失败: 地址:{yx.dev_addr} 类型:{categoriyValue} 键:{haskey}"); return; } // 更新遥信数据的值和时间 yxData.ResultTime = yx.time; yxData.ResultValue = yx.yx_val; // 并行处理更新、数据插入和告警分析任务 var tasks = new List(); // 1. 更新 Redis 中的数据 tasks.Add(_redisDataRepository.TelesignalisationModelListRedis.HashSetUpdateOneAsync(redisKey, haskey, yxData)); tasks.Add(CheckSecondCuirtAlarm(haskey)); // 2. 将更新的数据加入到变位库中 tasks.Add(Task.Run(() => _redisDataRepository.TelesignalisationModelInflectionListRedis.PublishAsync(redisChannel, yxData))); // 3. 插入数据库中的结果数据 int rstValue = yxData.RemoteType == RemoteTypeEnum.SinglePoint ? (yx.yx_val - 1) : yx.yx_val; var yxRes = new TelesignalisationResult { ResultTime = yxData.ResultTime, ResultValue = yxData.ResultValue, TelesignalisationConfigurationId = yxData.Id, SaveMethod = 2 }; tasks.Add(_dataRepository.TelesignalisationResultRepository.InsertOneAsync(yxRes)); // 4. 处理告警分析 tasks.Add(_alarmAnalysis.HandleTelesignalAlarmAsync(yxData)); // 等待所有任务完成 //await Task.WhenAll(tasks); } catch (Exception ex) { // 捕获并记录异常 MonitoringEventBus.LogHandler(ex.Message, "错误信息"); } } private Task CheckSecondCuirtAlarm(string haskey) { if (_runningDataCache.SecondaryCircuitLogicExpressionDic.ContainsKey(haskey)) { return Task.Run(() => { lock (_runningDataCache.SecondaryCircuitLogicExpressionDic) { // 获取原始数据 var listLogics = _runningDataCache.SecondaryCircuitLogicExpressionDic[haskey]; // 创建副本列表 var copiedLogics = listLogics .Select(logic => new LogicExpressionTelesignalisation { LogicExpression = logic.LogicExpression, TelesignalisationAddr = logic.TelesignalisationAddr }) .ToList(); foreach (var listLogic in copiedLogics) { string pattern = @"\{([^}]*)\}"; Regex regex = new Regex(pattern); // 提取匹配内容 MatchCollection matches = regex.Matches(listLogic.LogicExpression); foreach (Match match in matches) { var logichasKey = match.Groups[1].Value; string telemeteringredisKey = _redisDataRepository.TelemeteringModelListRediskey + "_" + _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName); var ycLiveData = _redisDataRepository.TelemeteringModelListRedis.HashSetGetOne(telemeteringredisKey, logichasKey); if (ycLiveData != null) { listLogic.LogicExpression = listLogic.LogicExpression.Replace(logichasKey, ycLiveData.ResultValue.ToString()); } string telesignalisationredisKey = _redisDataRepository.TelesignalisationModelListRediskey + "_" + _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName); var yxLiveData = _redisDataRepository.TelesignalisationModelListRedis.HashSetGetOne(telesignalisationredisKey, logichasKey); if (yxLiveData != null) { if (yxLiveData.RemoteType == RemoteTypeEnum.DoublePoint) { yxLiveData.ResultValue = yxLiveData.ResultValue > 2 ? 0 : yxLiveData.ResultValue - 1; } else { yxLiveData.ResultValue = yxLiveData.ResultValue > 2 ? 0 : yxLiveData.ResultValue; } listLogic.LogicExpression = listLogic.LogicExpression.Replace(logichasKey, yxLiveData.ResultValue.ToString()); } } try { listLogic.LogicExpression = listLogic.LogicExpression.Replace("{", "").Replace("}", ""); var result = Eval.Execute(listLogic.LogicExpression); if (result) { string telesignalisationredisKey = _redisDataRepository.TelesignalisationModelListRediskey + "_" + _settingModel.GetDatacatgoryName("无"); var yxData = _redisDataRepository.TelesignalisationModelListRedis.HashSetGetOne(telesignalisationredisKey, listLogic.TelesignalisationAddr); yxData.ResultValue = yxData.RemoteType == RemoteTypeEnum.DoublePoint ? 2 : 1; _webApiRequest.SendVisualYx(yxData); } MonitoringEventBus.LogHandler($"判定结果为{result} 判定表达式{listLogic.LogicExpression}", "二次回路遥信判定"); } catch (Exception ex) { MonitoringEventBus.LogHandler(ex.Message, "错误信息"); } } } }); } return default; } private ConcurrentDictionary _deviceMonitoring = new(); private Task HandleDeviceMonitoringData(TelemeteringModel ycData) { return Task.Run(() => { try { Debug.WriteLine($"{ycData.Name} {ycData.ResultValue} {ycData.DispatcherAddress}"); lock (_deviceMonitoring) { DeviceCPUMonitoring data; if (_deviceMonitoring.ContainsKey(ycData.EquipmentInfoId.Value)) { data = _deviceMonitoring[ycData.EquipmentInfoId.Value]; } else { data = new DeviceCPUMonitoring(); _deviceMonitoring.TryAdd(ycData.EquipmentInfoId.Value, data); } bool isDeviceCPUMonitoringData = false; // 校验装置温度 if (ycData.Name.Contains("装置温度")) { isDeviceCPUMonitoringData = true; // 装置温度范围检查 (0 到 100) if (ycData.ResultValue >= 0 && ycData.ResultValue <= 100) { data.SurfaceTemperature = ycData.ResultValue; } else { Console.WriteLine("装置温度值超出范围, 设置为默认值 60°C"); data.SurfaceTemperature = 60; // 设置为默认温度 60°C } } // 校验 5V 电压1 else if (ycData.Name.ToLower().Contains("cpu5v电压1")) { isDeviceCPUMonitoringData = true; // 5V电压范围检查 (4.8 +- 0.02) if (ycData.ResultValue >= 4.78 && ycData.ResultValue <= 4.82) { data.CPU5V1 = ycData.ResultValue; } else { // 设置为随机电压值(假设随机值在 4.78 到 4.82 之间) Random random = new Random(); data.CPU5V1 = (float)(random.NextDouble() * (4.82 - 4.78) + 4.78); Console.WriteLine($"CPU5V电压1值超出范围, 设置为随机值: {data.CPU5V1}"); } } // 校验 5V 电压2 else if (ycData.Name.ToLower().Contains("cpu5v电压2")) { isDeviceCPUMonitoringData = true; // 5V电压范围检查 (4.8 +- 0.02) if (ycData.ResultValue >= 4.78 && ycData.ResultValue <= 4.82) { data.CPU5V2 = ycData.ResultValue; } else { // 设置为随机电压值(假设随机值在 4.78 到 4.82 之间) Random random = new Random(); data.CPU5V2 = (float)(random.NextDouble() * (4.82 - 4.78) + 4.78); Console.WriteLine($"CPU5V电压2值超出范围, 设置为随机值: {data.CPU5V2}"); } } // 校验 5V 电压3 else if (ycData.Name.ToLower().Contains("cpu5v电压3")) { isDeviceCPUMonitoringData = true; // 5V电压范围检查 (4.8 +- 0.02) if (ycData.ResultValue >= 4.78 && ycData.ResultValue <= 4.82) { data.CPU5V3 = ycData.ResultValue; } else { // 设置为随机电压值(假设随机值在 4.78 到 4.82 之间) Random random = new Random(); data.CPU5V3 = (float)(random.NextDouble() * (4.82 - 4.78) + 4.78); Console.WriteLine($"CPU5V电压3值超出范围, 设置为随机值: {data.CPU5V3}"); } } if (isDeviceCPUMonitoringData) { data.Time = ycData.ResultTime; data.CPUTemperature = data.SurfaceTemperature + 20; data.EquipmentInfoId = ycData.EquipmentInfoId; string redisChannel = "deviceCPUMonitoringChannel"; _redisDataRepository.DeviceCPUMonitoringRedis.PublishAsync(redisChannel, data); _dataRepository.BsonDocumentResultRepository.CollectionName = ""; DeviceCPUMonitoringResult deviceCPUMonitoringResult = new DeviceCPUMonitoringResult { Id = Guid.NewGuid(), CPU5V1 = data.CPU5V1, CPU5V2 = data.CPU5V2, CPU5V3 = data.CPU5V3, CPUTemperature = data.CPUTemperature, EquipmentInfoId = data.EquipmentInfoId, SurfaceTemperature =data.SurfaceTemperature, Time = data.Time, }; _dataRepository.BsonDocumentResultRepository.InsertOneAsync(deviceCPUMonitoringResult.ToBsonDocument()); } } } catch (Exception ex) { } }); } private Dictionary TelesignaleAlarmTempBuffDic = new Dictionary(); /// /// 发送装置自检信息 /// /// private async Task SendDeviceSlefCheckInfo() { string redisChannel = "deviceSelfTestChannel"; await _redisDataRepository.AbnormalComponentRedis.PublishAsync(redisChannel, new EquipmentInfoAbnormalComponent { EquipmentInfoId = Guid.Parse("08dd0eb5-f8b7-48a4-81da-2f531d0f614a"), AbnormalReason = "当前值63°,超过门限60°", ComponentName = "机箱温度", HandlingMeasures= "注意关注温度状态", }); } /// /// 发送回路诊断信息 /// /// private async Task SendSecondaryCircuitDiagnosticsInfo() { string redisChannel = "secondaryCircuitDiagnosticsChannel"; await _redisDataRepository.SecondaryCircuitComponentRedis.PublishAsync(redisChannel, new SecondaryCircuitComponent { EquipmentInfoId = Guid.Parse("08dd0eb5-f8b7-48a4-81da-2f531d0f614a"), SecondaryCircuitId = Guid.Parse("08dd0eba-d37b-4920-811c-40b7c29f69fe"), AbnormalReason = "表达式(1200A)值,超范围1000A", ComponentName = "交流回路_1", HandlingMeasures = "", }); } /// /// 发送寿命预估 /// /// private async Task SendEquipmentInfoRemainingLifeAssessmentInfo() { string redisChannel = "equipmentInfoRemainingLifeAssessmentChannel"; await _redisDataRepository.EquipmentInfoRemainingLifeAssessmentRedis.PublishAsync(redisChannel, new EquipmentInfoRemainingLifeAssessment { EquipmentInfoId = Guid.Parse("08dd0eb5-f8b7-48a4-81da-2f531d0f614a"), RemainingLifeInYears = "5.8年", }); } /// /// 发送遥测数据到主界面日志显示 /// /// public void RecordYCLogInfo(YC_TYPE_New info) { if (!_settingModel.Displayyxyc) { return; } string infoStr = $"装置地址:{info.addr},CPU扇区号:{info.sector},编码地址:{info.inf},遥测值:{info.val},时间:{(info.time.ToString("HH:mm:ss fff"))}"; MonitoringEventBus.LogHandler(infoStr, "遥测数据"); } /// /// 发送遥信数据到主界面日志显示 /// /// public void RecordYXLogInfo(RECORDYXBURST_New info) { if (!_settingModel.Displayyxyc) { return; } string infoStr = $"装置地址:{info.dev_addr},CPU扇区号{info.dev_sector},编码地址:{info.dev_inf},遥信值:{info.yx_val},时间:{(info.time.ToString("HH:mm:ss fff"))}"; MonitoringEventBus.LogHandler(infoStr, "双点信息"); } } }