Azure Event Grid nanoFramework Client – Publisher

Building a .NET nanoFramework application for testing Azure Event Grid MQTT Broker connectivity that would run on my Seeedstudio EdgeBox ESP100 and Seeedstudio Xiao ESP32S3 devices took a couple of hours. Most of that time was spent figuring out how to generate the certificate and elliptic curve private key

Create an elliptic curve private key

 openssl ecparam -name prime256v1 -genkey -noout -out device.key

Generate a certificate signing request

openssl req -new -key device.key -out device.csr -subj "/CN=device.example.com/O=YourOrg/OU=IoT"

Then use the intermediate certificate and key file from earlier to generate a device certificate and key.

 openssl x509 -req -in device.csr -CA IntermediateCA.crt -CAkey IntermediateCA.key -CAcreateserial -out device.crt -days 365 -sha256

In this post I have assumed that the reader is familiar with configuring Azure Event Grid clients, client groups, topic spaces, permission bindings and routing.

The PEM encoded root CA certificate chain that is used to validate the server
public const string CA_ROOT_PEM = @"-----BEGIN CERTIFICATE-----
CN: CN = Microsoft Azure ECC TLS Issuing CA 03
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
CN: CN = DigiCert Global Root G3
-----END CERTIFICATE-----";

The PEM encoded certificate chain that is used to authenticate the device
public const string CLIENT_CERT_PEM_A = @"-----BEGIN CERTIFICATE-----
-----BEGIN CERTIFICATE-----
 CN=Self signed device certificate
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
 CN=Self signed Intermediate certificate
-----END CERTIFICATE-----";

 The PEM encoded private key of device
public const string CLIENT_KEY_PEM_A = @"-----BEGIN EC PRIVATE KEY-----
-----END EC PRIVATE KEY-----";

My application was “inspired” by the .NET nanoFramework m2mqtt example.

public static void Main()
{
   int sequenceNumber = 0;
   MqttClient mqttClient = null;
   Thread.Sleep(1000); // Found this works around some issues with running immediately after a reset

   bool wifiConnected = false;
   Console.WriteLine("WiFi connecting...");
   do
   {
      // Attempt to connect using DHCP
      wifiConnected = WifiNetworkHelper.ConnectDhcp(Secrets.WIFI_SSID, Secrets.WIFI_PASSWORD, requiresDateTime: true);

      if (!wifiConnected)
      {
         Console.WriteLine($"Failed to connect. Error: {WifiNetworkHelper.Status}");
         if (WifiNetworkHelper.HelperException != null)
         {
            Console.WriteLine($"Exception: {WifiNetworkHelper.HelperException}");
         }

         Thread.Sleep(1000);
      }
   }
   while (!wifiConnected);
   Console.WriteLine("WiFi connected");

   var caCert = new X509Certificate(Constants.CA_ROOT_PEM);

   X509Certificate2 clientCert = null;
   try
   {
      clientCert = new X509Certificate2(Secrets.CLIENT_CERT_PEM_A, Secrets.CLIENT_KEY_PEM_A, string.Empty);
   }
   catch (Exception ex)
   {
      Console.WriteLine($"Client Certificate Exception: {ex.Message}");
   }

   mqttClient = new MqttClient(Secrets.MQTT_SERVER, Constants.MQTT_PORT, true, caCert, clientCert, MqttSslProtocols.TLSv1_2);

   mqttClient.ProtocolVersion = MqttProtocolVersion.Version_5;

   bool mqttConnected = false;
   Console.WriteLine("MQTT connecting...");
   do
   {
      try
      {
         // Regular connect
         var resultConnect = mqttClient.Connect(Secrets.MQTT_CLIENTID, Secrets.MQTT_USERNAME, Secrets.MQTT_PASSWORD);
         if (resultConnect != MqttReasonCode.Success)
         {
            Console.WriteLine($"MQTT ERROR connecting: {resultConnect}");
            Thread.Sleep(1000);
         }
         else
         {
            mqttConnected = true;
         }
      }
      catch (Exception ex)
      {
         Console.WriteLine($"MQTT ERROR Exception '{ex.Message}'");
         Thread.Sleep(1000);
      }
   }
   while (!mqttConnected);
   Console.WriteLine("MQTT connected...");

   mqttClient.MqttMsgPublishReceived += MqttMsgPublishReceived;
   mqttClient.MqttMsgSubscribed += MqttMsgSubscribed;
   mqttClient.MqttMsgUnsubscribed += MqttMsgUnsubscribed;
   mqttClient.ConnectionOpened += ConnectionOpened;
   mqttClient.ConnectionClosed += ConnectionClosed;
   mqttClient.ConnectionClosedRequest += ConnectionClosedRequest;

   string topicPublish = string.Format(MQTT_TOPIC_PUBLISH_FORMAT, Secrets.MQTT_CLIENTID);
   while (true)
   {
      Console.WriteLine("MQTT publish message start...");

      var payload = new MessagePayload() { ClientID = Secrets.MQTT_CLIENTID, Sequence = sequenceNumber++ };

      string jsonPayload = JsonSerializer.SerializeObject(payload);

      var result = mqttClient.Publish(topicPublish, Encoding.UTF8.GetBytes(jsonPayload), "application/json; charset=utf-8", null);

      Debug.WriteLine($"MQTT published ({result}): {jsonPayload}");

      Thread.Sleep(100);
   }
}

I then configured my client (Edgebox100Z) and updated the “secrets.cs” file

Azure Event Grid MQTT Broker Clients

The application connected to the Azure Event Grid MQTT broker and started publishing the JSON payload with the incrementing sequence number.

Visual Studio debugger output of JSON payload publishing

The published messages were “routed” to an Azure Storage Queue where they could be inspected with a tool like Azure Storage Explorer.

Azure Event Grid MQTT Broker metrics with messages published selected

I could see the application was working in the Azure Event Grid MQTT broker metrics because the number of messages published was increasing.

Azure Event Grid Arduino Client – Publisher

The Arduino application for testing Azure Event Grid MQTT Broker connectivity worked on my Seeedstudio EdgeBox ESP100 and Seeedstudio Xiao ESP32S3 devices, so the next step was to modify it to publish some messages.

The first version generated the JSON payload using an snprintf which was a bit “nasty”

static uint32_t sequenceNumber = 0;

void loop() {
  mqttClient.loop();

  Serial.println("MQTT Publish start");

  char payloadBuffer[64];

  snprintf(payloadBuffer, sizeof(payloadBuffer), "{\"ClientID\":\"%s\", \"Sequence\": %i}", MQTT_CLIENTID, sequenceNumber++);

  Serial.println(payloadBuffer);

  if (!mqttClient.publish(MQTT_TOPIC_PUBLISH, payloadBuffer, strlen(payloadBuffer))) {
    Serial.print("\nMQTT publish failed:");        
    Serial.println(mqttClient.state());    
  }
  Serial.println("MQTT Publish finish");

  delay(60000);
}

