70 lines
2.4 KiB
C#
70 lines
2.4 KiB
C#
using MQTTnet;
|
|
using MQTTnet.Diagnostics;
|
|
using MQTTnet.Protocol;
|
|
using MQTTnet.Server;
|
|
using Newtonsoft.Json;
|
|
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.IO;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace MqttWpfApp
|
|
{
|
|
/// <summary>
/// Hosts a single in-process MQTT broker for the application and exposes
/// static helpers to start it, stop it, inject messages, and add subscriptions.
/// Every payload published through the broker is decoded as UTF-8 text and
/// appended to <see cref="DataQueue"/>.
/// </summary>
internal class MqttServerController
{
    // Client id reported as the sender of messages injected by SendMsg.
    private const string InjectedSenderClientId = "SenderClientId";

    // The broker instance; null until RunMinimalServer has been called and
    // again after StopMiniServerAsync has completed.
    private static MqttServer _mqttServer;

    /// <summary>
    /// UTF-8 decoded payloads of the messages intercepted by the broker, in arrival order.
    /// </summary>
    public static ConcurrentQueue<string> DataQueue { get; } = new ConcurrentQueue<string>();

    /// <summary>
    /// Starts a simple MQTT server which will accept any TCP connection.
    /// Calling this while a server created by a previous call is still
    /// registered is a no-op, so an already running broker is never orphaned.
    /// </summary>
    /// <returns>A task that completes once the server has been started.</returns>
    public static async Task RunMinimalServer()
    {
        if (_mqttServer != null)
        {
            // Creating a second server would overwrite the only reference to
            // the first one and leave it running without any way to stop it.
            return;
        }

        var mqttFactory = new MqttFactory();

        // The port for the default endpoint is 1883.
        // The default endpoint is NOT encrypted!
        // Use the builder classes where possible.
        var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();
        var server = mqttFactory.CreateMqttServer(mqttServerOptions);

        // Attach the handler BEFORE starting so that a publish arriving right
        // after startup cannot slip past the interceptor.
        server.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;

        // Publish the instance before awaiting so that a re-entrant call made
        // while startup is still in progress hits the guard above.
        _mqttServer = server;

        try
        {
            await server.StartAsync();
        }
        catch
        {
            // Startup failed: roll back so that a later call can try again.
            server.InterceptingPublishAsync -= MqttServer_InterceptingPublishAsync;
            _mqttServer = null;
            throw;
        }
    }

    /// <summary>
    /// Decodes the payload of an intercepted publish as UTF-8 and enqueues it
    /// in <see cref="DataQueue"/>.
    /// </summary>
    /// <param name="arg">Event data carrying the published application message.</param>
    /// <returns>An already completed task; the handler does no asynchronous work.</returns>
    private static Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
    {
        var payload = arg.ApplicationMessage.Payload;

        // Encoding.GetString throws ArgumentNullException for a null array, so a
        // message without a payload is recorded as an empty string instead.
        var text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

        DataQueue.Enqueue(text);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Serializes <paramref name="data"/> to JSON and injects it at the broker
    /// as a message on <paramref name="topic"/>.
    /// </summary>
    /// <typeparam name="T">Type of the object to serialize.</typeparam>
    /// <param name="topic">Topic the message is published on; must not be null or empty.</param>
    /// <param name="data">Object serialized with <c>JsonConvert.SerializeObject</c> to form the payload.</param>
    /// <returns>A task that completes when the message has been injected.</returns>
    /// <exception cref="ArgumentException"><paramref name="topic"/> is null or empty.</exception>
    /// <exception cref="InvalidOperationException">The server has not been started.</exception>
    public static async Task SendMsg<T>(string topic, T data)
    {
        if (string.IsNullOrEmpty(topic))
        {
            throw new ArgumentException("A topic is required to send a message.", nameof(topic));
        }

        var server = GetStartedServer();

        var payload = JsonConvert.SerializeObject(data);
        var message = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(payload).Build();

        // Now inject the new message at the broker.
        await server.InjectApplicationMessage(
            new InjectedMqttApplicationMessage(message)
            {
                SenderClientId = InjectedSenderClientId
            });
    }

    /// <summary>
    /// Adds one subscription per entry of <paramref name="id_topic"/>, awaiting
    /// each one in turn. Each entry is passed to the server as
    /// (key, value); judging by the parameter name these are client id and topic.
    /// </summary>
    /// <param name="id_topic">Pairs forwarded to the server's <c>SubscribeAsync</c>; must not be null.</param>
    /// <returns>A task that completes when every subscription has been processed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="id_topic"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The server has not been started.</exception>
    public static async Task SubscribeAsync(Dictionary<string,string> id_topic)
    {
        if (id_topic == null)
        {
            throw new ArgumentNullException(nameof(id_topic));
        }

        var server = GetStartedServer();

        foreach (var item in id_topic)
        {
            await server.SubscribeAsync(item.Key, item.Value);
        }
    }

    /// <summary>
    /// Stops the server created by <see cref="RunMinimalServer"/>, detaches the
    /// publish handler and forgets the instance so that a new server can be
    /// started later. Does nothing when no server is registered.
    /// </summary>
    /// <returns>A task that completes once the server has stopped.</returns>
    public static async Task StopMiniServerAsync()
    {
        var server = _mqttServer;
        if (server == null)
        {
            // Never started, or already stopped: nothing to do.
            return;
        }

        await server.StopAsync();

        // Only release the instance after a successful stop; if StopAsync
        // throws, the reference is kept so the caller can retry.
        server.InterceptingPublishAsync -= MqttServer_InterceptingPublishAsync;
        _mqttServer = null;
    }

    // Returns the registered server or throws a descriptive exception instead
    // of letting callers fail with a NullReferenceException.
    private static MqttServer GetStartedServer()
    {
        var server = _mqttServer;
        if (server == null)
        {
            throw new InvalidOperationException(
                "The MQTT server has not been started. Call RunMinimalServer first.");
        }

        return server;
    }
}
|
|
}
|