using Abp.Dependency;
using Google.Protobuf.WellKnownTypes;
using MongoDB.Driver.Linq;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using ToolLibrary.LogHelper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection.Dlls;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Helper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.LinkageAnalysis;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.TeleInfoSave;
using Yunda.ISAS.DataMonitoringServer.DataCenter;
using Yunda.ISAS.DataMonitoringServer.WebSocket;
using Yunda.ISAS.DataMonitoringServer.WebSocket.Model;
using Yunda.ISAS.DataMonitoringServer.WPF.ViewModel;
using Yunda.ISAS.MongoDB.Entities.DataMonitoring;
using YunDa.ISAS.DataTransferObject.CommonDto;
using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
using YunDa.ISAS.Entities.DataMonitoring;
using YunDa.ISAS.Redis.Entities.DataMonitorCategory;
using YunDa.ISAS.Redis.Repositories;

namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection
{
    /// <summary>
    /// Receives telemetering (YC) and telesignalisation (YX) samples through two
    /// dataflow blocks and, for each sample, updates the Redis live record,
    /// publishes the change, persists a result record and runs alarm analysis.
    /// </summary>
    public class DataSendTask : ISingletonDependency
    {
        private Content _settingModel;
        private readonly TelemeteringResultSaveTask _telemeteringResultSaveTask;
        private readonly DataRepository _dataRepository;
        private readonly WebApiRequest _webApiRequest;
        private readonly RunningDataCache _runningDataCache;
        private readonly LinkageAnalysisTask _linkageAnalysisTask;
        // The WebSocketServer dependency is currently disabled.
        private readonly ConfigurationHepler _configurationHepler;
        private readonly AlarmAnalysis _alarmAnalysis;
        private readonly RedisDataRepository _redisDataRepository;

        private readonly string _telemeteringModelListRediskey = "telemeteringModelList";

        /// <summary>
        /// Telemetering live-data store.
        /// </summary>
        // NOTE(review): the generic type arguments of IRedisRepository are not visible
        // in the reviewed text; restore them from the original declaration.
        private readonly IRedisRepository _telemeteringModelListRedis;

        private readonly string _telesignalisationModelListRediskey = "telesignalisationModelList";

        /// <summary>
        /// Telesignalisation live-data store.
        /// </summary>
        // NOTE(review): the generic type arguments of IRedisRepository are not visible
        // in the reviewed text; restore them from the original declaration.
        private readonly IRedisRepository _telesignalisationModelListRedis;

        /// <summary>
        /// Stores the injected collaborators and creates the two processing blocks.
        /// </summary>
        public DataSendTask(TelemeteringResultSaveTask telemeteringResultSaveTask,
            DataRepository dataRepository,
            WebApiRequest webApiRequest,
            RunningDataCache runningDataCache,
            LinkageAnalysisTask linkageAnalysisTask,
            ConfigurationHepler configurationHepler,
            AlarmAnalysis alarmAnalysis,
            Content settingModel,
            IRedisRepository _telesignalisationModelListRedis,
            IRedisRepository _telemeteringModelListRedis,
            RedisDataRepository redisDataRepository
            )
        {
            _configurationHepler = configurationHepler;
            _runningDataCache = runningDataCache;
            _linkageAnalysisTask = linkageAnalysisTask;
            _telemeteringResultSaveTask = telemeteringResultSaveTask;
            _dataRepository = dataRepository;
            _webApiRequest = webApiRequest;
            _alarmAnalysis = alarmAnalysis;
            _redisDataRepository = redisDataRepository;
            _settingModel = settingModel;

            // The parameters carry the same names as the fields, so `this.` is required;
            // without these assignments the fields would silently stay null.
            this._telesignalisationModelListRedis = _telesignalisationModelListRedis;
            this._telemeteringModelListRedis = _telemeteringModelListRedis;

            // One message at a time per block. The delegate must await the update:
            // a non-awaited call returns immediately, so updates would overlap despite
            // MaxDegreeOfParallelism = 1 and the catch below could never observe a failure.
            var singleWorker = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

            RECORDYXBURSTActionBlock = new ActionBlock<RECORDYXBURST_New>(async yx =>
            {
                try
                {
                    await UpdateTelesignalDataAsync(yx);
                }
                catch (Exception ex)
                {
                    MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
                }
            }, singleWorker);

            YCTActionBlock = new ActionBlock<YC_TYPE_New>(async yc =>
            {
                try
                {
                    await UpdateTelemeteringData(yc);
                }
                catch (Exception ex)
                {
                    MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
                }
            }, singleWorker);
        }

        /// <summary>
        /// Block that processes incoming telesignalisation (YX) samples.
        /// </summary>
        public ActionBlock<RECORDYXBURST_New> RECORDYXBURSTActionBlock;

        /// <summary>
        /// Block that processes incoming telemetering (YC) samples.
        /// </summary>
        public ActionBlock<YC_TYPE_New> YCTActionBlock;

        public int startInfoAddr = 0;
        public int endInfoAddr = 100000;

        /// <summary>
        /// Converts a 7-byte timestamp into a <see cref="DateTime"/>.
        /// Layout as read by this method: bytes 0-1 hold seconds * 1000 + milliseconds
        /// (low byte first), byte 2 minute, byte 3 hour, byte 4 day, byte 5 month,
        /// byte 6 year offset from 2000.
        /// </summary>
        /// <param name="tm">Raw timestamp bytes; at least 7 bytes are required.</param>
        /// <returns>The decoded date and time.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="tm"/> is null.</exception>
        /// <exception cref="FormatException">The buffer is too short or a field is out of range.</exception>
        private DateTime FormatDateTimeFunc(byte[] tm)
        {
            if (tm == null)
            {
                throw new ArgumentNullException(nameof(tm));
            }

            if (tm.Length < 7)
            {
                throw new FormatException($"时间数值错误 原数据为:{JsonConvert.SerializeObject(tm)}");
            }

            int year = 2000 + tm[6];
            int month = tm[5];
            int day = tm[4];
            int hour = tm[3];
            int minute = tm[2];
            int secondsAndMillis = (tm[1] << 8) + tm[0];
            int second = secondsAndMillis / 1000;
            int millisecond = secondsAndMillis % 1000;

            // Bounds mirror what the DateTime constructor accepts, so an invalid value is
            // reported with the descriptive message below rather than surfacing as an
            // ArgumentOutOfRangeException from the constructor. The month is checked first
            // because DaysInMonth itself rejects an out-of-range month.
            bool isValid = month >= 1 && month <= 12
                && day >= 1 && day <= DateTime.DaysInMonth(year, month)
                && hour <= 23
                && minute <= 59
                && second <= 59;

            if (!isValid)
            {
                var tmstr = JsonConvert.SerializeObject(tm);
                throw new FormatException($"时间数值错误 年:{year}月:{month}日:{day}时:{hour}分:{minute}秒:{second}毫秒:{millisecond} 原数据为:{tmstr}");
            }

            return new DateTime(year, month, day, hour, minute, second, millisecond);
        }

        /// <summary>
        /// Applies one telemetering sample: updates the matching Redis hash entry,
        /// publishes the updated record on the change channel, queues a result record
        /// for saving, caches the value when the point is flagged as environment
        /// temperature, and runs telemetering alarm analysis.
        /// Exceptions are caught and reported through <c>MonitoringEventBus</c>.
        /// </summary>
        /// <param name="yc">The telemetering sample to apply.</param>
        public async Task UpdateTelemeteringData(YC_TYPE_New yc)
        {
            try
            {
                var categoriyValue = _settingModel.GetDatacatgoryValue(_settingModel.DataSourceCategoryName);
                var categoryName = _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName);
                string redisKey = _redisDataRepository.TelemeteringModelListRediskey + "_" + categoryName;
                string redisChannel = _redisDataRepository.TelemeteringInflectionInflectionZZChannelRediskey + "_" + categoryName;

                // The second key segment is the constant 0 here; the telesignal path
                // uses the device sector in that position.
                string hashKey = $"{yc.addr}_{0}_{yc.inf}_{categoriyValue}";
                var ycData = _redisDataRepository.TelemeteringModelListRedis.HashSetGetOne(redisKey, hashKey);
                if (ycData == null)
                {
                    Log4Helper.Error(this.GetType(), $"更新数据失败: 地址:{yc.inf} 类型:{categoriyValue} 键:{hashKey}");
                    return;
                }

                // Copy the sample into the live record.
                ycData.ResultTime = yc.time;
                ycData.ResultValue = yc.val;

                var tasks = new List<Task>();

                // Write the updated record back to the live store.
                tasks.Add(_redisDataRepository.TelemeteringModelListRedis.HashSetUpdateOneAsync(redisKey, hashKey, ycData));

                // Publish the changed record to subscribers of the change channel.
                tasks.Add(Task.Run(() => _redisDataRepository.TelemeteringModelInflectionListRedis.PublishAsync(redisChannel, ycData)));

                // Persist the result, cache environment temperature, analyse alarms.
                tasks.Add(Task.Run(async () =>
                {
                    var rst = new TelemeteringResult
                    {
                        ResultTime = ycData.ResultTime,
                        ResultValue = ycData.ResultValue,
                        TelemeteringConfigurationId = ycData.Id,
                        SaveMethod = 2
                    };

                    // Queue the changed value for saving.
                    _telemeteringResultSaveTask.SaveTelemeteringResultActionBlock?.Post(rst);

                    // Cache the environment temperature when this point is flagged as such.
                    if (ycData.IsEnvironmentTemp)
                    {
                        var environmentTempValue = new EnvironmentTempValue(yc.val);
                        await _redisDataRepository.EnvironmentTempValueRedis.HashSetUpdateOneAsync(nameof(EnvironmentTempValue), ycData.Id.ToString(), environmentTempValue);
                    }

                    await _alarmAnalysis.HandleTelemeteringAlarmAsync(ycData);
                }));

                await Task.WhenAll(tasks);
            }
            catch (Exception ex)
            {
                // Report the failure on the monitoring log.
                MonitoringEventBus.LogHandler(ex.Message, "错误信息");
            }
        }

        /// <summary>
        /// Applies one telesignalisation sample: updates the matching Redis hash entry,
        /// publishes the updated record on the change channel, inserts a result record
        /// into the repository and runs telesignal alarm analysis.
        /// Exceptions are caught and reported through <c>MonitoringEventBus</c>.
        /// </summary>
        /// <param name="yx">The telesignalisation sample to apply.</param>
        public async Task UpdateTelesignalDataAsync(RECORDYXBURST_New yx)
        {
            try
            {
                var categoriyValue = _settingModel.GetDatacatgoryValue(_settingModel.DataSourceCategoryName);
                var categoryName = _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName);
                string redisKey = _redisDataRepository.TelesignalisationModelListRediskey + "_" + categoryName;
                string redisChannel = _redisDataRepository.TelesignalisationInflectionInflectionZZChannelRediskey + "_" + categoryName;
                string haskey = $"{yx.dev_addr}_{yx.dev_sector}_{yx.dev_inf}_{categoriyValue}";

                // Look up the live telesignal record for this point.
                var yxData = _redisDataRepository.TelesignalisationModelListRedis.HashSetGetOne(redisKey, haskey);
                if (yxData == null)
                {
                    Log4Helper.Error(this.GetType(), $"更新数据失败: 地址:{yx.dev_addr} 类型:{categoriyValue} 键:{haskey}");
                    return;
                }

                // Copy the sample into the live record.
                yxData.ResultTime = yx.time;
                yxData.ResultValue = yx.yx_val;

                var tasks = new List<Task>();

                // 1. Write the updated record back to the live store.
                tasks.Add(_redisDataRepository.TelesignalisationModelListRedis.HashSetUpdateOneAsync(redisKey, haskey, yxData));

                // 2. Publish the changed record to subscribers of the change channel.
                tasks.Add(Task.Run(() => _redisDataRepository.TelesignalisationModelInflectionListRedis.PublishAsync(redisChannel, yxData)));

                // 3. Insert the result record.
                // NOTE(review): an earlier local computed (yx.yx_val - 1) for
                // RemoteTypeEnum.SinglePoint points but was never used; the raw
                // ResultValue is what gets stored. Confirm which value is intended.
                var yxRes = new TelesignalisationResult
                {
                    ResultTime = yxData.ResultTime,
                    ResultValue = yxData.ResultValue,
                    TelesignalisationConfigurationId = yxData.Id,
                    SaveMethod = 2
                };
                tasks.Add(_dataRepository.TelesignalisationResultRepository.InsertOneAsync(yxRes));

                // 4. Run alarm analysis.
                tasks.Add(_alarmAnalysis.HandleTelesignalAlarmAsync(yxData));

                await Task.WhenAll(tasks);
            }
            catch (Exception ex)
            {
                // Report the failure on the monitoring log.
                MonitoringEventBus.LogHandler(ex.Message, "错误信息");
            }
        }

        // NOTE(review): the generic type arguments of this dictionary are not visible
        // in the reviewed text; restore them from the original declaration.
        private Dictionary TelesignaleAlarmTempBuffDic = new Dictionary();

        /// <summary>
        /// Sends a telemetering sample to the main-window log when display is enabled.
        /// </summary>
        /// <param name="info">The telemetering sample to display.</param>
        public void RecordYCLogInfo(YC_TYPE_New info)
        {
            if (!_settingModel.Displayyxyc)
            {
                return;
            }

            string infoStr = $"装置地址:{info.addr},CPU扇区号:{info.sector},编码地址:{info.inf},遥测值:{info.val},时间:{(info.time.ToString("HH:mm:ss fff"))}";
            MonitoringEventBus.LogHandler(infoStr, "遥测数据");
        }

        /// <summary>
        /// Sends a telesignalisation sample to the main-window log when display is enabled.
        /// </summary>
        /// <param name="info">The telesignalisation sample to display.</param>
        public void RecordYXLogInfo(RECORDYXBURST_New info)
        {
            if (!_settingModel.Displayyxyc)
            {
                return;
            }

            string infoStr = $"装置地址:{info.dev_addr},CPU扇区号{info.dev_sector},编码地址:{info.dev_inf},遥信值:{info.yx_val},时间:{(info.time.ToString("HH:mm:ss fff"))}";
            MonitoringEventBus.LogHandler(infoStr, "双点信息");
        }
    }
}