using Abp.Dependency;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Documents;
using System.Windows.Markup;
using ToolLibrary.LogHelper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection.Dlls;
using Yunda.ISAS.DataMonitoringServer.DataCenter;
using Yunda.ISAS.DataMonitoringServer.WPF.ViewModel;
using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
using YunDa.ISAS.Entities.DataMonitoring;
using YunDa.ISAS.Redis.Entities.DataMonitorCategory;

namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection
{
    /// <summary>
    /// Data collection: receives IEC 104 data items from an <see cref="IEC104Client"/> through a
    /// dataflow block, forwards them to <see cref="DataSendTask"/> once collection has started,
    /// and buffers them while a general interrogation ("call all") is in progress.
    /// </summary>
    public class DataCollectionTask : ISingletonDependency
    {
        /// <summary>Interval between two periodic general interrogations.</summary>
        private static readonly TimeSpan CallAllInterval = TimeSpan.FromMinutes(30);

        private Content _settingModel = null;

        /// <summary>
        /// Time conversion: builds a <see cref="DateTime"/> from a raw time array
        /// (index 0/1 = milliseconds low/high, 2 = minute, 3 = hour, 4 = day, 5 = month, 6 = two-digit year).
        /// </summary>
        // NOTE(review): the generic arguments of this field were missing in the reviewed source;
        // byte[] is assumed from the indexing and shift in the lambda - confirm.
        private Func<byte[], DateTime> FormatDateTime = tm => new DateTime(2000 + tm[6], tm[5], tm[4], tm[3], tm[2], ((tm[1] << 8) + tm[0]) / 1000, ((tm[1] << 8) + tm[0]) % 1000);

        private CancellationTokenSource cancellationTokenSource = default;
        private readonly DataSendTask _dataSendTask;
        private readonly RunningDataCache _runningDataCache;
        private readonly RedisDataRepository _redisDataRepository;

        /// <summary>
        /// Stores the injected dependencies and, when a running-data cache is available,
        /// starts a background loop that issues a general interrogation every 30 minutes.
        /// </summary>
        /// <param name="settingModel">Connection/settings model.</param>
        /// <param name="dataSendTask">Receiver of live telemetering/telesignal items.</param>
        /// <param name="redisDataRepository">Redis access used when flushing buffered values.</param>
        /// <param name="runningDataCache">Running data cache; the periodic loop is only started when this is not null.</param>
        public DataCollectionTask(Content settingModel, DataSendTask dataSendTask, RedisDataRepository redisDataRepository, RunningDataCache runningDataCache)
        {
            _dataSendTask = dataSendTask;
            _settingModel = settingModel;
            _runningDataCache = runningDataCache;
            _redisDataRepository = redisDataRepository;

            // The cache reference never changes after construction, so checking it once avoids
            // an endless busy-spin when it is null.
            if (_runningDataCache != null)
            {
                Task.Run(() => RunPeriodicCallAllAsync());
            }
        }

        /// <summary>
        /// Issues a general interrogation immediately and then once per <see cref="CallAllInterval"/>, forever.
        /// </summary>
        private async Task RunPeriodicCallAllAsync()
        {
            while (true)
            {
                try
                {
                    CallAllData();
                    Log4Helper.Info(this.GetType(), $"定时总召,时间间隔{CallAllInterval.TotalMinutes}分钟");
                }
                catch (Exception ex)
                {
                    // A single failed interrogation must not silently end the periodic loop.
                    Log4Helper.Error(this.GetType(), ex.ToString());
                }

                await Task.Delay(CallAllInterval).ConfigureAwait(false);
            }
        }

        #region Data collection

        private ActionBlock<IECData> _handleDataAction;
        public static IEC104Client _client;

        // Read and written from the dataflow workers, the periodic loop and callers of the public API.
        private volatile bool _isStarted = false;

        // Guards yxList / ycList: the dataflow block runs with MaxDegreeOfParallelism = 2 and the
        // flush runs on another thread, so the lists are accessed concurrently.
        private readonly object _bufferLock = new object();

        /// <summary>
        /// Start: clears the buffers, creates the data handling block and connects the IEC 104 client.
        /// </summary>
        /// <param name="settingModel">Provides the device IP and port to connect to.</param>
        /// <param name="startWebsocket">Invoked once, when the first general interrogation completes.</param>
        public void CollectionStart(Content settingModel, Action startWebsocket)
        {
            lock (_bufferLock)
            {
                yxList.Clear();
                ycList.Clear();
            }

            MonitoringEventBus.LogHandler($"开始连接远动机:ip:{settingModel.Dev_Ip},端口:{settingModel.Dev_Port}", "通知信息");
            try
            {
                _handleDataAction = new ActionBlock<IECData>(
                    data => HandleData(data, startWebsocket),
                    new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 2, MaxMessagesPerTask = 3000 });
                _client = new IEC104Client(settingModel.Dev_Ip, settingModel.Dev_Port, _handleDataAction);
                _client.Connect();

                // Previously this message was only reachable after a failure (the success path
                // returned before it); it is now logged when the connection was actually started.
                MonitoringEventBus.LogHandler("数据采集开始", "数据采集");
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.Message, "异常信息");
                Log4Helper.Error(this.GetType(), ex.ToString());
            }
        }

        /// <summary>
        /// Handles one item posted by the IEC 104 client and logs the elapsed stopwatch ticks.
        /// </summary>
        private void HandleData(IECData data, Action startWebsocket)
        {
            Stopwatch stopwatch = Stopwatch.StartNew();
            try
            {
                if (data.DataType == IECDataType.Telemetering)
                {
                    HandleTelemetering(data);
                }
                else if (data.DataType == IECDataType.Telesignal)
                {
                    HandleTelesignal(data);
                }
                else if (data.DataType == IECDataType.Error)
                {
                    MonitoringEventBus.LogHandler(data.Ex.ToString(), "异常信息");
                    Log4Helper.Error(this.GetType(), data.Ex.ToString());
                }
                else if (data.DataType == IECDataType.Msg)
                {
                    MonitoringEventBus.LogHandler(data.Msg, "104数据");
                    Log4Helper.Info(this.GetType(), data.Msg);
                }
                else if (data.DataType == IECDataType.CompleteAll)
                {
                    HandleCompleteAll(startWebsocket);
                }
            }
            catch (Exception ex)
            {
                // An exception escaping the delegate would fault the ActionBlock and stop
                // processing of every later item, so it is logged and swallowed here.
                Log4Helper.Error(this.GetType(), ex.ToString());
            }

            stopwatch.Stop();
            Log4Helper.Info(this.GetType(), $"处理数据花费时间: {stopwatch.ElapsedTicks}");
        }

        /// <summary>
        /// Forwards a telemetering value to the send task when started, otherwise buffers it.
        /// </summary>
        private void HandleTelemetering(IECData data)
        {
            YC_TYPE_New info = new YC_TYPE_New()
            {
                chgFlag = 0,
                addr = (byte)data.Address,
                sector = 0,
                val = data.FValue,
                inf = data.InfoAddr,
                time = data.Time
            };

            if (_isStarted)
            {
                _dataSendTask.RecordYCLogInfo(info);
                _dataSendTask.YCTActionBlock.Post(info);
            }
            else
            {
                lock (_bufferLock)
                {
                    ycList.Add(info);
                }
            }
        }

        /// <summary>
        /// Forwards a telesignal value to the send task when started, otherwise buffers it.
        /// </summary>
        private void HandleTelesignal(IECData data)
        {
            var info = new RECORDYXBURST_New()
            {
                dev_addr = (byte)data.Address,
                dev_inf = data.InfoAddr,
                dev_sector = 0,
                yx_val = data.IValue,
                time = data.Time
            };

            if (_isStarted)
            {
                _dataSendTask.RecordYXLogInfo(info);
                _dataSendTask.RECORDYXBURSTActionBlock.Post(info);
            }
            else
            {
                lock (_bufferLock)
                {
                    yxList.Add(info);
                }
            }
        }

        /// <summary>
        /// Marks collection as started (invoking <paramref name="startWebsocket"/> on the transition)
        /// and flushes the buffered values to Redis in the background.
        /// </summary>
        private void HandleCompleteAll(Action startWebsocket)
        {
            Log4Helper.Info(this.GetType(), $"完成总召时间: {DateTime.Now.ToLongTimeString()}");
            if (!_isStarted)
            {
                _isStarted = true;
                startWebsocket();
            }

            Task.Run(() => FlushBufferedDataAsync());
        }

        /// <summary>
        /// Runs both Redis flushes and logs any failure instead of letting it go unobserved.
        /// </summary>
        private async Task FlushBufferedDataAsync()
        {
            try
            {
                await Task.WhenAll(OnlyChangeYXAsync(), OnlyChangeYCAsync()).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), ex.ToString());
            }
        }

        /// <summary>
        /// Requests a general interrogation; items received afterwards are buffered until it completes.
        /// </summary>
        public void CallAllData()
        {
            _isStarted = false;
            _client?.CallAll();
        }

        /// <summary>
        /// Stop: marks collection as not started and closes the client, if any.
        /// </summary>
        public void CollectionStop()
        {
            _isStarted = false;
            _client?.Close();
        }

        #endregion Data collection

        // NOTE(review): an unused private dictionary field (TelesignalisationModelDic) was declared
        // here; its generic arguments were not recoverable from the reviewed source and nothing
        // in this class referenced it, so it was dropped. Restore it if it is still needed.

        // Values received while a general interrogation is in progress; guarded by _bufferLock.
        private readonly List<RECORDYXBURST_New> yxList = new List<RECORDYXBURST_New>();
        private readonly List<YC_TYPE_New> ycList = new List<YC_TYPE_New>();

        /// <summary>
        /// Writes the buffered telesignal values into the Redis telesignal model hash.
        /// </summary>
        private async Task OnlyChangeYXAsync()
        {
            string redisKey = _redisDataRepository.TelesignalisationModelListRediskey;
            var yxDatas = await _redisDataRepository.TelesignalisationModelListRedis.HashSetGetAllAsync(redisKey);
            if (yxDatas == null || yxDatas.Count == 0)
            {
                // No models to update: leave the buffer untouched, as before.
                return;
            }

            // Take a snapshot and drain the buffer so values are not re-applied (stale) on the
            // next general interrogation and the list cannot grow without bound.
            List<RECORDYXBURST_New> pending;
            lock (_bufferLock)
            {
                pending = new List<RECORDYXBURST_New>(yxList);
                yxList.Clear();
            }

            foreach (RECORDYXBURST_New yx in pending)
            {
                var yxData = yxDatas.FirstOrDefault(t => t.DispatcherAddress == yx.dev_inf);
                if (yxData != null)
                {
                    yxData.ResultTime = yx.time;
                    yxData.ResultValue = yx.yx_val;
                    // Update the in-memory database.
                    await _redisDataRepository.TelesignalisationModelListRedis.HashSetUpdateOneAsync(redisKey, yxData.Id.ToString(), yxData);
                }
            }
        }

        /// <summary>
        /// Writes the buffered telemetering values into the Redis telemetering model hash.
        /// </summary>
        private async Task OnlyChangeYCAsync()
        {
            string rediskey = _redisDataRepository.TelemeteringModelListRediskey;
            var ycDatas = await _redisDataRepository.TelemeteringModelListRedis.HashSetGetAllAsync(rediskey);
            if (ycDatas == null || ycDatas.Count == 0)
            {
                // No models to update: leave the buffer untouched, as before.
                return;
            }

            // Snapshot and drain under the lock; see OnlyChangeYXAsync for the rationale.
            List<YC_TYPE_New> pending;
            lock (_bufferLock)
            {
                pending = new List<YC_TYPE_New>(ycList);
                ycList.Clear();
            }

            foreach (YC_TYPE_New yc in pending)
            {
                var ycData = ycDatas.FirstOrDefault(t => t.DispatcherAddress == yc.inf);
                if (ycData != null)
                {
                    ycData.ResultTime = yc.time;
                    ycData.ResultValue = yc.val;
                    // Update the in-memory database.
                    await _redisDataRepository.TelemeteringModelListRedis.HashSetUpdateOneAsync(rediskey, ycData.Id.ToString(), ycData);
                }
            }
        }

        /// <summary>
        /// Get the data of the specified address. Currently has no implementation.
        /// </summary>
        /// <param name="addr">Device address.</param>
        /// <param name="sector">Sector.</param>
        /// <param name="inf">Information address.</param>
        public void GetOneData(byte addr, byte sector, ushort inf)
        {
        }
    }
}