using Abp.Dependency;
using DotNetty.Codecs.Http;
using DotNetty.Codecs.Http.WebSockets;
using DotNetty.Common;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Groups;
using DotNetty.Transport.Channels.Sockets;
using DotNettyHelper;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Runtime;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using ToolLibrary.LogHelper;
using Yunda.ISAS.DataMonitoringServer.WebSocket.Model;

namespace Yunda.ISAS.DataMonitoringServer.WebSocket.DotNetty.Server
{
    /// <summary>
    /// WebSocket server built on DotNetty. Connected channels are kept in per-<see cref="GroupTypeEnum"/>
    /// channel groups so that a message can be pushed to every client of a group at once.
    /// </summary>
    public class DotNettyWebSocketServer : ISingletonDependency
    {
        /// <summary>
        /// Creates the server wrapper and sets the DotNetty leak detector to <c>Simple</c>.
        /// </summary>
        public DotNettyWebSocketServer()
        {
            ResourceLeakDetector.Level = ResourceLeakDetector.DetectionLevel.Simple;
        }

        // Group type -> channels currently registered under that type. Guarded by lockObj inside this class.
        private static volatile Dictionary<GroupTypeEnum, IChannelGroup> _dicChannelGroup = new Dictionary<GroupTypeEnum, IChannelGroup>();

        /// <summary>Raised when a client handler reports an exception.</summary>
        public event ExceptionCaughtDelegate ExceptionCaughtEvent;

        /// <summary>Raised after a newly added client handler has been registered in the <c>None</c> group.</summary>
        public event HandlerDelegate HandlerAddedEvent;

        /// <summary>Raised when a client handler is removed.</summary>
        public event HandlerDelegate HandlerRemovedEvent;

        /// <summary>Raised after a received message has moved its channel into the message's group.</summary>
        public event ReceiveMessageDelegate ReceiveMessageEvent;

        /// <summary>
        /// Returns the live group dictionary (not a copy).
        /// </summary>
        /// <returns>The dictionary that maps each group type to its channel group.</returns>
        public Dictionary<GroupTypeEnum, IChannelGroup> GetDictionaryChannelGroup()
        {
            // NOTE(review): callers receive the shared instance and do not hold lockObj;
            // they should treat it as read-only.
            return _dicChannelGroup;
        }

        private readonly object lockObj = new object();

        /// <summary>
        /// Registers the context's channel under <paramref name="groupClass"/>, first removing it from
        /// the first other group found to contain it, so a channel ends up in a single group.
        /// </summary>
        /// <param name="groupClass">Group the channel should belong to.</param>
        /// <param name="Context">Handler context whose channel is registered.</param>
        public void AddChannel(GroupTypeEnum groupClass, IChannelHandlerContext Context)
        {
            lock (lockObj)
            {
                EndPoint remoteAddress = Context.Channel.RemoteAddress;

                foreach (var pair in _dicChannelGroup)
                {
                    if (pair.Key == groupClass)
                    {
                        continue;
                    }

                    // Reference comparison of the endpoint objects, kept as originally written.
                    IChannel existing = pair.Value.FirstOrDefault(ch => ch.RemoteAddress == remoteAddress);
                    if (existing != null)
                    {
                        pair.Value.Remove(existing);
                        break;
                    }
                }

                IChannelGroup targetGroup;
                if (!_dicChannelGroup.TryGetValue(groupClass, out targetGroup))
                {
                    targetGroup = new DefaultChannelGroup(Context.Executor);
                    _dicChannelGroup.Add(groupClass, targetGroup);
                }

                // Done inside the lock: the dictionary lookup must not race with a concurrent Add above.
                targetGroup.Add(Context.Channel);
            }
        }

        private IChannel bootstrapChannel = null;
        private IEventLoopGroup bossGroup = null;
        private IEventLoopGroup workGroup = null;
        private string _websocketPath = "all";

