using Abp;
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using MySqlX.XDevAPI;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis;

namespace Yunda.SOMS.DataMonitoringServer.TcpSocket.Server
{
    using static ConstValue;

    /// <summary>
    /// DotNetty inbound handler that parses device frames, tracks per-device
    /// run/board state, and drops connections of devices judged offline.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file (dictionary key/value
    /// types, <c>Action</c> signatures, the handler's message type) were inferred from
    /// how the values are used (device address is a <c>byte</c>, contexts are
    /// <see cref="IChannelHandlerContext"/>, board states are <c>int[]</c>).
    /// Confirm them against the callers before merging.
    /// </remarks>
    public class DotNettyServerHandler : SimpleChannelInboundHandler<IByteBuffer>
    {
        /// <summary>Bytes subtracted from the frame's length field to obtain the payload size.</summary>
        private const int FrameHeaderLength = 6;

        /// <summary>Number of consecutive "offline" status reports after which a device is dropped.</summary>
        private const int OfflineReportThreshold = 5;

        /// <summary>Seconds without a status report after which a device is considered offline.</summary>
        private const int UpdateTimeoutSeconds = 10;

        /// <summary>Pause between two passes of the background monitor loop, in milliseconds.</summary>
        private const int MonitorIntervalMs = 5000;

        /// <summary>Number of slots in a device's board-state array (slot 0: 0 = offline, 1 = online).</summary>
        private const int BoardStateSlots = 8;

        /// <summary>Status-report count at which the per-device counter is reset so the initial queries are re-sent.</summary>
        private const int CounterResetThreshold = 86400;

        /// <summary>Minimum payload size of a function-code-6 status report (bytes 0..2 are decoded).</summary>
        private const int StatusPayloadLength = 3;

        /// <summary>Run-state bookkeeping for one device.</summary>
        private class DeviceInfo
        {
            /// <summary>Last reported status bit (bit 0 of payload byte 0).</summary>
            public bool Status { get; set; }

            /// <summary>UTC time of the last status report.</summary>
            public DateTime LastUpdate { get; set; }

            /// <summary>Number of consecutive reports whose status bit was clear.</summary>
            public int OfflineCount { get; set; }
        }

        private readonly ConcurrentDictionary<byte, IChannelHandlerContext> _connections;
        private readonly Action<byte, byte[], byte> _onMessageReceived;

        // Guards _deviceRunStates and _deviceBoardStates: both are plain dictionaries that are
        // written by Task.Run workers and read by the monitor loop.
        private readonly object _stateGate = new object();
        private readonly Dictionary<byte, DeviceInfo> _deviceRunStates = new Dictionary<byte, DeviceInfo>();
        private readonly Dictionary<byte, int[]> _deviceBoardStates = new Dictionary<byte, int[]>();
        private readonly ConcurrentDictionary<byte, int> _communicationStateCounts = new();

        /// <summary>
        /// Creates the handler and starts the background loop that re-evaluates device
        /// states every <see cref="MonitorIntervalMs"/> milliseconds.
        /// </summary>
        /// <param name="connections">Shared map from device address to its channel context.</param>
        /// <param name="onMessageReceived">Invoked (on a pool thread) with address, payload and function code of every frame; may be null.</param>
        /// <param name="deviceBoardStatesAction">Invoked after each monitor pass with a snapshot of the board states, when at least one device is known.</param>
        public DotNettyServerHandler(
            ConcurrentDictionary<byte, IChannelHandlerContext> connections,
            Action<byte, byte[], byte> onMessageReceived,
            Action<Dictionary<byte, int[]>> deviceBoardStatesAction)
        {
            _connections = connections;
            _onMessageReceived = onMessageReceived;

            // Task.Run instead of StartNew(LongRunning): the loop is async, so a dedicated
            // thread would be released at the first await anyway.
            _ = Task.Run(() => MonitorDevicesAsync(deviceBoardStatesAction));
        }

        /// <summary>
        /// Background loop: refreshes device states, publishes a snapshot of the board
        /// states, then waits. The wait happens on every iteration, including failed ones,
        /// so a persistently failing callback cannot turn this into a busy loop.
        /// </summary>
        private async Task MonitorDevicesAsync(Action<Dictionary<byte, int[]>> deviceBoardStatesAction)
        {
            while (true)
            {
                try
                {
                    await CheckAndUpdateDeviceStatesAsync();

                    Dictionary<byte, int[]> snapshot = SnapshotBoardStates();
                    if (snapshot.Count > 0)
                    {
                        deviceBoardStatesAction?.Invoke(snapshot);
                    }
                }
                catch (Exception ex)
                {
                    MonitoringEventBus.LogHandler($"客户端连接错误:{ex}", "定值错误信息");
                }

                await Task.Delay(MonitorIntervalMs);
            }
        }

