123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- using MQTTnet;
- using MQTTnet.Diagnostics;
- using MQTTnet.Protocol;
- using MQTTnet.Server;
- using Newtonsoft.Json;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.IO;
- using System.Linq;
- using System.Text;
- using System.Text.Json;
- using System.Threading.Tasks;
- namespace MqttWpfApp
- {
- internal class MqttServerController
- {
- private static MqttServer _mqttServer;
- public static ConcurrentQueue<string> DataQueue { get; } = new ConcurrentQueue<string>();
- public static async Task RunMinimalServer()
- {
- /*
- * This sample starts a simple MQTT server which will accept any TCP connection.
- */
- var mqttFactory = new MqttFactory();
- // The port for the default endpoint is 1883.
- // The default endpoint is NOT encrypted!
- // Use the builder classes where possible.
- var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();
- _mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions);
- await _mqttServer.StartAsync();
-
-
- _mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
- }
- private static Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
- {
- var str = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);
- DataQueue.Enqueue(str);
- return Task.CompletedTask;
- }
- public static async Task SendMsg<T>(string topic, T data)
- {
- var playload = JsonConvert.SerializeObject(data);
- var message = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(playload).Build();
- // Now inject the new message at the broker.
- await _mqttServer.InjectApplicationMessage(
- new InjectedMqttApplicationMessage(message)
- {
- SenderClientId = "SenderClientId"
- });
- }
- public static async Task SubscribeAsync(Dictionary<string,string> id_topic)
- {
- foreach (var item in id_topic)
- {
- await _mqttServer.SubscribeAsync(item.Key, item.Value);
- }
- }
- public static async Task StopMiniServerAsync()
- {
- await _mqttServer.StopAsync();
- }
- }
- }
|