        /// <summary>
        /// Creates the event loop groups and binds the WebSocket server to <paramref name="port"/> on all interfaces.
        /// </summary>
        /// <param name="path">WebSocket path handed to every <c>WebSocketServerHandler</c>.</param>
        /// <param name="port">TCP port to listen on.</param>
        public async void RunServerAsync(string path, int port)
        {
            _websocketPath = path;

            // In this mode the garbage collector runs more often so that each collection pauses for less
            // time; intended for applications with very strict low-latency requirements.
            // NOTE(review): LowLatency is documented for short critical regions; SustainedLowLatency may be
            // the better fit for a long-running server - confirm before changing.
            GCSettings.LatencyMode = GCLatencyMode.LowLatency;

            bossGroup = new MultithreadEventLoopGroup();
            workGroup = new MultithreadEventLoopGroup();
            try
            {
                var bootstrap = new ServerBootstrap();
                bootstrap
                    .Group(bossGroup, workGroup)
                    .Channel<TcpServerSocketChannel>()
                    .Option(ChannelOption.SoBacklog, 500)
                    .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                    {
                        IChannelPipeline pipeline = channel.Pipeline;
                        pipeline.AddLast(new HttpServerCodec());
                        pipeline.AddLast(new HttpObjectAggregator(65536));
                        pipeline.AddLast(GetWebSocketServerHandler());
                    }));
                bootstrapChannel = await bootstrap.BindAsync(IPAddress.Any, port);
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "启动websocket", ex);
                // Rethrow with "throw;" so the original stack trace is preserved.
                throw;
            }
        }

        /// <summary>
        /// Builds a handler for one client connection and wires its events to this server.
        /// </summary>
        private WebSocketServerHandler GetWebSocketServerHandler()
        {
            var handler = new WebSocketServerHandler(_websocketPath);
            handler.ExceptionCaughtEvent += ExceptionCaught;
            handler.HandlerAddedEvent += HandlerAdded;
            handler.HandlerRemovedEvent += HandlerRemoved;
            handler.ReceiveMessageEvent += ReceiveMessage;
            return handler;
        }

        /// <summary>
        /// Forwards a client-disconnected notification to subscribers; subscriber errors are logged.
        /// </summary>
        private void HandlerRemoved(HandlerEventArgs handlerEventArgs)
        {
            try
            {
                HandlerRemovedEvent?.Invoke(handlerEventArgs);
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "客户端下线", ex);
            }
        }

        /// <summary>
        /// Forwards a client error to subscribers; subscriber errors are logged.
        /// </summary>
        private void ExceptionCaught(ExceptionCaughtEventArgs exceptionCaughtEventArgs)
        {
            try
            {
                ExceptionCaughtEvent?.Invoke(exceptionCaughtEventArgs);
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "客户端报错", ex);
            }
        }

        /// <summary>
        /// Handles a new client connection: registers its channel in the <c>None</c> group, then notifies subscribers.
        /// </summary>
        private void HandlerAdded(HandlerEventArgs handlerEventArgs)
        {
            try
            {
                AddChannel(GroupTypeEnum.None, handlerEventArgs.Context);
                HandlerAddedEvent?.Invoke(handlerEventArgs);
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "处理客户端接入", ex);
            }
        }

        /// <summary>
        /// Closes the listening channel and then shuts down the worker and boss event loop groups.
        /// </summary>
        public async void CloseServerAsync()
        {
            try
            {
                if (bootstrapChannel != null)
                {
                    await bootstrapChannel.CloseAsync();
                }
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "关闭websocket", ex);
            }
            finally
            {
                // Each group is shut down independently so one failure cannot skip the other.
                await ShutdownGroupAsync(workGroup);
                await ShutdownGroupAsync(bossGroup);
            }
        }

        /// <summary>
        /// Gracefully shuts down one event loop group, logging instead of propagating any failure.
        /// </summary>
        private async Task ShutdownGroupAsync(IEventLoopGroup group)
        {
            if (group == null)
            {
                return;
            }

            try
            {
                await group.ShutdownGracefullyAsync();
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "关闭websocket", ex);
            }
        }

        /// <summary>
        /// Broadcasts a message. Nothing is sent when the message is null, its group type is <c>None</c>,
        /// or no group is registered for its type.
        /// </summary>
        /// <param name="msg">Message to serialize with <c>ToJson()</c> and send as a text frame.</param>
        public async void Send(DataMonitorMessageModel msg)
        {
            if (msg == null || msg.GroupType == GroupTypeEnum.None)
            {
                return;
            }

            IChannelGroup ownGroup;
            var broadcastGroups = new List<IChannelGroup>();

            // Snapshot the targets under the lock so AddChannel cannot mutate the dictionary mid-enumeration.
            lock (lockObj)
            {
                if (!_dicChannelGroup.TryGetValue(msg.GroupType, out ownGroup))
                {
                    return;
                }

                if (_dicChannelGroup.ContainsKey(GroupTypeEnum.All))
                {
                    broadcastGroups.AddRange(_dicChannelGroup.Values);
                }
            }

            WebSocketFrame frame = null;
            try
            {
                frame = new TextWebSocketFrame(msg.ToJson());

                // Once an All group has been registered, every non-empty group receives the message.
                foreach (IChannelGroup group in broadcastGroups)
                {
                    if (!group.Any())
                    {
                        continue;
                    }

                    await group.WriteAndFlushAsync(frame.Copy());
                }

                // NOTE(review): when the loop above ran, the message's own group has already been written
                // to, so its clients receive the frame twice - confirm whether that is intended.
                await ownGroup.WriteAndFlushAsync(frame.Copy());
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "websocket群发消息错误", ex);
            }
            finally
            {
                // Only copies are written; the template frame is never handed to a channel, so release it here.
                if (frame != null)
                {
                    frame.Release();
                }
            }
        }

        /// <summary>
        /// Sends a message to one specific client.
        /// </summary>
        /// <param name="context">Handler context of the target client.</param>
        /// <param name="msg">Message to serialize with <c>ToJson()</c> and send as a text frame.</param>
        public async void SendAsync(IChannelHandlerContext context, DataMonitorMessageModel msg)
        {
            try
            {
                WebSocketFrame frame = new TextWebSocketFrame(msg.ToJson());
                await context.WriteAndFlushAsync(frame);
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "websocket给指定客户端发送消息错误", ex);
            }
        }

        /// <summary>
        /// Handles an incoming client message: moves the sender's channel into the group named by the
        /// message, then notifies subscribers. Events without a message are ignored.
        /// </summary>
        private void ReceiveMessage(ReceiveMessageEventArgs recMsgEventArgs)
        {
            try
            {
                if (recMsgEventArgs.Message == null)
                {
                    return;
                }

                AddChannel(recMsgEventArgs.Message.GroupType, recMsgEventArgs.Context);
                ReceiveMessageEvent?.Invoke(recMsgEventArgs);
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "websocket收取数据错误", ex);
            }
        }
    }
}