SOMS/test/MqttWpfApp/Server_Simple_Samples.cs
2024-07-15 10:31:26 +08:00

301 lines
10 KiB
C#

using MQTTnet;
using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
namespace MqttServerConsoleApp;
public static class Server_Simple_Samples
{
public static async Task Force_Disconnecting_Client()
{
/*
* This sample will disconnect a client.
*
* See _Run_Minimal_Server_ for more information.
*/
using (var mqttServer = await StartMqttServer())
{
// Let the client connect.
await Task.Delay(TimeSpan.FromSeconds(5));
// Now disconnect the client (if connected).
var affectedClient = (await mqttServer.GetClientsAsync()).FirstOrDefault(c => c.Id == "MyClient");
if (affectedClient != null)
{
await affectedClient.DisconnectAsync();
}
}
}
public static async Task Publish_Message_From_Broker()
{
/*
* This sample will publish a message directly at the broker.
*
* See _Run_Minimal_Server_ for more information.
*/
using (var mqttServer = await StartMqttServer())
{
// Create a new message using the builder as usual.
var message = new MqttApplicationMessageBuilder().WithTopic("HelloWorld").WithPayload("Test").Build();
// Now inject the new message at the broker.
await mqttServer.InjectApplicationMessage(
new InjectedMqttApplicationMessage(message)
{
SenderClientId = "SenderClientId"
});
}
}
public static async Task Run_Minimal_Server()
{
/*
* This sample starts a simple MQTT server which will accept any TCP connection.
*/
var mqttFactory = new MqttFactory();
// The port for the default endpoint is 1883.
// The default endpoint is NOT encrypted!
// Use the builder classes where possible.
var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();
// The port can be changed using the following API (not used in this example).
// new MqttServerOptionsBuilder()
// .WithDefaultEndpoint()
// .WithDefaultEndpointPort(1234)
// .Build();
using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
{
await mqttServer.StartAsync();
mqttServer.SubscribeAsync("ug67", "uplink");
mqttServer.SubscribeAsync("ug67", "downlink/$deveui");
var message = new MqttApplicationMessageBuilder().WithTopic("downlink/$deveui").WithPayload("Test").Build();
// Now inject the new message at the broker.
await mqttServer.InjectApplicationMessage(
new InjectedMqttApplicationMessage(message)
{
SenderClientId = "SenderClientId"
});
mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
Console.WriteLine("Press Enter to exit.");
Console.ReadLine();
// Stop and dispose the MQTT server if it is no longer needed!
await mqttServer.StopAsync();
}
}
private static Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
var str = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);
Debug.WriteLine(str);
return Task.CompletedTask;
}
public static async Task Run_Server_With_Logging()
{
/*
* This sample starts a simple MQTT server and prints the logs to the output.
*
* IMPORTANT! Do not enable logging in live environment. It will decrease performance.
*
* See sample "Run_Minimal_Server" for more details.
*/
var mqttFactory = new MqttFactory(new ConsoleLogger());
var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();
using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
{
await mqttServer.StartAsync();
Console.WriteLine("Press Enter to exit.");
//mqttServer.
Console.ReadLine();
// Stop and dispose the MQTT server if it is no longer needed!
await mqttServer.StopAsync();
}
}
public static async Task Validating_Connections()
{
/*
* This sample starts a simple MQTT server which will check for valid credentials and client ID.
*
* See _Run_Minimal_Server_ for more information.
*/
var mqttFactory = new MqttFactory();
var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();
using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
{
// Setup connection validation before starting the server so that there is
// no change to connect without valid credentials.
mqttServer.ValidatingConnectionAsync += e =>
{
if (e.ClientId != "ValidClientId")
{
e.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
}
if (e.Username != "ValidUser")
{
e.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
}
if (e.Password != "SecretPassword")
{
e.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
}
return Task.CompletedTask;
};
await mqttServer.StartAsync();
Console.WriteLine("Press Enter to exit.");
Console.ReadLine();
await mqttServer.StopAsync();
}
}
static async Task<MqttServer> StartMqttServer()
{
var mqttFactory = new MqttFactory();
// Due to security reasons the "default" endpoint (which is unencrypted) is not enabled by default!
var mqttServerOptions = mqttFactory.CreateServerOptionsBuilder().WithDefaultEndpoint().Build();
var server = mqttFactory.CreateMqttServer(mqttServerOptions);
await server.StartAsync();
return server;
}
public static async Task Persist_Retained_Messages()
{
/*
* This sample starts a simple MQTT server which will store all retained messages in a file.
*/
var storePath = "./RetainedMessages.json";
var mqttFactory = new MqttFactory();
//defautport 1883
// Due to security reasons the "default" endpoint (which is unencrypted) is not enabled by default!
var mqttServerOptions = mqttFactory.CreateServerOptionsBuilder().WithDefaultEndpoint().Build();
using (var server = mqttFactory.CreateMqttServer(mqttServerOptions))
{
// Make sure that the server will load the retained messages.
server.LoadingRetainedMessageAsync += async eventArgs =>
{
try
{
eventArgs.LoadedRetainedMessages = await JsonSerializer.DeserializeAsync<List<MqttApplicationMessage>>(File.OpenRead(storePath));
Console.WriteLine("Retained messages loaded.");
}
catch (FileNotFoundException)
{
// Ignore because nothing is stored yet.
Console.WriteLine("No retained messages stored yet.");
}
catch (Exception exception)
{
Console.WriteLine(exception);
}
};
// Make sure to persist the changed retained messages.
server.RetainedMessageChangedAsync += async eventArgs =>
{
try
{
// This sample uses the property _StoredRetainedMessages_ which will contain all(!) retained messages.
// The event args also contain the affected retained message (property ChangedRetainedMessage). This can be
// used to write all retained messages to dedicated files etc. Then all files must be loaded and a full list
// of retained messages must be provided in the loaded event.
var buffer = JsonSerializer.SerializeToUtf8Bytes(eventArgs.StoredRetainedMessages);
await File.WriteAllBytesAsync(storePath, buffer);
Console.WriteLine("Retained messages saved.");
}
catch (Exception exception)
{
Console.WriteLine(exception);
}
};
// Make sure to clear the retained messages when they are all deleted via API.
server.RetainedMessagesClearedAsync += _ =>
{
File.Delete(storePath);
return Task.CompletedTask;
};
await server.StartAsync();
server.SubscribeAsync("ug67","ceshi");
Console.WriteLine("Press Enter to exit.");
Console.ReadLine();
}
}
class ConsoleLogger : IMqttNetLogger
{
readonly object _consoleSyncRoot = new();
public bool IsEnabled => true;
public void Publish(MqttNetLogLevel logLevel, string source, string message, object[]? parameters, Exception? exception)
{
var foregroundColor = ConsoleColor.White;
switch (logLevel)
{
case MqttNetLogLevel.Verbose:
foregroundColor = ConsoleColor.White;
break;
case MqttNetLogLevel.Info:
foregroundColor = ConsoleColor.Green;
break;
case MqttNetLogLevel.Warning:
foregroundColor = ConsoleColor.DarkYellow;
break;
case MqttNetLogLevel.Error:
foregroundColor = ConsoleColor.Red;
break;
}
if (parameters?.Length > 0)
{
message = string.Format(message, parameters);
}
lock (_consoleSyncRoot)
{
Console.ForegroundColor = foregroundColor;
Console.WriteLine(message);
if (exception != null)
{
Console.WriteLine(exception);
}
}
}
}
}