using DotNetty.Common.Utilities;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Serilog;
using Serilog.Core;
using Serilog.Events;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Linq.Dynamic.Core.Tokenizer;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ToolLibrary;
using YunDa.Server.Communication.CommunicationModels;
using YunDa.Server.Communication.Framework;
using YunDa.Server.ISMSTcp.Domain;
using YunDa.Server.ISMSTcp.Filters.Enums;
using YunDa.Server.ISMSTcp.Filters.Extensions;
using YunDa.Server.ISMSTcp.Filters.Interfaces;
using YunDa.Server.ISMSTcp.Interfaces;
using YunDa.Server.ISMSTcp.Models;
using YunDa.SOMS.DataTransferObject.GeneralInformation.EquipmentLiveDataDto;
using YunDa.SOMS.DataTransferObject.GeneralInformation.ProtectionDeviceInfoDto;
using YunDa.SOMS.DataTransferObject.MaintenanceAndOperations.SecondaryEquipment;
using YunDa.SOMS.Redis.Repositories;
using static System.Runtime.InteropServices.JavaScript.JSType;
namespace YunDa.Server.ISMSTcp.Services
{
///
/// 数据处理器实现
///
/// 重构说明:
/// - 移除了不必要的 _dataBuffer 缓冲区逻辑,因为当前实现可以正确解析完整的数据包
/// - 简化了数据处理流程,直接处理接收到的数据而不需要缓冲不完整的数据包
/// - 保留了数据包解析和验证的核心逻辑
/// - 移除了相关的线程安全保护,因为不再需要管理共享的缓冲区状态
///
public class DataProcessor
{
private readonly ILogger _logger;
private readonly MessageParser _messageParser;
private readonly IAlarmService _alarmService;
private readonly IPacketParser _packetParser;
private readonly TelemeteringHandle _telemeteringHandle;
private readonly TelesignalisationHandle _telesignalisationHandle;
private readonly VirtualTerminalHandler _virtualTerminalHandler;
private readonly IFilterControlService _filterControlService;
private readonly IDeviceCommunicationStateService _deviceCommunicationStateService;
private readonly IQueryCoordinationService _queryCoordinationService;
private readonly IWebSocketPushService _webSocketPushService;
private readonly IRedisRepository _protectionDeviceCommInfoRedis;
private readonly IApiEndpoints _apiEndpoints;
private readonly WebApiRequest _webApiRequest;
private readonly GwErrorRatioService _gwErrorRatioService;
//孪生体服务
private readonly ThingService _thingService;
// 高频数据处理优化常量
private const int MaxDataSize = 50000; // 单次处理的最大数据大小(50KB)
private const int CriticalDataSize = 100000; // 临界数据大小(100KB)
private const int MaxPacketsPerBatch = 100; // 单批次最大处理数据包数量
private const int MinHeaderSize = 6; // 最小头部大小
// 性能统计
private long _totalProcessedBytes = 0;
private int _totalProcessedPackets = 0;
private int _discardedDataCount = 0;
//二次回路巡检计划
private readonly SecondaryCircuitInspectionPlanService _secondaryCircuitInspectionPlanService;
public DataProcessor(
ILogger logger,
MessageParser messageParser,
IAlarmService alarmService,
ICommandStateMachine commandStateMachine,
IPacketParser packetParser,
TelemeteringHandle telemeteringHandle,
TelesignalisationHandle telesignalisationHandle,
VirtualTerminalHandler virtualTerminalHandler,
IFilterControlService filterControlService,
IDeviceCommunicationStateService deviceCommunicationStateService,
IQueryCoordinationService queryCoordinationService,
IWebSocketPushService webSocketPushService,
IRedisRepository protectionDeviceCommInfoRedis,
IApiEndpoints apiEndpoints,
WebApiRequest webApiRequest,
SecondaryCircuitInspectionPlanService secondaryCircuitInspectionPlanService,
ThingService thingService,
GwErrorRatioService gwErrorRatioService)
{
_logger = logger;
_messageParser = messageParser;
_alarmService = alarmService;
CommandStateMachine = commandStateMachine;
_packetParser = packetParser ?? throw new ArgumentNullException(nameof(packetParser));
_telemeteringHandle = telemeteringHandle ?? throw new ArgumentNullException(nameof(telemeteringHandle));
_telesignalisationHandle = telesignalisationHandle ?? throw new ArgumentNullException(nameof(telesignalisationHandle));
_virtualTerminalHandler = virtualTerminalHandler ?? throw new ArgumentNullException(nameof(virtualTerminalHandler));
_filterControlService = filterControlService ?? throw new ArgumentNullException(nameof(filterControlService));
_deviceCommunicationStateService = deviceCommunicationStateService ?? throw new ArgumentNullException(nameof(deviceCommunicationStateService));
_queryCoordinationService = queryCoordinationService ?? throw new ArgumentNullException(nameof(queryCoordinationService));
_webSocketPushService = webSocketPushService ?? throw new ArgumentNullException(nameof(webSocketPushService));
_protectionDeviceCommInfoRedis = protectionDeviceCommInfoRedis ?? throw new ArgumentNullException(nameof(protectionDeviceCommInfoRedis));
_apiEndpoints = apiEndpoints ?? throw new ArgumentNullException(nameof(apiEndpoints));
_webApiRequest = webApiRequest ?? throw new ArgumentNullException(nameof(webApiRequest));
_secondaryCircuitInspectionPlanService = secondaryCircuitInspectionPlanService ?? throw new ArgumentNullException(nameof(secondaryCircuitInspectionPlanService));
_thingService = thingService ?? throw new ArgumentNullException(nameof(thingService));
_gwErrorRatioService = gwErrorRatioService ?? throw new ArgumentNullException(nameof(gwErrorRatioService));
}
///
/// 命令状态机
///
public ICommandStateMachine CommandStateMachine { get; }
///
/// TCP响应接收事件
/// 当成功接收并解析TCP响应数据时触发
///
public event Action? TcpResponseReceived;
///
/// 根据消息类型处理消息内容
///
/// 消息类型
/// 消息内容
/// 原始JSON字符串
/// 处理结果
private async Task ProcessMessageByTypeAsync(string messageType, JToken contentToken, string originalJson)
{
try
{
if (messageType.ToUpper()!= "YC"&& messageType.ToUpper()!= "YX"&& "HeartBeat"!= messageType&& "CommState" != messageType)
{
Log.Information(messageType + ":" + originalJson);
}
// 触发TCP响应接收事件
TcpResponseReceived?.Invoke();
Models.ProcessResult result = messageType.ToUpper() switch
{
"YC" => await ProcessYCMessageAsync(contentToken) ,
"YX" => await ProcessYXMessageAsync(contentToken),
"VA" => await ProcessVAMessageAsync(contentToken),
"DZ" => await ProcessDZMessageAsync(contentToken),
"COMMSTATE" => await ProcessCommStateMessageAsync(contentToken),
"ALERT" => await ProcessAlertMessageAsync(contentToken, originalJson),
"VERSION" => await ProcessVersionMessageAsync(contentToken),
"ERROR" => await ProcessErrorMessageAsync(contentToken, originalJson),
"HEARTBEAT" => await ProcessHeartBeatMessageAsync(contentToken),
"WAVECFG" => await ProcessWaveCfgMessageAsync(contentToken),
"WAVEDATA" => await ProcessWaveDataMessageAsync(contentToken),
"故障报告" => await ProcessFaultRptMessageAsync(contentToken),
"ALLDATA" => await ProcessAllDataMessageAsync(contentToken),
"GWERRORRATIO" => await ProcessGwErrorRatioMessageAsync(contentToken),
_ => ProcessUnknownMessageType(messageType, originalJson)
};
return result;
}
catch (Exception ex)
{
var errorMessageType = ISMSMessageTypeExtensions.ParseMessageType(messageType);
_logger.LogErrorWithFilter(_filterControlService, errorMessageType, ex,
"处理消息类型 {MessageType} 时发生异常", messageType);
return Models.ProcessResult.Error($"Message processing error for type {messageType}: {ex.Message}");
}
}
private async Task ProcessFaultRptMessageAsync(JToken contentToken)
{
HandleRecvMsg(contentToken, "FAULTRPT");
var alertDatas = contentToken.ToObject>();
foreach (var item in alertDatas)
{
var json = JsonConvert.SerializeObject(item);
Log.Warning("故障报告:"+json);
}
return Models.ProcessResult.Success(contentToken, "FAULTRPT");
}
///
/// 处理未知消息类型
///
/// 消息类型
/// 原始JSON
/// 处理结果
private Models.ProcessResult ProcessUnknownMessageType(string messageType, string originalJson)
{
var unknownMessageType = ISMSMessageTypeExtensions.ParseMessageType(messageType);
_logger.LogWarningWithFilter(_filterControlService, unknownMessageType,
"未知消息类型: {MessageType}", messageType);
return Models.ProcessResult.Success(default,messageType);
}
#region 新协议消息类型处理方法
///
/// 处理YC(遥测)消息
///
/// 消息内容
/// 处理结果
private async Task ProcessYCMessageAsync(JToken contentToken)
{
try
{
if (!_telemeteringHandle._isInitialized)
{
await _telemeteringHandle.InitAsync();
}
//_logger.LogInformation("Processing YC (telemetering) message");
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
// 批量处理遥测数据
await _telemeteringHandle.ProcessYCDataAsync(contentArray);
}
else
{
_logger.LogWarning("YC message content is not a valid array or is empty");
}
return Models.ProcessResult.Success( contentToken,"YC");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing YC message");
return Models.ProcessResult.Error($"YC processing error: {ex.Message}");
}
}
///
/// 处理YX(遥信)消息
///
/// 消息内容
/// 处理结果
private async Task ProcessYXMessageAsync(JToken contentToken)
{
try
{
if (!_telesignalisationHandle._isInitialized)
{
await _telesignalisationHandle.InitAsync();
}
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
List telesignalisationModels = new List();
// 🔧 优化:根据数据量区分处理方式
// 大批量数据(>10):仅更新Redis,不触发转发逻辑(命令响应场景)
// 小批量数据(<=10):完整处理包括转发(主动变位场景)
if (contentArray.Count > 10)
{
_logger.LogDebug("检测到大批量遥信数据({Count}条),仅更新Redis,跳过转发逻辑", contentArray.Count);
// 仅更新Redis,不触发WebSocket转发和其他处理逻辑
foreach (var item in contentArray)
{
var model = await _telesignalisationHandle.ProcessYXDataAsync(item, skipForwarding: true);
if(model != null)
telesignalisationModels.Add(model);
}
}
else
{
_logger.LogDebug("检测到小批量遥信数据({Count}条),执行完整处理逻辑", contentArray.Count);
// 完整处理:更新Redis + WebSocket转发 + 告警处理等
foreach (var item in contentArray)
{
var model = await _telesignalisationHandle.ProcessYXDataAsync(item, skipForwarding: false);
if (model != null)
telesignalisationModels.Add(model);
}
}
//将状态推送到孪生体
_thingService.UpdateThingYXStatus(telesignalisationModels);
}
else
{
_logger.LogWarning("YX message content is not a valid array or is empty");
}
return Models.ProcessResult.Success(contentToken, "YX");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing YX message");
return Models.ProcessResult.Error($"YX processing error: {ex.Message}");
}
}
///
/// 处理VA(虚端子)消息
///
/// 消息内容
/// 处理结果
private async Task ProcessVAMessageAsync(JToken contentToken)
{
try
{
_logger.LogInformation("Processing VA (virtual terminal) message");
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
List virtualTerminalDatas = new List();
foreach (var item in contentArray)
{
if (item is JObject itemObject)
{
if (_virtualTerminalHandler.TryParseVirtualTerminalData(item.ToString(), out var virtualTerminalData))
{
virtualTerminalDatas.Add(virtualTerminalData);
await _virtualTerminalHandler.ProcessVirtualTerminalDataAsync(virtualTerminalData);
}
else
{
_logger.LogWarning("Failed to parse virtual terminal data: {Data}", item.ToString());
}
}
}
//向孪生体推送断线数据
_thingService.UpdateOpticalFiberAlarmStatus(virtualTerminalDatas, null, null);
_thingService.UpdateOpticalCableAlarmStatus(virtualTerminalDatas, null);
}
else
{
_logger.LogWarning("VA message content is not a valid array or is empty");
}
return Models.ProcessResult.Success(contentToken, "VA");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing VA message");
return Models.ProcessResult.Error($"VA processing error: {ex.Message}");
}
}
///
/// 处理DZ(定值)消息
///
/// 消息内容
/// 处理结果
private async Task ProcessDZMessageAsync(JToken contentToken)
{
try
{
_logger.LogInformation("Processing DZ (device setting) message");
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
var processedCount = 0;
var errorCount = 0;
var validDzDataList = new List();
foreach (var item in contentArray)
{
try
{
// 预处理JSON数据,处理Value字段可能包含非数字值的情况
var processedItem = PreprocessDZDataItem(item);
var dzData = processedItem.ToObject();
if (dzData != null)
{
validDzDataList.Add(dzData);
}
}
catch (Exception itemEx)
{
_logger.LogError(itemEx, "处理单个定值数据失败: {Item}", item?.ToString());
}
}
_logger.LogInformation("定值消息处理完成 - 成功: {ProcessedCount}, 错误: {ErrorCount}",
processedCount, errorCount);
_logger.LogError("DZ Data: {DzData}", JsonConvert.SerializeObject(validDzDataList));
HandleCommandResponseAsync(contentToken.ToString(), false);
HandleRecvMsg(contentToken, "dz", false);
}
else
{
_logger.LogWarning("DZ message content is not a valid array or is empty");
}
return Models.ProcessResult.Success(contentToken, "DZ");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing DZ message");
return Models.ProcessResult.Error($"DZ processing error: {ex.Message}");
}
}
///
/// 处理CommState(通信状态)消息
///
/// 消息内容
/// 处理结果
private async Task ProcessCommStateMessageAsync(JToken contentToken)
{
try
{
_logger.LogInformation("开始处理通信状态消息");
// 解析通信状态消息内容
if (contentToken == null || !contentToken.HasValues)
{
_logger.LogWarning("通信状态消息内容为空");
return Models.ProcessResult.Error("通信状态消息内容为空");
}
const string redisKey = "ProtectionDeviceCommInfos";
var deviceUpdates = new List<(string deviceId, bool isOnline)>();
int processedCount = 0;
int errorCount = 0;
// 解析Content数组中的每个设备通信状态信息
if (contentToken is JArray contentArray)
{
foreach (var item in contentArray)
{
try
{
var deviceId = item["DeviceID"]?.ToString();
var commState = item["CommState"]?.ToString();
if (string.IsNullOrWhiteSpace(deviceId))
{
_logger.LogWarning("设备ID为空,跳过此条记录: {Item}", item.ToString());
errorCount++;
continue;
}
if (string.IsNullOrWhiteSpace(commState))
{
_logger.LogWarning("通信状态为空,设备ID: {DeviceId},跳过此条记录", deviceId);
errorCount++;
continue;
}
// 将通信状态转换为布尔值 ("1" = true, 其他 = false)
bool isOnline = commState == "1";
deviceUpdates.Add((deviceId, isOnline));
processedCount++;
}
catch (Exception itemEx)
{
_logger.LogError(itemEx, "解析单个设备通信状态失败: {Item}", item?.ToString());
errorCount++;
}
}
}
else
{
_logger.LogWarning("通信状态消息Content不是数组格式: {Content}", contentToken.ToString());
return Models.ProcessResult.Error("通信状态消息格式错误");
}
// 批量更新Redis中的设备通信状态
if (deviceUpdates.Count > 0)
{
int savedCount = 0;
foreach (var (deviceId, isOnline) in deviceUpdates)
{
try
{
// 从Redis获取现有的设备信息
var existingDevice = await _protectionDeviceCommInfoRedis.HashSetGetOneAsync(redisKey, deviceId);
if (existingDevice != null)
{
// 只更新IsOnline字段
existingDevice.IsOnline = isOnline;
await _protectionDeviceCommInfoRedis.HashSetUpdateOneAsync(redisKey, deviceId, existingDevice);
savedCount++;
}
else
{
_logger.LogWarning("Redis中未找到设备信息,设备ID: {DeviceId}", deviceId);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "更新Redis中设备通信状态失败,设备ID: {DeviceId}", deviceId);
}
}
_logger.LogInformation("通信状态消息处理完成 - 解析成功: {ProcessedCount}, 保存成功: {SavedCount}, 错误: {ErrorCount}",
processedCount, savedCount, errorCount);
if (savedCount != deviceUpdates.Count)
{
_logger.LogWarning("部分设备通信状态保存失败 - 预期: {Expected}, 实际: {Actual}", deviceUpdates.Count, savedCount);
}
// 保持原有的缓存服务调用以确保兼容性
var deviceStates = deviceUpdates.ToDictionary(x => x.deviceId, x => x.isOnline ? "1" : "0");
await _deviceCommunicationStateService.SetDeviceCommunicationStatesAsync(deviceStates);
// WebSocket实时推送
try
{
// 推送到COMMSTATE分组
await _webSocketPushService.PushCommStateDataAsync(contentToken);
}
catch (Exception pushEx)
{
_logger.LogWarning(pushEx, "通信状态数据WebSocket推送失败,但不影响主要处理流程");
}
}
else
{
_logger.LogWarning("没有有效的设备通信状态数据需要保存");
}
return Models.ProcessResult.Success(contentToken, "COMMSTATE");
}
catch (Exception ex)
{
_logger.LogError(ex, "处理通信状态消息时发生异常");
return Models.ProcessResult.Error($"通信状态消息处理异常: {ex.Message}");
}
}
///
/// 处理Alert(报警)消息
///
/// 消息内容
/// 原始JSON字符串
/// 处理结果
private async Task ProcessAlertMessageAsync(JToken contentToken, string originalJson)
{
try
{
// 输入验证
if (contentToken == null)
{
_logger.LogWarning("Alert message content is null");
return Models.ProcessResult.Error("Alert message content is null");
}
// 反序列化告警数据
List alertDatas;
try
{
alertDatas = contentToken.ToObject>();
if (alertDatas == null || alertDatas.Count == 0)
{
_logger.LogWarning("Alert message contains no valid alert data");
return Models.ProcessResult.Success(contentToken, "ALERT");
}
}
catch (JsonException jsonEx)
{
_logger.LogError(jsonEx, "Failed to deserialize alert data from content token");
return Models.ProcessResult.Error($"Invalid alert data format: {jsonEx.Message}");
}
// 处理每个告警数据 - 使用并发处理提高性能
var alarmTasks = alertDatas
.Where(alertData => alertData != null && !string.IsNullOrEmpty(alertData.AlertType))
.Select(async alertData =>
{
try
{
string json = JsonConvert.SerializeObject(alertData);
await HandleAlarmMessageAsync(json);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process individual alert data: {AlertId}", alertData?.AlertID);
return false;
}
});
var alarmResults = await Task.WhenAll(alarmTasks);
var successCount = alarmResults.Count(r => r);
var failureCount = alarmResults.Length - successCount;
if (failureCount > 0)
{
_logger.LogWarning("Alert processing completed with {SuccessCount} successes and {FailureCount} failures",
successCount, failureCount);
}
// 处理故障报告类型的告警
//await ProcessFaultReportsAsync(alertDatas);
//报警上传
await _webApiRequest.UploadAlertMessageAsync(alertDatas);
//报警推送到孪生体
await _thingService.UpdateAlarmDatas(alertDatas);
// WebSocket实时推送 - 异步执行,不阻塞主流程
_ = Task.Run(async () =>
{
try
{
await _webSocketPushService.PushAlertDataAsync(contentToken);
}
catch (Exception pushEx)
{
_logger.LogWarning(pushEx, "告警数据WebSocket推送失败,但不影响主要处理流程");
}
});
return Models.ProcessResult.Success(contentToken, "ALERT");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing Alert message");
return Models.ProcessResult.Error($"Alert processing error: {ex.Message}");
}
}
///
/// 处理故障报告类型的告警
///
/// 告警数据列表
/// 处理任务
private async Task ProcessFaultReportsAsync(List alertDatas)
{
try
{
var faultReports = alertDatas
.Where(t => !string.IsNullOrEmpty(t?.AlertType) && t.AlertType == "故障报告")
.ToList();
if (faultReports.Count > 0)
{
var jtoken = JToken.FromObject(faultReports);
HandleRecvMsg(jtoken, "faultRpt");
}
else
{
//_logger.LogInformation("No fault reports found in alert data");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing fault reports");
// 不抛出异常,避免影响主流程
}
}
///
/// 处理Version(版本)消息
///
/// 消息内容
/// 处理结果
private async Task ProcessVersionMessageAsync(JToken contentToken)
{
try
{
HandleRecvMsg(contentToken, "VERSION");
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
var processedCount = 0;
var errorCount = 0;
var validVersionDataList = new List();
foreach (var item in contentArray)
{
try
{
var versionData = item.ToObject();
validVersionDataList.Add(versionData);
}
catch (Exception itemEx)
{
_logger.LogError(itemEx, "处理单个版本信息失败: {Item}", item?.ToString());
errorCount++;
}
}
_logger.LogInformation("版本消息处理完成 - 成功: {ProcessedCount}, 错误: {ErrorCount}",
processedCount, errorCount);
}
else
{
_logger.LogWarning("Version message content is not a valid array or is empty");
}
return Models.ProcessResult.Success(contentToken, "VERSION");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing Version message");
return Models.ProcessResult.Error($"Version processing error: {ex.Message}");
}
}
///
/// 处理Error(错误)消息
///
/// 消息内容
/// 原始JSON字符串
/// 处理结果
private async Task ProcessErrorMessageAsync(JToken contentToken, string originalJson)
{
try
{
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
foreach (var item in contentArray)
{
var errorData = item.ToString();
HandleRecvMsg(item, "ERROR", true);
_logger.LogError("处理单个错误信息: {errorData}", errorData);
}
//_logger.LogInformation("错误消息处理完成 - 总数: {ProcessedCount}, 严重错误: {CriticalCount}",
// processedCount, criticalErrorCount);
}
return Models.ProcessResult.Success(contentToken, "ERROR");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing Error message");
return Models.ProcessResult.Error($"Error processing error: {ex.Message}");
}
}
///
/// 映射内部消息类型到查询类型
///
/// 内部消息类型
/// 查询类型
private string MapInternalTypeToQueryType(string internalType)
{
return internalType?.ToUpper() switch
{
"FAULTRPT" => "faultRpt",
"VERSION" => "version",
"WAVECFG" => "waveCfg",
"WAVEDATA" => "waveDat",
"DZ" => "dz",
"ERROR" => "error",
_ => internalType?.ToLower() ?? string.Empty
};
}
private void HandleRecvMsg(JToken token, string type, bool isError = false)
{
// 检查是否有外部查询正在进行,如果有则信号查询完成
if (_queryCoordinationService.IsExternalQueryInProgress)
{
var status = _queryCoordinationService.GetStatus();
if (!string.IsNullOrEmpty(status.CurrentQueryId))
{
if (isError)
{
_logger.LogInformation("检测到错误消息,完成外部查询: {QueryId}", status.CurrentQueryId);
_queryCoordinationService.CompleteExternalQuery(status.CurrentQueryId, token.ToString());
}
else
{
// 映射内部类型到查询类型进行比较
var mappedType = type.ToLower();
var currentQueryType = _queryCoordinationService.CurrentQueryType?.ToLower();
_logger.LogInformation("消息类型匹配检查 - 接收类型: {ReceivedType}, 映射类型: {MappedType}, 期望类型: {ExpectedType}",
type, mappedType, currentQueryType);
if (mappedType == currentQueryType)
{
_logger.LogInformation("检测到匹配消息类型 {MessageType},完成外部查询: {QueryId}",
mappedType, status.CurrentQueryId);
_queryCoordinationService.CompleteExternalQuery(status.CurrentQueryId, token.ToString());
}
else
{
_logger.LogInformation("消息类型不匹配 - 接收: {ReceivedType}({MappedType}), 期望: {ExpectedType}",
type, mappedType, currentQueryType);
}
}
}
else
{
_logger.LogWarning("外部查询正在进行但查询ID为空");
}
}
else
{
//_logger.LogInformation("没有外部查询正在进行,忽略消息类型: {MessageType}", type);
}
}
///
/// 处理HeartBeat(心跳)消息
///
/// 消息内容
/// 处理结果
private async Task ProcessHeartBeatMessageAsync(JToken contentToken)
{
try
{
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
foreach (var item in contentArray)
{
var heartBeatData = item.ToString();
}
}
else
{
_logger.LogWarning("HeartBeat message content is not a valid array or is empty");
}
return Models.ProcessResult.Success(contentToken, "HEARTBEAT");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing HeartBeat message");
return Models.ProcessResult.Error($"HeartBeat processing error: {ex.Message}");
}
}
///
/// 处理WaveCfg(波形配置)消息
///
/// 消息内容
/// 处理结果
private async Task ProcessWaveCfgMessageAsync(JToken contentToken)
{
try
{
_logger.LogInformation("Processing WaveCfg (wave configuration) message");
HandleRecvMsg(contentToken, "WAVECFG");
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
var processedCount = 0;
var errorCount = 0;
var validWaveCfgDataList = new List();
foreach (var item in contentArray)
{
try
{
var waveCfgData = item.ToObject();
if (waveCfgData != null && waveCfgData.IsValid())
{
_logger.LogInformation("波形配置: {WaveCfgData}", waveCfgData.ToString());
validWaveCfgDataList.Add(waveCfgData);
processedCount++;
}
else
{
_logger.LogWarning("查询中,请稍后使用ftp获取: {Item}", item.ToString());
errorCount++;
}
}
catch (Exception itemEx)
{
_logger.LogError(itemEx, "处理单个波形配置失败: {Item}", item?.ToString());
errorCount++;
}
}
_logger.LogInformation("波形配置消息处理完成 - 成功: {ProcessedCount}, 错误: {ErrorCount}",
processedCount, errorCount);
}
else
{
_logger.LogWarning("WaveCfg message content is not a valid array or is empty");
}
return Models.ProcessResult.Success(contentToken, "WAVECFG");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing WaveCfg message");
return Models.ProcessResult.Error($"WaveCfg processing error: {ex.Message}");
}
}
///
/// 处理WaveData(波形数据)消息
///
/// 消息内容
/// 处理结果
private async Task ProcessWaveDataMessageAsync(JToken contentToken)
{
try
{
HandleRecvMsg(contentToken, "WAVEDATA");
_logger.LogInformation("Processing WaveData (wave data) message");
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
var processedCount = 0;
var errorCount = 0;
var totalDataSize = 0;
var validWaveDataList = new List();
foreach (var item in contentArray)
{
try
{
var waveDataInfo = item.ToObject();
validWaveDataList.Add(waveDataInfo);
}
catch (Exception itemEx)
{
_logger.LogError(itemEx, "处理单个波形数据失败: {Item}", item?.ToString());
errorCount++;
}
}
_logger.LogInformation("波形数据消息处理完成 - 成功: {ProcessedCount}, 错误: {ErrorCount}, 总数据量: {TotalSize}字节",
processedCount, errorCount, totalDataSize);
}
else
{
_logger.LogWarning("WaveData message content is not a valid array or is empty");
}
return Models.ProcessResult.Success(contentToken, "WAVEDATA");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing WaveData message");
return Models.ProcessResult.Error($"WaveData processing error: {ex.Message}");
}
}
///
/// 处理AllData(总召结果)消息
///
/// 消息内容
/// 处理结果
private async Task ProcessAllDataMessageAsync(JToken contentToken)
{
try
{
_logger.LogInformation("Processing AllData (general interrogation result) message");
//总召返回字符串,内容为总召状态
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
foreach (var item in contentArray)
{
_logger.LogInformation("总召结果: {Item}", item.ToString());
}
}
return Models.ProcessResult.Success(contentToken, "AllData");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing AllData message");
return Models.ProcessResult.Error($"AllData processing error: {ex.Message}");
}
}
///
/// 处理网关通信数据消息,数据每小时发一次
///
/// 消息内容
/// 处理结果
private async Task ProcessGwErrorRatioMessageAsync(JToken contentToken)
{
return await _gwErrorRatioService.TranselateData(contentToken);
}
#endregion
///
/// 处理报警消息
///
/// 报警JSON数据
/// 处理任务
public async Task HandleAlarmMessageAsync(string json)
{
if (string.IsNullOrWhiteSpace(json))
{
_logger.LogWarning("Attempted to process null or empty alarm message");
return;
}
try
{
// 异步上传报警消息
var uploadResult = await _alarmService.UploadAlarmMessageAsync(json);
if (uploadResult)
{
//_logger.LogDebug("Alarm message uploaded successfully");
}
else
{
//_logger.LogWarning("Alarm message upload returned false - may be queued for retry");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error handling alarm message: {Json}", json);
// 不重新抛出异常,避免影响上层调用
}
}
///
/// 处理命令响应
///
/// 响应内容
/// 是否为报警数据
/// 处理任务
public async Task HandleCommandResponseAsync(string content, bool isAlarmData)
{
try
{
// 报警数据是主动推送的,不需要处理命令响应
if (isAlarmData)
{
return;
}
var currentCommand = CommandStateMachine.CurrentCommand;
if (currentCommand == null)
{
return;
}
_logger.LogWarning("Detected error response: {Content}", content);
// 检查是否有外部查询正在进行,如果有则信号查询完成
if (_queryCoordinationService.IsExternalQueryInProgress)
{
var status = _queryCoordinationService.GetStatus();
if (!string.IsNullOrEmpty(status.CurrentQueryId))
{
_logger.LogInformation("检测到错误响应,完成外部查询: {QueryId}", status.CurrentQueryId);
_queryCoordinationService.CompleteExternalQuery(status.CurrentQueryId, content);
}
}
// 错误响应也视为命令完成,解除命令锁定
if (CommandStateMachine.CompleteCommand())
{
_logger.LogInformation("Command '{CommandName}' completed with error response: {Response}",
currentCommand.CommandName, content);
}
else
{
_logger.LogWarning("Failed to complete command '{CommandName}' with error response: {Response}",
currentCommand.CommandName, content);
}
await Task.CompletedTask;
return;
await Task.CompletedTask;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error handling command response: {Content}", content);
}
}
public async Task ProcessDataAsync(ReadOnlyMemory data)
{
int contentLength = 0;
string content = "";
string errorMsg = "解析失败";
JToken? jcontent = null;
string type = "Error";
if (_messageParser.TryParseMessage(data.ToArray(),out contentLength, out content, out errorMsg, out jcontent, out type))
{
await ProcessMessageByTypeAsync(type, jcontent, content);
return ProcessResult.Success(jcontent, type);
}
return ProcessResult.Error(errorMsg);
}
///
/// 预处理DZ数据项,处理Value字段可能包含非数字值的情况
///
/// 原始JSON数据项
/// 处理后的JSON数据项
private JToken PreprocessDZDataItem(JToken item)
{
try
{
if (item is JObject jObject && jObject["Value"] != null)
{
var valueToken = jObject["Value"];
var valueString = valueToken?.ToString();
// 检查Value是否为非数字值(如"退出"等中文字符)
if (!string.IsNullOrEmpty(valueString) && !decimal.TryParse(valueString, out _))
{
_logger.LogWarning("DZ数据Value字段包含非数字值: {Value}", valueString);
// 将非数字值替换为0
jObject["Value"] = valueString;
}
}
return item;
}
catch (Exception ex)
{
_logger.LogError(ex, "预处理DZ数据项时发生错误");
return item; // 返回原始数据,让后续处理决定如何处理
}
}
}
}