2025-07-08 14:01:10 +08:00

444 lines
17 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using Newtonsoft.Json.Linq;
using Serilog;
using System;
using System.Collections;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
using Yunda.SOMS.OperationsMainSiteGatewayServer.TcpSocket.Models;
using Yunda.SOMS.OperationsMainSiteGatewayServer.TcpSocket.TestData;
using YunDa.SOMS.DataTransferObject.GeneralInformation.ProtectionDeviceInfoDto;
//using static System.Runtime.InteropServices.JavaScript.JSType;
namespace Yunda.SOMS.OperationsMainSiteGatewayServer.TcpSocket.Server
{
public class DotNettyServerHandler : SimpleChannelInboundHandler<IByteBuffer>
{
// Device information record: a status flag, the time of the last update and a
// consecutive-offline counter. Not referenced by any other member of this class.
private class DeviceInfo
{
public bool Status { get; set; } // Device status (running or offline)
public DateTime LastUpdate { get; set; } // Time of the last update
public int OfflineCount { get; set; } // Consecutive offline count
}
/// <summary>
/// Context of the most recent inbound read. It is assigned in ChannelRead0 and stays
/// null until the first message arrives; the periodic check and the send methods use it.
/// </summary>
public IChannelHandlerContext _context;
// Callback invoked with (address, payload, function code) for every received data frame.
private readonly Action<byte, byte[], byte> _onMessageReceived;
// Callback invoked with the remote address string when the channel becomes inactive.
private readonly Action<string> _channelInactiveCallback;
// Local time of the last heartbeat received from the peer; starts at construction time.
private DateTime _deviceRunStateTime = DateTime.Now;
// Address byte used in the heartbeats and acknowledgements sent by this handler.
byte _serverAddr;

// Number of check ticks between two outgoing heartbeats.
private const int HeartbeatIntervalTicks = 30;
// Seconds without a heartbeat from the peer before the connection is closed.
private const int OfflineTimeoutSeconds = 65;
// Delay between two ticks of the background check loop.
private const int CheckIntervalMilliseconds = 1000;
// Set when the handler is removed from its pipeline so the background loop can end.
private volatile bool _checkLoopStopped;

/// <summary>
/// Creates the handler and starts the background loop that sends heartbeats and
/// closes the connection when the peer stops answering them.
/// </summary>
/// <param name="messageReceived">Callback receiving (address, payload, function code) of each data frame.</param>
/// <param name="channelInactiveCallback">Callback receiving the remote address when the channel goes inactive.</param>
/// <param name="addr">Address byte used in frames originated by this handler.</param>
public DotNettyServerHandler(
    Action<byte, byte[], byte> messageReceived, Action<string> channelInactiveCallback , byte addr)
{
    _serverAddr = addr;
    _onMessageReceived = messageReceived;
    _channelInactiveCallback = channelInactiveCallback;
    // Task.Run is used instead of Task.Factory.StartNew(..., LongRunning): the loop is
    // asynchronous, so a dedicated thread would be abandoned at the first await anyway.
    Task.Run(() => RunStateCheckLoopAsync());
}

/// <summary>
/// Stops the background check loop when DotNetty removes this handler from its pipeline.
/// NOTE(review): this is expected to happen when the channel is torn down; confirm for
/// the DotNetty version in use.
/// </summary>
public override void HandlerRemoved(IChannelHandlerContext context)
{
    _checkLoopStopped = true;
    base.HandlerRemoved(context);
}

/// <summary>
/// Runs one state check per tick until the handler is removed or its channel is closed.
/// Previously the loop ran forever, leaking one loop per handler instance.
/// </summary>
private async Task RunStateCheckLoopAsync()
{
    while (!_checkLoopStopped)
    {
        // Awaited so that a slow heartbeat write cannot overlap with the next tick.
        await CheckAndUpdateDeviceStates();

        IChannelHandlerContext context = _context;
        if (context != null && !context.Channel.Open)
        {
            // A closed channel never reopens, so there is nothing left to check.
            break;
        }
        await Task.Delay(CheckIntervalMilliseconds);
    }
}

// Tick counter; a heartbeat is sent whenever it is 0, then it counts up to the interval.
byte heartbeatDelayCount = 0;
// Sequence number carried in the next outgoing heartbeat (advanced by HandleHeartbeat).
byte heartbeatCount = 0;

/// <summary>
/// One tick of the periodic check. Every <see cref="HeartbeatIntervalTicks"/> ticks it
/// sends a heartbeat and closes the connection if no heartbeat has been received for
/// more than <see cref="OfflineTimeoutSeconds"/> seconds. Does nothing while no context
/// is known or the channel is not open and active. All exceptions are logged, not thrown.
/// </summary>
async Task CheckAndUpdateDeviceStates()
{
    try
    {
        // Work on a local copy; _context may be reassigned by the read path.
        IChannelHandlerContext context = _context;
        if (context == null || !context.Channel.Open || !context.Channel.Active)
        {
            return;
        }
        if (heartbeatDelayCount == 0)
        {
            await HandleHeartbeat(context, _serverAddr, heartbeatCount);
            if ((DateTime.Now - _deviceRunStateTime).TotalSeconds > OfflineTimeoutSeconds)
            {
                // RemoteAddress is formatted directly so a null address cannot throw.
                Log.Warning($" {context.Channel.RemoteAddress} 判定为离线超过{OfflineTimeoutSeconds}秒未更新");
                await context.CloseAsync();
            }
        }
        heartbeatDelayCount++;
        if (heartbeatDelayCount == HeartbeatIntervalTicks)
        {
            heartbeatDelayCount = 0;
        }
    }
    catch (Exception ex)
    {
        Log.Error(ex,$"状态检测");
    }
}
/// <summary>
/// Confirmation flag: set to true when a confirmation frame (function code 0) is
/// received and reset to false by the send methods. It is written on the channel read
/// path and polled by the resend logic after awaits, so it is declared volatile to
/// keep the polling code from working with a stale value.
/// </summary>
private volatile bool IsConfirm = false;
// Disabled sample sender, kept for reference.
//int sequence = 15;
//void SendTestData()
//{
// var data = SimulateData.SendProductionInformation();
// // Create an application control word instance
// ApplicationControlWord controlWord = new ApplicationControlWord(fir: true, fin: true, con: true, sequence: sequence);
// // Convert the control word to a byte
// byte controlByte = controlWord.ToByte();
// Log.Information($"Control Byte: 0x{controlByte:X2}"); // Log the control word in hexadecimal
// SendCustomMessageAsync(_serverAddr, controlByte, 1, data);
// sequence++;
// if (sequence == 15)
// {
// sequence = 31;
// }
//}
/// <summary>
/// Logs the new connection and forwards the event to the next handler in the pipeline.
/// </summary>
/// <param name="context">Context of the channel that became active.</param>
public override void ChannelActive(IChannelHandlerContext context)
{
    // The method contains no await, so it is no longer declared async.
    try
    {
        // The address is formatted directly: a null RemoteAddress logs as empty text
        // instead of throwing and skipping the event propagation below.
        Log.Information($"客户端主动连接:{context.Channel.RemoteAddress}");
        //await HandleHeartbeat(context, _serverAddr, heartbeatCount);
        base.ChannelActive(context);
    }
    catch (Exception ex)
    {
        // Logged as an error with the exception attached (previously only the stack
        // trace was written, at Information level).
        Log.Error(ex, "Error in ChannelActive");
    }
}
/// <summary>
/// Logs the disconnect, notifies the owner through the inactive callback and forwards
/// the event to the next handler. The event is forwarded even when the notification fails.
/// </summary>
/// <param name="context">Context of the channel that became inactive.</param>
public override void ChannelInactive(IChannelHandlerContext context)
{
    try
    {
        // An unknown remote address is reported as an empty string instead of throwing.
        string clientIp = context.Channel.RemoteAddress?.ToString() ?? string.Empty;
        Log.Information($"客户端主动断开连接:{clientIp}");
        _channelInactiveCallback?.Invoke(clientIp);
    }
    catch (Exception ex)
    {
        Log.Error(ex,$"客户端主动断开连接错误");
    }

    // Propagated separately so a failing callback cannot swallow the pipeline event.
    try
    {
        base.ChannelInactive(context);
    }
    catch (Exception ex)
    {
        Log.Error(ex,$"客户端主动断开连接错误");
    }
}
/// <summary>
/// Parses one inbound frame. Layout as read here: start byte (1), big-endian length (2),
/// address (1), then either a heartbeat body (when length is 6) or control word (1),
/// function type (1) and <c>length - 6</c> payload bytes. Data frames are acknowledged
/// when requested, passed to the message callback on the thread pool and dispatched by
/// function code. Malformed or truncated frames are logged and dropped.
/// </summary>
/// <param name="ctx">Channel context; remembered in <c>_context</c> for later sends.</param>
/// <param name="msg">Received buffer. It must not be used after the first await in this method.</param>
protected async override void ChannelRead0(IChannelHandlerContext ctx, IByteBuffer msg)
{
    _context = ctx;
    try
    {
        // Copy the readable bytes without moving the reader index, for the hex log.
        byte[] dataSrc = new byte[msg.ReadableBytes];
        msg.GetBytes(msg.ReaderIndex, dataSrc);
        string hexString = BitConverter.ToString(dataSrc).Replace("-", " ");
        Log.Information($"收到 Hex Data: {hexString}");

        // start byte + 2-byte length + address
        if (msg.ReadableBytes < 4)
        {
            Log.Warning("Dropping frame: {ReadableBytes} byte(s) received, header incomplete", msg.ReadableBytes);
            return;
        }
        msg.ReadByte(); // start byte: consumed, not validated
        ushort length = msg.ReadUnsignedShort();
        byte address = msg.ReadByte();
        if (length == 6) // heartbeat
        {
            HandleHeatbeatInfo(msg, address, ctx);
            return;
        }
        if (length < 6)
        {
            // Would otherwise produce a negative payload length below.
            Log.Warning("Dropping frame: declared length {Length} is smaller than the 6-byte header", length);
            return;
        }

        int dataLength = length - 6;
        // control word + function type + payload must all be present
        if (msg.ReadableBytes < 2 + dataLength)
        {
            // Nothing is buffered here: a partial frame is lost unless an upstream
            // decoder reassembles frames before they reach this handler.
            Log.Warning("Dropping frame: {ReadableBytes} byte(s) left but {Expected} expected", msg.ReadableBytes, 2 + dataLength);
            return;
        }
        byte controlWord = msg.ReadByte();
        byte functionType = msg.ReadByte();

        HanleRecvConfirmInfo(ctx, controlWord);

        byte[] data = new byte[dataLength];
        msg.ReadBytes(data);
        string payloadHex = BitConverter.ToString(data).Replace("-", "");
        Log.Information($"主站地址: {address} 功能码:{functionType} 数据长度:{dataLength} 数据:{payloadHex}");

        // The callback runs on the thread pool; its failures are logged instead of
        // being lost in an unobserved task.
        Action<byte, byte[], byte> callback = _onMessageReceived;
        if (callback != null)
        {
            _ = Task.Run(() =>
            {
                try
                {
                    callback(address, data, functionType);
                }
                catch (Exception callbackEx)
                {
                    Log.Error(callbackEx, "Message received callback failed");
                }
            });
        }

        // Awaited inside the try so a failure is logged below. Everything needed from
        // msg has already been copied into data at this point.
        await HandleFunctionCodeAsync(functionType, address, controlWord, data, ctx);
    }
    catch (Exception ex)
    {
        Log.Error(ex,$"Error in ChannelRead0: {ex.StackTrace}");
    }
}
/// <summary>
/// Sends an acknowledgement when the received control word has bit 5 set. The reply
/// echoes the low five bits of the received control word as its sequence number and is
/// sent with function type 0 and a single zero data byte.
/// </summary>
/// <param name="ctx">Channel context of the received frame (the reply goes through <c>_context</c>).</param>
/// <param name="controlWord">Control word byte of the received frame.</param>
private async void HanleRecvConfirmInfo(IChannelHandlerContext ctx,byte controlWord)
{
    // This is async void, so an exception escaping here would be unobservable by the
    // caller and could terminate the process; everything is therefore guarded.
    try
    {
        bool confirmRequested = ((controlWord >> 5) & 1) == 1;
        if (!confirmRequested)
        {
            return;
        }

        // Mask 0x1F keeps the low FIVE bits (the old comment said four / 0x0F).
        byte sequence = (byte)(controlWord & 0x1F);
        ApplicationControlWord reply = new ApplicationControlWord(fir: true, fin: true, con: false, sequence: sequence);
        await SendCustomByteMessageAsync( _serverAddr, reply.ToByte(), 0, 0);
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Failed to acknowledge received frame");
    }
}
/// <summary>
/// Reads the two-byte heartbeat body (function type, sequence number). When the
/// function type is 7 the heartbeat is logged and the last-heartbeat time is refreshed.
/// </summary>
/// <param name="msg">Buffer positioned right after the address byte.</param>
/// <param name="address">Address byte from the frame header.</param>
/// <param name="ctx">Channel context of the received frame (not used).</param>
private void HandleHeatbeatInfo(IByteBuffer msg, byte address, IChannelHandlerContext ctx)
{
    if (msg.ReadableBytes < 2)
    {
        Log.Warning("Dropping heartbeat: {ReadableBytes} byte(s) left after the header, 2 expected", msg.ReadableBytes);
        return;
    }

    byte functionType = msg.ReadByte();
    byte sequence = msg.ReadByte();
    if (functionType != 7)
    {
        return;
    }

    // The unused remote-address lookup was removed: it could throw on a null address
    // and thereby prevent the timestamp update below.
    Log.Information($"接受到心跳信息:地址:{address} 序列号:{sequence}");
    _deviceRunStateTime = DateTime.Now;
}
/// <summary>
/// Dispatches a received data frame by function code: 0 marks the pending message as
/// confirmed, 10 is an information request (only logged), anything else is ignored.
/// </summary>
/// <returns>An already completed task; the method performs no asynchronous work.</returns>
private Task HandleFunctionCodeAsync(byte functionType, byte address, byte controlWord, byte[] data, IChannelHandlerContext ctx)
{
    // Not declared async: there is nothing to await, so no state machine is needed.
    switch (functionType)
    {
        case 0: // confirmation frame
            IsConfirm = true;
            Log.Information("确认报文");
            break;
        case 10: // information request
            Log.Information("信息召唤");
            //SendTestData();
            break;
        default:
            //MonitoringEventBus.LogHandler("未知的功能码", "103客户端信息");
            break;
    }
    return Task.CompletedTask;
}
/// <summary>
/// Sends one six-byte heartbeat frame and then advances the heartbeat sequence counter.
/// Frame bytes: 0x68, 0x00, 0x06 (length), address, 7 (function type), sequence.
/// Returns without sending or counting when the context is null or its channel is not
/// open and active.
/// </summary>
/// <param name="context">Channel context to write to.</param>
/// <param name="address">Address byte placed in the frame.</param>
/// <param name="data">Sequence byte placed in the frame.</param>
/// <returns>A task that completes when the send attempt has finished.</returns>
private async Task HandleHeartbeat(IChannelHandlerContext context, byte address, byte data)
{
    bool channelUsable = context != null && context.Channel.Open && context.Channel.Active;
    if (!channelUsable)
    {
        return;
    }

    try
    {
        // start byte, length high byte, length low byte, address, function type, sequence
        byte[] frame = { 0x68, 0, 6, address, 7, data };
        await context.WriteAndFlushAsync(Unpooled.WrappedBuffer(frame));
        Log.Information($"发送心跳信息:序号:{data} 地址:{address}");
    }
    catch (ClosedChannelException ex)
    {
        await context.CloseAsync();
        Log.Error(ex, "心跳信息ClosedChannelException");
    }
    catch (Exception ex)
    {
        Log.Error(ex, "心跳信息Exception");
    }

    // The counter advances after every attempt and restarts at 0 once it reaches 0xFF,
    // so the value 0xFF itself is never sent.
    heartbeatCount++;
    if (heartbeatCount == 0xFF)
    {
        heartbeatCount = 0;
    }
}
/// <summary>
/// Logs an exception raised in the pipeline and closes the connection.
/// </summary>
/// <param name="context">Context of the channel on which the exception occurred.</param>
/// <param name="ex">The exception that was caught.</param>
public override async void ExceptionCaught(IChannelHandlerContext context, Exception ex)
{
    if (ex is ClosedChannelException)
    {
        Log.Error(ex, "ClosedChannelException异常 连接已关闭,正在关闭客户端...");
    }
    else
    {
        Log.Error(ex, $"发生异常: {ex.Message}");
    }

    // This is async void: a failure while closing must not escape, because nothing
    // could observe it and it could bring the process down.
    try
    {
        await context.CloseAsync();
    }
    catch (Exception closeEx)
    {
        Log.Error(closeEx, "Failed to close the channel after an exception");
    }
}
/// <summary>
/// Sends a frame whose payload is a single byte, by delegating to the four-argument
/// SendCustomMessageAsync overload. Failures are logged and not rethrown.
/// </summary>
/// <param name="address">Address byte of the frame.</param>
/// <param name="controlWord">Control word byte of the frame.</param>
/// <param name="functionType">Function type byte of the frame.</param>
/// <param name="data">The single payload byte.</param>
private async Task SendCustomByteMessageAsync( byte address, byte controlWord, byte functionType, byte data)
{
    try
    {
        await SendCustomMessageAsync(address, controlWord, functionType, new byte[] { data });
    }
    catch (Exception ex)
    {
        // Logged as an error with the exception attached so type and message are
        // kept (previously only the stack trace was written, at Information level).
        Log.Error(ex, "Error sending single-byte message");
    }
}
// Size of the fixed frame header: start byte, 2-byte length, address, control word, function type.
private const int FrameHeaderLength = 6;
// Control-word bit 5, the bit HanleRecvConfirmInfo tests on inbound frames to decide
// whether to acknowledge. NOTE(review): presumably the bit ApplicationControlWord's
// "con" flag encodes - confirm against that type.
private const byte ConfirmRequestMask = 0x20;
// Wait after the first transmission before the confirmation flag is checked.
private const int ConfirmInitialDelayMilliseconds = 300;
// Wait after each retransmission.
private const int ConfirmRetryDelayMilliseconds = 3000;
// Number of retransmissions before the connection is given up.
private const int MaxResendAttempts = 3;

/// <summary>
/// Sends a frame with an explicit control word. Nothing is sent when no usable channel
/// is known or the payload is null, empty or too large. Failures are logged, not thrown.
/// </summary>
/// <param name="address">Address byte of the frame.</param>
/// <param name="controlWord">Control word byte of the frame.</param>
/// <param name="functionType">Function type byte of the frame.</param>
/// <param name="data">Payload bytes.</param>
private async Task SendCustomMessageAsync(byte address, byte controlWord, byte functionType, byte[] data)
{
    IChannelHandlerContext context = _context;
    try
    {
        if (!IsSendable(context, data))
        {
            return;
        }
        // Only a frame that asks for confirmation starts a new confirmation wait.
        // Resetting the flag for every frame let an outgoing acknowledgement wipe out
        // a confirmation that had just arrived for a pending message.
        if ((controlWord & ConfirmRequestMask) != 0)
        {
            IsConfirm = false;
        }
        await WriteFrameAsync(context, address, controlWord, functionType, data);
    }
    catch (ClosedChannelException ex)
    {
        Log.Error(ex, "Channel closed while sending frame");
        await CloseQuietlyAsync(context);
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error sending frame");
    }
}

// Next sequence number for confirmed frames; cycles through 15..30.
private byte _controlWord = 15;

/// <summary>
/// Sends a frame that requests confirmation. The control word is built with fir, fin
/// and con set and the next sequence number. The returned task completes once the
/// first transmission has been written; confirmation is then awaited in the background,
/// with up to three retransmissions before the connection is closed.
/// NOTE(review): a single confirmation flag is shared by all sends, so overlapping
/// calls cannot be told apart.
/// </summary>
/// <param name="address">Address byte of the frame.</param>
/// <param name="functionType">Function type byte of the frame.</param>
/// <param name="data">Payload bytes; nothing is sent when null, empty or too large.</param>
public async Task SendCustomMessageAsync(byte address, byte functionType, byte[] data)
{
    IChannelHandlerContext context = _context;
    try
    {
        if (!IsSendable(context, data))
        {
            return;
        }
        ApplicationControlWord controlWordObj = new ApplicationControlWord(fir: true, fin: true, con: true, sequence: _controlWord);
        byte controlByte = controlWordObj.ToByte();
        _controlWord++;
        if (_controlWord == 31)
        {
            _controlWord = 15;
        }
        IsConfirm = false;

        byte[] frame = await WriteFrameAsync(context, address, controlByte, functionType, data);

        // Runs in the background so the caller returns right after the first write, as
        // before. The helper handles its own exceptions, so nothing goes unobserved.
        _ = ResendUntilConfirmedAsync(context, frame);
    }
    catch (ClosedChannelException ex)
    {
        Log.Error(ex, "Channel closed while sending frame");
        await CloseQuietlyAsync(context);
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error sending frame");
    }
}

// True when there is an open, active channel and a non-empty payload that fits the
// 2-byte frame length field.
private static bool IsSendable(IChannelHandlerContext context, byte[] data)
{
    if (context == null || !context.Channel.Open || !context.Channel.Active || data == null || data.Length == 0)
    {
        return false;
    }
    if (data.Length > ushort.MaxValue - FrameHeaderLength)
    {
        // The length used to be truncated to 16 bits, producing a corrupt frame.
        Log.Error("Payload of {Length} bytes does not fit the 2-byte frame length field", data.Length);
        return false;
    }
    return true;
}

// Builds the wire frame: 0x68, big-endian total length, address, control word,
// function type, payload.
private static byte[] BuildFrame(byte address, byte controlWord, byte functionType, byte[] data)
{
    int frameLength = FrameHeaderLength + data.Length;
    byte[] frame = new byte[frameLength];
    frame[0] = 0x68;
    // Explicit shifts give the same big-endian bytes on every platform; the previous
    // BitConverter-based code depended on the host being little-endian.
    frame[1] = (byte)(frameLength >> 8);   // length high byte
    frame[2] = (byte)(frameLength & 0xFF); // length low byte
    frame[3] = address;
    frame[4] = controlWord;
    frame[5] = functionType;
    Buffer.BlockCopy(data, 0, frame, FrameHeaderLength, data.Length);
    return frame;
}

// Builds, logs and writes one frame; returns the frame bytes so they can be re-sent.
private static async Task<byte[]> WriteFrameAsync(IChannelHandlerContext context, byte address, byte controlWord, byte functionType, byte[] data)
{
    byte[] frame = BuildFrame(address, controlWord, functionType, data);
    string hexString = BitConverter.ToString(frame).Replace("-", " ");
    Log.Information($"发送数据: 地址:{address} 功能码:{functionType} 控制字:{Convert.ToString(controlWord, 2).PadLeft(8, '0')} 数据:{hexString}");
    await context.WriteAndFlushAsync(Unpooled.WrappedBuffer(frame));
    return frame;
}

// Waits for the confirmation flag, retransmitting the frame up to MaxResendAttempts
// times, and closes the connection when no confirmation arrives. Never throws.
private async Task ResendUntilConfirmedAsync(IChannelHandlerContext context, byte[] frame)
{
    try
    {
        await Task.Delay(ConfirmInitialDelayMilliseconds);
        for (int attempt = 0; attempt < MaxResendAttempts; attempt++)
        {
            if (IsConfirm)
            {
                Log.Information($"成功确认");
                return;
            }
            if (!context.Channel.Active)
            {
                // The connection is already gone; nothing left to resend or close.
                return;
            }
            // A fresh buffer is wrapped for every transmission: the channel releases a
            // buffer once it has been written, so the old instance cannot be reused.
            await context.WriteAndFlushAsync(Unpooled.WrappedBuffer(frame));
            await Task.Delay(ConfirmRetryDelayMilliseconds);
        }
        // Final check: a confirmation that answers the last retransmission counts too.
        if (IsConfirm)
        {
            Log.Information($"成功确认");
            return;
        }
        Log.Information($"超过三次未收到确认,断开连接");
        await context.CloseAsync();
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error while waiting for frame confirmation");
    }
}

// Closes the channel and logs, rather than throws, if closing fails.
private static async Task CloseQuietlyAsync(IChannelHandlerContext context)
{
    if (context == null)
    {
        return;
    }
    try
    {
        await context.CloseAsync();
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Failed to close the channel");
    }
}
}
}