using Abp.Dependency;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Documents;
using ToolLibrary.LogHelper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection.Dlls;
using Yunda.ISAS.DataMonitoringServer.DataCenter;
using Yunda.ISAS.DataMonitoringServer.WPF.ViewModel;
using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
using YunDa.ISAS.Entities.DataMonitoring;
using YunDa.ISAS.Redis.Entities.DataMonitorCategory;

namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection
{
    /// <summary>
    /// Data collection: owns the IEC 104 client connection, dispatches every received
    /// item (telemetering, telesignal, error, message, general-interrogation complete)
    /// and periodically triggers a general interrogation ("call all").
    /// </summary>
    public class DataCollectionTask : ISingletonDependency
    {
        /// <summary>
        /// Interval between two periodic general interrogations.
        /// </summary>
        private static readonly TimeSpan CallAllInterval = TimeSpan.FromMinutes(30);

        private readonly Content _settingModel;

        //public delegate void DataCollectionTaskCompleteDelegate();
        //public event DataCollectionTaskCompleteDelegate DataCollectionTaskCompleteEvent;

        /// <summary>
        /// Time conversion: builds a <see cref="DateTime"/> from a 7-element buffer where
        /// elements 0-1 hold the little-endian milliseconds within the minute, followed by
        /// minute, hour, day, month and a two-digit year (offset from 2000).
        /// </summary>
        private Func<byte[], DateTime> FormatDateTime = tm => new DateTime(
            2000 + tm[6], tm[5], tm[4], tm[3], tm[2],
            ((tm[1] << 8) + tm[0]) / 1000,
            ((tm[1] << 8) + tm[0]) % 1000);

        CancellationTokenSource cancellationTokenSource = default;
        private readonly DataSendTask _dataSendTask;
        private readonly RunningDataCache _runningDataCache;
        private readonly RedisDataRepository _redisDataRepository;

        /// <summary>
        /// Stores the injected dependencies and, when a running-data cache is available,
        /// starts the background loop that issues a general interrogation every
        /// <see cref="CallAllInterval"/>.
        /// </summary>
        /// <param name="settingModel">Collection settings.</param>
        /// <param name="dataSendTask">Receives telemetering / telesignal items once collection is started.</param>
        /// <param name="redisDataRepository">Redis access used for environment temperature values.</param>
        /// <param name="runningDataCache">In-memory equipment data that is updated while a general interrogation is in progress.</param>
        public DataCollectionTask(Content settingModel, DataSendTask dataSendTask, RedisDataRepository redisDataRepository, RunningDataCache runningDataCache)
        {
            _dataSendTask = dataSendTask;
            _settingModel = settingModel;
            _runningDataCache = runningDataCache;
            _redisDataRepository = redisDataRepository;

            // The cache reference never changes after construction, so check it once
            // instead of spinning in a loop when it is missing.
            if (_runningDataCache != null)
            {
                Task.Run(() => PeriodicCallAllAsync());
            }
        }

        /// <summary>
        /// Endless background loop: triggers a general interrogation, logs it, then waits
        /// for <see cref="CallAllInterval"/> without blocking a thread.
        /// </summary>
        private async Task PeriodicCallAllAsync()
        {
            while (true)
            {
                try
                {
                    CallAllData();
                    Log4Helper.Info(this.GetType(), $"定时总召,时间间隔{CallAllInterval.TotalMinutes}分钟");
                }
                catch (Exception ex)
                {
                    // A failing interrogation must not end the periodic loop silently.
                    Log4Helper.Error(this.GetType(), ex.ToString());
                }

                await Task.Delay(CallAllInterval).ConfigureAwait(false);
            }
        }

        #region 数据采集计时器

        ActionBlock<IECData> _handleDataAction;
        public static IEC104Client _client;

        // Written by the periodic interrogation task and read by the dataflow worker
        // threads, hence volatile.
        private volatile bool _isStarted = false;

        /// <summary>
        /// Starts collection: creates the dataflow block that handles received items,
        /// creates the IEC 104 client for the configured address and connects it.
        /// Failures are logged, not rethrown.
        /// </summary>
        /// <param name="settingModel">Settings providing the device IP and port.</param>
        /// <param name="startWebsocket">Callback invoked when a general interrogation completes while collection is not yet marked as started.</param>
        public void CollectionStart(Content settingModel, Action startWebsocket)
        {
            try
            {
                _handleDataAction = new ActionBlock<IECData>(
                    data => HandleData(data, startWebsocket),
                    new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 2, MaxMessagesPerTask = 3000 });
                _client = new IEC104Client(settingModel.Dev_Ip, settingModel.Dev_Port, _handleDataAction);
                _client.Connect();
                //#if DEBUG
                //                while (true)
                //                {
                //                    _handleDataAction.Post(new IECData
                //                    {
                //                        Cause = 1,
                //                        DataType = IECDataType.Telesignal,
                //                        IValue = 1,
                //                        Time = DateTime.Now,
                //                        InfoAddr = 3024
                //                    });
                //                    Task.Delay(10 * 1000).Wait();
                //                }
                //#endif

                // Only report the start once the client has actually been connected.
                MonitoringEventBus.LogHandler("数据采集开始", "数据采集");
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.Message, "异常信息");
                Log4Helper.Error(this.GetType(), ex.ToString());
            }
        }

        /// <summary>
        /// Handles one item received from the IEC 104 client and logs how long the
        /// handling took (in stopwatch ticks).
        /// </summary>
        /// <param name="data">The received item.</param>
        /// <param name="startWebsocket">Callback invoked on general-interrogation completion while not started.</param>
        private void HandleData(IECData data, Action startWebsocket)
        {
            Stopwatch stopwatch = Stopwatch.StartNew();
            try
            {
                if (data.DataType == IECDataType.Telemetering)
                {
                    YC_TYPE_New info = new YC_TYPE_New()
                    {
                        chgFlag = 0,
                        addr = 1,
                        sector = 0,
                        val = data.FValue,
                        inf = data.InfoAddr,
                        time = data.Time
                    };
                    if (_isStarted)
                    {
                        _dataSendTask.RecordYCLogInfo(info);
                        _dataSendTask.YCTActionBlock.Post(info);
                    }
                    else
                    {
                        // Interrogation still running: only refresh the cached value.
                        OnlyChangeYC(info);
                    }
                }
                else if (data.DataType == IECDataType.Telesignal)
                {
                    var info = new RECORDYXBURST_New()
                    {
                        dev_addr = 1,
                        dev_inf = data.InfoAddr,
                        dev_sector = 0,
                        yx_val = data.IValue,
                        time = data.Time
                    };
                    if (_isStarted)
                    {
                        _dataSendTask.RecordYXLogInfo(info);
                        _dataSendTask.RECORDYXBURSTActionBlock.Post(info);
                    }
                    else
                    {
                        // Interrogation still running: only refresh the cached value.
                        OnlyChangeYX(info);
                    }
                }
                else if (data.DataType == IECDataType.Error)
                {
                    MonitoringEventBus.LogHandler(data.Ex.ToString(), "异常信息");
                    Log4Helper.Error(this.GetType(), data.Ex.ToString());
                }
                else if (data.DataType == IECDataType.Msg)
                {
                    //MonitoringEventBus.LogHandler(data.Msg, "104数据");
                    Log4Helper.Info(this.GetType(), data.Msg);
                }
                else if (data.DataType == IECDataType.CompleteAll)
                {
                    Log4Helper.Info(this.GetType(), $"完成总召时间: {DateTime.Now.ToLongTimeString()}");
                    if (!_isStarted)
                    {
                        _isStarted = true;
                        startWebsocket();
                    }
                }
            }
            catch (Exception ex)
            {
                // An exception escaping the ActionBlock delegate would fault the block and
                // make it decline every later message, so log it and keep processing.
                Log4Helper.Error(this.GetType(), ex.ToString());
            }

            stopwatch.Stop();
            //Debug.WriteLine("处理数据花费时间:"+stopwatch.ElapsedTicks);
            Log4Helper.Info(this.GetType(), $"处理数据花费时间: {stopwatch.ElapsedTicks}");
        }

        /// <summary>
        /// Marks collection as not started and asks the client (if one exists) for a
        /// general interrogation.
        /// </summary>
        public void CallAllData()
        {
            _isStarted = false;
            _client?.CallAll();
        }

        /// <summary>
        /// Stops collection: marks it as not started and closes the client if one exists.
        /// </summary>
        public void CollectionStop()
        {
            _isStarted = false;
            // The client is only created by CollectionStart, so it may still be null here.
            _client?.Close();
        }

        #endregion 数据采集计时器

        Dictionary<int, TelesignalisationModel> TelesignalisationModelDic = new Dictionary<int, TelesignalisationModel>();

        /// <summary>
        /// Updates only the cached telesignal entry: finds the first equipment holding a
        /// telesignal whose dispatcher address equals <c>yx.dev_inf</c> and stores the new
        /// value and time when the value differs from the cached one.
        /// </summary>
        /// <param name="yx">The received telesignal record.</param>
        private void OnlyChangeYX(RECORDYXBURST_New yx)
        {
            //if (TelesignalisationModelDic.ContainsKey((int)yx.dev_inf))
            //{
            //    TelesignalisationModel telesignal = TelesignalisationModelDic[(int)yx.dev_inf];
            //    var value = telesignal.RemoteType == RemoteTypeEnum.SinglePoint ? (yx.yx_val - 1) : yx.yx_val;
            //    if (value != telesignal.ResultValue)
            //    {
            //        telesignal.ResultTime = yx.time;
            //        telesignal.ResultValue = value;
            //    }
            //}
            //else
            //{
            //}
            foreach (EquipmentDataModel e in _runningDataCache.EquipmentDataDic.Values)
            {
                TelesignalisationModel telesignal = e.Telesignalisations.FirstOrDefault(m => m.DispatcherAddress == yx.dev_inf);
                if (telesignal == null)
                {
                    continue;
                }

                // Single-point signals are stored as the received value minus one.
                var value = telesignal.RemoteType == RemoteTypeEnum.SinglePoint ? (yx.yx_val - 1) : yx.yx_val;
                if (value != telesignal.ResultValue)
                {
                    telesignal.ResultTime = yx.time;
                    telesignal.ResultValue = value;
                }

                // Only the first matching equipment is updated.
                break;
            }
        }

        /// <summary>
        /// Updates only the cached telemetering entry: finds the first equipment holding a
        /// telemetering whose dispatcher address equals <c>yc.inf</c>, scales the received
        /// value by the entry's coefficient and stores it when it differs from the cached
        /// value by at least 0.01. Environment temperature entries are also written to Redis.
        /// </summary>
        /// <param name="yc">The received telemetering record.</param>
        private void OnlyChangeYC(YC_TYPE_New yc)
        {
            foreach (EquipmentDataModel e in _runningDataCache.EquipmentDataDic.Values)
            {
                TelemeteringModel telemetering = e.Telemeterings.FirstOrDefault(m => m.DispatcherAddress == yc.inf);
                if (telemetering == null)
                {
                    continue;
                }

                float value = yc.val * telemetering.Coefficient;
                if (Math.Abs(telemetering.ResultValue - value) >= 0.01)
                {
                    telemetering.ResultTime = yc.time;
                    telemetering.ResultValue = value;
                    if (telemetering.IsEnvironmentTemp)
                    {
                        EnvironmentTempValue environmentTempValue = new EnvironmentTempValue(value);
                        // The returned task is not awaited (unchanged from the original behavior).
                        _redisDataRepository.EnvironmentTempValueRedis.HashSetUpdateOneAsync(nameof(EnvironmentTempValue), telemetering.Id.ToString(), environmentTempValue);
                    }
                }

                // Only the first matching equipment is updated.
                break;
            }
        }

        /// <summary>
        /// Gets the data at the specified address. The implementation is currently
        /// commented out, so this method does nothing.
        /// </summary>
        /// <param name="addr">Device address.</param>
        /// <param name="sector">Sector.</param>
        /// <param name="inf">Information object address.</param>
        public void GetOneData(byte addr, byte sector, ushort inf)
        {
            //float val = default;
            //var res1 = ICE104EndPointController.Iec104ClnGetYC(addr, sector, inf, ref val);
            //MonitoringEventBus.LogHandler(res1.ToString(), "获取遥测返回结果状态");
            //MonitoringEventBus.LogHandler(val.ToString(), "获取遥测返回结果值");
            //if (res1 == 0)
            //{
            //    var info = new YC_TYPE_New()
            //    {
            //        sector = sector,
            //        addr = addr,
            //        inf = inf,
            //        val = val,
            //    };
            //    _dataSendTask.RecordYCLogInfo(info);
            //    OnlyChangeYC(info);
            //    //Task.Run(() => SendYCInfoToClient(info));
            //}
            //byte yx_val = 4;
            //var res2 = ICE104EndPointController.Iec104ClnGetYX(addr, sector, inf, ref yx_val);
            //MonitoringEventBus.LogHandler(res2.ToString(), "获取遥信返回结果状态");
            //MonitoringEventBus.LogHandler(yx_val.ToString(), "获取遥信返回结果值");
            //if (res2 == 0)
            //{
            //    var info = new RECORDYXBURST_New()
            //    {
            //        dev_sector = sector,
            //        dev_addr = addr,
            //        dev_inf = inf,
            //        yx_val = yx_val,
            //    };
            //    _dataSendTask.RecordYXLogInfo(info);
            //    OnlyChangeYX(info);
            //}
        }
    }
}