// WebSocketServer.cs (15 KB)
  1. using Abp.Dependency;
  2. using DotNetty.Transport.Channels;
  3. using DotNettyHelper;
  4. using Newtonsoft.Json;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.ComponentModel;
  8. using System.Linq;
  9. using System.Threading.Tasks;
  10. using ToolLibrary.LogHelper;
  11. using Yunda.ISAS.DataMonitoringServer.DataAnalysis;
  12. using Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection;
  13. using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Helper;
  14. using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Model;
  15. using Yunda.ISAS.DataMonitoringServer.DataCenter;
  16. using Yunda.ISAS.DataMonitoringServer.WebSocket.DotNetty.Server;
  17. using Yunda.ISAS.DataMonitoringServer.WebSocket.Model;
  18. using YunDa.ISAS.DataTransferObject.CommonDto;
  19. using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
  20. using YunDa.ISAS.Redis.Entities.AlarmCategory;
  21. namespace Yunda.ISAS.DataMonitoringServer.WebSocket
  22. {
  23. public class WebSocketServer : ISingletonDependency
  24. {
  // Collaborators received through the constructor (listed in constructor-parameter order).

  // Stored at construction; not read by any member visible in this file.
  private readonly ConfigurationHepler _configurationHepler;

  // Source of the equipment data dictionary returned to clients and updated on arming changes.
  private readonly RunningDataCache _runningDataCache;

  // Owns the block that received remote-control commands are posted to.
  private readonly TeleCommandDataSendTask _teleCommandDataSendTask;

  // Used to forward arming/disarming updates.
  private readonly WebApiRequest _webApiRequest;

  // Gives access to the alarm list stored in Redis.
  private readonly RedisDataRepository _redisDataRepository;

  // The underlying DotNetty WebSocket server this class starts, stops and sends through.
  private readonly DotNettyWebSocketServer _dotNettyWebSocketServer;
  /// <summary>
  /// Creates the WebSocket server wrapper and stores the injected collaborators.
  /// </summary>
  /// <param name="configurationHepler">Configuration helper; stored in a field.</param>
  /// <param name="runningDataCache">Cache holding the equipment data dictionary.</param>
  /// <param name="teleCommandDataSendTask">Task object that remote-control commands are posted to.</param>
  /// <param name="webApiRequest">Client used to send arming/disarming updates.</param>
  /// <param name="redisDataRepository">Repository exposing the Redis alarm list.</param>
  /// <param name="dotNettyWebSocketServer">The DotNetty WebSocket server being wrapped.</param>
  public WebSocketServer(
      ConfigurationHepler configurationHepler,
      RunningDataCache runningDataCache,
      TeleCommandDataSendTask teleCommandDataSendTask,
      WebApiRequest webApiRequest,
      RedisDataRepository redisDataRepository,
      DotNettyWebSocketServer dotNettyWebSocketServer)
  {
      // Assignments follow the parameter order; they are independent of each other.
      _configurationHepler = configurationHepler;
      _runningDataCache = runningDataCache;
      _teleCommandDataSendTask = teleCommandDataSendTask;
      _webApiRequest = webApiRequest;
      _redisDataRepository = redisDataRepository;
      _dotNettyWebSocketServer = dotNettyWebSocketServer;
  }
  /// <summary>
  /// Starts the WebSocket server and wires this instance's handlers to the server events.
  /// </summary>
  /// <remarks>
  /// The handlers are attached before the server is started so that events raised right
  /// after start-up are not missed, and each handler is detached before it is attached so
  /// that calling this method more than once does not register duplicates.
  /// Exceptions thrown synchronously are logged and not rethrown.
  /// NOTE(review): the result of RunServerAsync is not observed here, so the success message
  /// only means the call returned without throwing - confirm whether it should be awaited.
  /// </remarks>
  /// <param name="path">Passed through to the server's RunServerAsync.</param>
  /// <param name="port">Passed through to the server's RunServerAsync.</param>
  public void WebSocketStartAsync(string path, int port)
  {
      try
      {
          AttachServerEventHandlers();
          _dotNettyWebSocketServer.RunServerAsync(path, port);
          MonitoringEventBus.LogHandler("WebSocket监听成功", "WebSocket启动信息");
      }
      catch (Exception ex)
      {
          MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
      }
  }

  /// <summary>
  /// Subscribes this instance's handlers to the server events exactly once.
  /// </summary>
  private void AttachServerEventHandlers()
  {
      // Removing a handler that is not subscribed is a no-op, so "-=" then "+=" is idempotent.
      _dotNettyWebSocketServer.HandlerAddedEvent -= WebSocket_HandlerAddedEvent;
      _dotNettyWebSocketServer.HandlerAddedEvent += WebSocket_HandlerAddedEvent;

      _dotNettyWebSocketServer.HandlerRemovedEvent -= WebSocket_HandlerRemovedEvent;
      _dotNettyWebSocketServer.HandlerRemovedEvent += WebSocket_HandlerRemovedEvent;

      _dotNettyWebSocketServer.ExceptionCaughtEvent -= WebSocket_ExceptionCaughtEvent;
      _dotNettyWebSocketServer.ExceptionCaughtEvent += WebSocket_ExceptionCaughtEvent;

      _dotNettyWebSocketServer.ReceiveMessageEvent -= WebSocket_ReceiveMessageEvent;
      _dotNettyWebSocketServer.ReceiveMessageEvent += WebSocket_ReceiveMessageEvent;
  }
  /// <summary>
  /// Asks the underlying server to close and logs the outcome.
  /// </summary>
  /// <remarks>
  /// Any exception thrown by the close call is logged and not rethrown.
  /// </remarks>
  public void WebSocketStop()
  {
      const string infoCategory = "WebSocket信息";
      const string errorCategory = "WebSocket异常信息";

      try
      {
          _dotNettyWebSocketServer.CloseServerAsync();
          MonitoringEventBus.LogHandler("关闭WebSocket", infoCategory);
      }
      catch (Exception closeError)
      {
          MonitoringEventBus.LogHandler(closeError.Message, errorCategory);
      }
  }
  /// <summary>
  /// Handles a message received from a WebSocket client by dispatching on its message type.
  /// </summary>
  /// <remarks>
  /// The received message is logged first. A null message is ignored after logging.
  /// Each message type is processed by its own helper, and every helper logs its own
  /// failures so that no exception escapes into the caller that raised the event.
  /// </remarks>
  /// <param name="recMsgEventArgs">Event data carrying the client context and the message.</param>
  private void WebSocket_ReceiveMessageEvent(ReceiveMessageEventArgs<DataMonitorMessageModel> recMsgEventArgs)
  {
      DataMonitorMessageModel message = recMsgEventArgs.Message;
      MonitoringEventBus.LogHandler(JsonConvert.SerializeObject(message), "WebSocket信息");

      // Without a message there is no type to dispatch on.
      if (message == null)
      {
          return;
      }

      switch (message.MessageType)
      {
          case MessgeTypeEnum.All:
              ReplyWithEquipmentData(recMsgEventArgs);
              break;
          case MessgeTypeEnum.RemoteControl:
              QueueTelecommand(recMsgEventArgs);
              break;
          case MessgeTypeEnum.ArmingDisArming:
              ApplyArmingDisarming(message);
              break;
          case MessgeTypeEnum.AlarmQueueConfirm:
              ConfirmAlarms(message);
              break;
          case MessgeTypeEnum.AlarmmQueueClear:
              SetAlarmClearFlag(message, true);
              break;
          case MessgeTypeEnum.AlarmmQueueClearReset:
              SetAlarmClearFlag(message, false);
              break;
          default:
              break;
      }
  }

  /// <summary>
  /// Deserializes the message content (via its string form) into <typeparamref name="T"/>.
  /// Returns null when the content is missing or blank.
  /// </summary>
  private static T DeserializeContent<T>(DataMonitorMessageModel message) where T : class
  {
      string json = message?.Content?.ToString();
      return string.IsNullOrWhiteSpace(json) ? null : JsonConvert.DeserializeObject<T>(json);
  }

  /// <summary>
  /// Replaces the request's content with the cached equipment data (all of it, or only the
  /// entries of the requested group) and sends the message back to the requesting client.
  /// </summary>
  private void ReplyWithEquipmentData(ReceiveMessageEventArgs<DataMonitorMessageModel> recMsgEventArgs)
  {
      try
      {
          DataMonitorMessageModel message = recMsgEventArgs.Message;

          // Map the client-facing group onto the monitoring group; anything else means "no filter".
          MonitoringGroupTypeEnum mGroupType = MonitoringGroupTypeEnum.DEFAULT;
          switch (message.GroupType)
          {
              case GroupTypeEnum.DLHJ:
                  mGroupType = MonitoringGroupTypeEnum.DLHJ;
                  break;
              case GroupTypeEnum.ZXJC:
                  mGroupType = MonitoringGroupTypeEnum.ZXJC;
                  break;
          }

          if (mGroupType == MonitoringGroupTypeEnum.DEFAULT)
          {
              message.Content = _runningDataCache.EquipmentDataDic.Values.ToList();
          }
          else
          {
              message.Content = _runningDataCache
                  .EquipmentDataDic.Values.Where(e => e.GroupType == mGroupType).ToList();
          }

          SendMsg(recMsgEventArgs.Context, message);
      }
      catch (Exception ex)
      {
          Log4Helper.Error(this.GetType(), "MessgeTypeEnum.All", ex);
      }
  }

  /// <summary>
  /// Deserializes a remote-control command on a background task and posts it to the
  /// tele-command block together with the originating event data.
  /// </summary>
  private void QueueTelecommand(ReceiveMessageEventArgs<DataMonitorMessageModel> recMsgEventArgs)
  {
      Task.Run(() =>
      {
          // The try/catch lives inside the task: a handler around Task.Run itself would
          // never see exceptions thrown by the work it schedules.
          try
          {
              TelecommandModel telecommand = DeserializeContent<TelecommandModel>(recMsgEventArgs.Message);
              if (telecommand == null)
              {
                  return;
              }

              _teleCommandDataSendTask.TelecommandTActionBlock.Post(new TelcommandDataModel() { ReceiveMessageEvent = recMsgEventArgs, Telecommand = telecommand });
          }
          catch (Exception ex)
          {
              Log4Helper.Error(this.GetType(), "MessgeTypeEnum.RemoteControl", ex);
          }
      });
  }

  /// <summary>
  /// Forwards an arming/disarming request to the web API and, when no delay is requested,
  /// updates the cached equipment entries and broadcasts each change.
  /// </summary>
  private void ApplyArmingDisarming(DataMonitorMessageModel message)
  {
      try
      {
          ArmingDisarmingModel armingDisarmingModel = DeserializeContent<ArmingDisarmingModel>(message);
          if (armingDisarmingModel == null)
          {
              return;
          }

          Task.Run(() =>
          {
              // Exceptions must be handled inside the task; the outer catch cannot observe them.
              try
              {
                  // Negative delays are treated as "no delay".
                  if (armingDisarmingModel.DelaySeconds < 0)
                  {
                      armingDisarmingModel.DelaySeconds = 0;
                  }

                  _webApiRequest.SendArmsEquipmentsUpdate(armingDisarmingModel, (int)armingDisarmingModel.SafetyStateType);

                  // The cache is only updated here for immediate requests.
                  if (armingDisarmingModel.DelaySeconds != 0 || armingDisarmingModel.EquipmentIds == null)
                  {
                      return;
                  }

                  foreach (var item in armingDisarmingModel.EquipmentIds)
                  {
                      // Single lookup instead of ContainsKey followed by the indexer.
                      if (!_runningDataCache.EquipmentDataDic.TryGetValue(item, out var equipment) || equipment == null)
                      {
                          continue;
                      }

                      equipment.SafetyStateType = armingDisarmingModel.SafetyStateType;
                      DataMonitorMessageModel messageModel = new DataMonitorMessageModel
                      {
                          Content = new List<EquipmentDataModel>() { equipment },
                          GroupType = GroupTypeEnum.All,
                          MessageType = MessgeTypeEnum.Changed
                      };
                      SendMsg(messageModel);
                  }
              }
              catch (Exception ex)
              {
                  Log4Helper.Error(this.GetType(), " MessgeTypeEnum.ArmingDisArming", ex);
              }
          });
      }
      catch (Exception ex)
      {
          Log4Helper.Error(this.GetType(), " MessgeTypeEnum.ArmingDisArming", ex);
      }
  }

  /// <summary>
  /// Removes the alarms whose ids are listed in the message content from the Redis alarm list.
  /// </summary>
  private void ConfirmAlarms(DataMonitorMessageModel message)
  {
      try
      {
          var alarmConfirmIds = DeserializeContent<List<Guid>>(message);
          if (alarmConfirmIds != null && alarmConfirmIds.Count > 0)
          {
              // NOTE(review): the result of this call is not observed - confirm whether failures need handling.
              _redisDataRepository.AlarmListRedis.DeleteHashKeiesAsync(nameof(AlarmListRedis), alarmConfirmIds);
          }
      }
      catch (Exception ex)
      {
          Log4Helper.Error(this.GetType(), "确认报警队列出错", ex);
      }
  }

  /// <summary>
  /// Sets the IsClear flag of every listed alarm that exists in the Redis alarm list.
  /// Shared by the "clear" (true) and "clear reset" (false) message types.
  /// </summary>
  private void SetAlarmClearFlag(DataMonitorMessageModel message, bool isClear)
  {
      try
      {
          var alarmIds = DeserializeContent<List<Guid>>(message);
          if (alarmIds == null || alarmIds.Count == 0)
          {
              return;
          }

          var alarms = _redisDataRepository.AlarmListRedis.HashSetGetDicAll(nameof(AlarmListRedis));
          if (alarms == null)
          {
              return;
          }

          foreach (var id in alarmIds)
          {
              string key = id.ToString();

              // Unknown ids and null entries are skipped for both flag values.
              if (alarms.TryGetValue(key, out var entity) && entity != null)
              {
                  entity.IsClear = isClear;
                  // NOTE(review): the result of this call is not observed - confirm whether failures need handling.
                  _redisDataRepository.AlarmListRedis.HashSetUpdateOneAsync(nameof(AlarmListRedis), key, entity);
              }
          }
      }
      catch (Exception ex)
      {
          Log4Helper.Error(this.GetType(), "清除报警队列出错", ex);
      }
  }
  /// <summary>
  /// Logs an exception reported by the WebSocket server together with the remote address
  /// of the channel it occurred on.
  /// </summary>
  /// <param name="exceptionCaughtEventArgs">Event data carrying the channel context and the exception.</param>
  private void WebSocket_ExceptionCaughtEvent(ExceptionCaughtEventArgs exceptionCaughtEventArgs)
  {
      var remoteAddress = exceptionCaughtEventArgs.Context.Channel.RemoteAddress;
      var exceptionText = exceptionCaughtEventArgs.Exception.ToString();

      MonitoringEventBus.LogHandler($"{remoteAddress}:{exceptionText}", "异常信息");
  }
  /// <summary>
  /// Logs that a client disconnected, along with the total number of channels summed over
  /// all channel groups.
  /// </summary>
  /// <param name="hanlerEventArgs">Event data carrying the channel context of the client.</param>
  private void WebSocket_HandlerRemovedEvent(HandlerEventArgs hanlerEventArgs)
  {
      var remoteAddress = hanlerEventArgs.Context.Channel.RemoteAddress;
      var clientCount = _dotNettyWebSocketServer.GetDictionaryChannelGroup().Values.Sum(group => group.Count);

      MonitoringEventBus.LogHandler($"{remoteAddress}离线了!客户端数量:{clientCount}", "WebSocket信息");
  }
  /// <summary>
  /// Logs that a client connected, along with the total number of channels summed over
  /// all channel groups.
  /// </summary>
  /// <param name="hanlerEventArgs">Event data carrying the channel context of the client.</param>
  private void WebSocket_HandlerAddedEvent(HandlerEventArgs hanlerEventArgs)
  {
      var remoteAddress = hanlerEventArgs.Context.Channel.RemoteAddress;
      var clientCount = _dotNettyWebSocketServer.GetDictionaryChannelGroup().Values.Sum(group => group.Count);

      MonitoringEventBus.LogHandler($"{remoteAddress}上线了!客户端数量:{clientCount}", "WebSocket信息");
  }
  // Serializer settings passed to JsonConvert.SerializeObject when outgoing messages are logged.
  // Declared private and readonly: the instance is created once and never reassigned.
  // NOTE(review): Json.NET documents MaxDepth as the maximum depth allowed when *reading* JSON,
  // so it probably does not limit the serialized log output - confirm the intent.
  private readonly JsonSerializerSettings _jsonSerializerSettings = new JsonSerializerSettings()
  {
      MaxDepth = 4,
  };
  /// <summary>
  /// Broadcasts a message through the underlying WebSocket server.
  /// </summary>
  /// <remarks>
  /// A null message is logged and ignored. Exceptions thrown by the send call are logged
  /// and not rethrown.
  /// </remarks>
  /// <param name="msg">The message to broadcast.</param>
  public void SendMsg(DataMonitorMessageModel msg)
  {
      // Explicit guard instead of relying on a NullReferenceException being caught below.
      if (msg == null)
      {
          MonitoringEventBus.LogHandler("群发消息失败:消息为空", "WebSocket异常信息");
          return;
      }

      try
      {
          _dotNettyWebSocketServer.Send(msg);
      }
      catch (Exception ex)
      {
          MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
      }
  }
  /// <summary>
  /// Sends a message to a single client and writes the serialized message to the info log.
  /// </summary>
  /// <remarks>
  /// A null message is logged and ignored. Exceptions thrown synchronously by the send call
  /// or by the serialization for logging are logged and not rethrown.
  /// NOTE(review): the result of SendAsync is not observed here - confirm whether send
  /// failures need to be handled.
  /// </remarks>
  /// <param name="context">The channel context of the client to send to.</param>
  /// <param name="msg">The message to send.</param>
  public void SendMsg(IChannelHandlerContext context, DataMonitorMessageModel msg)
  {
      // Explicit guard so that a null message is never handed to the server.
      if (msg == null)
      {
          MonitoringEventBus.LogHandler("发送消息失败:消息为空", "WebSocket异常信息");
          return;
      }

      try
      {
          _dotNettyWebSocketServer.SendAsync(context, msg);
          Log4Helper.Info(this.GetType(), JsonConvert.SerializeObject(msg, _jsonSerializerSettings));
      }
      catch (Exception ex)
      {
          MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
      }
  }
  303. }
  304. }