10 KiB
10 KiB
遥测数据存储架构文档 (Telemetry Data Storage Architecture)
概述 (Overview)
本文档详细描述了SOMS系统中遥测数据的存储机制、统计数据处理方法和存储策略。系统采用Redis+MongoDB双存储架构,实现高性能实时数据处理和可靠的历史数据存储。
🎯 领导要求实施 - 定时保存模式强制1分钟间隔
核心要求
- 定时保存模式:强制执行1分钟间隔,忽略用户配置
- 适用范围:所有数据源类别(综自、配电、辅控、在线监测、机器人)
- Cron表达式:
"0 * * * * ?"
- 每分钟第0秒执行 - 配置覆盖:自动忽略
ConnectionConfig.SaveInterval
设置
保存模式说明
SaveModeEnum.Interval
(定时保存):🎯 强制1分钟间隔SaveModeEnum.Change
(变更保存):按数据变化触发保存SaveModeEnum.Both
(定时+变更):🎯 定时部分强制1分钟间隔 + 变更触发
系统架构 (System Architecture)
双存储架构
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 数据采集层 │ │ Redis缓存层 │ │ MongoDB持久层 │
│ Data Collection │ │ Redis Cache │ │ MongoDB Store │
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
│ • IEC104协议 │───▶│ • 实时数据缓存 │───▶│ • 历史数据存储 │
│ • 综自/配电/辅控 │ │ • 当日数据存储 │ │ • 统计数据归档 │
│ • 在线监测 │ │ • 2天TTL │ │ • 按年份分表 │
│ • 机器人巡检 │ │ • 高速读写 │ │ • 长期存储 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
数据流向图
graph TD
A[数据采集] --> B[TelemeteringResultSaveTask]
B --> C{保存模式判断}
C -->|定时保存/组合模式| D[🎯 强制1分钟间隔]
C -->|变更保存| E[数据变化触发]
D --> F[Quartz调度任务]
E --> G[实时数据处理]
F --> H[Redis存储]
G --> H
H --> I[三层Redis存储]
I --> J[统计结果Redis]
I --> K[分桶数据Redis]
I --> L[统计分桶Redis]
J --> M[MongoDB归档]
K --> M
L --> M
M --> N[按年份分表存储]
N --> O[TelemeteringStatisticsResult_Interval_Year]
核心组件 (Core Components)
1. TelemeteringResultSaveTask
主要职责:
- 遥测数据保存任务处理器
- 实施领导要求的1分钟间隔强制策略
- 管理Quartz调度任务
- 提供数据缓存和批量保存优化
关键方法:
// 🎯 领导要求:强制1分钟间隔的Cron表达式生成
private string GetCronExpressionForIntervalMode(FixedIntervalEnum interval, ConnectionConfig connection)
{
return interval switch
{
FixedIntervalEnum.Minute1 => "0 * * * * ?", // 每分钟第0秒执行
FixedIntervalEnum.Day1 => "0 0 2 * * ?", // 每天凌晨2点执行日统计
_ => throw new NotSupportedException($"定时保存模式仅支持1分钟间隔和日统计")
};
}
// 配置验证和警告记录
private void ValidateIntervalModeConfiguration(ConnectionConfig connection)
2. 数据存储策略
Redis存储(实时/当日数据)
- 存储期限:2天TTL
- 数据类型:
- 统计结果:
telemetering_statistics_result:{yyyyMMdd}:{telemeteringId}
- 分桶数据:
telemetering_bucket:{yyyyMMdd}:{telemeteringId}
- 统计分桶:
telemetering_statistics_bucket:{yyyyMMdd}:{telemeteringId}
- 统计结果:
MongoDB存储(历史数据)
- 分表策略:按年份和间隔类型分表
- 集合命名:
TelemeteringStatisticsResult_{interval}_{year}
- 索引优化:时间、设备ID、统计类型复合索引
3. Quartz调度系统
任务类型
-
1分钟间隔任务(🎯 领导要求)
- Cron:
"0 * * * * ?"
- 处理实时数据保存
- 适用于定时和组合保存模式
- Cron:
-
日统计任务
- Cron:
"0 0 2 * * ?"
- 凌晨2点执行
- 生成日统计数据
- Cron:
任务配置
// 为每个连接创建两种任务
var intervals = new[] { FixedIntervalEnum.Minute1, FixedIntervalEnum.Day1 };
// 🎯 领导要求验证
ValidateIntervalModeConfiguration(connection);
// 创建Quartz任务
IJobDetail job = JobBuilder.Create<TelemeteringSaveJob>()
.WithIdentity(jobKey)
.UsingJobData(jobDataMap)
.Build();
数据处理流程 (Data Processing Flow)
1. 数据接收与缓存
public async Task SaveWithCacheAsync(TelemeteringModel model, ConnectionConfig connectionConfig)
{
// 1. 数据验证
if (!ValidateTelemetryData(model)) return;
// 2. 缓存存储(线程安全队列)
var queue = _cache.GetOrAdd(key, _ => new ConcurrentQueue<TelemeteringModel>());
queue.Enqueue(model);
// 3. 存储当天数据到Redis
await StoreDailyDataToRedis(model, connectionConfig);
// 4. 性能指标更新
TelemetryMetrics.IncrementCounter(TelemetryMetrics.Counters.TELEMETRY_RECEIVED);
}
2. Redis三层存储
private async Task StoreDailyDataToRedis(TelemeteringModel model, ConnectionConfig connectionConfig)
{
var today = DateTime.Now.Date;
var equipmentName = GetEquipmentName(model);
// 1. 存储到遥测统计结果Redis仓储
await StoreToStatisticsResultRedis(model, today, equipmentName);
// 2. 存储到遥测分桶Redis仓储
await StoreToBucketRedis(model, today, equipmentName);
// 3. 存储到遥测统计分桶Redis仓储
await StoreToStatisticsBucketRedis(model, today, equipmentName);
}
3. 批量MongoDB归档
private async Task SaveBatchToDatabase(List<TelemeteringModel> dataBatch, string category)
{
// 🚀 性能优化:使用无序批量插入
var options = new InsertManyOptions
{
IsOrdered = false, // 允许并行插入
BypassDocumentValidation = true // 跳过文档验证
};
await _telemeteringModelRepository.Collection.InsertManyAsync(documents, options);
}
查询策略 (Query Strategy)
当前查询模式
系统目前主要依赖MongoDB进行历史数据查询,通过ITelemeteringBucketQueryService
提供统一的查询接口:
// 统计数据查询
public async Task<List<TelemeteringStatisticsResult>> QueryStatisticsData(
Guid telemeteringId,
FixedIntervalEnum interval,
DateTime startTime,
DateTime endTime,
StatisticsTypeEnum statisticsType = StatisticsTypeEnum.RealTime)
{
// 按年份分表查询
var years = GetYearsBetween(startTime, endTime);
foreach (var year in years)
{
_statisticsRepository.CollectionName = $"TelemeteringStatisticsResult_{interval}_{year}";
// 执行查询...
}
}
🚨 查询策略优化需求
由于实施了1分钟强制间隔策略,需要实现混合查询策略:
- 当日数据:优先从Redis查询(高性能)
- 历史数据:从MongoDB查询(可靠存储)
- 容错处理:Redis缺失时回退到MongoDB
性能优化 (Performance Optimization)
1. 缓存机制
- 线程安全队列:
ConcurrentQueue<TelemeteringModel>
- 批量处理:定时器触发批量保存
- 重试策略:Polly重试管道
2. 数据库优化
- 分表策略:按年份和数据源类别分表
- 索引优化:时间、设备ID复合索引
- 批量插入:无序并行插入提升性能
3. Redis优化
- TTL管理:2天自动过期
- 键命名规范:便于查询和管理
- 数据压缩:MessagePack序列化
监控与指标 (Monitoring & Metrics)
关键指标
// 性能指标
TelemetryMetrics.SetGauge(TelemetryMetrics.Gauges.CACHE_SIZE, newSize);
TelemetryMetrics.IncrementCounter(TelemetryMetrics.Counters.TELEMETRY_RECEIVED);
TelemetryMetrics.RecordSuccess("save_with_cache");
TelemetryMetrics.RecordError("save_with_cache", ex);
日志记录
- 配置覆盖警告:记录1分钟强制间隔覆盖情况
- 性能日志:批量保存操作的性能统计
- 错误日志:详细的异常信息和堆栈跟踪
配置管理 (Configuration Management)
连接配置
public class ConnectionConfig
{
public int SaveMode { get; set; } // 保存模式
public int SaveInterval { get; set; } // 保存间隔(被强制覆盖)
public int DataSourceCategoryName { get; set; } // 数据源类别
}
🎯 领导要求配置验证
private void ValidateIntervalModeConfiguration(ConnectionConfig connection)
{
if (connection.SaveInterval != 60)
{
Log4Helper.Warn(this.GetType(),
$"🎯 领导要求覆盖:连接[{connection.Name}] " +
$"配置间隔{connection.SaveInterval}秒 → 强制60秒");
}
}
数据源类别支持 (Data Source Categories)
系统支持所有数据源类别的1分钟间隔强制执行:
类别代码 | 中文名称 | 英文标识 | 描述 |
---|---|---|---|
0 | 综自 | Zongzi | 综合自动化系统 |
1 | 配电 | Peidian | 配电自动化系统 |
2 | 辅控 | Fukong | 辅助控制系统 |
3 | 在线监测 | ZXJC | 在线监测系统 |
4 | 机器人 | Robot | 机器人巡检系统 |
故障处理 (Error Handling)
重试策略
private readonly ResiliencePipeline _retryPipeline = new ResiliencePipelineBuilder()
.AddRetry(new RetryStrategyOptions
{
MaxRetryAttempts = 3,
Delay = TimeSpan.FromSeconds(1),
BackoffType = DelayBackoffType.Exponential
})
.Build();
异常处理
- 数据验证失败:记录警告,跳过处理
- Redis连接失败:重试机制,降级到MongoDB
- MongoDB写入失败:批量写入异常处理,部分成功记录
未来优化方向 (Future Optimization)
- 混合查询策略:实现Redis+MongoDB智能路由
- 数据压缩:历史数据压缩存储
- 分布式缓存:Redis集群支持
- 实时流处理:Kafka/RabbitMQ消息队列
- AI预测:基于历史数据的异常预测
文档版本:v1.0
最后更新:2025-01-25
维护人员:SOMS开发团队