using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; using StackExchange.Redis; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.IO.Compression; using System.Linq; using System.Net; using System.Net.Http; using System.Numerics; using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using YunDa.Server.ISMSTcp.Domain; using YunDa.Server.ISMSTcp.Models; using YunDa.SOMS.Core.Helper; using YunDa.SOMS.DataTransferObject.GeneralInformation.EquipmentLiveDataDto; using YunDa.SOMS.Entities.DataMonitoring; using YunDa.SOMS.Redis.Repositories; using static System.Net.Mime.MediaTypeNames; namespace YunDa.Server.ISMSTcp.Services { public class ThingDeviceStatusModel { public string TwinID { get; set; } = string.Empty; public string Metric { get; set; } = string.Empty; public string Val { get; set; } = string.Empty; } public class ThingDeviceBindingModel { public string Description { get; set; } = string.Empty; public string TwinID { get; set; } = string.Empty; public string Metric { get; set; } = string.Empty; //格式:1:绿灯1|0:红灯1 public string Val { get; set; } = string.Empty; public int TwinDataBindingType { get; set; } public string Remark { get; set; } = string.Empty; public bool IsActive { get; set; } public string Id { get; set; } = string.Empty; public TelesignalisationConfigurationModel TelesignalisationConfiguration { get; set; } public List MetricList { get; set; } public List ValList { get; set; } } public class TelesignalisationConfigurationModel { public string Name { get; set; } = string.Empty; public int DispatcherAddress { get; set; } public string IsmsbaseYXId { get; set; } = string.Empty; public int RemoteType { get; set; } public string Id { get; set; } = string.Empty; } public class ThingDevicePushDataModel { public string Val { get; set; } = string.Empty; public string Cmd { get; set; } = string.Empty; public string CloseCmd { get; set; } = string.Empty; } public class ThingService { private Dictionary _deviceBindingConfigs = new Dictionary(); private readonly object _configLock = new object(); //更新状态锁,防止全部与变化更新时有冲突,因为更新全体时,会清空当前所有状态 private readonly SemaphoreSlim _pushLock = new SemaphoreSlim(1, 1); private readonly string _telesignalisationModelListRediskey = "telesignalisationModelList_Zongzi"; private bool _updatedDeviceBindingConfigOk = false; private readonly ILogger _logger; private readonly WebApiRequest _webApiRequest; private readonly ThingApiService _thingApiService; private readonly IRedisRepository _telesignalisationModelListRedis; public ThingService( ILogger logger, WebApiRequest webApiRequest, ThingApiService thingApiService, IRedisRepository telesignalisationModelListRedis ) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _webApiRequest = webApiRequest ?? throw new ArgumentNullException(nameof(webApiRequest)); _thingApiService = thingApiService ?? throw new ArgumentNullException(nameof(thingApiService)); _telesignalisationModelListRedis = telesignalisationModelListRedis ?? throw new ArgumentNullException(nameof(telesignalisationModelListRedis)); StartAsync(); } private async Task StartAsync() { //更新配置 _ = Task.Run(async () => { while (true) {//每30秒更新一下配置 //更新孪生体与遥测数据绑定关系 await UpdateDeviceBindingConfig(); //更新设备和孪生体的关联 await UpdateSimDatas(); await Task.Delay(30000); } }); //推送状态 _ = Task.Run(async () => { while (true) {//每个小时,更新一下全体孪生状态 if(_updatedDeviceBindingConfigOk) { await UpdateDeviceAllStatus(); //await Task.Delay(60000); //测试时用 await Task.Delay(60 * 60 * 1000); } else {//没获取到配置时,10秒再检测 await Task.Delay(10000); } } }); await Task.CompletedTask; } //更新孪生体与遥测数据绑定关系 private async Task UpdateDeviceBindingConfig() { try { List list = await _webApiRequest.GetThingDeviceBindingConfigAsync(); if (list != null) { lock(_configLock) { _deviceBindingConfigs.Clear(); foreach (var item in list) { item.MetricList = GetMetricList(item.Metric); item.ValList = GetMetricList(item.Val); _deviceBindingConfigs[item.TelesignalisationConfiguration.IsmsbaseYXId] = item; } //获取到绑定信息,可以更新全部状态了 if (_deviceBindingConfigs.Count > 0) _updatedDeviceBindingConfigOk = true; } } } catch (Exception ex) { _updatedDeviceBindingConfigOk = false; _logger.LogError(ex, "ThingService 更新孪生体与遥测数据绑定关系出错"); } await Task.CompletedTask; } //解析tMetric和Val配置 private List GetMetricList(string metric) { List list = new List(); var items = metric.Split('|'); foreach (var item in items) { var parts = item.Split(':'); if (parts.Length == 3) { list.Add(new ThingDevicePushDataModel { Val = parts[0], Cmd = parts[1], CloseCmd = parts[2], }); } } return list; } //状态有变化时,更新孪生体 public async Task UpdateDeviceChangeStatus(List yXDatas) { if (yXDatas == null) return; try { //整理好要发送的命令 List statusList = new List(); var twinIDs = new HashSet(); lock(_configLock) { foreach (var item in yXDatas) { if (!string.IsNullOrWhiteSpace(item.YX_ID)) { if (_deviceBindingConfigs.TryGetValue(item.YX_ID, out var bindingItem)) { var metric = bindingItem.MetricList.Find(e => e.Val == item.T); var val = bindingItem.ValList.Find(e => e.Val == item.T); if (metric != null && val != null) { var status = new ThingDeviceStatusModel { TwinID = bindingItem.TwinID.Trim(), Metric = metric.Cmd.Trim(), Val = val.Cmd.Trim() }; statusList.Add(status); //变化时,由于没有清空所有动画状态,所有要手动关闭之前的状态 if (!string.IsNullOrWhiteSpace(metric.CloseCmd)) { statusList.Add(new ThingDeviceStatusModel { TwinID = bindingItem.TwinID.Trim(), Metric = metric.CloseCmd.Trim(), Val = val.CloseCmd.Trim() }); } twinIDs.Add(bindingItem.TwinID.Trim()); } } } } } //开始发送命令 if (statusList.Count > 0) { await _pushLock.WaitAsync(); try { //发送状态 await _thingApiService.PushDeviceStatusAsync(statusList); } finally { _pushLock.Release(); } } } catch(Exception ex) { _logger.LogError(ex, "ThingService 状态有变化时,更新孪生体出错"); } await Task.CompletedTask; } //实时更新全体孪生体 public async Task UpdateDeviceAllStatus() { await _pushLock.WaitAsync(); try { List statusList = new List(); var twinIDs = new HashSet(); //先从Redis里读取出全体状态 var telesignalisationModels = await _telesignalisationModelListRedis.HashSetGetAllAsync(_telesignalisationModelListRediskey); if(telesignalisationModels != null) { lock(_configLock) { foreach (var item in telesignalisationModels) { if (!string.IsNullOrWhiteSpace(item.ismsbaseYXId)) { if (_deviceBindingConfigs.TryGetValue(item.ismsbaseYXId, out var bindingItem)) { int yxValue = item.ResultValue; var metric = bindingItem.MetricList.Find(e => e.Val == yxValue.ToString()); var val = bindingItem.ValList.Find(e => e.Val == yxValue.ToString()); if (metric != null && val != null) { statusList.Add(new ThingDeviceStatusModel { TwinID = bindingItem.TwinID.Trim(), Metric = metric.Cmd.Trim(), Val = val.Cmd.Trim() }); twinIDs.Add(bindingItem.TwinID.Trim()); } else { int k = 0; } } } } } } //开始发送命令 if (statusList.Count > 0) { //发送清除动画命令 bool pushOk = true; foreach (var twinID in twinIDs) { pushOk &= await _thingApiService.ClearDeviceStatusAsync(twinID, "*灯*"); } //发送状态 if (pushOk) { await Task.Delay(_thingApiService.CallApiDelay); pushOk = await _thingApiService.PushDeviceStatusAsync(statusList); } } } catch(Exception ex) { _logger.LogError(ex, "ThingService 实时更新全体孪生体出错"); } finally { _pushLock.Release(); } await Task.CompletedTask; } //更新设备和孪生体的关联 private async Task UpdateSimDatas() { try { } catch (Exception ex) { } await Task.CompletedTask; } } public class ThingApiService { private readonly ILogger _logger; private readonly string _apiServerUrl = "http://192.168.81.22"; private readonly string _loginCode = "admin"; private readonly string _loginKey = "Thing@123"; private readonly string _esAuthorization = "admin:uino"; public int CallApiDelay => 1200; //api调用间隔时间 ms private string _apiToken = string.Empty; private DateTime _apiTokenTime = DateTime.MinValue; public ThingApiService( ILogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } //获取Token private async Task GetTokenAsync() { try { if (string.IsNullOrWhiteSpace(_apiToken) || (DateTime.Now - _apiTokenTime).TotalHours > 6) {//每6小时更新一次token var http = new ThingWebClientHelper(); var json = await http.PostJsonAsync($"{_apiServerUrl}:1662/thing/provider/rest/getToken", new { loginCode = _loginCode, loginKey = _loginKey }); JObject obj = JObject.Parse(json); _apiToken = obj["data"]?["token"]?.ToString(); if(!string.IsNullOrWhiteSpace(_apiToken)) _apiTokenTime = DateTime.Now; } } catch (Exception ex) { } if(string.IsNullOrWhiteSpace(_apiToken)) _logger.LogError("ThingApiService - 获取token失败"); return _apiToken; } //推送设备状态 public async Task PushDeviceStatusAsync(List deviceStatus, int retryCount = 3) { try { for (int i = 0; i < retryCount; i++) { var http = new ThingWebClientHelper(); http.SetHeader("Tk", await GetTokenAsync()); var json = await http.PostJsonAsync($"{_apiServerUrl}:1662/thing/provider/rest/monitor/dynamics", deviceStatus); JObject obj = JObject.Parse(json); bool success = obj.ContainsKey("success") ? obj.Value("success") : false; if (!success) { string errMessage = obj.ContainsKey("message") ? obj.Value("message") ?? "" : ""; _logger.LogError($"ThingApiService推送设备状态出错:{errMessage}"); await Task.Delay(CallApiDelay); } else { return true; } } } catch(Exception ex) { } return false; } //清空设备状态 public async Task ClearDeviceStatusAsync(string twinID, string metricKeyword, int retryCount = 3) { try { for (int i = 0; i < retryCount; i++) { var http = new ThingWebClientHelper(); http.SetHeader("Authorization", string.Format("Basic {0}", Convert.ToBase64String(Encoding.UTF8.GetBytes(_esAuthorization)))); string postData = string.Format( @"{{ ""query"": {{ ""bool"": {{ ""must"": [ {{ ""term"": {{ ""twinID"": ""{0}"" }} }}, {{ ""wildcard"": {{ ""metric"": ""{1}"" }} }} ] }} }} }}", twinID, metricKeyword ); var json = await http.PostJsonAsync($"{_apiServerUrl}:9200/performance/_delete_by_query", postData); JObject obj = JObject.Parse(json); //成功时,没有success字段,所有没有时,设置为true bool success = obj.ContainsKey("success") ? obj.Value("success") : true; if (!success) { string errMessage = obj.ContainsKey("message") ? obj.Value("message") ?? "" : ""; _logger.LogError($"ThingApiService 清空动画失败:{errMessage}"); await Task.Delay(CallApiDelay); } else { return true; } } } catch (Exception ex) { } return false; } private async Task Test() { try { var http = new ThingWebClientHelper(); http.SetHeader("Tk", await GetTokenAsync()); string postData = @"{ ""pageSize"": 30, ""pageNum"": 1, ""cdt"": { ""classId"": 6888439830657232 } }"; var json = await http.PostJsonAsync($"{_apiServerUrl}/thing/twin/ci/queryPageBySearchBean", postData); JObject obj = JObject.Parse(json); } catch (Exception ex) { } return true; } private async Task Example() { await ClearDeviceStatusAsync("2号主变保护测控屏", "*灯*"); List deviceStatus = new List(); deviceStatus.Add( new ThingDeviceStatusModel() { TwinID = "2号主变保护测控屏", Metric = "绿灯3", Val = "绿灯3" } ); await PushDeviceStatusAsync(deviceStatus); } } public class ThingWebClientHelper { private readonly HttpClient _httpClient; private readonly CookieContainer _cookieContainer; public ThingWebClientHelper() { _cookieContainer = new CookieContainer(); var handler = new HttpClientHandler { CookieContainer = _cookieContainer, UseCookies = true }; _httpClient = new HttpClient(handler); } public void SetHeader(string key, string value, HttpContent content = null) { if (IsContentHeader(key)) { if (content != null) { // 处理内容头 if (content.Headers.Contains(key)) { content.Headers.Remove(key); } content.Headers.Add(key, value); } } else { // 处理普通请求头 if (_httpClient.DefaultRequestHeaders.Contains(key)) { _httpClient.DefaultRequestHeaders.Remove(key); } _httpClient.DefaultRequestHeaders.Add(key, value); } } /// /// 判断是否为内容头 /// /// 头部键名 /// 是否为内容头 private bool IsContentHeader(string key) { // 常见的内容头集合,可根据需要扩展 string[] contentHeaders = { "Content-Type", "Content-Length", "Content-Disposition", "Content-Encoding", "Content-Language", "Content-Location", "Content-Range", "Content-MD5" }; return contentHeaders.Contains(key, StringComparer.OrdinalIgnoreCase); } /// /// 发送 GET 请求并获取响应内容。 /// /// 请求 URL。 /// 响应内容。 public async Task GetAsync(string url) { try { var response = await _httpClient.GetAsync(url); response.EnsureSuccessStatusCode(); return await GetResponseContent(response); } catch (Exception ex) { //return $"Error: {ex.Message}"; Console.WriteLine($"Error: {ex.Message}"); } return string.Empty; } /// /// 发送 POST 请求并获取响应内容。 /// /// 请求 URL。 /// POST 数据(键值对)。 /// 响应内容。 public async Task PostAsync(string url, string postData, string contentType = "application/x-www-form-urlencoded") { try { var content = new StringContent(postData, Encoding.UTF8, contentType); var response = await _httpClient.PostAsync(url, content); response.EnsureSuccessStatusCode(); return await GetResponseContent(response); } catch (Exception ex) { return $"Error: {ex.Message}"; } } public async Task PostJsonAsync(string url, object obj, string contentType = "application/json") { try { string jsonContent = obj is string ? (string)obj : JsonSerializer.Serialize(obj); var content = new StringContent(jsonContent, Encoding.UTF8, contentType); var response = await _httpClient.PostAsync(url, content); response.EnsureSuccessStatusCode(); return await GetResponseContent(response); } catch (Exception ex) { } return string.Empty; } private async Task GetResponseContent(HttpResponseMessage response) { try { if (response.Content.Headers.ContentEncoding.Contains("gzip")) { // 获取压缩流 var compressedStream = await response.Content.ReadAsStreamAsync(); var gzipStream = new GZipStream(compressedStream, CompressionMode.Decompress); var reader = new StreamReader(gzipStream, Encoding.UTF8); return await reader.ReadToEndAsync(); } else if (response.Content.Headers.ContentEncoding.Contains("deflate")) { var compressedStream = await response.Content.ReadAsStreamAsync(); var deflateStream = new DeflateStream(compressedStream, CompressionMode.Decompress); var reader = new StreamReader(deflateStream, Encoding.UTF8); return await reader.ReadToEndAsync(); } else { return await response.Content.ReadAsStringAsync(); } } catch (Exception ex) { return $"Error: {ex.Message}"; } } } }