317 lines
10 KiB
Markdown
317 lines
10 KiB
Markdown
![]() |
# 遥测数据存储架构文档 (Telemetry Data Storage Architecture)
|
|||
|
|
|||
|
## 概述 (Overview)
|
|||
|
|
|||
|
本文档详细描述了SOMS系统中遥测数据的存储机制、统计数据处理方法和存储策略。系统采用Redis+MongoDB双存储架构,实现高性能实时数据处理和可靠的历史数据存储。
|
|||
|
|
|||
|
## 🎯 领导要求实施 - 定时保存模式强制1分钟间隔
|
|||
|
|
|||
|
### 核心要求
|
|||
|
- **定时保存模式**:强制执行1分钟间隔,忽略用户配置
|
|||
|
- **适用范围**:所有数据源类别(综自、配电、辅控、在线监测、机器人)
|
|||
|
- **Cron表达式**:`"0 * * * * ?"` - 每分钟第0秒执行
|
|||
|
- **配置覆盖**:自动忽略`ConnectionConfig.SaveInterval`设置
|
|||
|
|
|||
|
### 保存模式说明
|
|||
|
- `SaveModeEnum.Interval`(定时保存):🎯 强制1分钟间隔
|
|||
|
- `SaveModeEnum.Change`(变更保存):按数据变化触发保存
|
|||
|
- `SaveModeEnum.Both`(定时+变更):🎯 定时部分强制1分钟间隔 + 变更触发
|
|||
|
|
|||
|
## 系统架构 (System Architecture)
|
|||
|
|
|||
|
### 双存储架构
|
|||
|
```
|
|||
|
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
|
|||
|
│ 数据采集层 │ │ Redis缓存层 │ │ MongoDB持久层 │
|
|||
|
│ Data Collection │ │ Redis Cache │ │ MongoDB Store │
|
|||
|
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
|
|||
|
│ • IEC104协议 │───▶│ • 实时数据缓存 │───▶│ • 历史数据存储 │
|
|||
|
│ • 综自/配电/辅控 │ │ • 当日数据存储 │ │ • 统计数据归档 │
|
|||
|
│ • 在线监测 │ │ • 2天TTL │ │ • 按年份分表 │
|
|||
|
│ • 机器人巡检 │ │ • 高速读写 │ │ • 长期存储 │
|
|||
|
└─────────────────┘ └─────────────────┘ └─────────────────┘
|
|||
|
```
|
|||
|
|
|||
|
### 数据流向图
|
|||
|
```mermaid
|
|||
|
graph TD
|
|||
|
A[数据采集] --> B[TelemeteringResultSaveTask]
|
|||
|
B --> C{保存模式判断}
|
|||
|
|
|||
|
C -->|定时保存/组合模式| D[🎯 强制1分钟间隔]
|
|||
|
C -->|变更保存| E[数据变化触发]
|
|||
|
|
|||
|
D --> F[Quartz调度任务]
|
|||
|
E --> G[实时数据处理]
|
|||
|
|
|||
|
F --> H[Redis存储]
|
|||
|
G --> H
|
|||
|
|
|||
|
H --> I[三层Redis存储]
|
|||
|
I --> J[统计结果Redis]
|
|||
|
I --> K[分桶数据Redis]
|
|||
|
I --> L[统计分桶Redis]
|
|||
|
|
|||
|
J --> M[MongoDB归档]
|
|||
|
K --> M
|
|||
|
L --> M
|
|||
|
|
|||
|
M --> N[按年份分表存储]
|
|||
|
N --> O[TelemeteringStatisticsResult_Interval_Year]
|
|||
|
```
|
|||
|
|
|||
|
## 核心组件 (Core Components)
|
|||
|
|
|||
|
### 1. TelemeteringResultSaveTask
|
|||
|
**主要职责**:
|
|||
|
- 遥测数据保存任务处理器
|
|||
|
- 实施领导要求的1分钟间隔强制策略
|
|||
|
- 管理Quartz调度任务
|
|||
|
- 提供数据缓存和批量保存优化
|
|||
|
|
|||
|
**关键方法**:
|
|||
|
```csharp
|
|||
|
// 🎯 领导要求:强制1分钟间隔的Cron表达式生成
|
|||
|
private string GetCronExpressionForIntervalMode(FixedIntervalEnum interval, ConnectionConfig connection)
|
|||
|
{
|
|||
|
return interval switch
|
|||
|
{
|
|||
|
FixedIntervalEnum.Minute1 => "0 * * * * ?", // 每分钟第0秒执行
|
|||
|
FixedIntervalEnum.Day1 => "0 0 2 * * ?", // 每天凌晨2点执行日统计
|
|||
|
_ => throw new NotSupportedException($"定时保存模式仅支持1分钟间隔和日统计")
|
|||
|
};
|
|||
|
}
|
|||
|
|
|||
|
// 配置验证和警告记录
|
|||
|
private void ValidateIntervalModeConfiguration(ConnectionConfig connection)
|
|||
|
```
|
|||
|
|
|||
|
### 2. 数据存储策略
|
|||
|
|
|||
|
#### Redis存储(实时/当日数据)
|
|||
|
- **存储期限**:2天TTL
|
|||
|
- **数据类型**:
|
|||
|
- 统计结果:`telemetering_statistics_result:{yyyyMMdd}:{telemeteringId}`
|
|||
|
- 分桶数据:`telemetering_bucket:{yyyyMMdd}:{telemeteringId}`
|
|||
|
- 统计分桶:`telemetering_statistics_bucket:{yyyyMMdd}:{telemeteringId}`
|
|||
|
|
|||
|
#### MongoDB存储(历史数据)
|
|||
|
- **分表策略**:按年份和间隔类型分表
|
|||
|
- **集合命名**:`TelemeteringStatisticsResult_{interval}_{year}`
|
|||
|
- **索引优化**:时间、设备ID、统计类型复合索引
|
|||
|
|
|||
|
### 3. Quartz调度系统
|
|||
|
|
|||
|
#### 任务类型
|
|||
|
1. **1分钟间隔任务**(🎯 领导要求)
|
|||
|
- Cron: `"0 * * * * ?"`
|
|||
|
- 处理实时数据保存
|
|||
|
- 适用于定时和组合保存模式
|
|||
|
|
|||
|
2. **日统计任务**
|
|||
|
- Cron: `"0 0 2 * * ?"`
|
|||
|
- 凌晨2点执行
|
|||
|
- 生成日统计数据
|
|||
|
|
|||
|
#### 任务配置
|
|||
|
```csharp
|
|||
|
// 为每个连接创建两种任务
|
|||
|
var intervals = new[] { FixedIntervalEnum.Minute1, FixedIntervalEnum.Day1 };
|
|||
|
|
|||
|
// 🎯 领导要求验证
|
|||
|
ValidateIntervalModeConfiguration(connection);
|
|||
|
|
|||
|
// 创建Quartz任务
|
|||
|
IJobDetail job = JobBuilder.Create<TelemeteringSaveJob>()
|
|||
|
.WithIdentity(jobKey)
|
|||
|
.UsingJobData(jobDataMap)
|
|||
|
.Build();
|
|||
|
```
|
|||
|
|
|||
|
## 数据处理流程 (Data Processing Flow)
|
|||
|
|
|||
|
### 1. 数据接收与缓存
|
|||
|
```csharp
|
|||
|
public async Task SaveWithCacheAsync(TelemeteringModel model, ConnectionConfig connectionConfig)
|
|||
|
{
|
|||
|
// 1. 数据验证
|
|||
|
if (!ValidateTelemetryData(model)) return;
|
|||
|
|
|||
|
// 2. 缓存存储(线程安全队列)
|
|||
|
var queue = _cache.GetOrAdd(key, _ => new ConcurrentQueue<TelemeteringModel>());
|
|||
|
queue.Enqueue(model);
|
|||
|
|
|||
|
// 3. 存储当天数据到Redis
|
|||
|
await StoreDailyDataToRedis(model, connectionConfig);
|
|||
|
|
|||
|
// 4. 性能指标更新
|
|||
|
TelemetryMetrics.IncrementCounter(TelemetryMetrics.Counters.TELEMETRY_RECEIVED);
|
|||
|
}
|
|||
|
```
|
|||
|
|
|||
|
### 2. Redis三层存储
|
|||
|
```csharp
|
|||
|
private async Task StoreDailyDataToRedis(TelemeteringModel model, ConnectionConfig connectionConfig)
|
|||
|
{
|
|||
|
var today = DateTime.Now.Date;
|
|||
|
var equipmentName = GetEquipmentName(model);
|
|||
|
|
|||
|
// 1. 存储到遥测统计结果Redis仓储
|
|||
|
await StoreToStatisticsResultRedis(model, today, equipmentName);
|
|||
|
|
|||
|
// 2. 存储到遥测分桶Redis仓储
|
|||
|
await StoreToBucketRedis(model, today, equipmentName);
|
|||
|
|
|||
|
// 3. 存储到遥测统计分桶Redis仓储
|
|||
|
await StoreToStatisticsBucketRedis(model, today, equipmentName);
|
|||
|
}
|
|||
|
```
|
|||
|
|
|||
|
### 3. 批量MongoDB归档
|
|||
|
```csharp
|
|||
|
private async Task SaveBatchToDatabase(List<TelemeteringModel> dataBatch, string category)
|
|||
|
{
|
|||
|
// 🚀 性能优化:使用无序批量插入
|
|||
|
var options = new InsertManyOptions
|
|||
|
{
|
|||
|
IsOrdered = false, // 允许并行插入
|
|||
|
BypassDocumentValidation = true // 跳过文档验证
|
|||
|
};
|
|||
|
|
|||
|
await _telemeteringModelRepository.Collection.InsertManyAsync(documents, options);
|
|||
|
}
|
|||
|
```
|
|||
|
|
|||
|
## 查询策略 (Query Strategy)
|
|||
|
|
|||
|
### 当前查询模式
|
|||
|
系统目前主要依赖MongoDB进行历史数据查询,通过`ITelemeteringBucketQueryService`提供统一的查询接口:
|
|||
|
|
|||
|
```csharp
|
|||
|
// 统计数据查询
|
|||
|
public async Task<List<TelemeteringStatisticsResult>> QueryStatisticsData(
|
|||
|
Guid telemeteringId,
|
|||
|
FixedIntervalEnum interval,
|
|||
|
DateTime startTime,
|
|||
|
DateTime endTime,
|
|||
|
StatisticsTypeEnum statisticsType = StatisticsTypeEnum.RealTime)
|
|||
|
{
|
|||
|
// 按年份分表查询
|
|||
|
var years = GetYearsBetween(startTime, endTime);
|
|||
|
foreach (var year in years)
|
|||
|
{
|
|||
|
_statisticsRepository.CollectionName = $"TelemeteringStatisticsResult_{interval}_{year}";
|
|||
|
// 执行查询...
|
|||
|
}
|
|||
|
}
|
|||
|
```
|
|||
|
|
|||
|
### 🚨 查询策略优化需求
|
|||
|
由于实施了1分钟强制间隔策略,需要实现**混合查询策略**:
|
|||
|
- **当日数据**:优先从Redis查询(高性能)
|
|||
|
- **历史数据**:从MongoDB查询(可靠存储)
|
|||
|
- **容错处理**:Redis缺失时回退到MongoDB
|
|||
|
|
|||
|
## 性能优化 (Performance Optimization)
|
|||
|
|
|||
|
### 1. 缓存机制
|
|||
|
- **线程安全队列**:`ConcurrentQueue<TelemeteringModel>`
|
|||
|
- **批量处理**:定时器触发批量保存
|
|||
|
- **重试策略**:Polly重试管道
|
|||
|
|
|||
|
### 2. 数据库优化
|
|||
|
- **分表策略**:按年份和数据源类别分表
|
|||
|
- **索引优化**:时间、设备ID复合索引
|
|||
|
- **批量插入**:无序并行插入提升性能
|
|||
|
|
|||
|
### 3. Redis优化
|
|||
|
- **TTL管理**:2天自动过期
|
|||
|
- **键命名规范**:便于查询和管理
|
|||
|
- **数据压缩**:MessagePack序列化
|
|||
|
|
|||
|
## 监控与指标 (Monitoring & Metrics)
|
|||
|
|
|||
|
### 关键指标
|
|||
|
```csharp
|
|||
|
// 性能指标
|
|||
|
TelemetryMetrics.SetGauge(TelemetryMetrics.Gauges.CACHE_SIZE, newSize);
|
|||
|
TelemetryMetrics.IncrementCounter(TelemetryMetrics.Counters.TELEMETRY_RECEIVED);
|
|||
|
TelemetryMetrics.RecordSuccess("save_with_cache");
|
|||
|
TelemetryMetrics.RecordError("save_with_cache", ex);
|
|||
|
```
|
|||
|
|
|||
|
### 日志记录
|
|||
|
- **配置覆盖警告**:记录1分钟强制间隔覆盖情况
|
|||
|
- **性能日志**:批量保存操作的性能统计
|
|||
|
- **错误日志**:详细的异常信息和堆栈跟踪
|
|||
|
|
|||
|
## 配置管理 (Configuration Management)
|
|||
|
|
|||
|
### 连接配置
|
|||
|
```csharp
|
|||
|
public class ConnectionConfig
|
|||
|
{
|
|||
|
public int SaveMode { get; set; } // 保存模式
|
|||
|
public int SaveInterval { get; set; } // 保存间隔(被强制覆盖)
|
|||
|
public int DataSourceCategoryName { get; set; } // 数据源类别
|
|||
|
}
|
|||
|
```
|
|||
|
|
|||
|
### 🎯 领导要求配置验证
|
|||
|
```csharp
|
|||
|
private void ValidateIntervalModeConfiguration(ConnectionConfig connection)
|
|||
|
{
|
|||
|
if (connection.SaveInterval != 60)
|
|||
|
{
|
|||
|
Log4Helper.Warn(this.GetType(),
|
|||
|
$"🎯 领导要求覆盖:连接[{connection.Name}] " +
|
|||
|
$"配置间隔{connection.SaveInterval}秒 → 强制60秒");
|
|||
|
}
|
|||
|
}
|
|||
|
```
|
|||
|
|
|||
|
## 数据源类别支持 (Data Source Categories)
|
|||
|
|
|||
|
系统支持所有数据源类别的1分钟间隔强制执行:
|
|||
|
|
|||
|
| 类别代码 | 中文名称 | 英文标识 | 描述 |
|
|||
|
|---------|---------|---------|------|
|
|||
|
| 0 | 综自 | Zongzi | 综合自动化系统 |
|
|||
|
| 1 | 配电 | Peidian | 配电自动化系统 |
|
|||
|
| 2 | 辅控 | Fukong | 辅助控制系统 |
|
|||
|
| 3 | 在线监测 | ZXJC | 在线监测系统 |
|
|||
|
| 4 | 机器人 | Robot | 机器人巡检系统 |
|
|||
|
|
|||
|
## 故障处理 (Error Handling)
|
|||
|
|
|||
|
### 重试策略
|
|||
|
```csharp
|
|||
|
private readonly ResiliencePipeline _retryPipeline = new ResiliencePipelineBuilder()
|
|||
|
.AddRetry(new RetryStrategyOptions
|
|||
|
{
|
|||
|
MaxRetryAttempts = 3,
|
|||
|
Delay = TimeSpan.FromSeconds(1),
|
|||
|
BackoffType = DelayBackoffType.Exponential
|
|||
|
})
|
|||
|
.Build();
|
|||
|
```
|
|||
|
|
|||
|
### 异常处理
|
|||
|
- **数据验证失败**:记录警告,跳过处理
|
|||
|
- **Redis连接失败**:重试机制,降级到MongoDB
|
|||
|
- **MongoDB写入失败**:批量写入异常处理,部分成功记录
|
|||
|
|
|||
|
## 未来优化方向 (Future Optimization)
|
|||
|
|
|||
|
1. **混合查询策略**:实现Redis+MongoDB智能路由
|
|||
|
2. **数据压缩**:历史数据压缩存储
|
|||
|
3. **分布式缓存**:Redis集群支持
|
|||
|
4. **实时流处理**:Kafka/RabbitMQ消息队列
|
|||
|
5. **AI预测**:基于历史数据的异常预测
|
|||
|
|
|||
|
---
|
|||
|
|
|||
|
**文档版本**:v1.0
|
|||
|
**最后更新**:2025-01-25
|
|||
|
**维护人员**:SOMS开发团队
|