Myriota Connector – Azure IoT Hub Downlink logging refactor

After several refactorings the code stabilised and the Azure IoT Hub downlink message handler (configured with SetReceiveMessageHandlerAsync) was ready for testing. I used Azure IoT Explorer to send some “hand-crafted” JavaScript Object Notation (JSON) Cloud to Device (C2D) messages.

Each logging message starts with the TerminalID (to simplify searching for all the messages sent to a device) and the message LockToken (to simplify searching for all the “steps” associated with sending a message). The rest of the logging message contains “step” specific diagnostic information.

Successful Azure IoT Explorer C2D JSON Message

If there is no PayloadFormatter attribute the default in the PayloadFormatters section of the function configuration is used.
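The fallback rule can be sketched in isolation. This is a minimal sketch, not the connector's code: the property name "PayloadFormatter" and the default formatter name "PassThrough.cs" are assumptions for illustration (the connector reads the property name from Constants.IoTHubDownlinkPayloadFormatterProperty).

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Assumed property name, for illustration only.
const string FormatterProperty = "PayloadFormatter";

// Returns the formatter named in the message properties, or the configured
// default when the property is missing or empty.
string SelectFormatter(IDictionary<string, string> properties, string defaultFormatter)
{
    if (properties.TryGetValue(FormatterProperty, out string? name) && !string.IsNullOrEmpty(name))
    {
        return name;
    }

    return defaultFormatter;
}

var withProperty = new Dictionary<string, string> { [FormatterProperty] = "FanSpeed.cs" };
var withoutProperty = new Dictionary<string, string>();

Console.WriteLine(SelectFormatter(withProperty, "PassThrough.cs"));    // FanSpeed.cs
Console.WriteLine(SelectFormatter(withoutProperty, "PassThrough.cs")); // PassThrough.cs
```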

using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
   public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject payloadJson, byte[] payloadBytes)
   {
      byte? status = payloadJson.Value<byte?>("FanSpeed");

      if (!status.HasValue)
      {
         return new byte[] { };
      }

      return new byte[] { 1, status.Value };
   }
}

The FanSpeed.cs payload formatter extracts the FanSpeed value from the JSON payload and returns a two-byte array containing the message type and the speed of the fan.
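To make the byte layout concrete, here is a minimal sketch of the same logic without the JSON dependency. EncodeFanSpeed is a hypothetical helper, not part of the connector; it takes the value the formatter would have read from the FanSpeed field.

```csharp
#nullable enable
using System;

// Mirrors the formatter's logic: message type 1 followed by the fan speed,
// or an empty array when no FanSpeed value was supplied.
byte[] EncodeFanSpeed(byte? fanSpeed)
{
    if (!fanSpeed.HasValue)
    {
        return Array.Empty<byte>();
    }

    return new byte[] { 1, fanSpeed.Value };
}

Console.WriteLine(BitConverter.ToString(EncodeFanSpeed((byte)3))); // 01-03
Console.WriteLine(EncodeFanSpeed(null).Length);                    // 0
```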

Azure IoT Function running waiting for a C2D message

After re-reading the SetReceiveMessageHandlerAsync documentation I refactored the code (back to the approach used a couple of branches ago) with the “using” wrapping the try/catch.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{TerminalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   using (message) // https://learn.microsoft.com/en-us/dotnet/api/microsoft.azure.devices.client.deviceclient.setreceivemessagehandlerasync?view=azure-dotnet
   {
      try
      {
         // Replace default formatter with message specific formatter if configured.
         if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out string? payloadFormatterName) || string.IsNullOrEmpty(payloadFormatterName))
         {
            // Assign the default before logging so the log shows the formatter actually used
            payloadFormatterName = context.PayloadFormatterDownlink;

            _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} Context formatter:{payloadFormatterName} ", context.TerminalId, message.LockToken, payloadFormatterName);
         }
         else
         {
            _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} Property formatter:{payloadFormatterName} ", context.TerminalId, message.LockToken, payloadFormatterName);
         }


         // If GetBytes fails payload really badly broken
         byte[] messageBytes = message.GetBytes();

         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} Message bytes:{messageBytes}", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));


         // Try converting the bytes to text then to JSON
         JObject? messageJson = null;
         try
         {
            // These will fail for some messages, payload formatter gets bytes only
            string messageText = Encoding.UTF8.GetString(messageBytes);

            try
            {
               messageJson = JObject.Parse(messageText);

               _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} JSON:{messageJson}", context.TerminalId, message.LockToken, JsonConvert.SerializeObject(messageJson, Formatting.Indented));
            }
            catch (JsonReaderException jex)
            {
               _logger.LogInformation(jex, "Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} not valid JSON", context.TerminalId, message.LockToken);
            }
         }
         catch (ArgumentException aex)
         {
            _logger.LogInformation(aex, "Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} message bytes not valid text", context.TerminalId, message.LockToken);
         }


         // This shouldn't fail, but it could for invalid path to blob, timeout retrieving blob, payload formatter syntax error etc.
         IFormatterDownlink payloadFormatter = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatterName);

         // This will fail if payload formatter throws runtime exceptions like null reference, divide by zero, index out of range etc.
         byte[] payloadBytes = payloadFormatter.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);


         // Validate payload before calling Myriota control message send API method
         if (payloadBytes is null)
         {
            _logger.LogWarning("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} payload formatter:{payloadFormatter} Evaluate returned null", context.TerminalId, message.LockToken, payloadFormatterName);

            await context.DeviceClient.RejectAsync(message);

            return;
         }

         if ((payloadBytes.Length < Constants.DownlinkPayloadMinimumLength) || (payloadBytes.Length > Constants.DownlinkPayloadMaximumLength))
         {
            _logger.LogWarning("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadBytes:{payloadBytes} length:{Length} invalid must be {DownlinkPayloadMinimumLength} to {DownlinkPayloadMaximumLength} bytes", context.TerminalId, message.LockToken, Convert.ToHexString(payloadBytes), payloadBytes.Length, Constants.DownlinkPayloadMinimumLength, Constants.DownlinkPayloadMaximumLength);

            await context.DeviceClient.RejectAsync(message);

            return;
         }


         // Finally send Control Message to device using the Myriota API
         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadBytes:{payloadBytes} Length:{Length} sending", context.TerminalId, message.LockToken, BitConverter.ToString(payloadBytes), payloadBytes.Length);

         string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);

         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);

         await context.DeviceClient.CompleteAsync(message);
      }
      catch (Exception ex)
      {
         _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} MessageHandler processing failed", context.TerminalId, message.LockToken);

         await context.DeviceClient.RejectAsync(message);
      }
   }
}

The first time I ran the myriotaAzureIoTConnector Azure function in the Core Tools debugging environment there were no errors and the Microsoft.Azure.Devices.Client.DeviceClient connection cache loaded in the background.

Azure IoT Function failing with a System.ArgumentOutOfRangeException

The first time I sent a downlink message the handler failed spectacularly with a System.ArgumentOutOfRangeException.

After adding some breakpoints, restarting the application, then single stepping through the code I found that I had accidentally used BitConverter.ToSingle(payloadBytes) instead of BitConverter.ToString(payloadBytes) to get the hexadecimal representation of the payload bytes.

...
// Finally send Control Message to device using the Myriota API
_logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadBytes:{payloadBytes} Length:{Length} sending", context.TerminalId, message.LockToken, BitConverter.ToString(payloadBytes), payloadBytes.Length);

string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);
...
Azure IoT Function successfully sending downlink message.
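The difference between the two BitConverter methods is easy to reproduce outside the function. A minimal sketch, assuming a two-byte payload like the one FanSpeed.cs returns:

```csharp
using System;

byte[] payloadBytes = { 1, 3 };

// ToString formats every byte as hexadecimal, separated by hyphens.
string hyphenated = BitConverter.ToString(payloadBytes);
Console.WriteLine(hyphenated);

// Convert.ToHexString (.NET 5 and later) produces the same digits without separators.
string compact = Convert.ToHexString(payloadBytes);
Console.WriteLine(compact);

// ToSingle reads four bytes as a float, so a two-byte payload is too short.
string outcome;
try
{
    outcome = BitConverter.ToSingle(payloadBytes).ToString();
}
catch (ArgumentException ex)
{
    outcome = ex.GetType().Name;
}
Console.WriteLine(outcome);
```

Both methods accept a byte array, so the compiler cannot flag the typo; it only shows up at runtime when the payload is shorter than four bytes.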

The Encoding.UTF8.GetString and JObject.Parse calls are wrapped in nested try blocks, each with a specialised catch: one for when the payload cannot be converted to text and one for when the text is not valid JSON. If the payload cannot be converted to JSON, only the payloadBytes parameter of the payload formatter is populated.
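One thing worth checking when relying on the ArgumentException catch: as far as I know the shared Encoding.UTF8 instance replaces invalid byte sequences with U+FFFD instead of throwing, so the catch only fires for invalid bytes when a strict encoder is used. The sketch below shows both behaviours; the strictUtf8 instance is an illustration, not something the connector code above creates.

```csharp
using System;
using System.Text;

byte[] invalidUtf8 = { 0xFF, 0xFE, 0x41 };

// The shared Encoding.UTF8 instance substitutes U+FFFD for invalid sequences.
string lenient = Encoding.UTF8.GetString(invalidUtf8);
Console.WriteLine(lenient.Contains('\uFFFD'));

// A UTF8Encoding created with throwOnInvalidBytes: true throws
// DecoderFallbackException, which derives from ArgumentException.
var strictUtf8 = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true);
string thrown = "none";
try
{
    strictUtf8.GetString(invalidUtf8);
}
catch (ArgumentException ex)
{
    thrown = ex.GetType().Name;
}
Console.WriteLine(thrown);
```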

Myriota Connector – Azure IoT Hub Downlink final refactoring

I often print code and review it away from a computer. I can’t be distracted by “tinkering” with the code and I find that drawing on it helps me visualise what is going on. The payload formatters are retrieved from Azure Storage blobs, which have a default retry policy; the Azure IoT Hub DeviceClient methods have a default retry policy; the Myriota Cloud API SendMessage has retries (implemented with Polly); and if the CS-Script compilation fails there is nothing that can be done. So the code could be simplified.

The Azure IoT Hub downlink message handler was in a partial class and part of the implementation of IDeviceConnectionCache, which was a hangover from one of the initial versions.

internal partial class DeviceConnectionCache : IDeviceConnectionCache
{
   public async Task AzureIoTHubMessageHandler(Message message, object userContext)
   {
      Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

      _logger.LogInformation("Downlink- IoT Hub TerminalId:{terminalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

I replaced the IDeviceConnectionCache interface with IIoTHubDownlink, which was declared in a new file in the interfaces folder.

namespace devMobile.IoT.MyriotaAzureIoTConnector.Connector
{
   public interface IIoTHubDownlink
   {
      public Task AzureIoTHubMessageHandler(Message message, object userContext);
   }
}

Then I had to inject all the required dependencies, which had been implemented in one of the other partial class files.

internal class IoTHubDownlink : IIoTHubDownlink
{
   private readonly ILogger<IoTHubDownlink> _logger;
   private readonly IPayloadFormatterCache _payloadFormatterCache;
   private readonly IMyriotaModuleAPI _myriotaModuleAPI;

   public IoTHubDownlink(ILogger<IoTHubDownlink> logger, IPayloadFormatterCache payloadFormatterCache, IMyriotaModuleAPI myriotaModuleAPI)
   {
      _logger = logger;
      _payloadFormatterCache = payloadFormatterCache;
      _myriotaModuleAPI = myriotaModuleAPI;
   }
...
}

The implementation had been extracted to a separate class so it had to be constructed by the Dependency Injection plumbing.

...
services.AddSingleton<IPayloadFormatterCache, PayloadFormatterCache>();
services.AddSingleton<IIoTHubDownlink, IoTHubDownlink>();
services.AddSingleton<IIoTCentralDownlink, IoTCentralDownlink>();
services.AddOptions<Models.MyriotaSettings>().Configure<IConfiguration>((settings, configuration) =>
{
   configuration.GetSection("Myriota").Bind(settings);
});
services.AddSingleton<IMyriotaModuleAPI, MyriotaModuleAPI>();
...

The lifetime of the Microsoft.Azure.Devices.Client.Message was being managed manually which seemed a bit odd.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{terminalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   // Use default formatter and replace with message specific formatter if configured.
   string payloadFormatter;
   if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out payloadFormatter) || string.IsNullOrEmpty(payloadFormatter))
   {
      payloadFormatter = context.PayloadFormatterDownlink;
   }

   _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} Payload formatter:{payloadFormatter} ", context.TerminalId, message.LockToken, payloadFormatter);

   try
   {
   ...
   }
   finally
   {
      // Mop up the non managed resources of message
      message.Dispose();
   }
}

I replaced this with a “using” which “automagically” manages the lifetime of any non-managed resources. I also added a string lockToken variable for DeviceClient.RejectAsync and DeviceClient.CompleteAsync so that the “using” could be inside the try/catch (there is scope to reduce the amount of code in the “using”).

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{TerminalId} LockToken:{lockToken}", context.TerminalId, message.LockToken);

   // broken out so using for message only has to be inside try
   string lockToken = message.LockToken; 

   try
   {
      using (message)
      {
...
      }
   }
   catch (Exception ex)
   {
      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{lockToken} MessageHandler processing failed", context.TerminalId, lockToken);

      await context.DeviceClient.RejectAsync(lockToken);
   }
}
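The reason for capturing the lock token first is the order of events: the “using” disposes its object when control leaves the block, which happens before an enclosing catch runs. A minimal sketch of that ordering, with a MemoryStream standing in for the IoT Hub Message (an analogy only, I have not checked which Message members throw after disposal):

```csharp
using System;
using System.IO;

// MemoryStream stands in for the Message; both are IDisposable.
var stream = new MemoryStream(new byte[] { 1, 2, 3 });

// Capture what the catch block needs before the using block disposes the object.
long capturedLength = stream.Length;
string report = "";

try
{
    using (stream)
    {
        throw new InvalidOperationException("simulated processing failure");
    }
}
catch (InvalidOperationException)
{
    // The stream was disposed when control left the using block, before this handler ran.
    report = $"CanRead:{stream.CanRead} Length:{capturedLength}";
}

Console.WriteLine(report); // CanRead:False Length:3
```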

The handling of Encoding.UTF8.GetString and JObject.Parse failures was broken. If Encoding.UTF8.GetString threw an exception there was no point in calling JObject.Parse.

// If this fails payload broken
byte[] messageBytes = message.GetBytes();

// This will fail for some messages, payload formatter gets bytes only
string messageText = string.Empty;
try
{
   messageText = Encoding.UTF8.GetString(messageBytes);
}
catch (ArgumentException aex)
{
   _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} messageBytes:{2} not valid Text", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
}

// This will fail for some messages, payload formatter gets bytes only
JObject? messageJson = null;
try
{
   messageJson = JObject.Parse(messageText);
}
catch ( JsonReaderException jex)
{
   _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} messageText:{2} not valid json", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
}

The Encoding.UTF8.GetString and JObject.Parse calls are now processed in a single try with specialised catch blocks.

// These will fail for some messages, then payload formatter gets bytes only
string messageText = string.Empty;
JObject? messageJson = null;
try
{
   messageText = Encoding.UTF8.GetString(messageBytes);

   messageJson = JObject.Parse(messageText);
}
catch (ArgumentException aex)
{
   _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{lockToken} messageBytes:{messageBytes} not valid text exception:{Message}", context.TerminalId, lockToken, BitConverter.ToString(messageBytes), aex.Message);
}
catch (JsonReaderException jex)
{
   _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{lockToken} messageText:{messageText} not valid json exception:{Message}", context.TerminalId, lockToken, messageText, jex.Message);
}

// This shouldn't fail, but it could for lots of different reasons, invalid path to blob, syntax error, interface broken etc.
IFormatterDownlink payloadFormatter = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatterName);

This refactored code now looks an awful lot like the “sunny days” code checked in on the 3rd of November.

Myriota Connector – UplinkMessageProcessor Queue Output Binding

The Myriota Azure IoT Hub Cloud Identity Translation Gateway uplink message handler Azure Storage Queue Trigger Function wasn’t handling “transient” vs. “permanent” failures well. Sometimes a message with a “permanent” failure would be retried multiple times by the function runtime before being moved to the poison queue.

After some experimentation, using an Azure Storage Queue Function Output binding to move messages to the poison queue looked like a reasonable approach (though returning null to indicate the message should be removed from the queue was not obvious from the documentation).

[Function("UplinkMessageProcessor")]
[QueueOutput(queueName: "uplink-poison", Connection = "UplinkQueueStorage")]
public async Task<Models.UplinkPayloadQueueDto> UplinkMessageProcessor([QueueTrigger(queueName: "uplink", Connection = "UplinkQueueStorage")] Models.UplinkPayloadQueueDto payload, CancellationToken cancellationToken)
{
...
   // Process each packet in the payload. Myriota docs say only one packet per payload but just in case...
   foreach (Models.QueuePacket packet in payload.Data.Packets)
   {
      // Lookup the device client in the cache or create a new one
      Models.DeviceConnectionContext context;

      try
      {
         context = await _deviceConnectionCache.GetOrAddAsync(packet.TerminalId, cancellationToken);
      }
      catch (DeviceNotFoundException dnfex)
      {
         _logger.LogError(dnfex, "Uplink- PayloadId:{0} TerminalId:{1} terminal not found", payload.Id, packet.TerminalId);

         return payload;
      }
      catch (Exception ex) // Maybe just send to poison queue or figure if transient error?
      {
         _logger.LogError(ex, "Uplink- PayloadId:{0} TerminalId:{1} ", payload.Id, packet.TerminalId);

         throw;
      }
...
         // Processing successful, message can be deleted by QueueTrigger plumbing
         return null;
      }

After building and testing an Azure Storage Queue Function Output binding implementation I’m not certain that it is a good approach. The code is a bit “chunky” and I have had to implement more of the retry process logic.
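The three outcomes the function has to distinguish can be sketched without the Functions runtime. This is a minimal sketch with hypothetical names (Process, knownTerminals, the terminal IDs): returning the payload stands for “write it to the poison queue”, returning null stands for “success, nothing to write”, and an exception is left to propagate so the runtime retries.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical set of provisioned terminals.
var knownTerminals = new HashSet<string> { "0001020304" };

// Returns the payload when it should go to the poison queue, null when
// processing succeeded, and throws for failures that are worth retrying.
string? Process(string terminalId, string payload, bool simulateTransientFailure = false)
{
    if (!knownTerminals.Contains(terminalId))
    {
        return payload; // permanent failure: retrying will not help
    }

    if (simulateTransientFailure)
    {
        throw new TimeoutException("simulated transient failure");
    }

    return null; // success
}

Console.WriteLine(Process("0001020304", "payload-1") is null);
Console.WriteLine(Process("ffffffffff", "payload-2"));
```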

Myriota Connector – Azure IoT Hub Downlink refactoring

The Myriota Azure IoT Hub Cloud Identity Translation Gateway downlink message handler was getting a bit “chunky”, so I started by stripping the code back to the absolute bare minimum that would “work”.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{terminalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   try
   {

      await context.DeviceClient.CompleteAsync(message);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken );
   }
}

The code was then extended so it worked for “sunny day” scenarios: the payload formatter was successfully retrieved from the configured Azure Storage Blob, CS-Script successfully compiled the payload formatter, the message payload was valid text, the message text was valid JavaScript Object Notation (JSON), the JSON was successfully processed by the compiled payload formatter, and finally the payload was accepted by the Myriota Cloud API.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{terminalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   string payloadFormatter;

   // Use default formatter and replace with message specific formatter if configured.
   if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out payloadFormatter) || string.IsNullOrEmpty(payloadFormatter))
   {
      payloadFormatter = context.PayloadFormatterDownlink;
   }

   _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} Payload formatter:{payloadFormatter} ", context.TerminalId, message.LockToken, payloadFormatter);

   try
   {
      IFormatterDownlink payloadFormatterDownlink = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatter);

      byte[] messageBytes = message.GetBytes();

      string messageText = Encoding.UTF8.GetString(messageBytes);

      JObject messageJson = JObject.Parse(messageText);

      byte[] payloadBytes = payloadFormatterDownlink.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);

      string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);
      
      await context.DeviceClient.CompleteAsync(message);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken);
   }
}

The code was then extended to handle message payloads which were problematic but not “failures”.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{terminalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   string payloadFormatter;

   // Use default formatter and replace with message specific formatter if configured.
   if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out payloadFormatter) || string.IsNullOrEmpty(payloadFormatter))
   {
      payloadFormatter = context.PayloadFormatterDownlink;
   }

   _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} Payload formatter:{payloadFormatter} ", context.TerminalId, message.LockToken, payloadFormatter);

   try
   {
      // If this fails payload broken
      byte[] messageBytes = message.GetBytes();

      string messageText = string.Empty;
      JObject messageJson = null;

      // These will fail for some messages, gets bytes only
      try
      {
         messageText = Encoding.UTF8.GetString(messageBytes);

         messageJson = JObject.Parse(messageText);
      }
      catch (ArgumentException aex)
      {
         _logger.LogInformation("Downlink-DeviceID:{DeviceId} LockToken:{LockToken} messageBytes:{2} not valid Text", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
      }
      catch( JsonReaderException jex)
      {
         _logger.LogInformation("Downlink-DeviceID:{DeviceId} LockToken:{LockToken} messageText:{2} not valid json", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
      }

      IFormatterDownlink payloadFormatterDownlink = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatter);

      byte[] payloadBytes = payloadFormatterDownlink.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);

      string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);

      await context.DeviceClient.CompleteAsync(message);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken);
   }
   finally
   {
      message.Dispose();
   }
}

Then finally the code was modified to gracefully handle broken payloads returned by the payload formatter evaluation, some comments were added, and the non-managed resources of the DeviceClient.Message disposed.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{terminalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   // Use default formatter and replace with message specific formatter if configured.
   string payloadFormatter;
   if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out payloadFormatter) || string.IsNullOrEmpty(payloadFormatter))
   {
      payloadFormatter = context.PayloadFormatterDownlink;
   }

   _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} Payload formatter:{payloadFormatter} ", context.TerminalId, message.LockToken, payloadFormatter);

   try
   {
      // If this fails payload broken
      byte[] messageBytes = message.GetBytes();

      // This will fail for some messages, payload formatter gets bytes only
      string messageText = string.Empty;
      try
      {
         messageText = Encoding.UTF8.GetString(messageBytes);
      }
      catch (ArgumentException aex)
      {
         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} messageBytes:{2} not valid Text", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
      }

      // This will fail for some messages, payload formatter gets bytes only
      JObject? messageJson = null;
      try
      {
         messageJson = JObject.Parse(messageText);
      }
      catch ( JsonReaderException jex)
      {
         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} messageText:{2} not valid json", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
      }

      // This shouldn't fail, but it could for lots of different reasons, invalid path to blob, syntax error, interface broken etc.
      IFormatterDownlink payloadFormatterDownlink = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatter);

      // This shouldn't fail, but it could for lots of different reasons, null references, divide by zero, out of range etc.
      byte[] payloadBytes = payloadFormatterDownlink.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);

      // Validate payload before calling Myriota control message send API method
      if (payloadBytes is null)
      {
         _logger.LogWarning("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} payload formatter:{payloadFormatter} Evaluate returned null", context.TerminalId, message.LockToken, payloadFormatter);

         await context.DeviceClient.RejectAsync(message);

         return;
      }

      if ((payloadBytes.Length < Constants.DownlinkPayloadMinimumLength) || (payloadBytes.Length > Constants.DownlinkPayloadMaximumLength))
      {
         _logger.LogWarning("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} payloadData length:{Length} invalid must be {DownlinkPayloadMinimumLength} to {DownlinkPayloadMaximumLength} bytes", context.TerminalId, message.LockToken, payloadBytes.Length, Constants.DownlinkPayloadMinimumLength, Constants.DownlinkPayloadMaximumLength);

         await context.DeviceClient.RejectAsync(message);

         return;
      }

      // This shouldn't fail, but it could for a few reasons, mainly connectivity & message queuing etc.
      _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadData:{payloadData} Length:{Length} sending", context.TerminalId, message.LockToken, Convert.ToHexString(payloadBytes), payloadBytes.Length);

      // Finally send the message using Myriota API
      string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);

      await context.DeviceClient.CompleteAsync(message);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken);
   }
   finally
   {
      // Mop up the non managed resources of message
      message.Dispose();
   }
}

As the code was being extended, I tested different failures to make sure the Application Insights logging messages were useful. The first failure mode tested was the Azure Storage Blob path being broken or the blob missing.

Visual Studio 2022 Debugger blob not found exception message
Application Insights blob not found exception logging

Then a series of “broken” payload formatters were created to test CS-Script compile time failures.

// Broken interface implementation
using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
   public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, byte[] payloadBytes)
   {
      return payloadBytes;
   }
}
Visual Studio 2022 Debugger interface implementation broken exception message
Application Insights interface implementation broken exception logging
// Broken syntax
using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
   public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject payloadJson, byte[] payloadBytes)
   {
      return payloadBytes
   }
}
Visual Studio 2022 Debugger syntax error exception message
Application Insights syntax error exception logging
// Runtime error
using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
   public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject payloadJson, byte[] payloadBytes)
   {
      payloadBytes[20] = 0;

      return payloadBytes;
   }
}
Visual Studio 2022 Debugger runtime error exception message
Application Insights runtime error exception logging

Invalid Myriota Cloud API send control message payload

Visual Studio 2022 Debugger Myriota send failure exception message
Application Insights Myriota send failure exception logging

The final test was sending a downlink message which was valid JSON, contained the correct information for the specified payload formatter and was successfully processed by the Myriota Cloud API.

Azure IoT Explorer with valid JSON payload and payload formatter name
Azure function output of successful downlink message
Application Insights successful downlink message logging

After a couple of hours I think the downlink message handler implementation was significantly improved.

Myriota Connector – Payload formatters revisited again

The Myriota Azure IoT Hub Cloud Identity Translation Gateway payload formatters use compiled C# code to convert uplink/downlink packet payloads to JSON/byte array. While trying out different formatters I had “compile” and “evaluation” errors which would have been a lot easier to debug if there was more diagnostic information in the Azure Application Insights logging.

namespace PayloadFormatter // Additional namespace to shorten the interface names when used in formatter code
{
    using System; // DateTime is used by IFormatterUplink
    using System.Collections.Generic;

    using Newtonsoft.Json.Linq;

    public interface IFormatterUplink
    {
        public JObject Evaluate(IDictionary<string, string> properties, string terminalId, DateTime timestamp, byte[] payloadBytes);
    }

    public interface IFormatterDownlink
    {
        public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject? payloadJson, byte[] payloadBytes);
    }
}

An uplink payload formatter is loaded from Azure Storage Blob, compiled with Oleg Shilo’s CS-Script then cached in memory with Alastair Crabtree’s LazyCache.

// Get the payload formatter from Azure Storage container, compile, and then cache binary.
IFormatterUplink formatterUplink;

try
{
   formatterUplink = await _payloadFormatterCache.UplinkGetAsync(context.PayloadFormatterUplink, cancellationToken);
}
catch (Azure.RequestFailedException aex)
{
   _logger.LogError(aex, "Uplink- PayloadID:{0} payload formatter load failed", payload.Id);

   return payload;
}
catch (NullReferenceException nex)
{
   _logger.LogError(nex, "Uplink- PayloadID:{id} formatter:{formatter} compilation failed missing interface", payload.Id, context.PayloadFormatterUplink);

   return payload;
}
catch (CSScriptLib.CompilerException cex)
{
   _logger.LogError(cex, "Uplink- PayloadID:{id} formatter:{formatter} compiler failed", payload.Id, context.PayloadFormatterUplink);

   return payload;
}
catch (Exception ex)
{
   _logger.LogError(ex, "Uplink- PayloadID:{id} formatter:{formatter} compilation failed", payload.Id, context.PayloadFormatterUplink);

   return payload;
}

If the Azure Storage blob is missing or the payload formatter code is incorrect an exception is thrown. I added specialised exception handlers for Azure.RequestFailedException, NullReferenceException and CSScriptLib.CompilerException to add more detail to the Azure Application Insights logging.

// Process the payload with configured formatter
Dictionary<string, string> properties = new Dictionary<string, string>();
JObject telemetryEvent;

try
{
   telemetryEvent = formatterUplink.Evaluate(properties, packet.TerminalId, packet.Timestamp, payloadBytes);
}
catch (Exception ex)
{
   _logger.LogError(ex, "Uplink- PayloadId:{0} TerminalId:{1} Value:{2} Bytes:{3} payload formatter evaluate failed", payload.Id, packet.TerminalId, packet.Value, Convert.ToHexString(payloadBytes));

   return payload;
}

if (telemetryEvent is null)
{
   _logger.LogError("Uplink- PayloadId:{0} TerminalId:{1} Value:{2} Bytes:{3} payload formatter evaluate failed returned null", payload.Id, packet.TerminalId, packet.Value, Convert.ToHexString(payloadBytes));

   return payload;
}

The Evaluate method can throw many different types of exception so in the initial version only the “generic” exception is caught and logged.

using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string terminalId, DateTime timestamp, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        telemetryEvent.Add("Bytes", BitConverter.ToString(payloadBytes));
        telemetryEvent.Add("Bytes", BitConverter.ToString(payloadBytes)); // Adding a property name which already exists throws an ArgumentException

        return telemetryEvent;
    }
}

There are a number (which should grow over time) of test uplink/downlink payload formatters for testing different compile and execution failures.

Azure IoT Storage Explorer container with sample formatter blobs.

I used Azure Storage Explorer to upload my test payload formatters to the uplink/downlink Azure Storage containers.

Myriota Connector – Uplink Payload Formatters Test Harness

The Myriota Azure IoT Hub Cloud Identity Translation Gateway payload formatters use compiled C# code to convert uplink packet payloads to JSON.

...
public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        if (payloadBytes is null)
        {
            return telemetryEvent;
        }

        telemetryEvent.Add("SequenceNumber", BitConverter.ToUInt16(payloadBytes));

        JObject location = new JObject();

        double latitude = BitConverter.ToInt32(payloadBytes, 2) / 10000000.0;
        location.Add("lat", latitude);

        double longitude = BitConverter.ToInt32(payloadBytes, 6) / 10000000.0;
        location.Add("lon", longitude);

        location.Add("alt", 0);

        telemetryEvent.Add("DeviceLocation", location);

        UInt32 packetimestamp = BitConverter.ToUInt32(payloadBytes, 10);

        DateTime fixAtUtc = DateTime.UnixEpoch.AddSeconds(packetimestamp);

        telemetryEvent.Add("FixAtUtc", fixAtUtc);

        properties.Add("iothub-creation-time-utc", fixAtUtc.ToString("s", CultureInfo.InvariantCulture));

        return telemetryEvent;
    }
}

When writing payload formatters, the Visual Studio 2022 syntax highlighting is really useful for spotting syntax errors. With the “Uplink Payload Formatter Test Harness” application, payload formatters can be executed and debugged before they are deployed with Azure Storage Explorer.

private static void ApplicationCore(CommandLineOptions options)
{
    Dictionary<string, string> properties = new Dictionary<string, string>();

    Console.WriteLine($"Uplink formatter file:{options.FormatterPath}");

    PayloadFormatter.IFormatterUplink evalulatorUplink;
    try
    {
        evalulatorUplink = CSScript.Evaluator.LoadFile<PayloadFormatter.IFormatterUplink>(options.FormatterPath);
    }
    catch (CSScriptLib.CompilerException cex)
    {
        Console.Write($"Loading or compiling file:{options.FormatterPath} failed Exception:{cex}");
        return;
    }

    byte[] payloadBytes;
    try
    {
        payloadBytes = Convert.FromHexString(options.PayloadHex);
    }
    catch (FormatException fex)
    {
        Console.WriteLine("Convert.FromHexString failed:{0}", fex.Message);
        return;
    }

    DateTime timeStamp;
    if (options.TimeStamp.HasValue)
    {
        timeStamp = options.TimeStamp.Value;
    }
    else
    {
        timeStamp = DateTime.UtcNow;
    }

    JObject telemetryEvent;

    try
    {
        telemetryEvent = evalulatorUplink.Evaluate(properties, options.Application, options.TerminalId, timeStamp, payloadBytes);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"evalulatorUplink.Evaluate failed Exception:{ex}");
        return;
    }

    telemetryEvent.TryAdd("Application", options.Application);
    telemetryEvent.TryAdd("TerminalId", options.TerminalId);
    if ( options.TimeStamp.HasValue)
    {
        telemetryEvent.TryAdd("TimeStamp", options.TimeStamp.Value.ToString("s", CultureInfo.InvariantCulture));
    }
    telemetryEvent.TryAdd("DataLength", payloadBytes.Length);
    telemetryEvent.TryAdd("Data", Convert.ToHexString( payloadBytes));

    Console.WriteLine("Properties:");
    foreach (var property in properties)
    {
        Console.WriteLine($"{property.Key}:{property.Value}");
    }
    Console.WriteLine("");

    Console.WriteLine("JSON Telemetry event payload");
    Console.WriteLine(telemetryEvent.ToString(Formatting.Indented));
}

-f C:\Users\…\PayloadFormatters\Uplink\tracker.cs -t 0088812345 -a Tracker -h 3800bd9812e6fed5e066bd8e0c65cccccccccccc

Myriota uplink packet payloads are only 20 bytes long (40 hex characters), so they can be copied and pasted from the uplink queue messages.
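To see what a tracker formatter should make of the sample payload in the command line above, the fields can be unpacked by hand. The following is a minimal sketch and not part of the connector: the TrackerPayloadDecoder class and its Decode method are hypothetical names, and it uses BinaryPrimitives (rather than the BitConverter calls in the formatter) so that the little-endian byte order assumed here is explicit.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper (not connector code) which unpacks the tracker_message fields.
public static class TrackerPayloadDecoder
{
    public static (ushort SequenceNumber, double Latitude, double Longitude, DateTime FixAtUtc) Decode(string payloadHex)
    {
        byte[] payloadBytes = Convert.FromHexString(payloadHex);

        // Offsets follow the packed C structure: uint16, int32, int32, uint32 (assumed little-endian).
        ushort sequenceNumber = BinaryPrimitives.ReadUInt16LittleEndian(payloadBytes.AsSpan(0, 2));
        double latitude = BinaryPrimitives.ReadInt32LittleEndian(payloadBytes.AsSpan(2, 4)) / 10000000.0;
        double longitude = BinaryPrimitives.ReadInt32LittleEndian(payloadBytes.AsSpan(6, 4)) / 10000000.0;
        uint fixSeconds = BinaryPrimitives.ReadUInt32LittleEndian(payloadBytes.AsSpan(10, 4));

        return (sequenceNumber, latitude, longitude, DateTime.UnixEpoch.AddSeconds(fixSeconds));
    }
}
```

For the payload 3800bd9812e6fed5e066bd8e0c65cccccccccccc this gives sequence number 56, a latitude of about -43.4988867, a longitude of about 172.6010878 and a fix time of 2023-09-21 18:43:09 UTC. The remaining six bytes (all 0xcc in this sample) fall outside the 14 byte structure.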

Myriota Connector – Uplink Payload formatters revisited

The Myriota Azure IoT Hub Cloud Identity Translation Gateway payload formatters use compiled C# code to convert uplink packet payloads to JSON.

namespace PayloadFormatter
{
    using System.Collections.Generic;

    using Newtonsoft.Json.Linq;

    public interface IFormatterUplink
    {
        public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, JObject payloadJson, string payloadText, byte[] payloadBytes);
    }
..
}

The Myriota uplink packet payload is only 20 bytes long, so it is very unlikely that the payloadText and payloadJson parameters would ever be populated, and I removed them from the interface. The uplink message handler has been updated to match, and the code which converted (if possible) the payload bytes to text and then to JSON has been deleted.

namespace PayloadFormatter
{
    using System.Collections.Generic;

    using Newtonsoft.Json.Linq;

    public interface IFormatterUplink
    {
        public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, byte[] payloadBytes);
    }
...
}

All of the sample payload formatters have been updated to reflect the updated parameters. The sample Tracker.cs payload formatter unpacks a message from a Myriota Dev Kit running the Tracker sample and returns an Azure IoT Central compatible location telemetry payload.

/*
myriota tracker payload format

typedef struct {
  uint16_t sequence_number;
  int32_t latitude;   // scaled by 1e7, e.g. -891234567 (south 89.1234567)
  int32_t longitude;  // scaled by 1e7, e.g. 1791234567 (east 179.1234567)
  uint32_t time;      // epoch timestamp of last fix
} __attribute__((packed)) tracker_message; 

*/ 
using System;
using System.Collections.Generic;
using System.Globalization;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;


public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        if (payloadBytes is null)
        {
            return telemetryEvent;
        }

        telemetryEvent.Add("SequenceNumber", BitConverter.ToUInt16(payloadBytes));

        JObject location = new JObject();

        double latitude = BitConverter.ToInt32(payloadBytes, 2) / 10000000.0;
        location.Add("lat", latitude);

        double longitude = BitConverter.ToInt32(payloadBytes, 6) / 10000000.0;
        location.Add("lon", longitude);

        location.Add("alt", 0);

        telemetryEvent.Add("DeviceLocation", location);

        UInt32 packetimestamp = BitConverter.ToUInt32(payloadBytes, 10);

        DateTime fixAtUtc = DateTime.UnixEpoch.AddSeconds(packetimestamp);

        telemetryEvent.Add("FixAtUtc", fixAtUtc);

        properties.Add("iothub-creation-time-utc", fixAtUtc.ToString("s", CultureInfo.InvariantCulture));

        return telemetryEvent;
    }
}

If a message payload is text or JSON it can still be converted in the payload formatter.
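A minimal sketch of how a formatter could attempt those conversions itself. It is not taken from the connector: the PayloadTextHelper class and its method names are hypothetical, the payload is assumed to be UTF-8 encoded, and System.Text.Json is used only to keep the sketch free of package dependencies (a real formatter would more likely call JObject.Parse from Newtonsoft.Json to match the Evaluate return type).

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical helper (not connector code) for payloads that might be text or JSON.
public static class PayloadTextHelper
{
    // Strict UTF-8 decoder: throws on invalid byte sequences instead of substituting characters.
    private static readonly UTF8Encoding StrictUtf8 = new UTF8Encoding(false, true);

    public static bool TryGetText(byte[] payloadBytes, out string payloadText)
    {
        try
        {
            payloadText = StrictUtf8.GetString(payloadBytes);
            return true;
        }
        catch (DecoderFallbackException)
        {
            payloadText = string.Empty;
            return false;
        }
    }

    public static bool IsJson(string payloadText)
    {
        try
        {
            // Parse only to validate; the document is disposed straight away.
            using JsonDocument document = JsonDocument.Parse(payloadText);
            return true;
        }
        catch (JsonException)
        {
            return false;
        }
    }
}
```

Inside Evaluate the formatter would call TryGetText first and only attempt JSON parsing when it succeeds, falling back to the raw bytes otherwise.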

Myriota Connector – Azure IoT Hub DTDL Support

The Myriota connector supports the use of Digital Twin Definition Language(DTDL) for Azure IoT Hub Connection Strings and the Azure IoT Hub Device Provisioning Service(DPS).

{
  "ConnectionStrings": {
    "ApplicationInsights": "...",
    "UplinkQueueStorage": "...",
    "PayloadFormattersStorage": "..."
  },
  "AzureIoT": {
    ...
    "ApplicationToDtdlModelIdMapping": {
      "tracker": "dtmi:myriotaconnector:Tracker_2lb;1"
    }
  }
  ...
}

The Digital Twin Definition Language(DTDL) configuration used when a device is provisioned or when it connects is determined by the payload application which is based on the Myriota Destination endpoint.

The Azure Function Configuration of Application to DTDL Model ID

BEWARE – The application in ApplicationToDtdlModelIdMapping is case sensitive!
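A short sketch of why the case matters, assuming the mapping is bound to a Dictionary<string, string> with the default (ordinal, case sensitive) key comparer. The TryGetValue call in the connector code suggests this but the post does not confirm it. The ModelIdLookup class is a hypothetical name, and the case-insensitive copy is just one possible mitigation, not something the connector is known to do.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical illustration (not connector code) of application name lookups.
public static class ModelIdLookup
{
    // Returns the DTDL model ID for the application, or null when there is no mapping.
    public static string? Find(IDictionary<string, string> mapping, string application)
    {
        return mapping.TryGetValue(application, out string? modelId) ? modelId : null;
    }

    // Copies the mapping into a dictionary whose keys are compared without regard to case.
    // Throws an ArgumentException if two keys differ only by case.
    public static Dictionary<string, string> IgnoreCase(IDictionary<string, string> mapping)
    {
        return new Dictionary<string, string>(mapping, StringComparer.OrdinalIgnoreCase);
    }
}
```

With a mapping containing only "tracker", Find(mapping, "tracker") returns the model ID but Find(mapping, "Tracker") returns null, so the device would connect without a model ID. The same lookup against IgnoreCase(mapping) finds the entry.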

Azure IoT Central Device Template Configuration

I used Azure IoT Central Device Template functionality to create my Azure Digital Twin definitions.

Azure IoT Hub Device Connection String

The DeviceClient CreateFromConnectionString method has an optional ClientOptions parameter which specifies the DTDL model ID for the duration of the connection.

private async Task<DeviceClient> AzureIoTHubDeviceConnectionStringConnectAsync(string terminalId, string application, object context)
{
    DeviceClient deviceClient;

    if (_azureIoTSettings.ApplicationToDtdlModelIdMapping.TryGetValue(application, out string? modelId))
    {
        ClientOptions clientOptions = new ClientOptions()
        {
            ModelId = modelId
        };

        deviceClient = DeviceClient.CreateFromConnectionString(_azureIoTSettings.AzureIoTHub.ConnectionString, terminalId, TransportSettings, clientOptions);
    }
    else
    { 
        deviceClient = DeviceClient.CreateFromConnectionString(_azureIoTSettings.AzureIoTHub.ConnectionString, terminalId, TransportSettings);
    }

    await deviceClient.OpenAsync();

    return deviceClient;
}

Azure IoT Explorer Telemetry message with DTDL Model ID

Azure IoT Hub Device Provisioning Service

The ProvisioningDeviceClient RegisterAsync method has an optional ProvisioningRegistrationAdditionalData parameter. The PnpConvention CreateDpsPayload method is used to generate the JsonData property which specifies the DTDL model ID used when the device is initially provisioned.

private async Task<DeviceClient> AzureIoTHubDeviceProvisioningServiceConnectAsync(string terminalId, string application, object context)
{
    DeviceClient deviceClient;

    string deviceKey;
    using (var hmac = new HMACSHA256(Convert.FromBase64String(_azureIoTSettings.AzureIoTHub.DeviceProvisioningService.GroupEnrollmentKey)))
    {
        deviceKey = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(terminalId)));
    }

    using (var securityProvider = new SecurityProviderSymmetricKey(terminalId, deviceKey, null))
    {
        using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
        {
            DeviceRegistrationResult result;

            ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(
                _azureIoTSettings.AzureIoTHub.DeviceProvisioningService.GlobalDeviceEndpoint,
                _azureIoTSettings.AzureIoTHub.DeviceProvisioningService.IdScope,
                securityProvider,
                transport);

            if (_azureIoTSettings.ApplicationToDtdlModelIdMapping.TryGetValue(application, out string? modelId))
            {
                ClientOptions clientOptions = new ClientOptions()
                {
                    ModelId = modelId
                };

                ProvisioningRegistrationAdditionalData provisioningRegistrationAdditionalData = new ProvisioningRegistrationAdditionalData()
                {
                    JsonData = PnpConvention.CreateDpsPayload(modelId)
                };
                result = await provClient.RegisterAsync(provisioningRegistrationAdditionalData);
            }
            else
            {
                result = await provClient.RegisterAsync();
            }
  
            if (result.Status != ProvisioningRegistrationStatusType.Assigned)
            {
                _logger.LogWarning("Uplink-DeviceID:{0} RegisterAsync status:{1} failed ", terminalId, result.Status);

                throw new ApplicationException($"Uplink-DeviceID:{terminalId} RegisterAsync status:{result.Status} failed");
            }

            IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());

            deviceClient = DeviceClient.Create(result.AssignedHub, authentication, TransportSettings);
        }
    }

    await deviceClient.OpenAsync();

    return deviceClient;
}

Azure IoT Central Device Connection Group configuration

Azure IoT Central Device connection groups can be configured to “automagically” provision devices.

Myriota Connector – Azure IoT Hub Connectivity

The Myriota connector supports the use of Azure IoT Hub Connection Strings and the Azure IoT Hub Device Provisioning Service(DPS) for device management. I use Alastair Crabtree’s LazyCache to store Azure IoT Hub connections which are opened the first time they are used.

 public async Task<DeviceClient> GetOrAddAsync(string terminalId, object context)
 {
     DeviceClient deviceClient;

     switch (_azureIoTSettings.AzureIoTHub.ConnectionType)
     {
         case Models.AzureIotHubConnectionType.DeviceConnectionString:
             deviceClient = await _azuredeviceClientCache.GetOrAddAsync(terminalId, (ICacheEntry x) => AzureIoTHubDeviceConnectionStringConnectAsync(terminalId, context));
             break;
         case Models.AzureIotHubConnectionType.DeviceProvisioningService:
             deviceClient = await _azuredeviceClientCache.GetOrAddAsync(terminalId, (ICacheEntry x) => AzureIoTHubDeviceProvisioningServiceConnectAsync(terminalId, context));
             break;
         default:
             _logger.LogError("Uplink- Azure IoT Hub ConnectionType unknown {0}", _azureIoTSettings.AzureIoTHub.ConnectionType);

             throw new NotImplementedException("AzureIoT Hub unsupported ConnectionType");
     }

     return deviceClient;
 }

The IAzureDeviceClientCache.GetOrAddAsync method returns a cached, open Azure IoT Hub DeviceClient connection or, if there isn’t one, creates it using the connection method specified in the application configuration.
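The “opened the first time they are used” behaviour can be sketched without LazyCache. The following is a stand-in built on ConcurrentDictionary and Lazy<Task<T>>, not the LazyCache implementation or the connector’s IAzureDeviceClientCache. The LazyConnectionCache name is an assumption for illustration, and the sketch leaves out cache expiry and the eviction of connections which failed to open.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for a lazy connection cache (not LazyCache, not connector code).
public class LazyConnectionCache<TConnection>
{
    private readonly ConcurrentDictionary<string, Lazy<Task<TConnection>>> _connections = new();

    public Task<TConnection> GetOrAddAsync(string terminalId, Func<string, Task<TConnection>> connectAsync)
    {
        // Only one Lazy is ever stored per terminalId, and Lazy<T> runs its factory
        // at most once, so concurrent callers share a single connect attempt.
        Lazy<Task<TConnection>> lazy = _connections.GetOrAdd(
            terminalId,
            id => new Lazy<Task<TConnection>>(() => connectAsync(id)));

        return lazy.Value;
    }
}
```

The first call for a TerminalId runs the connect delegate; later calls for the same TerminalId get the same task back without connecting again.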

Azure IoT Hub Device Connection String

The Azure IoT Hub delegate uses a Device Connection String which is retrieved from the application configuration.

{
  "ConnectionStrings": {
    "ApplicationInsights": "...",
    "UplinkQueueStorage": "...",
    "PayloadFormattersStorage": "..."
  },
  "AzureIoT": {
    "AzureIoTHub": {
      "ConnectionType": "DeviceConnectionString",
      "connectionString": "HostName=....azure-devices.net;SharedAccessKeyName=device;SharedAccessKey=..."
    }
  }
  ...
}

Azure Function with IoT Hub Device connection string configuration

private async Task<DeviceClient> AzureIoTHubDeviceConnectionStringConnectAsync(string terminalId, object context)
{
    DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(_azureIoTSettings.AzureIoTHub.ConnectionString, terminalId, TransportSettings);

    await deviceClient.OpenAsync();

    return deviceClient;
}

Azure IoT Hub Device Shared Access Policy for Device Connection String

One of my customers uses an Azure Logic Application to manage Myriota and Azure IoT Connector configuration.

Azure IoT Hub manual Device configuration

Azure IoT Hub Device Provisioning Service

The Azure IoT Hub Device Provisioning Service(DPS) delegate uses Symmetric Key Attestation with the Global Device Endpoint, ID Scope and Group Enrollment Key retrieved from the application configuration.

{
  "ConnectionStrings": {
    "ApplicationInsights": "...",
    "UplinkQueueStorage": "...",
    "PayloadFormattersStorage": "..."
  },
  "AzureIoT": {
      "ConnectionType": "DeviceProvisioningService",
      "DeviceProvisioningServiceIoTHub": {
        "GlobalDeviceEndpoint": "global.azure-devices-provisioning.net",
        "IDScope": ".....",
        "GroupEnrollmentKey": "...."
      }
   }
}

Azure IoT Function with Azure IoT Hub Device Provisioning Service(DPS) configuration

Symmetric key attestation with the Azure IoT Hub Device Provisioning Service(DPS) is performed using the same security tokens supported by Azure IoT Hubs to securely connect devices. The symmetric key of an enrollment group isn’t used directly by devices in the provisioning process. Instead, devices that provision through an enrollment group do so using a derived device key.

private async Task<DeviceClient> AzureIoTHubDeviceProvisioningServiceConnectAsync(string terminalId, object context)
{
    DeviceClient deviceClient;

    string deviceKey;
    using (var hmac = new HMACSHA256(Convert.FromBase64String(_azureIoTSettings.AzureIoTHub.DeviceProvisioningService.GroupEnrollmentKey)))
    {
        deviceKey = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(terminalId)));
    }

    using (var securityProvider = new SecurityProviderSymmetricKey(terminalId, deviceKey, null))
    {
        using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
        {
            DeviceRegistrationResult result;

            ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(
                _azureIoTSettings.AzureIoTHub.DeviceProvisioningService.GlobalDeviceEndpoint,
                _azureIoTSettings.AzureIoTHub.DeviceProvisioningService.IdScope,
                securityProvider,
                transport);

            result = await provClient.RegisterAsync();
  
            if (result.Status != ProvisioningRegistrationStatusType.Assigned)
            {
                _logger.LogWarning("Uplink-DeviceID:{0} RegisterAsync status:{1} failed ", terminalId, result.Status);

                throw new ApplicationException($"Uplink-DeviceID:{terminalId} RegisterAsync status:{result.Status} failed");
            }

            IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());

            deviceClient = DeviceClient.Create(result.AssignedHub, authentication, TransportSettings);
        }
    }

    await deviceClient.OpenAsync();

    return deviceClient;
}

The derived device key is a hash of the device’s registration ID and is computed using the symmetric key of the enrollment group. The device can then use its derived device key to sign the SAS token it uses to register with DPS.

Azure Device Provisioning Service Adding Enrollment Group Attestation
Azure Device Provisioning Service Add Enrollment Group IoT Hub(s) selection.
Azure Device Provisioning Service Manage Enrollments

For initial development and testing I ran the function application in the desktop emulator and simulated Myriota Device Manager webhook calls with Azure Storage Explorer and modified sample payloads.

Azure Storage Explorer Storage Account Queued Messages

I then used Azure IoT Explorer to configure devices, view uplink traffic etc.

Azure IoT Explorer Devices

When I connected to my Azure IoT Hub shortly after starting the Myriota Azure IoT Connector Function my test devices started connecting as messages arrived.

Azure IoT Explorer Device Telemetry

I then deployed my function to Azure and configured the Azure IoT Hub connection string, Azure Application Insights connection string etc.

Azure Portal Myriota Resource Group
Azure Portal Myriota IoT Hub Metrics

There was often a significant delay before the Device Status updated, which shouldn’t be a problem.

Myriota – Uplink Payload formatters and caching

My Myriota Azure IoT Hub Cloud Identity Translation Gateway payload formatters use C# code (compiled with CS-Script and cached with Alastair Crabtree’s LazyCache) to convert uplink packet payloads to JSON.

I have found putting the C/C++ structure for the uplink payload at the top of the formatter really helpful.

/*
myriota tracker payload format

typedef struct {
  uint16_t sequence_number;
  int32_t latitude;   // scaled by 1e7, e.g. -891234567 (south 89.1234567)
  int32_t longitude;  // scaled by 1e7, e.g. 1791234567 (east 179.1234567)
  uint32_t time;      // epoch timestamp of last fix
} __attribute__((packed)) tracker_message; 

*/ 
using System;
using System.Collections.Generic;
using System.Globalization;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;


public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        telemetryEvent.Add("SequenceNumber", BitConverter.ToUInt16(payloadBytes));

        double latitude = BitConverter.ToInt32(payloadBytes, 2) / 10000000.0;
        telemetryEvent.Add("Latitude", latitude);

        double longitude = BitConverter.ToInt32(payloadBytes, 6) / 10000000.0;
        telemetryEvent.Add("Longitude", longitude);

        UInt32 packetimestamp = BitConverter.ToUInt32(payloadBytes, 10);
        DateTime lastFix = DateTime.UnixEpoch.AddSeconds(packetimestamp);

        properties.Add("iothub-creation-time-utc", lastFix.ToString("s", CultureInfo.InvariantCulture));

        return telemetryEvent;
    }
}

The sample Tracker.cs payload formatter unpacks a message from a Myriota Dev Kit running the Tracker sample and returns an Azure IoT Central compatible location telemetry payload.

BEWARE : I think the Azure IoT Central Position lat, lon + alt values might be case sensitive.

Azure IoT Explorer displaying Tracker.cs payload formatter output

The identity payload formatter to use is configured as part of the Destination webhook Uniform Resource Locator (URL).

Myriota Destination configuration application name URL configuration

namespace devMobile.IoT.MyriotaAzureIoTConnector.Connector.Models
{
    public class UplinkPayloadQueueDto
    {
        public string Application { get; set; }
        public string EndpointRef { get; set; }
        public DateTime PayloadReceivedAtUtc { get; set; }
        public DateTime PayloadArrivedAtUtc { get; set; }
        public QueueData Data { get; set; }
        public string Id { get; set; }
        public Uri CertificateUrl { get; set; }
        public string Signature { get; set; }
    }

    public class QueueData
    {
        public List<QueuePacket> Packets { get; set; }
    }

    public class QueuePacket
    {
        public string TerminalId { get; set; }

        public DateTime Timestamp { get; set; }

        public string Value { get; set; }
    }
}

A pair of Azure Blob Storage containers are used to store the uplink/downlink (coming soon) formatter files. The compiled payload formatters are cached with Uplink/Downlink + Application (from the UplinkPayloadQueueDto) as the key.

Azure IoT Storage Explorer uplink payload formatters

The default uplink and downlink formatters used when there is no payload formatter for “Application” are configured in the application settings.