        /// <summary>Returns a deep copy of the board states so the consumer never observes concurrent writes.</summary>
        private Dictionary<byte, int[]> SnapshotBoardStates()
        {
            lock (_stateGate)
            {
                var copy = new Dictionary<byte, int[]>(_deviceBoardStates.Count);
                foreach (KeyValuePair<byte, int[]> entry in _deviceBoardStates)
                {
                    copy[entry.Key] = (int[])entry.Value.Clone();
                }

                return copy;
            }
        }

        /// <summary>
        /// Re-evaluates the online flag (board slot 0) of every known device and closes the
        /// connections of devices that were judged offline in this pass.
        /// </summary>
        private async Task CheckAndUpdateDeviceStatesAsync()
        {
            var offline = new List<(byte Address, string Message)>();

            try
            {
                // Phase 1: decide under the lock; no awaits while enumerating shared state.
                lock (_stateGate)
                {
                    foreach (KeyValuePair<byte, DeviceInfo> entry in _deviceRunStates)
                    {
                        byte deviceAddress = entry.Key;
                        DeviceInfo device = entry.Value;

                        if (!_deviceBoardStates.TryGetValue(deviceAddress, out int[] board))
                        {
                            board = new int[BoardStateSlots];
                            _deviceBoardStates.Add(deviceAddress, board);
                        }

                        if (device == null)
                        {
                            continue;
                        }

                        // The device keeps reporting its status bit as clear.
                        if (device.OfflineCount >= OfflineReportThreshold)
                        {
                            // Act only on the online -> offline transition.
                            if (board[0] != 0)
                            {
                                board[0] = 0;
                                offline.Add((deviceAddress, $"[{DateTime.Now}] {deviceAddress} 判定为离线(连续离线状态)"));
                            }

                            continue;
                        }

                        // No status report within the timeout window.
                        if ((DateTime.UtcNow - device.LastUpdate).TotalSeconds > UpdateTimeoutSeconds)
                        {
                            // Always mark offline; log/drop only on a transition or while a
                            // connection is still registered, so the log is not repeated forever.
                            bool wasOnline = board[0] != 0;
                            board[0] = 0;
                            if (wasOnline || _connections.ContainsKey(deviceAddress))
                            {
                                offline.Add((deviceAddress, $"[{DateTime.Now}] {deviceAddress} 判定为离线(超过10秒未更新)"));
                            }

                            continue;
                        }

                        board[0] = 1;
                    }
                }

                // Phase 2: log and disconnect outside the lock.
                foreach ((byte address, string message) in offline)
                {
                    MonitoringEventBus.LogHandler(message, "定值错误信息");
                    await DropConnectionAsync(address);
                }
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler($"{ex}", "103客户端错误信息");
            }
        }

        /// <summary>
        /// Removes the device's connection from the shared map and closes it if it is still
        /// active. The entry is removed even when the channel is already inactive, so stale
        /// contexts do not linger.
        /// </summary>
        private async Task DropConnectionAsync(byte deviceAddress)
        {
            if (!_connections.TryRemove(deviceAddress, out IChannelHandlerContext ctx) || ctx == null)
            {
                return;
            }

            if (!ctx.Channel.Active)
            {
                return;
            }

            try
            {
                await ctx.DisconnectAsync();
                await ctx.CloseAsync();
            }
            catch (Exception ex)
            {
                // Best effort: the channel may already be closing; record it instead of swallowing.
                MonitoringEventBus.LogHandler($"Error: {ex}", "关闭tcp客户端");
            }
        }

        /// <summary>Logs the new client and forwards the event down the pipeline.</summary>
        public override void ChannelActive(IChannelHandlerContext context)
        {
            try
            {
                // Interpolating the address directly is null-safe, so a missing remote
                // address can no longer prevent the base call below.
                MonitoringEventBus.LogHandler($"客户端连接:{context.Channel.RemoteAddress}", "103客户端发送消息");
                base.ChannelActive(context);
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler($"{ex}", "103客户端错误信息");
            }
        }

        /// <summary>Logs the disconnected client and forwards the event down the pipeline.</summary>
        public override void ChannelInactive(IChannelHandlerContext context)
        {
            try
            {
                MonitoringEventBus.LogHandler($"客户端断开连接:{context.Channel.RemoteAddress}", "103客户端发送消息");
                base.ChannelInactive(context);
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler($"{ex}", "103客户端错误信息");
            }
        }