I then configured my client (Edgebox100A) and updated the “secrets.h” file

Azure Event Grid MQTT Broker Clients

The application connected to the Azure Event Grid MQTT broker and started publishing the JSON payload with the incrementing sequence number.

Arduino IDE serial monitor output of JSON payload publishing

The second version generated the JSON payload using ArduinoJson library.

static uint32_t sequenceNumber = 0;

void loop() {
  mqttClient.loop();

  Serial.println("MQTT Publish start");

  // Create a static JSON document with fixed size
  StaticJsonDocument<64> doc;

  doc["Sequence"] = counter++;
  doc["ClientID"] = MQTT_CLIENTID;

  // Serialize JSON to a buffer
  char jsonBuffer[64];
  size_t n = serializeJson(doc, jsonBuffer);

  Serial.println(jsonBuffer);

  if(!mqttClient.publish(MQTT_TOPIC_PUBLISH, jsonBuffer, n))
  {
    Serial.println(mqttClient.state());    
  }

  Serial.println("MQTT Publish finish");

  delay(2000);
}

I could see the application was working in the Azure Event Grid MQTT broker metrics because the number of messages published was increasing.

Azure Event Grid MQTT Broker metrics with messages published selected

The published messages were “routed” to an Azure Storage Queue where they can be inspected with a tool like Azure Storage Explorer.

Azure Storage Explorer displaying a message’s payload

The message payload is in Base64 encoded so I used copilot convert it to text.

Microsoft copilot decoding the Base64 payload

In this post I have assumed that the reader is familiar with configuring Azure Event Grid clients, client groups, topic spaces, permission bindings and routing.

Bonus also managed to slip in a reference to copilot.

Azure Event Grid Arduino Client – The joy of certs

“Lets start at the very beginning, A very good place to start”

The Azure Event Grid MQTT Broker server X509 certificate chain can be copy ‘n’ paste from the output of the openssl command

openssl s_client -connect YourNamespace.newzealandnorth-1.ts.eventgrid.azure.net:8883 -showcerts

A self-signed X509 root certificate which can sign intermediate X509 certificates and key file can be generated with a single openssl command.

openssl req -x509 -newkey rsa:4096 -keyout rootCA.key -out rootCA.crt -days 3650 -nodes -subj "/CN=devMobile  /O=devMobile.co.nz /C=NZ" -addext "basicConstraints=critical,CA:TRUE" -addext "keyUsage=critical,keyCertSign"

For a non-trivial system there should be a number of intermediate certificates. I have tried creating intermediate certificates for a device type, geography, application, customer and combinations of these. The first couple of times got it wrong so start with a field trial so that it isn’t so painful to go back and fix. (beware the sunk cost fallacy)

openssl genrsa -out intermediate.key 4096

openssl req -new -key intermediate.key -out intermediate.csr -subj "/CN=intermediate  /O=devMobile.co.nz /C=NZ"

I found creating an intermediate certificate that could sign device certificates required a conf file for the basicConstraints and keyUsage configuration.

[ v3_intermediate_ca ]
basicConstraints = critical, CA:TRUE, pathlen:0
keyUsage = critical, keyCertSign
  • critical-The extension must be understood and processed by any application validating the certificate. If the application does not understand it, the certificate must be rejected.
  • CA:TRUE-This certificate is allowed to act as a Certificate Authority (CA), meaning it can sign other certificates.
  • pathlen:0-This CA can only issue end-entity (leaf) certificates and cannot issue further intermediate CA certificates.
  • keyCertSig- The certificate can be used to sign other certificates (i.e., it’s a CA certificate).
openssl x509 -req -in intermediate.csr  -CA rootCA.crt -CAkey rootCA.key -CAcreateserial -out intermediate.crt -days 1825 -extfile intermediate_ext.cnf -extensions v3_intermediate_ca

Creating a device certificate is similar to the process for the intermediate certificate but doesn’t need to be able to sign certificates.

openssl genrsa -out EdgeBox100A.key 4096

openssl req -new -key EdgeBox100A.key -out EdgeBox100A.csr -subj "/CN=EdgeBox100A"

openssl x509 -req -in EdgeBox100A.csr -CA intermediate.crt -CAkey intermediate.key -CAcreateserial -out EdgeBox100A.crt -days 365 

For production systems putting some thought into the Common name(CN), Organizational unit name(OU), Organization name(O), locality name(L), state or province name(S) and Country name(C)

// Minimalist ESP32 + Event Grid MQTT (mTLS) with PubSubClient
// Copyright (c) November 2025, devMobile Software
#include <PubSubClient.h>
#include <WiFi.h>
#include <WiFiClientSecure.h>

#include "constants.h"
#include "secrets.h"

// --- Wi-Fi ---
//const char* WIFI_SSID     = "";
//const char* WIFI_PASSWORD = "";

// --- Event Grid MQTT ---
//const char* MQTT_SERVER = "";
const uint16_t MQTT_PORT = 8883;

//const char* MQTT_CLIENTID = "";
//const char* MQTT_USERNAME = "";
//const char* MQTT_PASSWORD = "";
//const char* MQTT_TOPIC_PUBLISH = "devices/";
//const char* MQTT_TOPIC_SUBSCRIBE = "devices/";

/*
// The certificate that is used to authenticate the MQTT Broker
const char CA_ROOT_PEM[] PROGMEM = R"PEM(
-----BEGIN CERTIFICATE-----
      Thumbprint: 56D955C849887874AA1767810366D90ADF6C8536
      CN: CN=Microsoft Azure ECC TLS Issuing CA 03
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
      Thumbprint: 7E04DE896A3E666D00E687D33FFAD93BE83D349E
      CN: CN=DigiCert Global Root G3
-----END CERTIFICATE-----
)PEM";

The certificate that is used to authenticate the device
static const char CLIENT_CERT_PEM[] PROGMEM = R"PEM(
-----BEGIN CERTIFICATE-----
 CN=Self signed device certificate
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
 CN=Self signed Intermediate certificate
-----END CERTIFICATE-----
)PEM";

 The PEM encoded private key of device
static const char CLIENT_KEY_PEM[] PROGMEM = R"PEM(
-----BEGIN PRIVATE KEY-----
-----END PRIVATE KEY-----
)PEM";
*/

WiFiClientSecure secureClient;
PubSubClient mqttClient(secureClient);

