123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- using MQTTnet;
- using MQTTnet.Diagnostics;
- using MQTTnet.Protocol;
- using MQTTnet.Server;
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.IO;
- using System.Linq;
- using System.Text;
- using System.Text.Json;
- using System.Threading.Tasks;
- namespace MqttServerConsoleApp;
- public static class Server_Simple_Samples
- {
public static async Task Force_Disconnecting_Client()
{
    /*
     * This sample starts a server, waits a few seconds and then forcibly
     * disconnects the client with the ID "MyClient" when it is connected.
     *
     * See _Run_Minimal_Server_ for more information.
     */

    using var server = await StartMqttServer();

    // Give the client some time to connect.
    await Task.Delay(TimeSpan.FromSeconds(5));

    // Look up the client by its ID among all currently known clients.
    var connectedClients = await server.GetClientsAsync();
    var target = connectedClients.FirstOrDefault(client => client.Id == "MyClient");
    if (target == null)
    {
        // The client is not connected, so there is nothing to disconnect.
        return;
    }

    await target.DisconnectAsync();
}
public static async Task Publish_Message_From_Broker()
{
    /*
     * This sample publishes a message from inside the broker, without any
     * client being involved.
     *
     * See _Run_Minimal_Server_ for more information.
     */

    using var server = await StartMqttServer();

    // The message itself is built exactly like a message sent by a client.
    var message = new MqttApplicationMessageBuilder()
        .WithTopic("HelloWorld")
        .WithPayload("Test")
        .Build();

    // Wrap the message so that the broker knows which sender ID to report.
    var injectedMessage = new InjectedMqttApplicationMessage(message)
    {
        SenderClientId = "SenderClientId"
    };

    await server.InjectApplicationMessage(injectedMessage);
}
public static async Task Run_Minimal_Server()
{
    /*
     * This sample starts a simple MQTT server which will accept any TCP connection.
     * It subscribes the client "ug67" to two topics, injects one message at the
     * broker and writes the payload of every intercepted publish to the debug output.
     */

    var mqttFactory = new MqttFactory();

    // The port for the default endpoint is 1883.
    // The default endpoint is NOT encrypted!
    // Use the builder classes where possible.
    var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();

    // The port can be changed using the following API (not used in this example).
    // new MqttServerOptionsBuilder()
    //     .WithDefaultEndpoint()
    //     .WithDefaultEndpointPort(1234)
    //     .Build();

    using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
    {
        // Attach the interceptor before the server starts so that no publish
        // (including the message injected below) can pass by unobserved.
        mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;

        await mqttServer.StartAsync();

        await TrySubscribeAsync("ug67", "uplink");
        await TrySubscribeAsync("ug67", "downlink/$deveui");

        var message = new MqttApplicationMessageBuilder().WithTopic("downlink/$deveui").WithPayload("Test").Build();

        // Now inject the new message at the broker.
        await mqttServer.InjectApplicationMessage(
            new InjectedMqttApplicationMessage(message)
            {
                SenderClientId = "SenderClientId"
            });

        Console.WriteLine("Press Enter to exit.");
        Console.ReadLine();

        // Stop and dispose the MQTT server if it is no longer needed!
        await mqttServer.StopAsync();

        // Awaits the subscription so that a failure is reported on the console
        // instead of being lost in a discarded task. The sample keeps running on
        // failure. NOTE(review): presumably this fails while the client has no
        // session at the server - confirm against the MQTTnet version in use.
        async Task TrySubscribeAsync(string clientId, string topic)
        {
            try
            {
                await mqttServer.SubscribeAsync(clientId, topic);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"Subscribing client '{clientId}' to topic '{topic}' failed: {exception.Message}");
            }
        }
    }
}
// Handler for intercepted publishes: decodes the payload as UTF-8 text and
// writes it to the debug output. The publish itself is left untouched.
private static Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
    // A message without payload exposes a null buffer; decoding it directly
    // would throw an ArgumentNullException inside the server's publish pipeline.
    var payload = arg.ApplicationMessage.Payload;
    var text = payload is null ? string.Empty : Encoding.UTF8.GetString(payload);

    Debug.WriteLine(text);
    return Task.CompletedTask;
}
public static async Task Run_Server_With_Logging()
{
    /*
     * This sample starts a simple MQTT server whose log output is printed
     * to the console by the ConsoleLogger of this class.
     *
     * IMPORTANT! Do not enable logging in live environment. It will decrease performance.
     *
     * See sample "Run_Minimal_Server" for more details.
     */

    var options = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();

    // Passing a logger to the factory is all that is required to enable logging.
    var factory = new MqttFactory(new ConsoleLogger());

    using var server = factory.CreateMqttServer(options);
    await server.StartAsync();

    Console.WriteLine("Press Enter to exit.");
    Console.ReadLine();

    // Stop the server explicitly; it is disposed when the method returns.
    await server.StopAsync();
}
public static async Task Validating_Connections()
{
    /*
     * This sample starts a simple MQTT server which only accepts connections
     * that present the expected client ID, user name and password.
     *
     * See _Run_Minimal_Server_ for more information.
     */

    var options = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();
    using var server = new MqttFactory().CreateMqttServer(options);

    // The validation is registered before the server is started so that there
    // is no chance to connect without valid credentials.
    server.ValidatingConnectionAsync += args =>
    {
        var credentialsRejected = args.Username != "ValidUser" || args.Password != "SecretPassword";

        if (credentialsRejected)
        {
            // Wrong credentials take precedence over a wrong client ID.
            args.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
        }
        else if (args.ClientId != "ValidClientId")
        {
            args.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
        }

        return Task.CompletedTask;
    };

    await server.StartAsync();

    Console.WriteLine("Press Enter to exit.");
    Console.ReadLine();

    await server.StopAsync();
}
// Creates a server with the default (unencrypted) endpoint enabled, starts it and
// returns it. The caller owns the returned server and is responsible for disposing it.
private static async Task<MqttServer> StartMqttServer()
{
    var mqttFactory = new MqttFactory();

    // Due to security reasons the "default" endpoint (which is unencrypted) is not enabled by default!
    var mqttServerOptions = mqttFactory.CreateServerOptionsBuilder().WithDefaultEndpoint().Build();

    var server = mqttFactory.CreateMqttServer(mqttServerOptions);
    try
    {
        await server.StartAsync();
    }
    catch
    {
        // The caller never receives the instance when starting fails, so it
        // must be disposed here before the exception is passed on.
        server.Dispose();
        throw;
    }

    return server;
}
public static async Task Persist_Retained_Messages()
{
    /*
     * This sample starts a simple MQTT server which will store all retained messages in a file.
     * The file is read when the server loads its retained messages, rewritten whenever
     * a retained message changes and deleted when all retained messages are cleared.
     */

    var storePath = "./RetainedMessages.json";

    var mqttFactory = new MqttFactory();

    // The default port is 1883.
    // Due to security reasons the "default" endpoint (which is unencrypted) is not enabled by default!
    var mqttServerOptions = mqttFactory.CreateServerOptionsBuilder().WithDefaultEndpoint().Build();

    using (var server = mqttFactory.CreateMqttServer(mqttServerOptions))
    {
        // Make sure that the server will load the retained messages.
        server.LoadingRetainedMessageAsync += async eventArgs =>
        {
            try
            {
                // The stream is disposed as soon as parsing is done. An open read
                // handle would otherwise block later writes to the same file.
                await using var stream = File.OpenRead(storePath);
                var loadedMessages = await JsonSerializer.DeserializeAsync<List<MqttApplicationMessage>>(stream);

                // A file containing the JSON literal "null" yields no list at all.
                eventArgs.LoadedRetainedMessages = loadedMessages ?? new List<MqttApplicationMessage>();
                Console.WriteLine("Retained messages loaded.");
            }
            catch (FileNotFoundException)
            {
                // Ignore because nothing is stored yet.
                Console.WriteLine("No retained messages stored yet.");
            }
            catch (Exception exception)
            {
                Console.WriteLine(exception);
            }
        };

        // Make sure to persist the changed retained messages.
        server.RetainedMessageChangedAsync += async eventArgs =>
        {
            try
            {
                // This sample uses the property _StoredRetainedMessages_ which will contain all(!) retained messages.
                // The event args also contain the affected retained message (property ChangedRetainedMessage). This can be
                // used to write all retained messages to dedicated files etc. Then all files must be loaded and a full list
                // of retained messages must be provided in the loaded event.
                var buffer = JsonSerializer.SerializeToUtf8Bytes(eventArgs.StoredRetainedMessages);
                await File.WriteAllBytesAsync(storePath, buffer);
                Console.WriteLine("Retained messages saved.");
            }
            catch (Exception exception)
            {
                Console.WriteLine(exception);
            }
        };

        // Make sure to clear the retained messages when they are all deleted via API.
        server.RetainedMessagesClearedAsync += _ =>
        {
            File.Delete(storePath);
            return Task.CompletedTask;
        };

        await server.StartAsync();

        try
        {
            // Await the subscription so that a failure is reported instead of
            // being lost in a discarded task; the sample keeps running anyway.
            await server.SubscribeAsync("ug67", "ceshi");
        }
        catch (Exception exception)
        {
            Console.WriteLine(exception);
        }

        Console.WriteLine("Press Enter to exit.");
        Console.ReadLine();

        // Stop and dispose the MQTT server if it is no longer needed!
        await server.StopAsync();
    }
}
// Logger which writes every log message to the console, colored by log level.
class ConsoleLogger : IMqttNetLogger
{
    // Serializes console access so that color changes and the lines written
    // with them are not interleaved between concurrent callers.
    readonly object _consoleSyncRoot = new();