        /// <summary>
        /// Parses one frame: start byte (1), length (2, big-endian), address (1),
        /// control word (1), function type (1), followed by <c>length - 6</c> payload bytes.
        /// </summary>
        /// <remarks>
        /// This handler cannot wait for the rest of a partial frame: the buffer is released
        /// when the method returns. Incomplete or malformed frames are therefore logged and
        /// dropped; reassembly needs a frame decoder earlier in the pipeline.
        /// </remarks>
        protected override void ChannelRead0(IChannelHandlerContext ctx, IByteBuffer msg)
        {
            try
            {
                if (msg.ReadableBytes < FrameHeaderLength)
                {
                    MonitoringEventBus.LogHandler($"报文头不完整,已丢弃:{msg.ReadableBytes} 字节", "103客户端错误信息");
                    return;
                }

                _ = msg.ReadByte(); // start byte; not validated here
                ushort length = msg.ReadUnsignedShort();
                byte address = msg.ReadByte();
                byte controlWord = msg.ReadByte();
                byte functionType = msg.ReadByte();

                int dataLength = length - FrameHeaderLength;
                if (dataLength < 0 || msg.ReadableBytes < dataLength)
                {
                    MonitoringEventBus.LogHandler(
                        $"报文长度无效,已丢弃:装置地址 {address} 长度字段 {length} 剩余 {msg.ReadableBytes} 字节",
                        "103客户端错误信息");
                    return;
                }

                byte[] data = new byte[dataLength];
                msg.ReadBytes(data);

                MonitoringEventBus.LogHandler($"装置地址 {address} 功能码:{functionType} 数据长度:{dataLength}", "103客户端消息");

                if (_onMessageReceived != null)
                {
                    // Run the subscriber off the event loop and keep its failures observable.
                    _ = Task.Run(() =>
                    {
                        try
                        {
                            _onMessageReceived(address, data, functionType);
                        }
                        catch (Exception ex)
                        {
                            MonitoringEventBus.LogHandler($"Error: {ex}", "103客户端发送消息");
                        }
                    });
                }

                // Deliberately not awaited; the method logs its own failures.
                _ = HandleFunctionCodeAsync(functionType, address, controlWord, data, ctx);
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler($"Error in ChannelRead0: {ex}", "103客户端发送消息");
            }
        }

        /// <summary>
        /// Dispatches a frame by function code. Never faults: exceptions are logged here
        /// because the caller does not await the returned task.
        /// </summary>
        private async Task HandleFunctionCodeAsync(byte functionType, byte address, byte controlWord, byte[] data, IChannelHandlerContext ctx)
        {
            try
            {
                switch (functionType)
                {
                    case 0:
                        HandleConfirmationMessage(address, data, ctx);
                        break;
                    case 1:
                        MonitoringEventBus.LogHandler($"收到用户定值信息: {BitConverter.ToString(data)}", "103客户端信息");
                        break;
                    case 2:
                        // Manufacturer setting information: not handled yet.
                        break;
                    case 3:
                        MonitoringEventBus.LogHandler($"版本信息: {Encoding.ASCII.GetString(data)}", "103客户端信息");
                        break;
                    case 4:
                        // Self-check information: not handled yet.
                        break;
                    case 5:
                        MonitoringEventBus.LogHandler($"开入开出信息: {Encoding.ASCII.GetString(data)}", "103客户端信息");
                        break;
                    case 6:
                        await UpdateDeviceCommunicationStateAsync(address, data, ctx);
                        break;
                    case 7:
                        // B-code time synchronisation status.
                        Debug.WriteLine($"B码对时状态处理: {BitConverter.ToString(data)}");
                        break;
                    default:
                        MonitoringEventBus.LogHandler("未知的功能码", "103客户端信息");
                        break;
                }
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler($"Error: {ex}", "103客户端发送消息");
            }
        }

        /// <summary>Handles a confirmation frame (function code 0); currently only traces it.</summary>
        private void HandleConfirmationMessage(byte address, byte[] data, IChannelHandlerContext ctx)
        {
            Debug.WriteLine($"确认开入开出信息: {Encoding.ASCII.GetString(data)}");
            // A confirmation reply could be sent from here.
        }

