386 lines
16 KiB
C#
Raw Normal View History

2024-08-21 16:50:14 +08:00
using Abp.Dependency;
using Newtonsoft.Json;
using System.Threading.Tasks.Dataflow;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Helper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.LinkageAnalysis;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.TeleInfoSave;
using Yunda.ISAS.DataMonitoringServer.DataCenter;
using Yunda.ISAS.DataMonitoringServer.WebSocket;
using Yunda.ISAS.DataMonitoringServer.WebSocket.Model;
using Yunda.ISAS.MongoDB.Entities.DataMonitoring;
using Yunda.SOMS.DataMonitoringServer.Service.DataAnalysis.Model;
using YunDa.ISAS.DataTransferObject.CommonDto;
using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
using YunDa.ISAS.Entities.DataMonitoring;
using YunDa.ISAS.Redis.Entities.DataMonitorCategory;
namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection
{
public class DataSendTask : ISingletonDependency
{
// Collaborators received through the constructor; each one is assigned exactly once there.
// Exposes SaveTelemeteringResultActionBlock, to which telemetering results are posted for saving.
private readonly TelemeteringResultSaveTask _telemeteringResultSaveTask;
// Exposes TelesignalisationResultRepository, used to insert telesignal results.
private readonly DataRepository _dataRepository;
// Queried for the equipment safety state before alarm and linkage handling.
private readonly WebApiRequest _webApiRequest;
// Exposes EquipmentDataDic, the equipment models scanned for every incoming message.
private readonly RunningDataCache _runningDataCache;
// Exposes LinkAnanysisActionBlock, which receives the ids of changed points.
private readonly LinkageAnalysisTask _linkageAnalysisTask;
// Used to push change messages to clients via SendMsg.
private readonly WebSocketServer _webSocketServer;
// Stored by the constructor; not read by any active code in the visible part of this class.
private readonly ConfigurationHepler _configurationHepler;
// Handles telesignal and telemetering alarms.
private readonly AlarmAnalysis _alarmAnalysis;
// Exposes EnvironmentTempValueRedis, used to cache environment temperature values.
private readonly RedisDataRepository _redisDataRepository;
/// <summary>
/// Stores the injected collaborators and creates the two action blocks through which
/// telesignal (YX) and telemetering (YC) messages are handled.
/// </summary>
public DataSendTask(TelemeteringResultSaveTask telemeteringResultSaveTask,
    DataRepository dataRepository,
    WebApiRequest webApiRequest,
    RunningDataCache runningDataCache,
    LinkageAnalysisTask linkageAnalysisTask,
    WebSocketServer webSocketController,
    ConfigurationHepler configurationHepler,
    AlarmAnalysis alarmAnalysis,
    RedisDataRepository redisDataRepository)
{
    // Keep the collaborators, in parameter order.
    _telemeteringResultSaveTask = telemeteringResultSaveTask;
    _dataRepository = dataRepository;
    _webApiRequest = webApiRequest;
    _runningDataCache = runningDataCache;
    _linkageAnalysisTask = linkageAnalysisTask;
    _webSocketServer = webSocketController;
    _configurationHepler = configurationHepler;
    _alarmAnalysis = alarmAnalysis;
    _redisDataRepository = redisDataRepository;

    // Telesignal pipeline: MaxDegreeOfParallelism = 1, exceptions propagating out of the
    // handler call are caught and logged.
    RECORDYXBURSTActionBlock = new ActionBlock<RECORDYXBURST_New>(
        signal =>
        {
            try
            {
                SendYXInfoToClient(signal);
            }
            catch (Exception error)
            {
                MonitoringEventBus.LogHandler(error.ToString(), "异常信息");
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    // Telemetering pipeline: same shape as the telesignal one.
    YCTActionBlock = new ActionBlock<YC_TYPE_New>(
        measurement =>
        {
            try
            {
                ClassifyAndSendYCInfo(measurement);
            }
            catch (Exception error)
            {
                MonitoringEventBus.LogHandler(error.ToString(), "异常信息");
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
}
/// <summary>
/// Action block for telesignal (YX) records; created in the constructor, where each posted
/// record is passed to <c>SendYXInfoToClient</c>.
/// </summary>
public ActionBlock<RECORDYXBURST_New> RECORDYXBURSTActionBlock = default;
/// <summary>
/// Action block for telemetering (YC) records; created in the constructor, where each posted
/// record is passed to <c>ClassifyAndSendYCInfo</c>.
/// </summary>
public ActionBlock<YC_TYPE_New> YCTActionBlock = default;
/// <summary>
/// Start of an information-address range (defaults to 0). Not read by any active code in the
/// visible part of this file.
/// </summary>
public int startInfoAddr = 0;
/// <summary>
/// End of an information-address range (defaults to 100000). Not read by any active code in
/// the visible part of this file.
/// </summary>
public int endInfoAddr = 100000;
/// <summary>
/// Converts a 7-byte timestamp into a <see cref="DateTime"/>.
/// Layout read by this method: <c>tm[0]</c>/<c>tm[1]</c> form a little-endian 16-bit value
/// holding seconds * 1000 + milliseconds, <c>tm[2]</c> is the minute, <c>tm[3]</c> the hour,
/// <c>tm[4]</c> the day, <c>tm[5]</c> the month and <c>tm[6]</c> the year offset from 2000.
/// </summary>
/// <param name="tm">Raw timestamp bytes; at least 7 bytes are required.</param>
/// <returns>The decoded date and time.</returns>
/// <exception cref="ArgumentException"><paramref name="tm"/> is null or shorter than 7 bytes.</exception>
/// <exception cref="Exception">A field is outside the range accepted by <see cref="DateTime"/>.</exception>
private DateTime FormatDateTimeFunc(byte[] tm)
{
    if (tm == null || tm.Length < 7)
    {
        throw new ArgumentException("时间数值错误 原数据为空或长度不足7字节", nameof(tm));
    }

    int year = 2000 + tm[6];
    int month = tm[5];
    int day = tm[4];
    int hour = tm[3];
    int minute = tm[2];
    int secondsAndMillis = (tm[1] << 8) + tm[0];
    int second = secondsAndMillis / 1000;
    int millisecond = secondsAndMillis % 1000;

    // Upper bounds match what the DateTime constructor accepts (hour 23, minute/second 59,
    // day limited by the actual month length), so invalid input always produces the
    // descriptive error below rather than an ArgumentOutOfRangeException from DateTime.
    // Lower bounds of zero are implicit because every field comes from unsigned bytes.
    bool isValid = month >= 1 && month <= 12
        && day >= 1 && day <= DateTime.DaysInMonth(year, month)
        && hour <= 23
        && minute <= 59
        && second <= 59;

    if (!isValid)
    {
        var tmstr = JsonConvert.SerializeObject(tm);
        throw new Exception($"时间数值错误 年:{year}月:{month}日:{day}时:{hour}分:{minute}秒:{second}毫秒:{millisecond} 原数据为:{tmstr}");
    }

    return new DateTime(year, month, day, hour, minute, second, millisecond);
}
/// <summary>
/// Applies one telesignal (YX) record: for every cached equipment that has a telesignal whose
/// dispatcher address equals <c>yx.dev_inf</c>, updates the cached value and time, inserts a
/// result record, starts alarm and linkage handling in the background, and finally pushes the
/// changed equipment to clients, grouped by monitoring type.
/// </summary>
/// <remarks>
/// The method is a plain synchronous <c>void</c> rather than <c>async void</c>: it awaits
/// nothing, and as <c>async void</c> its exceptions would bypass the caller's try/catch.
/// </remarks>
/// <param name="yx">Incoming record; <c>dev_inf</c>, <c>yx_val</c> and <c>time</c> are read.</param>
public void SendYXInfoToClient(RECORDYXBURST_New yx)
{
    // Changed equipment of the power-environment (DLHJ) group.
    var changedDlhj = new Dictionary<Guid, EquipmentDataModel>();
    // Changed equipment of the online-monitoring (ZXJC) group.
    var changedZxjc = new Dictionary<Guid, EquipmentDataModel>();

    foreach (EquipmentDataModel e in _runningDataCache.EquipmentDataDic.Values)
    {
        TelesignalisationModel telesignal = e.Telesignalisations.FirstOrDefault(m => m.DispatcherAddress == yx.dev_inf);
        if (telesignal == null)
        {
            continue;
        }

        // Single-point values are stored shifted down by one; other remote types are stored as received.
        int rstValue = telesignal.RemoteType == RemoteTypeEnum.SinglePoint ? (yx.yx_val - 1) : yx.yx_val;
        telesignal.ResultValue = rstValue;
        telesignal.ResultTime = yx.time;

        var yxres = new TelesignalisationResult()
        {
            ResultTime = telesignal.ResultTime,
            ResultValue = telesignal.ResultValue,
            TelesignalisationConfigurationId = telesignal.Id,
            SaveMethod = 2
        };
        // The insert is started here and its outcome is not awaited by this method.
        _dataRepository.TelesignalisationResultRepository.InsertOneAsync(yxres);

        switch (e.GroupType)
        {
            case MonitoringGroupTypeEnum.DLHJ:
                SetSendTelesignalDataDic(e, telesignal, ref changedDlhj);
                break;
            case MonitoringGroupTypeEnum.ZXJC:
                SetSendTelesignalDataDic(e, telesignal, ref changedZxjc);
                break;
        }

        // Alarm and linkage handling runs in the background so the safety-state request does
        // not hold up the message loop. Failures are logged here because nothing observes
        // the task returned by Task.Run.
        Task.Run(() =>
        {
            try
            {
                var isArming = _webApiRequest.GetEquipmentSafetyState(e.EquipmentInfoId);
                if (isArming == 1 || isArming == 0)
                {
                    _alarmAnalysis.HandleTelesignalAlarmAsync(e, telesignal);
                    // Linkage detection.
                    _linkageAnalysisTask.LinkAnanysisActionBlock.Post(telesignal.Id);
                }
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
            }
        });
    }

    if (changedDlhj.Count > 0)
    {
        SendMonitoringChangedDataAsync(changedDlhj.Values.ToList(), GroupTypeEnum.DLHJ);
    }
    if (changedZxjc.Count > 0)
    {
        SendMonitoringChangedDataAsync(changedZxjc.Values.ToList(), GroupTypeEnum.ZXJC);
    }
}
/// <summary>
/// Applies one telemetering (YC) record: for every cached equipment that has a telemetering
/// point whose dispatcher address equals <c>yc.inf</c>, stores the scaled value and time,
/// queues the result for saving, optionally caches it as an environment temperature, starts
/// alarm and linkage handling in the background, and finally pushes the changed equipment to
/// clients, grouped by monitoring type.
/// </summary>
/// <param name="yc">Incoming record; <c>inf</c>, <c>val</c> and <c>time</c> are read.</param>
public void ClassifyAndSendYCInfo(YC_TYPE_New yc)
{
    // Changed equipment of the power-environment (DLHJ) group.
    var changedDlhj = new Dictionary<Guid, EquipmentDataModel>();
    // Changed equipment of the online-monitoring (ZXJC) group.
    var changedZxjc = new Dictionary<Guid, EquipmentDataModel>();

    foreach (EquipmentDataModel e in _runningDataCache.EquipmentDataDic.Values)
    {
        TelemeteringModel telemetering = e.Telemeterings.FirstOrDefault(m => m.DispatcherAddress == yc.inf);
        if (telemetering == null)
        {
            continue;
        }

        float value = yc.val * telemetering.Coefficient;
        telemetering.ResultValue = value;
        telemetering.ResultTime = yc.time;

        // Snapshot the result now, on the calling thread. The cached model is shared and is
        // overwritten by later messages, so reading it inside the background task could
        // record a newer value or time than the one this message carried.
        var rst = new TelemeteringResult()
        {
            ResultTime = telemetering.ResultTime,
            ResultValue = telemetering.ResultValue,
            TelemeteringConfigurationId = telemetering.Id,
            SaveMethod = 2
        };

        // Failures are logged here because nothing observes the task returned by Task.Run.
        Task.Run(() =>
        {
            try
            {
                // Queue the changed value for saving.
                _telemeteringResultSaveTask.SaveTelemeteringResultActionBlock?.Post(rst);

                // Cache environment temperatures in Redis.
                if (telemetering.IsEnvironmentTemp)
                {
                    EnvironmentTempValue environmentTempValue = new EnvironmentTempValue(value);
                    _redisDataRepository.EnvironmentTempValueRedis.HashSetUpdateOneAsync(nameof(EnvironmentTempValue), telemetering.Id.ToString(), environmentTempValue);
                }

                var disArming = _webApiRequest.GetEquipmentSafetyState(e.EquipmentInfoId);
                if (disArming == 1 || disArming == 0)
                {
                    _alarmAnalysis.HandleTelemeteringAlarmAsync(e, telemetering);
                    // Linkage detection.
                    _linkageAnalysisTask.LinkAnanysisActionBlock.Post(telemetering.Id);
                }
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
            }
        });

        switch (e.GroupType)
        {
            case MonitoringGroupTypeEnum.DLHJ:
                SetSendTelemeteringDataDic(e, telemetering, ref changedDlhj);
                break;
            case MonitoringGroupTypeEnum.ZXJC:
                SetSendTelemeteringDataDic(e, telemetering, ref changedZxjc);
                break;
            default:
                break;
        }
    }

    if (changedDlhj.Count > 0)
    {
        // Push power-environment telemetering changes to clients.
        SendMonitoringChangedDataAsync(changedDlhj.Values.ToList(), GroupTypeEnum.DLHJ);
    }
    if (changedZxjc.Count > 0)
    {
        // Push online-monitoring telemetering changes to clients.
        SendMonitoringChangedDataAsync(changedZxjc.Values.ToList(), GroupTypeEnum.ZXJC);
    }
}
// Maps a telesignal model to a timestamp. Not read or written by any code in the visible part
// of this file. NOTE(review): looks unused — confirm before removing.
private Dictionary<TelesignalisationModel, DateTime> TelesignaleAlarmTempBuffDic = new Dictionary<TelesignalisationModel, DateTime>();
/// <summary>
/// Records a changed telesignal in the outgoing dictionary. If the equipment already has an
/// entry, the telesignal is appended to it; otherwise a new entry is created that copies the
/// equipment's id, type, safety state and group type and carries only this telesignal.
/// </summary>
/// <param name="e">Cached equipment that owns the telesignal.</param>
/// <param name="telesignal">Telesignal to add to the outgoing data.</param>
/// <param name="dic">Outgoing equipment entries keyed by equipment id.</param>
private void SetSendTelesignalDataDic(EquipmentDataModel e, TelesignalisationModel telesignal, ref Dictionary<Guid, EquipmentDataModel> dic)
{
    // Existing entry: just append.
    if (dic != null && dic.TryGetValue(e.EquipmentInfoId, out var existing))
    {
        existing.Telesignalisations.Add(telesignal);
        return;
    }

    // First change for this equipment: build a fresh outgoing copy.
    var outgoing = new EquipmentDataModel
    {
        EquipmentInfoId = e.EquipmentInfoId,
        EquipmentTypeId = e.EquipmentTypeId,
        SafetyStateType = e.SafetyStateType,
        GroupType = e.GroupType
    };
    outgoing.Telesignalisations.Add(telesignal);
    dic.Add(outgoing.EquipmentInfoId, outgoing);
}
/// <summary>
/// Records a changed telemetering point in the outgoing dictionary. If the equipment already
/// has an entry, the point is appended to it; otherwise a new entry is created that copies the
/// equipment's id, type, safety state and group type and carries only this point.
/// </summary>
/// <param name="e">Cached equipment that owns the telemetering point.</param>
/// <param name="telemetering">Telemetering point to add to the outgoing data.</param>
/// <param name="dic">Outgoing equipment entries keyed by equipment id.</param>
private void SetSendTelemeteringDataDic(EquipmentDataModel e, TelemeteringModel telemetering, ref Dictionary<Guid, EquipmentDataModel> dic)
{
    // Existing entry: just append.
    if (dic != null && dic.TryGetValue(e.EquipmentInfoId, out var existing))
    {
        existing.Telemeterings.Add(telemetering);
        return;
    }

    // First change for this equipment: build a fresh outgoing copy.
    var outgoing = new EquipmentDataModel
    {
        EquipmentInfoId = e.EquipmentInfoId,
        EquipmentTypeId = e.EquipmentTypeId,
        SafetyStateType = e.SafetyStateType,
        GroupType = e.GroupType
    };
    outgoing.Telemeterings.Add(telemetering);
    dic.Add(outgoing.EquipmentInfoId, outgoing);
}
/// <summary>
/// Wraps the changed equipment in a <c>Changed</c> message for the given group and sends it
/// through the web socket server on a background task.
/// </summary>
/// <remarks>
/// The method is <c>async void</c>, so callers cannot observe its failures. Any exception from
/// building or sending the message is therefore caught and logged here instead of escaping.
/// </remarks>
/// <param name="eList">Equipment entries whose data changed.</param>
/// <param name="groupType">Group the message is addressed to.</param>
private async void SendMonitoringChangedDataAsync(List<EquipmentDataModel> eList, GroupTypeEnum groupType)
{
    try
    {
        await Task.Run(() =>
        {
            DataMonitorMessageModel messageModel = new DataMonitorMessageModel
            {
                Content = eList,
                GroupType = groupType,
                MessageType = MessgeTypeEnum.Changed
            };
            _webSocketServer.SendMsg(messageModel);
        }).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.ToString(), "异常信息");
    }
}
/// <summary>
/// Writes one telemetering (YC) record to the monitoring log: device address, CPU sector,
/// information address and value. Every record is logged; no address filtering is applied.
/// </summary>
/// <param name="info">Record to log; <c>addr</c>, <c>sector</c>, <c>inf</c> and <c>val</c> are read.</param>
public void RecordYCLogInfo(YC_TYPE_New info) =>
    MonitoringEventBus.LogHandler(
        $"装置地址:{info.addr}CPU扇区号{info.sector},编码地址:{info.inf},遥测值:{info.val}",
        "遥测数据");
/// <summary>
/// Writes one telesignal (YX) record to the monitoring log: device address, CPU sector,
/// information address and value. Every record is logged; no address filtering is applied.
/// </summary>
/// <param name="info">Record to log; <c>dev_addr</c>, <c>dev_sector</c>, <c>dev_inf</c> and <c>yx_val</c> are read.</param>
public void RecordYXLogInfo(RECORDYXBURST_New info) =>
    MonitoringEventBus.LogHandler(
        $"装置地址:{info.dev_addr}CPU扇区号{info.dev_sector},编码地址:{info.dev_inf},遥信值:{info.yx_val}",
        "双点信息");
}
}