using MQTTnet; using MQTTnet.Diagnostics; using MQTTnet.Protocol; using MQTTnet.Server; using Newtonsoft.Json; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Text; using System.Text.Json; using System.Threading.Tasks; namespace MqttWpfApp { internal class MqttServerController { private static MqttServer _mqttServer; public static ConcurrentQueue DataQueue { get; } = new ConcurrentQueue(); public static async Task RunMinimalServer() { /* * This sample starts a simple MQTT server which will accept any TCP connection. */ var mqttFactory = new MqttFactory(); // The port for the default endpoint is 1883. // The default endpoint is NOT encrypted! // Use the builder classes where possible. var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build(); _mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions); await _mqttServer.StartAsync(); _mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync; } private static Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg) { var str = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload); DataQueue.Enqueue(str); return Task.CompletedTask; } public static async Task SendMsg(string topic, T data) { var playload = JsonConvert.SerializeObject(data); var message = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(playload).Build(); // Now inject the new message at the broker. await _mqttServer.InjectApplicationMessage( new InjectedMqttApplicationMessage(message) { SenderClientId = "SenderClientId" }); } public static async Task SubscribeAsync(Dictionary id_topic) { foreach (var item in id_topic) { await _mqttServer.SubscribeAsync(item.Key, item.Value); } } public static async Task StopMiniServerAsync() { await _mqttServer.StopAsync(); } } }