SOMS/test/ISMSTcpCmdWpfAppDemo/MainWindow.xaml.cs
2025-07-08 14:01:10 +08:00

890 lines
30 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// MainWindow.xaml.cs
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Threading;
using ICSharpCode.AvalonEdit.Document;
using ICSharpCode.AvalonEdit.Folding;
using Newtonsoft.Json.Linq;
using Flurl;
using Flurl.Http;
using Newtonsoft.Json;
using ToolLibrary;
using System.Buffers;
using System.IO.Pipelines;
using System.Text.Json;
using JsonException = Newtonsoft.Json.JsonException;
using System.Collections.Concurrent;
using System.Configuration;
using System.Windows.Controls;
using System.Configuration;
using System.Windows.Interop;
using System.Buffers.Text;
namespace ISMSTcpCmdWpfAppDemo
{
public partial class MainWindow : Window
{
// TCP-related fields.
// Client socket; replaced with a fresh instance on every connection attempt in ConnectAsync.
private TcpClient _tcpClient = new TcpClient();
// Stream of the current connection; assigned in ConnectAsync and StartProcessing.
private NetworkStream _tcpStream;
// Recreated on connect and on Dispose. Its token is not observed anywhere in this file.
private CancellationTokenSource _tcpCts;
// Raw byte buffer consumed by ShowRawData; nothing in this file appends to it.
private List<byte> _tcpBuffer = new List<byte>();
// Size hint passed to PipeWriter.GetMemory for each socket read.
private const int BufferSize = 1024;
// NOTE(review): not referenced anywhere in this file — confirm whether it can be removed.
private ConcurrentBag<byte> _allMsg = new ConcurrentBag<byte>();
// Web API-related fields.
private HttpListener _httpListener = new HttpListener();
// Port the embedded HTTP listener binds to.
private const int WebApiPort = 38094;
// Ensures only one ConnectAsync runs at a time (acquired with a zero timeout).
private SemaphoreSlim _reconnectLock = new SemaphoreSlim(1, 1);
// Last message passed to SendTcpMsg; ProcessDataLoop inspects it to decide how to decode a reply.
private string _sendMsg = "";
// Configuration parameters.
// How long without new data before MonitorBufferTimeout discards buffered bytes.
private readonly TimeSpan IncompleteDataTimeout = TimeSpan.FromSeconds(5);
// Core components.
// Pipe between ReceiveDataLoop (writer) and ProcessDataLoop (reader).
private Pipe _pipe = new Pipe();
// Cancellation source checked by the background loops; cancelled and replaced in Dispose.
private CancellationTokenSource _cts = new();
// Timestamp of the last successful socket read (set in ReceiveDataLoop).
private DateTime _lastDataTime = DateTime.MinValue;
/// <summary>
/// Builds the window, loads the saved endpoint, starts the embedded Web API and
/// kicks off the first TCP connection attempt in the background.
/// </summary>
public MainWindow()
{
    InitializeComponent();
    SetupComponents();
    LoadSettings();
    _httpListener.Prefixes.Add($"http://*:{WebApiPort}/");
    StartWebApi();

    // A malformed port in the configuration must not crash window construction.
    if (int.TryParse(txtPort.Text, out int port))
    {
        // Fire-and-forget on purpose: the constructor cannot await. The continuation
        // observes a faulted task so the failure is not left as an unobserved exception.
        _ = ConnectAsync(txtIP.Text, port, externalToken: _cts.Token)
            .ContinueWith(t => { _ = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted);
    }
    else
    {
        UpdateStatus("连接失败", "Red");
    }
}
/// <summary>
/// Installs AvalonEdit folding on the receive editor and refreshes the JSON
/// foldings every time its text changes.
/// </summary>
private void SetupComponents()
{
    FoldingManager manager = FoldingManager.Install(txtReceived.TextArea);
    JsonFoldingStrategy strategy = new JsonFoldingStrategy();

    txtReceived.TextChanged += (_, _) =>
    {
        strategy.UpdateFoldings(manager, txtReceived.Document);
    };
}
/// <summary>Disconnect button handler: tears down the current TCP session.</summary>
private void BtnDisconnect_Click(object sender, RoutedEventArgs e) => Dispose();
/// <summary>
/// Connect button handler: validates the port text and starts a connection attempt.
/// </summary>
/// <remarks>
/// This is an <c>async void</c> event handler, so every exception must be handled here;
/// anything that escapes would surface as an unhandled exception.
/// </remarks>
private async void BtnConnect_Click(object sender, RoutedEventArgs e)
{
    if (!int.TryParse(txtPort.Text, out int port))
    {
        ShowError("连接失败", new FormatException($"无效端口: {txtPort.Text}"));
        return;
    }

    try
    {
        await ConnectAsync(txtIP.Text, port, externalToken: _cts.Token);
    }
    catch (OperationCanceledException)
    {
        // The attempt was cancelled (for example by Dispose); nothing to report.
    }
    catch (Exception ex)
    {
        // ConnectAsync rethrows once its retries are exhausted.
        ShowError("连接失败", ex);
    }
}
#region TCP
/// <summary>
/// Connects to <paramref name="ip"/>:<paramref name="port"/>, retrying with exponential
/// back-off, and starts the background processing loops once connected.
/// </summary>
/// <param name="ip">Host name or address to connect to.</param>
/// <param name="port">TCP port to connect to.</param>
/// <param name="maxRetries">Number of failed attempts that are retried before the error is rethrown.</param>
/// <param name="externalToken">Token that aborts the attempt and the waits between attempts.</param>
/// <remarks>
/// Returns immediately, without connecting, when another call is already in progress.
/// Rethrows the last connection exception after the retries are exhausted, and an
/// <see cref="OperationCanceledException"/> when cancelled.
/// </remarks>
private async Task ConnectAsync(string ip, int port, int maxRetries = 5, CancellationToken externalToken = default)
{
    bool isConnected = false;

    // Zero-timeout acquire: a concurrent caller simply gives up instead of queueing.
    if (!await _reconnectLock.WaitAsync(0)) return;
    try
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(externalToken);
        int retryCount = 0;
        var baseDelay = TimeSpan.FromSeconds(2);
        while (!cts.Token.IsCancellationRequested)
        {
            try
            {
                _tcpClient?.Dispose();
                _tcpClient = new TcpClient { NoDelay = true };
                await _tcpClient.ConnectAsync(ip, port, cts.Token);
                _tcpStream = _tcpClient.GetStream();

                _tcpCts?.Cancel();
                _tcpCts?.Dispose();
                _tcpCts = new CancellationTokenSource();

                // Create the pipe only for a session that actually starts, and only while
                // holding the lock, so a rejected or failed call never swaps the pipe out
                // from under loops that are still running.
                _pipe = new Pipe();
                _ = StartProcessing(_tcpStream);
                UpdateStatus("已连接", "Green");
                isConnected = true;
                return;
            }
            catch (OperationCanceledException) when (cts.IsCancellationRequested)
            {
                // Cancellation is not a failed attempt: do not retry or report it as a connection error.
                throw;
            }
            catch (Exception) when (retryCount < maxRetries)
            {
                retryCount++;
                // Back-off doubles per attempt: 4 s, 8 s, 16 s, ...
                var delay = baseDelay * Math.Pow(2, retryCount);
                UpdateStatus($"连接中... ({retryCount}/{maxRetries})", "Orange");
                await Task.Delay(delay, cts.Token);
            }
            catch (Exception ex)
            {
                UpdateStatus("连接失败", "Red");
                ShowError($"最终连接失败", ex);
                throw;
            }
        }
    }
    finally
    {
        _reconnectLock.Release();
        if (!isConnected) UpdateStatus("连接失败", "Red");
    }
}
/// <summary>
/// Starts the three background workers (receive, process, timeout monitor) for
/// <paramref name="stream"/>.
/// </summary>
/// <param name="stream">The connected network stream to read from.</param>
/// <returns>An already-completed task; the workers keep running in the background.</returns>
public Task StartProcessing(NetworkStream stream)
{
    _tcpStream = stream;

    // Explicit discards: the workers are intentionally not awaited.
    _ = Task.Run(ReceiveDataLoop);
    _ = Task.Run(ProcessDataLoop);
    _ = Task.Run(MonitorBufferTimeout);

    // Nothing is awaited here, so the method does not need to be async.
    return Task.CompletedTask;
}
/// <summary>
/// Tears down the current TCP session: cancels the background loops, completes the
/// pipe, closes the socket and installs fresh cancellation sources for the next session.
/// </summary>
public void Dispose()
{
    try
    {
        UpdateStatus("已断开", "Red");
        _cts.Cancel();
        _tcpCts?.Cancel();
        _tcpCts?.Dispose();
        _pipe.Writer.Complete();
        _pipe.Reader.Complete();
        _tcpStream?.Dispose();
        _tcpClient?.Dispose();
    }
    catch (Exception ex)
    {
        // Teardown is best-effort, but the failure is surfaced instead of silently dropped.
        ShowError("断开连接时出错", ex);
    }
    finally
    {
        // Always replace the sources, even if a step above threw. Otherwise later
        // connection attempts would be handed an already-cancelled (or disposed) source.
        _tcpCts = new CancellationTokenSource();
        _cts = new CancellationTokenSource();
    }
}
/// <summary>
/// Receive loop: copies bytes from the socket into the pipe until the connection ends,
/// then tears the session down and attempts to reconnect after five seconds.
/// </summary>
/// <remarks>
/// Started through <c>Task.Run</c>, so it runs on a thread-pool thread; UI controls
/// must therefore be read through the dispatcher.
/// NOTE(review): a manual disconnect also ends up in the reconnect path below — confirm
/// that automatic reconnection after a user-initiated disconnect is intended.
/// </remarks>
private async Task ReceiveDataLoop()
{
    var writer = _pipe.Writer;
    // Bind to the stream of this session; the field may be replaced by a later connect.
    var stream = _tcpStream;
    try
    {
        while (!_cts.IsCancellationRequested && stream != null)
        {
            Memory<byte> memory = writer.GetMemory(BufferSize);
            int bytesRead = await stream.ReadAsync(memory, _cts.Token);
            if (bytesRead == 0) break; // remote side closed the connection
            writer.Advance(bytesRead);
            var flush = await writer.FlushAsync();
            _lastDataTime = DateTime.Now;
            if (flush.IsCompleted) break; // reader is gone; nothing will consume more data
            await Task.Delay(100);
        }
    }
    catch (OperationCanceledException)
    {
        _cts = new CancellationTokenSource();
    }
    catch (Exception ex) when (ex is IOException || ex is InvalidOperationException)
    {
        // Connection reset, stream disposed, or pipe already completed by Dispose.
        // These end the session like a normal close; previously they escaped the loop
        // and skipped both the cleanup and the reconnect below.
    }
    finally
    {
        await writer.CompleteAsync();
    }

    Dispose();
    await Task.Delay(5000);

    try
    {
        // WPF controls may only be touched on the UI thread.
        string ip = null;
        string portText = null;
        Dispatcher.Invoke(() =>
        {
            ip = txtIP.Text;
            portText = txtPort.Text;
        });

        if (!int.TryParse(portText, out int port))
        {
            UpdateStatus("连接失败", "Red");
            return;
        }

        await ConnectAsync(ip, port, externalToken: _cts.Token);
    }
    catch (Exception ex)
    {
        // Nobody awaits this loop, so report the failure here instead of losing it.
        ShowError("重连失败", ex);
    }
}
/// <summary>
/// Processing loop: reads buffered bytes from the pipe, splits them into frames and
/// dispatches each frame to the matching handler.
/// </summary>
/// <remarks>
/// Frame layout as decoded here: a 6-character decimal length header, followed either by
/// the two characters "OK" and <c>contentLength</c> bytes of reply content, or directly by
/// <c>contentLength</c> bytes of unsolicited (alarm) content. In both cases
/// <c>6 + contentLength + 2</c> bytes are consumed.
/// NOTE(review): in the non-"OK" branch the content starts at offset 6, yet two extra
/// bytes are still skipped — confirm against the protocol specification.
/// NOTE(review): a header that fails to parse is never consumed, so the loop makes no
/// progress until MonitorBufferTimeout discards the buffer.
/// </remarks>
private async Task ProcessDataLoop()
{
    var reader = _pipe.Reader;
    try
    {
        while (!_cts.IsCancellationRequested)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;
            while (true)
            {
                try
                {
                    // Need at least the length header.
                    if (buffer.Length < 6) break;
                    // Extract and decode the header.
                    byte[] headerBytes = new byte[6];
                    buffer.Slice(0, 6).CopyTo(headerBytes);
                    string header = Encoding.UTF8.GetString(headerBytes);
                    // Parse the numeric length.
                    if (!int.TryParse(header, out int contentLength) || contentLength < 0)
                        break;
                    // Wait until the whole frame has arrived.
                    long requiredBytes = 6 + contentLength + 2;
                    if (buffer.Length < requiredBytes) break;
                    byte[] okBytes = new byte[2];
                    buffer.Slice(6, 2).CopyTo(okBytes);
                    string okstr = Encoding.UTF8.GetString(okBytes);
                    if (okstr == "OK")
                    {
                        // Reply to a request: pick the decoder from the last sent command.
                        string content = Encoding.UTF8.GetString(buffer.Slice(8, contentLength).ToArray());
                        if (_sendMsg.Contains("GetFaultWaveCfgFile")) // waveform configuration
                        {
                            if (_sendMsg.EndsWith("104")) // queried via the 104 protocol
                            {
                                UpdateAllMsg("查询类型 :波形配置/104");
                                HandleJObject(content);
                            }
                            else // queried via FTP
                            {
                                UpdateAllMsg("查询类型 :波形配置/ftp");
                                HandleString(TrimEndsEfficient(content));
                            }
                        }
                        else if (_sendMsg.Contains("GetFaultWaveDataFile")) // fault waveform data
                        {
                            if (_sendMsg.EndsWith("104")) // queried via the 104 protocol
                            {
                                UpdateAllMsg("查询类型 :故障录播/104");
                                HandleJObject(content);
                            }
                            else // queried via FTP
                            {
                                UpdateAllMsg("查询类型 :故障录播/ftp");
                                HandleString(TrimEndsEfficient(content));
                            }
                        }
                        else if (_sendMsg.Contains("GetFaultRptByTime")) // fault reports
                        {
                            UpdateAllMsg("查询类型 :故障报告");
                            HandleJArray(content);
                        }
                        else if (_sendMsg.Contains("CallDeviceDZ")) // setting values
                        {
                            UpdateAllMsg("查询类型 :定值");
                            HandleJArray(content);
                        }
                        else if (_sendMsg.Contains("CallVersion"))
                        {
                            UpdateAllMsg("查询类型 :版本信息");
                            HandleJObject(content);
                        }
                        UpdateAllMsg(okstr + content);
                    }
                    else
                    {
                        // Unsolicited content (alarm message).
                        string content = Encoding.UTF8.GetString(buffer.Slice(6, contentLength).ToArray());
                        UpdateAllMsg(content);
                        if (TryParseJToken(content, out var token))
                        {
                            ShowAlarmMsg(content);
                            HandleJObject(content);
                        }
                        else
                        {
                            HandleJArray(content);
                            HandleString(content);
                        }
                    }
                    // Move past the frame that was just handled.
                    buffer = buffer.Slice(requiredBytes);
                }
                catch
                {
                    // Best-effort: a failing frame stops this pass; the bytes stay in the pipe.
                    break;
                }
                await Task.Delay(1);
            }

            // Consume the handled frames and mark everything else as examined, so the
            // next ReadAsync only completes once more data arrives.
            reader.AdvanceTo(buffer.Start, buffer.End);

            // The writer has completed (connection closed): no more data will ever arrive.
            // Without this check ReadAsync returns immediately on every pass and the loop
            // keeps polling a dead pipe.
            if (result.IsCompleted) break;

            await Task.Delay(100);
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}
/// <summary>
/// Decodes <paramref name="base64"/> and writes the resulting bytes to
/// <paramref name="outputPath"/>.
/// </summary>
/// <param name="base64">Base64 text; null or empty input is ignored.</param>
/// <param name="outputPath">Destination file path. The directory is not created.</param>
/// <remarks>Input that is not valid Base64 is silently ignored and no file is written.</remarks>
public void SaveBase64AsFile(string base64, string outputPath)
{
    if (string.IsNullOrEmpty(base64))
    {
        return;
    }

    byte[] decoded;
    try
    {
        decoded = Convert.FromBase64String(base64);
    }
    catch (FormatException)
    {
        // Not valid Base64: skip writing.
        return;
    }

    File.WriteAllBytes(outputPath, decoded);
}
/// <summary>
/// Returns the text strictly between the first '{' and the last '}' of
/// <paramref name="span"/>, without allocating intermediate strings.
/// </summary>
/// <param name="span">Text to trim.</param>
/// <returns>
/// The inner text, or <see cref="string.Empty"/> when either brace is missing or the
/// last '}' does not come after the first '{'.
/// </returns>
string TrimEndsEfficient(ReadOnlySpan<char> span)
{
    int open = span.IndexOf('{');
    if (open < 0)
    {
        return string.Empty;
    }

    int close = span.LastIndexOf('}');
    if (close <= open)
    {
        return string.Empty;
    }

    return span[(open + 1)..close].ToString();
}
/// <summary>
/// Timeout monitor: once per second, if no data has arrived for longer than
/// <c>IncompleteDataTimeout</c>, discards whatever is still buffered in the pipe.
/// </summary>
/// <remarks>
/// The monitor is bound to the cancellation source and pipe reader that were current
/// when it started, so it ends with its own session instead of living on and draining
/// the pipe of a later connection.
/// NOTE(review): the reader is shared with ProcessDataLoop, and PipeReader does not
/// support concurrent readers — consider moving the timeout into ProcessDataLoop.
/// </remarks>
private async Task MonitorBufferTimeout()
{
    var session = _cts;
    var reader = _pipe.Reader;
    try
    {
        while (!session.IsCancellationRequested)
        {
            await Task.Delay(1000, session.Token);
            if (DateTime.Now - _lastDataTime <= IncompleteDataTimeout)
            {
                continue;
            }

            if (reader.TryRead(out ReadResult result))
            {
                HandleInvalidData("Data timeout", (int)result.Buffer.Length);
                reader.AdvanceTo(result.Buffer.End);
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Session ended while waiting.
    }
    catch (InvalidOperationException)
    {
        // Reader already completed, or a read by ProcessDataLoop is in progress; stop monitoring.
    }
}
/// <summary>
/// Wraps plain text in a JSON object with a single "content" property, stores it as
/// the latest token and raises the response event with it.
/// </summary>
/// <param name="str">Text to wrap.</param>
private void HandleString(string str)
{
    var wrapper = new JObject();
    wrapper.Add("content", str);

    _token = wrapper;
    OnTcpResponseReceived(wrapper);
}
/// <summary>
/// Parses <paramref name="json"/>, stores the result as the latest token and raises
/// the response event with it.
/// </summary>
/// <param name="json">JSON text to parse.</param>
/// <remarks>
/// The parse result is not checked: when parsing fails, <c>null</c> is stored and the
/// event is still raised with <c>null</c>.
/// </remarks>
private void HandleJArray(string json)
{
    TryParseJToken(json, out JToken parsed);

    _token = parsed;
    OnTcpResponseReceived(parsed);
}
/// <summary>
/// Parses <paramref name="json"/> on a thread-pool thread, publishes the token through
/// the response event and, when the payload is an object whose "AlertID" is a GUID,
/// forwards the raw JSON to the alarm upload endpoint.
/// </summary>
/// <param name="json">JSON text to parse and forward.</param>
private void HandleJObject(string json)
{
    Task.Run(() =>
    {
        if (!TryParseJToken(json, out var token) || token == null)
        {
            return;
        }

        _token = token;
        OnTcpResponseReceived(token);

        // Only JSON objects can carry an AlertID. Indexing an array by name throws, and
        // a missing property yields null, so both cases are guarded here.
        string alertId = (token as JObject)?["AlertID"]?.ToString();
        if (Guid.TryParse(alertId, out _))
        {
            HttpHelper.HttpPostRequest<object>("http://localhost:38090/api/services/SOMS/AlarmLiveData/UploadISMSAlarmMsg", json as object);
        }
    });
}
// Hook called by MonitorBufferTimeout with a reason text and the number of buffered
// bytes being discarded. The body is currently empty, so the call has no effect.
private void HandleInvalidData(string reason, int bytesToSkip)
{
}
#region
/// <summary>Attempts to parse <paramref name="json"/> into a <c>JToken</c>.</summary>
/// <param name="json">Text to parse.</param>
/// <param name="token">The parsed token on success; <c>null</c> on failure.</param>
/// <returns><c>true</c> when parsing succeeded; <c>false</c> when any exception was thrown.</returns>
private bool TryParseJToken(string json, out JToken token)
{
    token = null;
    try
    {
        token = JToken.Parse(json);
    }
    catch
    {
        return false;
    }

    return true;
}
#endregion
#endregion
#region Web API
/// <summary>
/// Starts the HTTP listener and accepts requests until the listener is stopped.
/// </summary>
/// <remarks>
/// Calling this while the listener is already running does nothing, so a second accept
/// loop is never started. Stopping the listener ends the loop quietly instead of
/// reporting an error.
/// </remarks>
private async void StartWebApi()
{
    if (_httpListener.IsListening)
    {
        return;
    }

    try
    {
        _httpListener.Start();
        webApiState.Text = "已启动";
        while (_httpListener.IsListening)
        {
            var context = await _httpListener.GetContextAsync();
            // Requests are handled concurrently; ProcessHttpRequest reports its own errors.
            _ = ProcessHttpRequest(context);
            await Task.Delay(100);
        }
    }
    catch (Exception ex) when (!_httpListener.IsListening
                               && (ex is HttpListenerException || ex is ObjectDisposedException))
    {
        // The pending accept was aborted because the listener was stopped: expected.
    }
    catch (Exception ex) { ShowError("Web API 错误", ex); }
}
// Allows only one HTTP request at a time to use the TCP link; ProcessHttpRequest
// answers 503 when it cannot be acquired immediately.
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
// NOTE(review): not referenced anywhere in this file — confirm whether it can be removed.
private readonly CancellationTokenSource _httpcts = new CancellationTokenSource();
/// <summary>
/// Handles one HTTP request: validates it, forwards the first recognised query
/// parameter over TCP and writes the serialized result back.
/// </summary>
/// <param name="context">The listener context of the request.</param>
/// <remarks>
/// Recognised parameters, in priority order: dz, FaultRpt, waveCfg, waveDat, version.
/// Only one request is processed at a time; others get 503. A timeout produces 504 and
/// any other failure 500. An unrecognised parameter still answers HTTP 200 with an
/// error result in the body, as before.
/// </remarks>
private async Task ProcessHttpRequest(HttpListenerContext context)
{
    try
    {
        if (context.Request.HttpMethod != "GET" || context.Request.Url?.AbsolutePath != "/api")
        {
            SendHttpResponse(context.Response, 404, "Not Found");
            return;
        }

        (string Key, string LogType)[] routes =
        {
            ("dz", "定值"),
            ("FaultRpt", "故障报告"),
            ("waveCfg", "波形配置"),
            ("waveDat", "波形文件"),
            ("version", "装置版本"),
        };

        // Pick the first non-empty parameter, in the order listed above.
        var query = context.Request.QueryString;
        string requestParam = null;
        string requestLogType = null;
        foreach (var (key, logType) in routes)
        {
            string value = query[key];
            if (!string.IsNullOrEmpty(value))
            {
                requestParam = value;
                requestLogType = logType;
                break;
            }
        }

        // Mutual exclusion: reject instead of queueing when a request is already running.
        if (!await _semaphore.WaitAsync(TimeSpan.Zero))
        {
            SendHttpResponse(context.Response, 503, "Error: 服务器繁忙");
            return;
        }
        try
        {
            using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(180));
            WebResult<object> result = requestParam != null
                ? await ProcessRequestAsync(requestParam, requestLogType, timeoutCts.Token)
                : WebResult<object>.ErrorResult(400, new Exception("无效参数"));
            SendHttpResponse(context.Response, 200, JsonConvert.SerializeObject(result));
        }
        finally
        {
            _semaphore.Release();
        }
    }
    catch (OperationCanceledException)
    {
        TrySendErrorResponse(context.Response, 504, "Error: 请求超时");
    }
    catch (Exception ex)
    {
        TrySendErrorResponse(context.Response, 500, $"Error: {ex.Message}");
    }
}

// Sends an error response without letting a second failure (for example the client
// has gone away or the response was already started) escape the caller's catch block.
private void TrySendErrorResponse(HttpListenerResponse response, int statusCode, string content)
{
    try
    {
        SendHttpResponse(response, statusCode, content);
    }
    catch (Exception)
    {
        try { response.Abort(); } catch (Exception) { /* nothing more can be done */ }
    }
}
/// <summary>
/// Sends <paramref name="param"/> over TCP and waits for the next payload raised
/// through <c>TcpResponseReceived</c>.
/// </summary>
/// <param name="param">Command text to send to the device.</param>
/// <param name="logType">Label used in the log line for this request.</param>
/// <param name="ct">Cancels the wait; used by the caller as the request timeout.</param>
/// <returns>A successful result wrapping the received payload.</returns>
/// <exception cref="InvalidOperationException">The command could not be sent.</exception>
/// <exception cref="OperationCanceledException"><paramref name="ct"/> fired before a response arrived.</exception>
private async Task<WebResult<object>> ProcessRequestAsync(string param, string logType, CancellationToken ct)
{
    WirteHttpMsg($"收到{logType}信息:{param}");

    var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    object mtoken = default;
    Func<object, bool> recvHandler = token =>
    {
        // Store the payload before signalling, so the waiter can never observe the
        // signal without the value.
        mtoken = token;
        tcs.TrySetResult(true);
        return true;
    };

    // Subscribe before sending, so a fast reply cannot be missed.
    TcpResponseReceived += recvHandler;
    try
    {
        // Honour the caller's timeout. Without this, a missing reply would block forever
        // while holding the request semaphore.
        using var registration = ct.Register(() => tcs.TrySetCanceled(ct));

        if (!await SendTcpMsg(param))
        {
            throw new InvalidOperationException("TCP 未连接或发送失败");
        }

        await tcs.Task;
        return WebResult<object>.Ok(mtoken);
    }
    finally
    {
        TcpResponseReceived -= recvHandler;
    }
}
// Raised by the TCP handlers for each decoded payload.
public event Func<object,bool> TcpResponseReceived;

/// <summary>Raises <c>TcpResponseReceived</c> with <paramref name="token"/>, if anyone is subscribed.</summary>
private void OnTcpResponseReceived(object token)
{
    var subscribers = TcpResponseReceived;
    if (subscribers != null)
    {
        subscribers(token);
    }
}
/// <summary>
/// Appends <paramref name="msg"/> to the HTTP message list on the UI thread, clearing
/// the list first once it holds more than 100 entries.
/// </summary>
private void WirteHttpMsg(string msg)
{
    Action append = () =>
    {
        var items = MessageList.Items;
        if (items.Count > 100)
        {
            items.Clear();
        }

        items.Add(msg);
    };

    Dispatcher.BeginInvoke(append);
}
/// <summary>
/// Logs and writes <paramref name="content"/> as a chunked UTF-8 plain-text response,
/// then closes the response.
/// </summary>
/// <param name="response">Response to write to; always closed on return.</param>
/// <param name="statusCode">HTTP status code to send.</param>
/// <param name="content">Body text.</param>
private void SendHttpResponse(HttpListenerResponse response, int statusCode, string content)
{
    WirteHttpMsg($"状态:{statusCode} 发送http消息{content}");
    try
    {
        response.StatusCode = statusCode;
        response.ContentType = "text/plain; charset=utf-8";
        response.SendChunked = true;

        // Encoding.UTF8 would make StreamWriter emit a byte-order mark at the start of
        // the body, which breaks strict JSON clients; use a BOM-less encoding instead.
        var utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
        using (var writer = new StreamWriter(response.OutputStream, utf8NoBom, 4096))
        {
            // StreamWriter buffers internally, so no manual chunking is required.
            writer.Write(content);
        }
    }
    finally
    {
        // Release the connection even when writing fails.
        response.Close();
    }
}
#endregion
#region UI
/// <summary>Shows <paramref name="message"/> in the status text, coloured by <paramref name="color"/>.</summary>
/// <param name="message">Status text to display.</param>
/// <param name="color">"Green" or "Orange"; any other value (including "Red") is shown in red.</param>
private void UpdateStatus(string message, string color)
{
    Dispatcher.Invoke(() =>
    {
        txtStatus.Text = message;
        // "Orange" is used by ConnectAsync while retrying; previously it fell through to red.
        txtStatus.Foreground = color switch
        {
            "Green" => System.Windows.Media.Brushes.Green,
            "Orange" => System.Windows.Media.Brushes.Orange,
            _ => System.Windows.Media.Brushes.Red,
        };
    });
}
/// <summary>
/// Appends a timestamped entry to the "all messages" list on the UI thread, clearing
/// the list first once it holds more than 1000 entries.
/// </summary>
/// <param name="arrayString">Message text to log.</param>
private void UpdateAllMsg(string arrayString)
{
    Action append = () =>
    {
        var items = AllMsgMessageList.Items;
        if (items.Count > 1000)
        {
            items.Clear();
        }

        // The timestamp is taken when the entry is actually added on the UI thread.
        string stamp = DateTime.Now.ToString("HH:mm:ss.fff");
        items.Add($"时间:{stamp}\n 内容:{arrayString}");
    };

    Dispatcher.BeginInvoke(append);
}
/// <summary>Shows a timestamped error line in the error text box on the UI thread.</summary>
/// <param name="message">Short description of what failed.</param>
/// <param name="ex">The exception whose message is appended.</param>
private void ShowError(string message, Exception ex)
{
    Action show = () =>
    {
        txtErr.Text = $"{DateTime.Now:HH:mm:ss} - {message}: {ex.Message}";
    };

    Dispatcher.BeginInvoke(show);
}
/// <summary>Decodes the raw TCP buffer as UTF-8, empties it and queues the text for display.</summary>
private void ShowRawData()
{
    byte[] pending = _tcpBuffer.ToArray();
    _tcpBuffer.Clear();

    UpdateTextContent(Encoding.UTF8.GetString(pending));
}
// Throttled UI updates: messages are queued in a thread-safe queue and flushed to the
// editor in batches, at most once per UpdateInterval milliseconds.
private ConcurrentQueue<string> _msgQueue = new ConcurrentQueue<string>();
private DateTime _lastUpdateTime = DateTime.MinValue;
private const int UpdateInterval = 200;

/// <summary>
/// Queues <paramref name="content"/> for display and, unless a flush was scheduled
/// within the last <c>UpdateInterval</c> milliseconds, schedules a batch flush.
/// </summary>
/// <param name="content">Text to append to the receive editor.</param>
/// <remarks>
/// NOTE(review): a message queued inside the throttle window is only displayed when a
/// later call schedules the next flush.
/// </remarks>
private void UpdateTextContent(string content)
{
    _msgQueue.Enqueue(content);
    if ((DateTime.Now - _lastUpdateTime).TotalMilliseconds < UpdateInterval)
        return;
    Dispatcher.BeginInvoke(() =>
    {
        // Drain everything queued so far into one insert.
        var sb = new StringBuilder();
        while (_msgQueue.TryDequeue(out var msg))
        {
            sb.AppendLine(msg);
        }
        if (sb.Length > 0)
        {
            var doc = txtReceived.Document;
            using (doc.RunUpdate())
            {
                // Append the batch at the end of the document.
                doc.Insert(doc.TextLength, sb.ToString());

                const int maxLength = 200000;
                const int trimSize = 10000;
                if (doc.TextLength > maxLength)
                {
                    // Trim from the top down to (maxLength - trimSize). The previous
                    // Math.Min always removed exactly trimSize characters, so one large
                    // batch could leave the document above the cap indefinitely.
                    int removeLength = doc.TextLength - (maxLength - trimSize);
                    doc.Remove(0, removeLength);
                }
            }
            txtReceived.ScrollToEnd();
        }
    });
    _lastUpdateTime = DateTime.Now;
}
#endregion
#region
/// <summary>Returns whether <paramref name="input"/> parses as JSON.</summary>
/// <param name="input">Text to check.</param>
/// <returns><c>true</c> when parsing yields a token; <c>false</c> when parsing throws.</returns>
private bool IsValidJson(string input)
{
    try
    {
        JToken parsed = JToken.Parse(input);
        return parsed != null;
    }
    catch
    {
        return false;
    }
}
// Most recently parsed or produced token; written by the handlers and by FormatJson.
JToken _token;

/// <summary>
/// Pretty-prints <paramref name="json"/> with indentation and stores the parsed token
/// as the latest token.
/// </summary>
/// <param name="json">JSON text to format.</param>
/// <returns>The indented JSON, or the input unchanged when it cannot be parsed.</returns>
private string FormatJson(string json)
{
    try
    {
        JToken parsed = JToken.Parse(json);
        _token = parsed;
        return parsed.ToString(Newtonsoft.Json.Formatting.Indented);
    }
    catch
    {
        return json;
    }
}
// Would write `content` to a timestamped log_*.json file, but is currently disabled:
// the unconditional return on the first line makes the rest of the body unreachable.
private void SaveToFile(string content)
{
// Early exit that switches file logging off; everything below is dead code.
return;
var fileName = $"log_{DateTime.Now:yyyyMMdd_HHmmss}.json";
// NOTE(review): the returned Task is not awaited, so a write failure would go
// unobserved if this code were re-enabled.
File.WriteAllTextAsync(fileName, content);
}
#endregion
#region
/// <summary>
/// Window-closed hook: stops the HTTP listener, signals the background loops to stop
/// and closes the TCP client before running the base implementation.
/// </summary>
protected override void OnClosed(EventArgs e)
{
    try
    {
        _httpListener.Stop();
        // The receive/process/monitor loops watch _cts, so cancel it as well.
        _cts?.Cancel();
        _tcpCts?.Cancel();
        _tcpClient?.Close();
    }
    catch (ObjectDisposedException)
    {
        // A source or the listener was already disposed by an earlier teardown.
    }
    finally
    {
        // The base notification must run even if teardown fails.
        base.OnClosed(e);
    }
}
/// <summary>Send button handler: sends the text of the send box over TCP.</summary>
private async void BtnSend_Click(object sender, RoutedEventArgs e)
{
    try
    {
        string outgoing = txtSend.Text;
        _ = await SendTcpMsg(outgoing);
    }
    catch (Exception sendError)
    {
        ShowError("发送失败", sendError);
    }
}
/// <summary>
/// Sends <paramref name="msg"/> followed by a newline over the current TCP connection
/// and echoes it in the UI.
/// </summary>
/// <param name="msg">Command text; also remembered as the last sent message.</param>
/// <returns>
/// <c>true</c> when the message was written; <c>false</c> when there is no connection
/// or the write failed (the error is shown in the UI).
/// </returns>
public async Task<bool> SendTcpMsg(string msg)
{
    // Remembered even when sending fails; the receive side uses it to pick a decoder.
    _sendMsg = msg;
    try
    {
        bool ready = _tcpClient?.Connected == true && _tcpStream != null;
        if (!ready)
        {
            return false;
        }

        byte[] payload = Encoding.UTF8.GetBytes(msg + "\n");
        await _tcpStream.WriteAsync(payload, 0, payload.Length);
        await _tcpStream.FlushAsync();

        // UI updates must run on the dispatcher thread.
        Dispatcher.Invoke(() =>
        {
            txtReceived.AppendText($"发送成功:{msg}\n");
            UpdateAllMsg(msg);
            txtReceived.ScrollToEnd();
        });
        return true;
    }
    catch (Exception ex)
    {
        Dispatcher.Invoke(() => ShowError("发送失败", ex));
        return false;
    }
}
/// <summary>Start-Web-API button handler.</summary>
private void StartWebApi_Click(object sender, RoutedEventArgs e)
{
    StartWebApi();
}
/// <summary>Stop-Web-API button handler: stops the listener and updates the state label.</summary>
private void StopWebApi_Click(object sender, RoutedEventArgs e)
{
    const string stoppedLabel = "已停止";

    _httpListener.Stop();
    webApiState.Text = stoppedLabel;
}
#endregion
// Running sequence number shown in front of each alarm entry.
long alarmIndex = 0;

/// <summary>
/// Appends a numbered alarm entry to the alarm list on the UI thread, clearing the
/// list first once it holds more than 200 entries.
/// </summary>
/// <param name="msg">Alarm text to display.</param>
private void ShowAlarmMsg(string msg)
{
    // Wrap around instead of overflowing.
    if (alarmIndex > long.MaxValue - 1)
    {
        alarmIndex = 0;
    }
    alarmIndex++;

    // Snapshot the number now. The UI callback runs later, and reading the field at
    // that point would give queued alarms the number of a newer alarm.
    long index = alarmIndex;
    Dispatcher.BeginInvoke(() =>
    {
        if (AlarmMessageList.Items.Count > 200)
        {
            AlarmMessageList.Items.Clear();
        }
        AlarmMessageList.Items.Add($"序号:{index} {msg}");
    });
}
/// <summary>Clears the HTTP message list.</summary>
private void ClearServerButton_Click(object sender, RoutedEventArgs e) => MessageList.Items.Clear();
/// <summary>Clears the alarm message list.</summary>
private void ClearAlarm_Click(object sender, RoutedEventArgs e) => AlarmMessageList.Items.Clear();
/// <summary>Empties the receive editor.</summary>
private void clearISMSMsgBtn_Click(object sender, RoutedEventArgs e) => txtReceived.Text = string.Empty;
/// <summary>
/// Fills the IP and port boxes from the application settings, falling back to the
/// built-in defaults when a key is missing.
/// </summary>
private void LoadSettings()
{
    var settings = ConfigurationManager.AppSettings;

    string savedIp = settings["ServerIP"];
    txtIP.Text = savedIp ?? "192.168.65.33";

    string savedPort = settings["ServerPort"];
    txtPort.Text = savedPort ?? "43916";
}
/// <summary>
/// Writes the current IP and port boxes to the application configuration file and
/// refreshes the cached appSettings section. Failures are shown in a message box.
/// </summary>
private void SaveSettings()
{
    try
    {
        Configuration config = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
        var entries = config.AppSettings.Settings;

        // Adds the key when it is absent, otherwise overwrites its value.
        void Upsert(string key, string value)
        {
            var existing = entries[key];
            if (existing == null)
            {
                entries.Add(key, value);
            }
            else
            {
                existing.Value = value;
            }
        }

        Upsert("ServerIP", txtIP.Text);
        Upsert("ServerPort", this.txtPort.Text);

        config.Save(ConfigurationSaveMode.Modified);
        ConfigurationManager.RefreshSection("appSettings");
    }
    catch (Exception ex)
    {
        MessageBox.Show($"保存配置时出错:{ex.Message}", "错误", MessageBoxButton.OK, MessageBoxImage.Error);
    }
}
// TextChanged handler for the IP box; the body is empty, so edits are only persisted via SaveSettings.
private void TxtIP_TextChanged(object sender, TextChangedEventArgs e)
{
}
// TextChanged handler for the port box; the body is empty, so edits are only persisted via SaveSettings.
private void TxtPort_TextChanged(object sender, TextChangedEventArgs e)
{
}
/// <summary>Save-configuration button handler.</summary>
private void SaveCfg_Click(object sender, RoutedEventArgs e) => SaveSettings();
/// <summary>Clears the "all messages" list.</summary>
private void ClearAllMsg_Click(object sender, RoutedEventArgs e) => AllMsgMessageList.Items.Clear();
}
}