using Abp.Dependency;
using DotNetty.Transport.Channels;
using DotNettyHelper;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Threading.Tasks;
using ToolLibrary.LogHelper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Helper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Model;
using Yunda.ISAS.DataMonitoringServer.DataCenter;
using Yunda.ISAS.DataMonitoringServer.WebSocket.DotNetty.Server;
using Yunda.ISAS.DataMonitoringServer.WebSocket.Model;
using YunDa.ISAS.DataTransferObject.CommonDto;
using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
using YunDa.ISAS.Redis.Entities.AlarmCategory;

namespace Yunda.ISAS.DataMonitoringServer.WebSocket
{
    /// <summary>
    /// Wraps <see cref="DotNettyWebSocketServer"/>: starts/stops the listener, dispatches
    /// incoming client messages by <c>MessageType</c>, and sends messages to clients.
    /// </summary>
    public class WebSocketServer : ISingletonDependency
    {
        private readonly ConfigurationHepler _configurationHepler;
        private readonly RunningDataCache _runningDataCache;
        private readonly TeleCommandDataSendTask _teleCommandDataSendTask;
        private readonly WebApiRequest _webApiRequest;
        private readonly DotNettyWebSocketServer _dotNettyWebSocketServer;
        private readonly RedisDataRepository _redisDataRepository;

        // Settings used when serializing outgoing messages for the log.
        private readonly JsonSerializerSettings _jsonSerializerSettings = new JsonSerializerSettings()
        {
            MaxDepth = 4,
        };

        public WebSocketServer(ConfigurationHepler configurationHepler,
            RunningDataCache runningDataCache,
            TeleCommandDataSendTask teleCommandDataSendTask,
            WebApiRequest webApiRequest,
            RedisDataRepository redisDataRepository,
            DotNettyWebSocketServer dotNettyWebSocketServer)
        {
            _redisDataRepository = redisDataRepository;
            _runningDataCache = runningDataCache;
            _configurationHepler = configurationHepler;
            _teleCommandDataSendTask = teleCommandDataSendTask;
            _webApiRequest = webApiRequest;
            _dotNettyWebSocketServer = dotNettyWebSocketServer;
        }

        /// <summary>
        /// Starts the WebSocket listener and wires up the server events.
        /// </summary>
        /// <param name="path">Path passed through to the underlying server.</param>
        /// <param name="port">Port passed through to the underlying server.</param>
        public void WebSocketStartAsync(string path, int port)
        {
            try
            {
                // Subscribe before starting so that no early connection/message is missed.
                SubscribeServerEvents();

                // NOTE(review): the result of RunServerAsync is not awaited here, so a late
                // startup failure is not observed by this try/catch — confirm this is intended.
                _dotNettyWebSocketServer.RunServerAsync(path, port);
                MonitoringEventBus.LogHandler("WebSocket监听成功", "WebSocket启动信息");
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
            }
        }

        /// <summary>
        /// Stops the WebSocket listener.
        /// </summary>
        public void WebSocketStop()
        {
            try
            {
                _dotNettyWebSocketServer.CloseServerAsync();
                MonitoringEventBus.LogHandler("关闭WebSocket", "WebSocket信息");
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
            }
        }

        /// <summary>
        /// Attaches this class's handlers to the server events. Each handler is removed first
        /// so that calling start more than once does not register it twice.
        /// </summary>
        private void SubscribeServerEvents()
        {
            _dotNettyWebSocketServer.HandlerAddedEvent -= WebSocket_HandlerAddedEvent;
            _dotNettyWebSocketServer.HandlerAddedEvent += WebSocket_HandlerAddedEvent;

            _dotNettyWebSocketServer.HandlerRemovedEvent -= WebSocket_HandlerRemovedEvent;
            _dotNettyWebSocketServer.HandlerRemovedEvent += WebSocket_HandlerRemovedEvent;

            _dotNettyWebSocketServer.ExceptionCaughtEvent -= WebSocket_ExceptionCaughtEvent;
            _dotNettyWebSocketServer.ExceptionCaughtEvent += WebSocket_ExceptionCaughtEvent;

            _dotNettyWebSocketServer.ReceiveMessageEvent -= WebSocket_ReceiveMessageEvent;
            _dotNettyWebSocketServer.ReceiveMessageEvent += WebSocket_ReceiveMessageEvent;
        }

