Random wanderings through Microsoft Azure esp. PaaS plumbing, the IoT bits, AI on Micro controllers, AI on Edge Devices, .NET nanoFramework, .NET Core on *nix and ML.NET+ONNX
Most of my applications have focused on telemetry, but I had been thinking about local control for solutions that have to run disconnected. In “real-world” deployments connectivity to the Azure EventGrid MQTT Broker isn’t 100% reliable, and delay and jitter are also an issue for control at the edge.
Over Christmas I read an article about the Internet of Vehicles (IoV) which got me thinking about “edge brokers”, so I have been looking at lightweight edge brokers.
NanoMQ is a Linux Foundation Edge project. It is ultra-lightweight, with a footprint of less than 200Kb when deployed with the minimum feature set, is cross-platform (POSIX), and has MQTT V5 support, bridging, and message persistence. The support for secure deployment is very limited.
Eclipse Mosquitto is another open-source message broker that implements the MQTT versions 5.0, 3.1.1 and 3.1. It is lightweight and is suitable for use on all devices from low power single board computers to full servers.
HiveMQ Edge is MQTT V5, V3.1.1 and V3.1 compliant, with protocol adapters which connect various industrial communication protocols, integrations with several database engines, and bidirectional connections to enterprise brokers. For disconnected scenarios messages are stored on disk and published once online. There are open-source (community support) and commercial (additional features and support) versions. The open-source version does not include offline persistence.
There was another sample application, RabbitOM.Streaming.Tests.Mjpeg, which displayed JPEG images. After looking at the code I figured out I needed to use the RtpFrameBuilder class to assemble the RTSP packets into frames.
private static readonly RtpFrameBuilder _frameBuilder = new JpegFrameBuilder();
...
_frameBuilder.FrameReceived += OnFrameReceived;
...
client.PacketReceived += (sender, e) =>
{
var interleavedPacket = e.Packet as RtspInterleavedPacket;
if (interleavedPacket != null && interleavedPacket.Channel > 0)
{
// In most cases, skip this packet
Console.ForegroundColor = ConsoleColor.DarkCyan;
Console.WriteLine("Skipping some data : size {0}", e.Packet.Data.Length);
return;
}
// Use e.Packet.Data here because interleavedPacket is null when the cast fails
_frameBuilder.Write(e.Packet.Data);
};
My test harness code was “inspired” by the Nager.VideoStream.TestConsole application, with a slightly different file format for the start-stop marker text and camera image files.
I used Path.Combine so no code or configuration changes were required when the application was run on different operating systems (still need to ensure ImageFilepathLocal in the appsettings.json is the correct format).
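A minimal sketch of the Path.Combine approach, assuming a hypothetical BuildImagePath helper and a timestamp-based file name (neither is from the test harness). The folder value stands in for ImageFilepathLocal from appsettings.json.

```csharp
using System;
using System.Globalization;
using System.IO;

public static class ImagePathExample
{
    // Hypothetical helper: builds the full path of an image file inside the configured folder.
    // Path.Combine inserts the directory separator of the operating system the code is running on.
    public static string BuildImagePath(string imageFolder, DateTime capturedAtUtc)
    {
        // Invariant culture keeps the file name the same regardless of regional settings.
        string fileName = capturedAtUtc.ToString("yyyyMMdd-HHmmss.fff", CultureInfo.InvariantCulture) + ".jpg";

        return Path.Combine(imageFolder, fileName);
    }

    public static void Main()
    {
        // "images" is an illustrative stand-in for the ImageFilepathLocal setting.
        Console.WriteLine(BuildImagePath("images", DateTime.UtcNow));
    }
}
```

The folder itself still has to be written in the host operating system's format, because Path.Combine does not translate separators that are already inside its arguments.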
Developer Desktop
I used my desktop computer, a 13th Gen Intel(R) Core(TM) i7-13700 2.10 GHz with 32.0 GB of RAM, running Windows 11 Pro 24H2.
In the test results below (representative of multiple runs while testing) the delay between starting streaming and the first image file was on average 3.7 seconds with the gap between the images roughly 100mSec.
On the reComputer J3011 (the test results below are representative of multiple runs while testing) the delay between starting streaming and the first image file was on average roughly 3.7 seconds, but the time between images varied a lot, from 30mSec to >300mSec.
At 10FPS the results for my developer desktop were more consistent, and the reComputer J3011 had significantly more “jitter”. Both could cope with 10FPS, so the next step is to integrate the YoloDotNet library to process the video frames.
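One way the “jitter” could be quantified is to record a timestamp as each image file is written, then look at the gaps between consecutive timestamps. This is only a sketch: the GapStatistics helper and the sample timestamps are made up for illustration and are not measurements from either machine.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FrameTimingExample
{
    // Hypothetical helper: returns the minimum, average and maximum gap (in milliseconds)
    // between consecutive frame timestamps.
    public static (double Min, double Average, double Max) GapStatistics(IReadOnlyList<DateTime> frameTimes)
    {
        if (frameTimes.Count < 2)
        {
            throw new ArgumentException("At least two frame timestamps are required.", nameof(frameTimes));
        }

        var gaps = new List<double>();

        for (int i = 1; i < frameTimes.Count; i++)
        {
            gaps.Add((frameTimes[i] - frameTimes[i - 1]).TotalMilliseconds);
        }

        return (gaps.Min(), gaps.Average(), gaps.Max());
    }

    public static void Main()
    {
        // Illustrative timestamps only: frames arriving 100, 30 and 300 milliseconds apart.
        DateTime start = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        var frameTimes = new List<DateTime>
        {
            start,
            start.AddMilliseconds(100),
            start.AddMilliseconds(130),
            start.AddMilliseconds(430)
        };

        var statistics = GapStatistics(frameTimes);

        Console.WriteLine($"Min:{statistics.Min:F1} Average:{statistics.Average:F1} Max:{statistics.Max:F1} mSec");
    }
}
```

A steady 10FPS stream would have all three values close to 100mSec, so a wide spread between the minimum and maximum is a simple indicator of jitter.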
The Myriota Connector only supports Direct Methods which provide immediate confirmation of the result being queued by the Myriota Cloud API. The Myriota (API) control message send method responds with 400 Bad Request if there is already a message being sent to a device.
Myriota Azure Function Environment Variable configuration
Sense and Locate Azure IoT Central Template Fan Speed Enumeration
The fan speed value in the message payload is configured in the fan speed enumeration.
Sense and Locate Azure IoT Central Command Fan Speed Selection
The FanSpeed.cs payload formatter extracts the FanSpeed value from the JavaScript Object Notation (JSON) payload and returns a two-byte array containing the message type and speed of the fan.
using System;
using System.Collections.Generic;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
public byte[] Evaluate(string terminalId, string methodName, JObject payloadJson, byte[] payloadBytes)
{
byte? status = payloadJson.Value<byte?>("FanSpeed");
if (!status.HasValue)
{
return new byte[] { };
}
return new byte[] { 1, status.Value };
}
}
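To show what the formatter produces, here is a small harness. It is a sketch: FanSpeedFormatterExample is a stand-in that copies the formatter's logic so the example compiles without the PayloadFormatter.IFormatterDownlink interface, and it assumes the Newtonsoft.Json package is referenced, as it is in the formatter.

```csharp
using System;
using Newtonsoft.Json.Linq;

public static class FanSpeedFormatterExample
{
    // Stand-in with the same logic as the FanSpeed.cs formatter (the interface is omitted).
    public static byte[] Evaluate(JObject payloadJson)
    {
        byte? status = payloadJson.Value<byte?>("FanSpeed");

        if (!status.HasValue)
        {
            // No FanSpeed property, so there is nothing to send
            return Array.Empty<byte>();
        }

        // First byte is the message type, second byte is the fan speed
        return new byte[] { 1, status.Value };
    }

    public static void Main()
    {
        byte[] withSpeed = Evaluate(JObject.Parse("{\"FanSpeed\": 2}"));
        Console.WriteLine(BitConverter.ToString(withSpeed)); // prints 01-02

        byte[] withoutSpeed = Evaluate(JObject.Parse("{}"));
        Console.WriteLine(withoutSpeed.Length); // prints 0
    }
}
```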
Sense and Locate Azure IoT Central Command Fan Speed History
Each Azure Application Insights log entry starts with the TerminalID (to simplify searching for all the messages related to a device) and the requestId, a Globally Unique Identifier (GUID) (to simplify searching for all the “steps” associated with sending/receiving a message), with the rest of the logging message containing “step” specific diagnostic information.
Sense and Locate Azure IoT Central Command Fan Speed Application Insights
In the Myriota Device Manager the status of Control Messages can be tracked and they can be cancelled if in the “pending” state.
Myriota Control Message status Pending
A Control Message can take up to 24hrs to be delivered and confirmation of delivery has to be implemented by the application developer.
class Program
{
private static Model.ApplicationSettings _applicationSettings;
private static IMqttClient _client;
private static bool _publisherBusy = false;
static async Task Main()
{
Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss} MQTTNet client starting");
try
{
// load the app settings into configuration
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", false, true)
.AddUserSecrets<Program>()
.Build();
_applicationSettings = configuration.GetSection("ApplicationSettings").Get<Model.ApplicationSettings>();
var mqttFactory = new MqttFactory();
using (_client = mqttFactory.CreateMqttClient())
{
// Certificate based authentication
List<X509Certificate2> certificates = new List<X509Certificate2>
{
new X509Certificate2(_applicationSettings.ClientCertificateFileName, _applicationSettings.ClientCertificatePassword)
};
var tlsOptions = new MqttClientTlsOptionsBuilder()
.WithClientCertificates(certificates)
.WithSslProtocols(System.Security.Authentication.SslProtocols.Tls12)
.UseTls(true)
.Build();
MqttClientOptions mqttClientOptions = new MqttClientOptionsBuilder()
.WithClientId(_applicationSettings.ClientId)
.WithTcpServer(_applicationSettings.Host, _applicationSettings.Port)
.WithCredentials(_applicationSettings.UserName, _applicationSettings.Password)
.WithCleanStart(_applicationSettings.CleanStart)
.WithTlsOptions(tlsOptions)
.Build();
var connectResult = await _client.ConnectAsync(mqttClientOptions);
if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
{
throw new Exception($"Failed to connect: {connectResult.ReasonString}");
}
_client.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync;
Console.WriteLine($"Subscribed to Topic");
foreach (string topic in _applicationSettings.SubscribeTopics.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries))
{
var subscribeResult = await _client.SubscribeAsync(topic, _applicationSettings.SubscribeQualityOfService);
Console.WriteLine($" {topic} Result:{subscribeResult.Items.First().ResultCode}");
}
}
//...
}
MQTTnet client console application output
The design of the MQTT protocol means that the hivemq-mqtt-client-dotnet and MQTTnet implementations are similar. Having used both I personally prefer the HiveMQ client library.
For one test deployment it took me an hour to generate the Root, Intermediate and a number of Device certificates, which was a waste of time. At this point I decided to investigate writing some applications to simplify the process.
static void Main(string[] args)
{
var serviceProvider = new ServiceCollection()
.AddCertificateManager()
.BuildServiceProvider();
// load the app settings into configuration
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", false, true)
.AddUserSecrets<Program>()
.Build();
_applicationSettings = configuration.GetSection("ApplicationSettings").Get<Model.ApplicationSettings>();
//------
Console.WriteLine($"validFrom:{validFrom} ValidTo:{validTo}");
var serverRootCertificate = serviceProvider.GetService<CreateCertificatesClientServerAuth>();
var root = serverRootCertificate.NewRootCertificate(
new DistinguishedName {
CommonName = _applicationSettings.CommonName,
Organisation = _applicationSettings.Organisation,
OrganisationUnit = _applicationSettings.OrganisationUnit,
Locality = _applicationSettings.Locality,
StateProvince = _applicationSettings.StateProvince,
Country = _applicationSettings.Country
},
new ValidityPeriod {
ValidFrom = validFrom,
ValidTo = validTo,
},
_applicationSettings.PathLengthConstraint,
_applicationSettings.DnsName);
root.FriendlyName = _applicationSettings.FriendlyName;
Console.Write("PFX Password:");
string password = Console.ReadLine();
if ( String.IsNullOrEmpty(password))
{
Console.WriteLine("PFX Password invalid");
return;
}
var exportCertificate = serviceProvider.GetService<ImportExportCertificate>();
var rootCertificatePfxBytes = exportCertificate.ExportRootPfx(password, root);
File.WriteAllBytes(_applicationSettings.RootCertificateFilePath, rootCertificatePfxBytes);
Console.WriteLine($"Root certificate file:{_applicationSettings.RootCertificateFilePath}");
Console.WriteLine("press enter to exit");
Console.ReadLine();
}
The applications’ configuration is split between the application settings file (certificate file paths, validity periods, Organisation etc.) and values entered at runtime (certificate filenames, passwords etc.). The first application generates a Root Certificate using the distinguished name information from the application settings, plus file names and passwords entered by the user.
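For comparison, a root certificate can also be created with only the .NET base class library. This sketch swaps in System.Security.Cryptography's CertificateRequest for the CertificateManager library used by the application, so it is not the application's code; the subject, key size, path length, password and file name are all made-up example values.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;

public static class RootCertificateSketch
{
    // Hypothetical helper: creates a self-signed certificate authority (CA) certificate.
    public static X509Certificate2 CreateRoot(RSA key, string subject, int pathLengthConstraint, DateTimeOffset validFrom, DateTimeOffset validTo)
    {
        var request = new CertificateRequest(subject, key, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);

        // Mark the certificate as a CA and limit how many intermediate levels can sit below it
        request.CertificateExtensions.Add(new X509BasicConstraintsExtension(true, true, pathLengthConstraint, true));

        // A CA certificate needs to be able to sign certificates and revocation lists
        request.CertificateExtensions.Add(new X509KeyUsageExtension(X509KeyUsageFlags.KeyCertSign | X509KeyUsageFlags.CrlSign, true));

        return request.CreateSelfSigned(validFrom, validTo);
    }

    public static void Main()
    {
        using RSA key = RSA.Create(2048);
        using X509Certificate2 root = CreateRoot(key, "CN=Example Root, O=Example Organisation, C=NZ", 2, DateTimeOffset.UtcNow, DateTimeOffset.UtcNow.AddYears(1));

        // Export with the private key, protected by an example password
        byte[] pfxBytes = root.Export(X509ContentType.Pfx, "example-password");
        File.WriteAllBytes("example-root.pfx", pfxBytes);

        Console.WriteLine($"Subject:{root.Subject} NotAfter:{root.NotAfter:yyyy-MM-dd}");
    }
}
```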
Root Certificate generation application output
The second application generates an Intermediate Certificate using the Root Certificate, the distinguished name information from the application settings, plus file names and passwords entered by the user.
static void Main(string[] args)
{
var serviceProvider = new ServiceCollection()
.AddCertificateManager()
.BuildServiceProvider();
// load the app settings into configuration
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", false, true)
.AddUserSecrets<Program>()
.Build();
_applicationSettings = configuration.GetSection("ApplicationSettings").Get<Model.ApplicationSettings>();
//------
Console.WriteLine($"validFrom:{validFrom} ValidTo:{validTo}");
Console.WriteLine($"Root Certificate file:{_applicationSettings.RootCertificateFilePath}");
Console.Write("Root Certificate Password:");
string rootPassword = Console.ReadLine();
if (String.IsNullOrEmpty(rootPassword))
{
Console.WriteLine("Fail");
return;
}
var rootCertificate = new X509Certificate2(_applicationSettings.RootCertificateFilePath, rootPassword);
var intermediateCertificateCreate = serviceProvider.GetService<CreateCertificatesClientServerAuth>();
var intermediateCertificate = intermediateCertificateCreate.NewIntermediateChainedCertificate(
new DistinguishedName
{
CommonName = _applicationSettings.CommonName,
Organisation = _applicationSettings.Organisation,
OrganisationUnit = _applicationSettings.OrganisationUnit,
Locality = _applicationSettings.Locality,
StateProvince = _applicationSettings.StateProvince,
Country = _applicationSettings.Country
},
new ValidityPeriod
{
ValidFrom = validFrom,
ValidTo = validTo,
},
_applicationSettings.PathLengthConstraint,
_applicationSettings.DnsName, rootCertificate);
intermediateCertificate.FriendlyName = _applicationSettings.FriendlyName;
Console.Write("Intermediate certificate Password:");
string intermediatePassword = Console.ReadLine();
if (String.IsNullOrEmpty(intermediatePassword))
{
Console.WriteLine("Fail");
return;
}
var importExportCertificate = serviceProvider.GetService<ImportExportCertificate>();
Console.WriteLine($"Intermediate PFX file:{_applicationSettings.IntermediateCertificatePfxFilePath}");
var intermediateCertificatePfxBytes = importExportCertificate.ExportChainedCertificatePfx(intermediatePassword, intermediateCertificate, rootCertificate);
File.WriteAllBytes(_applicationSettings.IntermediateCertificatePfxFilePath, intermediateCertificatePfxBytes);
Console.WriteLine($"Intermediate CER file:{_applicationSettings.IntermediateCertificateCerFilePath}");
var intermediateCertificatePemText = importExportCertificate.PemExportPublicKeyCertificate(intermediateCertificate);
File.WriteAllText(_applicationSettings.IntermediateCertificateCerFilePath, intermediateCertificatePemText);
Console.WriteLine("press enter to exit");
Console.ReadLine();
}
Uploading the Intermediate certificate to Azure Event Grid
The third application generates Device Certificates using the Intermediate Certificate, distinguished name information from the application settings, plus device id, file names and passwords entered by the user.
static void Main(string[] args)
{
var serviceProvider = new ServiceCollection()
.AddCertificateManager()
.BuildServiceProvider();
// load the app settings into configuration
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", false, true)
.AddUserSecrets<Program>()
.Build();
_applicationSettings = configuration.GetSection("ApplicationSettings").Get<Model.ApplicationSettings>();
//------
Console.WriteLine($"validFrom:{validFrom} ValidTo:{validTo}");
Console.WriteLine($"Intermediate PFX file:{_applicationSettings.IntermediateCertificateFilePath}");
Console.Write("Intermediate PFX Password:");
string intermediatePassword = Console.ReadLine();
if (String.IsNullOrEmpty(intermediatePassword))
{
Console.WriteLine("Intermediate PFX Password invalid");
return;
}
var intermediate = new X509Certificate2(_applicationSettings.IntermediateCertificateFilePath, intermediatePassword);
Console.Write("Device ID:");
string deviceId = Console.ReadLine();
if (String.IsNullOrEmpty(deviceId))
{
Console.WriteLine("Device ID invalid");
return;
}
var createClientServerAuthCerts = serviceProvider.GetService<CreateCertificatesClientServerAuth>();
var device = createClientServerAuthCerts.NewDeviceChainedCertificate(
new DistinguishedName
{
CommonName = deviceId,
Organisation = _applicationSettings.Organisation,
OrganisationUnit = _applicationSettings.OrganisationUnit,
Locality = _applicationSettings.Locality,
StateProvince = _applicationSettings.StateProvince,
Country = _applicationSettings.Country
},
new ValidityPeriod
{
ValidFrom = validFrom,
ValidTo = validTo,
},
deviceId, intermediate);
device.FriendlyName = deviceId;
Console.Write("Device PFX Password:");
string devicePassword = Console.ReadLine();
if (String.IsNullOrEmpty(devicePassword))
{
Console.WriteLine("Fail");
return;
}
var importExportCertificate = serviceProvider.GetService<ImportExportCertificate>();
string devicePfxPath = string.Format(_applicationSettings.DeviceCertificatePfxFilePath, deviceId);
Console.WriteLine($"Device PFX file:{devicePfxPath}");
var deviceCertificatePfxBytes = importExportCertificate.ExportChainedCertificatePfx(devicePassword, device, intermediate);
File.WriteAllBytes(devicePfxPath, deviceCertificatePfxBytes);
Console.WriteLine("press enter to exit");
Console.ReadLine();
}
Device Certificate generation application output
Uploading the Intermediate certificate to Azure Event Grid
The Method Callback Delegate has different parameters, so I had to update the downlink formatter interface and update all of the sample downlink payload formatters.
public interface IFormatterDownlink
{
public byte[] Evaluate(string terminalId, string methodName, JObject? payloadJson, byte[] payloadBytes);
}
How direct methods will be processed is configured in the application settings. For each direct method name, the downlink payload formatter to be invoked and an optional JavaScript Object Notation (JSON) payload can be configured.
If there is no configuration for the direct method name, the payload formatter specified in Myriota device “DownlinkDefault” Attribute is used, and if that is not configured the default formatter in the payloadFormatters section of the application settings is used.
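The fallback order described above can be sketched as a small function. The Resolve helper, its parameter names and the formatter file names are hypothetical and only illustrate the order of precedence; they are not the connector's API.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public static class FormatterResolutionExample
{
    // Hypothetical helper: picks the downlink payload formatter name.
    // 1. Formatter configured for the direct method name
    // 2. Formatter from the device "DownlinkDefault" attribute
    // 3. Default formatter from the application settings
    public static string Resolve(IReadOnlyDictionary<string, string> methodFormatters, string methodName, string? deviceDownlinkDefault, string configurationDefault)
    {
        if (methodFormatters.TryGetValue(methodName, out string? formatter) && !string.IsNullOrEmpty(formatter))
        {
            return formatter;
        }

        if (!string.IsNullOrEmpty(deviceDownlinkDefault))
        {
            return deviceDownlinkDefault;
        }

        return configurationDefault;
    }

    public static void Main()
    {
        // Illustrative configuration only
        var methodFormatters = new Dictionary<string, string> { ["FanSpeed"] = "FanSpeed.cs" };

        Console.WriteLine(Resolve(methodFormatters, "FanSpeed", "DeviceDefault.cs", "Default.cs")); // prints FanSpeed.cs
        Console.WriteLine(Resolve(methodFormatters, "Unknown", "DeviceDefault.cs", "Default.cs"));  // prints DeviceDefault.cs
        Console.WriteLine(Resolve(methodFormatters, "Unknown", null, "Default.cs"));                // prints Default.cs
    }
}
```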
namespace devMobile.IoT.MyriotaAzureIoTConnector.Connector
{
internal class IoTHubDownlink(ILogger<IoTHubDownlink> _logger, IOptions<Models.AzureIoT> azureIoTSettings, IPayloadFormatterCache _payloadFormatterCache, IMyriotaModuleAPI _myriotaModuleAPI) : IIoTHubDownlink
{
private readonly Models.AzureIoT _azureIoTSettings = azureIoTSettings.Value;
public async Task<MethodResponse> IotHubMethodHandler(MethodRequest methodRequest, object userContext)
{
// DIY request identifier so processing progress can be tracked in Application Insights
string requestId = Guid.NewGuid().ToString();
Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;
try
{
_logger.LogInformation("Downlink- TerminalId:{TerminalId} RequestId:{requestId} Name:{Name}", context.TerminalId, requestId, methodRequest.Name);
// Lookup payload formatter name, none specified use context one which is from device attributes or the default in configuration
string payloadFormatterName;
if (_azureIoTSettings.IoTHub.Methods.TryGetValue(methodRequest.Name, out Models.AzureIoTHubMethod? method) && !string.IsNullOrEmpty(method.Formatter))
{
payloadFormatterName = method.Formatter;
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} Method formatter:{payloadFormatterName} ", context.TerminalId, requestId, payloadFormatterName);
}
else
{
payloadFormatterName = context.PayloadFormatterDownlink;
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} Context formatter:{payloadFormatterName} ", context.TerminalId, requestId, payloadFormatterName);
}
// Display methodRequest.Data as Hex
if (methodRequest.Data is not null)
{
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} Data:{Data}", context.TerminalId, requestId, BitConverter.ToString(methodRequest.Data));
}
else
{
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} Data:null", context.TerminalId, requestId);
}
JObject? requestJson = null;
if ((method is not null) && !string.IsNullOrWhiteSpace(method.Payload))
{
// There is a matching method with a possible JSON payload
string payload = method.Payload.Trim();
if ((payload.StartsWith('{') && payload.EndsWith('}')) || (payload.StartsWith('[') && payload.EndsWith(']')))
{
// The payload could be JSON
try
{
requestJson = JObject.Parse(payload);
}
catch (JsonReaderException jex)
{
_logger.LogWarning(jex, "Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} Method Payload is not valid JSON", context.TerminalId, requestId);
return new MethodResponse(Encoding.ASCII.GetBytes($"{{\"message\":\"RequestID:{requestId} Method payload is not valid JSON.\"}}"), (int)HttpStatusCode.UnprocessableEntity);
}
}
else
{
// The payload couldn't be JSON
_logger.LogWarning("Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} Method Payload is definitely not valid JSON", context.TerminalId, requestId);
return new MethodResponse(Encoding.ASCII.GetBytes($"{{\"message\":\"RequestID:{requestId} Method payload is definitely not valid JSON.\"}}"), (int)HttpStatusCode.UnprocessableEntity);
}
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} Method Payload:{requestJson}", context.TerminalId, requestId, JsonConvert.SerializeObject(requestJson, Formatting.Indented));
}
else
{
// If there was no matching method or the payload was "empty" see if the method request payload is valid
if (!string.IsNullOrWhiteSpace(methodRequest.DataAsJson))
{
string payload = methodRequest.DataAsJson.Trim();
if ((payload.StartsWith('{') && payload.EndsWith('}')) || (payload.StartsWith('[') && payload.EndsWith(']')))
{
// The payload could be JSON
try
{
requestJson = JObject.Parse(payload);
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} DataAsJson:{requestJson}", context.TerminalId, requestId, JsonConvert.SerializeObject(requestJson, Formatting.Indented));
}
catch (JsonReaderException jex)
{
_logger.LogInformation(jex, "Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} DataAsJson is not valid JSON", context.TerminalId, requestId);
}
}
}
}
// This "shouldn't" fail, but it could for invalid path to blob, timeout retrieving blob, payload formatter syntax error etc.
IFormatterDownlink payloadFormatter = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatterName);
if ( requestJson is null )
{
requestJson = new JObject();
}
// This also "shouldn't" fail, but the payload formatters can throw runtime exceptions like null reference, divide by zero, index out of range etc.
byte[] payloadBytes = payloadFormatter.Evaluate(context.TerminalId, methodRequest.Name, requestJson, methodRequest.Data);
// Validate payload before calling Myriota control message send API method
if (payloadBytes is null)
{
_logger.LogWarning("Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} Evaluate returned null", context.TerminalId, requestId);
return new MethodResponse(Encoding.ASCII.GetBytes($"{{\"message\":\"RequestID:{requestId} payload evaluate returned null.\"}}"), (int)HttpStatusCode.UnprocessableEntity);
}
if ((payloadBytes.Length < Constants.DownlinkPayloadMinimumLength) || (payloadBytes.Length > Constants.DownlinkPayloadMaximumLength))
{
_logger.LogWarning("Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} PayloadBytes:{payloadBytes} length:{Length} invalid, must be {DownlinkPayloadMinimumLength} to {DownlinkPayloadMaximumLength} bytes", context.TerminalId, requestId, BitConverter.ToString(payloadBytes), payloadBytes.Length, Constants.DownlinkPayloadMinimumLength, Constants.DownlinkPayloadMaximumLength);
return new MethodResponse(Encoding.ASCII.GetBytes($"{{\"message\":\"RequestID:{requestId} payload evaluation length invalid.\"}}"), (int)HttpStatusCode.UnprocessableEntity);
}
// Finally send Control Message to device using the Myriota API
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestID} PayloadBytes:{payloadBytes} Length:{Length} sending", context.TerminalId, requestId, BitConverter.ToString(payloadBytes), payloadBytes.Length);
string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} Myriota MessageID:{messageId} sent", context.TerminalId, requestId, messageId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Downlink- IoT Hub TerminalID:{TerminalId} RequestID:{requestId} IotHubMethodHandler processing failed", context.TerminalId, requestId);
return new MethodResponse(Encoding.ASCII.GetBytes($"{{\"message\":\"TerminalID:{context.TerminalId} RequestID:{requestId} method handler failed.\"}}"), (int)HttpStatusCode.InternalServerError);
}
return new MethodResponse(Encoding.ASCII.GetBytes($"{{\"message\":\"TerminalID:{context.TerminalId} RequestID:{requestId} Message sent successfully.\"}}"), (int)HttpStatusCode.OK);
}
}
}
public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
public byte[] Evaluate(string terminalId, string methodName, JObject payloadJson, byte[] payloadBytes)
{
byte? status = payloadJson.GetValue("FanSpeed", StringComparison.OrdinalIgnoreCase)?.Value<byte>();
if (!status.HasValue)
{
return new byte[] { };
}
return new byte[] { 1, status.Value };
}
}
The FanSpeed.cs payload formatter extracts the FanSpeed value from the JSON payload and returns a two-byte array containing the message type and speed of the fan.
Azure Function application displaying Diagnostic information for control message
Each logging message starts with the TerminalID (to simplify searching for all the direct methods invoked on a device) and the requestId, a Globally Unique Identifier (GUID) (to simplify searching for all the “steps” associated with sending a message), with the rest of the logging message containing “step” specific diagnostic information.
Azure Application Insights displaying diagnostic information
Myriota Device manager control message history displaying pending control message
The Azure IoT Explorer payload for an empty message contained two ” characters which is a bit odd. I will have to build a test application which uses the Azure IoT Hub C2D direct method API to see if this is a “feature”.
Each logging message starts with the TerminalID (to simplify searching for all the messages sent to a device) and the message LockToken (to simplify searching for all the “steps” associated with sending a message) with the rest of the logging message containing “step” specific diagnostic information.
Successful Azure IoT Explorer C2D JSON Message
If there is no PayloadFormatter attribute the default in the PayloadFormatters section of the function configuration is used.
using System;
using System.Collections.Generic;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject payloadJson, byte[] payloadBytes)
{
byte? status = payloadJson.Value<byte?>("FanSpeed");
if (!status.HasValue)
{
return new byte[] { };
}
return new byte[] { 1, status.Value };
}
}
The FanSpeed.cs payload formatter extracts the FanSpeed value from the JSON payload and returns a two-byte array containing the message type and speed of the fan.
Azure IoT Function running waiting for a C2D message
After re-reading the SetReceiveMessageHandlerAsync documentation I refactored the code (back to the approach used a couple of branches ago) with the “using” wrapping the try/catch.
public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;
_logger.LogInformation("Downlink- IoT Hub TerminalId:{TerminalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);
using (message) // https://learn.microsoft.com/en-us/dotnet/api/microsoft.azure.devices.client.deviceclient.setreceivemessagehandlerasync?view=azure-dotnet
{
try
{
// Replace default formatter with message specific formatter if configured.
if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out string? payloadFormatterName) || string.IsNullOrEmpty(payloadFormatterName))
{
// Assign the context formatter first so the name that will be used is the one logged
payloadFormatterName = context.PayloadFormatterDownlink;
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} Context formatter:{payloadFormatterName} ", context.TerminalId, message.LockToken, payloadFormatterName);
}
else
{
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} Property formatter:{payloadFormatterName} ", context.TerminalId, message.LockToken, payloadFormatterName);
}
// If GetBytes fails payload really badly broken
byte[] messageBytes = message.GetBytes();
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} Message bytes:{messageBytes}", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
// Try converting the bytes to text then to JSON
JObject? messageJson = null;
try
{
// These will fail for some messages, payload formatter gets bytes only
string messageText = Encoding.UTF8.GetString(messageBytes);
try
{
messageJson = JObject.Parse(messageText);
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} JSON:{messageJson}", context.TerminalId, message.LockToken, JsonConvert.SerializeObject(messageJson, Formatting.Indented));
}
catch (JsonReaderException jex)
{
_logger.LogInformation(jex, "Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} not valid JSON", context.TerminalId, message.LockToken);
}
}
catch (ArgumentException aex)
{
_logger.LogInformation(aex, "Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} message bytes not valid text", context.TerminalId, message.LockToken);
}
// This shouldn't fail, but it could for invalid path to blob, timeout retrieving blob, payload formatter syntax error etc.
IFormatterDownlink payloadFormatter = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatterName);
// This will fail if payload formatter throws runtime exceptions like null reference, divide by zero, index out of range etc.
byte[] payloadBytes = payloadFormatter.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);
// Validate payload before calling Myriota control message send API method
if (payloadBytes is null)
{
_logger.LogWarning("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} payload formatter:{payloadFormatter} Evaluate returned null", context.TerminalId, message.LockToken, payloadFormatterName);
await context.DeviceClient.RejectAsync(message);
return;
}
if ((payloadBytes.Length < Constants.DownlinkPayloadMinimumLength) || (payloadBytes.Length > Constants.DownlinkPayloadMaximumLength))
{
_logger.LogWarning("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadBytes:{payloadBytes} length:{Length} invalid must be {DownlinkPayloadMinimumLength} to {DownlinkPayloadMaximumLength} bytes", context.TerminalId, message.LockToken, Convert.ToHexString(payloadBytes), payloadBytes.Length, Constants.DownlinkPayloadMinimumLength, Constants.DownlinkPayloadMaximumLength);
await context.DeviceClient.RejectAsync(message);
return;
}
// Finally send Control Message to device using the Myriota API
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadBytes:{payloadBytes} Length:{Length} sending", context.TerminalId, message.LockToken, BitConverter.ToString(payloadBytes), payloadBytes.Length);
string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);
await context.DeviceClient.CompleteAsync(message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} MessageHandler processing failed", context.TerminalId, message.LockToken);
await context.DeviceClient.RejectAsync(message);
}
}
}
...
// Finally send Control Message to device using the Myriota API
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadBytes:{payloadBytes} Length:{Length} sending", context.TerminalId, message.LockToken, BitConverter.ToString(payloadBytes), payloadBytes.Length);
string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);
...
Azure IoT Function successfully sending downlink message.
Encoding.UTF8.GetString and JObject.Parse are processed in a single try with a specialised catch for when the payload cannot be converted to text. If the payload cannot be converted to JSON, only the payloadBytes parameter of the payload formatter is populated.
The Azure IoT Hub downlink message handler was a partial class and part of the implementation of the IDeviceConnectionCache, which was a hangover from one of the initial versions.
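The bytes-to-text-to-JSON sequence can be sketched with only the base class library. This is not the connector's code: it swaps in System.Text.Json for Newtonsoft.Json, and the Describe helper is made up for illustration. It also uses a strict UTF8Encoding, because the default Encoding.UTF8 replaces invalid bytes with a substitute character rather than throwing.

```csharp
using System;
using System.Text;
using System.Text.Json;

public static class PayloadDecodeExample
{
    // throwOnInvalidBytes: true makes GetString throw a DecoderFallbackException
    // (which derives from ArgumentException) when the bytes are not valid UTF-8.
    private static readonly UTF8Encoding StrictUtf8 = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true);

    // Hypothetical helper: reports how far the payload could be decoded.
    public static string Describe(byte[] payloadBytes)
    {
        try
        {
            string text = StrictUtf8.GetString(payloadBytes);

            using JsonDocument document = JsonDocument.Parse(text);

            return $"JSON {document.RootElement.ValueKind}";
        }
        catch (ArgumentException)
        {
            // Specialised catch: the bytes could not be converted to text
            return "not text";
        }
        catch (JsonException)
        {
            // The bytes were text, but the text was not JSON
            return "text but not JSON";
        }
    }

    public static void Main()
    {
        Console.WriteLine(Describe(Encoding.UTF8.GetBytes("{\"FanSpeed\":2}"))); // prints JSON Object
        Console.WriteLine(Describe(Encoding.UTF8.GetBytes("hello")));             // prints text but not JSON
        Console.WriteLine(Describe(new byte[] { 0xFF, 0xFE }));                   // prints not text
    }
}
```

In both failure cases a payload formatter would still have the raw bytes to work with, which matches the behaviour described above.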