void setup() {
  Serial.begin(9600);
  delay(5000);
  Serial.println();

  // Connect to WiFi
  Serial.println("WiFi connecting");
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  Serial.print("*");
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print("*");
  }
  Serial.println("\nWiFi connected");

  // Sync time for TLS
  Serial.println("\nTime synchronising");
  configTime(0, 0, "pool.ntp.org", "time.nist.gov");
  Serial.print("*");
  while (time(nullptr) < 100000) {
    delay(500);
    Serial.print("*");
  }
  Serial.println("\nTime synchronised");

  Serial.println("\nValidating ServerFQDN-Certificate combination");
  secureClient.setCACert(CA_ROOT_PEM);

  Serial.println("TCP connecting");
  if (secureClient.connect(MQTT_SERVER, MQTT_PORT)) {
    Serial.println("\nTCP connected");
  } else {
    Serial.println("\nTCP connection failed");
    return;
  }

  secureClient.setCertificate(CLIENT_CERT_A_PEM);
  secureClient.setPrivateKey(CLIENT_KEY_A_PEM);

  mqttClient.setServer(MQTT_SERVER, MQTT_PORT);

  Serial.println("\nMQTT connecting");
  Serial.print("*");
  while (!mqttClient.connect(MQTT_CLIENTID, MQTT_USERNAME, MQTT_PASSWORD)) {
    Serial.println(mqttClient.state());
    delay(5000);
    Serial.print("*");
  }
  Serial.println("\nMQTT connected");
}

static uint32_t sequenceNumber = 0;

void loop() {
  mqttClient.loop();

  Serial.print("'.");
  delay(10000);
}

My Arduino Xiao ESP32S3 and EdgeBox-ESP-100-Industrial Edge Controller devices could connect to the local Wi-Fi, get the time and date using the network time protocol(NTP), and validate the Azure Event Grid MQTT broker certificate. Then connect to the Azure Event Grid MQTT broker with the client name specified in the subject name of its X509 certificate.

Establishing a connection to the Azure Event Grid MQTT broker often failed which surprised me. Initially I didn’t have any retry logic which meant I wasted quite a bit of time trying to debug failed connections

Azure Function SendGrid Binding Fail

This post is for Azure Function developers having issues with the SendGrid binding throwing exceptions like the one below.

System.Private.CoreLib: Exception while executing function: Functions.AzureBlobFileUploadEmailer. Microsoft.Azure.WebJobs.Extensions.SendGrid: A 'To' address must be specified for the message

My Azure BlobTrigger Function sends an email (with SendGrid) when a file is uploaded to an Azure Blob Storage Container(a couple of times a day).

public class FileUploadEmailer(ILogger<FileUploadEmailer> logger, IOptions<EmailSettings> emailSettings)
{
   private readonly ILogger<FileUploadEmailer> _logger = logger;
   private readonly EmailSettings _emailSettings = emailSettings.Value;

   [Function(nameof(AzureBlobFileUploadEmailer))]
   [SendGridOutput(ApiKey = "SendGridAPIKey")]
   public string Run([BlobTrigger("filestobeprocesed/{name}", Connection = "upload-file-storage")] Stream stream, string name)
   {
      _logger.LogInformation("FileUploadEmailer Blob trigger function Processed blob Name:{0} start", name);

      try
      {
         var message = new SendGridMessage();

         message.SetFrom(_emailSettings.From);
         message.AddTo(_emailSettings.To);
         message.Subject = _emailSettings.Subject;

         message.AddContent(MimeType.Html, string.Format(_emailSettings.BodyFormat, name, DateTime.UtcNow));

         // WARNING - Use Newtonsoft JSON serializer to produce JSON string. System.Text.Json won't work because property annotations are different
         var messageJson = Newtonsoft.Json.JsonConvert.SerializeObject(message);

         _logger.LogInformation("FileUploadEmailer Blob trigger function Processed blob Name:{0} finish", name);

         return messageJson;
      }
      catch (Exception ex)
      {
         _logger.LogError(ex, "FileUploadEmailer Blob trigger function Processed blob Name: {0}", name);

         throw;
      }
   }
}

I missed the first clue when I looked at the JSON and missed the Tos, Ccs, Bccs property names.

{
"From":{"Name":"Foo","Email":"bryn.lewis@devmobile.co.nz"},
"Subject":"Hi 30/09/2024 1:27:49 pm",
"Personalizations":[{"Tos":[{"Name":"Bar","Email":"bryn.lewis@devmobile.co.nz"}],
"Ccs":null,
"Bccs":null,
"From":null,
"Subject":null,
"Headers":null,
"Substitutions":null,
"CustomArgs":null,
"SendAt":null,
"TemplateData":null}],
"Contents":[{"Type":"text/html","Value":"\u003Ch2\u003EHello AssemblyInfo.cs\u003C/h2\u003E"}],
"PlainTextContent":null,
"HtmlContent":null,
"Attachments":null,
"TemplateId":null,
"Headers":null,
"Sections":null,
"Categories":null,
"CustomArgs":null,
"SendAt":null,
"Asm":null,
"BatchId":null,
"IpPoolName":null,
"MailSettings":null,
"TrackingSettings":null,
"ReplyTo":null,
"ReplyTos":null
}

I wasn’t paying close enough attention to the sample code and used the System.Text.Json rather than Newtonsoft.Json to serialize the SendGridMessage object. They use different attributes for property names etc. so the JSON generated was wrong.

Initially, I tried adding System.Text.Json attributes to the SendGridMessage class

namespace SendGrid.Helpers.Mail
{
   /// <summary>
   /// Class SendGridMessage builds an object that sends an email through Twilio SendGrid.
   /// </summary>
   [JsonObject(IsReference = false)]
   public class SendGridMessage
   {
      /// <summary>
      /// Gets or sets an email object containing the email address and name of the sender. Unicode encoding is not supported for the from field.
      /// </summary>
      //[JsonProperty(PropertyName = "from")]
      [JsonPropertyName("from")]
      public EmailAddress From { get; set; }

      /// <summary>
      /// Gets or sets the subject of your email. This may be overridden by personalizations[x].subject.
      /// </summary>
      //[JsonProperty(PropertyName = "subject")]
      [JsonPropertyName("subject")]
      public string Subject { get; set; }

      /// <summary>
      /// Gets or sets a list of messages and their metadata. Each object within personalizations can be thought of as an envelope - it defines who should receive an individual message and how that message should be handled. For more information, please see our documentation on Personalizations. Parameters in personalizations will override the parameters of the same name from the message level.
      /// https://sendgrid.com/docs/Classroom/Send/v3_Mail_Send/personalizations.html.
      /// </summary>
      //[JsonProperty(PropertyName = "personalizations", IsReference = false)]
      [JsonPropertyName("personalizations")]
      public List<Personalization> Personalizations { get; set; }
...
}

SendGridMessage uses other classes like EmailAddress which worked because the property names matched the JSON

namespace SendGrid.Helpers.Mail
{
    /// <summary>
    /// An email object containing the email address and name of the sender or recipient.
    /// </summary>
    [JsonObject(IsReference = false)]
    public class EmailAddress : IEquatable<EmailAddress>
    {
        /// <summary>
        /// Initializes a new instance of the <see cref="EmailAddress"/> class.
        /// </summary>
        public EmailAddress()
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="EmailAddress"/> class.
        /// </summary>
        /// <param name="email">The email address of the sender or recipient.</param>
        /// <param name="name">The name of the sender or recipient.</param>
        public EmailAddress(string email, string name = null)
        {
            this.Email = email;
            this.Name = name;
        }

