311 lines
12 KiB
C#
Raw Normal View History

2024-07-15 10:31:26 +08:00
using Abp.Dependency;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System;
2024-09-20 09:59:25 +08:00
using System.Collections;
2024-07-15 10:31:26 +08:00
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Documents;
2024-09-20 09:59:25 +08:00
using System.Windows.Markup;
2024-07-15 10:31:26 +08:00
using ToolLibrary.LogHelper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection.Dlls;
using Yunda.ISAS.DataMonitoringServer.DataCenter;
using Yunda.ISAS.DataMonitoringServer.WPF.ViewModel;
using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
using YunDa.ISAS.Entities.DataMonitoring;
using YunDa.ISAS.Redis.Entities.DataMonitorCategory;
namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection
{
/// <summary>
/// 数据采集
/// </summary>
public class DataCollectionTask : ISingletonDependency
{
private Content _settingModel = null;
//public delegate void DataCollectionTaskCompleteDelegate();
//public event DataCollectionTaskCompleteDelegate DataCollectionTaskCompleteEvent;
/// <summary>
/// 时间转换算法
/// </summary>
private Func<byte[], DateTime> FormatDateTime = tm => new DateTime(2000 + tm[6], tm[5], tm[4], tm[3], tm[2],
((tm[1] << 8) + tm[0]) / 1000, ((tm[1] << 8) + tm[0]) % 1000);
CancellationTokenSource cancellationTokenSource = default;
private readonly DataSendTask _dataSendTask;
private readonly RunningDataCache _runningDataCache;
private readonly RedisDataRepository _redisDataRepository;
public DataCollectionTask(Content settingModel
, DataSendTask dataSendTask,
RedisDataRepository redisDataRepository,
RunningDataCache runningDataCache)
{
_dataSendTask = dataSendTask;
_settingModel = settingModel;
_runningDataCache = runningDataCache;
_redisDataRepository = redisDataRepository;
Task.Factory.StartNew(() => {
while (true)
{
if (_runningDataCache != null)
{
TimeSpan span = TimeSpan.FromMinutes(30);
CallAllData();
Log4Helper.Info(this.GetType(), $"定时总召,时间间隔{span.TotalMinutes}分钟");
Task.Delay(span).Wait();
}
}
});
}
#region
ActionBlock<IECData> _handleDataAction;
public static IEC104Client _client;
private bool _isStarted= false;
/// <summary>
/// 开始
/// </summary>
public void CollectionStart(Content settingModel, Action startWebsocket)
{
2024-09-20 09:59:25 +08:00
yxList.Clear(); ;
ycList.Clear();
MonitoringEventBus.LogHandler($"开始连接远动机ip:{settingModel.Dev_Ip},端口:{settingModel.Dev_Port}", "通知信息");
2024-07-15 10:31:26 +08:00
try
{
_handleDataAction = new ActionBlock<IECData>(data =>
{
Stopwatch stopwatch = Stopwatch.StartNew();
if (data.DataType == IECDataType.Telemetering)
{
YC_TYPE_New info = new YC_TYPE_New()
2024-09-20 09:59:25 +08:00
{
2024-07-15 10:31:26 +08:00
chgFlag = 0,
2024-11-26 13:45:28 +08:00
addr = (byte)settingModel.Dev_addr,
2024-07-15 10:31:26 +08:00
sector = 0,
val= data.FValue,
inf = data.InfoAddr,
time = data.Time
};
if (_isStarted)
{
_dataSendTask.RecordYCLogInfo(info);
_dataSendTask.YCTActionBlock.Post(info);
}
else
{
2024-09-20 09:59:25 +08:00
//OnlyChangeYC(info);
ycList.Add(info);
2024-07-15 10:31:26 +08:00
}
}
else if (data.DataType == IECDataType.Telesignal)
{
var info = new RECORDYXBURST_New()
{
2024-11-26 13:45:28 +08:00
dev_addr = (byte)settingModel.Dev_addr,
2024-07-15 10:31:26 +08:00
dev_inf = data.InfoAddr,
dev_sector = 0,
yx_val = data.IValue,
time = data.Time
};
if (_isStarted)
{
_dataSendTask.RecordYXLogInfo(info);
_dataSendTask.RECORDYXBURSTActionBlock.Post(info);
}
else
{
2024-09-20 09:59:25 +08:00
yxList.Add(info);
2024-07-15 10:31:26 +08:00
}
}
else if (data.DataType == IECDataType.Error)
{
MonitoringEventBus.LogHandler(data.Ex.ToString(), "异常信息");
Log4Helper.Error(this.GetType(), data.Ex.ToString());
}
else if(data.DataType == IECDataType.Msg)
{
2024-11-26 13:45:28 +08:00
//MonitoringEventBus.LogHandler(data.Msg, "104数据");
//Log4Helper.Info(this.GetType(), data.Msg);
2024-08-21 16:50:14 +08:00
//Debug.WriteLine(data.Msg);
2024-07-15 10:31:26 +08:00
}
else if (data.DataType == IECDataType.CompleteAll)
{
Log4Helper.Info(this.GetType(), $"完成总召时间: {DateTime.Now.ToLongTimeString}");
if (!_isStarted)
{
_isStarted = true;
startWebsocket();
}
2024-09-20 09:59:25 +08:00
Task.Run(() =>
{
OnlyChangeYX();
OnlyChangeYC();
});
2024-07-15 10:31:26 +08:00
}
stopwatch.Stop();
//Debug.WriteLine("处理数据花费时间:"+stopwatch.ElapsedTicks);
2024-11-26 13:45:28 +08:00
//Log4Helper.Info(this.GetType(),$"处理数据花费时间: {stopwatch.ElapsedTicks}");
2024-07-15 10:31:26 +08:00
},new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism =2,MaxMessagesPerTask = 3000});
_client = new IEC104Client(settingModel.Dev_Ip, settingModel.Dev_Port, _handleDataAction);
_client.Connect();
//#if DEBUG
// while (true)
// {
// _handleDataAction.Post(new IECData
// {
// Cause = 1,
// DataType = IECDataType.Telesignal,
// IValue = 1,
// Time = DateTime.Now,
// InfoAddr = 3024
// });
// Task.Delay(10 * 1000).Wait();
// }
//#endif
return;
}
catch (Exception ex)
{
MonitoringEventBus.LogHandler(ex.Message, "异常信息");
}
MonitoringEventBus.LogHandler("数据采集开始", "数据采集");
}
public void CallAllData()
{
_isStarted = false;
_client?.CallAll();
}
/// <summary>
/// 停止
/// </summary>
public void CollectionStop()
{
2024-11-26 13:45:28 +08:00
try
{
_isStarted = false;
_client?.Close();
}
catch (Exception ex)
{
MonitoringEventBus.LogHandler("数据采集关闭", "数据采集");
}
2024-07-15 10:31:26 +08:00
}
#endregion
Dictionary<int, TelesignalisationModel> TelesignalisationModelDic = new Dictionary<int, TelesignalisationModel>();
2024-09-20 09:59:25 +08:00
List<RECORDYXBURST_New> yxList = new List<RECORDYXBURST_New>();
List<YC_TYPE_New> ycList = new List<YC_TYPE_New>();
2024-07-15 10:31:26 +08:00
/// <summary>
/// 仅更新遥信数据字典
/// </summary>
/// <param name="yx"></param>
2024-09-20 09:59:25 +08:00
private async void OnlyChangeYX()
2024-07-15 10:31:26 +08:00
{
2024-11-26 13:45:28 +08:00
string redisKey = _redisDataRepository.TelesignalisationModelListRediskey+"_"+ _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName);
// 从 Redis 中批量获取所有数据
2024-09-20 09:59:25 +08:00
var yxDatas = await _redisDataRepository.TelesignalisationModelListRedis.HashSetGetAllAsync(redisKey);
2024-11-26 13:45:28 +08:00
if (yxDatas == null || yxDatas.Count == 0) return;
var updateList = new List<TelesignalisationModel>();
var updateIdList = new List<string>();
var categoriyValue = _settingModel.GetDatacatgoryValue(_settingModel.DataSourceCategoryName);
foreach (RECORDYXBURST_New yx in yxList)
2024-07-15 10:31:26 +08:00
{
2024-11-26 13:45:28 +08:00
var yxData = yxDatas.FirstOrDefault(t => t.DispatcherAddress == yx.dev_inf&&t.CPUSector == yx.dev_sector&&yx.dev_addr ==t.DeviceAddress&&(int)t.DataSourceCategory == categoriyValue);
if (yxData != null)
2024-07-15 10:31:26 +08:00
{
2024-11-26 13:45:28 +08:00
yxData.ResultTime = yx.time;
yxData.ResultValue = yx.yx_val;
updateList.Add(yxData);
updateIdList.Add($"{yxData.DeviceAddress}_{yxData.CPUSector}_{yxData.DispatcherAddress}_{categoriyValue}");
//Log4Helper.Info(this.GetType(), $"查找数据成功: 地址:{yx.dev_inf} 类型:{categoriyValue}");
// 添加到更新列表
}
else
{
//Log4Helper.Error(this.GetType(), $"查找数据失败:装置地址:{yx.dev_addr} 调度地址:{yx.dev_inf} 类型:{categoriyValue}");
2024-07-15 10:31:26 +08:00
}
}
2024-11-26 13:45:28 +08:00
// 批量更新 Redis减少写入操作的频次
await _redisDataRepository.TelesignalisationModelListRedis.HashSetUpdateManyAsync(redisKey, updateIdList, updateList);
2024-07-15 10:31:26 +08:00
}
2024-11-26 13:45:28 +08:00
2024-07-15 10:31:26 +08:00
/// <summary>
/// 仅更新遥测数据字典
/// </summary>
/// <param name="yc"></param>
2024-09-20 09:59:25 +08:00
private async void OnlyChangeYC()
2024-07-15 10:31:26 +08:00
{
2024-11-26 13:45:28 +08:00
string redisKey = _redisDataRepository.TelemeteringModelListRediskey + "_" + _settingModel.GetDatacatgoryName(_settingModel.DataSourceCategoryName); ;
// 从 Redis 中批量获取所有遥测数据
var ycDatas = await _redisDataRepository.TelemeteringModelListRedis.HashSetGetAllAsync(redisKey);
if (ycDatas == null || ycDatas.Count == 0) return;
// 用于批量更新的列表
var updateList = new List<TelemeteringModel>();
var updateIdList = new List<string>();
var categoriyValue = _settingModel.GetDatacatgoryValue(_settingModel.DataSourceCategoryName);
foreach (YC_TYPE_New yc in ycList)
2024-07-15 10:31:26 +08:00
{
2024-11-26 13:45:28 +08:00
// 在内存数据中查找符合条件的项
var ycData = ycDatas.FirstOrDefault(t => t.DispatcherAddress == yc.inf&&t.CPUSector ==yc.sector&&t.DeviceAddress == yc.addr&& (int)t.DataSourceCategory == categoriyValue);
if (ycData != null)
2024-07-15 10:31:26 +08:00
{
2024-11-26 13:45:28 +08:00
ycData.ResultTime = yc.time;
ycData.ResultValue = yc.val;
// 将更新项添加到批量更新列表
updateList.Add( ycData);
updateIdList.Add($"{ycData.DeviceAddress}_{ycData.CPUSector}_{ycData.DispatcherAddress}_{categoriyValue}");
//Log4Helper.Info(this.GetType(), $"查找数据成功: 地址:{yc.inf} 类型:{categoriyValue}");
}
else
{
//Log4Helper.Error(this.GetType(), $"查找数据失败: 装置地址:{yc.addr} 调度地址:{yc.inf} 类型:{categoriyValue}");
2024-07-15 10:31:26 +08:00
}
}
2024-11-26 13:45:28 +08:00
// 一次性将所有更新写入 Redis
await _redisDataRepository.TelemeteringModelListRedis.HashSetUpdateManyAsync(redisKey, updateIdList, updateList);
2024-07-15 10:31:26 +08:00
}
2024-11-26 13:45:28 +08:00
2024-07-15 10:31:26 +08:00
/// <summary>
/// 获取指定地址的数据
/// </summary>
/// <param name="addr"></param>
/// <param name="sector"></param>
/// <param name="inf"></param>
public void GetOneData(byte addr, byte sector, ushort inf)
{
2024-09-20 09:59:25 +08:00
2024-07-15 10:31:26 +08:00
}
}
}