using Abp.Dependency; using MongoDB.Driver; using MongoDB.Driver.Linq; using System; using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using System.Windows.Documents; using System.Windows.Markup; using ToolLibrary.LogHelper; using Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection.Dlls; using Yunda.ISAS.DataMonitoringServer.DataCenter; using Yunda.ISAS.DataMonitoringServer.WPF.ViewModel; using YunDa.ISAS.Entities.DataMonitoring; using YunDa.ISAS.Redis.Entities.DataMonitorCategory; using YunDa.SOMS.DataTransferObject.GeneralInformation.EquipmentLiveData; namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection { /// /// 数据采集 /// public class DataCollectionTask : ISingletonDependency { private Content _settingModel = null; /// /// 时间转换算法 /// private Func FormatDateTime = tm => new DateTime(2000 + tm[6], tm[5], tm[4], tm[3], tm[2], ((tm[1] << 8) + tm[0]) / 1000, ((tm[1] << 8) + tm[0]) % 1000); CancellationTokenSource cancellationTokenSource = default; private readonly DataSendTask _dataSendTask; private readonly RunningDataCache _runningDataCache; private readonly RedisDataRepository _redisDataRepository; private readonly WebApiRequest _webApiRequest; public DataCollectionTask(Content settingModel , DataSendTask dataSendTask, RedisDataRepository redisDataRepository, WebApiRequest webApiRequest, RunningDataCache runningDataCache) { _dataSendTask = dataSendTask; _settingModel = settingModel; _runningDataCache = runningDataCache; _redisDataRepository = redisDataRepository; _webApiRequest = webApiRequest; Task.Factory.StartNew(async () => { int count = 0; while (true) { if (_runningDataCache != null) { if (count == 1440) { CallAllData(); Log4Helper.Info(this.GetType(), $"定时总召,时间间隔{count}分钟"); count =0; } } count++; await Task.Delay(TimeSpan.FromMinutes(1)); if (_isStarted) { if ((DateTime.Now - _lastTime).TotalSeconds>300) { Log4Helper.Info(this.GetType(), $"超过5分钟没有收到心跳报文,执行重启104"); CollectionStop(); await Task.Delay(TimeSpan.FromSeconds(5)); CollectionStart(_startWebsocket); } } } }); } #region 数据采集计时器 ActionBlock _handleDataAction; public static IEC104Client _client; DateTime _lastTime = DateTime.Now; private bool _isStarted= false; Action _startWebsocket; /// /// 开始 /// public void CollectionStart(Action startWebsocket) { _startWebsocket = startWebsocket; yxList.Clear(); ycList.Clear(); MonitoringEventBus.LogHandler($"开始连接远动机:ip:{_settingModel.Dev_Ip},端口:{_settingModel.Dev_Port}", "通知信息"); _webApiRequest.StartVisulDevice(2408); try { _handleDataAction = new ActionBlock(data => { if (data.DataType == IECDataType.Telemetering) { YC_TYPE_New info = new YC_TYPE_New() { chgFlag = 0, addr = (byte)_settingModel.Dev_addr, sector = 0, val= data.FValue, inf = data.InfoAddr, time = data.Time }; if (_isStarted) { _dataSendTask.RecordYCLogInfo(info); _dataSendTask.YCTActionBlock.Post(info); } else { //OnlyChangeYC(info); ycList.Add(info); } } else if (data.DataType == IECDataType.Telesignal) { var info = new RECORDYXBURST_New() { dev_addr = (byte)_settingModel.Dev_addr, dev_inf = data.InfoAddr, dev_sector = 0, yx_val = data.IValue, time = data.Time }; if (_isStarted) { _dataSendTask.RecordYXLogInfo(info); _dataSendTask.RECORDYXBURSTActionBlock.Post(info); } else { yxList.Add(info); } } else if (data.DataType == IECDataType.Error) { MonitoringEventBus.LogHandler(data.Ex.ToString(), "异常信息"); Log4Helper.Error(this.GetType(), data.Ex.ToString()); } else if(data.DataType == IECDataType.Msg) { MonitoringEventBus.LogHandler(data.Msg, "104数据"); //Log4Helper.Info(this.GetType(), data.Msg); //Debug.WriteLine(data.Msg); if (data.Msg.Contains("U格式")) { _lastTime = DateTime.Now; } } else if (data.DataType == IECDataType.CompleteAll) { Log4Helper.Info(this.GetType(), $"完成总召时间: {DateTime.Now.ToLongTimeString()}"); if (!_isStarted) { _isStarted = true; startWebsocket(); } Task.Run(() => { OnlyChangeYX(); OnlyChangeYC(); }); } },new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism =2,MaxMessagesPerTask = 3000}); _client = new IEC104Client(_settingModel.Dev_Ip, _settingModel.Dev_Port, _handleDataAction); _client.Connect(); return; } catch (Exception ex) { MonitoringEventBus.LogHandler(ex.StackTrace, "异常信息"); } MonitoringEventBus.LogHandler("数据采集开始", "数据采集"); } public void CallAllData() { _isStarted = false; _client?.CallAll(); } /// /// 停止 /// public void CollectionStop() { try { _isStarted = false; _client?.Close(); } catch (Exception ex) { MonitoringEventBus.LogHandler("数据采集关闭", "数据采集"); } } #endregion 数据采集计时器 Dictionary TelesignalisationModelDic = new Dictionary(); ConcurrentBag yxList = new ConcurrentBag(); ConcurrentBag ycList = new ConcurrentBag(); /// /// 仅更新遥信数据字典 /// /// private async void OnlyChangeYX() { string redisKey = _redisDataRepository.TelesignalisationModelListRediskey+"_"+ _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName); // 从 Redis 中批量获取所有数据 var yxDatas = await _redisDataRepository.TelesignalisationModelListRedis.HashSetGetAllAsync(redisKey); if (yxDatas == null || yxDatas.Count == 0) return; var updateList = new List(); var updateIdList = new List(); var categoriyValue = _settingModel.GetDatacatgoryValue(_settingModel.DataSourceCategoryName); foreach (RECORDYXBURST_New yx in yxList) { var yxData = yxDatas.FirstOrDefault(t => t.DispatcherAddress == yx.dev_inf&&t.CPUSector == yx.dev_sector&&yx.dev_addr ==t.DeviceAddress&&(int)t.DataSourceCategory == categoriyValue); if (yxData != null) { yxData.ResultTime = yx.time; yxData.ResultValue = yx.yx_val; updateList.Add(yxData); updateIdList.Add($"{yxData.DeviceAddress}_{yxData.CPUSector}_{yxData.DispatcherAddress}_{categoriyValue}"); //Log4Helper.Info(this.GetType(), $"查找数据成功: 地址:{yx.dev_inf} 类型:{categoriyValue}"); // 添加到更新列表 } } // 批量更新 Redis,减少写入操作的频次 await _redisDataRepository.TelesignalisationModelListRedis.HashSetUpdateManyAsync(redisKey, updateIdList, updateList); } /// /// 仅更新遥测数据字典 /// /// private async void OnlyChangeYC() { string redisKey = _redisDataRepository.TelemeteringModelListRediskey + "_" + _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName); ; // 从 Redis 中批量获取所有遥测数据 var ycDatas = await _redisDataRepository.TelemeteringModelListRedis.HashSetGetAllAsync(redisKey); if (ycDatas == null || ycDatas.Count == 0) return; // 用于批量更新的列表 var updateList = new List(); var updateIdList = new List(); var categoriyValue = _settingModel.GetDatacatgoryValue(_settingModel.DataSourceCategoryName); foreach (YC_TYPE_New yc in ycList) { // 在内存数据中查找符合条件的项 var ycData = ycDatas.FirstOrDefault(t => t.DispatcherAddress == yc.inf&&t.CPUSector ==yc.sector&&t.DeviceAddress == yc.addr&& (int)t.DataSourceCategory == categoriyValue); if (ycData != null) { ycData.ResultTime = yc.time; ycData.ResultValue = yc.val; // 将更新项添加到批量更新列表 updateList.Add( ycData); updateIdList.Add($"{ycData.DeviceAddress}_{ycData.CPUSector}_{ycData.DispatcherAddress}_{categoriyValue}"); //Log4Helper.Info(this.GetType(), $"查找数据成功: 地址:{yc.inf} 类型:{categoriyValue}"); } } // 一次性将所有更新写入 Redis await _redisDataRepository.TelemeteringModelListRedis.HashSetUpdateManyAsync(redisKey, updateIdList, updateList); } /// /// 获取指定地址的数据 /// /// /// /// public void GetOneData(byte addr, byte sector, ushort inf) { } } }