using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Abp.Dependency;
using Abp.Domain.Repositories;
using Microsoft.EntityFrameworkCore;
using Newtonsoft.Json;
using ToolLibrary.LogHelper;
using YunDa.SOMS.Entities.DataMonitoring;
using YunDa.SOMS.Redis.Entities.DataMonitorCategory;
using YunDa.SOMS.Redis.Repositories;
using Jint;
namespace YunDa.SOMS.Application.DataMonitoring.SecondaryCircuitInspection.Services
{
/// <summary>
/// Implementation of the time-window calculation service.
/// </summary>
public class TimeWindowCalculatorService : ITimeWindowCalculatorService, ITransientDependency
{
// NOTE(review): IRepository / IRedisRepository appear without type arguments in this copy of the
// file; they look like generic repositories whose arguments were lost — confirm against the original.

/// <summary>Per-inspection-item telemetry bindings; queried by InspectionItemId and IsActive.</summary>
private readonly IRepository _telemetryConfigRepository;

/// <summary>Per-inspection-item telesignal bindings; queried by InspectionItemId and IsActive.</summary>
private readonly IRepository _telesignalConfigRepository;

/// <summary>Telemetering point configurations; looked up by DispatcherAddress.</summary>
private readonly IRepository _telemeteringConfigRepository;

/// <summary>Telesignalisation point configurations; looked up by DispatcherAddress.</summary>
private readonly IRepository _telesignalisationConfigRepository;

/// <summary>Redis-backed repository read through HashSetGetAsync for telemetering values.</summary>
private readonly IRedisRepository _telemeteringRedisRepository;

/// <summary>Redis-backed repository read through HashSetGetAsync for telesignalisation status.</summary>
private readonly IRedisRepository _telesignalisationRedisRepository;

/// <summary>
/// Creates the service and stores the injected repositories.
/// </summary>
/// <param name="telemetryConfigRepository">Repository of inspection-item telemetry bindings.</param>
/// <param name="telesignalConfigRepository">Repository of inspection-item telesignal bindings.</param>
/// <param name="telemeteringConfigRepository">Repository of telemetering point configurations.</param>
/// <param name="telesignalisationConfigRepository">Repository of telesignalisation point configurations.</param>
/// <param name="telemeteringRedisRepository">Redis repository holding telemetering values.</param>
/// <param name="telesignalisationRedisRepository">Redis repository holding telesignalisation status.</param>
public TimeWindowCalculatorService(
    IRepository telemetryConfigRepository,
    IRepository telesignalConfigRepository,
    IRepository telemeteringConfigRepository,
    IRepository telesignalisationConfigRepository,
    IRedisRepository telemeteringRedisRepository,
    IRedisRepository telesignalisationRedisRepository)
{
    // Inspection-item bindings.
    (_telemetryConfigRepository, _telesignalConfigRepository) =
        (telemetryConfigRepository, telesignalConfigRepository);

    // Point configurations.
    (_telemeteringConfigRepository, _telesignalisationConfigRepository) =
        (telemeteringConfigRepository, telesignalisationConfigRepository);

    // Redis value stores.
    (_telemeteringRedisRepository, _telesignalisationRedisRepository) =
        (telemeteringRedisRepository, telesignalisationRedisRepository);
}
/// <summary>
/// Calculates the time-window data for an inspection item and returns it as a JSON string.
/// </summary>
/// <param name="inspectionItemId">Inspection item whose configured points are read.</param>
/// <param name="timeWindowSeconds">Window length in seconds, forwarded to the structured calculation.</param>
/// <param name="cancellationToken">Token forwarded to the structured calculation.</param>
/// <returns>
/// The serialized data when the structured calculation succeeds; the empty JSON object
/// <c>"{}"</c> when it reports failure or an exception is thrown.
/// </returns>
public async Task<string> CalculateTimeWindowDataAsync(
    Guid inspectionItemId,
    int timeWindowSeconds = 60,
    CancellationToken cancellationToken = default)
{
    // Fallback payload for every failure path.
    const string emptyJson = "{}";

    try
    {
        var calculation = await CalculateTimeWindowDataStructuredAsync(inspectionItemId, timeWindowSeconds, cancellationToken);
        return calculation.IsSuccess ? calculation.JsonResult : emptyJson;
    }
    catch (Exception ex)
    {
        Log4Helper.Error($"计算时间窗口数据失败 - 巡检项ID: {inspectionItemId}", ex);
        return emptyJson;
    }
}
/// <summary>
/// Calculates the time-window data for an inspection item and returns a structured result.
/// </summary>
/// <remarks>
/// Loads the active telemetry and telesignal bindings of the inspection item, reads one value per
/// binding through <c>GetTelemeteringValueAsync</c> / <c>GetTelesignalisationValueAsync</c>, and stores it
/// under the key <c>{DispatcherAddress}_0</c> (telemetry) or <c>{DispatcherAddress}_1</c> (telesignal).
/// <paramref name="timeWindowSeconds"/> only determines the reported window bounds.
/// </remarks>
/// <param name="inspectionItemId">Inspection item whose bindings are queried.</param>
/// <param name="timeWindowSeconds">Window length in seconds used for WindowStartTime.</param>
/// <param name="cancellationToken">Token passed to the database queries.</param>
/// <returns>
/// A result carrying the data, its JSON form, step-by-step details and the elapsed time.
/// On any exception <c>IsSuccess</c> is false and <c>ErrorMessage</c> is set; the exception is not rethrown.
/// </returns>
public async Task<CalculationResult> CalculateTimeWindowDataStructuredAsync(
    Guid inspectionItemId,
    int timeWindowSeconds = 60,
    CancellationToken cancellationToken = default)
{
    var stopwatch = Stopwatch.StartNew();

    // Read the clock once so the reported window is exactly timeWindowSeconds long.
    var windowEnd = DateTime.Now;
    var result = new CalculationResult
    {
        WindowEndTime = windowEnd,
        WindowStartTime = windowEnd.AddSeconds(-timeWindowSeconds)
    };

    try
    {
        result.CalculationDetails.Add($"开始计算巡检项 {inspectionItemId} 的时间窗口数据,窗口大小: {timeWindowSeconds}秒");

        // Active telemetry bindings, with the referenced point configuration loaded.
        var telemetryConfigs = await _telemetryConfigRepository.GetAll()
            .Where(x => x.InspectionItemId == inspectionItemId && x.IsActive)
            .Include(x => x.TelemeteringConfiguration)
            .ToListAsync(cancellationToken);

        // Active telesignal bindings, with the referenced point configuration loaded.
        var telesignalConfigs = await _telesignalConfigRepository.GetAll()
            .Where(x => x.InspectionItemId == inspectionItemId && x.IsActive)
            .Include(x => x.TelesignalisationConfiguration)
            .ToListAsync(cancellationToken);

        result.CalculationDetails.Add($"找到 {telemetryConfigs.Count} 个遥测配置,{telesignalConfigs.Count} 个遥信配置");

        // Telemetry values: bindings without a loaded configuration are skipped.
        foreach (var config in telemetryConfigs)
        {
            if (config.TelemeteringConfiguration == null)
            {
                continue;
            }

            var variableCode = $"{config.TelemeteringConfiguration.DispatcherAddress}_0";
            var value = await GetTelemeteringValueAsync(config.TelemeteringConfigurationId, cancellationToken);
            result.Data[variableCode] = value;
            result.CalculationDetails.Add($"遥测 {variableCode}: {value}");
        }

        // Telesignal values: bindings without a loaded configuration are skipped.
        foreach (var config in telesignalConfigs)
        {
            if (config.TelesignalisationConfiguration == null)
            {
                continue;
            }

            var variableCode = $"{config.TelesignalisationConfiguration.DispatcherAddress}_1";
            var value = await GetTelesignalisationValueAsync(config.TelesignalisationConfigurationId, cancellationToken);
            result.Data[variableCode] = value;
            result.CalculationDetails.Add($"遥信 {variableCode}: {value}");
        }

        result.JsonResult = JsonConvert.SerializeObject(result.Data, Formatting.None);
        result.IsSuccess = true;
        result.CalculationDetails.Add($"计算完成,共获取 {result.Data.Count} 个数据点");
    }
    catch (Exception ex)
    {
        // Failures are reported through the result object rather than rethrown.
        result.IsSuccess = false;
        result.ErrorMessage = $"计算时间窗口数据异常: {ex.Message}";
        result.CalculationDetails.Add($"异常详情: {ex}");
        Log4Helper.Error($"计算时间窗口数据失败 - 巡检项ID: {inspectionItemId}", ex);
    }
    finally
    {
        stopwatch.Stop();
        result.ExecutionTimeMs = stopwatch.ElapsedMilliseconds;
    }

    return result;
}
/// <summary>
/// Resolves a value for each variable code and returns the code-to-value map as JSON.
/// </summary>
/// <param name="variableCodes">
/// Variable codes to resolve through <c>GetVariableValueByCodeAsync</c>. A null list yields <c>"{}"</c>.
/// </param>
/// <param name="timeWindowSeconds">Not used by this method.</param>
/// <param name="cancellationToken">Token forwarded to the per-code lookup.</param>
/// <returns>The serialized map, or the empty JSON object <c>"{}"</c> on failure.</returns>
public async Task<string> CalculateVariableDataAsync(
    List<string> variableCodes,
    int timeWindowSeconds = 60,
    CancellationToken cancellationToken = default)
{
    if (variableCodes is null)
    {
        // Without this guard the null list fails in the loop and then again in string.Join
        // inside the catch block, so the exception would escape the best-effort handler.
        return "{}";
    }

    try
    {
        var values = new Dictionary<string, decimal>(variableCodes.Count);
        foreach (var code in variableCodes)
        {
            // A repeated code overwrites its earlier entry.
            values[code] = await GetVariableValueByCodeAsync(code, cancellationToken);
        }

        return JsonConvert.SerializeObject(values, Formatting.None);
    }
    catch (Exception ex)
    {
        Log4Helper.Error($"根据变量代码计算数据失败: {string.Join(", ", variableCodes)}", ex);
        return "{}";
    }
}
/// <summary>
/// Builds a time series of data points for an inspection item over the given window.
/// </summary>
/// <remarks>
/// Every sample re-reads the value returned by <c>GetTelemeteringValueAsync</c> /
/// <c>GetTelesignalisationValueAsync</c>. Telemetry samples are that value plus a random offset of up
/// to ±5% to simulate history; telesignal samples are used unchanged.
/// </remarks>
/// <param name="inspectionItemId">Inspection item ID.</param>
/// <param name="timeWindowSeconds">Time window in seconds.</param>
/// <param name="sampleIntervalSeconds">Sampling interval in seconds; must be greater than zero.</param>
/// <param name="cancellationToken">Cancellation token, checked before each sample.</param>
/// <returns>
/// One dictionary per data point, shaped as <c>{ variableCode: { "value": ..., "time": unixMs } }</c>.
/// On failure the points collected so far (possibly none) are returned; exceptions are logged, not rethrown.
/// </returns>
public async Task<List<Dictionary<string, object>>> CalculateTimeSeriesDataAsync(
    Guid inspectionItemId,
    int timeWindowSeconds = 60,
    int sampleIntervalSeconds = 5,
    CancellationToken cancellationToken = default)
{
    var result = new List<Dictionary<string, object>>();

    if (sampleIntervalSeconds <= 0)
    {
        // Reject explicitly instead of relying on a DivideByZeroException further down.
        Log4Helper.Warning($"计算时间序列数据失败 - 巡检项ID: {inspectionItemId}, 错误: 采样间隔必须大于0 (当前值: {sampleIntervalSeconds})");
        return result;
    }

    var stopwatch = Stopwatch.StartNew();
    try
    {
        // Active telemetry and telesignal bindings with their point configurations loaded.
        var telemetryConfigs = await _telemetryConfigRepository.GetAll()
            .Where(x => x.InspectionItemId == inspectionItemId && x.IsActive)
            .Include(x => x.TelemeteringConfiguration)
            .ToListAsync(cancellationToken);
        var telesignalConfigs = await _telesignalConfigRepository.GetAll()
            .Where(x => x.InspectionItemId == inspectionItemId && x.IsActive)
            .Include(x => x.TelesignalisationConfiguration)
            .ToListAsync(cancellationToken);

        var endTime = DateTime.Now;
        var startTime = endTime.AddSeconds(-timeWindowSeconds);
        var sampleCount = Math.Max(1, timeWindowSeconds / sampleIntervalSeconds);

        // One generator for the whole call: a fresh Random per data point is wasteful and,
        // on runtimes that seed from the clock, can repeat the same value in a tight loop.
        var random = new Random();

        for (int i = 0; i < sampleCount; i++)
        {
            // Cancellation lands in the catch below, so the caller gets the partial series.
            cancellationToken.ThrowIfCancellationRequested();

            var sampleTime = startTime.AddSeconds(i * sampleIntervalSeconds);
            var timestamp = ((DateTimeOffset)sampleTime).ToUnixTimeMilliseconds();

            foreach (var config in telemetryConfigs)
            {
                if (config.TelemeteringConfiguration == null)
                {
                    continue;
                }

                var variableCode = $"{config.TelemeteringConfiguration.DispatcherAddress}_0";
                var value = await GetTelemeteringValueAsync(config.TelemeteringConfigurationId, cancellationToken);

                // Random offset in [-5%, +5%) of the value to simulate historical variation.
                var simulatedValue = value + (decimal)(random.NextDouble() - 0.5) * value * 0.1m;
                result.Add(CreateTimeSeriesPoint(variableCode, simulatedValue, timestamp));
            }

            foreach (var config in telesignalConfigs)
            {
                if (config.TelesignalisationConfiguration == null)
                {
                    continue;
                }

                var variableCode = $"{config.TelesignalisationConfiguration.DispatcherAddress}_1";
                var value = await GetTelesignalisationValueAsync(config.TelesignalisationConfigurationId, cancellationToken);
                result.Add(CreateTimeSeriesPoint(variableCode, value, timestamp));
            }
        }

        Log4Helper.Info($"生成时间序列数据完成 - 巡检项ID: {inspectionItemId}, 数据点数: {result.Count}, 耗时: {stopwatch.ElapsedMilliseconds}ms");
    }
    catch (Exception ex)
    {
        Log4Helper.Error($"计算时间序列数据失败 - 巡检项ID: {inspectionItemId}", ex);
    }
    finally
    {
        stopwatch.Stop();
    }

    return result;
}

/// <summary>
/// Wraps one sample as <c>{ variableCode: { "value": value, "time": timestamp } }</c>.
/// </summary>
private static Dictionary<string, object> CreateTimeSeriesPoint(string variableCode, decimal value, long timestamp)
{
    return new Dictionary<string, object>
    {
        [variableCode] = new Dictionary<string, object>
        {
            ["value"] = value,
            ["time"] = timestamp
        }
    };
}
/// <summary>
/// Evaluates a JavaScript expression against the inspection item's time-series data.
/// </summary>
/// <remarks>
/// The script sees two globals: <c>data</c> (the list returned by <c>CalculateTimeSeriesDataAsync</c>)
/// and <c>timeWindow</c> (the window length in seconds). Its result is converted to a decimal and
/// stored under the key <c>calculatedResult</c>.
/// </remarks>
/// <param name="inspectionItemId">Inspection item ID.</param>
/// <param name="javascriptExpression">JavaScript expression to evaluate.</param>
/// <param name="timeWindowSeconds">Time window in seconds.</param>
/// <param name="cancellationToken">Cancellation token, forwarded to the time-series calculation.</param>
/// <returns>
/// The calculation result. On any exception <c>IsSuccess</c> is false and <c>ErrorMessage</c> is set;
/// the exception is logged, not rethrown.
/// </returns>
public async Task<CalculationResult> CalculateWithJavaScriptExpressionAsync(
    Guid inspectionItemId,
    string javascriptExpression,
    int timeWindowSeconds = 60,
    CancellationToken cancellationToken = default)
{
    // Sampling interval used for the series handed to the script.
    const int sampleIntervalSeconds = 5;

    var stopwatch = Stopwatch.StartNew();

    // Read the clock once so the reported window is exactly timeWindowSeconds long.
    var windowEnd = DateTime.Now;
    var result = new CalculationResult
    {
        WindowEndTime = windowEnd,
        WindowStartTime = windowEnd.AddSeconds(-timeWindowSeconds)
    };

    try
    {
        var timeSeriesData = await CalculateTimeSeriesDataAsync(inspectionItemId, timeWindowSeconds, sampleIntervalSeconds, cancellationToken);
        result.CalculationDetails.Add($"获取到 {timeSeriesData.Count} 个时间序列数据点");

        // NOTE(review): this executes a caller-supplied script. The engine is limited by the
        // timeout and statement cap below; confirm that only trusted callers can reach this method.
        var engine = new Jint.Engine(options =>
        {
            options.TimeoutInterval(TimeSpan.FromSeconds(30));
            options.MaxStatements(10000);
        });

        engine.SetValue("data", timeSeriesData);
        engine.SetValue("timeWindow", timeWindowSeconds);

        var jsResult = engine.Evaluate(javascriptExpression);

        // Invariant culture keeps a string result such as "1.5" from depending on the server locale.
        var calculatedValue = Convert.ToDecimal(jsResult.ToObject(), System.Globalization.CultureInfo.InvariantCulture);

        result.Data["calculatedResult"] = calculatedValue;
        result.JsonResult = JsonConvert.SerializeObject(result.Data, Formatting.None);
        result.IsSuccess = true;
        result.CalculationDetails.Add($"JavaScript表达式计算结果: {calculatedValue}");
    }
    catch (Exception ex)
    {
        // Failures are reported through the result object rather than rethrown.
        result.IsSuccess = false;
        result.ErrorMessage = $"JavaScript表达式计算异常: {ex.Message}";
        result.CalculationDetails.Add($"异常详情: {ex}");
        Log4Helper.Error($"JavaScript表达式计算失败 - 巡检项ID: {inspectionItemId}", ex);
    }
    finally
    {
        stopwatch.Stop();
        result.ExecutionTimeMs = stopwatch.ElapsedMilliseconds;
    }

    return result;
}
#region 私有方法
/// <summary>
/// Reads the telemetering value stored for a configuration ID.
/// </summary>
/// <param name="configId">Configuration ID; its string form is the hash field that is read.</param>
/// <param name="cancellationToken">Not used by this method.</param>
/// <returns>
/// The stored value, or <c>0</c> when no entry (or no value) exists or the read throws.
/// Read failures are logged as warnings and not rethrown.
/// </returns>
private async Task<decimal> GetTelemeteringValueAsync(Guid configId, CancellationToken cancellationToken)
{
    try
    {
        var entry = await _telemeteringRedisRepository.HashSetGetAsync(
            nameof(TelemeteringModel), configId.ToString());
        return entry?.Value ?? 0m;
    }
    catch (Exception ex)
    {
        Log4Helper.Warning($"获取遥测值失败 - 配置ID: {configId}, 错误: {ex.Message}");
        return 0m;
    }
}
/// <summary>
/// Reads the telesignalisation status stored for a configuration ID and maps it to a number.
/// </summary>
/// <param name="configId">Configuration ID; its string form is the hash field that is read.</param>
/// <param name="cancellationToken">Not used by this method.</param>
/// <returns>
/// <c>1</c> when the stored status is true; <c>0</c> when it is false, missing, or the read throws.
/// Read failures are logged as warnings and not rethrown.
/// </returns>
private async Task<decimal> GetTelesignalisationValueAsync(Guid configId, CancellationToken cancellationToken)
{
    try
    {
        var entry = await _telesignalisationRedisRepository.HashSetGetAsync(
            nameof(TelesignalisationModel), configId.ToString());
        return entry?.Status == true ? 1m : 0m;
    }
    catch (Exception ex)
    {
        Log4Helper.Warning($"获取遥信值失败 - 配置ID: {configId}, 错误: {ex.Message}");
        return 0m;
    }
}
/// <summary>
/// Resolves a variable code of the form <c>{address}_{type}</c> to a value.
/// </summary>
/// <remarks>
/// Type <c>0</c> looks up a telemetering configuration by DispatcherAddress and reads its value;
/// type <c>1</c> does the same for telesignalisation. Any other type yields <c>0</c>.
/// </remarks>
/// <param name="variableCode">Code made of two integers separated by a single underscore.</param>
/// <param name="cancellationToken">Token passed to the configuration query.</param>
/// <returns>
/// The resolved value, or <c>0</c> when the code is malformed, no configuration matches, or an
/// exception occurs. Failures are logged as warnings and not rethrown.
/// </returns>
private async Task<decimal> GetVariableValueByCodeAsync(string variableCode, CancellationToken cancellationToken)
{
    const int telemeteringType = 0;
    const int telesignalisationType = 1;

    try
    {
        var parts = variableCode?.Split('_');
        if (parts is null
            || parts.Length != 2
            || !int.TryParse(parts[0], out var address)
            || !int.TryParse(parts[1], out var pointType))
        {
            // Same warning text the previous throw-and-catch produced, without the exception round trip.
            Log4Helper.Warning($"根据变量代码获取值失败: {variableCode}, 错误: 无效的变量代码格式: {variableCode}");
            return 0m;
        }

        if (pointType == telemeteringType)
        {
            var config = await _telemeteringConfigRepository.GetAll()
                .FirstOrDefaultAsync(x => x.DispatcherAddress == address, cancellationToken);
            if (config != null)
            {
                return await GetTelemeteringValueAsync(config.Id, cancellationToken);
            }
        }
        else if (pointType == telesignalisationType)
        {
            var config = await _telesignalisationConfigRepository.GetAll()
                .FirstOrDefaultAsync(x => x.DispatcherAddress == address, cancellationToken);
            if (config != null)
            {
                return await GetTelesignalisationValueAsync(config.Id, cancellationToken);
            }
        }

        // Unknown type, or no configuration with that address.
        return 0m;
    }
    catch (Exception ex)
    {
        Log4Helper.Warning($"根据变量代码获取值失败: {variableCode}, 错误: {ex.Message}");
        return 0m;
    }
}
#endregion
}
}