SOMS/test/MqttWpfApp/MqttServerController.cs
2024-07-15 10:31:26 +08:00

70 lines
2.4 KiB
C#

using MQTTnet;
using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
namespace MqttWpfApp
{
/// <summary>
/// Static facade over a single in-process MQTT broker: starts and stops the server,
/// records the payload of every intercepted publish in <see cref="DataQueue"/>,
/// and lets the host application inject messages and create client subscriptions.
/// </summary>
internal class MqttServerController
{
    // Client id reported as the sender of messages injected through SendMsg.
    private const string InjectedSenderClientId = "SenderClientId";

    // The broker instance created by RunMinimalServer; null until started and after StopMiniServerAsync.
    private static MqttServer _mqttServer;

    /// <summary>
    /// Payloads (decoded as UTF-8 text) of the publishes intercepted by the broker.
    /// Consumers dequeue from here; a message without payload is stored as an empty string.
    /// </summary>
    public static ConcurrentQueue<string> DataQueue { get; } = new ConcurrentQueue<string>();

    /// <summary>
    /// Creates and starts a minimal MQTT server on the default endpoint.
    /// </summary>
    /// <remarks>
    /// The default endpoint listens on port 1883 and is NOT encrypted.
    /// </remarks>
    /// <returns>A task that completes once the server has been started.</returns>
    public static async Task RunMinimalServer()
    {
        var mqttFactory = new MqttFactory();

        // Use the builder classes where possible.
        var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();
        var server = mqttFactory.CreateMqttServer(mqttServerOptions);

        // Attach the interceptor BEFORE starting, so publishes that arrive while the
        // server is coming up are not missed.
        server.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;

        _mqttServer = server;
        await server.StartAsync();
    }

    /// <summary>
    /// Publish interceptor: decodes the message payload as UTF-8 and enqueues it in <see cref="DataQueue"/>.
    /// </summary>
    /// <param name="arg">Event data carrying the published application message.</param>
    /// <returns>An already completed task; the handler does no asynchronous work.</returns>
    private static Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
    {
        // The payload is null for messages published without a body; Encoding.GetString
        // would throw ArgumentNullException on it, so map that case to an empty string.
        var payload = arg.ApplicationMessage.Payload;
        var text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

        DataQueue.Enqueue(text);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Serializes <paramref name="data"/> to JSON and injects it at the broker as a message on <paramref name="topic"/>.
    /// </summary>
    /// <typeparam name="T">Type of the object to serialize.</typeparam>
    /// <param name="topic">Topic the message is published on.</param>
    /// <param name="data">Object serialized with Newtonsoft.Json to form the payload.</param>
    /// <returns>A task that completes when the message has been injected.</returns>
    /// <exception cref="InvalidOperationException">The server has not been started.</exception>
    public static async Task SendMsg<T>(string topic, T data)
    {
        var server = GetStartedServer();

        var payload = JsonConvert.SerializeObject(data);
        var message = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(payload).Build();

        // Now inject the new message at the broker.
        await server.InjectApplicationMessage(
            new InjectedMqttApplicationMessage(message)
            {
                SenderClientId = InjectedSenderClientId
            });
    }

    /// <summary>
    /// Creates one server-side subscription per dictionary entry, awaiting each in turn.
    /// </summary>
    /// <param name="id_topic">Entries whose key is passed as the client id and whose value is passed as the topic.</param>
    /// <returns>A task that completes when all subscriptions have been requested.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="id_topic"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The server has not been started.</exception>
    public static async Task SubscribeAsync(Dictionary<string,string> id_topic)
    {
        if (id_topic == null)
        {
            throw new ArgumentNullException(nameof(id_topic));
        }

        var server = GetStartedServer();

        foreach (var item in id_topic)
        {
            await server.SubscribeAsync(item.Key, item.Value);
        }
    }

    /// <summary>
    /// Stops the server, detaches the publish interceptor and releases the server instance.
    /// Does nothing when no server has been created.
    /// </summary>
    /// <returns>A task that completes when the server has been stopped.</returns>
    public static async Task StopMiniServerAsync()
    {
        var server = _mqttServer;
        if (server == null)
        {
            return;
        }

        // Clear the field first so later calls see "not started" instead of a stopped instance.
        _mqttServer = null;

        try
        {
            await server.StopAsync();
        }
        finally
        {
            // Always unhook and dispose, even if stopping failed, so the instance is not leaked.
            server.InterceptingPublishAsync -= MqttServer_InterceptingPublishAsync;
            server.Dispose();
        }
    }

    // Returns the current server or throws a descriptive error instead of a NullReferenceException.
    private static MqttServer GetStartedServer()
    {
        var server = _mqttServer;
        if (server == null)
        {
            throw new InvalidOperationException(
                "The MQTT server has not been started. Call RunMinimalServer first.");
        }

        return server;
    }
}
}