2024-08-21 16:50:14 +08:00

386 lines
16 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Abp.Dependency;
using Newtonsoft.Json;
using System.Threading.Tasks.Dataflow;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Helper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.LinkageAnalysis;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.TeleInfoSave;
using Yunda.ISAS.DataMonitoringServer.DataCenter;
using Yunda.ISAS.DataMonitoringServer.WebSocket;
using Yunda.ISAS.DataMonitoringServer.WebSocket.Model;
using Yunda.ISAS.MongoDB.Entities.DataMonitoring;
using Yunda.SOMS.DataMonitoringServer.Service.DataAnalysis.Model;
using YunDa.ISAS.DataTransferObject.CommonDto;
using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
using YunDa.ISAS.Entities.DataMonitoring;
using YunDa.ISAS.Redis.Entities.DataMonitorCategory;
namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection
{
public class DataSendTask : ISingletonDependency
{
private readonly TelemeteringResultSaveTask _telemeteringResultSaveTask;
private readonly DataRepository _dataRepository;
private readonly WebApiRequest _webApiRequest;
private readonly RunningDataCache _runningDataCache;
private readonly LinkageAnalysisTask _linkageAnalysisTask;
private readonly WebSocketServer _webSocketServer;
private readonly ConfigurationHepler _configurationHepler;
private readonly AlarmAnalysis _alarmAnalysis;
private readonly RedisDataRepository _redisDataRepository;
public DataSendTask(TelemeteringResultSaveTask telemeteringResultSaveTask,
DataRepository dataRepository,
WebApiRequest webApiRequest,
RunningDataCache runningDataCache,
LinkageAnalysisTask linkageAnalysisTask,
WebSocketServer webSocketController,
ConfigurationHepler configurationHepler,
AlarmAnalysis alarmAnalysis,
RedisDataRepository redisDataRepository
)
{
_configurationHepler = configurationHepler;
_webSocketServer = webSocketController;
_runningDataCache = runningDataCache;
_linkageAnalysisTask = linkageAnalysisTask;
_telemeteringResultSaveTask = telemeteringResultSaveTask;
_dataRepository = dataRepository;
_webApiRequest = webApiRequest;
_alarmAnalysis = alarmAnalysis;
_redisDataRepository = redisDataRepository;
RECORDYXBURSTActionBlock = new ActionBlock<RECORDYXBURST_New>(yx =>
{
try
{
SendYXInfoToClient(yx);
}
catch (Exception ex)
{
MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
}
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
YCTActionBlock = new ActionBlock<YC_TYPE_New>(yc =>
{
try
{
ClassifyAndSendYCInfo(yc);
}
catch (Exception ex)
{
MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
}
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
//#if DEBUG
// Task.Run(() => {
// while (true)
// {
// YCTActionBlock.Post(new YC_TYPE()
// {
// addr = 1,
// inf = 20012,
// val = 20,
// sector = 0
// });
// Task.Delay(3000).Wait();
// }
// });
//#endif
}
public ActionBlock<RECORDYXBURST_New> RECORDYXBURSTActionBlock = default;
public ActionBlock<YC_TYPE_New> YCTActionBlock = default;
public int startInfoAddr = 0;
public int endInfoAddr = 100000;
/// <summary>
/// 时间转换算法
/// </summary>
private DateTime FormatDateTimeFunc(byte[] tm)
{
if (tm[5] <= 12 && tm[5] >= 1
&& tm[4] <= 31 && tm[4] >= 1
&& tm[3] <= 24 && tm[3] >= 0
&& tm[2] <= 60 && tm[2] >= 0
&& (((tm[1] << 8) + tm[0]) / 1000) <= 60 && (((tm[1] << 8) + tm[0]) / 1000) >= 0
&& (((tm[1] << 8) + tm[0]) % 1000) <= 999 && (((tm[1] << 8) + tm[0]) % 1000) >= 0
)
{
return new DateTime(2000 + tm[6], tm[5], tm[4], tm[3], tm[2],
((tm[1] << 8) + tm[0]) / 1000, ((tm[1] << 8) + tm[0]) % 1000);
}
else
{
var tmstr = JsonConvert.SerializeObject(tm);
throw new Exception($"时间数值错误 年:{2000 + tm[6]}月:{tm[5]}日:{tm[4]}时:{tm[3]}分:{tm[2]}秒:{((tm[1] << 8) + tm[0]) / 1000}毫秒:{((tm[1] << 8) + tm[0]) % 1000} 原数据为:{tmstr}");
}
}
/// <summary>
/// 处理数据并发送遥信数据到客户端
/// </summary>
/// <param name="yX_TYPEs"></param>
public async void SendYXInfoToClient(RECORDYXBURST_New yx)
{
//Stopwatch stopwatch = new Stopwatch();
//stopwatch.Start();
//动力环境
Dictionary<Guid, EquipmentDataModel> eSendDataDic_0 = new Dictionary<Guid, EquipmentDataModel>();
//在线监测
Dictionary<Guid, EquipmentDataModel> eSendDataDic_1 = new Dictionary<Guid, EquipmentDataModel>();
List<Task> tasks = new List<Task>();
foreach (EquipmentDataModel e in _runningDataCache.EquipmentDataDic.Values)
{
TelesignalisationModel telesignal = e.Telesignalisations.FirstOrDefault(m => m.DispatcherAddress == yx.dev_inf);
if (telesignal == null) continue;
int rstValue = telesignal.RemoteType == RemoteTypeEnum.SinglePoint ? (yx.yx_val - 1) : yx.yx_val;
//如果值不变则不进行后续处理
//if (telesignal.ResultValue == rstValue)
//{
// break;
//}
telesignal.ResultValue = rstValue;
telesignal.ResultTime = yx.time;
var yxres = new TelesignalisationResult()
{
ResultTime = telesignal.ResultTime,
ResultValue = telesignal.ResultValue,
TelesignalisationConfigurationId = telesignal.Id,
SaveMethod = 2
};
_dataRepository.TelesignalisationResultRepository.InsertOneAsync(yxres);
switch (e.GroupType)
{
case MonitoringGroupTypeEnum.DLHJ:
SetSendTelesignalDataDic(e, telesignal, ref eSendDataDic_0);
break;
case MonitoringGroupTypeEnum.ZXJC:
SetSendTelesignalDataDic(e, telesignal, ref eSendDataDic_1);
break;
}
Task.Run(() => {
var isArming = _webApiRequest.GetEquipmentSafetyState(e.EquipmentInfoId);
if (isArming == 1 || isArming == 0)
{
_alarmAnalysis.HandleTelesignalAlarmAsync(e, telesignal);
//联动检测
_linkageAnalysisTask.LinkAnanysisActionBlock.Post(telesignal.Id);
}
});
//修改对应设备的遥信值后,跳出本次循环,修改下一个遥信
}
//}
bool isChange_0 = eSendDataDic_0.Count > 0;
bool isChange_1 = eSendDataDic_1.Count > 0;
if (isChange_0)
{
SendMonitoringChangedDataAsync(eSendDataDic_0.Values.ToList(), GroupTypeEnum.DLHJ);
}
if (isChange_1)
{
SendMonitoringChangedDataAsync(eSendDataDic_1.Values.ToList(), GroupTypeEnum.ZXJC);
}
//stopwatch.Stop();
//Debug.WriteLine("发送时间:"+ stopwatch.ElapsedMilliseconds);
}
/// <summary>
/// 处理数据发送遥测数据到客户端
/// </summary>
/// <param name="yC_TYPEs"></param>
public void ClassifyAndSendYCInfo(YC_TYPE_New yc)
{
//动力环境
Dictionary<Guid, EquipmentDataModel> eSendDataDic_0 = new Dictionary<Guid, EquipmentDataModel>();
//在线监测
Dictionary<Guid, EquipmentDataModel> eSendDataDic_1 = new Dictionary<Guid, EquipmentDataModel>();
List<Task> tasks = new List<Task>();
foreach (EquipmentDataModel e in _runningDataCache.EquipmentDataDic.Values)
{
TelemeteringModel telemetering =
e.Telemeterings
.FirstOrDefault(m=> m.DispatcherAddress == yc.inf);
if (telemetering == null) continue;
float value = yc.val * telemetering.Coefficient;
telemetering.ResultValue = value;
telemetering.ResultTime = yc.time;
Task.Run(() => {
var rst = new TelemeteringResult()
{
ResultTime = telemetering.ResultTime,
ResultValue = telemetering.ResultValue,
TelemeteringConfigurationId = telemetering.Id,
SaveMethod = 2
};
//保存变化的数据
_telemeteringResultSaveTask.SaveTelemeteringResultActionBlock?.Post(rst);
//在Redis中缓存环境温度
if (telemetering.IsEnvironmentTemp)
{
EnvironmentTempValue environmentTempValue = new EnvironmentTempValue(value);
_redisDataRepository.EnvironmentTempValueRedis.HashSetUpdateOneAsync(nameof(EnvironmentTempValue), telemetering.Id.ToString(), environmentTempValue);
}
//如果值不变则不进行后续处理
var disArming = _webApiRequest.GetEquipmentSafetyState(e.EquipmentInfoId);
if (disArming == 1 || disArming == 0)
{
_alarmAnalysis.HandleTelemeteringAlarmAsync(e, telemetering);
//联动检测
_linkageAnalysisTask.LinkAnanysisActionBlock.Post(telemetering.Id);
//修改对应设备的遥测值后,跳出本次循环,修改下一个遥信
}
});
switch (e.GroupType)
{
case MonitoringGroupTypeEnum.DLHJ:
SetSendTelemeteringDataDic(e, telemetering, ref eSendDataDic_0);
break;
case MonitoringGroupTypeEnum.ZXJC:
SetSendTelemeteringDataDic(e, telemetering, ref eSendDataDic_1);
break;
default:
break;
}
}
bool isChange_0 = eSendDataDic_0.Count > 0;
bool isChange_1 = eSendDataDic_1.Count > 0;
if (isChange_0)
{
//发送动力环境遥测数据到客户端
SendMonitoringChangedDataAsync(eSendDataDic_0.Values.ToList(), GroupTypeEnum.DLHJ);
}
if (isChange_1)
{
//发送在线检测遥测数据到客户端
SendMonitoringChangedDataAsync(eSendDataDic_1.Values.ToList(), GroupTypeEnum.ZXJC);
}
}
private Dictionary<TelesignalisationModel, DateTime> TelesignaleAlarmTempBuffDic = new Dictionary<TelesignalisationModel, DateTime>();
/// <summary>
/// 初始化遥信设备信息
/// </summary>
/// <param name="e"></param>
/// <param name="telesignal"></param>
/// <param name="dic"></param>
private void SetSendTelesignalDataDic(EquipmentDataModel e, TelesignalisationModel telesignal, ref Dictionary<Guid, EquipmentDataModel> dic)
{
if (dic != null && dic.ContainsKey(e.EquipmentInfoId))
{
dic[e.EquipmentInfoId].Telesignalisations.Add(telesignal);
}
else
{
EquipmentDataModel sendE = new EquipmentDataModel
{
EquipmentInfoId = e.EquipmentInfoId,
EquipmentTypeId = e.EquipmentTypeId,
SafetyStateType = e.SafetyStateType,
GroupType = e.GroupType
};
sendE.Telesignalisations.Add(telesignal);
dic.Add(sendE.EquipmentInfoId, sendE);
}
}
/// <summary>
/// 初始化遥信设备信息
/// </summary>
/// <param name="e"></param>
/// <param name="telemetering"></param>
/// <param name="dic"></param>
private void SetSendTelemeteringDataDic(EquipmentDataModel e, TelemeteringModel telemetering, ref Dictionary<Guid, EquipmentDataModel> dic)
{
if (dic != null && dic.ContainsKey(e.EquipmentInfoId))
{
dic[e.EquipmentInfoId].Telemeterings.Add(telemetering);
}
else
{
EquipmentDataModel sendE = new EquipmentDataModel
{
EquipmentInfoId = e.EquipmentInfoId,
EquipmentTypeId = e.EquipmentTypeId,
SafetyStateType = e.SafetyStateType,
GroupType = e.GroupType
};
sendE.Telemeterings.Add(telemetering);
dic.Add(sendE.EquipmentInfoId, sendE);
}
}
/// <summary>
/// 发送变化的数据
/// </summary>
/// <param name="eList"></param>
/// <param name="groupType"></param>
private async void SendMonitoringChangedDataAsync(List<EquipmentDataModel> eList, GroupTypeEnum groupType)
{
await Task.Run(() =>
{
DataMonitorMessageModel messageModel = new DataMonitorMessageModel
{
Content = eList,
GroupType = groupType,
MessageType = MessgeTypeEnum.Changed
};
//#warning Send
_webSocketServer.SendMsg(messageModel);
}).ConfigureAwait(false);
}
/// <summary>
/// 发送遥测数据到主界面日志显示
/// </summary>
/// <param name="info"></param>
public void RecordYCLogInfo(YC_TYPE_New info)
{
string infoStr = $"装置地址:{info.addr}CPU扇区号{info.sector},编码地址:{info.inf},遥测值:{info.val}";
MonitoringEventBus.LogHandler(infoStr, "遥测数据");
//if (info.inf >= startInfoAddr && info.inf <= endInfoAddr)
//{
// MonitoringEventBus.LogHandler(infoStr, "遥测数据");
//}
}
/// <summary>
/// 发送遥信数据到主界面日志显示
/// </summary>
/// <param name="info"></param>
public void RecordYXLogInfo(RECORDYXBURST_New info)
{
string infoStr = $"装置地址:{info.dev_addr}CPU扇区号{info.dev_sector},编码地址:{info.dev_inf},遥信值:{info.yx_val}";
MonitoringEventBus.LogHandler(infoStr, "双点信息");
//if (info.dev_inf >= startInfoAddr && info.dev_inf <= endInfoAddr)
//{
// MonitoringEventBus.LogHandler(infoStr, "遥信数据");
//}
}
}
}