    // Every message is logged, regardless of its level.
    public bool IsEnabled => true;

    // Formats the message with its parameters (if any) and writes it, followed by
    // the exception (if any), to the console. The parameter "source" is not used.
    public void Publish(MqttNetLogLevel logLevel, string source, string message, object[]? parameters, Exception? exception)
    {
        var foregroundColor = logLevel switch
        {
            MqttNetLogLevel.Info => ConsoleColor.Green,
            MqttNetLogLevel.Warning => ConsoleColor.DarkYellow,
            MqttNetLogLevel.Error => ConsoleColor.Red,
            // Verbose and any unknown level.
            _ => ConsoleColor.White
        };

        if (parameters?.Length > 0)
        {
            try
            {
                message = string.Format(message, parameters);
            }
            catch (FormatException)
            {
                // A malformed template must not make the logger throw into its
                // caller; the raw template is written instead.
            }
        }

        lock (_consoleSyncRoot)
        {
            // Remember the current color so that console output written outside
            // of the logger does not inherit the color of the last log message.
            var previousColor = Console.ForegroundColor;
            try
            {
                Console.ForegroundColor = foregroundColor;
                Console.WriteLine(message);

                if (exception != null)
                {
                    Console.WriteLine(exception);
                }
            }
            finally
            {
                Console.ForegroundColor = previousColor;
            }
        }
    }
}
- }
|