SOMS/test/ISMSTcpCmdWpfAppDemo/MainWindow.xaml.cs

827 lines
27 KiB
C#
Raw Normal View History

2025-03-10 18:15:27 +08:00
// MainWindow.xaml.cs
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Threading;
using ICSharpCode.AvalonEdit.Document;
using ICSharpCode.AvalonEdit.Folding;
using Newtonsoft.Json.Linq;
using Flurl;
using Flurl.Http;
using Newtonsoft.Json;
using ToolLibrary;
using System.Buffers;
using System.IO.Pipelines;
using System.Text.Json;
using JsonException = Newtonsoft.Json.JsonException;
using System.Collections.Concurrent;
namespace ISMSTcpCmdWpfAppDemo
{
public partial class MainWindow : Window
{
// TCP-related fields
// Current client socket; ConnectAsync disposes and re-creates it on every attempt.
private TcpClient _tcpClient = new TcpClient();
// Stream of the active connection; assigned in ConnectAsync / StartProcessing.
private NetworkStream _tcpStream;
// Per-connection token source; cancelled and re-created in ConnectAsync and Dispose.
// NOTE(review): no visible code passes its Token anywhere — confirm it is still needed.
private CancellationTokenSource _tcpCts;
// In the visible code only ShowRawData reads and clears this buffer.
private List<byte> _tcpBuffer = new List<byte>();
// Size hint passed to PipeWriter.GetMemory in ReceiveDataLoop.
private const int BufferSize = 1024;
// Web API-related fields
private HttpListener _httpListener = new HttpListener();
private const int WebApiPort = 38094;
// Taken without waiting in ConnectAsync so that only one connect attempt runs at a time.
private SemaphoreSlim _reconnectLock = new SemaphoreSlim(1, 1);
// Configuration parameters
// Idle period after which MonitorBufferTimeout discards unread pipe data.
private readonly TimeSpan IncompleteDataTimeout = TimeSpan.FromSeconds(5);
// Core components
// Written by ReceiveDataLoop, read by ProcessDataLoop (and MonitorBufferTimeout).
private readonly Pipe _pipe = new Pipe();
// Token source observed by the background loops and by BtnConnect_Click; replaced in Dispose.
private CancellationTokenSource _cts = new();
// Timestamp of the last successful flush into the pipe (set in ReceiveDataLoop).
private DateTime _lastDataTime = DateTime.MinValue;
/// <summary>
/// Builds the window: loads the XAML, configures the editor folding, registers the
/// loopback HTTP prefix and immediately starts the embedded Web API listener.
/// </summary>
public MainWindow()
{
    InitializeComponent();
    SetupComponents();

    string loopbackPrefix = $"http://127.0.0.1:{WebApiPort}/";
    _httpListener.Prefixes.Add(loopbackPrefix);

    StartWebApi();
}
/// <summary>
/// Installs AvalonEdit folding on the received-text editor and refreshes the
/// foldings every time the editor text changes.
/// </summary>
private void SetupComponents()
{
    // Initialise AvalonEdit's folding support.
    var manager = FoldingManager.Install(txtReceived.TextArea);
    var strategy = new JsonFoldingStrategy();

    txtReceived.TextChanged += (_, _) =>
    {
        strategy.UpdateFoldings(manager, txtReceived.Document);
    };
}
/// <summary>
/// Click handler for the disconnect button: tears down the TCP session via
/// <see cref="Dispose"/> and shows the disconnected state.
/// </summary>
private void BtnDisconnect_Click(object sender, RoutedEventArgs e)
{
    Dispose();
    UpdateStatus(message: "已断开", color: "Red");
}
/// <summary>
/// Click handler for the connect button. Validates the port text and starts a
/// connection attempt to the address entered in the UI.
/// </summary>
/// <remarks>
/// This is an <c>async void</c> event handler, so any exception that escapes it would
/// be raised on the dispatcher and take the application down. Both the port parsing and
/// the (re-throwing) <see cref="ConnectAsync"/> call are therefore guarded here.
/// </remarks>
private async void BtnConnect_Click(object sender, RoutedEventArgs e)
{
    bool portIsValid = int.TryParse(txtPort.Text, out int port)
                       && port >= IPEndPoint.MinPort
                       && port <= IPEndPoint.MaxPort;
    if (!portIsValid)
    {
        UpdateStatus("连接失败", "Red");
        ShowError("连接失败", new FormatException($"无效端口: {txtPort.Text}"));
        return;
    }

    try
    {
        await ConnectAsync(txtIP.Text, port, externalToken: _cts.Token);
    }
    catch (OperationCanceledException)
    {
        // The attempt was cancelled (e.g. the user pressed disconnect); nothing to report.
    }
    catch (Exception ex)
    {
        // ConnectAsync rethrows after its final retry; surface it instead of crashing.
        ShowError("连接失败", ex);
    }
}
#region TCP
/// <summary>
/// Connects to <paramref name="ip"/>:<paramref name="port"/>, retrying with an
/// exponential back-off, and starts the receive/process loops on success.
/// </summary>
/// <param name="ip">Host name or IP address of the TCP server.</param>
/// <param name="port">TCP port of the server.</param>
/// <param name="maxRetries">Number of retries after the first failed attempt.</param>
/// <param name="externalToken">Token that aborts the whole connect sequence.</param>
/// <remarks>
/// Returns immediately (doing nothing) if another connect attempt currently holds
/// <c>_reconnectLock</c>. Rethrows the last connection exception once the retries are
/// exhausted, and throws <see cref="OperationCanceledException"/> when cancelled.
/// </remarks>
private async Task ConnectAsync(string ip, int port, int maxRetries = 5, CancellationToken externalToken = default)
{
    // Non-blocking acquire: a second concurrent call simply gives up.
    if (!await _reconnectLock.WaitAsync(0)) return;

    bool isConnected = false;
    try
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(externalToken);
        var baseDelay = TimeSpan.FromSeconds(2);
        int retryCount = 0;

        while (!cts.Token.IsCancellationRequested)
        {
            try
            {
                _tcpClient?.Dispose();
                _tcpClient = new TcpClient { NoDelay = true };
                await _tcpClient.ConnectAsync(ip, port, cts.Token);
                _tcpStream = _tcpClient.GetStream();

                // Swap in a fresh per-connection token source and release the old one.
                var previousCts = _tcpCts;
                _tcpCts = new CancellationTokenSource();
                previousCts?.Cancel();
                previousCts?.Dispose();

                _ = StartProcessing(_tcpStream);
                UpdateStatus("已连接", "Green");
                isConnected = true;
                return;
            }
            catch (OperationCanceledException)
            {
                // Cancellation is not a connection failure: do not retry or report it.
                throw;
            }
            catch (Exception) when (retryCount < maxRetries)
            {
                retryCount++;
                // Back-off doubles per retry: 4 s, 8 s, 16 s, ...
                var delay = baseDelay * Math.Pow(2, retryCount);
                UpdateStatus($"连接中... ({retryCount}/{maxRetries})", "Orange");
                await Task.Delay(delay, cts.Token);
            }
            catch (Exception ex)
            {
                UpdateStatus("连接失败", "Red");
                ShowError($"最终连接失败", ex);
                throw;
            }
        }
    }
    finally
    {
        _reconnectLock.Release();
        if (!isConnected) UpdateStatus("连接失败", "Red");
    }
}
/// <summary>
/// Starts the three background loops (receive, process, idle-timeout monitor) for
/// the given connection stream.
/// </summary>
/// <param name="stream">Stream of the freshly connected TCP client.</param>
/// <returns>
/// An already-completed task; the loops are fire-and-forget and are not awaited.
/// </returns>
/// <remarks>
/// NOTE(review): the loops share the single readonly <c>_pipe</c>, which
/// <see cref="Dispose"/> completes; a second start after a disconnect therefore runs
/// against a completed pipe — confirm whether reconnect is expected to work.
/// </remarks>
public Task StartProcessing(NetworkStream stream)
{
    _tcpStream = stream;

    // Explicit discards: the loops are intentionally not awaited.
    _ = Task.Run(ReceiveDataLoop);
    _ = Task.Run(ProcessDataLoop);
    _ = Task.Run(MonitorBufferTimeout);

    return Task.CompletedTask;
}
/// <summary>
/// Tears down the current TCP session: cancels both token sources, completes both
/// ends of the pipe, disposes the stream and client, then installs fresh token
/// sources so that a later connect attempt starts with un-cancelled tokens.
/// </summary>
/// <remarks>
/// NOTE(review): the class declaration visible here derives only from Window; whether
/// IDisposable is declared in another partial part cannot be seen from this file.
/// </remarks>
public void Dispose()
{
// Signal the loops that observe _cts, then cancel and release the per-connection source.
_cts.Cancel();
_tcpCts?.Cancel();
_tcpCts?.Dispose();
// NOTE(review): _pipe is readonly and is completed here without being reset or
// replaced, so loops started by a later StartProcessing run against a completed pipe.
_pipe.Writer.Complete();
_pipe.Reader.Complete();
_tcpStream?.Dispose();
_tcpClient?.Dispose();
// NOTE(review): the cancelled _cts is replaced without being disposed, and any loop
// that re-reads the _cts field after this point sees the new, un-cancelled source.
_tcpCts = new CancellationTokenSource();
_cts = new CancellationTokenSource();
}
/// <summary>
/// Receive loop: reads bytes from the TCP stream and writes them into the pipe
/// until the peer closes the connection, the reader side completes, or the
/// session is cancelled. Always completes the pipe writer on exit.
/// </summary>
/// <remarks>
/// The token and stream are captured once at start-up. <see cref="Dispose"/> replaces
/// the <c>_cts</c> field right after cancelling it, so re-reading the field on each
/// iteration could miss the cancellation. This loop no longer replaces <c>_cts</c>
/// itself: doing so could hide a cancellation from the sibling loops.
/// </remarks>
private async Task ReceiveDataLoop()
{
    CancellationToken token = _cts.Token;
    var stream = _tcpStream;
    var writer = _pipe.Writer;
    try
    {
        while (!token.IsCancellationRequested && stream != null)
        {
            Memory<byte> memory = writer.GetMemory(BufferSize);
            int bytesRead = await stream.ReadAsync(memory, token);
            if (bytesRead == 0) break; // remote side closed the connection

            writer.Advance(bytesRead);
            FlushResult flushed = await writer.FlushAsync(token);
            _lastDataTime = DateTime.Now;

            // The reader has completed: nothing will ever consume further data.
            if (flushed.IsCompleted) break;

            // NOTE(review): this pause throttles the receive rate; kept from the
            // original implementation — confirm it is intentional.
            await Task.Delay(100, token);
        }
    }
    catch (OperationCanceledException)
    {
        // Normal shutdown path.
    }
    catch (IOException)
    {
        // Connection dropped or stream closed while a read was pending.
    }
    catch (ObjectDisposedException)
    {
        // Stream was disposed by Dispose()/reconnect while the loop was running.
    }
    finally
    {
        await writer.CompleteAsync();
    }
}
/// <summary>
/// Processing loop: reads buffered bytes from the pipe, extracts as many messages
/// as <see cref="TryProcessBuffer"/> can find and dispatches each one to the
/// array / object / invalid-data handler. Always completes the pipe reader on exit.
/// </summary>
/// <remarks>
/// The cancellation token is captured once and passed to <c>ReadAsync</c> and the
/// delays, so the loop stops promptly even while it is idle waiting for data.
/// </remarks>
private async Task ProcessDataLoop()
{
    CancellationToken token = _cts.Token;
    var reader = _pipe.Reader;
    try
    {
        while (!token.IsCancellationRequested)
        {
            ReadResult result = await reader.ReadAsync(token);
            ReadOnlySequence<byte> buffer = result.Buffer;

            while (TryProcessBuffer(ref buffer, out ProcessResult processResult))
            {
                if (processResult.IsValid)
                {
                    if (processResult.IsJArray)
                        HandleJArray(processResult.JsonData);
                    else
                        HandleJObject(processResult.JsonData);
                }
                else
                {
                    HandleInvalidData(processResult.ErrorMessage ?? "Unknown error",
                        processResult.ConsumedBytes);
                }
                await Task.Delay(100, token);
            }

            // Consume what was parsed; mark the remainder as examined so the next
            // ReadAsync only returns once more data has arrived.
            reader.AdvanceTo(buffer.Start, buffer.End);
            if (result.IsCompleted) break;

            await Task.Delay(100, token); // small pause to keep CPU usage down
        }
    }
    catch (OperationCanceledException)
    {
        // Normal shutdown path.
    }
    finally
    {
        await reader.CompleteAsync();
    }
}
// Core parsing logic.
// Tries to extract one message from `buffer`. Two framings are attempted in order:
//   1. an "OK" marker followed by a JSON array (reported with isJArray = true);
//   2. a bare JSON value covering the buffer (reported with isJArray = false).
// Returns true and slices `buffer` past the consumed bytes when a message was
// produced; returns false when nothing could be extracted.
private bool TryProcessBuffer(ref ReadOnlySequence<byte> buffer, out ProcessResult result)
{
result = default;
var reader = new SequenceReader<byte>(buffer);
int JsonLength;
// Look for the "OK" prefix first; on success the reader is positioned just after it.
if (reader.TryReadTo(out ReadOnlySpan<byte> _, new[] { (byte)'O', (byte)'K' }, advancePastDelimiter: true))
{
//long prefixPosition = reader.Consumed - 2; // length of "OK" is 2
// NOTE(review): prefixPosition is computed but never used below.
long prefixPosition = reader.Consumed-2;
if (TryParseJson(ref reader, out string? json,out JsonLength, true))
{
//int totalConsumed = (int)(prefixPosition + json.Length);
result = ProcessResult.Success(true, json, JsonLength);
// NOTE(review): on this path TryParseJson sets JsonLength to reader.Length (the
// length of the whole sequence), so this slice drops the entire buffer, including
// any bytes that follow the array — confirm that is intended.
buffer = buffer.Slice(JsonLength);
return true;
}
}
// Start again from the beginning of the buffer; anything the first attempt did to
// its reader is discarded because `buffer` itself was not sliced.
reader = new SequenceReader<byte>(buffer);
// Try to parse as plain JSON
if (TryParseJson(ref reader, out string? regularJson, out JsonLength, false))
{
result = ProcessResult.Success(false, regularJson, (int)JsonLength);
buffer = buffer.Slice(reader.Position);
return true;
}
// Incomplete data detected
// NOTE(review): an error result is assigned here but false is returned; the visible
// caller (ProcessDataLoop) only inspects `result` when true is returned.
if (buffer.Length > 0)
{
result = ProcessResult.Error("Incomplete data", 0);
return false;
}
return false;
}
// JSON parsing helper used by TryProcessBuffer.
// Copies the reader's unread bytes into an array and tries to extract one message:
//   - isSkipOK == true : expects a JSON array ('[' as first byte) and validates it
//     with JArray.Parse;
//   - isSkipOK == false: expects the whole remaining buffer to be one JSON object.
// On success returns true, sets `json` to the message text, advances `reader` and
// sets `JsonLength`. Returns false when no message could be extracted.
// Side effects: calls UpdateTextContent / ShowAlarmMsg / ShowError.
private bool TryParseJson(ref SequenceReader<byte> reader, out string? json, out int JsonLength,bool isSkipOK = false)
{
json = null;
JsonLength = 0;
// ToArray() copies the unread bytes, so the spans below do not alias the pipe memory.
ReadOnlySpan<byte> originalBuffer = reader.UnreadSequence.ToArray();
int originalBufferLength = originalBuffer.Length;
ReadOnlySpan<byte> buffer = originalBuffer;
// Only read by the JsonReaderException handler at the bottom.
int skipBytes = 2;
try
{
// 1. Handle the "skip OK" case
if (isSkipOK)
{
ReadOnlySpan<byte> okBytes = new byte[] { (byte)'O', (byte)'K' };
// NOTE(review): the visible caller has already advanced the reader past the first
// "OK"; this looks for a further "OK" in the remaining bytes and skips to just
// after it when one is found.
int okIndex = buffer.IndexOf(okBytes);
if (okIndex != -1)
buffer = buffer.Slice(okIndex + okBytes.Length);
// This assignment is not part of the `if` above; it always runs.
skipBytes = 2;
// The array must start at the very first byte; otherwise give up.
int arrayStart = buffer.IndexOf((byte)'[');
if (arrayStart != 0) return false;
// Find the matching ']' by counting raw bracket bytes.
// NOTE(review): brackets inside JSON string values are counted as well.
int arrayEnd = -1, bracketCount = 1;
for (int i = 1; i < buffer.Length; i++)
{
if (buffer[i] == '[') bracketCount++;
else if (buffer[i] == ']') bracketCount--;
if (bracketCount != 0) continue;
arrayEnd = i;
break;
}
// No matching close bracket yet.
if (arrayEnd == -1)
return false;
// 3. Extract and validate the array content
if (arrayStart != -1 && arrayEnd != -1 && arrayEnd > arrayStart)
{
//ReadOnlySpan<byte> arrayData = buffer.Slice(arrayStart, arrayEnd - arrayStart + 1);
ReadOnlySpan<byte> arrayData = buffer.Slice(0, arrayEnd + 1);
string arrayString = Encoding.UTF8.GetString(arrayData);
JArray parsedArray = JArray.Parse(arrayString); // validates the array; throws JsonReaderException on bad JSON
json = arrayString;
// 5. Compute the number of bytes actually consumed
// NOTE(review): arrayStart is always 0 here, so this equals the index of the closing
// ']' (one less than the array's byte count) and ignores any bytes skipped for a
// second "OK" above — confirm.
int totalConsumed = arrayStart + arrayEnd;
//if (totalConsumed > originalBufferLength) return false;
if (totalConsumed>= reader.Length)
{
totalConsumed = (int)reader.Length-1;
}
try
{
reader.Advance(totalConsumed);
}
catch
{
reader.AdvanceToEnd();
}
UpdateTextContent(json);
// Reports the length of the reader's whole sequence, not the array's length.
JsonLength = (int)reader.Length;
return true;
}
}
else
{
if (buffer.Length>0)
{
string rawString = Encoding.UTF8.GetString(buffer);
// Buffers containing "OK" anywhere are left to the array path.
if (rawString.Contains("OK"))
{
return false;
}
// More '{' than '}' so far: wait for the rest of the object.
if (IsTruncatedJson(rawString))
{
return false;
}
//if (string.IsNullOrWhiteSpace)
//{
//}
try
{
JToken token = JToken.Parse(rawString);
if (token.Type == JTokenType.Object)
{
int consumed = buffer.Length;
json = rawString;
reader.Advance(consumed);
ShowAlarmMsg(rawString);
JsonLength = consumed;
return true;
}
// Valid JSON that is not an object falls through to `return false` below.
}
catch (Exception ex)
{
// NOTE(review): unparsable text is reported via ShowError, consumed, and still
// returned as a successful result (true with json = rawString), so the caller
// handles it like a valid object message — confirm that is intended.
int consumed = buffer.Length;
json = rawString;
ShowError(rawString, ex);
reader.Advance(consumed);
JsonLength = consumed;
return true;
}
}
}
// 6. Nothing usable found
return false;
}
catch (JsonReaderException ex)
{
// Error handling: reached when JArray.Parse rejects the candidate array.
// NOTE(review): ex.LinePosition is used as a character index, which presumably
// assumes a single-line payload; errorByteIndex is not range-checked against
// originalBuffer before slicing — verify.
int errorCharIndex = ex.LinePosition - 1;
if (errorCharIndex >= 0)
{
int errorByteIndex = skipBytes + Encoding.UTF8.GetByteCount(
Encoding.UTF8.GetString(buffer).Substring(0, errorCharIndex)
);
UpdateTextContent($"错误数据:{Encoding.UTF8.GetString(originalBuffer.Slice(0, errorByteIndex))}");
reader.Advance(errorByteIndex);
}
return false;
}
catch
{
// Any other failure: skip everything that was unread on entry.
reader.Advance(originalBufferLength);
return false;
}
}
/// <summary>
/// Heuristically decides whether <paramref name="str"/> is the beginning of a JSON
/// object whose closing braces have not arrived yet.
/// </summary>
/// <param name="str">Text received so far; <c>null</c> or empty is treated as not truncated.</param>
/// <returns>
/// <c>true</c> when the text contains more structural <c>{</c> than <c>}</c>;
/// otherwise <c>false</c>.
/// </returns>
/// <remarks>
/// Braces inside JSON string literals (including after escaped quotes) are ignored,
/// so a value such as <c>"{"</c> no longer makes a complete object look truncated.
/// As in the original implementation the counter starts at 10, which tolerates up to
/// ten unmatched closing braces before the method gives up and returns <c>false</c>.
/// </remarks>
public bool IsTruncatedJson(string str)
{
    if (string.IsNullOrEmpty(str)) return false;

    const int Baseline = 10;
    int balance = Baseline;
    bool inString = false;
    bool escaped = false;

    foreach (char c in str)
    {
        if (inString)
        {
            // Inside a string literal only the closing quote matters.
            if (escaped) escaped = false;
            else if (c == '\\') escaped = true;
            else if (c == '"') inString = false;
            continue;
        }

        switch (c)
        {
            case '"':
                inString = true;
                break;
            case '{':
                balance++;
                break;
            case '}':
                if (balance == 0) return false;
                balance--;
                break;
        }
    }

    return balance > Baseline;
}
/// <summary>
/// Idle-timeout monitor: once per second checks whether no data has been flushed
/// into the pipe for longer than <c>IncompleteDataTimeout</c> and, if so, discards
/// whatever is still buffered in the pipe.
/// </summary>
/// <remarks>
/// The cancellation token is captured once and also passed to the delay, because
/// <see cref="Dispose"/> swaps the <c>_cts</c> field immediately after cancelling it.
/// NOTE(review): this loop reads from the same PipeReader as ProcessDataLoop;
/// PipeReader is not designed for concurrent readers — confirm this is safe here.
/// </remarks>
private async Task MonitorBufferTimeout()
{
    CancellationToken token = _cts.Token;
    try
    {
        while (!token.IsCancellationRequested)
        {
            await Task.Delay(1000, token);

            bool stillFresh = DateTime.Now - _lastDataTime <= IncompleteDataTimeout;
            if (stillFresh) continue;

            if (_pipe.Reader.TryRead(out ReadResult result))
            {
                HandleInvalidData("Data timeout", (int)result.Buffer.Length);
                _pipe.Reader.AdvanceTo(result.Buffer.End);
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Normal shutdown path.
    }
}
// Sample handlers (to be implemented according to business needs).

/// <summary>
/// Handles an array message: parses it, stores the result in <c>_token</c> and
/// raises the TCP-response notification with it.
/// </summary>
/// <param name="json">Text of the JSON array.</param>
private void HandleJArray(string json)
{
    // The success flag is deliberately ignored; a failed parse yields a null token.
    _ = TryParseJToken(json, out JToken parsed);

    _token = parsed;
    OnTcpResponseReceived(parsed);
}
/// <summary>
/// Handles an object (alarm) message by posting its raw text to the alarm upload
/// endpoint on a thread-pool thread.
/// </summary>
/// <param name="json">Text of the JSON object.</param>
private void HandleJObject(string json)
{
    const string uploadUrl = "http://localhost:38091/api/services/isas/AlarmLiveData/UploadISMSAlarmMsg";

    // Send the alarm information.
    Task.Run(() =>
    {
        TryParseJToken(json, out JToken parsed);
        object payload = json;
        HttpHelper.HttpPostRequest<object>(uploadUrl, payload);
    });
}
/// <summary>
/// Hook for invalid or timed-out data. The body is currently empty, so both
/// arguments are ignored.
/// </summary>
/// <param name="reason">Description supplied by the caller.</param>
/// <param name="bytesToSkip">Byte count supplied by the caller.</param>
private void HandleInvalidData(string reason, int bytesToSkip)
{
}
#region
/// <summary>
/// Attempts to parse <paramref name="json"/> into a <see cref="JToken"/> without throwing.
/// </summary>
/// <param name="json">Text to parse.</param>
/// <param name="token">The parsed token, or <c>null</c> when parsing fails.</param>
/// <returns><c>true</c> when parsing succeeded; otherwise <c>false</c>.</returns>
private bool TryParseJToken(string json, out JToken token)
{
    token = null;
    try
    {
        token = JToken.Parse(json);
    }
    catch
    {
        return false;
    }
    return true;
}
#endregion
#endregion
#region Web API
/// <summary>
/// Starts the HTTP listener (if it is not already running) and accepts requests
/// until the listener is stopped. Each request is handled by
/// <see cref="ProcessHttpRequest"/> without being awaited.
/// </summary>
/// <remarks>
/// <c>async void</c> because it is invoked fire-and-forget from the constructor and
/// from a click handler; every exception is therefore handled inside the method.
/// Calling it while the listener is already running is a no-op, which prevents a
/// second accept loop from being started.
/// </remarks>
private async void StartWebApi()
{
    if (_httpListener.IsListening) return;

    bool started = false;
    try
    {
        _httpListener.Start();
        started = true;
        webApiState.Text = "已启动";

        while (_httpListener.IsListening)
        {
            var context = await _httpListener.GetContextAsync();
            // Intentionally not awaited so the next request can be accepted at once.
            _ = ProcessHttpRequest(context);
        }
    }
    catch (Exception ex) when (started
                               && !_httpListener.IsListening
                               && (ex is HttpListenerException || ex is ObjectDisposedException))
    {
        // The pending GetContextAsync was aborted because the listener was stopped;
        // this is the normal shutdown path, not an error.
    }
    catch (Exception ex)
    {
        ShowError("Web API 错误", ex);
    }
}
// Allows one HTTP request at a time to talk to the TCP side; ProcessHttpRequest
// answers 503 when it cannot take it immediately.
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
// NOTE(review): not referenced anywhere in the visible code — confirm it is still needed.
private readonly CancellationTokenSource _httpcts = new CancellationTokenSource();
/// <summary>
/// Handles one HTTP request. Only <c>GET /api</c> is supported, with either a
/// <c>dz</c> or a <c>FaultRpt</c> query parameter, whose value is forwarded over TCP;
/// the TCP reply is returned as JSON.
/// </summary>
/// <param name="context">The listener context of the request.</param>
/// <remarks>
/// Responses: 404 for any other method/path, 503 when another request is in progress,
/// 504 on timeout or cancellation, 500 on any other failure, otherwise 200 with the
/// serialized result (an invalid-parameter result is also sent with status 200, as before).
/// The method never throws: it is started fire-and-forget by the accept loop.
/// </remarks>
private async Task ProcessHttpRequest(HttpListenerContext context)
{
    try
    {
        bool isApiGet = context.Request.HttpMethod == "GET"
                        && context.Request.Url?.AbsolutePath == "/api";
        if (!isApiGet)
        {
            SendHttpResponse(context.Response, 404, "Not Found");
            return;
        }

        var dzparam = context.Request.QueryString["dz"];
        var faultRptparam = context.Request.QueryString["FaultRpt"];

        // Mutual exclusion instead of a "running" flag: reject rather than queue.
        if (!await _semaphore.WaitAsync(TimeSpan.Zero))
        {
            SendHttpResponse(context.Response, 503, "Error: 服务器繁忙");
            return;
        }

        try
        {
            WebResult<object> result;
            using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(180));
            if (!string.IsNullOrEmpty(dzparam))
            {
                result = await ProcessRequestAsync(dzparam, "定值", timeoutCts.Token);
            }
            else if (!string.IsNullOrEmpty(faultRptparam))
            {
                result = await ProcessRequestAsync(faultRptparam, "故障报告", timeoutCts.Token);
            }
            else
            {
                result = WebResult<object>.ErrorResult(400, new Exception("无效参数"));
            }
            SendHttpResponse(context.Response, 200, JsonConvert.SerializeObject(result));
        }
        finally
        {
            _semaphore.Release();
        }
    }
    catch (Exception ex) when (ex is OperationCanceledException || ex is TimeoutException)
    {
        // ProcessRequestAsync reports an expired wait as TimeoutException, so both
        // exception types must map to 504.
        TrySendHttpResponse(context.Response, 504, "Error: 请求超时");
    }
    catch (Exception ex)
    {
        TrySendHttpResponse(context.Response, 500, $"Error: {ex.Message}");
    }
}

// Sends an error response without letting a failed write (client gone, response
// already started) escape from the fire-and-forget request task.
private void TrySendHttpResponse(HttpListenerResponse response, int statusCode, string content)
{
    try
    {
        SendHttpResponse(response, statusCode, content);
    }
    catch (Exception sendEx)
    {
        ShowError("HTTP 响应发送失败", sendEx);
        response.Abort();
    }
}
/// <summary>
/// Sends <paramref name="param"/> to the TCP server and waits for the next array
/// response raised through <see cref="TcpResponseReceived"/>.
/// </summary>
/// <param name="param">Command text forwarded to the TCP server.</param>
/// <param name="logType">Label used in the HTTP message log.</param>
/// <param name="ct">Token that aborts the wait.</param>
/// <returns>A successful result wrapping the received token.</returns>
/// <exception cref="InvalidOperationException">The command could not be sent.</exception>
/// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
/// <exception cref="TimeoutException">No response arrived within 300 seconds.</exception>
private async Task<WebResult<object>> ProcessRequestAsync(string param, string logType, CancellationToken ct)
{
    WirteHttpMsg($"收到{logType}信息:{param}");

    // RunContinuationsAsynchronously keeps the awaiting code from running inline on
    // the thread that raises the event.
    var tcs = new TaskCompletionSource<JToken>(TaskCreationOptions.RunContinuationsAsynchronously);
    Func<JToken, bool> recvHandler = token =>
    {
        tcs.TrySetResult(token);
        return true;
    };

    // Linked source so the delay timer can be released as soon as the wait ends.
    using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(ct);

    // Subscribe BEFORE sending so a fast reply cannot be missed.
    TcpResponseReceived += recvHandler;
    try
    {
        if (!await SendTcpMsg(param))
            throw new InvalidOperationException("TCP 未连接或发送失败");

        // Wait for either the response or the timeout.
        var delayTask = Task.Delay(1000 * 300, delayCts.Token);
        var completedTask = await Task.WhenAny(tcs.Task, delayTask);
        if (completedTask != tcs.Task)
        {
            ct.ThrowIfCancellationRequested();
            throw new TimeoutException("操作超时");
        }

        return WebResult<object>.Ok(await tcs.Task);
    }
    finally
    {
        TcpResponseReceived -= recvHandler;
        delayCts.Cancel();
    }
}
// Raised from the TCP receive path whenever an array response has been handled.
public event Func<JToken,bool> TcpResponseReceived;

/// <summary>
/// Invokes the <see cref="TcpResponseReceived"/> subscribers, if any, with the
/// received token.
/// </summary>
/// <param name="token">Token handed to the subscribers.</param>
private void OnTcpResponseReceived(JToken token)
{
    Func<JToken, bool> subscribers = TcpResponseReceived;
    if (subscribers != null)
    {
        subscribers(token);
    }
}
/// <summary>
/// Appends <paramref name="msg"/> to the HTTP message list on the UI thread,
/// clearing the list first once it holds more than 100 entries.
/// </summary>
/// <param name="msg">Text to append.</param>
private void WirteHttpMsg(string msg)
{
    Dispatcher.BeginInvoke(() =>
    {
        var entries = MessageList.Items;
        bool listIsFull = entries.Count > 100;
        if (listIsFull)
        {
            entries.Clear();
        }
        entries.Add(msg);
    });
}
/// <summary>
/// Writes <paramref name="content"/> as a chunked UTF-8 <c>text/plain</c> response
/// with the given status code and closes the response.
/// </summary>
/// <param name="response">Response object of the request being answered.</param>
/// <param name="statusCode">HTTP status code to send.</param>
/// <param name="content">Body text.</param>
/// <remarks>
/// The body is encoded without a byte-order mark: passing <c>Encoding.UTF8</c> to
/// <see cref="StreamWriter"/> makes it emit a BOM at the start of the body. The
/// response is closed even when writing fails.
/// </remarks>
private void SendHttpResponse(HttpListenerResponse response, int statusCode, string content)
{
    WirteHttpMsg($"状态:{statusCode} 发送http消息{content}");
    try
    {
        response.StatusCode = statusCode;
        response.ContentType = "text/plain; charset=utf-8";
        response.SendChunked = true;

        var utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
        using var writer = new StreamWriter(response.OutputStream, utf8NoBom, 4096);
        // StreamWriter already buffers in 4096-char blocks; no manual chunking needed.
        writer.Write(content);
    }
    finally
    {
        response.Close();
    }
}
#endregion
#region UI
/// <summary>
/// Shows <paramref name="message"/> in the status text block using the colour
/// named by <paramref name="color"/>.
/// </summary>
/// <param name="message">Status text.</param>
/// <param name="color">
/// "Green" or "Orange"; any other value (including "Red") is shown in red.
/// </param>
/// <remarks>
/// "Orange" is what the connect retry path passes; previously it was rendered red.
/// </remarks>
private void UpdateStatus(string message, string color)
{
    System.Windows.Media.Brush foreground = color switch
    {
        "Green" => System.Windows.Media.Brushes.Green,
        "Orange" => System.Windows.Media.Brushes.Orange,
        _ => System.Windows.Media.Brushes.Red,
    };

    Dispatcher.Invoke(() =>
    {
        txtStatus.Text = message;
        txtStatus.Foreground = foreground;
    });
}
/// <summary>
/// Shows a time-stamped error line ("HH:mm:ss - message: exception message") in the
/// error text block. The update is queued on the UI thread.
/// </summary>
/// <param name="message">Short description of what failed.</param>
/// <param name="ex">Exception whose message is appended.</param>
private void ShowError(string message, Exception ex)
{
    Dispatcher.BeginInvoke(() =>
    {
        // The timestamp is taken when the UI thread runs this, as before.
        string stamp = DateTime.Now.ToString("HH:mm:ss");
        txtErr.Text = string.Concat(stamp, " - ", message, ": ", ex.Message);
    });
}
/// <summary>
/// Decodes the bytes accumulated in <c>_tcpBuffer</c> as UTF-8, empties the buffer
/// and appends the decoded text to the received-text view.
/// </summary>
private void ShowRawData()
{
    byte[] pending = _tcpBuffer.ToArray();
    string decoded = Encoding.UTF8.GetString(pending);

    _tcpBuffer.Clear();
    UpdateTextContent(decoded);
}
// Update throttling for the received-text view.
// A thread-safe queue is used in place of a StringBuilder.
// Messages waiting to be appended to the editor; filled by UpdateTextContent.
private ConcurrentQueue<string> _msgQueue = new ConcurrentQueue<string>();
// Time of the last editor update.
private DateTime _lastUpdateTime = DateTime.MinValue;
// Minimum interval between editor updates, compared against milliseconds.
private const int UpdateInterval = 200;
// 1 while a flush of _msgQueue is scheduled or queued on the dispatcher, else 0.
private int _textFlushPending;

/// <summary>
/// Queues <paramref name="content"/> for display in the received-text editor.
/// Editor updates are throttled to at most one per <c>UpdateInterval</c> milliseconds
/// and batch everything queued in the meantime.
/// </summary>
/// <param name="content">Text to append (a line break is added per message).</param>
/// <remarks>
/// When a message arrives inside the throttle window a delayed flush is scheduled,
/// so the last messages of a burst are displayed even if no further call follows.
/// Previously such messages stayed in the queue until the next call.
/// </remarks>
private void UpdateTextContent(string content)
{
    _msgQueue.Enqueue(content); // thread-safe enqueue

    // Only one flush may be outstanding; it will pick this message up as well.
    if (Interlocked.CompareExchange(ref _textFlushPending, 1, 0) != 0)
        return;

    double remainingMs = UpdateInterval - (DateTime.Now - _lastUpdateTime).TotalMilliseconds;
    if (remainingMs <= 0)
    {
        Dispatcher.BeginInvoke(new Action(FlushQueuedText));
        return;
    }

    // Clamp guards against a clock that moved backwards.
    TimeSpan wait = TimeSpan.FromMilliseconds(Math.Min(remainingMs, UpdateInterval));
    _ = Task.Delay(wait).ContinueWith(
        t => Dispatcher.BeginInvoke(new Action(FlushQueuedText)),
        TaskScheduler.Default);
}

// Runs on the UI thread: drains the queue into the editor and trims old text.
private void FlushQueuedText()
{
    _lastUpdateTime = DateTime.Now;
    // Re-arm first so messages enqueued while draining schedule a new flush.
    Interlocked.Exchange(ref _textFlushPending, 0);

    // Drain the queue in one batch.
    var sb = new StringBuilder();
    while (_msgQueue.TryDequeue(out var msg)) // thread-safe dequeue
    {
        sb.AppendLine(msg);
    }
    if (sb.Length == 0) return;

    var doc = txtReceived.Document;
    using (doc.RunUpdate())
    {
        // Append the new content to the end of the document.
        doc.Insert(doc.TextLength, sb.ToString());

        // Trim from the front once the document grows past maxLength.
        const int maxLength = 20000;
        const int trimSize = 10000;
        if (doc.TextLength > maxLength)
        {
            var removeLength = Math.Min(trimSize, doc.TextLength - (maxLength - trimSize));
            doc.Remove(0, removeLength);
        }
    }
    txtReceived.ScrollToEnd();
}
#endregion
#region
/// <summary>
/// Reports whether <paramref name="input"/> can be parsed as JSON.
/// </summary>
/// <param name="input">Text to check.</param>
/// <returns><c>true</c> when parsing yields a token; <c>false</c> when parsing throws.</returns>
private bool IsValidJson(string input)
{
    try
    {
        JToken parsed = JToken.Parse(input);
        return parsed != null;
    }
    catch
    {
        return false;
    }
}
JToken _token;
/// <summary>
/// Returns <paramref name="json"/> re-serialized with indentation, or the input
/// unchanged when it cannot be parsed.
/// </summary>
/// <param name="json">JSON text to format.</param>
/// <remarks>Side effect: a successful parse is stored in the <c>_token</c> field.</remarks>
private string FormatJson(string json)
{
    string formatted = json;
    try
    {
        _token = JToken.Parse(json);
        formatted = _token.ToString(Newtonsoft.Json.Formatting.Indented);
    }
    catch
    {
        // Not valid JSON: fall back to the raw text.
    }
    return formatted;
}
/// <summary>
/// Currently a no-op: the unconditional return below disables the file logging
/// that follows it.
/// </summary>
/// <param name="content">Text that would be written to the log file.</param>
private void SaveToFile(string content)
{
// Everything after this return is unreachable.
return;
// Would write the content to a time-stamped log_yyyyMMdd_HHmmss.json file.
// NOTE(review): the task returned by WriteAllTextAsync is not awaited.
var fileName = $"log_{DateTime.Now:yyyyMMdd_HHmmss}.json";
File.WriteAllTextAsync(fileName, content);
}
#endregion
#region
/// <summary>
/// Window shutdown: stops the HTTP listener, cancels the background loops and
/// closes the TCP connection before the base class handles the event.
/// </summary>
/// <param name="e">Event data passed to the base implementation.</param>
/// <remarks>
/// <c>_cts</c> is cancelled as well: the receive, process and monitor loops observe
/// that source, so without it they would keep running after the window has closed.
/// The base call is in a <c>finally</c> so a cleanup failure cannot skip it.
/// </remarks>
protected override void OnClosed(EventArgs e)
{
    try
    {
        _httpListener.Stop();
        _cts.Cancel();
        _tcpCts?.Cancel();
        _tcpStream?.Dispose();
        _tcpClient?.Close();
    }
    finally
    {
        base.OnClosed(e);
    }
}
/// <summary>
/// Click handler for the send button: sends the text of the send box over TCP and
/// reports any exception through <see cref="ShowError"/>.
/// </summary>
private async void BtnSend_Click(object sender, RoutedEventArgs e)
{
    try
    {
        string outgoing = txtSend.Text;
        await SendTcpMsg(outgoing);
    }
    catch (Exception sendFailure)
    {
        ShowError("发送失败", sendFailure);
    }
}
/// <summary>
/// Sends <paramref name="msg"/> followed by a line feed over the current TCP
/// connection, fully asynchronously.
/// </summary>
/// <param name="msg">Text to send; encoded as UTF-8.</param>
/// <returns>
/// <c>true</c> when the message was written and flushed; <c>false</c> when there is no
/// usable connection or when sending fails (the failure is shown via ShowError).
/// </returns>
public async Task<bool> SendTcpMsg(string msg)
{
    try
    {
        bool connectionReady = _tcpClient?.Connected == true && _tcpStream != null;
        if (!connectionReady)
        {
            return false;
        }

        byte[] payload = Encoding.UTF8.GetBytes(msg + "\n");

        // Asynchronous write and flush.
        await _tcpStream.WriteAsync(payload, 0, payload.Length);
        await _tcpStream.FlushAsync();

        // UI updates must run on the dispatcher thread.
        Dispatcher.Invoke(() =>
        {
            txtReceived.AppendText($"发送成功:{msg}\n");
            txtReceived.ScrollToEnd();
        });
        return true;
    }
    catch (Exception failure)
    {
        Dispatcher.Invoke(() => ShowError("发送失败", failure));
        return false;
    }
}
/// <summary>Click handler that starts the embedded Web API listener.</summary>
private void StartWebApi_Click(object sender, RoutedEventArgs e)
{
    StartWebApi();
}
/// <summary>
/// Click handler that stops the embedded Web API listener and updates the state label.
/// </summary>
private void StopWebApi_Click(object sender, RoutedEventArgs e)
{
    _httpListener.Stop();

    const string stoppedLabel = "已停止";
    webApiState.Text = stoppedLabel;
}
#endregion
// Running sequence number of displayed alarm messages; wraps to 0 at long.MaxValue.
long alarmIndex = 0;

/// <summary>
/// Appends an alarm message, prefixed with its sequence number, to the alarm list
/// on the UI thread. The list is cleared first once it holds more than 200 entries.
/// </summary>
/// <param name="msg">Alarm text to display.</param>
/// <remarks>
/// The sequence number is copied into a local before the update is queued. The
/// queued lambda runs later on the dispatcher; reading the field there would show
/// whatever value it has by then, so messages queued close together could display
/// the same number.
/// </remarks>
private void ShowAlarmMsg(string msg)
{
    if (alarmIndex > long.MaxValue - 1)
    {
        alarmIndex = 0;
    }
    alarmIndex++;
    long index = alarmIndex;

    Dispatcher.BeginInvoke(() =>
    {
        if (AlarmMessageList.Items.Count > 200)
        {
            AlarmMessageList.Items.Clear();
        }
        AlarmMessageList.Items.Add($"序号:{index} {msg}");
    });
}
/// <summary>Click handler that empties the received-text editor.</summary>
private void ClearServerButton_Click(object sender, RoutedEventArgs e)
    => txtReceived.Text = string.Empty;
/// <summary>Click handler that removes every entry from the alarm message list.</summary>
private void ClearAlarm_Click(object sender, RoutedEventArgs e)
    => AlarmMessageList.Items.Clear();
/// <summary>Click handler that empties the received-text editor.</summary>
private void clearISMSMsgBtn_Click(object sender, RoutedEventArgs e)
    => txtReceived.Text = string.Empty;
}
}