416 lines
18 KiB
C#

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Abp.Dependency;
using Abp.Domain.Repositories;
using Microsoft.EntityFrameworkCore;
using Newtonsoft.Json;
using ToolLibrary.LogHelper;
using YunDa.SOMS.Entities.DataMonitoring;
using YunDa.SOMS.Redis.Entities.DataMonitorCategory;
using YunDa.SOMS.Redis.Repositories;
using Jint;
namespace YunDa.SOMS.Application.DataMonitoring.SecondaryCircuitInspection.Services
{
/// <summary>
/// 时间窗口计算服务实现
/// </summary>
public class TimeWindowCalculatorService : ITimeWindowCalculatorService, ITransientDependency
{
// Inspection-item bindings; queried by InspectionItemId and IsActive.
private readonly IRepository<SecondaryCircuitInspectionTelemetryConfig, Guid> _telemetryConfigRepository;
private readonly IRepository<SecondaryCircuitInspectionTelesignalConfig, Guid> _telesignalConfigRepository;

// Point configurations; looked up by DispatcherAddress when resolving a variable code.
private readonly IRepository<TelemeteringConfiguration, Guid> _telemeteringConfigRepository;
private readonly IRepository<TelesignalisationConfiguration, Guid> _telesignalisationConfigRepository;

// Redis hashes holding the latest value per point, keyed by the configuration id.
private readonly IRedisRepository<TelemeteringModel, string> _telemeteringRedisRepository;
private readonly IRedisRepository<TelesignalisationModel, string> _telesignalisationRedisRepository;

/// <summary>
/// Creates the service and stores the injected repositories.
/// </summary>
/// <param name="telemetryConfigRepository">Repository of telemetry bindings per inspection item.</param>
/// <param name="telesignalConfigRepository">Repository of telesignal bindings per inspection item.</param>
/// <param name="telemeteringConfigRepository">Repository of telemetering point configurations.</param>
/// <param name="telesignalisationConfigRepository">Repository of telesignalisation point configurations.</param>
/// <param name="telemeteringRedisRepository">Redis repository with current telemetering values.</param>
/// <param name="telesignalisationRedisRepository">Redis repository with current telesignalisation states.</param>
public TimeWindowCalculatorService(
    IRepository<SecondaryCircuitInspectionTelemetryConfig, Guid> telemetryConfigRepository,
    IRepository<SecondaryCircuitInspectionTelesignalConfig, Guid> telesignalConfigRepository,
    IRepository<TelemeteringConfiguration, Guid> telemeteringConfigRepository,
    IRepository<TelesignalisationConfiguration, Guid> telesignalisationConfigRepository,
    IRedisRepository<TelemeteringModel, string> telemeteringRedisRepository,
    IRedisRepository<TelesignalisationModel, string> telesignalisationRedisRepository)
{
    // Redis-backed sources of live values.
    this._telemeteringRedisRepository = telemeteringRedisRepository;
    this._telesignalisationRedisRepository = telesignalisationRedisRepository;

    // Database-backed configuration sources.
    this._telemeteringConfigRepository = telemeteringConfigRepository;
    this._telesignalisationConfigRepository = telesignalisationConfigRepository;
    this._telemetryConfigRepository = telemetryConfigRepository;
    this._telesignalConfigRepository = telesignalConfigRepository;
}
/// <summary>
/// Computes the time-window data of an inspection item and returns it as a JSON string.
/// </summary>
/// <param name="inspectionItemId">Id of the inspection item.</param>
/// <param name="timeWindowSeconds">Size of the time window in seconds.</param>
/// <param name="cancellationToken">Token forwarded to the structured calculation.</param>
/// <returns>
/// The JSON produced by <see cref="CalculateTimeWindowDataStructuredAsync"/> when it reports success;
/// otherwise the empty JSON object "{}". Exceptions are logged and also mapped to "{}".
/// </returns>
public async Task<string> CalculateTimeWindowDataAsync(
    Guid inspectionItemId,
    int timeWindowSeconds = 60,
    CancellationToken cancellationToken = default)
{
    const string emptyJson = "{}";

    try
    {
        var structured = await CalculateTimeWindowDataStructuredAsync(
            inspectionItemId,
            timeWindowSeconds,
            cancellationToken);

        if (structured.IsSuccess)
        {
            return structured.JsonResult;
        }

        return emptyJson;
    }
    catch (Exception ex)
    {
        Log4Helper.Error($"计算时间窗口数据失败 - 巡检项ID: {inspectionItemId}", ex);
        return emptyJson;
    }
}
/// <summary>
/// Computes the time-window data of an inspection item and returns a structured result.
/// </summary>
/// <remarks>
/// For every active telemetry/telesignal binding of the inspection item the current value is read
/// through <see cref="GetTelemeteringValueAsync"/> / <see cref="GetTelesignalisationValueAsync"/>
/// and stored in <c>result.Data</c> under the key <c>{DispatcherAddress}_0</c> (telemetry) or
/// <c>{DispatcherAddress}_1</c> (telesignal). Bindings whose navigation property is null are skipped.
/// </remarks>
/// <param name="inspectionItemId">Id of the inspection item.</param>
/// <param name="timeWindowSeconds">Size of the time window in seconds; only used to fill the window bounds.</param>
/// <param name="cancellationToken">Token passed to the database queries.</param>
/// <returns>
/// A result whose <c>IsSuccess</c> is true and <c>JsonResult</c> holds the serialized data on success;
/// on any exception <c>IsSuccess</c> is false and <c>ErrorMessage</c> is set. The method does not throw.
/// </returns>
public async Task<CalculationResult> CalculateTimeWindowDataStructuredAsync(
    Guid inspectionItemId,
    int timeWindowSeconds = 60,
    CancellationToken cancellationToken = default)
{
    var stopwatch = Stopwatch.StartNew();

    // Read the clock once so that start and end are derived from the same instant
    // and the window is exactly timeWindowSeconds wide.
    var windowEnd = DateTime.Now;
    var result = new CalculationResult
    {
        WindowEndTime = windowEnd,
        WindowStartTime = windowEnd.AddSeconds(-timeWindowSeconds)
    };

    try
    {
        result.CalculationDetails.Add($"开始计算巡检项 {inspectionItemId} 的时间窗口数据,窗口大小: {timeWindowSeconds}秒");

        // Load the active telemetry bindings together with their point configuration.
        var telemetryConfigs = await _telemetryConfigRepository.GetAll()
            .Where(x => x.InspectionItemId == inspectionItemId && x.IsActive)
            .Include(x => x.TelemeteringConfiguration)
            .ToListAsync(cancellationToken);

        // Load the active telesignal bindings together with their point configuration.
        var telesignalConfigs = await _telesignalConfigRepository.GetAll()
            .Where(x => x.InspectionItemId == inspectionItemId && x.IsActive)
            .Include(x => x.TelesignalisationConfiguration)
            .ToListAsync(cancellationToken);

        result.CalculationDetails.Add($"找到 {telemetryConfigs.Count} 个遥测配置,{telesignalConfigs.Count} 个遥信配置");

        // Telemetry values: key suffix "_0".
        foreach (var config in telemetryConfigs)
        {
            if (config.TelemeteringConfiguration != null)
            {
                var variableCode = $"{config.TelemeteringConfiguration.DispatcherAddress}_0";
                var value = await GetTelemeteringValueAsync(config.TelemeteringConfigurationId, cancellationToken);
                result.Data[variableCode] = value;
                result.CalculationDetails.Add($"遥测 {variableCode}: {value}");
            }
        }

        // Telesignal values: key suffix "_1".
        foreach (var config in telesignalConfigs)
        {
            if (config.TelesignalisationConfiguration != null)
            {
                var variableCode = $"{config.TelesignalisationConfiguration.DispatcherAddress}_1";
                var value = await GetTelesignalisationValueAsync(config.TelesignalisationConfigurationId, cancellationToken);
                result.Data[variableCode] = value;
                result.CalculationDetails.Add($"遥信 {variableCode}: {value}");
            }
        }

        // Serialize the collected values.
        result.JsonResult = JsonConvert.SerializeObject(result.Data, Formatting.None);
        result.IsSuccess = true;
        result.CalculationDetails.Add($"计算完成,共获取 {result.Data.Count} 个数据点");
    }
    catch (Exception ex)
    {
        result.IsSuccess = false;
        result.ErrorMessage = $"计算时间窗口数据异常: {ex.Message}";
        result.CalculationDetails.Add($"异常详情: {ex}");
        Log4Helper.Error($"计算时间窗口数据失败 - 巡检项ID: {inspectionItemId}", ex);
    }
    finally
    {
        stopwatch.Stop();
        result.ExecutionTimeMs = stopwatch.ElapsedMilliseconds;
    }

    return result;
}
/// <summary>
/// Resolves the current value of each variable code and returns the values as a JSON object.
/// </summary>
/// <param name="variableCodes">
/// Variable codes to resolve; each is passed to <see cref="GetVariableValueByCodeAsync"/>.
/// A null list is logged and yields "{}".
/// </param>
/// <param name="timeWindowSeconds">Not used by the current implementation.</param>
/// <param name="cancellationToken">Token forwarded to the per-code lookup.</param>
/// <returns>
/// A JSON object mapping each code to its value, or "{}" when the list is null or an exception occurs.
/// </returns>
public async Task<string> CalculateVariableDataAsync(
    List<string> variableCodes,
    int timeWindowSeconds = 60,
    CancellationToken cancellationToken = default)
{
    // Guard null explicitly: otherwise the foreach fails and the catch handler itself throws
    // from string.Join, so the exception would escape instead of producing the "{}" fallback.
    if (variableCodes == null)
    {
        Log4Helper.Warning("根据变量代码计算数据失败: 变量代码列表为 null");
        return "{}";
    }

    try
    {
        var result = new Dictionary<string, decimal>();
        foreach (var code in variableCodes)
        {
            var value = await GetVariableValueByCodeAsync(code, cancellationToken);
            result[code] = value;
        }

        return JsonConvert.SerializeObject(result, Formatting.None);
    }
    catch (Exception ex)
    {
        Log4Helper.Error($"根据变量代码计算数据失败: {string.Join(", ", variableCodes)}", ex);
        return "{}";
    }
}
/// <summary>
/// Builds a time series for an inspection item over the given time window.
/// </summary>
/// <remarks>
/// The series is not read from a history store: for every sample time the current value is fetched
/// and, for telemetry points, perturbed by a random offset of up to ±5% of the value to simulate
/// history. Telesignal values are emitted unchanged. Each list entry is a dictionary with a single
/// key (<c>{DispatcherAddress}_0</c> or <c>{DispatcherAddress}_1</c>) whose value is a dictionary
/// with the entries "value" and "time" (Unix time in milliseconds of the sample).
/// </remarks>
/// <param name="inspectionItemId">Id of the inspection item.</param>
/// <param name="timeWindowSeconds">Size of the time window in seconds.</param>
/// <param name="sampleIntervalSeconds">
/// Sampling interval in seconds. When it is zero or negative a single sample at the window start is produced.
/// </param>
/// <param name="cancellationToken">Token passed to the database queries.</param>
/// <returns>The generated data points; exceptions are logged and the points collected so far are returned.</returns>
public async Task<List<Dictionary<string, object>>> CalculateTimeSeriesDataAsync(
    Guid inspectionItemId,
    int timeWindowSeconds = 60,
    int sampleIntervalSeconds = 5,
    CancellationToken cancellationToken = default)
{
    var stopwatch = Stopwatch.StartNew();
    var result = new List<Dictionary<string, object>>();

    try
    {
        // Load the active telemetry and telesignal bindings with their point configuration.
        var telemetryConfigs = await _telemetryConfigRepository.GetAll()
            .Where(x => x.InspectionItemId == inspectionItemId && x.IsActive)
            .Include(x => x.TelemeteringConfiguration)
            .ToListAsync(cancellationToken);
        var telesignalConfigs = await _telesignalConfigRepository.GetAll()
            .Where(x => x.InspectionItemId == inspectionItemId && x.IsActive)
            .Include(x => x.TelesignalisationConfiguration)
            .ToListAsync(cancellationToken);

        // Derive the sample times.
        var endTime = DateTime.Now;
        var startTime = endTime.AddSeconds(-timeWindowSeconds);

        // A zero interval would divide by zero; treat any non-positive interval as "one sample".
        var sampleCount = sampleIntervalSeconds > 0
            ? Math.Max(1, timeWindowSeconds / sampleIntervalSeconds)
            : 1;

        // One generator for the whole call instead of a new instance per data point.
        var random = new Random();

        for (int i = 0; i < sampleCount; i++)
        {
            var sampleTime = startTime.AddSeconds(i * sampleIntervalSeconds);
            var timestamp = ((DateTimeOffset)sampleTime).ToUnixTimeMilliseconds();

            // Telemetry points.
            foreach (var config in telemetryConfigs)
            {
                if (config.TelemeteringConfiguration != null)
                {
                    var variableCode = $"{config.TelemeteringConfiguration.DispatcherAddress}_0";
                    var value = await GetTelemeteringValueAsync(config.TelemeteringConfigurationId, cancellationToken);

                    // Add a random variation to simulate historical data.
                    var simulatedValue = value + (decimal)(random.NextDouble() - 0.5) * value * 0.1m;

                    var dataPoint = new Dictionary<string, object>
                    {
                        [variableCode] = new Dictionary<string, object>
                        {
                            ["value"] = simulatedValue,
                            ["time"] = timestamp
                        }
                    };
                    result.Add(dataPoint);
                }
            }

            // Telesignal points.
            foreach (var config in telesignalConfigs)
            {
                if (config.TelesignalisationConfiguration != null)
                {
                    var variableCode = $"{config.TelesignalisationConfiguration.DispatcherAddress}_1";
                    var value = await GetTelesignalisationValueAsync(config.TelesignalisationConfigurationId, cancellationToken);

                    var dataPoint = new Dictionary<string, object>
                    {
                        [variableCode] = new Dictionary<string, object>
                        {
                            ["value"] = value,
                            ["time"] = timestamp
                        }
                    };
                    result.Add(dataPoint);
                }
            }
        }

        Log4Helper.Info($"生成时间序列数据完成 - 巡检项ID: {inspectionItemId}, 数据点数: {result.Count}, 耗时: {stopwatch.ElapsedMilliseconds}ms");
    }
    catch (Exception ex)
    {
        Log4Helper.Error($"计算时间序列数据失败 - 巡检项ID: {inspectionItemId}", ex);
    }
    finally
    {
        stopwatch.Stop();
    }

    return result;
}
/// <summary>
/// Evaluates a JavaScript expression against the time-series data of an inspection item.
/// </summary>
/// <remarks>
/// The script sees two globals: <c>data</c> (the list returned by
/// <see cref="CalculateTimeSeriesDataAsync"/> with a 5 second sample interval) and
/// <c>timeWindow</c> (the window size in seconds). The completion value of the script is converted
/// to <see cref="decimal"/> and stored in <c>result.Data["calculatedResult"]</c>.
/// </remarks>
/// <param name="inspectionItemId">Id of the inspection item.</param>
/// <param name="javascriptExpression">JavaScript expression to evaluate.</param>
/// <param name="timeWindowSeconds">Size of the time window in seconds.</param>
/// <param name="cancellationToken">Token forwarded to the time-series calculation.</param>
/// <returns>
/// A result with <c>IsSuccess</c> true and the serialized value on success; on any exception
/// <c>IsSuccess</c> is false and <c>ErrorMessage</c> is set. The method does not throw.
/// </returns>
public async Task<CalculationResult> CalculateWithJavaScriptExpressionAsync(
    Guid inspectionItemId,
    string javascriptExpression,
    int timeWindowSeconds = 60,
    CancellationToken cancellationToken = default)
{
    const int sampleIntervalSeconds = 5;

    var stopwatch = Stopwatch.StartNew();

    // Read the clock once so that start and end are derived from the same instant.
    var windowEnd = DateTime.Now;
    var result = new CalculationResult
    {
        WindowEndTime = windowEnd,
        WindowStartTime = windowEnd.AddSeconds(-timeWindowSeconds)
    };

    try
    {
        // Collect the time-series data the script will operate on.
        var timeSeriesData = await CalculateTimeSeriesDataAsync(
            inspectionItemId, timeWindowSeconds, sampleIntervalSeconds, cancellationToken);
        result.CalculationDetails.Add($"获取到 {timeSeriesData.Count} 个时间序列数据点");

        // NOTE(review): javascriptExpression is executed as script. Confirm that callers only pass
        // trusted or validated expressions; the limits below only bound run time and statement count.
        var engine = new Jint.Engine(options =>
        {
            options.TimeoutInterval(TimeSpan.FromSeconds(30));
            options.MaxStatements(10000);
        });

        // Expose the inputs as script globals.
        engine.SetValue("data", timeSeriesData);
        engine.SetValue("timeWindow", timeWindowSeconds);

        // Evaluate and convert. The invariant culture keeps a string result such as "1.5"
        // from being parsed differently depending on the server's current culture.
        var jsResult = engine.Evaluate(javascriptExpression);
        var calculatedValue = Convert.ToDecimal(
            jsResult.ToObject(),
            System.Globalization.CultureInfo.InvariantCulture);

        result.Data["calculatedResult"] = calculatedValue;
        result.JsonResult = JsonConvert.SerializeObject(result.Data, Formatting.None);
        result.IsSuccess = true;
        result.CalculationDetails.Add($"JavaScript表达式计算结果: {calculatedValue}");
    }
    catch (Exception ex)
    {
        result.IsSuccess = false;
        result.ErrorMessage = $"JavaScript表达式计算异常: {ex.Message}";
        result.CalculationDetails.Add($"异常详情: {ex}");
        Log4Helper.Error($"JavaScript表达式计算失败 - 巡检项ID: {inspectionItemId}", ex);
    }
    finally
    {
        stopwatch.Stop();
        result.ExecutionTimeMs = stopwatch.ElapsedMilliseconds;
    }

    return result;
}
#region
/// <summary>
/// Reads the current telemetering value of a point from Redis.
/// </summary>
/// <param name="configId">Id of the telemetering configuration; its string form is the hash field.</param>
/// <param name="cancellationToken">Accepted for signature symmetry; not used by the Redis read.</param>
/// <returns>The stored value, or 0 when no entry or value exists or the read throws.</returns>
private async Task<decimal> GetTelemeteringValueAsync(Guid configId, CancellationToken cancellationToken)
{
    try
    {
        // The hash is named after the model type; the field is the configuration id.
        const string hashName = nameof(TelemeteringModel);
        var field = configId.ToString();

        var cached = await _telemeteringRedisRepository.HashSetGetAsync(hashName, field);
        return cached?.Value ?? 0m;
    }
    catch (Exception ex)
    {
        // Best effort: a failed read is reported as a warning and treated as 0.
        Log4Helper.Warning($"获取遥测值失败 - 配置ID: {configId}, 错误: {ex.Message}");
        return 0m;
    }
}
/// <summary>
/// Reads the current telesignalisation state of a point from Redis and maps it to a number.
/// </summary>
/// <param name="configId">Id of the telesignalisation configuration; its string form is the hash field.</param>
/// <param name="cancellationToken">Accepted for signature symmetry; not used by the Redis read.</param>
/// <returns>1 when the stored status is true; 0 otherwise, including a missing entry or a failed read.</returns>
private async Task<decimal> GetTelesignalisationValueAsync(Guid configId, CancellationToken cancellationToken)
{
    try
    {
        // The hash is named after the model type; the field is the configuration id.
        const string hashName = nameof(TelesignalisationModel);
        var field = configId.ToString();

        var cached = await _telesignalisationRedisRepository.HashSetGetAsync(hashName, field);
        if (cached?.Status == true)
        {
            return 1m;
        }

        return 0m;
    }
    catch (Exception ex)
    {
        // Best effort: a failed read is reported as a warning and treated as 0.
        Log4Helper.Warning($"获取遥信值失败 - 配置ID: {configId}, 错误: {ex.Message}");
        return 0m;
    }
}
/// <summary>
/// Resolves the current value for a variable code of the form <c>{address}_{type}</c>.
/// </summary>
/// <remarks>
/// Both parts must parse as integers. Type 0 selects a telemetering point and type 1 a
/// telesignalisation point; the point is looked up by its DispatcherAddress and its value is read
/// through <see cref="GetTelemeteringValueAsync"/> / <see cref="GetTelesignalisationValueAsync"/>.
/// </remarks>
/// <param name="variableCode">Variable code to resolve.</param>
/// <param name="cancellationToken">Token passed to the database lookup.</param>
/// <returns>
/// The resolved value, or 0 when the code is malformed, the type is unknown, no matching
/// configuration exists, or an exception occurs (which is logged as a warning).
/// </returns>
private async Task<decimal> GetVariableValueByCodeAsync(string variableCode, CancellationToken cancellationToken)
{
    try
    {
        var parts = variableCode.Split('_');
        if (parts.Length != 2
            || !int.TryParse(parts[0], out var address)
            || !int.TryParse(parts[1], out var type))
        {
            // A malformed code is an expected input error: report it directly instead of
            // throwing an exception only to catch it below. The log text is unchanged.
            Log4Helper.Warning($"根据变量代码获取值失败: {variableCode}, 错误: 无效的变量代码格式: {variableCode}");
            return 0m;
        }

        switch (type)
        {
            case 0: // telemetering
                var telemetering = await _telemeteringConfigRepository.GetAll()
                    .FirstOrDefaultAsync(x => x.DispatcherAddress == address, cancellationToken);
                if (telemetering != null)
                {
                    return await GetTelemeteringValueAsync(telemetering.Id, cancellationToken);
                }
                break;

            case 1: // telesignalisation
                var telesignalisation = await _telesignalisationConfigRepository.GetAll()
                    .FirstOrDefaultAsync(x => x.DispatcherAddress == address, cancellationToken);
                if (telesignalisation != null)
                {
                    return await GetTelesignalisationValueAsync(telesignalisation.Id, cancellationToken);
                }
                break;
        }

        // Unknown type or no matching configuration.
        return 0m;
    }
    catch (Exception ex)
    {
        Log4Helper.Warning($"根据变量代码获取值失败: {variableCode}, 错误: {ex.Message}");
        return 0m;
    }
}
#endregion
}
}