using DynamicExpresso;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using ToolLibrary;
using YunDa.Server.ISMSTcp.Domain;
using YunDa.Server.ISMSTcp.Interfaces;
using YunDa.Server.ISMSTcp.Models;
using YunDa.SOMS.DataTransferObject.ExternalEntities.BeijingYounuo;
using YunDa.SOMS.DataTransferObject.GeneralInformation.EquipmentLiveDataDto;
using YunDa.SOMS.DataTransferObject.GeneralInformation.ProtectionDeviceInfoDto;
using YunDa.SOMS.Entities.GeneralInformation;
using YunDa.SOMS.Redis.Repositories;
using static System.Runtime.InteropServices.JavaScript.JSType;
namespace YunDa.Server.ISMSTcp.Services
{
///
/// 遥信数据处理服务
///
public class TelesignalisationHandle
{
private readonly ILogger _logger;
private readonly IRedisRepository _telesignalisationModelListRedis;
private readonly string _telesignalisationModelListRediskey = "telesignalisationModelList";
public string _telesignalisationInflectionInflectionZZChannelRediskey = "telesignalisationInflection_ZZ_Channel";
private readonly IWebSocketPushService _webSocketPushService;
private readonly IApiEndpoints _apiEndpoints;
// 🔧 新增:保护装置通信信息Redis仓储,用于检查设备检修状态
private readonly IRedisRepository _protectionDeviceCommInfoRedis;
private const string PROTECTION_DEVICE_COMM_INFO_REDIS_KEY = "protectionDeviceCommInfo";
// 映射字典:YX_ID -> haskey
private readonly ConcurrentDictionary _yxIdToHashKeyMapping = new ConcurrentDictionary();
// 🔧 新增:网线和光缆逻辑表达式处理相关字段
private readonly WebApiRequest _webApiRequest;
private readonly ConcurrentDictionary> _telesignalToNetworkCableMapping = new ConcurrentDictionary>();
private readonly ConcurrentDictionary> _telesignalToOpticalCableMapping = new ConcurrentDictionary>();
// 初始化状态
public volatile bool _isInitialized = false;
public volatile bool _isExpressionMappingInitialized = false;
private readonly object _initLock = new object();
private readonly object _expressionInitLock = new object();
//遥信数据缓冲队列
private ZzDataCacheContainer _zzDataCacheContainer = new ZzDataCacheContainer(ZzDataCacheContainerDataType.eYX, 30); //只保留5分钟的数据
///
/// 构造函数
///
/// 日志服务
/// 遥信数据Redis仓储
public TelesignalisationHandle(
IApiEndpoints apiEndpoints,
ILogger logger,
IWebSocketPushService webSocketPushService,
IRedisRepository telesignalisationModelListRedis,
IRedisRepository protectionDeviceCommInfoRedis,
WebApiRequest webApiRequest,
ThingService thingService)
{
_apiEndpoints = apiEndpoints ?? throw new ArgumentNullException(nameof(apiEndpoints));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_telesignalisationModelListRedis = telesignalisationModelListRedis ?? throw new ArgumentNullException(nameof(telesignalisationModelListRedis));
_webSocketPushService = webSocketPushService ?? throw new ArgumentNullException(nameof(webSocketPushService));
_protectionDeviceCommInfoRedis = protectionDeviceCommInfoRedis ?? throw new ArgumentNullException(nameof(protectionDeviceCommInfoRedis));
_webApiRequest = webApiRequest ?? throw new ArgumentNullException(nameof(webApiRequest));
ZzDataCache.SetDataCache(ZzDataCacheContainerDataType.eYX, _zzDataCacheContainer);
}
///
/// 初始化遥信数据映射
///
/// 最大重试次数
/// 重试间隔(毫秒)
/// 初始化任务
public async Task InitAsync(int maxRetries = 3, int retryDelayMs = 2000)
{
if (_isInitialized)
{
_logger.LogInformation("遥信数据映射已经初始化完成");
return true;
}
lock (_initLock)
{
if (_isInitialized)
{
return true;
}
_logger.LogInformation("开始初始化遥信数据映射...");
}
for (int attempt = 1; attempt <= maxRetries; attempt++)
{
try
{
_logger.LogInformation("第 {Attempt}/{MaxRetries} 次尝试查询遥信数据", attempt, maxRetries);
// 查询所有遥信数据,使用固定的数据源类别 "Zongzi" (0)
string redisKey = $"{_telesignalisationModelListRediskey}_Zongzi";
var telesignalisationModels = await _telesignalisationModelListRedis.HashSetGetAllAsync(redisKey);
if (telesignalisationModels == null || telesignalisationModels.Count == 0)
{
_logger.LogWarning("第 {Attempt} 次查询遥信数据为空,Redis键: {RedisKey}", attempt, redisKey);
if (attempt < maxRetries)
{
_logger.LogInformation("等待 {DelayMs} 毫秒后重试...", retryDelayMs);
await Task.Delay(retryDelayMs);
continue;
}
else
{
_logger.LogError("所有重试都失败,无法获取遥信数据");
return false;
}
}
// 创建映射字典
int mappingCount = 0;
foreach (var model in telesignalisationModels)
{
if (!string.IsNullOrEmpty(model.ismsbaseYXId))
{
// 构造haskey格式:"{dev_addr}_{dev_sector}_{dev_inf}_0"
// 这里使用模型中的地址信息
string haskey = $"{model.DeviceAddress}_0_{model.DispatcherAddress}_0";
if (_yxIdToHashKeyMapping.TryAdd(model.ismsbaseYXId, haskey))
{
mappingCount++;
//初始化
_zzDataCacheContainer.Write(model.ismsbaseYXId, model.ResultValue, model.ResultTime, model.Name, model.ResultValueStr, model.DispatcherAddress);
}
}
else
{
_logger.LogDebug("遥信模型 {ModelName} 的 ismsbaseYXId 为空,跳过", model.Name);
}
}
lock (_initLock)
{
_isInitialized = true;
}
_logger.LogInformation("遥信数据映射初始化成功!总计 {TotalModels} 个模型,创建 {MappingCount} 个映射",
telesignalisationModels.Count, mappingCount);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "第 {Attempt} 次初始化遥信数据映射时发生错误", attempt);
if (attempt < maxRetries)
{
_logger.LogInformation("等待 {DelayMs} 毫秒后重试...", retryDelayMs);
await Task.Delay(retryDelayMs);
}
else
{
_logger.LogError("所有重试都失败,初始化遥信数据映射失败");
return false;
}
}
}
return false;
}
///
/// 初始化网线和光缆逻辑表达式映射
///
/// 最大重试次数
/// 重试间隔(毫秒)
/// 初始化任务
public async Task InitExpressionMappingAsync(int maxRetries = 3, int retryDelayMs = 2000)
{
if (_isExpressionMappingInitialized)
{
_logger.LogInformation("网线和光缆逻辑表达式映射已经初始化完成");
return true;
}
lock (_expressionInitLock)
{
if (_isExpressionMappingInitialized)
{
return true;
}
_logger.LogInformation("开始初始化网线和光缆逻辑表达式映射...");
}
for (int attempt = 1; attempt <= maxRetries; attempt++)
{
try
{
_logger.LogInformation("第 {Attempt}/{MaxRetries} 次尝试获取网线和光缆配置数据", attempt, maxRetries);
// 1. 获取网线配置数据
var networkCables = await _webApiRequest.GetNetworkCableListAsync();
var opticalCables = await _webApiRequest.GetOpticalCableListAsync();
if ((networkCables == null || networkCables.Count == 0) &&
(opticalCables == null || opticalCables.Count == 0))
{
_logger.LogWarning("第 {Attempt} 次查询网线和光缆配置数据为空", attempt);
if (attempt < maxRetries)
{
_logger.LogInformation("等待 {DelayMs} 毫秒后重试...", retryDelayMs);
await Task.Delay(retryDelayMs);
continue;
}
else
{
_logger.LogError("所有重试都失败,无法获取网线和光缆配置数据");
return false;
}
}
// 2. 处理网线配置,创建遥信地址到网线配置的映射
int networkCableMappingCount = 0;
if (networkCables != null)
{
foreach (var cable in networkCables.Where(c => c.IsActive && !string.IsNullOrWhiteSpace(c.LogicalExpression)))
{
var addresses = ExtractTelesignalAddresses(cable.LogicalExpression);
foreach (var address in addresses)
{
_telesignalToNetworkCableMapping.AddOrUpdate(address,
new List { cable },
(key, existingList) =>
{
existingList.Add(cable);
return existingList;
});
networkCableMappingCount++;
}
}
}
// 3. 处理光缆配置,创建遥信地址到光缆配置的映射
int opticalCableMappingCount = 0;
if (opticalCables != null)
{
foreach (var cable in opticalCables.Where(c => c.IsActive && !string.IsNullOrWhiteSpace(c.LogicalExpression)))
{
var addresses = ExtractTelesignalAddresses(cable.LogicalExpression);
foreach (var address in addresses)
{
_telesignalToOpticalCableMapping.AddOrUpdate(address,
new List { cable },
(key, existingList) =>
{
existingList.Add(cable);
return existingList;
});
opticalCableMappingCount++;
}
}
}
lock (_expressionInitLock)
{
_isExpressionMappingInitialized = true;
}
_logger.LogInformation("网线和光缆逻辑表达式映射初始化成功!网线配置: {NetworkCableCount} 个,光缆配置: {OpticalCableCount} 个,网线映射: {NetworkCableMappingCount} 个,光缆映射: {OpticalCableMappingCount} 个",
networkCables?.Count ?? 0, opticalCables?.Count ?? 0, networkCableMappingCount, opticalCableMappingCount);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "第 {Attempt} 次初始化网线和光缆逻辑表达式映射时发生错误", attempt);
if (attempt < maxRetries)
{
_logger.LogInformation("等待 {DelayMs} 毫秒后重试...", retryDelayMs);
await Task.Delay(retryDelayMs);
}
else
{
_logger.LogError("所有重试都失败,初始化网线和光缆逻辑表达式映射失败");
return false;
}
}
}
return false;
}
///
/// 处理遥信数据
///
/// 遥信数据JSON令牌
/// 是否跳过转发逻辑(仅更新Redis)
/// 处理任务
public async Task ProcessYXDataAsync(JToken yxData, bool skipForwarding = false)
{
try
{
if (yxData == null)
{
_logger.LogWarning("遥信数据为空,跳过处理");
return null;
}
if (!_isInitialized)
{
_logger.LogWarning("遥信数据映射尚未初始化,跳过处理");
return null;
}
return await ProcessSingleYXDataAsync(yxData, skipForwarding);
//_logger.LogInformation("遥信数据处理完成");
}
catch (Exception ex)
{
_logger.LogError(ex, "处理遥信数据时发生错误: {YXData}", yxData?.ToString());
throw;
}
}
///
/// 处理单个遥信数据项
///
/// 单个遥信数据项
/// 是否跳过转发逻辑(仅更新Redis)
/// 处理任务
private async Task ProcessSingleYXDataAsync(JToken yxItem, bool skipForwarding = false)
{
try
{
// 解析YXData结构
var yxDataModel = yxItem.ToObject();
if (yxDataModel == null)
{
_logger.LogWarning("无法解析遥信数据项: {Item}", yxItem?.ToString());
return null;
}
//开关状态(合、分、不定),与具体的规约(101、104)无关 TSwitchstate = (swon = 0 swOff = 1,swUncertainty = 2);
switch (yxDataModel.V)
{
case 0:
yxDataModel.V = 2;
break;
case 2:
yxDataModel.V = 0;
break;
}
//if (yxDataModel.YX_ID == "YXB001121065")
//{
// _logger.LogWarning($"YXB001121065: {yxDataModel.V}, {yxDataModel.T}");
// Debug.WriteLine($"YXB001121065: {yxDataModel.V}, {yxDataModel.T}");
//}
// 使用映射字典查找对应的haskey
if (!_yxIdToHashKeyMapping.TryGetValue(yxDataModel.YX_ID, out string haskey))
{
// _logger.LogWarning("未找到YX_ID {YxId} 对应的映射", yxDataModel.YX_ID);
return null;
}
// 构建Redis键
string redisKey = $"{_telesignalisationModelListRediskey}_Zongzi";
// 从Redis获取现有数据
var telesignalisationModel = await _telesignalisationModelListRedis.HashSetGetOneAsync(redisKey, haskey);
if (telesignalisationModel == null)
{
_logger.LogWarning("Redis中未找到遥信数据: RedisKey={RedisKey}, HasKey={HasKey}", redisKey, haskey);
return null;
}
// 解析时间
DateTime resultTime = DateTime.Now;
if (!string.IsNullOrEmpty(yxDataModel.T))
{
if (DateTime.TryParse(yxDataModel.T, out DateTime parsedTime))
{
if (parsedTime.Year>2000)
{
resultTime = parsedTime;
}
}
else
{
_logger.LogWarning("无法解析时间格式: {TimeStr},使用当前时间", yxDataModel.T);
}
}
// 更新数据
telesignalisationModel.ResultTime = resultTime;
telesignalisationModel.ResultValue = yxDataModel.V;
// 保存到Redis
await _telesignalisationModelListRedis.HashSetUpdateOneAsync(redisKey, haskey, telesignalisationModel);
string channel = $"{_telesignalisationInflectionInflectionZZChannelRediskey}_Zongzi";
_telesignalisationModelListRedis.PublishAsync(channel, telesignalisationModel);
//缓存遥信数据到内存
_zzDataCacheContainer.Write(yxDataModel.YX_ID, yxDataModel.V, resultTime, telesignalisationModel.Name, telesignalisationModel.ResultValueStr, telesignalisationModel.DispatcherAddress);
// 🔧 优化:如果跳过转发,仅更新Redis后直接返回
if (skipForwarding)
{
_logger.LogDebug("跳过转发逻辑 - YX_ID: {YxId}", yxDataModel.YX_ID);
return telesignalisationModel;
}
// 提取设备ID并调用数据变位信号API
var equipmentId = telesignalisationModel.EquipmentInfoId.Value;
await CallDataChangedAsync(equipmentId);
// 🔧 新增:检查设备是否处于检修状态,如果是则跳过数据推送
var isInMaintenance = await IsEquipmentInMaintenanceAsync(equipmentId);
if (isInMaintenance)
{
_logger.LogDebug("设备处于检修状态,跳过遥信数据推送 - EquipmentInfoId: {EquipmentId}", equipmentId);
// 设备处于检修状态,跳过数据推送
return telesignalisationModel;
}
// WebSocket实时推送
try
{
JToken pointToken = JToken.FromObject(telesignalisationModel);
// 推送到YX分组
await _webSocketPushService.PushYXDataAsync(pointToken, telesignalisationModel.EquipmentInfoId.ToString());
}
catch (Exception pushEx)
{
_logger.LogWarning(pushEx, "YX数据WebSocket推送失败,但不影响主要处理流程");
}
// 🔧 新增:处理网线和光缆逻辑表达式
if (_isExpressionMappingInitialized)
{
await ProcessLogicExpressionsAsync(yxDataModel.YX_ID, yxDataModel.V);
}
return telesignalisationModel;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理单个遥信数据项时发生错误: {Item}", yxItem?.ToString());
return null;
}
}
public async Task> GetYXDataByDataIds(List ids, int times, int timeWindowType, CancellationToken cancellationToken)
{
//检查数据是否为空,如果为空,从Redis里取最新值
if(!_isInitialized)
{
await InitAsync();
}
var finalResult = await _zzDataCacheContainer.Read(ids, times, timeWindowType, cancellationToken);
string redisKey = $"{_telesignalisationModelListRediskey}_Zongzi";
foreach (var item in finalResult)
{
if(item.TimeStamp == DateTime.MinValue)
{
if (!_yxIdToHashKeyMapping.TryGetValue(item.Id, out string haskey))
{
continue;
}
var telesignalisationModel = await _telesignalisationModelListRedis.HashSetGetOneAsync(redisKey, haskey);
if (telesignalisationModel == null)
{
_logger.LogWarning("Redis中未找到遥信数据: RedisKey={RedisKey}, HasKey={HasKey}", redisKey, haskey);
continue; // 继续处理下一个haskey
}
item.Value = telesignalisationModel.ResultValue;
item.ValueStr = telesignalisationModel.ResultValueStr;
item.TimeStamp = telesignalisationModel.ResultTime;
}
}
return finalResult;
}
private readonly ConcurrentQueue _callTimestamps = new ConcurrentQueue();
private readonly SemaphoreSlim _rateLimitLock = new SemaphoreSlim(1, 1);
private const int MaxCallsPerWindow = 300;
private const int TimeWindowSeconds = 3;
private async Task CallDataChangedAsync(Guid equipmentId)
{
try
{
// 检查调用频率
if (!await ShouldAllowCallAsync())
{
_logger.LogWarning("数据变位信号调用频率过高,放弃本次调用,设备ID: {EquipmentId}", equipmentId);
return;
}
var url = $"{_apiEndpoints.DataChangedUri}?equipmentId={equipmentId}";
_ = Task.Run(async () =>
{
try
{
var response = await Task.Run(() => HttpHelper.HttpGetRequest