using Abp.Dependency; using System.Threading.Tasks.Dataflow; using Yunda.ISAS.DataMonitoringServer.DataCenter; using Yunda.ISAS.MongoDB.Entities.DataMonitoring; using YunDa.ISAS.DataTransferObject.EquipmentLiveData; using ConstantModel = Yunda.ISAS.DataMonitoringServer.DataAnalysis.Model.ConstantModel; namespace Yunda.ISAS.DataMonitoringServer.DataAnalysis.TeleInfoSave { public class TelemeteringResultSaveTask : ISingletonDependency { public ActionBlock SaveTelemeteringResultActionBlock; private readonly DataRepository _dataRepository; private readonly RunningDataCache _runningDataCache; public TelemeteringResultSaveTask(DataRepository dataRepository, RunningDataCache runningDataCache ) { _dataRepository = dataRepository; _runningDataCache = runningDataCache; Action SaveTelemeteringResultAction = rstModel => { try { //直接保存到数据库 dataRepository.TelemeteringResultRepository.CollectionName = typeof(TelemeteringResult).Name + "_" + DateTime.Now.ToString("yyyyMMdd"); dataRepository.TelemeteringResultRepository.InsertOne(rstModel); } catch (Exception ex) { MonitoringEventBus.LogHandler(ex.Message, "异常信息"); } }; SaveTelemeteringResultActionBlock = new ActionBlock(SaveTelemeteringResultAction); } //public event LogMessageDelegate LogMessageEvent; private static double _saveInterval = ConstantModel.DefaultInterval; /// /// 保存时间间隔 /// public static double SaveInterval { get { return _saveInterval; } set { _saveInterval = value >= 1000 ? value : ConstantModel.DefaultInterval; } } /// /// 遥测统计数据前一次记录秒数 /// private int _preHour = -1; private bool IsStartedSaveTask = false; public void SaveStart() { if (!IsStartedSaveTask) { Task.Factory.StartNew(() => { while (true) { try { DateTime dateTime = DateTime.Now; List rstModels = new List(); foreach (EquipmentDataModel equipment in _runningDataCache.EquipmentDataDic.Values) { TelesignalisationModel singnalModel = equipment.Telesignalisations.Where(singnal => singnal.IsCommStatus && singnal.ResultValue == singnal.CommValue).FirstOrDefault(); if (singnalModel != null) continue; foreach (TelemeteringModel telmetering in equipment.Telemeterings) { if (telmetering.ResultValue == ConstantModel.DefaultValue) continue; rstModels.Add(new TelemeteringResult { ResultValue = telmetering.ResultValue, ResultTime = dateTime, TelemeteringConfigurationId = telmetering.Id, SaveMethod = 1 }); lock (_lockObj) { if (_runningDataCache.TelemeteringHourStatisticsDic.ContainsKey(telmetering.Id)) _runningDataCache.TelemeteringHourStatisticsDic[telmetering.Id].SetValue(telmetering.ResultValue); } } } SaveDatasAsync(rstModels)?.Wait(); if (_preHour != DateTime.Now.Hour) { SaveHourDataAsync().Wait(); _preHour = DateTime.Now.Hour; } } catch (Exception ex) { MonitoringEventBus.LogHandler(ex.Message, "异常信息"); } Task.Delay((int)SaveInterval).Wait(); } }); } IsStartedSaveTask = true; } private object _lockObj = new object(); /// /// 保存实时数据 /// /// private Task SaveDatasAsync(List rstModels) { if (rstModels == null || rstModels.Count == 0) return null; return Task.Run(() => { try { //直接保存到数据库 _dataRepository.TelemeteringResultRepository.CollectionName = typeof(TelemeteringResult).Name + "_" + DateTime.Now.ToString("yyyyMMdd"); _dataRepository.TelemeteringResultRepository.InsertMany(rstModels); } catch (Exception ex) { MonitoringEventBus.LogHandler(ex.Message, "异常信息"); } }); } /// /// 保存小时统计数据 /// private Task SaveHourDataAsync() { return Task.Run(() => { try { lock (_lockObj) { //保存统计数据 var hourStatistics = _runningDataCache.TelemeteringHourStatisticsDic.Values.Where(s => s.StatisticsCount > 0); if (hourStatistics!=null&& hourStatistics.Count()>0) { var telemeteringResults = hourStatistics.Select(t => new TelemeteringHourStatisticsResult() { StatisticsCount = t.StatisticsCount , TelemeteringConfigurationId = t.TelemeteringConfigurationId, AVG = t.AVG, StatisticsDateTime = t.StatisticsDateTime, First = t.First, Last = t.Last, Max = t.Max, Min = t.Min }); _dataRepository.TelemeteringHourStatisticsResultRepository.InsertMany(telemeteringResults); } //整点,初始化统计数据 foreach (var hourSt in _runningDataCache.TelemeteringHourStatisticsDic) { hourSt.Value.InitValue(); } } } catch (Exception ex) { MonitoringEventBus.LogHandler(ex.Message, "异常信息"); //LogMessageEvent?.Invoke(new LogMessageEventArgs(ex.Message)); } }); } public void SaveStop() { try { //if (_timer == null) return; //_timer.Stop(); } catch (Exception ex) { MonitoringEventBus.LogHandler(ex.Message, "异常信息"); //LogMessageEvent?.Invoke(new LogMessageEventArgs(ex.Message)); } } } }