269 lines
11 KiB
C#
269 lines
11 KiB
C#
using System;
|
||
using System.Collections.Concurrent;
|
||
using System.Collections.Generic;
|
||
using System.Linq;
|
||
using System.Text;
|
||
using System.Threading.Tasks;
|
||
using Microsoft.Extensions.Logging;
|
||
using Newtonsoft.Json.Linq;
|
||
using YunDa.SOMS.DataTransferObject.GeneralInformation.EquipmentLiveDataDto;
|
||
using YunDa.SOMS.Redis.Repositories;
|
||
using YunDa.Server.ISMSTcp.Models;
|
||
|
||
namespace YunDa.Server.ISMSTcp.Services
|
||
{
|
||
/// <summary>
|
||
/// 遥信数据处理服务
|
||
/// </summary>
|
||
public class TelesignalisationHandle
|
||
{
|
||
private readonly ILogger<TelesignalisationHandle> _logger;
|
||
private readonly IRedisRepository<TelesignalisationModel, string> _telesignalisationModelListRedis;
|
||
private readonly string _telesignalisationModelListRediskey = "telesignalisationModelList";
|
||
|
||
// 映射字典:YX_ID -> haskey
|
||
private readonly ConcurrentDictionary<string, string> _yxIdToHashKeyMapping = new ConcurrentDictionary<string, string>();
|
||
|
||
// 初始化状态
|
||
private volatile bool _isInitialized = false;
|
||
private readonly object _initLock = new object();
|
||
|
||
/// <summary>
|
||
/// 构造函数
|
||
/// </summary>
|
||
/// <param name="logger">日志服务</param>
|
||
/// <param name="telesignalisationModelListRedis">遥信数据Redis仓储</param>
|
||
public TelesignalisationHandle(
|
||
ILogger<TelesignalisationHandle> logger,
|
||
IRedisRepository<TelesignalisationModel, string> telesignalisationModelListRedis)
|
||
{
|
||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||
_telesignalisationModelListRedis = telesignalisationModelListRedis ?? throw new ArgumentNullException(nameof(telesignalisationModelListRedis));
|
||
}
|
||
|
||
/// <summary>
|
||
/// 初始化遥信数据映射
|
||
/// </summary>
|
||
/// <param name="maxRetries">最大重试次数</param>
|
||
/// <param name="retryDelayMs">重试间隔(毫秒)</param>
|
||
/// <returns>初始化任务</returns>
|
||
public async Task<bool> InitAsync(int maxRetries = 3, int retryDelayMs = 2000)
|
||
{
|
||
if (_isInitialized)
|
||
{
|
||
_logger.LogInformation("遥信数据映射已经初始化完成");
|
||
return true;
|
||
}
|
||
|
||
lock (_initLock)
|
||
{
|
||
if (_isInitialized)
|
||
{
|
||
return true;
|
||
}
|
||
|
||
_logger.LogInformation("开始初始化遥信数据映射...");
|
||
}
|
||
|
||
for (int attempt = 1; attempt <= maxRetries; attempt++)
|
||
{
|
||
try
|
||
{
|
||
_logger.LogInformation("第 {Attempt}/{MaxRetries} 次尝试查询遥信数据", attempt, maxRetries);
|
||
|
||
// 查询所有遥信数据,使用固定的数据源类别 "Zongzi" (0)
|
||
string redisKey = $"{_telesignalisationModelListRediskey}_Zongzi";
|
||
var telesignalisationModels = await _telesignalisationModelListRedis.HashSetGetAllAsync(redisKey);
|
||
|
||
if (telesignalisationModels == null || telesignalisationModels.Count == 0)
|
||
{
|
||
_logger.LogWarning("第 {Attempt} 次查询遥信数据为空,Redis键: {RedisKey}", attempt, redisKey);
|
||
|
||
if (attempt < maxRetries)
|
||
{
|
||
_logger.LogInformation("等待 {DelayMs} 毫秒后重试...", retryDelayMs);
|
||
await Task.Delay(retryDelayMs);
|
||
continue;
|
||
}
|
||
else
|
||
{
|
||
_logger.LogError("所有重试都失败,无法获取遥信数据");
|
||
return false;
|
||
}
|
||
}
|
||
|
||
// 创建映射字典
|
||
int mappingCount = 0;
|
||
foreach (var model in telesignalisationModels)
|
||
{
|
||
if (!string.IsNullOrEmpty(model.ismsbaseYXId))
|
||
{
|
||
// 构造haskey格式:"{dev_addr}_{dev_sector}_{dev_inf}_0"
|
||
// 这里使用模型中的地址信息
|
||
string haskey = $"{model.InfoDeviceAddress}_{model.InfoCPUSector}_{model.InfoAddress}_0";
|
||
|
||
if (_yxIdToHashKeyMapping.TryAdd(model.ismsbaseYXId, haskey))
|
||
{
|
||
mappingCount++;
|
||
_logger.LogDebug("添加映射: YX_ID={YxId} -> haskey={HasKey}", model.ismsbaseYXId, haskey);
|
||
}
|
||
else
|
||
{
|
||
_logger.LogWarning("重复的YX_ID: {YxId},跳过映射", model.ismsbaseYXId);
|
||
}
|
||
}
|
||
else
|
||
{
|
||
_logger.LogDebug("遥信模型 {ModelName} 的 ismsbaseYXId 为空,跳过", model.Name);
|
||
}
|
||
}
|
||
|
||
lock (_initLock)
|
||
{
|
||
_isInitialized = true;
|
||
}
|
||
|
||
_logger.LogInformation("遥信数据映射初始化成功!总计 {TotalModels} 个模型,创建 {MappingCount} 个映射",
|
||
telesignalisationModels.Count, mappingCount);
|
||
return true;
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.LogError(ex, "第 {Attempt} 次初始化遥信数据映射时发生错误", attempt);
|
||
|
||
if (attempt < maxRetries)
|
||
{
|
||
_logger.LogInformation("等待 {DelayMs} 毫秒后重试...", retryDelayMs);
|
||
await Task.Delay(retryDelayMs);
|
||
}
|
||
else
|
||
{
|
||
_logger.LogError("所有重试都失败,初始化遥信数据映射失败");
|
||
return false;
|
||
}
|
||
}
|
||
}
|
||
|
||
return false;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 处理遥信数据
|
||
/// </summary>
|
||
/// <param name="yxData">遥信数据JSON令牌</param>
|
||
/// <returns>处理任务</returns>
|
||
public async Task ProcessYXDataAsync(JToken yxData)
|
||
{
|
||
try
|
||
{
|
||
_logger.LogInformation("开始处理遥信数据: {YXData}", yxData?.ToString());
|
||
|
||
if (yxData == null)
|
||
{
|
||
_logger.LogWarning("遥信数据为空,跳过处理");
|
||
return;
|
||
}
|
||
|
||
if (!_isInitialized)
|
||
{
|
||
_logger.LogWarning("遥信数据映射尚未初始化,跳过处理");
|
||
return;
|
||
}
|
||
|
||
// 解析YXData数组
|
||
if (yxData is JArray yxArray)
|
||
{
|
||
_logger.LogInformation("处理遥信数据数组,包含 {Count} 个项目", yxArray.Count);
|
||
foreach (var item in yxArray)
|
||
{
|
||
await ProcessSingleYXDataAsync(item);
|
||
}
|
||
}
|
||
else if (yxData is JObject yxObject)
|
||
{
|
||
await ProcessSingleYXDataAsync(yxObject);
|
||
}
|
||
else
|
||
{
|
||
_logger.LogWarning("不支持的遥信数据格式: {DataType}", yxData.Type);
|
||
}
|
||
|
||
_logger.LogInformation("遥信数据处理完成");
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.LogError(ex, "处理遥信数据时发生错误: {YXData}", yxData?.ToString());
|
||
throw;
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 处理单个遥信数据项
|
||
/// </summary>
|
||
/// <param name="yxItem">单个遥信数据项</param>
|
||
/// <returns>处理任务</returns>
|
||
private async Task ProcessSingleYXDataAsync(JToken yxItem)
|
||
{
|
||
try
|
||
{
|
||
// 解析YXData结构
|
||
var yxDataModel = yxItem.ToObject<YXData>();
|
||
if (yxDataModel == null)
|
||
{
|
||
_logger.LogWarning("无法解析遥信数据项: {Item}", yxItem?.ToString());
|
||
return;
|
||
}
|
||
|
||
_logger.LogDebug("处理遥信数据: YX_ID={YxId}, V={Value}, T={Time}",
|
||
yxDataModel.YX_ID, yxDataModel.V, yxDataModel.T);
|
||
|
||
// 使用映射字典查找对应的haskey
|
||
if (!_yxIdToHashKeyMapping.TryGetValue(yxDataModel.YX_ID, out string haskey))
|
||
{
|
||
_logger.LogWarning("未找到YX_ID {YxId} 对应的映射", yxDataModel.YX_ID);
|
||
return;
|
||
}
|
||
|
||
// 构建Redis键
|
||
string redisKey = $"{_telesignalisationModelListRediskey}_Zongzi";
|
||
|
||
// 从Redis获取现有数据
|
||
var telesignalisationModel = await _telesignalisationModelListRedis.HashSetGetOneAsync(redisKey, haskey);
|
||
if (telesignalisationModel == null)
|
||
{
|
||
_logger.LogWarning("Redis中未找到遥信数据: RedisKey={RedisKey}, HasKey={HasKey}", redisKey, haskey);
|
||
return;
|
||
}
|
||
|
||
// 解析时间
|
||
DateTime resultTime = DateTime.Now;
|
||
if (!string.IsNullOrEmpty(yxDataModel.T))
|
||
{
|
||
if (DateTime.TryParse(yxDataModel.T, out DateTime parsedTime))
|
||
{
|
||
resultTime = parsedTime;
|
||
}
|
||
else
|
||
{
|
||
_logger.LogWarning("无法解析时间格式: {TimeStr},使用当前时间", yxDataModel.T);
|
||
}
|
||
}
|
||
|
||
// 更新数据
|
||
telesignalisationModel.ResultTime = resultTime;
|
||
telesignalisationModel.ResultValue = yxDataModel.V;
|
||
|
||
// 保存到Redis
|
||
await _telesignalisationModelListRedis.HashSetUpdateOneAsync(redisKey, haskey, telesignalisationModel);
|
||
|
||
_logger.LogDebug("成功更新遥信数据: YX_ID={YxId}, HasKey={HasKey}, 值={Value}",
|
||
yxDataModel.YX_ID, haskey, yxDataModel.V);
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.LogError(ex, "处理单个遥信数据项时发生错误: {Item}", yxItem?.ToString());
|
||
}
|
||
}
|
||
}
|
||
}
|