        /// <summary>
        /// Gets or sets the name of the sender or recipient.
        /// </summary>
        [JsonProperty(PropertyName = "name")]
        public string Name { get; set; }

        /// <summary>
        /// Gets or sets the email address of the sender or recipient.
        /// </summary>
        [JsonProperty(PropertyName = "email")]
        public string Email { get; set; }
...
}

Many of the property name “mismatch” issues were in the Personalization class with the Toos, Ccs, bccs etc. properties

namespace SendGrid.Helpers.Mail
{
    /// <summary>
    /// An array of messages and their metadata. Each object within personalizations can be thought of as an envelope - it defines who should receive an individual message and how that message should be handled. For more information, please see our documentation on Personalizations. Parameters in personalizations will override the parameters of the same name from the message level.
    /// https://sendgrid.com/docs/Classroom/Send/v3_Mail_Send/personalizations.html.
    /// </summary>
    [JsonObject(IsReference = false)]
    public class Personalization
    {
        /// <summary>
        /// Gets or sets an array of recipients. Each email object within this array may contain the recipient’s name, but must always contain the recipient’s email.
        /// </summary>
        [JsonProperty(PropertyName = "to", IsReference = false)]
        [JsonConverter(typeof(RemoveDuplicatesConverter<EmailAddress>))]
        public List<EmailAddress> Tos { get; set; }

        /// <summary>
        /// Gets or sets an array of recipients who will receive a copy of your email. Each email object within this array may contain the recipient’s name, but must always contain the recipient’s email.
        /// </summary>
        [JsonProperty(PropertyName = "cc", IsReference = false)]
        [JsonConverter(typeof(RemoveDuplicatesConverter<EmailAddress>))]
        public List<EmailAddress> Ccs { get; set; }

        /// <summary>
        /// Gets or sets an array of recipients who will receive a blind carbon copy of your email. Each email object within this array may contain the recipient’s name, but must always contain the recipient’s email.
        /// </summary>
        [JsonProperty(PropertyName = "bcc", IsReference = false)]
        [JsonConverter(typeof(RemoveDuplicatesConverter<EmailAddress>))]
        public List<EmailAddress> Bccs { get; set; }

        /// <summary>
        /// Gets or sets the from email address. The domain must match the domain of the from email property specified at root level of the request body.
        /// </summary>
        [JsonProperty(PropertyName = "from")]
        public EmailAddress From { get; set; }

        /// <summary>
        /// Gets or sets the subject line of your email.
        /// </summary>
        [JsonProperty(PropertyName = "subject")]
        public string Subject { get; set; }
...
}

After a couple of failed attempts at decorating the SendGrid SendGridMessage, EmailAddress, Personalization etc. classes I gave up and reverted to the Newtonsoft.Json serialiser.

Note to self – pay closer attention to the samples.

Myriota Connector – Azure IoT Hub Downlink logging refactor

After several refactorings the code stabilised and the Azure IoT Hub downlink message handler (configured with SetMethodDefaultHandlerAsync ) was ready for testing. I used Azure IoT Explorer to send some “hand-crafted” JavaScript Object Notation(JSON) Cloud to Device(C2D) messages.

Each logging message starts with the TerminalID (to simplify searching for all the messages sent to a device) and the message LockToken (to simplify searching for all the “steps” associated with sending a message) with the rest of the logging message containing “step” specific diagnostic information.

Successful Azure IoT Explorer C2D JSON Message

If there is no PayloadFormatter attribute the default in the PayloadFormatters section of the function configuration is used.

using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
   public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject payloadJson, byte[] payloadBytes)
   {
      byte? status = payloadJson.Value<byte?>("FanSpeed");

      if (!status.HasValue)
      {
         return new byte[] { };
      }

      return new byte[] { 1, status.Value };
   }
}

The FanSpeed.cs payload formatter extracts the FanSpeed value from the JSON payload and returns a two byte array containing the message type and speed of the fan.

Azure IoT Function running waiting for a C2D message

After re-reading the SetMethodHandlerAync documentation I refactored the code (back to the approach used a couple of branches ago) with the “using” wrapping the try/catch.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{TermimalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   using (message) // https://learn.microsoft.com/en-us/dotnet/api/microsoft.azure.devices.client.deviceclient.setreceivemessagehandlerasync?view=azure-dotnet
   {
      try
      {
         // Replace default formatter with message specific formatter if configured.
         if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out string? payloadFormatterName) || string.IsNullOrEmpty(payloadFormatterName))
         {
            _logger.LogInformation("Downlink- IoT Hub TerminalID:{TermimalId} LockToken:{LockToken} Context formatter:{payloadFormatterName} ", context.TerminalId, message.LockToken, payloadFormatterName);

            payloadFormatterName = context.PayloadFormatterDownlink;
         }
         else
         {
            _logger.LogInformation("Downlink- IoT Hub TerminalID:{TermimalId} LockToken:{LockToken} Property formatter:{payloadFormatterName} ", context.TerminalId, message.LockToken, payloadFormatterName);
         }


         // If GetBytes fails payload really badly broken
         byte[] messageBytes = message.GetBytes();

         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} Message bytes:{messageBytes}", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));


         // Try converting the bytes to text then to JSON
         JObject? messageJson = null;
         try
         {
            // These will fail for some messages, payload formatter gets bytes only
            string messageText = Encoding.UTF8.GetString(messageBytes);

            try
            {
               messageJson = JObject.Parse(messageText);

               _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} JSON:{messageJson}", context.TerminalId, message.LockToken, JsonConvert.SerializeObject(messageJson, Formatting.Indented));
            }
            catch (JsonReaderException jex)
            {
               _logger.LogInformation(jex, "Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} not valid JSON", context.TerminalId, message.LockToken);
            }
         }
         catch (ArgumentException aex)
         {
            _logger.LogInformation(aex, "Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} message bytes not valid text", context.TerminalId, message.LockToken);
         }


         // This shouldn't fail, but it could for invalid path to blob, timeout retrieving blob, payload formatter syntax error etc.
         IFormatterDownlink payloadFormatter = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatterName);

         // This will fail if payload formatter throws runtime exceptions like null reference, divide by zero, index out of range etc.
         byte[] payloadBytes = payloadFormatter.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);


         // Validate payload before calling Myriota control message send API method
         if (payloadBytes is null)
         {
            _logger.LogWarning("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} payload formatter:{payloadFormatter} Evaluate returned null", context.TerminalId, message.LockToken, payloadFormatterName);

            await context.DeviceClient.RejectAsync(message);

            return;
         }

         if ((payloadBytes.Length < Constants.DownlinkPayloadMinimumLength) || (payloadBytes.Length > Constants.DownlinkPayloadMaximumLength))
         {
            _logger.LogWarning("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadBytes:{payloadBytes} length:{Length} invalid must be {DownlinkPayloadMinimumLength} to {DownlinkPayloadMaximumLength} bytes", context.TerminalId, message.LockToken, Convert.ToHexString(payloadBytes), payloadBytes.Length, Constants.DownlinkPayloadMinimumLength, Constants.DownlinkPayloadMaximumLength);

            await context.DeviceClient.RejectAsync(message);

            return;
         }


         // Finally send Control Message to device using the Myriota API
         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadBytes:{payloadBytes} Length:{Length} sending", context.TerminalId, message.LockToken, BitConverter.ToString(payloadBytes), payloadBytes.Length);

         string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);

         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);

         await context.DeviceClient.CompleteAsync(message);
      }
      catch (Exception ex)
      {
         _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} MessageHandler processing failed", context.TerminalId, message.LockToken);

         await context.DeviceClient.RejectAsync(message);
      }
   }
}