        /// <summary>
        /// Processes a status report (function code 6) on a pool thread: registers the
        /// connection, records the reported state, and on the first report of a counting
        /// cycle sends the initial query frames.
        /// </summary>
        private Task UpdateDeviceCommunicationStateAsync(byte address, byte[] data, IChannelHandlerContext ctx)
        {
            return Task.Run(async () =>
            {
                try
                {
                    // Register (or refresh) the connection for this address.
                    _connections[address] = ctx;

                    // Record the state before the slow query sequence below, so this report
                    // cannot overwrite newer ones that arrive while the queries are sent.
                    if (data != null && data.Length >= StatusPayloadLength)
                    {
                        ApplyStatusReport(address, data);
                    }
                    else
                    {
                        MonitoringEventBus.LogHandler(
                            $"装置地址 {address} 状态报文长度不足:{data?.Length ?? 0}",
                            "103客户端错误信息");
                    }

                    if (_communicationStateCounts.TryAdd(address, 0))
                    {
                        // First report of a counting cycle: send the initial query frames.
                        for (byte i = 1; i < 6; i++)
                        {
                            await SendCustomMessageAsync(ctx, address, 0, 5, i);
                        }

                        await SendCustomMessageAsync(ctx, address, 0, 1, 0);
                        await SendCustomMessageAsync(ctx, address, 0, 2, 0);
                        await SendCustomMessageAsync(ctx, address, 0, 3, 0);
                        await SendCustomMessageAsync(ctx, address, 0, 4, 0);
                        await SendCustomMessageAsync(ctx, address, 0, 7, 0);
                    }
                    else
                    {
                        // Atomic increment; dropping the entry makes the next report start a new cycle.
                        int count = _communicationStateCounts.AddOrUpdate(address, 0, (key, current) => current + 1);
                        if (count >= CounterResetThreshold)
                        {
                            _communicationStateCounts.TryRemove(address, out _);
                        }
                    }
                }
                catch (Exception ex)
                {
                    MonitoringEventBus.LogHandler($"Error: {ex}", "103客户端发送消息");
                }
            });
        }

        /// <summary>
        /// Decodes a status payload into the run state and board-state slots of the device.
        /// Board slots store 0 when the corresponding bit is set and 1 when it is clear.
        /// </summary>
        /// <param name="address">Device address.</param>
        /// <param name="data">Payload with at least <see cref="StatusPayloadLength"/> bytes.</param>
        private void ApplyStatusReport(byte address, byte[] data)
        {
            bool statusBit = (data[0] & 0x01) != 0;

            lock (_stateGate)
            {
                if (!_deviceRunStates.TryGetValue(address, out DeviceInfo device) || device == null)
                {
                    device = new DeviceInfo();
                    _deviceRunStates[address] = device;
                }

                device.Status = statusBit;
                device.LastUpdate = DateTime.UtcNow;
                device.OfflineCount = statusBit ? 0 : device.OfflineCount + 1;

                if (!_deviceBoardStates.TryGetValue(address, out int[] board))
                {
                    board = new int[BoardStateSlots];
                    _deviceBoardStates[address] = board;
                }

                // Slot 7 <- bit 0 of byte 1 (labelled "LCD status" by the original author).
                board[7] = (data[1] & 0x01) != 0 ? 0 : 1;

                // Slots 2..6 <- bits 0..4 of byte 2.
                for (int bit = 0; bit < 5; bit++)
                {
                    board[2 + bit] = (data[2] & (1 << bit)) != 0 ? 0 : 1;
                }
            }
        }

        /// <summary>
        /// Logs a pipeline exception and closes the channel.
        /// </summary>
        /// <remarks>
        /// <c>async void</c> is forced by the overridden signature; every exception is caught
        /// inside, so nothing can escape unobserved.
        /// </remarks>
        public override async void ExceptionCaught(IChannelHandlerContext context, Exception ex)
        {
            try
            {
                MonitoringEventBus.LogHandler(ex.Message, "103客户端错误消息");
                await context.CloseAsync();
            }
            catch (Exception ex1)
            {
                MonitoringEventBus.LogHandler($"Error: {ex1}", "关闭tcp客户端");
            }
        }

        /// <summary>
        /// Sends a 7-byte frame carrying a single data byte, logs it, then waits one second
        /// before returning (this paces consecutive sends). Failures are logged, not thrown.
        /// </summary>
        /// <param name="context">Channel context to write to.</param>
        /// <param name="address">Device address.</param>
        /// <param name="controlWord">Application control word.</param>
        /// <param name="functionType">Function code.</param>
        /// <param name="data">Single payload byte.</param>
        public async Task SendCustomMessageAsync(IChannelHandlerContext context, byte address, byte controlWord, byte functionType, byte data)
        {
            try
            {
                const byte frameLength = 7;

                byte[] buffer =
                {
                    0x16,         // start character
                    0x00,         // length, first byte on the wire
                    frameLength,  // length, second byte on the wire (whole frame = 7 bytes)
                    address,
                    controlWord,
                    functionType,
                    data,
                };

                IByteBuffer wrappedBuffer = Unpooled.WrappedBuffer(buffer);
                await context.WriteAndFlushAsync(wrappedBuffer);

                string hexString = BitConverter.ToString(buffer).Replace("-", " ");
                MonitoringEventBus.LogHandler($"地址:{address} 功能码:{functionType} 数据:{hexString}", "103客户端发送消息");

                await Task.Delay(1000);
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler($"Error: {ex}", "103客户端发送消息");
            }
        }
    }
}