using Abp.Dependency; using Newtonsoft.Json; using System.Threading.Tasks.Dataflow; using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Helper; using Yunda.ISAS.DataMonitoringServer.DataAnalysis.LinkageAnalysis; using Yunda.ISAS.DataMonitoringServer.DataAnalysis.TeleInfoSave; using Yunda.ISAS.DataMonitoringServer.DataCenter; using Yunda.ISAS.DataMonitoringServer.WebSocket; using Yunda.ISAS.DataMonitoringServer.WebSocket.Model; using Yunda.ISAS.MongoDB.Entities.DataMonitoring; using Yunda.SOMS.DataMonitoringServer.Service.DataAnalysis.Model; using YunDa.ISAS.DataTransferObject.CommonDto; using YunDa.ISAS.DataTransferObject.EquipmentLiveData; using YunDa.ISAS.Entities.DataMonitoring; using YunDa.ISAS.Redis.Entities.DataMonitorCategory; namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection { public class DataSendTask : ISingletonDependency { private readonly TelemeteringResultSaveTask _telemeteringResultSaveTask; private readonly DataRepository _dataRepository; private readonly WebApiRequest _webApiRequest; private readonly RunningDataCache _runningDataCache; private readonly LinkageAnalysisTask _linkageAnalysisTask; private readonly WebSocketServer _webSocketServer; private readonly ConfigurationHepler _configurationHepler; private readonly AlarmAnalysis _alarmAnalysis; private readonly RedisDataRepository _redisDataRepository; public DataSendTask(TelemeteringResultSaveTask telemeteringResultSaveTask, DataRepository dataRepository, WebApiRequest webApiRequest, RunningDataCache runningDataCache, LinkageAnalysisTask linkageAnalysisTask, WebSocketServer webSocketController, ConfigurationHepler configurationHepler, AlarmAnalysis alarmAnalysis, RedisDataRepository redisDataRepository ) { _configurationHepler = configurationHepler; _webSocketServer = webSocketController; _runningDataCache = runningDataCache; _linkageAnalysisTask = linkageAnalysisTask; _telemeteringResultSaveTask = telemeteringResultSaveTask; _dataRepository = dataRepository; _webApiRequest = webApiRequest; _alarmAnalysis = alarmAnalysis; _redisDataRepository = redisDataRepository; RECORDYXBURSTActionBlock = new ActionBlock(yx => { try { SendYXInfoToClient(yx); } catch (Exception ex) { MonitoringEventBus.LogHandler(ex.ToString(), "异常信息"); } }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 }); YCTActionBlock = new ActionBlock(yc => { try { ClassifyAndSendYCInfo(yc); } catch (Exception ex) { MonitoringEventBus.LogHandler(ex.ToString(), "异常信息"); } }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 }); //#if DEBUG // Task.Run(() => { // while (true) // { // YCTActionBlock.Post(new YC_TYPE() // { // addr = 1, // inf = 20012, // val = 20, // sector = 0 // }); // Task.Delay(3000).Wait(); // } // }); //#endif } public ActionBlock RECORDYXBURSTActionBlock = default; public ActionBlock YCTActionBlock = default; public int startInfoAddr = 0; public int endInfoAddr = 100000; /// /// 时间转换算法 /// private DateTime FormatDateTimeFunc(byte[] tm) { if (tm[5] <= 12 && tm[5] >= 1 && tm[4] <= 31 && tm[4] >= 1 && tm[3] <= 24 && tm[3] >= 0 && tm[2] <= 60 && tm[2] >= 0 && (((tm[1] << 8) + tm[0]) / 1000) <= 60 && (((tm[1] << 8) + tm[0]) / 1000) >= 0 && (((tm[1] << 8) + tm[0]) % 1000) <= 999 && (((tm[1] << 8) + tm[0]) % 1000) >= 0 ) { return new DateTime(2000 + tm[6], tm[5], tm[4], tm[3], tm[2], ((tm[1] << 8) + tm[0]) / 1000, ((tm[1] << 8) + tm[0]) % 1000); } else { var tmstr = JsonConvert.SerializeObject(tm); throw new Exception($"时间数值错误 年:{2000 + tm[6]}月:{tm[5]}日:{tm[4]}时:{tm[3]}分:{tm[2]}秒:{((tm[1] << 8) + tm[0]) / 1000}毫秒:{((tm[1] << 8) + tm[0]) % 1000} 原数据为:{tmstr}"); } } /// /// 处理数据并发送遥信数据到客户端 /// /// public async void SendYXInfoToClient(RECORDYXBURST_New yx) { //Stopwatch stopwatch = new Stopwatch(); //stopwatch.Start(); //动力环境 Dictionary eSendDataDic_0 = new Dictionary(); //在线监测 Dictionary eSendDataDic_1 = new Dictionary(); List tasks = new List(); foreach (EquipmentDataModel e in _runningDataCache.EquipmentDataDic.Values) { TelesignalisationModel telesignal = e.Telesignalisations.FirstOrDefault(m => m.DispatcherAddress == yx.dev_inf); if (telesignal == null) continue; int rstValue = telesignal.RemoteType == RemoteTypeEnum.SinglePoint ? (yx.yx_val - 1) : yx.yx_val; //如果值不变则不进行后续处理 //if (telesignal.ResultValue == rstValue) //{ // break; //} telesignal.ResultValue = rstValue; telesignal.ResultTime = yx.time; var yxres = new TelesignalisationResult() { ResultTime = telesignal.ResultTime, ResultValue = telesignal.ResultValue, TelesignalisationConfigurationId = telesignal.Id, SaveMethod = 2 }; _dataRepository.TelesignalisationResultRepository.InsertOneAsync(yxres); switch (e.GroupType) { case MonitoringGroupTypeEnum.DLHJ: SetSendTelesignalDataDic(e, telesignal, ref eSendDataDic_0); break; case MonitoringGroupTypeEnum.ZXJC: SetSendTelesignalDataDic(e, telesignal, ref eSendDataDic_1); break; } Task.Run(() => { var isArming = _webApiRequest.GetEquipmentSafetyState(e.EquipmentInfoId); if (isArming == 1 || isArming == 0) { _alarmAnalysis.HandleTelesignalAlarmAsync(e, telesignal); //联动检测 _linkageAnalysisTask.LinkAnanysisActionBlock.Post(telesignal.Id); } }); //修改对应设备的遥信值后,跳出本次循环,修改下一个遥信 } //} bool isChange_0 = eSendDataDic_0.Count > 0; bool isChange_1 = eSendDataDic_1.Count > 0; if (isChange_0) { SendMonitoringChangedDataAsync(eSendDataDic_0.Values.ToList(), GroupTypeEnum.DLHJ); } if (isChange_1) { SendMonitoringChangedDataAsync(eSendDataDic_1.Values.ToList(), GroupTypeEnum.ZXJC); } //stopwatch.Stop(); //Debug.WriteLine("发送时间:"+ stopwatch.ElapsedMilliseconds); } /// /// 处理数据发送遥测数据到客户端 /// /// public void ClassifyAndSendYCInfo(YC_TYPE_New yc) { //动力环境 Dictionary eSendDataDic_0 = new Dictionary(); //在线监测 Dictionary eSendDataDic_1 = new Dictionary(); List tasks = new List(); foreach (EquipmentDataModel e in _runningDataCache.EquipmentDataDic.Values) { TelemeteringModel telemetering = e.Telemeterings .FirstOrDefault(m=> m.DispatcherAddress == yc.inf); if (telemetering == null) continue; float value = yc.val * telemetering.Coefficient; telemetering.ResultValue = value; telemetering.ResultTime = yc.time; Task.Run(() => { var rst = new TelemeteringResult() { ResultTime = telemetering.ResultTime, ResultValue = telemetering.ResultValue, TelemeteringConfigurationId = telemetering.Id, SaveMethod = 2 }; //保存变化的数据 _telemeteringResultSaveTask.SaveTelemeteringResultActionBlock?.Post(rst); //在Redis中缓存环境温度 if (telemetering.IsEnvironmentTemp) { EnvironmentTempValue environmentTempValue = new EnvironmentTempValue(value); _redisDataRepository.EnvironmentTempValueRedis.HashSetUpdateOneAsync(nameof(EnvironmentTempValue), telemetering.Id.ToString(), environmentTempValue); } //如果值不变则不进行后续处理 var disArming = _webApiRequest.GetEquipmentSafetyState(e.EquipmentInfoId); if (disArming == 1 || disArming == 0) { _alarmAnalysis.HandleTelemeteringAlarmAsync(e, telemetering); //联动检测 _linkageAnalysisTask.LinkAnanysisActionBlock.Post(telemetering.Id); //修改对应设备的遥测值后,跳出本次循环,修改下一个遥信 } }); switch (e.GroupType) { case MonitoringGroupTypeEnum.DLHJ: SetSendTelemeteringDataDic(e, telemetering, ref eSendDataDic_0); break; case MonitoringGroupTypeEnum.ZXJC: SetSendTelemeteringDataDic(e, telemetering, ref eSendDataDic_1); break; default: break; } } bool isChange_0 = eSendDataDic_0.Count > 0; bool isChange_1 = eSendDataDic_1.Count > 0; if (isChange_0) { //发送动力环境遥测数据到客户端 SendMonitoringChangedDataAsync(eSendDataDic_0.Values.ToList(), GroupTypeEnum.DLHJ); } if (isChange_1) { //发送在线检测遥测数据到客户端 SendMonitoringChangedDataAsync(eSendDataDic_1.Values.ToList(), GroupTypeEnum.ZXJC); } } private Dictionary TelesignaleAlarmTempBuffDic = new Dictionary(); /// /// 初始化遥信设备信息 /// /// /// /// private void SetSendTelesignalDataDic(EquipmentDataModel e, TelesignalisationModel telesignal, ref Dictionary dic) { if (dic != null && dic.ContainsKey(e.EquipmentInfoId)) { dic[e.EquipmentInfoId].Telesignalisations.Add(telesignal); } else { EquipmentDataModel sendE = new EquipmentDataModel { EquipmentInfoId = e.EquipmentInfoId, EquipmentTypeId = e.EquipmentTypeId, SafetyStateType = e.SafetyStateType, GroupType = e.GroupType }; sendE.Telesignalisations.Add(telesignal); dic.Add(sendE.EquipmentInfoId, sendE); } } /// /// 初始化遥信设备信息 /// /// /// /// private void SetSendTelemeteringDataDic(EquipmentDataModel e, TelemeteringModel telemetering, ref Dictionary dic) { if (dic != null && dic.ContainsKey(e.EquipmentInfoId)) { dic[e.EquipmentInfoId].Telemeterings.Add(telemetering); } else { EquipmentDataModel sendE = new EquipmentDataModel { EquipmentInfoId = e.EquipmentInfoId, EquipmentTypeId = e.EquipmentTypeId, SafetyStateType = e.SafetyStateType, GroupType = e.GroupType }; sendE.Telemeterings.Add(telemetering); dic.Add(sendE.EquipmentInfoId, sendE); } } /// /// 发送变化的数据 /// /// /// private async void SendMonitoringChangedDataAsync(List eList, GroupTypeEnum groupType) { await Task.Run(() => { DataMonitorMessageModel messageModel = new DataMonitorMessageModel { Content = eList, GroupType = groupType, MessageType = MessgeTypeEnum.Changed }; //#warning Send _webSocketServer.SendMsg(messageModel); }).ConfigureAwait(false); } /// /// 发送遥测数据到主界面日志显示 /// /// public void RecordYCLogInfo(YC_TYPE_New info) { string infoStr = $"装置地址:{info.addr},CPU扇区号:{info.sector},编码地址:{info.inf},遥测值:{info.val}"; MonitoringEventBus.LogHandler(infoStr, "遥测数据"); //if (info.inf >= startInfoAddr && info.inf <= endInfoAddr) //{ // MonitoringEventBus.LogHandler(infoStr, "遥测数据"); //} } /// /// 发送遥信数据到主界面日志显示 /// /// public void RecordYXLogInfo(RECORDYXBURST_New info) { string infoStr = $"装置地址:{info.dev_addr},CPU扇区号{info.dev_sector},编码地址:{info.dev_inf},遥信值:{info.yx_val}"; MonitoringEventBus.LogHandler(infoStr, "双点信息"); //if (info.dev_inf >= startInfoAddr && info.dev_inf <= endInfoAddr) //{ // MonitoringEventBus.LogHandler(infoStr, "遥信数据"); //} } } }