The first time I ran the myriotaAzureIoTConnector Azure function in the Core Tools debugging environment there were no errors and the Microsoft.Azure.Devices.Client.DeviceClient connection cache loaded in the background.

Azure IoT Function failing with a SystemArgumentOutOfRangeException

The first time I sent a downlink message the handler failed spectacularly with a SystemArgumentOutOfRangeException

After adding some breakpoints, restarting the application, then single stepping through the code I found that I had accidentally used BitConverter.ToSingle(payloadBytes) instead of BitConverter.ToString(payloadBytes) to get the Hexadecimal representation of the payload bytes.

...
// Finally send Control Message to device using the Myriota API
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadBytes:{payloadBytes} Length:{Length} sending", context.TerminalId, message.LockToken, BitConverter.ToString(payloadBytes), payloadBytes.Length);

string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);
...
Azure IoT Function successfully sending downlink message.

The Encoding.UTF8.GetString and JObject.Parse are processed in a single Try with a specialised catch for when the payload cannot be converted to text. If the payload cannot be converted to JSON only the payloadBytes parameter of payload formatter is populated.

Myriota Connector – Azure IoT Hub Downlink final refactoring

I often print code and review it away from a computer. I can’t be distracted by “tinkering” with the code and I find that drawing on it helps me visualise what is going on. The payload formatters are retrieved from Azure Storage blob which have a default retry policy, the Azure IoT Hub DeviceClient methods have a default retry policy, the Myriota Cloud API SendMessage has retries (Implemented with Polly) and if the CS-Script compilation fails there is nothing that can be done so the code could be simplified.

The Azure IoT Hub downlink message handler was a partial class and part of implementation of the IDeviceConnectionCache which was a hangover from one of the initial versions.

internal partial class DeviceConnectionCache : IDeviceConnectionCache
{
   public async Task AzureIoTHubMessageHandler(Message message, object userContext)
   {
      Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

      _logger.LogInformation("Downlink- IoT Hub TerminalId:{termimalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

I replaced the IDeviceConnectionCache interface with IIoTHubDownlink which was declare in a new file in the interfaces folder.

namespace devMobile.IoT.MyriotaAzureIoTConnector.Connector
{
   public interface IIoTHubDownlink
   {
      public Task AzureIoTHubMessageHandler(Message message, object userContext);
   }
}

Then had to inject all the required dependencies which had been implemented in one of the other partial class files.

internal class IoTHubDownlink : IIoTHubDownlink
{
   private readonly ILogger<IoTHubDownlink> _logger;
   private readonly IPayloadFormatterCache _payloadFormatterCache;
   private readonly IMyriotaModuleAPI _myriotaModuleAPI;

   public IoTHubDownlink(ILogger<IoTHubDownlink> logger, IPayloadFormatterCache payloadFormatterCache, IMyriotaModuleAPI myriotaModuleAPI)
   {
      _logger = logger;
      _payloadFormatterCache = payloadFormatterCache;
      _myriotaModuleAPI = myriotaModuleAPI;
   }
...
}

The implementation had been extracted to a separate class so it had to be constructed by the Dependency Injection plumbing.

...
services.AddSingleton<IPayloadFormatterCache, PayloadFormatterCache>();
services.AddSingleton<IIoTHubDownlink, IoTHubDownlink>();
services.AddSingleton<IIoTCentralDownlink, IoTCentralDownlink>();
services.AddOptions<Models.MyriotaSettings>().Configure<IConfiguration>((settings, configuration) =>
{
    configuration.GetSection("Myriota").Bind(settings);
 });
 services.AddSingleton<IMyriotaModuleAPI, MyriotaModuleAPI>();
...

The lifetime of the Microsoft.Azure.Devices.Client.Message was being managed manually which seemed a bit odd.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{termimalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   // Use default formatter and replace with message specific formatter if configured.
   string payloadFormatter;
   if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out payloadFormatter) || string.IsNullOrEmpty(payloadFormatter))
   {
      payloadFormatter = context.PayloadFormatterDownlink;
   }

   _logger.LogInformation("Downlink- IoT Hub TerminalID:{termimalId} LockToken:{LockToken} Payload formatter:{payloadFormatter} ", context.TerminalId, message.LockToken, payloadFormatter);

   try
   {
   ...
   }
   finally
   {
      // Mop up the non managed resources of message
      message.Dispose();
   }
}

I replaced this with a with a “using” which “automagically” manages the lifetime of any non-managed resources. I also added string Locktoken variable for DeviceClient.RejectAsync and DeviceClient.CompletedAsync so that the “using” could be inside the try/catch (there is scope to reduce the amount of code in the “using”)

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{TermimalId} LockToken:{lockToken}", context.TerminalId, message.LockToken);

   // broken out so using for message only has to be inside try
   string lockToken = message.LockToken; 

   try
   {
      using (message)
      {
...
      }
      catch (Exception ex)
      {
         _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{lockToken} MessageHandler processing failed", context.TerminalId, lockToken);

         await context.DeviceClient.RejectAsync(lockToken);
      }
   }
}

The handling of the Encoding.UTF8.GetString and JObject.Parse payload was broken. If the Encoding.UTF8.GetString threw an exception there was no point in calling the JObject.Parse

// If this fails payload broken
byte[] messageBytes = message.GetBytes();

// This will fail for some messages, payload formatter gets bytes only
string messageText = string.Empty;
try
{
   messageText = Encoding.UTF8.GetString(messageBytes);
}
catch (ArgumentException aex)
{
   _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} messageBytes:{2} not valid Text", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
}

// This will fail for some messages, payload formatter gets bytes only
JObject? messageJson = null;
try
{
   messageJson = JObject.Parse(messageText);
}
catch ( JsonReaderException jex)
{
   _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} messageText:{2} not valid json", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
}

The Encoding.UTF8.GetString and JObject.Parse are now processed in a single Try with a specialised catch handling.

// These will fail for some messages, then payload formatter gets bytes only
string messageText = string.Empty;
JObject? messageJson = null;
try
{
   messageText = Encoding.UTF8.GetString(messageBytes);

   messageJson = JObject.Parse(messageText);
 }
