using Abp.Dependency;
using Castle.MicroKernel.Util;
using Jint;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using ToolLibrary.LogHelper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.AlarmQueue;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Helper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Model;
using Yunda.ISAS.DataMonitoringServer.DataCenter;
using Yunda.ISAS.DataMonitoringServer.WebSocket;
using Yunda.ISAS.DataMonitoringServer.WebSocket.Model;
using YunDa.ISAS.DataTransferObject.DataMonitoring.LinkageConditionDto;
using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
using YunDa.ISAS.Entities.MySQL.DataMonitoring;
using ConstantModel = Yunda.ISAS.DataMonitoringServer.DataAnalysis.Model.ConstantModel;

namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.LinkageAnalysis
{
    /// <summary>
    /// Evaluates linkage strategies. A strategy is re-evaluated when the id of one of its
    /// telemetering / telesignalisation configurations is posted to
    /// <see cref="LinkAnanysisActionBlock"/>; strategies whose single condition is a time of day
    /// are additionally polled by a background loop started in the constructor.
    /// </summary>
    public class LinkageAnalysisTask : ISingletonDependency
    {
        /// <summary>Half-width of the window inside which a time condition with operator "==" matches.</summary>
        private static readonly TimeSpan TimeMatchTolerance = TimeSpan.FromSeconds(10);

        /// <summary>
        /// Delay between two passes of the time-condition poll. It must stay shorter than twice
        /// <see cref="TimeMatchTolerance"/>, otherwise a poll can miss the "==" window completely.
        /// </summary>
        private static readonly TimeSpan TimeCheckPollInterval = TimeSpan.FromSeconds(10);

        // Not read anywhere in this class; kept as declared.
        private double _analysisInterval = ConstantModel.DefaultInterval;

        private readonly RunningDataCache _runningDataCache;

        /// <summary>Maximum number of "condition was true" timestamps kept per condition in Redis.</summary>
        private readonly int ConditionListMaxCount = 1000;

        /// <summary>
        /// Entry point of the analysis: post a configuration id here to have every strategy that
        /// references it evaluated. Messages are processed one at a time (MaxDegreeOfParallelism = 1)
        /// and at most 10 are buffered (BoundedCapacity = 10).
        /// </summary>
        public ActionBlock<Guid> LinkAnanysisActionBlock { get; } = default;

        private readonly ImpletementLinkActives _impletementLinkActives;
        private readonly WebSocketServer _webSocketServer;
        private readonly AlarmQueueDataPublish _alarmQueueDataPublish;
        private readonly RedisDataRepository _redisDataRepository;
        private readonly ConfigurationHepler _configurationHepler;

        /// <summary>
        /// Stores the collaborators, builds <see cref="LinkAnanysisActionBlock"/> and starts the
        /// background loop that polls time-of-day strategies while the data cache reports running.
        /// </summary>
        public LinkageAnalysisTask(ImpletementLinkActives impletementLinkActives,
            RunningDataCache runningDataCache,
            WebSocketServer webSocketController,
            AlarmQueueDataPublish alarmQueueDataPublish,
            ConfigurationHepler configurationHepler,
            RedisDataRepository redisDataRepository
            )
        {
            _impletementLinkActives = impletementLinkActives;
            _runningDataCache = runningDataCache;
            _webSocketServer = webSocketController;
            _alarmQueueDataPublish = alarmQueueDataPublish;
            _configurationHepler = configurationHepler;
            _redisDataRepository = redisDataRepository;

            // A Task-returning delegate (not an async-void Action) is required so that the block
            // waits for the whole analysis, including its awaits, before taking the next message.
            Func<Guid, Task> analyze = AnalyzeConfigurationAsync;
            LinkAnanysisActionBlock = new ActionBlock<Guid>(analyze, new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 10
            });

            Task.Run(async () =>
            {
                while (true)
                {
                    if (_runningDataCache.IsRunning)
                    {
                        CheckTimeLinkage();
                    }
                    await Task.Delay(TimeCheckPollInterval);
                }
            });
        }

        /// <summary>
        /// Evaluates every strategy that has a condition bound to <paramref name="configurationId"/>.
        /// Any exception is logged and ends the current pass; it never escapes into the dataflow block.
        /// </summary>
        private async Task AnalyzeConfigurationAsync(Guid configurationId)
        {
            try
            {
                var linkageStrategys = _runningDataCache.LinkageStrategys
                    .Where(ls => ls.LinkageConditions != null
                        && ls.LinkageConditions.Any(lc => lc.TelemeteringConfigurationId == configurationId
                            || lc.TelesignalisationConfigurationId == configurationId))
                    .ToList();

                foreach (LinkageStrategyModel linkageStrategy in linkageStrategys)
                {
                    await AnalyzeStrategyAsync(linkageStrategy);
                }
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
            }
        }

        /// <summary>
        /// Evaluates all conditions of one strategy in the order given by its ConditionIds,
        /// decides whether linkage is required and, if so, triggers it.
        /// </summary>
        private async Task AnalyzeStrategyAsync(LinkageStrategyModel linkageStrategy)
        {
            Log4Helper.Info(this.GetType(), "联动分析");
            Log4Helper.Info(this.GetType(), JsonConvert.SerializeObject(linkageStrategy));

            if (_alarmQueueDataPublish.IsClearAlarm((Guid)linkageStrategy.Id))
            {
                Log4Helper.Info(this.GetType(), "联动报警解除,无法联动");
                return;
            }

            // A strategy without conditions is never executed.
            if (linkageStrategy.LinkageConditions == null || !linkageStrategy.LinkageConditions.Any())
            {
                return;
            }

            List<Guid> guids = GetConditionIds(linkageStrategy);
            if (guids == null || guids.Count == 0)
            {
                return;
            }

            // Per-condition results ("true"/"false"), in rule order, plus the values behind them.
            var conditionStrs = new List<string>();
            var lcValues = new List<LinkageConditionValueModel>();
            LinkageConditionValueModel lcValue = null;
            Log4Helper.Info(this.GetType(), "联动条件判定");

            foreach (Guid conditionId in guids)
            {
                string conditionStr = EvaluateCondition(linkageStrategy, conditionId, ref lcValue);
                if (string.IsNullOrWhiteSpace(conditionStr))
                {
                    continue;
                }

                if (bool.Parse(conditionStr))
                {
                    Log4Helper.Info(this.GetType(), "联动条件成立");
                    // Remember when the condition was true; the list is capped at ConditionListMaxCount.
                    long listCount = await _redisDataRepository.LastTrueResultTimeList.ListLeftPushAsync(conditionId.ToString(), DateTime.Now);
                    if (listCount > ConditionListMaxCount)
                    {
                        await _redisDataRepository.LastTrueResultTimeList.ListRightPopAsync(conditionId.ToString());
                    }
                }
                else
                {
                    Log4Helper.Info(this.GetType(), "联动条件失败");
                }

                conditionStrs.Add(conditionStr);
                lcValues.Add(lcValue);
            }

            // Fewer results than rule placeholders means some condition could not be evaluated:
            // the data is inconsistent, so this strategy is ignored.
            if (conditionStrs.Count != guids.Count)
            {
                return;
            }
            Log4Helper.Info(this.GetType(), "联动字符串数组长度与联动策略规则需要的长度一致");

            bool isNeedLinkage = conditionStrs.Count == 1
                ? await IsSingleConditionLinkageNeededAsync(linkageStrategy, guids[0], bool.Parse(conditionStrs[0]))
                : await IsMultiConditionLinkageNeededAsync(linkageStrategy, guids, conditionStrs);

            linkageStrategy.LastLinkState = isNeedLinkage;
            if (!isNeedLinkage)
            {
                Log4Helper.Info(this.GetType(), "不需要联动");
                return;
            }

            MonitoringEventBus.LogHandler(linkageStrategy.Name, "联动信息");
            Log4Helper.Info(this.GetType(), "执行联动");
            LinkageActionHandle(linkageStrategy, lcValues);
        }

        /// <summary>
        /// Evaluates a single condition of a strategy.
        /// </summary>
        /// <returns>
        /// "true" or "false"; an empty string when the condition or its equipment is unknown or the
        /// condition type is not handled, which makes the caller skip the condition.
        /// </returns>
        private string EvaluateCondition(LinkageStrategyModel linkageStrategy, Guid conditionId, ref LinkageConditionValueModel lcValue)
        {
            try
            {
                if (!linkageStrategy.LinkageConditionDic.TryGetValue(conditionId, out var condition))
                {
                    return "";
                }

                if (!condition.EquipmentInfoId.HasValue
                    || !_runningDataCache.EquipmentDataDic.TryGetValue((Guid)condition.EquipmentInfoId, out var equipment)
                    || equipment == null)
                {
                    Log4Helper.Info(this.GetType(), "没有找到设备");
                    return "";
                }

                lcValue = new LinkageConditionValueModel();
                Log4Helper.Info(this.GetType(), $"条件:{JsonConvert.SerializeObject(condition)},规则:{JsonConvert.SerializeObject(linkageStrategy)}");
                Log4Helper.Info(this.GetType(), "表达式运算");

                switch (condition.ConditionType)
                {
                    case ConditionTypeEnum.Telemetering:
                        return TelemeteringConditionComparison(equipment, condition, linkageStrategy, ref lcValue);
                    case ConditionTypeEnum.Telesignalisation:
                        return TelesignalisationConditionComparison(equipment, condition, linkageStrategy, ref lcValue);
                    case ConditionTypeEnum.Time:
                        // A time condition without a configured time is treated as satisfied.
                        return condition.DeterminationTime.HasValue ? TimeConditionComparison(condition) : "true";
                    default:
                        return "";
                }
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "联动条件判定", ex);
                // NOTE(review): a failed evaluation counts as "true" (fail-open), as in the
                // original code - confirm this is intended rather than "false".
                return "true";
            }
        }

        /// <summary>
        /// Decision for a strategy with exactly one condition: applies the repeat interval and the
        /// optional forced-linkage settings (ForceLinkageSeconds / ForceLinkageTimes).
        /// </summary>
        private async Task<bool> IsSingleConditionLinkageNeededAsync(LinkageStrategyModel linkageStrategy, Guid conditionId, bool conditionMet)
        {
            if (!conditionMet)
            {
                return false;
            }

            // Still inside the repeat interval of the previous linkage: nothing else matters.
            if (linkageStrategy.LastLinkTime + TimeSpan.FromSeconds(linkageStrategy.RepeatLinkageInterval) > DateTime.Now)
            {
                return false;
            }

            if (linkageStrategy.ForceLinkageSeconds == 0 || linkageStrategy.ForceLinkageTimes == 0)
            {
                return true;
            }

            if (linkageStrategy.ForceLinkageSeconds > 0 && linkageStrategy.ForceLinkageTimes > 0)
            {
                // Read with the same key the timestamps were pushed under (the parsed Guid),
                // not the raw ConditionIds text, which may differ in case or whitespace.
                var lastTrueList = await _redisDataRepository.LastTrueResultTimeList.ListRangeAsync(conditionId.ToString(), 0, linkageStrategy.ForceLinkageTimes);
                if (lastTrueList.Count > 1)
                {
                    var isForceLink = lastTrueList[lastTrueList.Count - 1] + TimeSpan.FromSeconds(linkageStrategy.ForceLinkageSeconds) <= DateTime.Now;
                    return isForceLink && lastTrueList.Count >= linkageStrategy.ForceLinkageTimes;
                }
                return true;
            }

            return false;
        }

        /// <summary>
        /// Decision for a strategy with several conditions: applies the repeat interval, optionally
        /// treats a condition as true when it was true within the last TimeOfWithRelationship seconds,
        /// then formats the strategy's Rule with the results and evaluates it with Jint.
        /// </summary>
        private async Task<bool> IsMultiConditionLinkageNeededAsync(LinkageStrategyModel linkageStrategy, List<Guid> guids, List<string> conditionStrs)
        {
            if (linkageStrategy.LastLinkTime + TimeSpan.FromSeconds(linkageStrategy.RepeatLinkageInterval) > DateTime.Now)
            {
                return false;
            }

            if (linkageStrategy.TimeOfWithRelationship > 0)
            {
                for (int i = 0; i < conditionStrs.Count; i++)
                {
                    if (bool.Parse(conditionStrs[i]))
                    {
                        continue;
                    }

                    // guids[i] is the parsed id of the i-th condition, i.e. the key used when pushing.
                    var datetimes = await _redisDataRepository.LastTrueResultTimeList.ListRangeAsync(guids[i].ToString(), 0, 0);
                    if (datetimes != null && datetimes.Count() > 0
                        && DateTime.Now - datetimes[0] < TimeSpan.FromSeconds(linkageStrategy.TimeOfWithRelationship))
                    {
                        conditionStrs[i] = "true";
                    }
                }
            }

            string ruleStr = string.Format(linkageStrategy.Rule, conditionStrs.ToArray());
            using (var engine = new Engine())
            {
                object isLinkageObj = engine.Evaluate(ruleStr);
                bool.TryParse(isLinkageObj.ToString(), out bool isNeedLinkage);
                return isNeedLinkage;
            }
        }

        /// <summary>
        /// Executes a linkage: stamps LastLinkTime, picks the camera authentication code
        /// (level 1 first, then the first condition's, then "PTXH"), and starts both the linkage
        /// execution and the client notification without waiting for them.
        /// </summary>
        /// <param name="linkageStrategy">Strategy being executed.</param>
        /// <param name="lcValues">Values behind the conditions; null for a time-triggered linkage.</param>
        private void LinkageActionHandle(LinkageStrategyModel linkageStrategy, List<LinkageConditionValueModel> lcValues)
        {
            try
            {
                linkageStrategy.LastLinkTime = DateTime.Now;

                var exceptionAuth = linkageStrategy.LinkageConditions
                    .FirstOrDefault(t => t.CameraAuthentication != null && t.CameraAuthentication.Level == 1);
                var code = exceptionAuth == null
                    ? linkageStrategy.LinkageConditions.FirstOrDefault()?.CameraAuthentication?.Code
                    : exceptionAuth.CameraAuthentication.Code;
                if (string.IsNullOrEmpty(code))
                {
                    code = "PTXH";
                }

                // Text describing the telesignalisation / telemetering values, stored with the linkage.
                string reslutStr = lcValues != null ? SpawnLinkResultStr(lcValues) : SpawnLinkResultStr(linkageStrategy);

                // Both operations are fire-and-forget; faults are logged instead of being lost.
                LogIfFaulted(_impletementLinkActives.ExecuteLinkageDataAsync(linkageStrategy, reslutStr, code));
                LogIfFaulted(SendLinkageDataAsync(linkageStrategy, lcValues));
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
            }
        }

        /// <summary>Logs the exception of a task that is not awaited, should it fault.</summary>
        private static void LogIfFaulted(Task task)
        {
            task?.ContinueWith(
                t => MonitoringEventBus.LogHandler(t.Exception.ToString(), "异常信息"),
                TaskContinuationOptions.OnlyOnFaulted);
        }

        /// <summary>
        /// Polls the strategies whose only condition is a time of day and executes those whose
        /// repeat interval has elapsed and whose time condition currently holds.
        /// </summary>
        private void CheckTimeLinkage()
        {
            try
            {
                var datas = _runningDataCache.LinkageStrategys
                    .Where(t => t.LinkageConditions != null
                        && t.LinkageConditions.Count() == 1
                        && t.LinkageConditions.First().DeterminationTime.HasValue)
                    .ToList();

                foreach (var item in datas)
                {
                    if (DateTime.Now - item.LastLinkTime <= TimeSpan.FromSeconds(item.RepeatLinkageInterval))
                    {
                        continue;
                    }

                    var resStr = TimeConditionComparison(item.LinkageConditions.First());
                    if (bool.Parse(resStr))
                    {
                        // LinkageActionHandle stamps LastLinkTime itself.
                        LinkageActionHandle(item, null);
                    }
                }
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
            }
        }

        /// <summary>
        /// Compares today's occurrence of the condition's DeterminationTime (hour, minute, second)
        /// with the current local time using the condition's LogicalOperator. The configured time
        /// is the left operand, the current time the right one.
        /// </summary>
        /// <returns>"true" or "false"; "false" for a null condition, an unknown operator or on error.</returns>
        private string TimeConditionComparison(LinkageConditionProperty condition)
        {
            var conditionStr = "false";
            try
            {
                if (condition != null)
                {
                    // Read the clock once so date and time parts cannot straddle midnight.
                    DateTime now = DateTime.Now;
                    DateTime dateTime = new DateTime(
                        now.Year,
                        now.Month,
                        now.Day,
                        condition.DeterminationTime.Value.Hour,
                        condition.DeterminationTime.Value.Minute,
                        condition.DeterminationTime.Value.Second
                        );

                    switch (condition.LogicalOperator)
                    {
                        case "==":
                            // "Equal" means within the tolerance on either side. The previous
                            // "a - b < 10s || b - a < 10s" was true for every pair of times.
                            conditionStr = ToConditionString((now - dateTime).Duration() < TimeMatchTolerance);
                            break;
                        case ">=":
                            conditionStr = ToConditionString(dateTime >= now);
                            break;
                        case "<=":
                            conditionStr = ToConditionString(dateTime <= now);
                            break;
                        case ">":
                            conditionStr = ToConditionString(dateTime > now);
                            break;
                        case "<":
                            conditionStr = ToConditionString(dateTime < now);
                            break;
                    }
                }
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
            }
            return conditionStr;
        }

        /// <summary>Lower-case text form of a boolean, as expected by the rule expression.</summary>
        private static string ToConditionString(bool value)
        {
            return value ? "true" : "false";
        }

        /// <summary>
        /// Builds the text stored with a linkage from the values behind its conditions,
        /// one line per condition, ordered by condition type.
        /// </summary>
        private string SpawnLinkResultStr(List<LinkageConditionValueModel> lcValues)
        {
            string reslutStr = "";
            try
            {
                lcValues = lcValues.OrderBy(lcv => lcv.ConditionType).ToList();
                foreach (LinkageConditionValueModel lcValue in lcValues)
                {
                    switch (lcValue.ConditionType)
                    {
                        case ConditionTypeEnum.Telemetering:
                            reslutStr += $"{lcValue.ConfigurationName}:{lcValue.ResultValueStr}\r\n";
                            break;

                        case ConditionTypeEnum.Telesignalisation:
                            reslutStr += $"{lcValue.ConfigurationName}:{lcValue.ResultValueStr}({lcValue.ResultValue})\r\n";
                            break;

                        case ConditionTypeEnum.Time:
                            reslutStr += $"{lcValue.ConfigurationName}:时间触发联动)\r\n";
                            break;
                    }
                }
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
            }
            return reslutStr;
        }

        /// <summary>Builds the text stored with a time-triggered linkage, which has no condition values.</summary>
        private string SpawnLinkResultStr(LinkageStrategyModel model)
        {
            return $"{model.Name}:时间触发联动)\r\n";
        }

        /// <summary>
        /// Checks a telemetering condition against the live value of its equipment and fills
        /// <paramref name="lcValue"/> with the value that was compared.
        /// </summary>
        /// <returns>"true" or "false"; "false" when the point is missing, unconfigured or on error.</returns>
        private string TelemeteringConditionComparison(EquipmentDataModel equipment, LinkageConditionProperty condition, LinkageStrategyModel linkageStrategy, ref LinkageConditionValueModel lcValue)
        {
            string conditionStr = "false";
            try
            {
                if (!condition.TelemeteringConfigurationId.HasValue || string.IsNullOrWhiteSpace(condition.LogicalOperator))
                {
                    return conditionStr;
                }

                TelemeteringModel telemeter = equipment.Telemeterings
                    .FirstOrDefault(t => t.Id == condition.TelemeteringConfigurationId);
                if (telemeter == null)
                {
                    return conditionStr;
                }

                Log4Helper.Info(this.GetType(), $"比较类型:{condition.CompareType},结果值:{telemeter.ResultValue},比较符号:{condition.LogicalOperator}");
                switch (condition.CompareType)
                {
                    case CompareTypeEnum.AlarmLevel:
                        conditionStr = GetconditionStr(condition.LogicalOperator, telemeter.AlarmLevel, condition.ComparisonValue.Value);
                        break;

                    case CompareTypeEnum.ResultValue:
                        conditionStr = GetconditionStr(condition.LogicalOperator, telemeter.ResultValue, condition.ComparisonValue.Value);
                        break;
                }

                lcValue = new LinkageConditionValueModel
                {
                    ConditionType = ConditionTypeEnum.Telemetering,
                    ConfigurationId = telemeter.Id,
                    ConfigurationName = telemeter.Name,
                    ResultTime = telemeter.ResultTime,
                    ResultValue = telemeter.ResultValue,
                    Unit = telemeter.Unit,
                    ResultValueStr = telemeter.ResultValue + telemeter.Unit,
                    DecimalDigits = telemeter.DecimalDigits
                };
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
            }
            return conditionStr;
        }

        /// <summary>
        /// Compares two floating-point values with the given operator. "==" uses a tolerance of 0.01.
        /// </summary>
        /// <returns>"true" or "false"; "false" for an unknown operator.</returns>
        private string GetconditionStr(string logicalOperator, float sourceValue, float comparisonValue)
        {
            const float baseComparaValue = 0.01f;
            switch (logicalOperator)
            {
                case "==":
                    return ToConditionString(Math.Abs(sourceValue - comparisonValue) < baseComparaValue);
                case ">=":
                    return ToConditionString(sourceValue >= comparisonValue);
                case "<=":
                    return ToConditionString(sourceValue <= comparisonValue);
                case ">":
                    return ToConditionString(sourceValue > comparisonValue);
                case "<":
                    return ToConditionString(sourceValue < comparisonValue);
                default:
                    return "false";
            }
        }

        /// <summary>
        /// Checks a telesignalisation condition against the live value of its equipment and fills
        /// <paramref name="lcValue"/> with the value that was compared.
        /// </summary>
        /// <returns>"true" or "false"; "false" when the point is missing, unconfigured or on error.</returns>
        private string TelesignalisationConditionComparison(EquipmentDataModel equipment, LinkageConditionProperty condition, LinkageStrategyModel linkageStrategy, ref LinkageConditionValueModel lcValue)
        {
            string conditionStr = "false";
            try
            {
                if (!condition.TelesignalisationConfigurationId.HasValue || string.IsNullOrWhiteSpace(condition.LogicalOperator))
                {
                    return conditionStr;
                }

                TelesignalisationModel telesignal = equipment.Telesignalisations
                    .FirstOrDefault(t => t.Id == condition.TelesignalisationConfigurationId);
                Log4Helper.Info(this.GetType(), $"设备名称:{equipment.EquipmentInfoName}");
                Log4Helper.Info(this.GetType(), $"遥信数组:{JsonConvert.SerializeObject(equipment.Telesignalisations)}");
                Log4Helper.Info(this.GetType(), $"查找id:{condition.TelesignalisationConfigurationId}");
                if (telesignal == null)
                {
                    Log4Helper.Info(this.GetType(), "没有找到遥信");
                    return conditionStr;
                }

                Log4Helper.Info(this.GetType(), $"比较类型:{condition.CompareType},结果值:{telesignal.ResultValue},比较符号:{condition.LogicalOperator}");
                switch (condition.CompareType)
                {
                    case CompareTypeEnum.AlarmLevel:
                        conditionStr = GetTelesignalconditionStr(condition.LogicalOperator, telesignal.DMAlarmCategory.Level, (int)condition.ComparisonValue);
                        break;

                    case CompareTypeEnum.ResultValue:
                        conditionStr = GetTelesignalconditionStr(condition.LogicalOperator, telesignal.ResultValue, (int)condition.ComparisonValue);
                        break;
                }

                lcValue = new LinkageConditionValueModel
                {
                    ConditionType = ConditionTypeEnum.Telesignalisation,
                    ConfigurationId = telesignal.Id,
                    ConfigurationName = telesignal.Name,
                    ResultTime = telesignal.ResultTime,
                    ResultValue = telesignal.ResultValue,
                    ResultValueStr = telesignal.ResultValueStr
                };
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
            }
            return conditionStr;
        }

        /// <summary>Compares two integer values with the given operator.</summary>
        /// <returns>"true" or "false"; "false" for an unknown operator.</returns>
        private string GetTelesignalconditionStr(string logicalOperator, int sourceValue, int comparisonValue)
        {
            switch (logicalOperator)
            {
                case "==":
                    return ToConditionString(sourceValue == comparisonValue);
                case ">=":
                    return ToConditionString(sourceValue >= comparisonValue);
                case "<=":
                    return ToConditionString(sourceValue <= comparisonValue);
                case ">":
                    return ToConditionString(sourceValue > comparisonValue);
                case "<":
                    return ToConditionString(sourceValue < comparisonValue);
                default:
                    return "false";
            }
        }

        /// <summary>
        /// Parses the comma-separated ConditionIds of a strategy into Guids, keeping their order.
        /// </summary>
        /// <returns>
        /// The parsed ids, or an empty list when ConditionIds is empty or any entry is not a valid,
        /// non-empty Guid.
        /// </returns>
        private List<Guid> GetConditionIds(LinkageStrategyModel linkageStrategy)
        {
            var ids = new List<Guid>();
            try
            {
                if (string.IsNullOrEmpty(linkageStrategy.ConditionIds))
                {
                    return ids;
                }

                string[] idStrArray = linkageStrategy.ConditionIds.Split(',');
                foreach (string idStr in idStrArray)
                {
                    if (Guid.TryParse(idStr, out Guid guid) && guid != Guid.Empty)
                    {
                        ids.Add(guid);
                    }
                }

                // One bad entry would shift the rule placeholders, so the whole list is rejected.
                if (ids.Count != idStrArray.Length)
                {
                    ids.Clear();
                }
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
            }
            return ids;
        }

        /// <summary>
        /// Sends the linkage (strategy id, name and condition values) to all WebSocket clients
        /// on a thread-pool thread; failures are logged.
        /// </summary>
        private Task SendLinkageDataAsync(LinkageStrategyModel linkageStrategy, List<LinkageConditionValueModel> lcValues)
        {
            return Task.Run(() =>
            {
                try
                {
                    DataMonitorMessageModel messageModel = new DataMonitorMessageModel();
                    messageModel.Content = new
                    {
                        LinkageStrategyId = linkageStrategy.Id,
                        LinkageStrategyName = linkageStrategy.Name,
                        LinkageConditionValue = lcValues
                    };
                    messageModel.GroupType = GroupTypeEnum.All;
                    messageModel.MessageType = MessgeTypeEnum.Linkage;
                    _webSocketServer.SendMsg(messageModel);
                }
                catch (Exception ex)
                {
                    MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
                }
            });
        }
    }
}