// Repository viewer metadata captured with this file: 531 lines, 18 KiB, C#.
// Revision timestamp: 2025-07-16 09:20:13 +08:00
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using YunDa.Server.ISMSTcp.Interfaces;
using YunDa.Server.ISMSTcp.Models;
namespace YunDa.Server.ISMSTcp.Services
{
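/// <summary>
/// Data processor implementation: reassembles length-prefixed packets from received bytes and dispatches their content.
/// </summary>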
public class DataProcessor : IDataProcessor
{
// Logger used for all diagnostics written by this class.
private readonly ILogger<DataProcessor> _logger;
// Parser used to validate framed packets and to recognise JSON and special error-format content.
private readonly IMessageParser _messageParser;
// Service that alarm messages are uploaded through.
private readonly IAlarmService _alarmService;
/// <summary>
/// Creates a processor wired to the supplied collaborators.
/// </summary>
/// <param name="logger">Logger used for the diagnostics written by this instance.</param>
/// <param name="messageParser">Parser used to decode packets and inspect their content.</param>
/// <param name="alarmService">Service that receives alarm messages for upload.</param>
/// <param name="commandStateMachine">State machine exposed through <see cref="CommandStateMachine"/>.</param>
public DataProcessor(
    ILogger<DataProcessor> logger,
    IMessageParser messageParser,
    IAlarmService alarmService,
    ICommandStateMachine commandStateMachine)
{
    CommandStateMachine = commandStateMachine;
    (_logger, _messageParser, _alarmService) = (logger, messageParser, alarmService);
}
/// <summary>
/// Command state machine supplied at construction. It is consulted for command timeouts
/// and used to complete the pending command when a response arrives.
/// </summary>
public ICommandStateMachine CommandStateMachine { get; }
// Holds bytes that have not yet formed a complete packet; ProcessDataAsync prepends them to the next received chunk.
private byte[] _dataBuffer = Array.Empty<byte>();
/// <summary>
/// Feeds a newly received chunk into the packet reassembly buffer and processes every
/// complete packet that is now available.
/// </summary>
/// <param name="data">Raw bytes just received; they are appended to any bytes left over from earlier calls.</param>
/// <returns>
/// The result of the last packet successfully processed during this call, or an error result
/// when no complete packet could be processed or an exception occurred.
/// </returns>
public async Task<ProcessResult> ProcessDataAsync(ReadOnlyMemory<byte> data)
{
    try
    {
        // Prepend whatever was left over from the previous call.
        var combinedBuffer = CombineBuffers(_dataBuffer, data.ToArray());

        // A packet starts with a 6-byte length header; without it nothing can be parsed yet.
        if (combinedBuffer.Length < 6)
        {
            _dataBuffer = combinedBuffer;
            return ProcessResult.Error("Incomplete header, waiting for more data", 0);
        }

        var (processedBytes, results) = await ProcessPacketsAsync(combinedBuffer);

        // Keep only the unconsumed tail for the next call (block copy instead of LINQ Skip/ToArray).
        int remaining = combinedBuffer.Length - processedBytes;
        if (remaining > 0)
        {
            var tail = new byte[remaining];
            Buffer.BlockCopy(combinedBuffer, processedBytes, tail, 0, remaining);
            _dataBuffer = tail;
        }
        else
        {
            _dataBuffer = Array.Empty<byte>();
        }

        // Only the most recent packet's result is reported; earlier packets in the same chunk
        // have already been handled inside ProcessPacketsAsync.
        return results.Count > 0
            ? results[results.Count - 1]
            : ProcessResult.Error("No complete packets found", 0);
    }
    catch (Exception ex)
    {
        // The incoming chunk is lost at this point and some buffered bytes may already have been
        // consumed as packets, so the previously retained bytes can no longer be completed
        // correctly. Drop them instead of gluing stale bytes onto unrelated future data.
        _dataBuffer = Array.Empty<byte>();
        _logger.LogError(ex, "Error processing data");
        return ProcessResult.Error(ex.Message, 0);
    }
}
/// <summary>
/// Concatenates two byte arrays.
/// </summary>
/// <param name="buffer1">Bytes that come first in the result.</param>
/// <param name="buffer2">Bytes that follow <paramref name="buffer1"/>.</param>
/// <returns>
/// A new array holding both inputs back to back. When either input is empty the other
/// array instance is returned as-is, without copying.
/// </returns>
private byte[] CombineBuffers(byte[] buffer1, byte[] buffer2)
{
    if (buffer1.Length == 0)
    {
        return buffer2;
    }

    if (buffer2.Length == 0)
    {
        return buffer1;
    }

    var merged = new byte[buffer1.Length + buffer2.Length];
    buffer1.CopyTo(merged, 0);
    buffer2.CopyTo(merged, buffer1.Length);
    return merged;
}
/// <summary>
/// Walks the buffer and processes every complete packet it contains.
/// </summary>
/// <remarks>
/// Packet layout as parsed here: a 6-byte textual length header, that many bytes of content,
/// then 2 trailing bytes. When a header cannot be parsed, or a complete packet is rejected,
/// the scan advances by a single byte and tries again from there.
/// </remarks>
/// <param name="buffer">Bytes accumulated so far.</param>
/// <returns>
/// The number of bytes consumed (skipped bytes included) and the results of the packets
/// that were processed successfully, in order.
/// </returns>
private async Task<(int ProcessedBytes, List<ProcessResult> Results)> ProcessPacketsAsync(byte[] buffer)
{
    const int headerLength = 6;
    const int trailerLength = 2;

    int processedBytes = 0;
    var results = new List<ProcessResult>();

    // Stop as soon as the remaining bytes cannot even hold a header.
    while (buffer.Length - processedBytes >= headerLength)
    {
        // Decode the header straight from the buffer; no temporary copy is needed.
        string header = Encoding.UTF8.GetString(buffer, processedBytes, headerLength);

        // Same number style as the default int.TryParse, but culture-invariant so the
        // wire format does not depend on the host's regional settings.
        bool headerIsValid = int.TryParse(
            header,
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out int contentLength) && contentLength >= 0;

        if (!headerIsValid)
        {
            _logger.LogWarning("Invalid header format: {Header}", header);
            // Resynchronise: slide the window forward by one byte.
            processedBytes++;
            continue;
        }

        // Header + content + trailer.
        int requiredBytes = headerLength + contentLength + trailerLength;

        // The packet is not complete yet; leave it in the buffer for the next call.
        if (buffer.Length - processedBytes < requiredBytes)
        {
            break;
        }

        var packetBytes = new byte[requiredBytes];
        Buffer.BlockCopy(buffer, processedBytes, packetBytes, 0, requiredBytes);

        var result = await ProcessSinglePacketAsync(packetBytes);
        if (result.IsValid)
        {
            results.Add(result);
            processedBytes += requiredBytes;
        }
        else
        {
            // The packet was rejected: skip one byte and look for the next plausible header.
            processedBytes++;
        }
    }

    return (processedBytes, results);
}
/// <summary>
/// Parses one complete packet and dispatches its content to the matching handler.
/// </summary>
/// <param name="packetBytes">Bytes of exactly one packet (header, content and trailer).</param>
/// <returns>
/// A success result carrying the content and the packet size, or an error result when the
/// message parser rejects the packet.
/// </returns>
private async Task<ProcessResult> ProcessSinglePacketAsync(byte[] packetBytes)
{
    if (!_messageParser.TryParseMessage(packetBytes, out int contentLength, out string content))
    {
        return ProcessResult.Error("Invalid message format", 0);
    }

    using (_logger.BeginScope(new Dictionary<string, object> { ["LogCategory"] = "Communication" }))
    {
        _logger.LogDebug("Parsed message with content length: {Length}", contentLength);
    }

    // Let the state machine expire a timed-out command before this response is matched.
    CommandStateMachine.CheckAndHandleTimeout();

    // 6-byte header + content + 2-byte trailer.
    int packetSize = 6 + contentLength + 2;

    if (_messageParser.TryParseJToken(content, out var token))
    {
        switch (token)
        {
            case JObject payload:
            {
                // An object whose AlertID parses as a GUID is treated as alarm data.
                bool isAlarm = payload.TryGetValue("AlertID", out var alertIdToken) &&
                               Guid.TryParse(alertIdToken.ToString(), out _);
                await HandleJsonObjectAsync(content);
                await HandleCommandResponseAsync(content, isAlarm);
                return ProcessResult.Success(false, content, packetSize);
            }
            case JArray _:
            {
                await HandleJsonArrayAsync(content);
                await HandleCommandResponseAsync(content, false);
                // Arrays are the only content reported with 'true' as the first argument.
                return ProcessResult.Success(true, content, packetSize);
            }
        }
    }

    // Anything else (non-JSON, or JSON that is neither object nor array) is handled as plain text.
    await HandleStringAsync(content);
    await HandleCommandResponseAsync(content, false);
    return ProcessResult.Success(false, content, packetSize);
}
/// <summary>
/// Handles content that is a JSON object; objects carrying a GUID-valued <c>AlertID</c>
/// are forwarded as alarm messages.
/// </summary>
/// <param name="json">JSON text of the object.</param>
/// <returns>A task that completes when handling has finished; exceptions are logged, not propagated.</returns>
public async Task HandleJsonObjectAsync(string json)
{
    try
    {
        _logger.LogInformation("Processing JSON object: {Json}", json);

        if (!_messageParser.TryParseJToken(json, out var parsed) || !(parsed is JObject payload))
        {
            return;
        }

        // Alarm messages are recognised by an AlertID that parses as a GUID.
        bool hasAlertGuid = payload.TryGetValue("AlertID", out var alertIdToken) &&
                            Guid.TryParse(alertIdToken.ToString(), out _);
        if (hasAlertGuid)
        {
            await HandleAlarmMessageAsync(json);
        }
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error handling JSON object: {Json}", json);
    }
}
/// <summary>
/// Handles content that is a JSON array. At present the array is only logged.
/// </summary>
/// <param name="json">JSON text of the array.</param>
/// <returns>A task that completes when handling has finished; exceptions are logged, not propagated.</returns>
public async Task HandleJsonArrayAsync(string json)
{
    try
    {
        _logger.LogInformation("Processing JSON array: {Json}", json);

        // Extension point for array-specific handling; the await is a no-op that keeps the method asynchronous.
        await Task.CompletedTask;
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error handling JSON array: {Json}", json);
    }
}
/// <summary>
/// Handles plain-text content. Text that matches the parser's special error format is
/// converted to JSON by the parser and forwarded as an alarm message.
/// </summary>
/// <param name="content">Text content of the packet.</param>
/// <returns>A task that completes when handling has finished; exceptions are logged, not propagated.</returns>
public async Task HandleStringAsync(string content)
{
    try
    {
        _logger.LogInformation("Processing string content: {Content}", content);

        bool isSpecialError = _messageParser.TryParseSpecialErrorFormat(content, out string parsedErrorContent);
        if (!isSpecialError)
        {
            return;
        }

        _logger.LogWarning("Detected special error format, converted to JSON: {Original} -> {Parsed}",
            content, parsedErrorContent);

        // The converted error message is treated as an alarm.
        await HandleAlarmMessageAsync(parsedErrorContent);
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error handling string: {Content}", content);
    }
}
/// <summary>
/// Uploads an alarm message through the alarm service.
/// </summary>
/// <param name="json">Alarm payload as JSON text.</param>
/// <returns>A task that completes when the upload attempt has finished; failures are logged, not propagated.</returns>
public async Task HandleAlarmMessageAsync(string json)
{
    try
    {
        _logger.LogWarning("Processing alarm message: {Json}", json);

        // Upload failures get their own log entry, separate from any other failure in this method.
        try
        {
            await _alarmService.UploadAlarmMessageAsync(json);
            _logger.LogInformation("Alarm message uploaded successfully");
        }
        catch (Exception uploadFailure)
        {
            _logger.LogError(uploadFailure, "Failed to upload alarm message");
        }
    }
    catch (Exception unexpected)
    {
        _logger.LogError(unexpected, "Error handling alarm message: {Json}", json);
    }
}
/// <summary>
/// Matches received content against the pending command and completes that command.
/// </summary>
/// <param name="content">Response content.</param>
/// <param name="isAlarmData">True when the content is alarm data; such content is never treated as a command response.</param>
/// <returns>A task that completes when handling has finished; exceptions are logged, not propagated.</returns>
public async Task HandleCommandResponseAsync(string content, bool isAlarmData)
{
    try
    {
        // Alarm data is pushed unsolicited, so it cannot be the answer to a command.
        if (isAlarmData)
        {
            _logger.LogDebug("Skipping command response handling for alarm data");
            return;
        }

        if (!CommandStateMachine.HasPendingCommand)
        {
            _logger.LogDebug("No pending command for response: {Content}", content);
            return;
        }

        var pendingCommand = CommandStateMachine.CurrentCommand;
        if (pendingCommand == null)
        {
            return;
        }

        if (IsErrorResponse(content))
        {
            _logger.LogWarning("Detected error response: {Content}", content);

            // An error response still ends the command, so the pending command is completed without a name match.
            var released = CommandStateMachine.CompleteCommand();
            if (released)
            {
                _logger.LogInformation("Command '{CommandName}' completed with error response: {Response}",
                    pendingCommand.CommandName, content);
            }
            else
            {
                _logger.LogWarning("Failed to complete command '{CommandName}' with error response: {Response}",
                    pendingCommand.CommandName, content);
            }
        }
        else
        {
            // Try to identify the command from the response so the state machine can match it.
            string? responseCommandName = ExtractCommandNameFromResponse(content);
            var matched = CommandStateMachine.CompleteCommand(responseCommandName);
            if (matched)
            {
                _logger.LogInformation("Command '{CommandName}' completed successfully with response: {Response}",
                    pendingCommand.CommandName, content);
            }
            else
            {
                _logger.LogWarning("Failed to complete command '{CommandName}' with response: {Response}",
                    pendingCommand.CommandName, content);
            }
        }

        // No-op await: the method performs no real asynchronous work.
        await Task.CompletedTask;
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error handling command response: {Content}", content);
    }
}
// Substrings that mark a response as an error. They are matched with OrdinalIgnoreCase, so a
// single casing per word is sufficient. The order only determines which keyword is named in
// the debug log when several of them occur in the same response.
private static readonly string[] ErrorKeywords =
{
    "执行失败",
    "Error",
    "失败",
    "异常",
    "Exception",
    "找不到对应的装置对象"
};

/// <summary>
/// Decides whether a response reports an error.
/// </summary>
/// <remarks>
/// A response counts as an error when it matches the parser's special error format, when it
/// is a JSON object containing one of the fields <c>ErrorCode</c>, <c>error</c>, <c>Error</c>
/// or <c>errorMessage</c>, or when it contains one of the error keywords (case-insensitive).
/// </remarks>
/// <param name="content">Response content.</param>
/// <returns><c>true</c> when the content is recognised as an error response; <c>false</c> otherwise, including for null or empty content.</returns>
private bool IsErrorResponse(string content)
{
    if (string.IsNullOrEmpty(content))
    {
        return false;
    }

    // 1. Special error format recognised by the message parser.
    if (_messageParser.TryParseSpecialErrorFormat(content, out _))
    {
        _logger.LogDebug("Detected special error format in response: {Content}", content);
        return true;
    }

    // 2. JSON object carrying an error field (the mere presence of the field is enough).
    if (_messageParser.TryParseJToken(content, out var token) && token is JObject jObject)
    {
        if (jObject.TryGetValue("ErrorCode", out _) ||
            jObject.TryGetValue("error", out _) ||
            jObject.TryGetValue("Error", out _) ||
            jObject.TryGetValue("errorMessage", out _))
        {
            _logger.LogDebug("Found error field in JSON response: {Content}", content);
            return true;
        }
    }

    // 3. Free-text keyword scan.
    foreach (var keyword in ErrorKeywords)
    {
        if (content.Contains(keyword, StringComparison.OrdinalIgnoreCase))
        {
            _logger.LogDebug("Found error keyword '{Keyword}' in response: {Content}", keyword, content);
            return true;
        }
    }

    return false;
}
/// <summary>
/// Tries to derive a command identifier from a response.
/// </summary>
/// <remarks>
/// For error responses the identifier is the <c>ErrorCode</c> of a special-format error, else
/// the text before the first <c>|</c>. For other responses it is the text before the first
/// <c>|</c>, else the value of a JSON <c>command</c>, <c>cmd</c> or <c>type</c> field.
/// </remarks>
/// <param name="content">Response content.</param>
/// <returns>The extracted identifier, or <c>null</c> when none can be determined or extraction fails.</returns>
private string? ExtractCommandNameFromResponse(string content)
{
    // Trimmed first non-empty '|'-separated segment, or null when the content holds no such segment.
    string? FirstPipeSegment()
    {
        if (!content.Contains('|'))
        {
            return null;
        }

        var segments = content.Split('|', StringSplitOptions.RemoveEmptyEntries);
        return segments.Length > 0 ? segments[0].Trim() : null;
    }

    try
    {
        if (IsErrorResponse(content))
        {
            // Special-format errors: use the error code from the converted JSON as the identifier.
            if (_messageParser.TryParseSpecialErrorFormat(content, out string parsedErrorContent) &&
                _messageParser.TryParseJToken(parsedErrorContent, out var errorToken) &&
                errorToken is JObject errorObj &&
                errorObj.TryGetValue("ErrorCode", out var errorCodeToken))
            {
                var errorCode = errorCodeToken.ToString();
                _logger.LogDebug("Extracted error code '{ErrorCode}' from special error format: {Content}",
                    errorCode, content);
                return errorCode;
            }

            // Pipe-delimited error: the leading segment is taken as the command name.
            var errorPrefix = FirstPipeSegment();
            if (errorPrefix != null)
            {
                return errorPrefix;
            }

            // Plain-text error: null tells the caller no name was found in the response.
            _logger.LogDebug("Error response detected, using current command for completion: {Content}", content);
            return null;
        }

        // Pipe-delimited response: the leading segment is taken as the command name.
        var prefix = FirstPipeSegment();
        if (prefix != null)
        {
            return prefix;
        }

        // JSON response: take the first of the generic command fields that is present.
        if (_messageParser.TryParseJToken(content, out var token) && token is JObject jObject)
        {
            foreach (var fieldName in new[] { "command", "cmd", "type" })
            {
                if (jObject.TryGetValue(fieldName, out var commandToken))
                {
                    return commandToken.ToString();
                }
            }
        }

        return null;
    }
    catch (Exception failure)
    {
        _logger.LogDebug(failure, "Failed to extract command name from response: {Content}", content);
        return null;
    }
}
}
}