catch (ArgumentException aex)
{
   _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{lockToken} messageBytes:{messageBytes} not valid text exception:{Message}", context.TerminalId, lockToken, BitConverter.ToString(messageBytes), aex.Message);
}
catch (JsonReaderException jex)
{
   _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{lockToken} messageText:{messageText} not valid json exception:{Message}", context.TerminalId, lockToken, messageText, jex.Message);
}

// This shouldn't fail, but it could for lots of different reasons, invalid path to blob, syntax error, interface broken etc.
IFormatterDownlink payloadFormatter = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatterName);

This refactored code now looks an awful lot like the “sunny days” code checked in on the 3rd of November.

Myriota Connector – UplinkMessageProcessor Queue Output Binding

The myriota Azure IoT Hub Cloud Identity Translation Gateway uplink message handler Azure Storage Queue Trigger Function wasn’t processing “transient” vs. “permanent” failures well. Sometimes a “permanent” failure message would be retried multiple times by the function runtime before getting moved to the poison queue.

After some experimentation using an Azure Storage Queue Function Output binding to move messages to the poison queue looked like a reasonable approach. (Though, returning null to indicate the message should be removed from the queue was not obvious from the documentation)

[Function("UplinkMessageProcessor")]
[QueueOutput(queueName: "uplink-poison", Connection = "UplinkQueueStorage")]
public async Task<Models.UplinkPayloadQueueDto> UplinkMessageProcessor([QueueTrigger(queueName: "uplink", Connection = "UplinkQueueStorage")] Models.UplinkPayloadQueueDto payload, CancellationToken cancellationToken)
{
...
   // Process each packet in the payload. Myriota docs say only one packet per payload but just incase...
   foreach (Models.QueuePacket packet in payload.Data.Packets)
   {
      // Lookup the device client in the cache or create a new one
      Models.DeviceConnectionContext context;

      try
      {
         context = await _deviceConnectionCache.GetOrAddAsync(packet.TerminalId, cancellationToken);
      }
      catch (DeviceNotFoundException dnfex)
      {
         _logger.LogError(dnfex, "Uplink- PayloadId:{0} TerminalId:{1} terminal not found", payload.Id, packet.TerminalId);

         return payload;
      }
      catch (Exception ex) // Maybe just send to poison queue or figure if transient error?
      {
         _logger.LogError(ex, "Uplink- PayloadId:{0} TerminalId:{1} ", payload.Id, packet.TerminalId);

         throw;
      }
...
         // Proccessing successful, message can be deleted by QueueTrigger plumbing
         return null;
      }

After building and testing an Azure Storage Queue Function Output binding implementation I’m not certain that it is a good approach. The code is a bit “chunky” and I have had to implement more of the retry process logic.

Myriota Connector – Azure IoT Hub Downlink refactoring

The myriota Azure IoT Hub Cloud Identity Translation Gateway downlink message handler was getting a bit “chunky”. So, I started by stripping the code back to the absolute bare minimum that would “work”.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{termimalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   try
   {

      await context.DeviceClient.CompleteAsync(message);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken );
   }
}

Then the code was then extended so it worked for “sunny day” scenarios. The payload formatter was successfully retrieved from the configured Azure Storage Blob, CS-Script successfully compiled the payload formatter, the message payload was valid text, the message text was valid Javascript Object Notation(JSON), the JSON was successfully processed by the compiled payload formatter, and finally the payload was accepted by the Myriota Cloud API.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{termimalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   string payloadFormatter;

   // Use default formatter and replace with message specific formatter if configured.
   if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out payloadFormatter) || string.IsNullOrEmpty(payloadFormatter))
   {
      payloadFormatter = context.PayloadFormatterDownlink;
   }

   _logger.LogInformation("Downlink- IoT Hub TerminalID:{termimalId} LockToken:{LockToken} Payload formatter:{payloadFormatter} ", context.TerminalId, message.LockToken, payloadFormatter);

   try
   {
      IFormatterDownlink payloadFormatterDownlink = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatter);

      byte[] messageBytes = message.GetBytes();

      string messageText = Encoding.UTF8.GetString(messageBytes);

      JObject messageJson = JObject.Parse(messageText);

      byte[] payloadBytes = payloadFormatterDownlink.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);

      string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);
      
      await context.DeviceClient.CompleteAsync(message);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken);
   }
}

Then code was then extended to handle message payloads which were problematic but not “failures”

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{termimalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   string payloadFormatter;

   // Use default formatter and replace with message specific formatter if configured.
   if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out payloadFormatter) || string.IsNullOrEmpty(payloadFormatter))
   {
      payloadFormatter = context.PayloadFormatterDownlink;
   }

   _logger.LogInformation("Downlink- IoT Hub TerminalID:{termimalId} LockToken:{LockToken} Payload formatter:{payloadFormatter} ", context.TerminalId, message.LockToken, payloadFormatter);

   try
   {
      // If this fails payload broken
      byte[] messageBytes = message.GetBytes();

      string messageText = string.Empty;
      JObject messageJson = null;

      // These will fail for some messages, gets bytes only
      try
      {
         messageText = Encoding.UTF8.GetString(messageBytes);

         messageJson = JObject.Parse(messageText);
      }
      catch (ArgumentException aex)
      {
         _logger.LogInformation("Downlink-DeviceID:{DeviceId} LockToken:{LockToken} messageBytes:{2} not valid Text", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
      }
      catch( JsonReaderException jex)
      {
         _logger.LogInformation("Downlink-DeviceID:{DeviceId} LockToken:{LockToken} messageText:{2} not valid json", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
      }

      IFormatterDownlink payloadFormatterDownlink = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatter);

      byte[] payloadBytes = payloadFormatterDownlink.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);

      string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);

      await context.DeviceClient.CompleteAsync(message);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken);
   }
   finally
   {
      message.Dispose();
   }
}

