// Source-viewer residue kept for reference (not part of the program):
// 679 lines, 33 KiB, C# - "Raw / Normal View / History" - 2024-08-21 16:50:14 +08:00
using Abp.Dependency;
using Castle.MicroKernel.Util;
using Jint;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using ToolLibrary.LogHelper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.AlarmQueue;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Helper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Model;
using Yunda.ISAS.DataMonitoringServer.DataCenter;
using Yunda.ISAS.DataMonitoringServer.WebSocket;
using Yunda.ISAS.DataMonitoringServer.WebSocket.Model;
using YunDa.ISAS.DataTransferObject.DataMonitoring.LinkageConditionDto;
using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
using YunDa.ISAS.Entities.MySQL.DataMonitoring;
using ConstantModel = Yunda.ISAS.DataMonitoringServer.DataAnalysis.Model.ConstantModel;
namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.LinkageAnalysis
{
public class LinkageAnalysisTask : ISingletonDependency
{
/// <summary>
/// Analysis interval, initialised from <see cref="ConstantModel.DefaultInterval"/>.
/// NOTE(review): the previous summary on this field read "linkage strategies and their
/// condition list", which does not describe it; the field is not read anywhere in this class.
/// </summary>
private double _analysisInterval = ConstantModel.DefaultInterval;
// Cache that supplies LinkageStrategys, EquipmentDataDic and the IsRunning flag used by this class.
private readonly RunningDataCache _runningDataCache;
// Maximum length of a per-condition "last true result" list; when a left push reports a longer
// list, one entry is popped from the right end.
private readonly int ConditionListMaxCount = 1000;
// Dataflow block that receives a configuration id and runs the linkage analysis for it.
// The real instance is assigned in the constructor.
public ActionBlock<Guid> LinkAnanysisActionBlock { get; } = default;
// Executes the linkage actions (ExecuteLinkageDataAsync).
private readonly ImpletementLinkActives _impletementLinkActives;
// Used to send linkage messages to connected clients (SendMsg).
private readonly WebSocketServer _webSocketServer;
// Queried through IsClearAlarm to decide whether a strategy may be analysed.
private readonly AlarmQueueDataPublish _alarmQueueDataPublish;
// Provides LastTrueResultTimeList, the per-condition list of timestamps at which the condition held.
private readonly RedisDataRepository _redisDataRepository;
// Injected and stored; not used by any member of this class.
private readonly ConfigurationHepler _configurationHepler;
/// <summary>
/// Stores the injected collaborators, creates <see cref="LinkAnanysisActionBlock"/> and starts a
/// background loop that calls <see cref="CheckTimeLinkage"/> every 30 seconds while the cache
/// reports <c>IsRunning</c>.
/// </summary>
/// <param name="impletementLinkActives">Executes linkage actions.</param>
/// <param name="runningDataCache">Source of linkage strategies and equipment data.</param>
/// <param name="webSocketController">Web socket server used to notify clients.</param>
/// <param name="alarmQueueDataPublish">Queried to find out whether a strategy's alarm was cleared.</param>
/// <param name="configurationHepler">Stored only; not used by this class.</param>
/// <param name="redisDataRepository">Holds the per-condition "last true result" time lists.</param>
public LinkageAnalysisTask(ImpletementLinkActives impletementLinkActives,
    RunningDataCache runningDataCache,
    WebSocketServer webSocketController,
    AlarmQueueDataPublish alarmQueueDataPublish,
    ConfigurationHepler configurationHepler,
    RedisDataRepository redisDataRepository
    )
{
    _impletementLinkActives = impletementLinkActives;
    _runningDataCache = runningDataCache;
    _webSocketServer = webSocketController;
    _alarmQueueDataPublish = alarmQueueDataPublish;
    _configurationHepler = configurationHepler;
    _redisDataRepository = redisDataRepository;

    // The Func<Guid, Task> overload is required: with an async lambda typed as Action<Guid>
    // (async void) the block regards a message as finished at the first await, so
    // MaxDegreeOfParallelism = 1 and BoundedCapacity would not actually be enforced.
    LinkAnanysisActionBlock = new ActionBlock<Guid>(
        new Func<Guid, Task>(AnalyzeConfigurationAsync),
        new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1, BoundedCapacity = 10 });

    // Periodic sweep for strategies that are triggered purely by time of day.
    Task.Run(async () =>
    {
        while (true)
        {
            if (_runningDataCache.IsRunning)
            {
                CheckTimeLinkage();
            }
            await Task.Delay(TimeSpan.FromSeconds(30));
        }
    });
}

/// <summary>
/// Runs the linkage analysis for every strategy that owns a condition bound to the given
/// telemetering or telesignalisation configuration id.
/// </summary>
/// <param name="configurationId">Id of the configuration whose data changed.</param>
private async Task AnalyzeConfigurationAsync(Guid configurationId)
{
    List<LinkageStrategyModel> linkageStrategys;
    try
    {
        // Strategies without any condition are never executed, so a null condition list is
        // filtered out here instead of being dereferenced.
        linkageStrategys = _runningDataCache.LinkageStrategys
            .Where(ls => ls.LinkageConditions != null
                && ls.LinkageConditions.Any(lc => lc.TelemeteringConfigurationId == configurationId
                    || lc.TelesignalisationConfigurationId == configurationId))
            .ToList();
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
        return;
    }

    foreach (LinkageStrategyModel linkageStrategy in linkageStrategys)
    {
        // Isolate failures per strategy so one broken strategy does not stop the others.
        try
        {
            await AnalyzeStrategyAsync(linkageStrategy);
        }
        catch (Exception ex)
        {
            MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
        }
    }
}

/// <summary>
/// Evaluates all conditions of one strategy, records the conditions that hold in Redis, decides
/// whether the linkage has to fire and, if so, triggers it.
/// </summary>
/// <param name="linkageStrategy">Strategy to analyse.</param>
private async Task AnalyzeStrategyAsync(LinkageStrategyModel linkageStrategy)
{
    Log4Helper.Info(this.GetType(), "联动分析");
    Log4Helper.Info(this.GetType(), JsonConvert.SerializeObject(linkageStrategy));
    if (_alarmQueueDataPublish.IsClearAlarm((Guid)linkageStrategy.Id))
    {
        Log4Helper.Info(this.GetType(), "联动报警解除,无法联动");
        return;
    }

    List<Guid> guids = GetConditionIds(linkageStrategy);
    if (guids == null || guids.Count == 0) return;

    // Condition results ("true"/"false") in rule order, plus the values that produced them.
    List<string> conditionStrs = new List<string>();
    List<LinkageConditionValueModel> lcValues = new List<LinkageConditionValueModel>();
    Log4Helper.Info(this.GetType(), "联动条件判定");
    foreach (Guid conditionId in guids)
    {
        LinkageConditionValueModel lcValue;
        string conditionStr = EvaluateCondition(linkageStrategy, conditionId, out lcValue);
        if (string.IsNullOrWhiteSpace(conditionStr)) continue;

        if (bool.Parse(conditionStr))
        {
            Log4Helper.Info(this.GetType(), "联动条件成立");
            long listConut = await _redisDataRepository.LastTrueResultTimeList.ListLeftPushAsync(conditionId.ToString(), DateTime.Now);
            if (listConut > ConditionListMaxCount)
            {
                await _redisDataRepository.LastTrueResultTimeList.ListRightPopAsync(conditionId.ToString());
            }
        }
        else
        {
            Log4Helper.Info(this.GetType(), "联动条件失败");
        }
        conditionStrs.Add(conditionStr);
        // lcValue is null only when evaluation failed before a value model was created.
        if (lcValue != null) lcValues.Add(lcValue);
    }

    // A result count different from the number of condition ids means some condition could not
    // be evaluated; the strategy is ignored in that case.
    if (conditionStrs.Count != guids.Count) return;
    Log4Helper.Info(this.GetType(), "联动字符串数组长度与联动策略规则需要的长度一致");

    bool isNeedLinkage = false;
    if (conditionStrs.Count == 1)
    {
        isNeedLinkage = await IsSingleConditionLinkageDueAsync(linkageStrategy, guids[0], bool.Parse(conditionStrs[0]));
    }
    else if (conditionStrs.Count > 1)
    {
        isNeedLinkage = await IsMultiConditionLinkageDueAsync(linkageStrategy, guids, conditionStrs);
    }

    linkageStrategy.LastLinkState = isNeedLinkage;
    if (!isNeedLinkage)
    {
        Log4Helper.Info(this.GetType(), "不需要联动");
        return;
    }
    MonitoringEventBus.LogHandler(linkageStrategy.Name, "联动信息");
    Log4Helper.Info(this.GetType(), "执行联动");
    LinkageActionHandle(linkageStrategy, lcValues);
}

/// <summary>
/// Evaluates a single condition of a strategy.
/// </summary>
/// <param name="linkageStrategy">Owning strategy.</param>
/// <param name="conditionId">Id of the condition to evaluate.</param>
/// <param name="lcValue">Receives the value model created for this condition, or null when none was created.</param>
/// <returns>
/// "true" or "false", or an empty string when the condition or its equipment is unknown or the
/// condition type is not handled (the caller skips empty results).
/// </returns>
private string EvaluateCondition(LinkageStrategyModel linkageStrategy, Guid conditionId, out LinkageConditionValueModel lcValue)
{
    lcValue = null;
    string conditionStr = "";
    try
    {
        if (!linkageStrategy.LinkageConditionDic.ContainsKey(conditionId))
            return conditionStr;
        LinkageConditionProperty condition = linkageStrategy.LinkageConditionDic[conditionId];

        EquipmentDataModel equipment = null;
        if (condition.EquipmentInfoId.HasValue && _runningDataCache.EquipmentDataDic.ContainsKey((Guid)condition.EquipmentInfoId))
            equipment = _runningDataCache.EquipmentDataDic[(Guid)condition.EquipmentInfoId];
        if (equipment == null)
        {
            Log4Helper.Info(this.GetType(), "没有找到设备");
            return conditionStr;
        }

        lcValue = new LinkageConditionValueModel();
        Log4Helper.Info(this.GetType(), $"条件:{JsonConvert.SerializeObject(condition)},规则:{JsonConvert.SerializeObject(linkageStrategy)}");
        Log4Helper.Info(this.GetType(), "表达式运算");
        switch (condition.ConditionType)
        {
            case ConditionTypeEnum.Telemetering:
                conditionStr = TelemeteringConditionComparison(equipment, condition, linkageStrategy, ref lcValue);
                break;
            case ConditionTypeEnum.Telesignalisation:
                conditionStr = TelesignalisationConditionComparison(equipment, condition, linkageStrategy, ref lcValue);
                break;
            case ConditionTypeEnum.Time:
                // A time condition without a configured time counts as satisfied.
                conditionStr = condition.DeterminationTime.HasValue
                    ? TimeConditionComparison(condition)
                    : "true";
                break;
        }
    }
    catch (Exception ex)
    {
        Log4Helper.Error(this.GetType(), "联动条件判定", ex);
        // NOTE(review): an evaluation error is treated as "condition met" (kept from the
        // previous implementation) - confirm that failing open is really intended.
        conditionStr = "true";
    }
    return conditionStr;
}

/// <summary>
/// Tells whether the strategy fired less than RepeatLinkageInterval seconds ago.
/// </summary>
private static bool IsWithinRepeatInterval(LinkageStrategyModel linkageStrategy)
{
    return linkageStrategy.LastLinkTime + TimeSpan.FromSeconds(linkageStrategy.RepeatLinkageInterval) > DateTime.Now;
}

/// <summary>
/// Decision for a strategy with exactly one condition.
/// </summary>
/// <param name="linkageStrategy">Strategy being analysed.</param>
/// <param name="conditionId">Id of the single condition; its string form is the Redis list key.</param>
/// <param name="conditionMet">Result of the condition evaluation.</param>
/// <returns>True when the linkage has to be executed.</returns>
private async Task<bool> IsSingleConditionLinkageDueAsync(LinkageStrategyModel linkageStrategy, Guid conditionId, bool conditionMet)
{
    if (!conditionMet || IsWithinRepeatInterval(linkageStrategy)) return false;

    if (linkageStrategy.ForceLinkageSeconds == 0 || linkageStrategy.ForceLinkageTimes == 0)
    {
        return true;
    }

    bool isNeedLinkage = false;
    if (linkageStrategy.ForceLinkageSeconds > 0 && linkageStrategy.ForceLinkageTimes > 0)
    {
        // Read with the same key that the write side uses (Guid.ToString()); the raw
        // ConditionIds text may differ in casing or whitespace from that form.
        var lastTrueList = await _redisDataRepository.LastTrueResultTimeList.ListRangeAsync(conditionId.ToString(), 0, linkageStrategy.ForceLinkageTimes);
        if (lastTrueList.Count > 1)
        {
            // Last element of the returned range plus the force window must already be in the past,
            // and enough "true" results must have been recorded.
            bool windowElapsed = lastTrueList[lastTrueList.Count - 1] + TimeSpan.FromSeconds(linkageStrategy.ForceLinkageSeconds) <= DateTime.Now;
            isNeedLinkage = windowElapsed && lastTrueList.Count >= linkageStrategy.ForceLinkageTimes;
        }
        else
        {
            isNeedLinkage = true;
        }
    }
    return isNeedLinkage;
}

/// <summary>
/// Decision for a strategy with several conditions: optionally treats a currently false
/// condition as true when it last held within TimeOfWithRelationship seconds, then formats the
/// strategy rule with the condition results and evaluates it with Jint.
/// </summary>
/// <param name="linkageStrategy">Strategy being analysed.</param>
/// <param name="guids">Condition ids in rule order; same length as <paramref name="conditionStrs"/>.</param>
/// <param name="conditionStrs">Condition results in rule order; entries may be rewritten to "true".</param>
/// <returns>True when the evaluated rule yields true.</returns>
private async Task<bool> IsMultiConditionLinkageDueAsync(LinkageStrategyModel linkageStrategy, List<Guid> guids, List<string> conditionStrs)
{
    if (IsWithinRepeatInterval(linkageStrategy)) return false;

    if (linkageStrategy.TimeOfWithRelationship > 0)
    {
        for (int i = 0; i < conditionStrs.Count; i++)
        {
            // Only conditions that are currently false need the Redis look-up.
            if (bool.Parse(conditionStrs[i])) continue;

            var datetimes = await _redisDataRepository.LastTrueResultTimeList.ListRangeAsync(guids[i].ToString(), 0, 0);
            if (datetimes != null && datetimes.Count() > 0
                && DateTime.Now - datetimes[0] < TimeSpan.FromSeconds(linkageStrategy.TimeOfWithRelationship))
            {
                conditionStrs[i] = "true";
            }
        }
    }

    string ruleStr = string.Format(linkageStrategy.Rule, conditionStrs.ToArray());
    bool isNeedLinkage;
    using (var engine = new Engine())
    {
        object isLinkageObj = engine.Evaluate(ruleStr);
        // Anything that is not a boolean literal leaves the result at false.
        bool.TryParse(isLinkageObj.ToString(), out isNeedLinkage);
    }
    return isNeedLinkage;
}
/// <summary>
/// Fires a linkage: stamps the strategy with the current time, picks the camera authentication
/// code, builds the result text and starts the linkage execution and the client notification.
/// Both operations are started without being awaited.
/// </summary>
/// <param name="linkageStrategy">Strategy whose linkage is executed.</param>
/// <param name="lcValues">
/// Condition values that led to the linkage; null when the linkage was triggered by the time
/// sweep, in which case the result text is derived from the strategy itself.
/// </param>
private void LinkageActionHandle(LinkageStrategyModel linkageStrategy, List<LinkageConditionValueModel> lcValues)
{
    try
    {
        linkageStrategy.LastLinkTime = DateTime.Now;

        // Prefer a condition whose camera authentication has Level 1; otherwise use the first condition.
        var exceptionAuth = linkageStrategy.LinkageConditions
            .FirstOrDefault(t => t.CameraAuthentication != null && t.CameraAuthentication.Level == 1);
        var code = exceptionAuth == null
            ? linkageStrategy.LinkageConditions.FirstOrDefault()?.CameraAuthentication?.Code
            : exceptionAuth.CameraAuthentication.Code;
        if (string.IsNullOrEmpty(code))
        {
            code = "PTXH";
        }

        // Telemetering / telesignalisation values stored together with the linkage record.
        string reslutStr = lcValues != null ? SpawnLinkResultStr(lcValues) : SpawnLinkResultStr(linkageStrategy);

        Task executeTask = _impletementLinkActives.ExecuteLinkageDataAsync(linkageStrategy, reslutStr, code);
        Task notifyTask = SendLinkageDataAsync(linkageStrategy, lcValues);

        // The method stays fire-and-forget, but a fault in either task is now logged instead of
        // disappearing as an unobserved task exception.
        Task.WhenAll(executeTask, notifyTask).ContinueWith(
            t => MonitoringEventBus.LogHandler(t.Exception.ToString(), "异常信息"),
            TaskContinuationOptions.OnlyOnFaulted);
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
    }
}
/// <summary>
/// Sweep for time-triggered strategies, invoked every 30 seconds by the loop started in the
/// constructor. A strategy qualifies when it has exactly one condition and that condition has a
/// DeterminationTime; it fires when its repeat interval has elapsed and the time comparison holds.
/// NOTE(review): unlike the data-driven analysis, this path does not consult IsClearAlarm - confirm.
/// </summary>
private void CheckTimeLinkage()
{
    try
    {
        // Strategies with a null condition list are skipped instead of throwing, which would
        // abort the sweep for every other strategy. The result is materialised so the cache is
        // not being enumerated while linkage actions run.
        var timeStrategies = _runningDataCache.LinkageStrategys
            .Where(t => t.LinkageConditions != null
                && t.LinkageConditions.Count() == 1
                && t.LinkageConditions.First().DeterminationTime.HasValue)
            .ToList();

        foreach (var item in timeStrategies)
        {
            if (DateTime.Now - item.LastLinkTime > TimeSpan.FromSeconds(item.RepeatLinkageInterval))
            {
                var resStr = TimeConditionComparison(item.LinkageConditions.First());
                if (bool.Parse(resStr))
                {
                    LinkageActionHandle(item, null);
                    item.LastLinkTime = DateTime.Now;
                }
            }
        }
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
    }
}
/// <summary>
/// Compares the time of day configured on a condition with the current local time.
/// The configured hour/minute/second are projected onto today's date and used as the LEFT
/// operand of the condition's logical operator; the current time is the right operand.
/// </summary>
/// <param name="condition">Condition carrying DeterminationTime and LogicalOperator.</param>
/// <returns>
/// "true" or "false". "false" is returned for a null condition, a missing DeterminationTime,
/// an unknown operator, or when an exception occurs (which is logged).
/// </returns>
private string TimeConditionComparison(LinkageConditionProperty condition)
{
    var conditionStr = "false";
    try
    {
        if (condition == null || !condition.DeterminationTime.HasValue)
        {
            return conditionStr;
        }

        // Read the clock once so every part of the comparison refers to the same instant
        // (also avoids mixing two different dates around midnight).
        DateTime now = DateTime.Now;
        var configured = condition.DeterminationTime.Value;
        DateTime target = new DateTime(now.Year, now.Month, now.Day,
            configured.Hour, configured.Minute, configured.Second);

        bool matched;
        switch (condition.LogicalOperator)
        {
            case "==":
                // "Equal" means within 10 seconds on either side. The former test
                // (now - t < 10s || t - now < 10s) held for every possible time.
                // NOTE(review): CheckTimeLinkage polls every 30 s, so a +/-10 s window can fall
                // between two polls - confirm the intended tolerance.
                matched = (now - target).Duration() < TimeSpan.FromSeconds(10);
                break;
            case ">=":
                matched = target >= now;
                break;
            case "<=":
                matched = target <= now;
                break;
            case ">":
                matched = target > now;
                break;
            case "<":
                matched = target < now;
                break;
            default:
                matched = false;
                break;
        }
        conditionStr = matched ? "true" : "false";
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
    }
    return conditionStr;
}
/// <summary>
/// Builds the multi-line result text that describes the condition values behind a linkage.
/// Entries are ordered by condition type and each produces one line terminated by CR LF.
/// </summary>
/// <param name="lcValues">Condition values; a null list or null entries are tolerated.</param>
/// <returns>
/// The result text; empty when there is nothing to report. If an exception occurs it is logged
/// and the text built so far is returned.
/// </returns>
private string SpawnLinkResultStr(List<LinkageConditionValueModel> lcValues)
{
    var text = new System.Text.StringBuilder();
    try
    {
        if (lcValues == null)
        {
            return string.Empty;
        }

        // Null entries are skipped so that one missing value does not discard the whole text.
        foreach (LinkageConditionValueModel lcValue in lcValues.Where(v => v != null).OrderBy(v => v.ConditionType))
        {
            switch (lcValue.ConditionType)
            {
                case ConditionTypeEnum.Telemetering:
                    text.Append($"{lcValue.ConfigurationName}:{lcValue.ResultValueStr}\r\n");
                    break;
                case ConditionTypeEnum.Telesignalisation:
                    text.Append($"{lcValue.ConfigurationName}:{lcValue.ResultValueStr}({lcValue.ResultValue})\r\n");
                    break;
                case ConditionTypeEnum.Time:
                    text.Append($"{lcValue.ConfigurationName}:时间触发联动)\r\n");
                    break;
            }
        }
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
    }
    return text.ToString();
}
/// <summary>
/// Builds the single-line result text for a linkage that was triggered by time:
/// the strategy name followed by a fixed marker and CR LF.
/// </summary>
/// <param name="model">Strategy whose name is used.</param>
/// <returns>The result line.</returns>
private string SpawnLinkResultStr(LinkageStrategyModel model) =>
    $"{model.Name}:时间触发联动)\r\n";
/// <summary>
/// Checks a telemetering condition against the current value of the referenced telemetering
/// point of the given equipment.
/// </summary>
/// <param name="equipment">Equipment whose Telemeterings are searched.</param>
/// <param name="condition">Condition with the point id, operator, compare type and comparison value.</param>
/// <param name="linkageStrategy">Owning strategy (not used by this method).</param>
/// <param name="lcValue">Replaced by a value model describing the point once the comparison was made.</param>
/// <returns>
/// The comparison result as text; "false" when the condition is incomplete, the point is not
/// found, the compare type is not handled, or an exception occurs (which is logged).
/// </returns>
private string TelemeteringConditionComparison(EquipmentDataModel equipment,
    LinkageConditionProperty condition,
    LinkageStrategyModel linkageStrategy,
    ref LinkageConditionValueModel lcValue)
{
    string outcome = "false";
    try
    {
        bool hasPointId = condition.TelemeteringConfigurationId.HasValue;
        if (!hasPointId || string.IsNullOrWhiteSpace(condition.LogicalOperator))
        {
            return outcome;
        }

        TelemeteringModel point = equipment.Telemeterings
            .FirstOrDefault(t => t.Id == condition.TelemeteringConfigurationId);
        if (point == null)
        {
            return outcome;
        }

        Log4Helper.Info(this.GetType(), $"比较类型:{condition.CompareType},结果值:{point.ResultValue},比较符号:{condition.LogicalOperator}");

        var compareType = condition.CompareType;
        if (compareType == CompareTypeEnum.AlarmLevel)
        {
            outcome = GetconditionStr(condition.LogicalOperator, point.AlarmLevel, condition.ComparisonValue.Value);
        }
        else if (compareType == CompareTypeEnum.ResultValue)
        {
            outcome = GetconditionStr(condition.LogicalOperator, point.ResultValue, condition.ComparisonValue.Value);
        }

        // Build the snapshot completely before publishing it through the ref parameter.
        var snapshot = new LinkageConditionValueModel();
        snapshot.ConditionType = ConditionTypeEnum.Telemetering;
        snapshot.ConfigurationId = point.Id;
        snapshot.ConfigurationName = point.Name;
        snapshot.ResultTime = point.ResultTime;
        snapshot.ResultValue = point.ResultValue;
        snapshot.Unit = point.Unit;
        snapshot.ResultValueStr = point.ResultValue + point.Unit;
        snapshot.DecimalDigits = point.DecimalDigits;
        lcValue = snapshot;
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
    }
    return outcome;
}
/// <summary>
/// Applies a comparison operator given as text to two float values.
/// </summary>
/// <param name="logicalOperator">One of "==", ">=", "<=", ">", "<"; anything else yields "false".</param>
/// <param name="sourceValue">Left operand.</param>
/// <param name="comparisonValue">Right operand.</param>
/// <returns>
/// "true" or "false"; "==" holds when the operands differ by less than 0.01.
/// An empty string is returned only if an exception occurs (which is logged).
/// </returns>
private string GetconditionStr(string logicalOperator,float sourceValue,float comparisonValue)
{
    string outcome = string.Empty;
    try
    {
        const float equalityTolerance = 0.01f;
        bool holds;
        switch (logicalOperator)
        {
            case "==":
                holds = Math.Abs(sourceValue - comparisonValue) < equalityTolerance;
                break;
            case ">=":
                holds = sourceValue >= comparisonValue;
                break;
            case "<=":
                holds = sourceValue <= comparisonValue;
                break;
            case ">":
                holds = sourceValue > comparisonValue;
                break;
            case "<":
                holds = sourceValue < comparisonValue;
                break;
            default:
                holds = false;
                break;
        }
        outcome = holds.ToString().ToLower();
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
    }
    return outcome;
}
/// <summary>
/// Checks a telesignalisation condition against the current state of the referenced
/// telesignalisation point of the given equipment.
/// </summary>
/// <param name="equipment">Equipment whose Telesignalisations are searched.</param>
/// <param name="condition">Condition with the point id, operator, compare type and comparison value.</param>
/// <param name="linkageStrategy">Owning strategy (not used by this method).</param>
/// <param name="lcValue">Replaced by a value model describing the point once the comparison was made.</param>
/// <returns>
/// The comparison result as text; "false" when the condition is incomplete, the point is not
/// found, the compare type is not handled, or an exception occurs (which is logged).
/// </returns>
private string TelesignalisationConditionComparison(EquipmentDataModel equipment,
    LinkageConditionProperty condition,
    LinkageStrategyModel linkageStrategy,
    ref LinkageConditionValueModel lcValue)
{
    string outcome = "false";
    try
    {
        bool hasPointId = condition.TelesignalisationConfigurationId.HasValue;
        if (!hasPointId || string.IsNullOrWhiteSpace(condition.LogicalOperator))
        {
            return outcome;
        }

        TelesignalisationModel signal = equipment.Telesignalisations
            .FirstOrDefault(t => t.Id == condition.TelesignalisationConfigurationId);

        // Diagnostic output is written before the "not found" check.
        Log4Helper.Info(this.GetType(), $"设备名称:{equipment.EquipmentInfoName}");
        Log4Helper.Info(this.GetType(), $"遥信数组:{JsonConvert.SerializeObject(equipment.Telesignalisations)}");
        Log4Helper.Info(this.GetType(), $"查找id{condition.TelesignalisationConfigurationId}");
        if (signal == null)
        {
            Log4Helper.Info(this.GetType(), "没有找到遥信");
            return outcome;
        }

        Log4Helper.Info(this.GetType(), $"比较类型:{condition.CompareType},结果值:{signal.ResultValue},比较符号:{condition.LogicalOperator}");

        var compareType = condition.CompareType;
        if (compareType == CompareTypeEnum.AlarmLevel)
        {
            outcome = GetTelesignalconditionStr(condition.LogicalOperator, signal.DMAlarmCategory.Level, (int)condition.ComparisonValue);
        }
        else if (compareType == CompareTypeEnum.ResultValue)
        {
            outcome = GetTelesignalconditionStr(condition.LogicalOperator, signal.ResultValue, (int)condition.ComparisonValue);
        }

        // Build the snapshot completely before publishing it through the ref parameter.
        var snapshot = new LinkageConditionValueModel();
        snapshot.ConditionType = ConditionTypeEnum.Telesignalisation;
        snapshot.ConfigurationId = signal.Id;
        snapshot.ConfigurationName = signal.Name;
        snapshot.ResultTime = signal.ResultTime;
        snapshot.ResultValue = signal.ResultValue;
        snapshot.ResultValueStr = signal.ResultValueStr;
        lcValue = snapshot;
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
    }
    return outcome;
}
/// <summary>
/// Applies a comparison operator given as text to two integer values.
/// </summary>
/// <param name="logicalOperator">One of "==", ">=", "<=", ">", "<"; anything else yields "false".</param>
/// <param name="sourceValue">Left operand.</param>
/// <param name="comparisonValue">Right operand.</param>
/// <returns>
/// "true" or "false". An empty string is returned only if an exception occurs (which is logged).
/// </returns>
private string GetTelesignalconditionStr(string logicalOperator, int sourceValue, int comparisonValue)
{
    string outcome = string.Empty;
    try
    {
        bool holds;
        switch (logicalOperator)
        {
            case "==":
                holds = sourceValue == comparisonValue;
                break;
            case ">=":
                holds = sourceValue >= comparisonValue;
                break;
            case "<=":
                holds = sourceValue <= comparisonValue;
                break;
            case ">":
                holds = sourceValue > comparisonValue;
                break;
            case "<":
                holds = sourceValue < comparisonValue;
                break;
            default:
                holds = false;
                break;
        }
        outcome = holds.ToString().ToLower();
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
    }
    return outcome;
}
/// <summary>
/// Parses the comma-separated ConditionIds of a strategy into GUIDs, keeping their order.
/// </summary>
/// <param name="linkageStrategy">Strategy whose ConditionIds text is parsed.</param>
/// <returns>
/// The parsed ids; an empty list when ConditionIds is null or empty; null when at least one
/// entry is not a valid, non-empty GUID.
/// </returns>
private List<Guid> GetConditionIds(LinkageStrategyModel linkageStrategy)
{
    List<Guid> parsed = new List<Guid>();
    try
    {
        string raw = linkageStrategy.ConditionIds;
        if (string.IsNullOrEmpty(raw))
        {
            return parsed;
        }

        string[] tokens = raw.Split(',');
        for (int i = 0; i < tokens.Length; i++)
        {
            Guid candidate;
            bool usable = Guid.TryParse(tokens[i], out candidate) && candidate != Guid.Empty;
            if (usable)
            {
                parsed.Add(candidate);
            }
        }

        // Any rejected entry invalidates the whole list.
        if (parsed.Count != tokens.Length)
        {
            parsed = null;
        }
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
    }
    return parsed;
}
/// <summary>
/// Sends a linkage notification to the web socket clients on a thread-pool task.
/// </summary>
/// <param name="linkageStrategy">Strategy whose id and name are included in the message.</param>
/// <param name="lcValues">Condition values included in the message (passed through as given).</param>
/// <returns>The task that performs the send; exceptions inside it are logged, not rethrown.</returns>
private Task SendLinkageDataAsync(LinkageStrategyModel linkageStrategy, List<LinkageConditionValueModel> lcValues)
{
    Action push = () =>
    {
        try
        {
            DataMonitorMessageModel message = new DataMonitorMessageModel
            {
                Content = new
                {
                    LinkageStrategyId = linkageStrategy.Id,
                    LinkageStrategyName = linkageStrategy.Name,
                    LinkageConditionValue = lcValues
                },
                GroupType = GroupTypeEnum.All,
                MessageType = MessgeTypeEnum.Linkage
            };
            _webSocketServer.SendMsg(message);
        }
        catch (Exception ex)
        {
            MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
        }
    };
    return Task.Run(push);
}
}
}