MqttServerController.cs 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. using MQTTnet;
  2. using MQTTnet.Diagnostics;
  3. using MQTTnet.Protocol;
  4. using MQTTnet.Server;
  5. using Newtonsoft.Json;
  6. using System;
  7. using System.Collections.Concurrent;
  8. using System.Collections.Generic;
  9. using System.Diagnostics;
  10. using System.IO;
  11. using System.Linq;
  12. using System.Text;
  13. using System.Text.Json;
  14. using System.Threading.Tasks;
  15. namespace MqttWpfApp
  16. {
  /// <summary>
  /// Static holder for a single in-process MQTT broker (MQTTnet) plus a queue of
  /// the payloads that were published through it.
  /// </summary>
  /// <remarks>
  /// All members are static and share one <see cref="MqttServer"/> instance.
  /// No locking is performed here; callers that start/stop the broker from
  /// several threads at once must serialize those calls themselves.
  /// </remarks>
  internal class MqttServerController
  {
      /// <summary>Client id attached to messages injected via <see cref="SendMsg{T}"/>.</summary>
      private const string InjectedSenderClientId = "SenderClientId";

      /// <summary>The broker created by <see cref="RunMinimalServer"/>; null while no broker is active.</summary>
      private static MqttServer _mqttServer;

      /// <summary>
      /// Payloads of intercepted publishes, decoded as UTF-8 text, in arrival order.
      /// Nothing in this class ever dequeues, so the consumer is responsible for draining it.
      /// </summary>
      public static ConcurrentQueue<string> DataQueue { get; } = new ConcurrentQueue<string>();

      /// <summary>
      /// Creates and starts a minimal MQTT broker on the default endpoint.
      /// </summary>
      /// <remarks>
      /// The port for the default endpoint is 1883 and the default endpoint is NOT encrypted.
      /// Calling this while a broker is already held is a no-op: replacing the field would
      /// leave the earlier server running with no way to stop it through this class.
      /// </remarks>
      /// <returns>A task that completes once the broker has started.</returns>
      public static async Task RunMinimalServer()
      {
          if (_mqttServer != null)
          {
              return;
          }

          var mqttFactory = new MqttFactory();

          // Use the builder classes where possible.
          var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();
          var server = mqttFactory.CreateMqttServer(mqttServerOptions);

          // Attach the interceptor BEFORE starting so that a publish arriving
          // immediately after start-up cannot slip past the queue.
          server.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
          _mqttServer = server;

          try
          {
              await server.StartAsync();
          }
          catch
          {
              // Start failed: do not keep a reference to a broker that never ran,
              // so a later call can retry with a fresh instance.
              server.InterceptingPublishAsync -= MqttServer_InterceptingPublishAsync;
              _mqttServer = null;
              throw;
          }
      }

      /// <summary>
      /// Publish interceptor: decodes the message payload as UTF-8 and appends it to
      /// <see cref="DataQueue"/>.
      /// </summary>
      /// <param name="arg">Event data carrying the published application message.</param>
      /// <returns>An already-completed task; the handler does no asynchronous work.</returns>
      private static Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
      {
          var payload = arg.ApplicationMessage.Payload;

          // Encoding.GetString(null) throws ArgumentNullException; a message
          // without a payload is recorded as an empty string instead.
          var text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

          DataQueue.Enqueue(text);
          return Task.CompletedTask;
      }

      /// <summary>
      /// Serializes <paramref name="data"/> to JSON (Newtonsoft) and injects it into the
      /// broker as a message on <paramref name="topic"/>.
      /// </summary>
      /// <typeparam name="T">Type of the object to serialize.</typeparam>
      /// <param name="topic">Topic the message is published on.</param>
      /// <param name="data">Object serialized as the message payload.</param>
      /// <returns>A task that completes when the message has been injected.</returns>
      /// <exception cref="InvalidOperationException">The broker has not been started.</exception>
      public static async Task SendMsg<T>(string topic, T data)
      {
          var server = GetActiveServer();

          var payload = JsonConvert.SerializeObject(data);
          var message = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(payload).Build();

          // Now inject the new message at the broker.
          var injected = new InjectedMqttApplicationMessage(message)
          {
              SenderClientId = InjectedSenderClientId
          };

          await server.InjectApplicationMessage(injected);
      }

      /// <summary>
      /// Subscribes clients to topics on the broker, one subscription per dictionary entry.
      /// </summary>
      /// <param name="id_topic">Map whose key is passed as the client id and whose value is passed as the topic.</param>
      /// <returns>A task that completes after every entry has been processed, in enumeration order.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="id_topic"/> is null.</exception>
      /// <exception cref="InvalidOperationException">The broker has not been started.</exception>
      public static async Task SubscribeAsync(Dictionary<string,string> id_topic)
      {
          if (id_topic == null)
          {
              throw new ArgumentNullException(nameof(id_topic));
          }

          var server = GetActiveServer();

          foreach (var item in id_topic)
          {
              await server.SubscribeAsync(item.Key, item.Value);
          }
      }

      /// <summary>
      /// Stops the broker, detaches the publish interceptor and releases the server.
      /// Does nothing when no broker is active, so it is safe to call repeatedly.
      /// </summary>
      /// <returns>A task that completes when the broker has stopped.</returns>
      public static async Task StopMiniServerAsync()
      {
          var server = _mqttServer;
          if (server == null)
          {
              return;
          }

          // Clear the field first so a later RunMinimalServer creates a fresh broker.
          _mqttServer = null;

          try
          {
              await server.StopAsync();
          }
          finally
          {
              // Detach only after stopping so publishes seen during shutdown are still queued.
              server.InterceptingPublishAsync -= MqttServer_InterceptingPublishAsync;
              server.Dispose();
          }
      }

      /// <summary>
      /// Returns the active broker or fails with a descriptive error instead of a
      /// bare NullReferenceException at the call site.
      /// </summary>
      private static MqttServer GetActiveServer()
      {
          var server = _mqttServer;
          if (server == null)
          {
              throw new InvalidOperationException(
                  "The MQTT server is not running. Call RunMinimalServer() first.");
          }

          return server;
      }
  }
  64. }