Then finally the code was modified to gracefully handle broken payloads returned by the payload formatter evaluation, some comments were added, and the non-managed resources of the DeviceClient.Message disposed.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{termimalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   // Use default formatter and replace with message specific formatter if configured.
   string payloadFormatter;
   if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out payloadFormatter) || string.IsNullOrEmpty(payloadFormatter))
   {
      payloadFormatter = context.PayloadFormatterDownlink;
   }

   _logger.LogInformation("Downlink- IoT Hub TerminalID:{termimalId} LockToken:{LockToken} Payload formatter:{payloadFormatter} ", context.TerminalId, message.LockToken, payloadFormatter);

   try
   {
      // If this fails payload broken
      byte[] messageBytes = message.GetBytes();

      // This will fail for some messages, payload formatter gets bytes only
      string messageText = string.Empty;
      try
      {
         messageText = Encoding.UTF8.GetString(messageBytes);
      }
      catch (ArgumentException aex)
      {
         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} messageBytes:{2} not valid Text", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
      }

      // This will fail for some messages, payload formatter gets bytes only
      JObject? messageJson = null;
      try
      {
         messageJson = JObject.Parse(messageText);
      }
      catch ( JsonReaderException jex)
      {
         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} messageText:{2} not valid json", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
      }

      // This shouldn't fail, but it could for lots of diffent reasons, invalid path to blob, syntax error, interface broken etc.
      IFormatterDownlink payloadFormatterDownlink = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatter);

      // This shouldn't fail, but it could for lots of different reasons, null references, divide by zero, out of range etc.
      byte[] payloadBytes = payloadFormatterDownlink.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);

      // Validate payload before calling Myriota control message send API method
      if (payloadBytes is null)
      {
         _logger.LogWarning("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} payload formatter:{payloadFormatter} Evaluate returned null", context.TerminalId, message.LockToken, payloadFormatter);

         await context.DeviceClient.RejectAsync(message);

         return;
      }

      if ((payloadBytes.Length < Constants.DownlinkPayloadMinimumLength) || (payloadBytes.Length > Constants.DownlinkPayloadMaximumLength))
      {
         _logger.LogWarning("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} payloadData length:{Length} invalid must be {DownlinkPayloadMinimumLength} to {DownlinkPayloadMaximumLength} bytes", context.TerminalId, message.LockToken, payloadBytes.Length, Constants.DownlinkPayloadMinimumLength, Constants.DownlinkPayloadMaximumLength);

         await context.DeviceClient.RejectAsync(message);

         return;
      }

      // This shouldn't fail, but it could few reasons mainly connectivity & message queuing etc.
      _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadData:{payloadData} Length:{Length} sending", context.TerminalId, message.LockToken, Convert.ToHexString(payloadBytes), payloadBytes.Length);

      // Finally send the message using Myriota API
      string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);

      await context.DeviceClient.CompleteAsync(message);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken);
   }
   finally
   {
      // Mop up the non managed resources of message
      message.Dispose();
   }
}

As the code was being extended, I tested different failures to make sure the Application Insights logging messages were useful. The first failure mode tested was the Azure Storage Blob, path was broken or the blob was missing.

Visual Studio 2022 Debugger blob not found exception message
Application Insights blob not found exception logging

Then a series of “broken” payload formatters were created to test CS-Script compile time failures.

// Broken interface implementation
using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
   public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, byte[] payloadBytes)
   {
      return payloadBytes;
   }
}
Visual Studio 2022 Debugger interface implementation broken exception message
Application Insights interface implementation broken exception logging
// Broken syntax
using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
   public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject payloadJson, byte[] payloadBytes)
   {
      return payloadBytes
   }
}
Visual Studio 2022 Debugger syntax error exception message
Application Insights syntax error exception logging
// Runtime error
using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
   public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject payloadJson, byte[] payloadBytes)
   {
      payloadBytes[20] = 0;

      return payloadBytes;
   }
}
Visual Studio 2022 Debugger runtime error exception message
Application Insights syntax error exception logging

Invalid Myriota Cloud API send control message payload

Visual Studio 2022 Debugger Myriota send failure exception message
Application Insights Myriota send failure exception logging

The final test was sending a downlink message which was valid JSON, contained the correct information for the specified payload formatter and was successfully processed by the Myriota Cloud API.

Azure IoT Explorer with valid JSON payload and payload formatter name
Azure function output of successful downlink message
Application Insights successful downlink message logging

After a couple of hours the I think the downlink messageHandler implementation was significantly improved.

Swarm Space – Asset Tracker Payload Formatter

After writing Swarm Space – Payload Formatter Debugging I then tested it creating a new payload formatter for my new Swarm Asset Tracker.

Swarm Asset Tracker device

The Swarm Asset Tracker has a slightly different payload to the Swarm Eval Kit which is detailed in the product manual.

Swarm Asset Tracker JSON payload

The first message sent shortly after I powered up the device had the latitude and longitude of Null Island

The Asset Tracker UserApplicationId is 65002 and the payload is similar to the Swarm Eval Kit. I created some message payloads (location of Christchurch Cathedral) for testing.

The JSON payload sent by my Swarm Asset Tracker

{
  "dt": 1677396395,
  "lt": -43.5333,
  "ln": 172.6333,
  "al": 25,
  "sp": 0,
  "hd": 126,
  "gj": 92,
  "gs": 1,
  "bv": 4103,
  "tp": 20,
  "rs": -110,
  "tr": -107,
  "ts": 3,
  "td": 1677396357,
  "hp": 166,
  "vp": 187,
  "tf": 36526
}

The Base64 representation of the payload sent by my Swarm Asset Tracker

ew0KICAiZHQiOiAxNjc3Mzk2Mzk1LA0KICAibHQiOiAtNDMuNTMzMywNCiAgImxuIjogMTcyLjYzMzMsDQogICJhbCI6IDI1LA0KICAic3AiOiAwLA0KICAiaGQiOiAxMjYsDQogICJnaiI6IDkyLA0KICAiZ3MiOiAxLA0KICAiYnYiOiA0MTAzLA0KICAidHAiOiAyMCwNCiAgInJzIjogLTExMCwNCiAgInRyIjogLTEwNywNCiAgInRzIjogMywNCiAgInRkIjogMTY3NzM5NjM1NywNCiAgImhwIjogMTY2LA0KICAidnAiOiAxODcsDQogICJ0ZiI6IDM2NTI2DQp9

The initial version of my payload formatter

using System;
using System.Collections.Generic;
using System.Globalization;
using Newtonsoft.Json.Linq;

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        if ((payloadText != "") && (payloadJson != null))
        {
            JObject location = new JObject();

            location.Add("lat", payloadJson.GetValue("lt"));
            location.Add("lon", payloadJson.GetValue("ln"));
            location.Add("alt", payloadJson.GetValue("al"));

            telemetryEvent.Add("DeviceLocation", location);
        }

        // Course & speed
        telemetryEvent.Add("Course", payloadJson.GetValue("hd"));
        telemetryEvent.Add("Speed", payloadJson.GetValue("sp"));

        // Battery voltage
        telemetryEvent.Add("BatteryVoltage", payloadJson.GetValue("bv"));

        // RSSI
        telemetryEvent.Add("RSSI", payloadJson.GetValue("rs"));

        properties.Add("iothub-creation-time-utc", DateTimeOffset.FromUnixTimeSeconds((long)payloadJson.GetValue("dt")).ToString("s", CultureInfo.InvariantCulture));

        return telemetryEvent;
    }
}

The PayloadFormatterMaintenanceApplication command line I used for testing my Swarm Asset Tracker payload formatter

The console output of my Swarm Asset Tracker payload formatter

The PayloadFormatterMaintenanceApplication is better than trying to debug a payload formatter in a staging/production environment.

Currently the payload formatters still have to be manually uploaded to the application’s Azure Blob Storage for final testing.