        /// <summary>
        /// Handles a message received from a client: logs it, then dispatches on its message type.
        /// </summary>
        /// <param name="recMsgEventArgs">Event data carrying the message and the sender's context.</param>
        private void WebSocket_ReceiveMessageEvent(ReceiveMessageEventArgs recMsgEventArgs)
        {
            if (recMsgEventArgs == null || recMsgEventArgs.Message == null)
            {
                // Nothing to dispatch; avoid a NullReferenceException escaping into the event source.
                MonitoringEventBus.LogHandler("收到空消息,已忽略", "WebSocket异常信息");
                return;
            }

            var info = JsonConvert.SerializeObject(recMsgEventArgs.Message);
            MonitoringEventBus.LogHandler(info, "WebSocket信息");

            switch (recMsgEventArgs.Message.MessageType)
            {
                case MessgeTypeEnum.All:
                    ReplyWithAllEquipmentData(recMsgEventArgs);
                    break;

                case MessgeTypeEnum.RemoteControl:
                    QueueTelecommand(recMsgEventArgs);
                    break;

                case MessgeTypeEnum.ArmingDisArming:
                    ApplyArmingDisarming(recMsgEventArgs);
                    break;

                case MessgeTypeEnum.AlarmQueueConfirm:
                    ConfirmAlarms(recMsgEventArgs);
                    break;

                case MessgeTypeEnum.AlarmmQueueClear:
                    SetAlarmClearFlag(recMsgEventArgs, true);
                    break;

                case MessgeTypeEnum.AlarmmQueueClearReset:
                    SetAlarmClearFlag(recMsgEventArgs, false);
                    break;

                default:
                    break;
            }
        }

        /// <summary>
        /// Replies to the sender with the cached equipment data, filtered by the requested group
        /// when the group maps to a monitoring group, otherwise unfiltered.
        /// </summary>
        private void ReplyWithAllEquipmentData(ReceiveMessageEventArgs recMsgEventArgs)
        {
            try
            {
                MonitoringGroupTypeEnum mGroupType = MonitoringGroupTypeEnum.DEFAULT;
                switch (recMsgEventArgs.Message.GroupType)
                {
                    case GroupTypeEnum.DLHJ:
                        mGroupType = MonitoringGroupTypeEnum.DLHJ;
                        break;

                    case GroupTypeEnum.ZXJC:
                        mGroupType = MonitoringGroupTypeEnum.ZXJC;
                        break;
                }

                if (mGroupType == MonitoringGroupTypeEnum.DEFAULT)
                {
                    recMsgEventArgs.Message.Content = _runningDataCache.EquipmentDataDic.Values.ToList();
                }
                else
                {
                    recMsgEventArgs.Message.Content = _runningDataCache.EquipmentDataDic.Values
                        .Where(e => e.GroupType == mGroupType)
                        .ToList();
                }

                // The received message object is reused as the reply.
                SendMsg(recMsgEventArgs.Context, recMsgEventArgs.Message);
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "MessgeTypeEnum.All", ex);
            }
        }

        /// <summary>
        /// Parses a tele-command from the message content on a background task and posts it to
        /// the tele-command action block.
        /// </summary>
        private void QueueTelecommand(ReceiveMessageEventArgs recMsgEventArgs)
        {
            Task.Run(() =>
            {
                // The try/catch must live inside the lambda: exceptions thrown on the background
                // task are not seen by a try/catch that only surrounds Task.Run.
                try
                {
                    TelecommandModel telecommand = JsonConvert.DeserializeObject<TelecommandModel>(
                        recMsgEventArgs.Message.Content.ToString());
                    if (telecommand == null)
                    {
                        return;
                    }

                    _teleCommandDataSendTask.TelecommandTActionBlock.Post(new TelcommandDataModel()
                    {
                        ReceiveMessageEvent = recMsgEventArgs,
                        Telecommand = telecommand
                    });
                }
                catch (Exception ex)
                {
                    Log4Helper.Error(this.GetType(), "MessgeTypeEnum.RemoteControl", ex);
                }
            });
        }

