|
- using Abp.Dependency;
- using DotNetty.Transport.Channels;
- using DotNettyHelper;
- using Newtonsoft.Json;
- using System;
- using System.Collections.Generic;
- using System.ComponentModel;
- using System.Linq;
- using System.Threading.Tasks;
- using ToolLibrary.LogHelper;
- using Yunda.ISAS.DataMonitoringServer.DataAnalysis;
- using Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection;
- using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Helper;
- using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Model;
- using Yunda.ISAS.DataMonitoringServer.DataCenter;
- using Yunda.ISAS.DataMonitoringServer.WebSocket.DotNetty.Server;
- using Yunda.ISAS.DataMonitoringServer.WebSocket.Model;
- using YunDa.ISAS.DataTransferObject.CommonDto;
- using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
- using YunDa.ISAS.Redis.Entities.AlarmCategory;
- namespace Yunda.ISAS.DataMonitoringServer.WebSocket
- {
- public class WebSocketServer : ISingletonDependency
- {
- private readonly ConfigurationHepler _configurationHepler; // Injected configuration helper; stored by the constructor but not read anywhere in this class.
- private readonly RunningDataCache _runningDataCache; // Equipment cache; its EquipmentDataDic is read, and SafetyStateType updated, by the receive-message handler.
- private readonly TeleCommandDataSendTask _teleCommandDataSendTask; // Remote-control commands are posted to its TelecommandTActionBlock.
- private readonly WebApiRequest _webApiRequest; // Used to call SendArmsEquipmentsUpdate for arming/disarming requests.
- private readonly DotNettyWebSocketServer _dotNettyWebSocketServer; // Underlying DotNetty server: started, stopped, subscribed to for events, and used for sending.
- private readonly RedisDataRepository _redisDataRepository; // Its AlarmListRedis is modified on alarm confirm / clear / clear-reset messages.
-
- /// <summary>
- /// Creates the WebSocket server wrapper and stores the injected collaborators.
- /// </summary>
- /// <param name="configurationHepler">Configuration helper (stored only).</param>
- /// <param name="runningDataCache">Cache holding the current equipment data.</param>
- /// <param name="teleCommandDataSendTask">Task object that receives remote-control commands.</param>
- /// <param name="webApiRequest">Web API client used for arming/disarming updates.</param>
- /// <param name="redisDataRepository">Redis repository that exposes the alarm list.</param>
- /// <param name="dotNettyWebSocketServer">Underlying DotNetty WebSocket server.</param>
- public WebSocketServer(
-     ConfigurationHepler configurationHepler,
-     RunningDataCache runningDataCache,
-     TeleCommandDataSendTask teleCommandDataSendTask,
-     WebApiRequest webApiRequest,
-     RedisDataRepository redisDataRepository,
-     DotNettyWebSocketServer dotNettyWebSocketServer)
- {
-     // Assignments follow the parameter order; they are independent of each other.
-     this._configurationHepler = configurationHepler;
-     this._runningDataCache = runningDataCache;
-     this._teleCommandDataSendTask = teleCommandDataSendTask;
-     this._webApiRequest = webApiRequest;
-     this._redisDataRepository = redisDataRepository;
-     this._dotNettyWebSocketServer = dotNettyWebSocketServer;
- }
- /// <summary>
- /// Starts the WebSocket server and wires this class's handlers to the server events.
- /// </summary>
- /// <remarks>
- /// Handlers are attached before the server is started so that events raised by early
- /// connections are not missed. Each handler is detached first, so calling this method
- /// again (for example after <see cref="WebSocketStop"/>) does not register duplicates.
- /// Any exception is logged through MonitoringEventBus and is not rethrown.
- /// </remarks>
- /// <param name="path">Path passed through to the DotNetty server.</param>
- /// <param name="port">Port passed through to the DotNetty server.</param>
- public void WebSocketStartAsync(string path, int port)
- {
-     try
-     {
-         // Removing a handler that is not attached is a no-op, so "-=" then "+=" keeps exactly one subscription.
-         _dotNettyWebSocketServer.HandlerAddedEvent -= WebSocket_HandlerAddedEvent;
-         _dotNettyWebSocketServer.HandlerAddedEvent += WebSocket_HandlerAddedEvent;
-         _dotNettyWebSocketServer.HandlerRemovedEvent -= WebSocket_HandlerRemovedEvent;
-         _dotNettyWebSocketServer.HandlerRemovedEvent += WebSocket_HandlerRemovedEvent;
-         _dotNettyWebSocketServer.ExceptionCaughtEvent -= WebSocket_ExceptionCaughtEvent;
-         _dotNettyWebSocketServer.ExceptionCaughtEvent += WebSocket_ExceptionCaughtEvent;
-         _dotNettyWebSocketServer.ReceiveMessageEvent -= WebSocket_ReceiveMessageEvent;
-         _dotNettyWebSocketServer.ReceiveMessageEvent += WebSocket_ReceiveMessageEvent;
-
-         // NOTE(review): the result of RunServerAsync is not awaited here (as in the original),
-         // so the success message below only means the start call returned without throwing.
-         _dotNettyWebSocketServer.RunServerAsync(path, port);
-         MonitoringEventBus.LogHandler("WebSocket监听成功", "WebSocket启动信息");
-     }
-     catch (Exception ex)
-     {
-         MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
-     }
- }
- /// <summary>
- /// Asks the underlying server to close and logs the outcome.
- /// </summary>
- /// <remarks>Failures are reported through MonitoringEventBus instead of being rethrown.</remarks>
- public void WebSocketStop()
- {
-     const string infoCategory = "WebSocket信息";
-     const string errorCategory = "WebSocket异常信息";
-     try
-     {
-         _dotNettyWebSocketServer.CloseServerAsync();
-         MonitoringEventBus.LogHandler("关闭WebSocket", infoCategory);
-     }
-     catch (Exception closeError)
-     {
-         MonitoringEventBus.LogHandler(closeError.Message, errorCategory);
-     }
- }
- /// <summary>
- /// Handles a message received from a WebSocket client: logs it and dispatches it
- /// to the handler for its message type. Unknown message types are ignored.
- /// </summary>
- /// <param name="recMsgEventArgs">Event arguments carrying the received message and the channel context it arrived on.</param>
- private void WebSocket_ReceiveMessageEvent(ReceiveMessageEventArgs<DataMonitorMessageModel> recMsgEventArgs)
- {
-     // Without a message there is nothing to dispatch; returning here keeps a
-     // NullReferenceException from escaping this event handler.
-     if (recMsgEventArgs == null || recMsgEventArgs.Message == null)
-     {
-         return;
-     }
-
-     var info = JsonConvert.SerializeObject(recMsgEventArgs.Message);
-     MonitoringEventBus.LogHandler(info, "WebSocket信息");
-
-     switch (recMsgEventArgs.Message.MessageType)
-     {
-         case MessgeTypeEnum.All:
-             HandleAllDataRequest(recMsgEventArgs);
-             break;
-         case MessgeTypeEnum.RemoteControl:
-             // Runs on the thread pool as before; the handler catches its own exceptions
-             // because a try/catch around Task.Run does not see failures inside the task.
-             Task.Run(() => HandleRemoteControl(recMsgEventArgs));
-             break;
-         case MessgeTypeEnum.ArmingDisArming:
-             HandleArmingDisarming(recMsgEventArgs.Message);
-             break;
-         case MessgeTypeEnum.AlarmQueueConfirm:
-             HandleAlarmQueueConfirm(recMsgEventArgs.Message);
-             break;
-         case MessgeTypeEnum.AlarmmQueueClear:
-             SetAlarmClearFlag(recMsgEventArgs.Message, true);
-             break;
-         case MessgeTypeEnum.AlarmmQueueClearReset:
-             SetAlarmClearFlag(recMsgEventArgs.Message, false);
-             break;
-         default:
-             break;
-     }
- }
-
- /// <summary>
- /// Deserializes the content of a message into <typeparamref name="T"/>.
- /// </summary>
- /// <returns>The deserialized value, or default(T) when the message has no content.</returns>
- private static T DeserializeContent<T>(DataMonitorMessageModel message)
- {
-     object content = message.Content;
-     if (content == null)
-     {
-         return default(T);
-     }
-     return JsonConvert.DeserializeObject<T>(content.ToString());
- }
-
- /// <summary>
- /// Replies to the requesting client with the cached equipment data, filtered by the requested group.
- /// </summary>
- private void HandleAllDataRequest(ReceiveMessageEventArgs<DataMonitorMessageModel> recMsgEventArgs)
- {
-     try
-     {
-         MonitoringGroupTypeEnum mGroupType = MonitoringGroupTypeEnum.DEFAULT;
-         switch (recMsgEventArgs.Message.GroupType)
-         {
-             case GroupTypeEnum.DLHJ:
-                 mGroupType = MonitoringGroupTypeEnum.DLHJ;
-                 break;
-             case GroupTypeEnum.ZXJC:
-                 mGroupType = MonitoringGroupTypeEnum.ZXJC;
-                 break;
-         }
-
-         // Any other group type falls back to DEFAULT, which returns every cached equipment entry.
-         if (mGroupType == MonitoringGroupTypeEnum.DEFAULT)
-         {
-             recMsgEventArgs.Message.Content = _runningDataCache.EquipmentDataDic.Values.ToList();
-         }
-         else
-         {
-             recMsgEventArgs.Message.Content = _runningDataCache
-                 .EquipmentDataDic.Values.Where(e => e.GroupType == mGroupType).ToList();
-         }
-
-         // The incoming message object is reused as the reply.
-         SendMsg(recMsgEventArgs.Context, recMsgEventArgs.Message);
-     }
-     catch (Exception ex)
-     {
-         Log4Helper.Error(this.GetType(), "MessgeTypeEnum.All", ex);
-     }
- }
-
- /// <summary>
- /// Parses a remote-control command from the message and posts it to the tele-command action block.
- /// </summary>
- private void HandleRemoteControl(ReceiveMessageEventArgs<DataMonitorMessageModel> recMsgEventArgs)
- {
-     try
-     {
-         TelecommandModel telecommand = DeserializeContent<TelecommandModel>(recMsgEventArgs.Message);
-         if (telecommand == null)
-         {
-             return;
-         }
-         _teleCommandDataSendTask.TelecommandTActionBlock.Post(new TelcommandDataModel() { ReceiveMessageEvent = recMsgEventArgs, Telecommand = telecommand });
-     }
-     catch (Exception ex)
-     {
-         Log4Helper.Error(this.GetType(), "MessgeTypeEnum.RemoteControl", ex);
-     }
- }
-
- /// <summary>
- /// Parses an arming/disarming request and applies it on a thread-pool task.
- /// </summary>
- private void HandleArmingDisarming(DataMonitorMessageModel message)
- {
-     try
-     {
-         ArmingDisarmingModel armingDisarmingModel = DeserializeContent<ArmingDisarmingModel>(message);
-         if (armingDisarmingModel == null)
-         {
-             return;
-         }
-         Task.Run(() => ApplyArmingDisarming(armingDisarmingModel));
-     }
-     catch (Exception ex)
-     {
-         Log4Helper.Error(this.GetType(), " MessgeTypeEnum.ArmingDisArming", ex);
-     }
- }
-
- /// <summary>
- /// Sends the arming/disarming update to the web API and, when no delay is requested,
- /// updates the cached equipment state and sends a Changed message for each affected equipment.
- /// </summary>
- private void ApplyArmingDisarming(ArmingDisarmingModel armingDisarmingModel)
- {
-     // This runs inside a task, so failures must be caught and logged here or they go unobserved.
-     try
-     {
-         if (armingDisarmingModel.DelaySeconds < 0)
-         {
-             armingDisarmingModel.DelaySeconds = 0;
-         }
-         _webApiRequest.SendArmsEquipmentsUpdate(armingDisarmingModel, (int)armingDisarmingModel.SafetyStateType);
-
-         // With a delay the cache is not touched here.
-         if (armingDisarmingModel.DelaySeconds != 0 || armingDisarmingModel.EquipmentIds == null)
-         {
-             return;
-         }
-
-         foreach (var item in armingDisarmingModel.EquipmentIds)
-         {
-             if (!_runningDataCache.EquipmentDataDic.ContainsKey(item))
-             {
-                 continue;
-             }
-             var equipment = _runningDataCache.EquipmentDataDic[item];
-             equipment.SafetyStateType = armingDisarmingModel.SafetyStateType;
-             DataMonitorMessageModel messageModel = new DataMonitorMessageModel
-             {
-                 Content = new List<EquipmentDataModel>() { equipment },
-                 GroupType = GroupTypeEnum.All,
-                 MessageType = MessgeTypeEnum.Changed
-             };
-             SendMsg(messageModel);
-         }
-     }
-     catch (Exception ex)
-     {
-         Log4Helper.Error(this.GetType(), " MessgeTypeEnum.ArmingDisArming", ex);
-     }
- }
-
- /// <summary>
- /// Removes the confirmed alarm ids from the Redis alarm list.
- /// </summary>
- private void HandleAlarmQueueConfirm(DataMonitorMessageModel message)
- {
-     try
-     {
-         var alarmConfirmIds = DeserializeContent<List<Guid>>(message);
-         if (alarmConfirmIds != null && alarmConfirmIds.Count > 0)
-         {
-             // NOTE(review): the result of this async call is not awaited (as in the original) - confirm that is intended.
-             _redisDataRepository.AlarmListRedis.DeleteHashKeiesAsync(nameof(AlarmListRedis), alarmConfirmIds);
-         }
-     }
-     catch (Exception ex)
-     {
-         Log4Helper.Error(this.GetType(), "确认报警队列出错", ex);
-     }
- }
-
- /// <summary>
- /// Sets the IsClear flag on each listed alarm that exists in the Redis alarm list.
- /// Shared by the clear (true) and clear-reset (false) message types.
- /// </summary>
- private void SetAlarmClearFlag(DataMonitorMessageModel message, bool isClear)
- {
-     try
-     {
-         var alarmIds = DeserializeContent<List<Guid>>(message);
-         if (alarmIds == null || alarmIds.Count == 0)
-         {
-             return;
-         }
-
-         var dic = _redisDataRepository.AlarmListRedis.HashSetGetDicAll(nameof(AlarmListRedis));
-         if (dic == null)
-         {
-             return;
-         }
-
-         foreach (var id in alarmIds)
-         {
-             var key = id.ToString();
-             if (!dic.ContainsKey(key))
-             {
-                 continue;
-             }
-             var entity = dic[key];
-             // Skip null entries for both flag values so one bad entry does not abort the remaining ids.
-             if (entity == null)
-             {
-                 continue;
-             }
-             entity.IsClear = isClear;
-             _redisDataRepository.AlarmListRedis.HashSetUpdateOneAsync(nameof(AlarmListRedis), key, entity);
-         }
-     }
-     catch (Exception ex)
-     {
-         // Both message types logged this same text in the original; kept unchanged.
-         Log4Helper.Error(this.GetType(), "清除报警队列出错", ex);
-     }
- }
- /// <summary>
- /// Logs an exception reported by the WebSocket server together with the remote address of the channel.
- /// </summary>
- /// <param name="exceptionCaughtEventArgs">Event arguments carrying the channel context and the exception.</param>
- private void WebSocket_ExceptionCaughtEvent(ExceptionCaughtEventArgs exceptionCaughtEventArgs)
- {
-     var remoteAddress = exceptionCaughtEventArgs.Context.Channel.RemoteAddress;
-     var exceptionText = exceptionCaughtEventArgs.Exception.ToString();
-     MonitoringEventBus.LogHandler(remoteAddress + ":" + exceptionText, "异常信息");
- }
- /// <summary>
- /// Logs that a client disconnected, along with the total channel count summed over all channel groups.
- /// </summary>
- /// <param name="hanlerEventArgs">Event arguments carrying the channel context of the client.</param>
- private void WebSocket_HandlerRemovedEvent(HandlerEventArgs hanlerEventArgs)
- {
-     var remoteAddress = hanlerEventArgs.Context.Channel.RemoteAddress;
-     var clientCount = _dotNettyWebSocketServer.GetDictionaryChannelGroup().Values.Sum(group => group.Count);
-     MonitoringEventBus.LogHandler(remoteAddress + "离线了!客户端数量:" + clientCount, "WebSocket信息");
- }
- /// <summary>
- /// Logs that a client connected, along with the total channel count summed over all channel groups.
- /// </summary>
- /// <param name="hanlerEventArgs">Event arguments carrying the channel context of the client.</param>
- private void WebSocket_HandlerAddedEvent(HandlerEventArgs hanlerEventArgs)
- {
-     var remoteAddress = hanlerEventArgs.Context.Channel.RemoteAddress;
-     var clientCount = _dotNettyWebSocketServer.GetDictionaryChannelGroup().Values.Sum(group => group.Count);
-     MonitoringEventBus.LogHandler(remoteAddress + "上线了!客户端数量:" + clientCount, "WebSocket信息");
- }
- // Serializer settings used when a sent message is written to the log.
- // NOTE(review): Json.NET documents MaxDepth as a limit applied when reading JSON - confirm it has the intended effect on SerializeObject.
- private readonly JsonSerializerSettings _jsonSerializerSettings = new JsonSerializerSettings
- {
-     MaxDepth = 4
- };
- /// <summary>
- /// Sends a message through the server's Send method (group send, per the original comment).
- /// </summary>
- /// <remarks>Exceptions raised while sending are logged through MonitoringEventBus and not rethrown.</remarks>
- /// <param name="msg">Message to send; a null message is logged and not sent.</param>
- public void SendMsg(DataMonitorMessageModel msg)
- {
-     // Explicit guard instead of relying on a NullReferenceException being caught below.
-     if (msg == null)
-     {
-         MonitoringEventBus.LogHandler("消息为空,未发送", "WebSocket异常信息");
-         return;
-     }
-
-     try
-     {
-         _dotNettyWebSocketServer.Send(msg);
-     }
-     catch (Exception ex)
-     {
-         MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
-     }
- }
- /// <summary>
- /// Sends a message to one specific client and writes the serialized message to the info log.
- /// </summary>
- /// <remarks>Exceptions raised while sending or logging are reported through MonitoringEventBus and not rethrown.</remarks>
- /// <param name="context">Channel context of the client that should receive the message.</param>
- /// <param name="msg">Message to send; a null message is logged and not sent.</param>
- public void SendMsg(IChannelHandlerContext context, DataMonitorMessageModel msg)
- {
-     // Explicit guard so a null message is never handed to the server.
-     if (msg == null)
-     {
-         MonitoringEventBus.LogHandler("消息为空,未发送", "WebSocket异常信息");
-         return;
-     }
-
-     try
-     {
-         // NOTE(review): the result of SendAsync is not awaited (as in the original), so an
-         // asynchronous send failure is not seen by the catch below - confirm that is intended.
-         _dotNettyWebSocketServer.SendAsync(context, msg);
-         Log4Helper.Info(this.GetType(), JsonConvert.SerializeObject(msg, _jsonSerializerSettings));
-     }
-     catch (Exception ex)
-     {
-         MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
-     }
- }
- }
- }
|