1052 lines
44 KiB
C#
Raw Normal View History

2025-07-16 09:20:13 +08:00
using System;
using System.Collections.Generic;
2025-07-31 18:51:24 +08:00
using System.IO;
2025-07-16 09:20:13 +08:00
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using YunDa.Server.ISMSTcp.Interfaces;
using YunDa.Server.ISMSTcp.Models;
using YunDa.Server.ISMSTcp.Filters.Interfaces;
using YunDa.Server.ISMSTcp.Filters.Enums;
using YunDa.Server.ISMSTcp.Filters.Extensions;
using YunDa.Server.Communication.Framework;
using YunDa.Server.Communication.CommunicationModels;
2025-07-31 18:51:24 +08:00
using Serilog;
using Serilog.Core;
using Serilog.Events;
using System.Threading;
using DotNetty.Common.Utilities;
using Newtonsoft.Json;
using YunDa.SOMS.DataTransferObject.MaintenanceAndOperations.SecondaryEquipment;
2025-07-16 09:20:13 +08:00
namespace YunDa.Server.ISMSTcp.Services
{
/// <summary>
2025-07-31 18:51:24 +08:00
/// 数据处理器实现
///
/// 重构说明:
/// - 移除了不必要的 _dataBuffer 缓冲区逻辑,因为当前实现可以正确解析完整的数据包
/// - 简化了数据处理流程,直接处理接收到的数据而不需要缓冲不完整的数据包
/// - 保留了数据包解析和验证的核心逻辑
/// - 移除了相关的线程安全保护,因为不再需要管理共享的缓冲区状态
2025-07-16 09:20:13 +08:00
/// </summary>
public class DataProcessor
2025-07-16 09:20:13 +08:00
{
2025-07-31 18:51:24 +08:00
private readonly ILogger<DataProcessor> _logger;
private readonly MessageParser _messageParser;
2025-07-31 18:51:24 +08:00
private readonly IAlarmService _alarmService;
private readonly IPacketParser _packetParser;
private readonly TelemeteringHandle _telemeteringHandle;
private readonly TelesignalisationHandle _telesignalisationHandle;
private readonly VirtualTerminalHandler _virtualTerminalHandler;
private readonly IFilterControlService _filterControlService;
private readonly IDeviceCommunicationStateService _deviceCommunicationStateService;
private readonly IQueryCoordinationService _queryCoordinationService;
private readonly IWebSocketPushService _webSocketPushService;
2025-07-31 18:51:24 +08:00
// 高频数据处理优化常量
private const int MaxDataSize = 50000; // 单次处理的最大数据大小50KB
private const int CriticalDataSize = 100000; // 临界数据大小100KB
private const int MaxPacketsPerBatch = 100; // 单批次最大处理数据包数量
private const int MinHeaderSize = 6; // 最小头部大小
// 性能统计
private long _totalProcessedBytes = 0;
private int _totalProcessedPackets = 0;
private int _discardedDataCount = 0;
public DataProcessor(
ILogger<DataProcessor> logger,
MessageParser messageParser,
2025-07-31 18:51:24 +08:00
IAlarmService alarmService,
ICommandStateMachine commandStateMachine,
IPacketParser packetParser,
TelemeteringHandle telemeteringHandle,
TelesignalisationHandle telesignalisationHandle,
VirtualTerminalHandler virtualTerminalHandler,
IFilterControlService filterControlService,
IDeviceCommunicationStateService deviceCommunicationStateService,
IQueryCoordinationService queryCoordinationService,
IWebSocketPushService webSocketPushService)
2025-07-16 09:20:13 +08:00
{
2025-07-31 18:51:24 +08:00
_logger = logger;
_messageParser = messageParser;
_alarmService = alarmService;
CommandStateMachine = commandStateMachine;
_packetParser = packetParser ?? throw new ArgumentNullException(nameof(packetParser));
_telemeteringHandle = telemeteringHandle ?? throw new ArgumentNullException(nameof(telemeteringHandle));
_telesignalisationHandle = telesignalisationHandle ?? throw new ArgumentNullException(nameof(telesignalisationHandle));
_virtualTerminalHandler = virtualTerminalHandler ?? throw new ArgumentNullException(nameof(virtualTerminalHandler));
_filterControlService = filterControlService ?? throw new ArgumentNullException(nameof(filterControlService));
_deviceCommunicationStateService = deviceCommunicationStateService ?? throw new ArgumentNullException(nameof(deviceCommunicationStateService));
_queryCoordinationService = queryCoordinationService ?? throw new ArgumentNullException(nameof(queryCoordinationService));
_webSocketPushService = webSocketPushService ?? throw new ArgumentNullException(nameof(webSocketPushService));
2025-07-31 18:51:24 +08:00
}
2025-07-16 09:20:13 +08:00
2025-07-31 18:51:24 +08:00
/// <summary>
/// 命令状态机
/// </summary>
public ICommandStateMachine CommandStateMachine { get; }
/// <summary>
/// TCP响应接收事件
/// 当成功接收并解析TCP响应数据时触发
/// </summary>
public event Action? TcpResponseReceived;
2025-07-31 18:51:24 +08:00
/// <summary>
/// 根据消息类型处理消息内容
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="messageType">消息类型</param>
/// <param name="contentToken">消息内容</param>
/// <param name="originalJson">原始JSON字符串</param>
2025-07-31 18:51:24 +08:00
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult?> ProcessMessageByTypeAsync(string messageType, JToken contentToken, string originalJson)
2025-07-31 18:51:24 +08:00
{
try
{
// 记录消息处理开始
var parsedMessageType = ISMSMessageTypeExtensions.ParseMessageType(messageType);
_logger.LogInformationWithFilter(_filterControlService, parsedMessageType,
"开始处理消息类型: {MessageType}", messageType);
// 触发TCP响应接收事件
TcpResponseReceived?.Invoke();
Models.ProcessResult result = messageType.ToUpper() switch
2025-07-31 18:51:24 +08:00
{
"YC" => await ProcessYCMessageAsync(contentToken) ,
"YX" => await ProcessYXMessageAsync(contentToken),
"VA" => await ProcessVAMessageAsync(contentToken),
"DZ" => await ProcessDZMessageAsync(contentToken),
"COMMSTATE" => await ProcessCommStateMessageAsync(contentToken),
"ALERT" => await ProcessAlertMessageAsync(contentToken, originalJson),
"VERSION" => await ProcessVersionMessageAsync(contentToken),
"ERROR" => await ProcessErrorMessageAsync(contentToken, originalJson),
"HEARTBEAT" => await ProcessHeartBeatMessageAsync(contentToken),
"WAVECFG" => await ProcessWaveCfgMessageAsync(contentToken),
"WAVEDATA" => await ProcessWaveDataMessageAsync(contentToken),
"FAULTRPT" => await ProcessFaultRptMessageAsync(contentToken),
"ALLDATA" => await ProcessAllDataMessageAsync(contentToken),
_ => ProcessUnknownMessageType(messageType, originalJson)
};
return result;
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
catch (Exception ex)
2025-07-16 09:20:13 +08:00
{
var errorMessageType = ISMSMessageTypeExtensions.ParseMessageType(messageType);
_logger.LogErrorWithFilter(_filterControlService, errorMessageType, ex,
"处理消息类型 {MessageType} 时发生异常", messageType);
return Models.ProcessResult.Error($"Message processing error for type {messageType}: {ex.Message}");
2025-07-16 09:20:13 +08:00
}
}
private async Task<ProcessResult> ProcessFaultRptMessageAsync(JToken contentToken)
{
HandleRecvMsg(contentToken, "FAULTRPT");
return Models.ProcessResult.Success(contentToken, "FAULTRPT");
}
/// <summary>
/// 处理未知消息类型
/// </summary>
/// <param name="messageType">消息类型</param>
/// <param name="originalJson">原始JSON</param>
/// <returns>处理结果</returns>
private Models.ProcessResult ProcessUnknownMessageType(string messageType, string originalJson)
{
var unknownMessageType = ISMSMessageTypeExtensions.ParseMessageType(messageType);
_logger.LogWarningWithFilter(_filterControlService, unknownMessageType,
"未知消息类型: {MessageType}", messageType);
return Models.ProcessResult.Success(default,messageType);
}
2025-07-16 09:20:13 +08:00
#region
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理YC遥测消息
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="contentToken">消息内容</param>
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessYCMessageAsync(JToken contentToken)
2025-07-16 09:20:13 +08:00
{
2025-07-31 18:51:24 +08:00
try
2025-07-16 09:20:13 +08:00
{
if (!_telemeteringHandle._isInitialized)
2025-07-31 18:51:24 +08:00
{
await _telemeteringHandle.InitAsync();
}
_logger.LogInformation("Processing YC (telemetering) message");
2025-07-16 09:20:13 +08:00
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
// 处理遥测数据
foreach (var item in contentArray)
2025-07-31 18:51:24 +08:00
{
await _telemeteringHandle.ProcessYCDataAsync(item);
2025-07-31 18:51:24 +08:00
}
}
else
{
_logger.LogWarning("YC message content is not a valid array or is empty");
2025-07-31 18:51:24 +08:00
}
return Models.ProcessResult.Success( contentToken,"YC");
2025-07-31 18:51:24 +08:00
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing YC message");
return Models.ProcessResult.Error($"YC processing error: {ex.Message}");
2025-07-31 18:51:24 +08:00
}
}
2025-07-16 09:20:13 +08:00
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理YX遥信消息
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="contentToken">消息内容</param>
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessYXMessageAsync(JToken contentToken)
2025-07-31 18:51:24 +08:00
{
try
2025-07-16 09:20:13 +08:00
{
if (!_telesignalisationHandle._isInitialized)
{
await _telesignalisationHandle.InitAsync();
}
_logger.LogInformation("Processing YX (telesignalisation) message");
2025-07-16 09:20:13 +08:00
if (contentToken is JArray contentArray && contentArray.Count > 0)
2025-07-31 18:51:24 +08:00
{
// 处理遥信数据
foreach (var item in contentArray)
2025-07-31 18:51:24 +08:00
{
await _telesignalisationHandle.ProcessYXDataAsync(item);
2025-07-31 18:51:24 +08:00
}
2025-07-16 09:20:13 +08:00
}
else
2025-07-31 18:51:24 +08:00
{
_logger.LogWarning("YX message content is not a valid array or is empty");
2025-07-16 09:20:13 +08:00
}
return Models.ProcessResult.Success(contentToken, "YX");
2025-07-31 18:51:24 +08:00
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing YX message");
return Models.ProcessResult.Error($"YX processing error: {ex.Message}");
2025-07-31 18:51:24 +08:00
}
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理VA虚端子消息
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="contentToken">消息内容</param>
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessVAMessageAsync(JToken contentToken)
2025-07-16 09:20:13 +08:00
{
2025-07-31 18:51:24 +08:00
try
2025-07-16 09:20:13 +08:00
{
_logger.LogInformation("Processing VA (virtual terminal) message");
2025-07-31 18:51:24 +08:00
if (contentToken is JArray contentArray && contentArray.Count > 0)
2025-07-16 09:20:13 +08:00
{
foreach (var item in contentArray)
{
if (item is JObject itemObject)
{
if (_virtualTerminalHandler.TryParseVirtualTerminalData(item.ToString(), out var virtualTerminalData))
{
await _virtualTerminalHandler.ProcessVirtualTerminalDataAsync(virtualTerminalData);
}
else
{
_logger.LogWarning("Failed to parse virtual terminal data: {Data}", item.ToString());
}
}
}
}
else
{
_logger.LogWarning("VA message content is not a valid array or is empty");
2025-07-16 09:20:13 +08:00
}
return Models.ProcessResult.Success(contentToken, "VA");
2025-07-31 18:51:24 +08:00
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing VA message");
return Models.ProcessResult.Error($"VA processing error: {ex.Message}");
2025-07-31 18:51:24 +08:00
}
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理DZ定值消息
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="contentToken">消息内容</param>
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessDZMessageAsync(JToken contentToken)
2025-07-16 09:20:13 +08:00
{
2025-07-31 18:51:24 +08:00
try
2025-07-16 09:20:13 +08:00
{
_logger.LogInformation("Processing DZ (device setting) message");
2025-07-16 09:20:13 +08:00
if (contentToken is JArray contentArray && contentArray.Count > 0)
2025-07-31 18:51:24 +08:00
{
var processedCount = 0;
var errorCount = 0;
var validDzDataList = new List<DZData>();
foreach (var item in contentArray)
{
try
{
// 预处理JSON数据处理Value字段可能包含非数字值的情况
var processedItem = PreprocessDZDataItem(item);
var dzData = processedItem.ToObject<DZData>();
if (dzData != null)
{
validDzDataList.Add(dzData);
}
}
catch (Exception itemEx)
{
_logger.LogError(itemEx, "处理单个定值数据失败: {Item}", item?.ToString());
}
}
_logger.LogInformation("定值消息处理完成 - 成功: {ProcessedCount}, 错误: {ErrorCount}",
processedCount, errorCount);
_logger.LogError("DZ Data: {DzData}", JsonConvert.SerializeObject(validDzDataList));
HandleCommandResponseAsync(contentToken.ToString(), false);
HandleRecvMsg(contentToken, "dz", false);
2025-07-31 18:51:24 +08:00
}
else
2025-07-31 18:51:24 +08:00
{
_logger.LogWarning("DZ message content is not a valid array or is empty");
2025-07-31 18:51:24 +08:00
}
return Models.ProcessResult.Success(contentToken, "DZ");
2025-07-31 18:51:24 +08:00
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing DZ message");
return Models.ProcessResult.Error($"DZ processing error: {ex.Message}");
2025-07-16 09:20:13 +08:00
}
}
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理CommState通信状态消息
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="contentToken">消息内容</param>
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessCommStateMessageAsync(JToken contentToken)
2025-07-16 09:20:13 +08:00
{
try
{
_logger.LogInformation("开始处理通信状态消息");
// 解析通信状态消息内容
if (contentToken == null || !contentToken.HasValues)
2025-07-31 18:51:24 +08:00
{
_logger.LogWarning("通信状态消息内容为空");
return Models.ProcessResult.Error("通信状态消息内容为空");
2025-07-31 18:51:24 +08:00
}
var deviceStates = new Dictionary<string, string>();
int processedCount = 0;
int errorCount = 0;
// 解析Content数组中的每个设备通信状态信息
if (contentToken is JArray contentArray)
2025-07-31 18:51:24 +08:00
{
foreach (var item in contentArray)
{
try
{
var deviceId = item["DeviceID"]?.ToString();
var commState = item["CommState"]?.ToString();
2025-07-31 18:51:24 +08:00
if (string.IsNullOrWhiteSpace(deviceId))
{
_logger.LogWarning("设备ID为空跳过此条记录: {Item}", item.ToString());
errorCount++;
continue;
}
if (string.IsNullOrWhiteSpace(commState))
{
_logger.LogWarning("通信状态为空设备ID: {DeviceId},跳过此条记录", deviceId);
errorCount++;
continue;
}
deviceStates[deviceId] = commState;
processedCount++;
_logger.LogDebug("解析设备通信状态 - 设备ID: {DeviceId}, 状态: {CommState}", deviceId, commState);
}
catch (Exception itemEx)
{
_logger.LogError(itemEx, "解析单个设备通信状态失败: {Item}", item?.ToString());
errorCount++;
}
}
}
else
2025-07-31 18:51:24 +08:00
{
_logger.LogWarning("通信状态消息Content不是数组格式: {Content}", contentToken.ToString());
return Models.ProcessResult.Error("通信状态消息格式错误");
2025-07-31 18:51:24 +08:00
}
// 批量保存设备通信状态到缓存
if (deviceStates.Count > 0)
2025-07-31 18:51:24 +08:00
{
var savedCount = await _deviceCommunicationStateService.SetDeviceCommunicationStatesAsync(deviceStates);
_logger.LogInformation("通信状态消息处理完成 - 解析成功: {ProcessedCount}, 保存成功: {SavedCount}, 错误: {ErrorCount}",
processedCount, savedCount, errorCount);
if (savedCount != deviceStates.Count)
2025-07-31 18:51:24 +08:00
{
_logger.LogWarning("部分设备通信状态保存失败 - 预期: {Expected}, 实际: {Actual}", deviceStates.Count, savedCount);
2025-07-31 18:51:24 +08:00
}
// WebSocket实时推送
try
{
// 推送到COMMSTATE分组
await _webSocketPushService.PushCommStateDataAsync(contentToken);
_logger.LogDebug("通信状态数据WebSocket推送成功");
}
catch (Exception pushEx)
2025-07-31 18:51:24 +08:00
{
_logger.LogWarning(pushEx, "通信状态数据WebSocket推送失败但不影响主要处理流程");
2025-07-31 18:51:24 +08:00
}
}
else
{
_logger.LogWarning("没有有效的设备通信状态数据需要保存");
2025-07-31 18:51:24 +08:00
}
return Models.ProcessResult.Success(contentToken, "COMMSTATE");
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
catch (Exception ex)
2025-07-16 09:20:13 +08:00
{
_logger.LogError(ex, "处理通信状态消息时发生异常");
return Models.ProcessResult.Error($"通信状态消息处理异常: {ex.Message}");
2025-07-16 09:20:13 +08:00
}
}
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理Alert报警消息
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="contentToken">消息内容</param>
/// <param name="originalJson">原始JSON字符串</param>
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessAlertMessageAsync(JToken contentToken, string originalJson)
2025-07-16 09:20:13 +08:00
{
try
2025-07-16 09:20:13 +08:00
{
_logger.LogInformation("Processing Alert message");
2025-07-16 09:20:13 +08:00
// 输入验证
if (contentToken == null)
2025-07-31 18:51:24 +08:00
{
_logger.LogWarning("Alert message content is null");
return Models.ProcessResult.Error("Alert message content is null");
2025-07-31 18:51:24 +08:00
}
2025-07-16 09:20:13 +08:00
// 反序列化告警数据
List<AlertData> alertDatas;
try
{
alertDatas = contentToken.ToObject<List<AlertData>>();
if (alertDatas == null || alertDatas.Count == 0)
{
_logger.LogWarning("Alert message contains no valid alert data");
2025-07-31 18:51:24 +08:00
return Models.ProcessResult.Success(contentToken, "ALERT");
}
}
catch (JsonException jsonEx)
2025-07-16 09:20:13 +08:00
{
_logger.LogError(jsonEx, "Failed to deserialize alert data from content token");
return Models.ProcessResult.Error($"Invalid alert data format: {jsonEx.Message}");
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
_logger.LogDebug("Processing {Count} alert messages", alertDatas.Count);
2025-07-31 18:51:24 +08:00
// 处理每个告警数据 - 使用并发处理提高性能
var alarmTasks = alertDatas
.Where(alertData => alertData != null && !string.IsNullOrEmpty(alertData.AlertType))
.Select(async alertData =>
{
try
{
string json = JsonConvert.SerializeObject(alertData);
await HandleAlarmMessageAsync(json);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process individual alert data: {AlertId}", alertData?.AlertID);
return false;
}
});
2025-07-31 18:51:24 +08:00
var alarmResults = await Task.WhenAll(alarmTasks);
var successCount = alarmResults.Count(r => r);
var failureCount = alarmResults.Length - successCount;
2025-07-31 18:51:24 +08:00
if (failureCount > 0)
{
_logger.LogWarning("Alert processing completed with {SuccessCount} successes and {FailureCount} failures",
successCount, failureCount);
}
2025-07-31 18:51:24 +08:00
// 处理故障报告类型的告警
await ProcessFaultReportsAsync(alertDatas);
// WebSocket实时推送 - 异步执行,不阻塞主流程
_ = Task.Run(async () =>
{
try
{
await _webSocketPushService.PushAlertDataAsync(contentToken);
_logger.LogDebug("告警数据WebSocket推送成功");
}
catch (Exception pushEx)
{
_logger.LogWarning(pushEx, "告警数据WebSocket推送失败但不影响主要处理流程");
}
});
_logger.LogInformation("Alert message processing completed successfully - Total: {Total}, Success: {Success}, Failed: {Failed}",
alertDatas.Count, successCount, failureCount);
return Models.ProcessResult.Success(contentToken, "ALERT");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing Alert message");
return Models.ProcessResult.Error($"Alert processing error: {ex.Message}");
}
}
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理故障报告类型的告警
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="alertDatas">告警数据列表</param>
/// <returns>处理任务</returns>
private async Task ProcessFaultReportsAsync(List<AlertData> alertDatas)
2025-07-31 18:51:24 +08:00
{
try
{
var faultReports = alertDatas
.Where(t => !string.IsNullOrEmpty(t?.AlertType) && t.AlertType == "故障报告")
.ToList();
if (faultReports.Count > 0)
2025-07-16 09:20:13 +08:00
{
_logger.LogDebug("Processing {Count} fault reports", faultReports.Count);
2025-07-31 18:51:24 +08:00
var jtoken = JToken.FromObject(faultReports);
HandleRecvMsg(jtoken, "faultRpt");
_logger.LogDebug("Fault reports processed successfully");
}
else
{
_logger.LogDebug("No fault reports found in alert data");
}
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
catch (Exception ex)
{
_logger.LogError(ex, "Error processing fault reports");
// 不抛出异常,避免影响主流程
2025-07-31 18:51:24 +08:00
}
}
2025-07-16 09:20:13 +08:00
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理Version版本消息
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="contentToken">消息内容</param>
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessVersionMessageAsync(JToken contentToken)
2025-07-31 18:51:24 +08:00
{
try
2025-07-16 09:20:13 +08:00
{
_logger.LogInformation("Processing Version message");
HandleRecvMsg(contentToken, "VERSION");
if (contentToken is JArray contentArray && contentArray.Count > 0)
2025-07-31 18:51:24 +08:00
{
var processedCount = 0;
var errorCount = 0;
var validVersionDataList = new List<IsmsDeviceItem>();
foreach (var item in contentArray)
2025-07-31 18:51:24 +08:00
{
try
{
var versionData = item.ToObject<IsmsDeviceItem>();
validVersionDataList.Add(versionData);
}
catch (Exception itemEx)
{
_logger.LogError(itemEx, "处理单个版本信息失败: {Item}", item?.ToString());
errorCount++;
}
2025-07-31 18:51:24 +08:00
}
_logger.LogInformation("版本消息处理完成 - 成功: {ProcessedCount}, 错误: {ErrorCount}",
processedCount, errorCount);
2025-07-31 18:51:24 +08:00
}
else
2025-07-31 18:51:24 +08:00
{
_logger.LogWarning("Version message content is not a valid array or is empty");
2025-07-31 18:51:24 +08:00
}
return Models.ProcessResult.Success(contentToken, "VERSION");
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
catch (Exception ex)
2025-07-16 09:20:13 +08:00
{
_logger.LogError(ex, "Error processing Version message");
return Models.ProcessResult.Error($"Version processing error: {ex.Message}");
2025-07-16 09:20:13 +08:00
}
}
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理Error错误消息
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="contentToken">消息内容</param>
/// <param name="originalJson">原始JSON字符串</param>
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessErrorMessageAsync(JToken contentToken, string originalJson)
2025-07-16 09:20:13 +08:00
{
try
{
_logger.LogWarning("Processing Error message");
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
var processedCount = 0;
var criticalErrorCount = 0;
2025-07-16 09:20:13 +08:00
foreach (var item in contentArray)
{
var errorData = item.ToString();
HandleRecvMsg(item, "ERROR", true);
_logger.LogError("处理单个错误信息失败: {errorData}", errorData);
}
2025-07-31 18:51:24 +08:00
_logger.LogInformation("错误消息处理完成 - 总数: {ProcessedCount}, 严重错误: {CriticalCount}",
processedCount, criticalErrorCount);
}
return Models.ProcessResult.Success(contentToken, "ERROR");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing Error message");
return Models.ProcessResult.Error($"Error processing error: {ex.Message}");
}
}
2025-07-31 18:51:24 +08:00
/// <summary>
/// 映射内部消息类型到查询类型
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="internalType">内部消息类型</param>
/// <returns>查询类型</returns>
private string MapInternalTypeToQueryType(string internalType)
2025-07-16 09:20:13 +08:00
{
return internalType?.ToUpper() switch
{
"FAULTRPT" => "faultRpt",
"VERSION" => "version",
"WAVECFG" => "waveCfg",
"WAVEDATA" => "waveDat",
"DZ" => "dz",
"ERROR" => "error",
_ => internalType?.ToLower() ?? string.Empty
};
2025-07-16 09:20:13 +08:00
}
private void HandleRecvMsg(JToken token, string type, bool isError = false)
2025-07-16 09:20:13 +08:00
{
// 检查是否有外部查询正在进行,如果有则信号查询完成
if (_queryCoordinationService.IsExternalQueryInProgress)
2025-07-31 18:51:24 +08:00
{
var status = _queryCoordinationService.GetStatus();
if (!string.IsNullOrEmpty(status.CurrentQueryId))
{
if (isError)
{
_logger.LogInformation("检测到错误消息,完成外部查询: {QueryId}", status.CurrentQueryId);
_queryCoordinationService.CompleteExternalQuery(status.CurrentQueryId, token.ToString());
}
else
{
// 映射内部类型到查询类型进行比较
var mappedType = type.ToLower();
var currentQueryType = _queryCoordinationService.CurrentQueryType?.ToLower();
_logger.LogDebug("消息类型匹配检查 - 接收类型: {ReceivedType}, 映射类型: {MappedType}, 期望类型: {ExpectedType}",
type, mappedType, currentQueryType);
if (mappedType == currentQueryType)
{
_logger.LogInformation("检测到匹配消息类型 {MessageType},完成外部查询: {QueryId}",
mappedType, status.CurrentQueryId);
_queryCoordinationService.CompleteExternalQuery(status.CurrentQueryId, token.ToString());
}
else
{
_logger.LogDebug("消息类型不匹配 - 接收: {ReceivedType}({MappedType}), 期望: {ExpectedType}",
type, mappedType, currentQueryType);
}
}
}
else
{
_logger.LogWarning("外部查询正在进行但查询ID为空");
}
2025-07-31 18:51:24 +08:00
}
else
2025-07-31 18:51:24 +08:00
{
_logger.LogDebug("没有外部查询正在进行,忽略消息类型: {MessageType}", type);
2025-07-31 18:51:24 +08:00
}
}
/// <summary>
/// 处理HeartBeat心跳消息
/// </summary>
/// <param name="contentToken">消息内容</param>
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessHeartBeatMessageAsync(JToken contentToken)
{
try
2025-07-31 18:51:24 +08:00
{
_logger.LogDebug("Processing HeartBeat message");
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
foreach (var item in contentArray)
{
var heartBeatData = item.ToString();
}
}
else
{
_logger.LogWarning("HeartBeat message content is not a valid array or is empty");
}
return Models.ProcessResult.Success(contentToken, "HEARTBEAT");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing HeartBeat message");
return Models.ProcessResult.Error($"HeartBeat processing error: {ex.Message}");
2025-07-31 18:51:24 +08:00
}
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理WaveCfg波形配置消息
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="contentToken">消息内容</param>
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessWaveCfgMessageAsync(JToken contentToken)
2025-07-16 09:20:13 +08:00
{
2025-07-31 18:51:24 +08:00
try
2025-07-16 09:20:13 +08:00
{
_logger.LogInformation("Processing WaveCfg (wave configuration) message");
HandleRecvMsg(contentToken, "WAVECFG");
if (contentToken is JArray contentArray && contentArray.Count > 0)
2025-07-31 18:51:24 +08:00
{
var processedCount = 0;
var errorCount = 0;
var validWaveCfgDataList = new List<WaveCfgData>();
2025-07-31 18:51:24 +08:00
foreach (var item in contentArray)
{
try
{
var waveCfgData = item.ToObject<WaveCfgData>();
if (waveCfgData != null && waveCfgData.IsValid())
{
_logger.LogInformation("波形配置: {WaveCfgData}", waveCfgData.ToString());
_logger.LogDebug("采样时长: {Duration}秒", waveCfgData.GetSampleDuration());
2025-07-31 18:51:24 +08:00
validWaveCfgDataList.Add(waveCfgData);
processedCount++;
}
else
{
_logger.LogWarning("无效的波形配置: {Item}", item.ToString());
errorCount++;
}
}
catch (Exception itemEx)
{
_logger.LogError(itemEx, "处理单个波形配置失败: {Item}", item?.ToString());
errorCount++;
}
}
2025-07-31 18:51:24 +08:00
_logger.LogInformation("波形配置消息处理完成 - 成功: {ProcessedCount}, 错误: {ErrorCount}",
processedCount, errorCount);
}
else
{
_logger.LogWarning("WaveCfg message content is not a valid array or is empty");
2025-07-31 18:51:24 +08:00
}
return Models.ProcessResult.Success(contentToken, "WAVECFG");
2025-07-31 18:51:24 +08:00
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing WaveCfg message");
return Models.ProcessResult.Error($"WaveCfg processing error: {ex.Message}");
2025-07-16 09:20:13 +08:00
}
}
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理WaveData波形数据消息
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="contentToken">消息内容</param>
2025-07-31 18:51:24 +08:00
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessWaveDataMessageAsync(JToken contentToken)
2025-07-16 09:20:13 +08:00
{
try
{
HandleRecvMsg(contentToken, "WAVEDATA");
_logger.LogInformation("Processing WaveData (wave data) message");
if (contentToken is JArray contentArray && contentArray.Count > 0)
{
var processedCount = 0;
var errorCount = 0;
var totalDataSize = 0;
var validWaveDataList = new List<WaveDataInfo>();
2025-07-16 09:20:13 +08:00
foreach (var item in contentArray)
{
try
{
var waveDataInfo = item.ToObject<WaveDataInfo>();
validWaveDataList.Add(waveDataInfo);
}
catch (Exception itemEx)
{
_logger.LogError(itemEx, "处理单个波形数据失败: {Item}", item?.ToString());
errorCount++;
}
}
2025-07-31 18:51:24 +08:00
_logger.LogInformation("波形数据消息处理完成 - 成功: {ProcessedCount}, 错误: {ErrorCount}, 总数据量: {TotalSize}字节",
processedCount, errorCount, totalDataSize);
}
else
{
_logger.LogWarning("WaveData message content is not a valid array or is empty");
}
2025-07-31 18:51:24 +08:00
return Models.ProcessResult.Success(contentToken, "WAVEDATA");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing WaveData message");
return Models.ProcessResult.Error($"WaveData processing error: {ex.Message}");
}
}
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理AllData总召结果消息
/// </summary>
/// <param name="contentToken">消息内容</param>
/// <returns>处理结果</returns>
private async Task<Models.ProcessResult> ProcessAllDataMessageAsync(JToken contentToken)
{
2025-07-31 18:51:24 +08:00
try
2025-07-16 09:20:13 +08:00
{
_logger.LogInformation("Processing AllData (general interrogation result) message");
2025-07-31 18:51:24 +08:00
//总召返回字符串,内容为总召状态
if (contentToken is JArray contentArray && contentArray.Count > 0)
2025-07-31 18:51:24 +08:00
{
foreach (var item in contentArray)
{
_logger.LogInformation("总召结果: {Item}", item.ToString());
}
2025-07-31 18:51:24 +08:00
}
return Models.ProcessResult.Success(contentToken, "AllData");
2025-07-31 18:51:24 +08:00
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing AllData message");
return Models.ProcessResult.Error($"AllData processing error: {ex.Message}");
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
}
2025-07-16 09:20:13 +08:00
#endregion
2025-07-31 18:51:24 +08:00
/// <summary>
/// 处理报警消息
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="json">报警JSON数据</param>
/// <returns>处理任务</returns>
public async Task HandleAlarmMessageAsync(string json)
2025-07-16 09:20:13 +08:00
{
if (string.IsNullOrWhiteSpace(json))
{
_logger.LogWarning("Attempted to process null or empty alarm message");
return;
}
2025-07-31 18:51:24 +08:00
try
2025-07-16 09:20:13 +08:00
{
_logger.LogDebug("Processing alarm message: {Json}", json);
2025-07-31 18:51:24 +08:00
// 异步上传报警消息
var uploadResult = await _alarmService.UploadAlarmMessageAsync(json);
2025-07-31 18:51:24 +08:00
if (uploadResult)
2025-07-16 09:20:13 +08:00
{
_logger.LogDebug("Alarm message uploaded successfully");
2025-07-31 18:51:24 +08:00
}
else
2025-07-16 09:20:13 +08:00
{
_logger.LogWarning("Alarm message upload returned false - may be queued for retry");
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
}
catch (Exception ex)
{
_logger.LogError(ex, "Error handling alarm message: {Json}", json);
// 不重新抛出异常,避免影响上层调用
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
}
/// <summary>
/// 处理命令响应
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="content">响应内容</param>
/// <param name="isAlarmData">是否为报警数据</param>
/// <returns>处理任务</returns>
public async Task HandleCommandResponseAsync(string content, bool isAlarmData)
2025-07-31 18:51:24 +08:00
{
try
2025-07-16 09:20:13 +08:00
{
// 报警数据是主动推送的,不需要处理命令响应
if (isAlarmData)
{
_logger.LogDebug("Skipping command response handling for alarm data");
return;
}
2025-07-31 18:51:24 +08:00
var currentCommand = CommandStateMachine.CurrentCommand;
if (currentCommand == null)
2025-07-16 09:20:13 +08:00
{
return;
}
2025-07-31 18:51:24 +08:00
_logger.LogWarning("Detected error response: {Content}", content);
// 检查是否有外部查询正在进行,如果有则信号查询完成
if (_queryCoordinationService.IsExternalQueryInProgress)
{
var status = _queryCoordinationService.GetStatus();
if (!string.IsNullOrEmpty(status.CurrentQueryId))
{
_logger.LogInformation("检测到错误响应,完成外部查询: {QueryId}", status.CurrentQueryId);
_queryCoordinationService.CompleteExternalQuery(status.CurrentQueryId, content);
}
2025-07-16 09:20:13 +08:00
}
// 错误响应也视为命令完成,解除命令锁定
if (CommandStateMachine.CompleteCommand())
2025-07-16 09:20:13 +08:00
{
_logger.LogInformation("Command '{CommandName}' completed with error response: {Response}",
currentCommand.CommandName, content);
2025-07-16 09:20:13 +08:00
}
else
{
_logger.LogWarning("Failed to complete command '{CommandName}' with error response: {Response}",
currentCommand.CommandName, content);
}
await Task.CompletedTask;
return;
2025-07-31 18:51:24 +08:00
await Task.CompletedTask;
2025-07-31 18:51:24 +08:00
}
catch (Exception ex)
{
_logger.LogError(ex, "Error handling command response: {Content}", content);
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
}
2025-07-16 09:20:13 +08:00
public async Task<ProcessResult> ProcessDataAsync(ReadOnlyMemory<byte> data)
2025-07-31 18:51:24 +08:00
{
int contentLength = 0;
string content = "";
string errorMsg = "解析失败";
JToken? jcontent = null;
string type = "Error";
if (_messageParser.TryParseMessage(data.ToArray(),out contentLength, out content, out errorMsg, out jcontent, out type))
{
await ProcessMessageByTypeAsync(type, jcontent, content);
return ProcessResult.Success(jcontent, type);
}
return ProcessResult.Error(errorMsg);
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
/// <summary>
/// 预处理DZ数据项处理Value字段可能包含非数字值的情况
2025-07-31 18:51:24 +08:00
/// </summary>
/// <param name="item">原始JSON数据项</param>
/// <returns>处理后的JSON数据项</returns>
private JToken PreprocessDZDataItem(JToken item)
2025-07-16 09:20:13 +08:00
{
2025-07-31 18:51:24 +08:00
try
{
if (item is JObject jObject && jObject["Value"] != null)
{
var valueToken = jObject["Value"];
var valueString = valueToken?.ToString();
// 检查Value是否为非数字值如"退出"等中文字符)
if (!string.IsNullOrEmpty(valueString) && !decimal.TryParse(valueString, out _))
{
_logger.LogWarning("DZ数据Value字段包含非数字值: {Value}将设置为0", valueString);
// 将非数字值替换为0
jObject["Value"] = 0;
}
}
return item;
2025-07-31 18:51:24 +08:00
}
catch (Exception ex)
{
_logger.LogError(ex, "预处理DZ数据项时发生错误");
return item; // 返回原始数据,让后续处理决定如何处理
2025-07-31 18:51:24 +08:00
}
2025-07-16 09:20:13 +08:00
}
2025-07-31 18:51:24 +08:00
2025-07-16 09:20:13 +08:00
}
}
2025-07-31 18:51:24 +08:00