        /// <summary>
        /// Parses an arming/disarming request, forwards it to the web API and, when no delay is
        /// requested, updates the cached equipment state and broadcasts each changed equipment.
        /// </summary>
        private void ApplyArmingDisarming(ReceiveMessageEventArgs recMsgEventArgs)
        {
            try
            {
                ArmingDisarmingModel armingDisarmingModel = JsonConvert.DeserializeObject<ArmingDisarmingModel>(
                    recMsgEventArgs.Message.Content.ToString());
                if (armingDisarmingModel == null)
                {
                    return;
                }

                Task.Run(() =>
                {
                    // Inner try/catch so failures on the background task are logged, not lost.
                    try
                    {
                        if (armingDisarmingModel.DelaySeconds < 0)
                        {
                            armingDisarmingModel.DelaySeconds = 0;
                        }

                        _webApiRequest.SendArmsEquipmentsUpdate(armingDisarmingModel, (int)armingDisarmingModel.SafetyStateType);

                        if (armingDisarmingModel.DelaySeconds != 0)
                        {
                            return;
                        }

                        foreach (var equipmentId in armingDisarmingModel.EquipmentIds)
                        {
                            // Single lookup; avoids ContainsKey + indexer racing with a removal.
                            if (!_runningDataCache.EquipmentDataDic.TryGetValue(equipmentId, out var equipment))
                            {
                                continue;
                            }

                            equipment.SafetyStateType = armingDisarmingModel.SafetyStateType;

                            DataMonitorMessageModel messageModel = new DataMonitorMessageModel
                            {
                                // NOTE(review): the original list's element type was lost in the
                                // mangled source; ToList() here yields a List of the cache's value type.
                                Content = new[] { equipment }.ToList(),
                                GroupType = GroupTypeEnum.All,
                                MessageType = MessgeTypeEnum.Changed
                            };
                            SendMsg(messageModel);
                        }
                    }
                    catch (Exception ex)
                    {
                        Log4Helper.Error(this.GetType(), " MessgeTypeEnum.ArmingDisArming", ex);
                    }
                });
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), " MessgeTypeEnum.ArmingDisArming", ex);
            }
        }

        /// <summary>
        /// Removes the confirmed alarm ids from the alarm hash in Redis.
        /// </summary>
        private void ConfirmAlarms(ReceiveMessageEventArgs recMsgEventArgs)
        {
            try
            {
                // NOTE(review): generic argument reconstructed as List<Guid>; the original type
                // argument was stripped from the source — confirm against DeleteHashKeiesAsync.
                var alarmConfirmIds = JsonConvert.DeserializeObject<List<Guid>>(
                    recMsgEventArgs.Message.Content.ToString());
                if (alarmConfirmIds != null && alarmConfirmIds.Count > 0)
                {
                    _redisDataRepository.AlarmListRedis.DeleteHashKeiesAsync(nameof(AlarmListRedis), alarmConfirmIds);
                }
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "确认报警队列出错", ex);
            }
        }

        /// <summary>
        /// Sets <c>IsClear</c> on each listed alarm that exists in the Redis alarm hash and writes
        /// the entry back. Used for both "clear" (<c>true</c>) and "clear reset" (<c>false</c>).
        /// </summary>
        /// <param name="recMsgEventArgs">Event data whose message content holds the alarm ids.</param>
        /// <param name="isClear">Value to assign to <c>IsClear</c>.</param>
        private void SetAlarmClearFlag(ReceiveMessageEventArgs recMsgEventArgs, bool isClear)
        {
            try
            {
                // NOTE(review): generic argument reconstructed as List<Guid>; the original type
                // argument was stripped from the source — confirm.
                var alarmIds = JsonConvert.DeserializeObject<List<Guid>>(
                    recMsgEventArgs.Message.Content.ToString());
                if (alarmIds == null || alarmIds.Count == 0)
                {
                    return;
                }

                var alarms = _redisDataRepository.AlarmListRedis.HashSetGetDicAll(nameof(AlarmListRedis));
                foreach (var id in alarmIds)
                {
                    var key = id.ToString();

                    // Skip unknown ids and null entries so one bad entry does not abort the rest.
                    if (!alarms.TryGetValue(key, out var entity) || entity == null)
                    {
                        continue;
                    }

                    entity.IsClear = isClear;
                    _redisDataRepository.AlarmListRedis.HashSetUpdateOneAsync(nameof(AlarmListRedis), key, entity);
                }
            }
            catch (Exception ex)
            {
                Log4Helper.Error(this.GetType(), "清除报警队列出错", ex);
            }
        }

        /// <summary>
        /// Logs an exception reported by the WebSocket server together with the remote address.
        /// </summary>
        /// <param name="exceptionCaughtEventArgs">Event data carrying the context and the exception.</param>
        private void WebSocket_ExceptionCaughtEvent(ExceptionCaughtEventArgs exceptionCaughtEventArgs)
        {
            // Interpolation tolerates a null exception, unlike an explicit ToString() call.
            var info = $"{exceptionCaughtEventArgs.Context.Channel.RemoteAddress}:{exceptionCaughtEventArgs.Exception}";
            MonitoringEventBus.LogHandler(info, "异常信息");
        }

        /// <summary>
        /// Logs that a client disconnected, along with the current client count.
        /// </summary>
        /// <param name="hanlerEventArgs">Event data carrying the client's context.</param>
        private void WebSocket_HandlerRemovedEvent(HandlerEventArgs hanlerEventArgs)
        {
            var info = $"{hanlerEventArgs.Context.Channel.RemoteAddress}离线了!客户端数量:{CountConnectedClients()}";
            MonitoringEventBus.LogHandler(info, "WebSocket信息");
        }

        /// <summary>
        /// Logs that a client connected, along with the current client count.
        /// </summary>
        /// <param name="hanlerEventArgs">Event data carrying the client's context.</param>
        private void WebSocket_HandlerAddedEvent(HandlerEventArgs hanlerEventArgs)
        {
            var info = $"{hanlerEventArgs.Context.Channel.RemoteAddress}上线了!客户端数量:{CountConnectedClients()}";
            MonitoringEventBus.LogHandler(info, "WebSocket信息");
        }

        /// <summary>
        /// Sums the channel counts of every channel group held by the server.
        /// </summary>
        private int CountConnectedClients()
        {
            return _dotNettyWebSocketServer.GetDictionaryChannelGroup().Values.Sum(v => v.Count);
        }

        /// <summary>
        /// Broadcasts a message through the server's group send.
        /// </summary>
        /// <param name="msg">Message to send.</param>
        public void SendMsg(DataMonitorMessageModel msg)
        {
            if (msg == null)
            {
                MonitoringEventBus.LogHandler("消息为空,已忽略发送", "WebSocket异常信息");
                return;
            }

            try
            {
                _dotNettyWebSocketServer.Send(msg);
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
            }
        }

        /// <summary>
        /// Sends a message to one client and writes the serialized message to the log.
        /// </summary>
        /// <param name="context">Channel context of the target client.</param>
        /// <param name="msg">Message to send.</param>
        public void SendMsg(IChannelHandlerContext context, DataMonitorMessageModel msg)
        {
            if (msg == null)
            {
                MonitoringEventBus.LogHandler("消息为空,已忽略发送", "WebSocket异常信息");
                return;
            }

            try
            {
                // NOTE(review): the result of SendAsync is not awaited, so asynchronous send
                // failures are not caught here — confirm this fire-and-forget is intended.
                _dotNettyWebSocketServer.SendAsync(context, msg);
                Log4Helper.Info(this.GetType(), JsonConvert.SerializeObject(msg, _jsonSerializerSettings));
            }
            catch (Exception ex)
            {
                MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
            }
        }
    }
}