Swarm Space – Payload Formatter Debugging

After Swarm Space – Uplink Payload Formatters revisited I wrote a couple of payload formatters and they were easy to get wrong and the Azure Application Insights error messages were unhelpful.

namespace PayloadFormatter // Additional namespace for shortening interface when usage in formatter code
{
    using System.Collections.Generic;

    using Newtonsoft.Json.Linq;

    public interface IFormatterUplink
    {
        public JObject Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes);
    }

    public interface IFormatterDownlink
    {
        public byte[] Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes);
    }
}

The definitions of the uplink & downlink payload formatter evaluator interfaces have been updated and shifted to a new project.

Visual Studio 2022 Solution with payloadformatter maintenance application

I built a console application to help with developing and debugging uplink or downlink formatters. The application has a number of command line parameters which specify the formatter to be used, UserApplicationId, OrganizationId, DeviceType etc.

public class CommandLineOptions
{
    [Option('d', "Direction", Required = true, HelpText = "Test Uplink or DownLink formatter")]
	public string Direction { get; set; }

    [Option('p', "filename", HelpText = "Uplink or Downlink Payload file name")]
    public string PayloadFilename { get; set; } = string.Empty;

    [Option('o', "OrganisationId", Required = true, HelpText = "Organisation unique identifier")]
    public uint OrganizationId { get; set; }

    [Option('i', "DeviceId", Required = true, HelpText = "Device unique identitifer")]
    public uint DeviceId { get; set; }

    [Option('t', "DeviceType", Required = true, HelpText = "Device type number")]
    public byte DeviceType { get; set; }

    [Option('u', "UserApplicationId", Required = true, HelpText = "User Application Id")]
    public ushort UserApplicationId { get; set; }

    [Option('h', "SwarmHiveReceivedAtUtc", HelpText = "Swarm Hive received at time UTC")]
    public DateTime? SwarmHiveReceivedAtUtc { get; set; }

    [Option('w', "UplinkWebHookReceivedAtUtc", HelpText = "Webhook received at time UTC")]
    public DateTime? UplinkWebHookReceivedAtUtc { get; set; }

    [Option('s', "Status", HelpText = "Uplink local file system file name")]
    public byte? Status { get; set; }

    [Option('c', "Client", HelpText = "Uplink local file system file name")]
    public string Client { get; set; } 
 }

The downlink formatter (similar approach for uplink) loads the sample file as an array of bytes, then tries to convert it to text, and finally to JSON. Then the formatter code is “compiled” and the executed with the file payload and command line parameters.

private static async Task DownlinkFormatterCore(CommandLineOptions options)
{
    Dictionary<string, string> properties = new Dictionary<string, string>();

    string formatterFolder = Path.Combine(Environment.CurrentDirectory, "downlink");
    Console.WriteLine($"Downlink- uplinkFormatterFolder: {formatterFolder}");

    string formatterFile = Path.Combine(formatterFolder, $"{options.UserApplicationId}.cs");
    Console.WriteLine($"Downlink- UserApplicationId: {options.UserApplicationId}");
    Console.WriteLine($"Downlink- Payload formatter file: {formatterFile}");

    PayloadFormatter.IFormatterDownlink evalulator;
    try
    {
        evalulator = CSScript.Evaluator.LoadFile<PayloadFormatter.IFormatterDownlink>(formatterFile);
     }
    catch (CSScriptLib.CompilerException cex)
    {
        Console.Write($"Loading or compiling file:{formatterFile} failed Exception:{cex}");
        return;
    }

    string payloadFilename = Path.Combine(formatterFolder, options.PayloadFilename);
    Console.WriteLine($"Downlink- payloadFilename:{payloadFilename}");
    byte[] uplinkBytes;

    try
    {
        uplinkBytes = File.ReadAllBytes(payloadFilename);
    }
    catch (DirectoryNotFoundException dex)
    {
        Console.WriteLine($"Uplink payload filename directory {formatterFolder} not found:{dex}");
        return;
    }
    catch (FileNotFoundException fnfex)
    {
        Console.WriteLine($"Uplink payload filename {payloadFilename} not found:{fnfex}");
        return;
    }
    catch (FormatException fex)
    {
        Console.WriteLine($"Uplink payload file invalid format {payloadFilename} not found:{fex}");
        return;
    }

    // See if payload can be converted to a string
    string uplinkText = string.Empty;
    try
    {
        uplinkText = Encoding.UTF8.GetString(uplinkBytes);
    }
    catch (FormatException fex)
    {
        Console.WriteLine("Encoding.UTF8.GetString failed:{0}", fex.Message);
    }

    // See if payload can be converted to JSON
    JObject uplinkJson;
    try
    {
        uplinkJson = JObject.Parse(uplinkText);
    }
    catch (JsonReaderException jrex)
    {
        Console.WriteLine("JObject.Parse failed Exception:{1}", jrex);

        uplinkJson = new JObject();
    }

    Console.WriteLine("Properties");
    foreach (var property in properties)
    {
        Console.WriteLine($"{property.Key}:{property.Value}");
    }

    // Transform the byte and optional text and JSON payload
    Byte[] payload;
    try
    {
        payload = evalulator.Evaluate(properties, options.OrganizationId, options.DeviceId, options.DeviceType, options.UserApplicationId, uplinkJson, uplinkText, uplinkBytes);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"evalulatorUplink.Evaluate failed Exception:{ex}");
        return;
    }

    Console.WriteLine("Payload");
    Console.WriteLine(Convert.ToBase64String(payload));
}

The sample JSON payload is what would be sent by Azure IoT Central to a device to configure the fan speed

Azure IoT Central M138 Breakout device template with the Fan Status command selected
{
  "FanStatus": 2
}

If the downlink payload formatter is compiled and executes successfully the Base64 representation output is displayed

using System;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
    public byte[] Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        byte? status = payloadJson.Value<byte?>("FanStatus");

        if ( status.HasValue ) 
        { 
            return new byte[] { status.Value };
        }

        return new byte[]{};
    }
}

If the downlink payload formatter syntax is incorrect e.g. { status.Value ; }; an error message with the line and column is displayed.

using System;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
    public byte[] Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        byte? status = payloadJson.Value<byte?>("FanStatus");

        if ( status.HasValue ) 
        {
            return new byte[] { status.Value ; };
        }

        return new byte[]{};
    }
}

If the downlink payload formatter syntax is correct but execution fails (in the example code division by zero) an error message is displayed.

using System;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
    public byte[] Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        byte? status = payloadJson.Value<byte?>("FanStatus");

        if ( status.HasValue ) 
        {
            int divideByZero = 10;

            divideByZero = divideByZero / 0;

            return new byte[] { status.Value };
        }

        return new byte[]{};
    }
}

The PayloadFormatterMaintenanceApplication makes it significantly easier to develop formatters. Currently the payload formatters have to be manually uploaded to the application’s Azure Blob Storage for final testing.