using Abp.Dependency;
using DotNetty.Transport.Channels;
using DotNettyHelper;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Threading.Tasks;
using ToolLibrary.LogHelper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.DataCollection;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Helper;
using Yunda.ISAS.DataMonitoringServer.DataAnalysis.Model;
using Yunda.ISAS.DataMonitoringServer.DataCenter;
using Yunda.ISAS.DataMonitoringServer.WebSocket.DotNetty.Server;
using Yunda.ISAS.DataMonitoringServer.WebSocket.Model;
using YunDa.ISAS.DataTransferObject.CommonDto;
using YunDa.ISAS.DataTransferObject.EquipmentLiveData;
using YunDa.ISAS.Redis.Entities.AlarmCategory;
namespace Yunda.ISAS.DataMonitoringServer.WebSocket
{
public class WebSocketServer : ISingletonDependency
{
// Collaborators handed to the constructor; each one is stored once and never reassigned.
private readonly ConfigurationHepler _configurationHepler;
private readonly RunningDataCache _runningDataCache;
private readonly TeleCommandDataSendTask _teleCommandDataSendTask;
private readonly WebApiRequest _webApiRequest;
private readonly DotNettyWebSocketServer _dotNettyWebSocketServer;
private readonly RedisDataRepository _redisDataRepository;

/// <summary>
/// Stores the collaborators this class uses to serve WebSocket clients.
/// </summary>
/// <param name="configurationHepler">Kept in a field; not read by the members visible in this class.</param>
/// <param name="runningDataCache">Cache whose <c>EquipmentDataDic</c> is read and updated by the message handler.</param>
/// <param name="teleCommandDataSendTask">Owner of the action block that receives remote-control commands.</param>
/// <param name="webApiRequest">Used to send arming/disarming updates.</param>
/// <param name="redisDataRepository">Gives access to <c>AlarmListRedis</c> for the alarm-queue messages.</param>
/// <param name="dotNettyWebSocketServer">The underlying server that is started, stopped and used for sending.</param>
public WebSocketServer(ConfigurationHepler configurationHepler,
    RunningDataCache runningDataCache,
    TeleCommandDataSendTask teleCommandDataSendTask,
    WebApiRequest webApiRequest,
    RedisDataRepository redisDataRepository,
    DotNettyWebSocketServer dotNettyWebSocketServer)
{
    // Assigned in parameter order; the assignments are independent of each other.
    _configurationHepler = configurationHepler;
    _runningDataCache = runningDataCache;
    _teleCommandDataSendTask = teleCommandDataSendTask;
    _webApiRequest = webApiRequest;
    _redisDataRepository = redisDataRepository;
    _dotNettyWebSocketServer = dotNettyWebSocketServer;
}
/// <summary>
/// Wires this class's handlers to the server events and then starts the WebSocket server.
/// </summary>
/// <param name="path">Forwarded unchanged to <c>RunServerAsync</c>.</param>
/// <param name="port">Forwarded unchanged to <c>RunServerAsync</c>.</param>
/// <remarks>
/// Any exception thrown synchronously is logged through <c>MonitoringEventBus.LogHandler</c>
/// and not rethrown.
/// </remarks>
public void WebSocketStartAsync(string path, int port)
{
    try
    {
        // Detach first: removing a handler that is not attached is a no-op, and this
        // guarantees a repeated start never registers the same handler twice.
        _dotNettyWebSocketServer.HandlerAddedEvent -= WebSocket_HandlerAddedEvent;
        _dotNettyWebSocketServer.HandlerRemovedEvent -= WebSocket_HandlerRemovedEvent;
        _dotNettyWebSocketServer.ExceptionCaughtEvent -= WebSocket_ExceptionCaughtEvent;
        _dotNettyWebSocketServer.ReceiveMessageEvent -= WebSocket_ReceiveMessageEvent;

        // Attach before the server is started so that no connection, message or
        // error raised right after start-up can slip past the handlers.
        _dotNettyWebSocketServer.HandlerAddedEvent += WebSocket_HandlerAddedEvent;
        _dotNettyWebSocketServer.HandlerRemovedEvent += WebSocket_HandlerRemovedEvent;
        _dotNettyWebSocketServer.ExceptionCaughtEvent += WebSocket_ExceptionCaughtEvent;
        _dotNettyWebSocketServer.ReceiveMessageEvent += WebSocket_ReceiveMessageEvent;

        // NOTE(review): the result of RunServerAsync is not observed here, so the
        // success message below is presumably written before the listener has
        // finished starting, and a later asynchronous failure would not reach the
        // catch block - confirm against RunServerAsync's implementation.
        _dotNettyWebSocketServer.RunServerAsync(path, port);
        MonitoringEventBus.LogHandler("WebSocket监听成功", "WebSocket启动信息");
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
    }
}
/// <summary>
/// Asks the underlying server to close and logs that the WebSocket was closed.
/// </summary>
/// <remarks>
/// An exception thrown by either call is logged and not rethrown.
/// </remarks>
public void WebSocketStop()
{
    const string infoCategory = "WebSocket信息";
    const string errorCategory = "WebSocket异常信息";

    try
    {
        _dotNettyWebSocketServer.CloseServerAsync();
        MonitoringEventBus.LogHandler("关闭WebSocket", infoCategory);
    }
    catch (Exception closeError)
    {
        MonitoringEventBus.LogHandler(closeError.Message, errorCategory);
    }
}
/// <summary>
/// Handles a message received from a WebSocket client: logs it, then dispatches on
/// <c>Message.MessageType</c> to the matching branch below.
/// </summary>
/// <param name="recMsgEventArgs">Event data carrying the sender's channel context and the received message.</param>
private void WebSocket_ReceiveMessageEvent(ReceiveMessageEventArgs recMsgEventArgs)
{
// Log the incoming message as JSON before doing anything else.
var info = JsonConvert.SerializeObject(recMsgEventArgs.Message);
MonitoringEventBus.LogHandler(info, "WebSocket信息");
// TODO: handling of the received message
switch (recMsgEventArgs.Message.MessageType)
{
// Client asks for the current equipment data; the reply goes only to the sender.
case MessgeTypeEnum.All:
try
{
// Translate the message's group type; anything not listed stays DEFAULT.
MonitoringGroupTypeEnum mGroupType = MonitoringGroupTypeEnum.DEFAULT;
switch (recMsgEventArgs.Message.GroupType)
{
case GroupTypeEnum.DLHJ:
mGroupType = MonitoringGroupTypeEnum.DLHJ;
break;
case GroupTypeEnum.ZXJC:
mGroupType = MonitoringGroupTypeEnum.ZXJC;
break;
}
// DEFAULT returns every cached entry; otherwise only entries of the requested group.
if (mGroupType == MonitoringGroupTypeEnum.DEFAULT)
recMsgEventArgs.Message.Content = _runningDataCache.EquipmentDataDic.Values.ToList();
else
recMsgEventArgs.Message.Content = _runningDataCache
.EquipmentDataDic.Values.Where(e => e.GroupType == mGroupType).ToList();
// The received message object is reused as the reply, with Content replaced above.
SendMsg(recMsgEventArgs.Context, recMsgEventArgs.Message);
}
catch (Exception ex)
{
Log4Helper.Error(this.GetType(), "MessgeTypeEnum.All", ex);
}
break;
// Remote-control command: parsed on a thread-pool task and posted to the tele-command action block.
case MessgeTypeEnum.RemoteControl:
try
{
// NOTE(review): the task returned by Task.Run is neither awaited nor observed, so an
// exception thrown inside the lambda (for example when Content is null) does not
// reach the catch block below and is never logged.
Task.Run(() =>
{
// NOTE(review): the generic type argument of DeserializeObject appears to be missing
// in this text (presumably TelecommandModel) - confirm against the original file.
TelecommandModel telecommand = JsonConvert.DeserializeObject(recMsgEventArgs.Message.Content.ToString());
if (telecommand == null) return;
_teleCommandDataSendTask.TelecommandTActionBlock.Post(new TelcommandDataModel() { ReceiveMessageEvent = recMsgEventArgs, Telecommand = telecommand });
});
}
catch (Exception ex)
{
Log4Helper.Error(this.GetType(), "MessgeTypeEnum.RemoteControl", ex);
}
break;
// Arming / disarming request for a set of equipment ids.
case MessgeTypeEnum.ArmingDisArming:
try
{
// NOTE(review): generic type argument appears to be missing here as well
// (presumably ArmingDisarmingModel) - confirm.
ArmingDisarmingModel armingDisarmingModel = JsonConvert.DeserializeObject(recMsgEventArgs.Message.Content.ToString());
if (armingDisarmingModel == null)
{
break;
}
// NOTE(review): as in the RemoteControl branch, exceptions thrown inside this task
// are not caught by the surrounding try/catch.
Task.Run(() =>
{
// A negative delay is treated as "no delay".
if (armingDisarmingModel.DelaySeconds < 0)
{
armingDisarmingModel.DelaySeconds = 0;
}
_webApiRequest.SendArmsEquipmentsUpdate(armingDisarmingModel, (int)armingDisarmingModel.SafetyStateType);
// Only when there is no delay is the local cache updated and the change broadcast here.
if (armingDisarmingModel.DelaySeconds == 0)
{
foreach (var item in armingDisarmingModel.EquipmentIds)
{
if (_runningDataCache.EquipmentDataDic.ContainsKey(item))
{
_runningDataCache.EquipmentDataDic[item].SafetyStateType = armingDisarmingModel.SafetyStateType;
// One Changed message per updated equipment entry, sent via the broadcast overload of SendMsg.
DataMonitorMessageModel messageModel = new DataMonitorMessageModel
{
// NOTE(review): the List type argument appears to be missing in this text - confirm.
Content = new List() { _runningDataCache.EquipmentDataDic[item] },
GroupType = GroupTypeEnum.All,
MessageType = MessgeTypeEnum.Changed
};
SendMsg(messageModel);
}
}
}
});
}
catch (Exception ex)
{
Log4Helper.Error(this.GetType(), " MessgeTypeEnum.ArmingDisArming", ex);
}
break;
// Alarm confirmation: the listed ids are deleted from the alarm hash in Redis.
case MessgeTypeEnum.AlarmQueueConfirm:
try
{
// NOTE(review): the generic type argument is truncated in this text; the result is
// used as a collection with a Count - confirm the element type.
var alarmConfirmIds = JsonConvert.DeserializeObject>(recMsgEventArgs.Message.Content.ToString());
if (alarmConfirmIds != null && alarmConfirmIds.Count > 0)
{
// NOTE(review): the result of this *Async call is not observed, so a failure
// presumably would not reach the catch block below - confirm.
_redisDataRepository.AlarmListRedis.DeleteHashKeiesAsync(nameof(AlarmListRedis), alarmConfirmIds);
}
}
catch (Exception ex)
{
Log4Helper.Error(this.GetType(), "确认报警队列出错", ex);
}
break;
// Alarm clear: sets IsClear = true on each listed alarm that exists in the Redis hash.
case MessgeTypeEnum.AlarmmQueueClear:
try
{
var alarmIds = JsonConvert.DeserializeObject>(recMsgEventArgs.Message.Content.ToString());
if (alarmIds != null && alarmIds.Count > 0)
{
// All entries are read once, then looked up by the id's string form.
var dic = _redisDataRepository.AlarmListRedis.HashSetGetDicAll(nameof(AlarmListRedis));
foreach (var id in alarmIds)
{
if (dic.ContainsKey(id.ToString()))
{
var entity = dic[id.ToString()];
if (entity != null)
{
entity.IsClear = true;
_redisDataRepository.AlarmListRedis.HashSetUpdateOneAsync(nameof(AlarmListRedis), id.ToString(), entity);
}
}
}
}
}
catch (Exception ex)
{
Log4Helper.Error(this.GetType(), "清除报警队列出错", ex);
}
break;
// Alarm clear reset: sets IsClear = false on each listed alarm that exists in the Redis hash.
case MessgeTypeEnum.AlarmmQueueClearReset:
try
{
var alarmIds = JsonConvert.DeserializeObject>(recMsgEventArgs.Message.Content.ToString());
if (alarmIds != null && alarmIds.Count > 0)
{
var dic = _redisDataRepository.AlarmListRedis.HashSetGetDicAll(nameof(AlarmListRedis));
foreach (var id in alarmIds)
{
if (dic.ContainsKey(id.ToString()))
{
// NOTE(review): unlike the AlarmmQueueClear branch there is no null check on the
// entity here; a null entry would throw and abort the rest of the loop.
var entity = dic[id.ToString()];
entity.IsClear = false;
_redisDataRepository.AlarmListRedis.HashSetUpdateOneAsync(nameof(AlarmListRedis), id.ToString(), entity);
}
}
}
}
catch (Exception ex)
{
// NOTE(review): this log text is identical to the one used by the AlarmmQueueClear branch.
Log4Helper.Error(this.GetType(), "清除报警队列出错", ex);
}
break;
// Every other message type is ignored.
default:
break;
}
}
/// <summary>
/// Handles the server's exception event by logging the channel's remote address
/// followed by the full exception text.
/// </summary>
/// <param name="exceptionCaughtEventArgs">Event data carrying the channel context and the exception.</param>
private void WebSocket_ExceptionCaughtEvent(ExceptionCaughtEventArgs exceptionCaughtEventArgs)
{
    var remoteAddress = exceptionCaughtEventArgs.Context.Channel.RemoteAddress;
    var exceptionText = exceptionCaughtEventArgs.Exception.ToString();

    MonitoringEventBus.LogHandler(remoteAddress + ":" + exceptionText, "异常信息");
}
/// <summary>
/// Handles a client disconnecting: logs the client's remote address together with
/// the total number of channels across all channel groups.
/// </summary>
/// <param name="hanlerEventArgs">Event data carrying the channel context of the client.</param>
private void WebSocket_HandlerRemovedEvent(HandlerEventArgs hanlerEventArgs)
{
    var remoteAddress = hanlerEventArgs.Context.Channel.RemoteAddress;

    // Total is the sum of the Count of every group returned by the server.
    var clientCount = _dotNettyWebSocketServer
        .GetDictionaryChannelGroup()
        .Values
        .Sum(group => group.Count);

    MonitoringEventBus.LogHandler(remoteAddress + "离线了!客户端数量:" + clientCount, "WebSocket信息");
}
/// <summary>
/// Handles a client connecting: logs the client's remote address together with
/// the total number of channels across all channel groups.
/// </summary>
/// <param name="hanlerEventArgs">Event data carrying the channel context of the client.</param>
private void WebSocket_HandlerAddedEvent(HandlerEventArgs hanlerEventArgs)
{
    var remoteAddress = hanlerEventArgs.Context.Channel.RemoteAddress;

    // Total is the sum of the Count of every group returned by the server.
    var clientCount = _dotNettyWebSocketServer
        .GetDictionaryChannelGroup()
        .Values
        .Sum(group => group.Count);

    MonitoringEventBus.LogHandler(remoteAddress + "上线了!客户端数量:" + clientCount, "WebSocket信息");
}
// Serializer settings used when a sent message is written to the log.
// The field is never reassigned, so it is declared private and readonly explicitly.
// NOTE(review): Json.NET documents MaxDepth as the maximum depth allowed when
// reading JSON, so it presumably does not limit the depth of SerializeObject
// output - confirm whether a write-side limit was intended.
private readonly JsonSerializerSettings _jsonSerializerSettings = new JsonSerializerSettings()
{
    MaxDepth = 4,
};
/// <summary>
/// Broadcasts a message by handing it to the server's <c>Send</c> method.
/// </summary>
/// <param name="msg">The message to broadcast. A null message is logged and not sent.</param>
/// <remarks>
/// An exception thrown by <c>Send</c> is logged through <c>MonitoringEventBus.LogHandler</c>
/// and not rethrown.
/// </remarks>
public void SendMsg(DataMonitorMessageModel msg)
{
    // Previously a null message was only stopped by a NullReferenceException raised
    // while building a log string that was never used; the check is now explicit.
    if (msg == null)
    {
        MonitoringEventBus.LogHandler("群发消息失败:消息为空", "WebSocket异常信息");
        return;
    }

    try
    {
        _dotNettyWebSocketServer.Send(msg);
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
    }
}
/// <summary>
/// Sends a message to one client through the server's <c>SendAsync</c> method and
/// writes the serialized message to the info log.
/// </summary>
/// <param name="context">Channel context of the client that should receive the message.</param>
/// <param name="msg">The message to send. A null message is logged and not sent.</param>
/// <remarks>
/// An exception thrown synchronously while sending or serializing is logged through
/// <c>MonitoringEventBus.LogHandler</c> and not rethrown.
/// </remarks>
public void SendMsg(IChannelHandlerContext context, DataMonitorMessageModel msg)
{
    // Reject a null message before anything is handed to the server; previously the
    // null was passed to SendAsync and only afterwards tripped a NullReferenceException.
    if (msg == null)
    {
        MonitoringEventBus.LogHandler("发送消息失败:消息为空", "WebSocket异常信息");
        return;
    }

    try
    {
        // NOTE(review): the result of SendAsync is not observed, so an asynchronous
        // send failure presumably never reaches the catch block - confirm.
        _dotNettyWebSocketServer.SendAsync(context, msg);

        // The whole message is serialized for the info log after the send was started.
        Log4Helper.Info(this.GetType(), JsonConvert.SerializeObject(msg, _jsonSerializerSettings));
    }
    catch (Exception ex)
    {
        MonitoringEventBus.LogHandler(ex.Message, "WebSocket异常信息");
    }
}
}
}