Myriota Connector – Azure IoT Hub Downlink refactoring

The myriota Azure IoT Hub Cloud Identity Translation Gateway downlink message handler was getting a bit “chunky”. So, I started by stripping the code back to the absolute bare minimum that would “work”.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{termimalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   try
   {
      await context.DeviceClient.CompleteAsync(message);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken );
   }
}

The code was then extended so it worked for “sunny day” scenarios: the payload formatter was successfully retrieved from the configured Azure Storage Blob, CS-Script successfully compiled the payload formatter, the message payload was valid text, the message text was valid JavaScript Object Notation (JSON), the JSON was successfully processed by the compiled payload formatter, and finally the payload was accepted by the Myriota Cloud API.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{termimalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   string payloadFormatter;

   // Use default formatter and replace with message specific formatter if configured.
   if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out payloadFormatter) || string.IsNullOrEmpty(payloadFormatter))
   {
      payloadFormatter = context.PayloadFormatterDownlink;
   }

   _logger.LogInformation("Downlink- IoT Hub TerminalID:{termimalId} LockToken:{LockToken} Payload formatter:{payloadFormatter} ", context.TerminalId, message.LockToken, payloadFormatter);

   try
   {
      IFormatterDownlink payloadFormatterDownlink = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatter);

      byte[] messageBytes = message.GetBytes();

      string messageText = Encoding.UTF8.GetString(messageBytes);

      JObject messageJson = JObject.Parse(messageText);

      byte[] payloadBytes = payloadFormatterDownlink.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);

      string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);
      
      await context.DeviceClient.CompleteAsync(message);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken);
   }
}

The code was then extended to handle message payloads which were problematic but not “failures”.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{termimalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   string payloadFormatter;

   // Use default formatter and replace with message specific formatter if configured.
   if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out payloadFormatter) || string.IsNullOrEmpty(payloadFormatter))
   {
      payloadFormatter = context.PayloadFormatterDownlink;
   }

   _logger.LogInformation("Downlink- IoT Hub TerminalID:{termimalId} LockToken:{LockToken} Payload formatter:{payloadFormatter} ", context.TerminalId, message.LockToken, payloadFormatter);

   try
   {
      // If this fails payload broken
      byte[] messageBytes = message.GetBytes();

      string messageText = string.Empty;
      JObject messageJson = null;

      // These will fail for some messages, gets bytes only
      try
      {
         messageText = Encoding.UTF8.GetString(messageBytes);

         messageJson = JObject.Parse(messageText);
      }
      catch (ArgumentException aex)
      {
         _logger.LogInformation("Downlink-DeviceID:{DeviceId} LockToken:{LockToken} messageBytes:{2} not valid Text", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
      }
      catch (JsonReaderException jex)
      {
         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} messageText:{messageText} not valid JSON", context.TerminalId, message.LockToken, messageText);
      }

      IFormatterDownlink payloadFormatterDownlink = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatter);

      byte[] payloadBytes = payloadFormatterDownlink.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);

      string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);

      await context.DeviceClient.CompleteAsync(message);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken);
   }
   finally
   {
      message.Dispose();
   }
}

Finally, the code was modified to gracefully handle broken payloads returned by the payload formatter evaluation, some comments were added, and the unmanaged resources of the DeviceClient.Message were disposed.

public async Task AzureIoTHubMessageHandler(Message message, object userContext)
{
   Models.DeviceConnectionContext context = (Models.DeviceConnectionContext)userContext;

   _logger.LogInformation("Downlink- IoT Hub TerminalId:{termimalId} LockToken:{LockToken}", context.TerminalId, message.LockToken);

   // Use default formatter and replace with message specific formatter if configured.
   string payloadFormatter;
   if (!message.Properties.TryGetValue(Constants.IoTHubDownlinkPayloadFormatterProperty, out payloadFormatter) || string.IsNullOrEmpty(payloadFormatter))
   {
      payloadFormatter = context.PayloadFormatterDownlink;
   }

   _logger.LogInformation("Downlink- IoT Hub TerminalID:{termimalId} LockToken:{LockToken} Payload formatter:{payloadFormatter} ", context.TerminalId, message.LockToken, payloadFormatter);

   try
   {
      // If this fails payload broken
      byte[] messageBytes = message.GetBytes();

      // This will fail for some messages, payload formatter gets bytes only
      string messageText = string.Empty;
      try
      {
         messageText = Encoding.UTF8.GetString(messageBytes);
      }
      catch (ArgumentException aex)
      {
         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} messageBytes:{2} not valid Text", context.TerminalId, message.LockToken, BitConverter.ToString(messageBytes));
      }

      // This will fail for some messages, payload formatter gets bytes only
      JObject? messageJson = null;
      try
      {
         messageJson = JObject.Parse(messageText);
      }
      catch (JsonReaderException jex)
      {
         _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} messageText:{messageText} not valid JSON", context.TerminalId, message.LockToken, messageText);
      }

      // This shouldn't fail, but it could for lots of different reasons, invalid path to blob, syntax error, interface broken etc.
      IFormatterDownlink payloadFormatterDownlink = await _payloadFormatterCache.DownlinkGetAsync(payloadFormatter);

      // This shouldn't fail, but it could for lots of different reasons, null references, divide by zero, out of range etc.
      byte[] payloadBytes = payloadFormatterDownlink.Evaluate(message.Properties, context.TerminalId, messageJson, messageBytes);

      // Validate payload before calling Myriota control message send API method
      if (payloadBytes is null)
      {
         _logger.LogWarning("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} payload formatter:{payloadFormatter} Evaluate returned null", context.TerminalId, message.LockToken, payloadFormatter);

         await context.DeviceClient.RejectAsync(message);

         return;
      }

      if ((payloadBytes.Length < Constants.DownlinkPayloadMinimumLength) || (payloadBytes.Length > Constants.DownlinkPayloadMaximumLength))
      {
         _logger.LogWarning("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} payloadData length:{Length} invalid must be {DownlinkPayloadMinimumLength} to {DownlinkPayloadMaximumLength} bytes", context.TerminalId, message.LockToken, payloadBytes.Length, Constants.DownlinkPayloadMinimumLength, Constants.DownlinkPayloadMaximumLength);

         await context.DeviceClient.RejectAsync(message);

         return;
      }

      // This shouldn't fail, but it could for a few reasons, mainly connectivity & message queuing etc.
      _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} PayloadData:{payloadData} Length:{Length} sending", context.TerminalId, message.LockToken, Convert.ToHexString(payloadBytes), payloadBytes.Length);

      // Finally send the message using Myriota API
      string messageId = await _myriotaModuleAPI.SendAsync(context.TerminalId, payloadBytes);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{TerminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);

      await context.DeviceClient.CompleteAsync(message);

      _logger.LogInformation("Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} MessageID:{messageId} sent", context.TerminalId, message.LockToken, messageId);
   }
   catch (Exception ex)
   {
      await context.DeviceClient.RejectAsync(message);

      _logger.LogError(ex, "Downlink- IoT Hub TerminalID:{terminalId} LockToken:{LockToken} failed", context.TerminalId, message.LockToken);
   }
   finally
   {
      // Mop up the non managed resources of message
      message.Dispose();
   }
}

As the code was being extended, I tested different failures to make sure the Application Insights logging messages were useful. The first failure mode tested was the Azure Storage Blob: the path was broken or the blob was missing.

Visual Studio 2022 Debugger blob not found exception message
Application Insights blob not found exception logging

Then a series of “broken” payload formatters were created to test CS-Script compile time failures.

// Broken interface implementation - the Evaluate method is missing the JObject payloadJson parameter so PayloadFormatter.IFormatterDownlink is not implemented
using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
   public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, byte[] payloadBytes)
   {
      return payloadBytes;
   }
}
Visual Studio 2022 Debugger interface implementation broken exception message
Application Insights interface implementation broken exception logging
// Broken syntax - missing semicolon after the return statement
using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
   public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject payloadJson, byte[] payloadBytes)
   {
      return payloadBytes
   }
}
Visual Studio 2022 Debugger syntax error exception message
Application Insights syntax error exception logging
// Runtime error - indexing past the end of a short payload throws an IndexOutOfRangeException
using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
   public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject payloadJson, byte[] payloadBytes)
   {
      payloadBytes[20] = 0;

      return payloadBytes;
   }
}
Visual Studio 2022 Debugger runtime error exception message
Application Insights runtime error exception logging

Invalid Myriota Cloud API send control message payload

Visual Studio 2022 Debugger Myriota send failure exception message
Application Insights Myriota send failure exception logging

The final test was sending a downlink message which was valid JSON, contained the correct information for the specified payload formatter and was successfully processed by the Myriota Cloud API.

Azure IoT Explorer with valid JSON payload and payload formatter name
Azure function output of successful downlink message
Application Insights successful downlink message logging

After a couple of hours I think the downlink messageHandler implementation was significantly improved.

Myriota Connector – Payload formatters revisited again

The myriota Azure IoT Hub Cloud Identity Translation Gateway payload formatters use compiled C# code to convert uplink/downlink packet payloads to JSON/byte array. While trying out different formatters I had “compile” and “evaluation” errors which would have been a lot easier to debug if there had been more diagnostic information in the Azure Application Insights logging.

namespace PayloadFormatter // Additional namespace for shortening the interface name when used in formatter code
{
    using System;
    using System.Collections.Generic;

    using Newtonsoft.Json.Linq;

    public interface IFormatterUplink
    {
        public JObject Evaluate(IDictionary<string, string> properties, string terminalId, DateTime timestamp, byte[] payloadBytes);
    }

    public interface IFormatterDownlink
    {
        public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject? payloadJson, byte[] payloadBytes);
    }
}

An uplink payload formatter is loaded from Azure Storage Blob, compiled with Oleg Shilo’s CS-Script then cached in memory with Alastair Crabtree’s LazyCache.

// Get the payload formatter from Azure Storage container, compile, and then cache binary.
IFormatterUplink formatterUplink;

try
{
   formatterUplink = await _payloadFormatterCache.UplinkGetAsync(context.PayloadFormatterUplink, cancellationToken);
}
catch (Azure.RequestFailedException aex)
{
   _logger.LogError(aex, "Uplink- PayloadID:{0} payload formatter load failed", payload.Id);

   return payload;
}
catch (NullReferenceException nex)
{
   _logger.LogError(nex, "Uplink- PayloadID:{id} formatter:{formatter} compilation failed missing interface", payload.Id, context.PayloadFormatterUplink);

   return payload;
}
catch (CSScriptLib.CompilerException cex)
{
   _logger.LogError(cex, "Uplink- PayloadID:{id} formatter:{formatter} compiler failed", payload.Id, context.PayloadFormatterUplink);

   return payload;
}
catch (Exception ex)
{
   _logger.LogError(ex, "Uplink- PayloadID:{id} formatter:{formatter} compilation failed", payload.Id, context.PayloadFormatterUplink);

   return payload;
}

If the Azure Storage blob is missing or the payload formatter code is incorrect an exception is thrown. I added specialised exception handlers for Azure.RequestFailedException, NullReferenceException and CSScriptLib.CompilerException to add more detail to the Azure Application Insights logging.

// Process the payload with configured formatter
Dictionary<string, string> properties = new Dictionary<string, string>();
JObject telemetryEvent;

try
{
   telemetryEvent = formatterUplink.Evaluate(properties, packet.TerminalId, packet.Timestamp, payloadBytes);
}
catch (Exception ex)
{
   _logger.LogError(ex, "Uplink- PayloadId:{0} TerminalId:{1} Value:{2} Bytes:{3} payload formatter evaluate failed", payload.Id, packet.TerminalId, packet.Value, Convert.ToHexString(payloadBytes));

   return payload;
}

if (telemetryEvent is null)
{
   _logger.LogError("Uplink- PayloadId:{0} TerminalId:{1} Value:{2} Bytes:{3} payload formatter evaluate failed returned null", payload.Id, packet.TerminalId, packet.Value, Convert.ToHexString(payloadBytes));

   return payload;
}

The Evaluate method can throw many different types of exception so in the initial version only the “generic” exception is caught and logged.

using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string terminalId, DateTime timestamp, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        telemetryEvent.Add("Bytes", BitConverter.ToString(payloadBytes));
        telemetryEvent.Add("Bytes", BitConverter.ToString(payloadBytes));

        return telemetryEvent;
    }
}

There are a number (which should grow over time) of test uplink/downlink payload formatters for testing different compile and execution failures.

Azure Storage Explorer container with sample formatter blobs.

I used Azure Storage Explorer to upload my test payload formatters to the uplink/downlink Azure Storage containers.

Myriota Connector – Uplink Payload Formatters Test Harness

The myriota Azure IoT Hub Cloud Identity Translation Gateway payload formatters use compiled C# code to convert uplink packet payloads to JSON.

...
public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        if (payloadBytes is null)
        {
            return telemetryEvent;
        }

        telemetryEvent.Add("SequenceNumber", BitConverter.ToUInt16(payloadBytes));

        JObject location = new JObject();

        double latitude = BitConverter.ToInt32(payloadBytes, 2) / 10000000.0;
        location.Add("lat", latitude);

        double longitude = BitConverter.ToInt32(payloadBytes, 6) / 10000000.0;
        location.Add("lon", longitude);

        location.Add("alt", 0);

        telemetryEvent.Add("DeviceLocation", location);

        UInt32 packetimestamp = BitConverter.ToUInt32(payloadBytes, 10);

        DateTime fixAtUtc = DateTime.UnixEpoch.AddSeconds(packetimestamp);

        telemetryEvent.Add("FixAtUtc", fixAtUtc);

        properties.Add("iothub-creation-time-utc", fixAtUtc.ToString("s", CultureInfo.InvariantCulture));

        return telemetryEvent;
    }
}

When writing payload formatters, the Visual Studio 2022 syntax highlighting is really useful for spotting syntax errors, and with the “Uplink Payload Formatter Test Harness” application payload formatters can be executed and debugged before they are deployed with Azure Storage Explorer.

private static void ApplicationCore(CommandLineOptions options)
{
    Dictionary<string, string> properties = new Dictionary<string, string>();

    Console.WriteLine($"Uplink formatter file:{options.FormatterPath}");

    PayloadFormatter.IFormatterUplink evalulatorUplink;
    try
    {
        evalulatorUplink = CSScript.Evaluator.LoadFile<PayloadFormatter.IFormatterUplink>(options.FormatterPath);
    }
    catch (CSScriptLib.CompilerException cex)
    {
        Console.Write($"Loading or compiling file:{options.FormatterPath} failed Exception:{cex}");
        return;
    }

    byte[] payloadBytes;
    try
    {
        payloadBytes = Convert.FromHexString(options.PayloadHex);
    }
    catch (FormatException fex)
    {
        Console.WriteLine("Convert.FromHexString failed:{0}", fex.Message);
        return;
    }

    DateTime timeStamp;
    if (options.TimeStamp.HasValue)
    {
        timeStamp = options.TimeStamp.Value;
    }
    else
    {
        timeStamp = DateTime.UtcNow;
    }

    JObject telemetryEvent;

    try
    {
        telemetryEvent = evalulatorUplink.Evaluate(properties, options.Application, options.TerminalId, timeStamp, payloadBytes);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"evalulatorUplink.Evaluate failed Exception:{ex}");
        return;
    }

    telemetryEvent.TryAdd("Application", options.Application);
    telemetryEvent.TryAdd("TerminalId", options.TerminalId);
    if ( options.TimeStamp.HasValue)
    {
        telemetryEvent.TryAdd("TimeStamp", options.TimeStamp.Value.ToString("s", CultureInfo.InvariantCulture));
    }
    telemetryEvent.TryAdd("DataLength", payloadBytes.Length);
    telemetryEvent.TryAdd("Data", Convert.ToHexString( payloadBytes));

    Console.WriteLine("Properties:");
    foreach (var property in properties)
    {
        Console.WriteLine($"{property.Key}:{property.Value}");
    }
    Console.WriteLine("");

    Console.WriteLine("JSON Telemetry event payload");
    Console.WriteLine(telemetryEvent.ToString(Formatting.Indented));
}

-f C:\Users\…\PayloadFormatters\Uplink\tracker.cs -t 0088812345 -a Tracker -h 3800bd9812e6fed5e066bd8e0c65cccccccccccc
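
The CommandLineOptions class isn't shown above; a minimal sketch of what it might look like, assuming the CommandLineParser NuGet package, with the short option names taken from the example command line and everything else an illustrative assumption:

using System;

using CommandLine;

public class CommandLineOptions
{
    [Option('f', "formatterPath", Required = true, HelpText = "Path of the uplink payload formatter .cs file")]
    public string FormatterPath { get; set; }

    [Option('t', "terminalId", Required = true, HelpText = "Myriota terminal identifier")]
    public string TerminalId { get; set; }

    [Option('a', "application", Required = true, HelpText = "Application name added to the telemetry event")]
    public string Application { get; set; }

    [Option('h', "payloadHex", Required = true, HelpText = "Uplink packet payload as a hexadecimal string")]
    public string PayloadHex { get; set; }

    [Option('s', "timeStamp", Required = false, HelpText = "Optional packet timestamp, defaults to UtcNow")]
    public DateTime? TimeStamp { get; set; }
}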

The myriota uplink packet payloads are only 20 bytes long (40 hex characters) and can be copied and pasted from the uplink queue messages.

Myriota Connector – Uplink Payload formatters revisited

The myriota Azure IoT Hub Cloud Identity Translation Gateway payload formatters use compiled C# code to convert uplink packet payloads to JSON.

namespace PayloadFormatter
{
    using System;
    using System.Collections.Generic;

    using Newtonsoft.Json.Linq;

    public interface IFormatterUplink
    {
        public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, JObject payloadJson, string payloadText, byte[] payloadBytes);
    }
..
}

The myriota uplink packet payload is only 20 bytes long, so it is very unlikely that the payloadText and payloadJson parameters would ever be populated, and I removed them from the interface. The uplink message handler has been updated and the code that converted (where possible) the payload bytes to text and then to JSON has been deleted.

namespace PayloadFormatter
{
    using System;
    using System.Collections.Generic;

    using Newtonsoft.Json.Linq;

    public interface IFormatterUplink
    {
        public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, byte[] payloadBytes);
    }
...
}

All of the sample payload formatters have been updated to reflect the updated parameters. The sample Tracker.cs payload formatter unpacks a message from Myriota Dev Kit running the Tracker sample and returns an Azure IoT Central compatible location telemetry payload.

/*
myriota tracker payload format

typedef struct {
  uint16_t sequence_number;
  int32_t latitude;   // scaled by 1e7, e.g. -891234567 (south 89.1234567)
  int32_t longitude;  // scaled by 1e7, e.g. 1791234567 (east 179.1234567)
  uint32_t time;      // epoch timestamp of last fix
} __attribute__((packed)) tracker_message; 

*/ 
using System;
using System.Collections.Generic;
using System.Globalization;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;


public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        if (payloadBytes is null)
        {
            return telemetryEvent;
        }

        telemetryEvent.Add("SequenceNumber", BitConverter.ToUInt16(payloadBytes));

        JObject location = new JObject();

        double latitude = BitConverter.ToInt32(payloadBytes, 2) / 10000000.0;
        location.Add("lat", latitude);

        double longitude = BitConverter.ToInt32(payloadBytes, 6) / 10000000.0;
        location.Add("lon", longitude);

        location.Add("alt", 0);

        telemetryEvent.Add("DeviceLocation", location);

        UInt32 packetimestamp = BitConverter.ToUInt32(payloadBytes, 10);

        DateTime fixAtUtc = DateTime.UnixEpoch.AddSeconds(packetimestamp);

        telemetryEvent.Add("FixAtUtc", fixAtUtc);

        properties.Add("iothub-creation-time-utc", fixAtUtc.ToString("s", CultureInfo.InvariantCulture));

        return telemetryEvent;
    }
}

If a message payload is text or JSON it can still be converted in the payload formatter.
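
A minimal sketch (not one of the sample formatters) of a formatter doing that conversion itself, assuming the payload is UTF-8 encoded JSON:

using System;
using System.Collections.Generic;
using System.Text;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        if (payloadBytes is null)
        {
            return telemetryEvent;
        }

        try
        {
            // Convert the payload bytes to text and then to JSON inside the formatter
            string payloadText = Encoding.UTF8.GetString(payloadBytes);
            JObject payloadJson = JObject.Parse(payloadText);

            telemetryEvent.Add("Payload", payloadJson);
        }
        catch (ArgumentException)
        {
            // Payload bytes are not valid UTF-8 text
        }
        catch (JsonReaderException)
        {
            // Payload text is not valid JSON
        }

        return telemetryEvent;
    }
}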

.NET Core web API + Dapper – Redis Cache

The IDistributedCache has Memory, SQL Server and Redis implementations so I wanted to explore how the Stack Exchange Redis library works. The ConnectionMultiplexer class in the Stack Exchange Redis library hides the details of managing connections to multiple Redis servers, connection timeouts etc. The object is fairly “chunky” so it should be initialized once and reused for the lifetime of the program.

public static void Main(string[] args)
{
    var builder = WebApplication.CreateBuilder(args);

    // Add services to the container.
    builder.Services.AddApplicationInsightsTelemetry();

    builder.Services.AddTransient<IDapperContext>(s => new DapperContext(builder.Configuration));

    builder.Services.AddControllers();

    builder.Services.AddSingleton<IConnectionMultiplexer>(s => ConnectionMultiplexer.Connect(builder.Configuration.GetConnectionString("Redis")));

    var app = builder.Build();

    // Configure the HTTP request pipeline.
    app.UseHttpsRedirection();
    app.MapControllers();

    app.Run();
}

I trialed the initial versions of my Redis project with Memurai on my development machine, then configured an Azure Cache for Redis. I then load tested the project with several Azure AppService clients and there was a significant improvement in response time.

[ApiController]
[Route("api/[controller]")]
public class StockItemsController : ControllerBase
{
    private const int StockItemSearchMaximumRowsToReturn = 15;
    private readonly TimeSpan StockItemListExpiration = new TimeSpan(0, 5, 0);

    private const string sqlCommandText = @"SELECT [StockItemID] as ""ID"", [StockItemName] as ""Name"", [RecommendedRetailPrice], [TaxRate] FROM [Warehouse].[StockItems]";
    //private const string sqlCommandText = @"SELECT [StockItemID] as ""ID"", [StockItemName] as ""Name"", [RecommendedRetailPrice], [TaxRate] FROM [Warehouse].[StockItems]; WAITFOR DELAY '00:00:02'";

    private readonly ILogger<StockItemsController> logger;
    private readonly IDbConnection dbConnection;
    private readonly IDatabase redisCache;

    public StockItemsController(ILogger<StockItemsController> logger, IDapperContext dapperContext, IConnectionMultiplexer connectionMultiplexer)
    {
        this.logger = logger;
        this.dbConnection = dapperContext.ConnectionCreate();
        this.redisCache = connectionMultiplexer.GetDatabase();
    }

    [HttpGet]
    public async Task<ActionResult<IEnumerable<Model.StockItemListDtoV1>>> Get()
    {
        var cached = await redisCache.StringGetAsync("StockItems");
        if (cached.HasValue)
        {
            return Content(cached, "application/json");
        }

        var stockItems = await dbConnection.QueryWithRetryAsync<Model.StockItemListDtoV1>(sql: sqlCommandText, commandType: CommandType.Text);

#if SERIALISER_SOURCE_GENERATION
        string json = JsonSerializer.Serialize(stockItems, typeof(List<Model.StockItemListDtoV1>), Model.StockItemListDtoV1GenerationContext.Default);
#else
        string json = JsonSerializer.Serialize(stockItems);
#endif

        await redisCache.StringSetAsync("StockItems", json, expiry: StockItemListExpiration);

        return Content(json, "application/json");
    }

...

    [HttpDelete()]
    public async Task<ActionResult> ListCacheDelete()
    {
        await redisCache.KeyDeleteAsync("StockItems");

        logger.LogInformation("StockItems list removed");

        return this.Ok();
    }
}

Like Regular Expressions in .NET, the System.Text.Json object serialisations can be compiled to MSIL code instead of high-level internal instructions. This allows .NET’s just-in-time (JIT) compiler to convert the serialisation to native machine code for higher performance.

public class StockItemListDtoV1
{
    public int Id { get; set; }

    public string Name { get; set; }

    public decimal RecommendedRetailPrice { get; set; }

    public decimal TaxRate { get; set; }
}

[JsonSourceGenerationOptions(PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase)]
[JsonSerializable(typeof(List<StockItemListDtoV1>))]
public partial class StockItemListDtoV1GenerationContext : JsonSerializerContext
{
}

The cost of constructing the Serialiser may be higher, but the cost of performing serialisation with it is much smaller.

[HttpGet]
public async Task<ActionResult<IEnumerable<Model.StockItemListDtoV1>>> Get()
{
    var cached = await redisCache.StringGetAsync("StockItems");
    if (cached.HasValue)
    {
        return Content(cached, "application/json");
    }

    var stockItems = await dbConnection.QueryWithRetryAsync<Model.StockItemListDtoV1>(sql: sqlCommandText, commandType: CommandType.Text);

#if SERIALISER_SOURCE_GENERATION
    string json = JsonSerializer.Serialize(stockItems, typeof(List<Model.StockItemListDtoV1>), Model.StockItemListDtoV1GenerationContext.Default);
#else
    string json = JsonSerializer.Serialize(stockItems);
#endif

    await redisCache.StringSetAsync("StockItems", json, expiry: StockItemListExpiration);

    return Content(json, "application/json");
}

I used Telerik Fiddler to empty the cache then load the StockItems list 10 times (more tests would improve the quality of the results). The first trial was with the “conventional” serialiser.

The average time for the conventional serialiser was 0.028562 seconds

The average time for the generated version was 0.030546 seconds. But, if the initial compilation step was ignored, the average duration dropped to 0.000223 seconds, a significant improvement.

Myriota – Uplink Payload formatters and caching

My myriota Azure IoT Hub Cloud Identity Translation Gateway payload formatters use C# code (compiled with CS-Script and cached with Alastair Crabtree’s LazyCache) to convert uplink packet payloads to JSON.

I have found that putting the C/C++ structure for the uplink payload at the top of the converter is really helpful.

/*
myriota tracker payload format

typedef struct {
  uint16_t sequence_number;
  int32_t latitude;   // scaled by 1e7, e.g. -891234567 (south 89.1234567)
  int32_t longitude;  // scaled by 1e7, e.g. 1791234567 (east 179.1234567)
  uint32_t time;      // epoch timestamp of last fix
} __attribute__((packed)) tracker_message; 

*/ 
using System;
using System.Collections.Generic;
using System.Globalization;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;


public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        telemetryEvent.Add("SequenceNumber", BitConverter.ToUInt16(payloadBytes));

        double latitude = BitConverter.ToInt32(payloadBytes, 2) / 10000000.0;
        telemetryEvent.Add("Latitude", latitude);

        double longitude = BitConverter.ToInt32(payloadBytes, 6) / 10000000.0;
        telemetryEvent.Add("Longitude", longitude);

        UInt32 packetimestamp = BitConverter.ToUInt32(payloadBytes, 10);
        DateTime lastFix = DateTime.UnixEpoch.AddSeconds(packetimestamp);

       properties.Add("iothub-creation-time-utc", lastFix .ToString("s", CultureInfo.InvariantCulture));

       return telemetryEvent;
    }
}

The sample Tracker.cs payload formatter unpacks a message from Myriota Dev Kit running the Tracker sample and returns an Azure IoT Central compatible location telemetry payload.

BEWARE: I think the Azure IoT Central Position lat, lon + alt values might be case sensitive.

Azure IoT Explorer displaying Tracker.cs payload formatter output

The payload formatter to use is identified by the application name configured as part of the Destination webhook Uniform Resource Locator (URL).

Myriota Destination configuration application name URL configuration
namespace devMobile.IoT.MyriotaAzureIoTConnector.Connector.Models
{
    public class UplinkPayloadQueueDto
    {
        public string Application { get; set; }
        public string EndpointRef { get; set; }
        public DateTime PayloadReceivedAtUtc { get; set; }
        public DateTime PayloadArrivedAtUtc { get; set; }
        public QueueData Data { get; set; }
        public string Id { get; set; }
        public Uri CertificateUrl { get; set; }
        public string Signature { get; set; }
    }

    public class QueueData
    {
        public List<QueuePacket> Packets { get; set; }
    }

    public class QueuePacket
    {
        public string TerminalId { get; set; }

        public DateTime Timestamp { get; set; }

        public string Value { get; set; }
    }
}

A pair of Azure Blob Storage containers are used to store the uplink/downlink (coming soon) formatter files. The compiled payload formatters are cached with Uplink/Downlink + Application (from the UplinkPayloadQueueDto) as the key.

Azure Storage Explorer uplink payload formatters

The default uplink and downlink formatters used when there is no payload formatter for “Application” are configured in the application settings.
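
A minimal sketch of what the relevant application settings model might look like (the property names are illustrative assumptions, not necessarily the connector's actual configuration keys):

public class ApplicationSettings
{
    // Blob names of the payload formatters used when there is no "Application" specific formatter
    public string PayloadFormatterUplinkDefault { get; set; }

    public string PayloadFormatterDownlinkDefault { get; set; }

    // Names of the Azure Blob Storage containers holding the uplink and downlink formatter files
    public string PayloadFormattersUplinkContainer { get; set; }

    public string PayloadFormattersDownlinkContainer { get; set; }
}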

.NET Core web API + Dapper – Distributed Cache

I have used LazyCache for several projects (The Things Network V2 HTTP, The Things Industries V2 MQTT, The Things Industries V3 and Swarm Space Azure IoT Connector etc.) to cache Azure IoT Hub DeviceClient and other object instances.

The note on the LazyCache wiki page, “For LazyCache v2+ users, you should consider switching away from LazyCache to IDistributedCache. More information at #59”, caught my attention.

I have written other posts about caching Dapper query results with the Dapper Extension Library which worked well but had some configuration limitations. I also have posts about off-loading read-only workloads with Azure Active geo-replication or SQL Data Sync for Azure, which worked well in some scenarios but had limitations (performance and operational costs).

The IDistributedCache has Memory, SQL Server and Redis implementations so I built an Azure AppService to explore the functionality in more detail. In another project I had been working with the Azure SignalR Service and the use of the MessagePack library (rather than serialised JSON) caught my attention, so I have added basic support for that as well.

I explored the in-memory implementation (AddDistributedMemoryCache) on my development machine and found “tinkering” with the configuration options had little impact on the performance of my trivial sample application.

public static void Main(string[] args)
{
    var builder = WebApplication.CreateBuilder(args);

    // Add services to the container.
    builder.Services.AddApplicationInsightsTelemetry();

    builder.Services.AddSingleton<IDapperContext>(s => new DapperContext(builder.Configuration));

    builder.Services.AddControllers();

#if SERIALISATION_MESSAGE_PACK
    //MessagePackSerializer.DefaultOptions = MessagePack.Resolvers.ContractlessStandardResolver.Options;
    //MessagePackSerializer.DefaultOptions = MessagePack.Resolvers.ContractlessStandardResolver.Options.WithCompression(MessagePackCompression.Lz4Block);
    MessagePackSerializer.DefaultOptions = MessagePack.Resolvers.ContractlessStandardResolver.Options.WithCompression(MessagePackCompression.Lz4BlockArray);
#endif

#if DISTRIBUTED_CACHE_MEMORY
    builder.Services.AddDistributedMemoryCache(options =>
    {
       options.SizeLimit = 1000 * 1024 * 1024; // 1000MB
    });
    builder.Services.AddDistributedMemoryCache();
#endif

#if DISTRIBUTED_CACHE_REDIS
    var configurationOptions = new ConfigurationOptions
    {
        EndPoints = { builder.Configuration.GetSection("RedisConnection").GetValue<string>("EndPoints") },
        AllowAdmin = true,
        Password = builder.Configuration.GetSection("RedisConnection").GetValue<string>("Password"),
        Ssl = true,
        ConnectRetry = 5,
        ConnectTimeout = 10000,
        SslProtocols = System.Security.Authentication.SslProtocols.Tls12,
        AbortOnConnectFail = false,
    };

    builder.Services.AddStackExchangeRedisCache(options =>
    {
        options.InstanceName = "Dapper WebAPI Instance";
        options.ConfigurationOptions = configurationOptions;
    });
#endif

#if DISTRIBUTED_CACHE_SQL_SERVER
    builder.Services.AddDistributedSqlServerCache(options =>
    {
        options.ConnectionString = builder.Configuration.GetConnectionString("CacheDatabase");
        options.SchemaName = "dbo";
        options.TableName = "StockItemsCache";
    });
#endif

    var app = builder.Build();

    // Configure the HTTP request pipeline.
    app.UseHttpsRedirection();
    app.MapControllers();
    app.Run();
}

I tested the SQL Server implementation (AddDistributedSqlServerCache) using the SQL Server on my development machine, and Azure SQL as a backing store. I did consider using SQL Azure In-Memory OLTP but the performance improvement with my trivial example would most probably not be worth the additional cost of the required SKU.

CREATE TABLE [dbo].[StockItemsCache](
	[Id] [nvarchar](449) NOT NULL,
	[Value] [varbinary](max) NOT NULL,
	[ExpiresAtTime] [datetimeoffset](7) NOT NULL,
	[SlidingExpirationInSeconds] [bigint] NULL,
	[AbsoluteExpiration] [datetimeoffset](7) NULL,
PRIMARY KEY CLUSTERED 
(
	[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO

The table used to store the data wasn’t very complex and I could view the data associated with a cache key in SQL Server Management Studio.

SQL Server Management Studio displaying cache table contents

One of the applications I work on uses a complex SQL Server stored procedure to load reference data (updated daily), and being able to purge the cache at the end of this process might be useful. For a geographically distributed application putting the Azure SQL instance “closer” to the application’s users might be worth considering.
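
A minimal sketch (assuming an injected IDistributedCache field named distributedCache, mirroring the Redis list cache delete action earlier) of purging the cached list at the end of such a process:

[HttpDelete()]
public async Task<ActionResult> ListCacheDelete()
{
    // Remove the cached StockItems list so the next request reloads it from the database
    await distributedCache.RemoveAsync("StockItems");

    logger.LogInformation("StockItems list removed");

    return this.Ok();
}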

I trialed the Redis implementation with Memurai (on my development machine) and Azure Cache for Redis with multiple Azure AppService clients and there was a significant improvement in performance.

[HttpGet]
public async Task<ActionResult<IEnumerable<Model.StockItemListDtoV1>>> Get()
{
    var utcNow = DateTime.UtcNow;

    var cached = await distributedCache.GetAsync("StockItems");
    if (cached != null)
    {
#if SERIALISATION_JSON
        return this.Ok(JsonSerializer.Deserialize<List<Model.StockItemListDtoV1>>(cached));
#endif
#if SERIALISATION_MESSAGE_PACK
        return this.Ok(MessagePackSerializer.Deserialize<List<Model.StockItemListDtoV1>>(cached));
#endif
    }

    var stockItems = await dbConnection.QueryWithRetryAsync<Model.StockItemListDtoV1>(sql: sqlCommandText, commandType: CommandType.Text);

#if SERIALISATION_JSON
    await distributedCache.SetAsync("StockItems", JsonSerializer.SerializeToUtf8Bytes(stockItems), new DistributedCacheEntryOptions()
#endif
#if SERIALISATION_MESSAGE_PACK
    await distributedCache.SetAsync("StockItems", MessagePackSerializer.Serialize(stockItems), new DistributedCacheEntryOptions()
#endif
    {
        AbsoluteExpiration = new DateTime(utcNow.Year, utcNow.Month, DateTime.DaysInMonth(utcNow.Year, utcNow.Month), StockItemListAbsoluteExpiration.Hours, StockItemListAbsoluteExpiration.Minutes, StockItemListAbsoluteExpiration.Seconds)
    });

    return this.Ok(stockItems);
}

[HttpGet("NoLoad")]
public async Task<ActionResult<IEnumerable<Model.StockItemListDtoV1>>> GetNoLoad()
{
    var cached = await distributedCache.GetAsync("StockItems");
    if (cached == null)
    {
        return this.NoContent();
    }

#if SERIALISATION_JSON
    return this.Ok(JsonSerializer.Deserialize<List<Model.StockItemListDtoV1>>(cached));
#endif
#if SERIALISATION_MESSAGE_PACK
    return this.Ok(MessagePackSerializer.Deserialize<List<Model.StockItemListDtoV1>>(cached));
#endif
}

In my test environment the JSON payload for a list of stock items was a bit “chunky” at 25K bytes, so I added compile-time configurable support for the MessagePack library. This significantly reduced the size of the payload, LZ4Block (5K bytes) and LZ4BlockArray (5.2K bytes), which should reduce network traffic.

Assuming the overheads of JSON and MessagePack serialisation are similar, given the much smaller MessagePack payload I would most probably use MessagePack with LZ4BlockArray compression (for improved compatibility with other implementations).

Swarm Space – Uplink Payload formatter caching and files

The payload formatters of my Azure IoT Hub Cloud Identity Translation Gateway use CS-Script and even a simple one was taking more than half a second to compile each time it was called.

using System;
using System.Globalization;

using Newtonsoft.Json.Linq;

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(JObject telemetryEvent, string payloadBase64, byte[] payloadBytes, string payloadText, JObject payloadJson)
    {
        if ((payloadText != "" ) && ( payloadJson != null))
        {
            JObject location = new JObject();

            location.Add("lat", payloadJson.GetValue("lt"));
            location.Add("lon", payloadJson.GetValue("ln"));
            location.Add("alt", payloadJson.GetValue("a"));

            telemetryEvent.Add("Location", location);
        };

        return telemetryEvent;
    }
}

The Swarm Eval Kit default message has a userApplicationId of 65335

{"ln":123.456,"si":0.0,"bi":0.2,"sv":0.152,"lt":-12.345,"bv":4.032,"d":1671704370,"n":2,"a":9.0,"s":1.0,"c":208.0,"r":-94,"ti":0.032}

The 65355.cs payload formatter adds an Azure IoT Central compatible location to the telemetry payload.

Azure IoT Central uplink telemetry message payload

The formatter files are currently part of the SwarmSpaceAzureIoTConnector project (moving to Azure Blob Storage) so are configured as “content” (bonus syntax highlighting works) and “copy if newer” so they are included in the deployment package.

Visual Studio 2022 Sample payload formatter

I used Alastair Crabtrees’s LazyCache to store compiled payload formatters with Uplink/Downlink + UserApplicationId as the cache key.

public class FormatterCache : IFormatterCache
{
    private readonly ILogger<FormatterCache> _logger;
    private readonly Models.ApplicationSettings _applicationSettings;
    private readonly static IAppCache _payloadFormatters = new CachingService();

    public FormatterCache(ILogger<FormatterCache>logger, IOptions<Models.ApplicationSettings> applicationSettings)
    {
        _logger = logger;
        _applicationSettings = applicationSettings.Value;
    }

    public async Task<IFormatterUplink> UplinkGetAsync(int userApplicationId)
    {
        IFormatterUplink payloadFormatterUplink = await _payloadFormatters.GetOrAddAsync<PayloadFormatter.IFormatterUplink>($"U{userApplicationId}", (ICacheEntry x) => UplinkLoadAsync(userApplicationId), memoryCacheEntryOptions);

        return payloadFormatterUplink;
    }

    private async Task<IFormatterUplink> UplinkLoadAsync(int userApplicationId)
    {
        string payloadformatterFilePath = $"{_applicationSettings.PayloadFormattersUplinkFilePath}\\{userApplicationId}.cs";

        if (!File.Exists(payloadformatterFilePath))
        {
            _logger.LogInformation("PayloadFormatterUplink- UserApplicationId:{0} PayloadFormatterPath:{1} not found using default:{2}", userApplicationId, payloadformatterFilePath, _applicationSettings.PayloadFormatterUplinkDefault);

            return CSScript.Evaluator.LoadFile<PayloadFormatter.IFormatterUplink>(_applicationSettings.PayloadFormatterUplinkDefault);
        }

        _logger.LogInformation("PayloadFormatterUplink- UserApplicationId:{0} loading PayloadFormatterPath:{1}", userApplicationId, payloadformatterFilePath);

        return CSScript.Evaluator.LoadFile<PayloadFormatter.IFormatterUplink>(payloadformatterFilePath);
    }
...
}

The default uplink and downlink formatters are configured in application settings and are used when a UserApplicationId specific formatter is not configured.

Fiddler Composer illustrating compiled formatter timings before and after caching

Swarm Space – Uplink Payload formatter Proof of Concept(PoC)

My Azure IoT Hub Cloud Identity Translation Gateway will support the translation of Base64 encoded uplink payloads to JavaScript Object Notation (JSON) so they can be processed by Azure IoT Hub client applications and Azure IoT Central. This PoC uses CS-Script by Oleg Shilo to transform the Swarm Eval Kit Base64 encoded JSON uplink messages.

Swarm Hive message list with a message payload

A sample decoded (JSON) Swarm Eval Kit uplink message

{"ln":123.456,"si":0.0,"bi":0.2,"sv":0.152,"lt":-12.345,"bv":4.032,"d":1671704370,"n":2,"a":9.0,"s":1.0,"c":208.0,"r":-94,"ti":0.032}

A Webhook Delivery method forwards uplink messages to my Azure IoT Hub Cloud Identity Translation Gateway.

Swarm Hive Delivery configuration with recent uplink messages

My first hard-coded payload formatter adds an Azure IoT Central compatible location to the telemetry event payload.

const string codeSwarmSpaceUplinkFormatterCode = @"
   using Newtonsoft.Json.Linq;

   public class UplinkFormatter : PayloadFormatter.ISwarmSpaceFormatterUplink
   {
       public JObject Evaluate(JObject telemetryEvent, string payloadBase64, byte[] payloadBytes, string payloadText, JObject payloadJson)
       {
           if ((payloadText != """" ) && ( payloadJson != null))
           {
               JObject location = new JObject() ;

               location.Add(""Lat"", payloadJson.GetValue(""lt""));
               location.Add(""Lon"", payloadJson.GetValue(""ln""));
               location.Add(""Alt"", payloadJson.GetValue(""a""));

               telemetryEvent.Add( ""location"", location);
           };

           return telemetryEvent;
       }
   }";
}

The PayloadFormatter namespace was added to reduce the length of the payload formatter C# interface declarations.

namespace PayloadFormatter 
{
    using Newtonsoft.Json.Linq;

    public interface ISwarmSpaceFormatterUplink
    {
        public JObject Evaluate(JObject telemetry, string payloadBase64, byte[] payloadBytes, string payloadText, JObject payloadJson);
    }

    public interface ISwarmSpaceFormatterDownlink
    {
        public string Evaluate(JObject payloadJson, string payloadText, byte[] payloadBytes, string payloadBase64);
    }
}

namespace devMobile.IoT.SwarmSpace.AzureIoT.Connector
{
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;

    using CSScriptLib;

    using PayloadFormatter;

    public interface ISwarmSpaceFormatterCache
    {
        public Task<ISwarmSpaceFormatterUplink> PayloadFormatterGetOrAddAsync(int userApplicationId);

    }

    public class SwarmSpaceFormatterCache : ISwarmSpaceFormatterCache
    {
        private readonly ILogger<SwarmSpaceFormatterCache> _logger;

        public SwarmSpaceFormatterCache(ILogger<SwarmSpaceFormatterCache>logger)
        {
            _logger = logger;
        }

        public async Task<ISwarmSpaceFormatterUplink> PayloadFormatterGetOrAddAsync(int deviceId)
        {
            return CSScript.Evaluator.LoadCode<PayloadFormatter.ISwarmSpaceFormatterUplink>(codeSwarmSpaceUplinkFormatterCode);
        }
...
}

The parameters of the formatter are the Base64 encoded, textual, and Newtonsoft JObject representations of the uplink payload, plus a telemetry event populated with some uplink message metadata.

Azure IoT Central uplink telemetry message payload

The initial “compile” of an uplink formatter was taking approximately 2.1 seconds so they will be “compiled” on demand and cached in a Dictionary with the UserApplicationId as the key. A default uplink formatter will be used when a UserApplicationId specific uplink formatter is not configured.
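
A minimal sketch (an illustration, not the PoC's actual code) of caching the compiled formatters in a dictionary keyed by UserApplicationId:

using System.Collections.Concurrent;

using CSScriptLib;

public class UplinkFormatterCache
{
    private readonly ConcurrentDictionary<int, PayloadFormatter.ISwarmSpaceFormatterUplink> _uplinkFormatters = new ConcurrentDictionary<int, PayloadFormatter.ISwarmSpaceFormatterUplink>();

    public PayloadFormatter.ISwarmSpaceFormatterUplink UplinkGetOrAdd(int userApplicationId, string formatterCode)
    {
        // Compile the formatter the first time a UserApplicationId is seen, then reuse the cached instance
        return _uplinkFormatters.GetOrAdd(userApplicationId, _ => CSScript.Evaluator.LoadCode<PayloadFormatter.ISwarmSpaceFormatterUplink>(formatterCode));
    }
}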

Downlink messages NahYeah

While running my The Things Industries (TTI) gateway I noticed an exception in the logs every so often.

Exception of type 'Microsoft.Azure.Devices.Client.Exceptions.DeviceMessageLockLostException' was thrown.

My client subscribes to Message Queue Telemetry Transport (MQTT) topics (using MQTTnet) for each TTI Application and establishes a connection (using an Azure IoT Hub DeviceClient) for each TTI Device to an Azure IoT Hub.

  • v3/{application id}@{tenant id}/devices/{device id}/up
  • v3/{application id}@{tenant id}/devices/{device id}/down/queued
  • v3/{application id}@{tenant id}/devices/{device id}/down/sent
  • v3/{application id}@{tenant id}/devices/{device id}/down/ack
  • v3/{application id}@{tenant id}/devices/{device id}/down/nack
  • v3/{application id}@{tenant id}/devices/{device id}/down/failed

The application subscribes to the queued, ack, nack, and failed topics so the progress of a downlink message can be monitored. For downlink messages a correlation_id prefixed with “az:LockToken:” carries the message.LockToken so that the message can be Abandoned, Completed or Rejected in the MQTT receive messageHandler.
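
A minimal sketch (an assumption about the approach, not the gateway's actual code) of recovering the LockToken from the correlation_ids and completing the Azure IoT Hub message:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using Microsoft.Azure.Devices.Client;

static class DownlinkLockToken
{
    const string LockTokenPrefix = "az:LockToken:";

    // Find the az:LockToken: correlation_id and use it to Complete the Azure IoT Hub message
    public static async Task CompleteAsync(DeviceClient deviceClient, IEnumerable<string> correlationIds)
    {
        foreach (string correlationId in correlationIds)
        {
            if (correlationId.StartsWith(LockTokenPrefix, StringComparison.Ordinal))
            {
                await deviceClient.CompleteAsync(correlationId.Substring(LockTokenPrefix.Length));

                return;
            }
        }
    }
}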

Below is the logging from my application for an odd sequence of messages

***** Nothing much happening for a couple of hours, the .'s represent approx 1 second. Wisnode 4 sends roughly every 5 minutes

.....................................................................................................................................................................................................................................................................................................................
03:36:08 TTN Uplink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 Port: 5
.....................................................................................................................................................................................................................................................................................................................
03:41:18 TTN Uplink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 Port: 5
...........................................................................
03:42:34 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: 57ea0fad-b6b3-492e-b194-10c4ff3e53cb
 Body: vu8=

***** I then started sending 5 messages to Wisnode 4 with the same payload vu8=, ports 71 thru 75

***** 71 Queued
03:42:34 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"}},
	"correlation_ids":[
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H"],
	"downlink_queued":{"f_port":71,"frm_payload":"vu8=","confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H"]}}
...
03:42:37 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: e2fef28c-fb1f-42cd-bb40-3ad8e6051da9
 Body: vu8=
.

***** 72 Queued
03:42:38 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"}},
	"correlation_ids":[
"az:LockToken:e2fef28c-fb1f-42cd-bb40-3ad8e6051da9",
"as:downlink:01EXX9B4RGSCJ4BN21GHPM85W5"],
	"downlink_queued":{"f_port":72,"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:e2fef28c-fb1f-42cd-bb40-3ad8e6051da9",
"as:downlink:01EXX9B4RGSCJ4BN21GHPM85W5"]}}
...
03:42:41 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: 70d61d71-9b24-44d2-b54b-7cc08da4d072
 Body: vu8=

***** 73 Queued
03:42:41 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"}},
	"correlation_ids":[
"az:LockToken:70d61d71-9b24-44d2-b54b-7cc08da4d072","as:downlink:01EXX9B800WF7FEP56J3EZ3M8A"],
	"downlink_queued":{"f_port":73,"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:70d61d71-9b24-44d2-b54b-7cc08da4d072",
"as:downlink:01EXX9B800WF7FEP56J3EZ3M8A"]}}
...

***** 74 Queued
03:42:45 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: 12537728-de4a-4489-ace5-92923e49b8e4
 Body: vu8=
.
03:42:45 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"}},
	"correlation_ids":[
"az:LockToken:12537728-de4a-4489-ace5-92923e49b8e4",
"as:downlink:01EXX9BBWA2YNCN2DFE5FC3BP3"],
	"downlink_queued":{
"f_port":74,"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:12537728-de4a-4489-ace5-92923e49b8e4",
"as:downlink:01EXX9BBWA2YNCN2DFE5FC3BP3"]}}
...

***** 75 Queued
03:42:48 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: 388efc11-4514-406e-8147-9109289095f4
 Body: vu8=

03:42:49 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"}},
	"correlation_ids":[
"az:LockToken:388efc11-4514-406e-8147-9109289095f4",
"as:downlink:01EXX9BFCM2G51EPYNWGDWPS0N"],
	"downlink_queued":{"f_port":75,"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:388efc11-4514-406e-8147-9109289095f4",
"as:downlink:01EXX9BFCM2G51EPYNWGDWPS0N"]}}

***** Waiting for Wisnode
..........................................................................................................................................................................
03:47:18 TTN Uplink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 Port: 5

***** Waiting for Wisnode again, I think there might have been such a long delay because TTI didn't get
..........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
***** 71 Nack'd
03:56:52 Nack: v3/application1@tenant1/devices/wisnodetest04/down/nack
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"},
	"dev_eui":"60C5A8FFFE781691","join_eui":"70B3D57ED0000000","dev_addr":"26083BE1"},
	"correlation_ids":[
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H",
"as:up:01EXXA572VHN7X7G5KFTHBQPNG",
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"gs:conn:01EXRPTTFGFNTRGH7V8FTC3R0S",
"gs:up:host:01EXRPTTFTEXBNV87KZFYFWP5V",
"gs:uplink:01EXXA56VPK14XG5S8JB9Q0V0X",
"ns:uplink:01EXXA56VYCHGGPPN1K77REMNM",
"rpc:/ttn.lorawan.v3.GsNs/HandleUplink:01EXXA56VRG6811HRCF803VJ34"],
	"received_at":"2021-02-07T03:56:53.211893610Z",
	"downlink_nack":{
"session_key_id":"AXd6GPmneD3dKVoArcS36g==",
"f_port":71,"f_cnt":35,
"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H"]}}

 Found az:LockToken:

03:56:52 TTN Uplink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 Port: 5

03:56:52 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: 856f5a9b-bc37-435c-8de9-19d2213999f8
 Body: vu8=

03:56:53 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {
"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"},
	"correlation_ids":[
"az:LockToken:856f5a9b-bc37-435c-8de9-19d2213999f8",
"as:downlink:01EXXA57JJWWYEDX3Z55TNSTP5"],
	"downlink_queued":{"f_port":71,
"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":
["az:LockToken:856f5a9b-bc37-435c-8de9-19d2213999f8",
"as:downlink:01EXXA57JJWWYEDX3Z55TNSTP5"]}}

......
***** 71 Ack'd
03:56:58 Ack: v3/application1@tenant1/devices/wisnodetest04/down/ack
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"},
	"dev_eui":"60C5A8FFFE781691","join_eui":"70B3D57ED0000000","dev_addr":"26083BE1"},
	"correlation_ids":[
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H",
"as:up:01EXXA5D45E77S19TXEV1E4GAJ",
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"gs:conn:01EXRPTTFGFNTRGH7V8FTC3R0S",
"gs:up:host:01EXRPTTFTEXBNV87KZFYFWP5V",
"gs:uplink:01EXXA5CV73THH2RKEAC2T9MDP",
"ns:uplink:01EXXA5CVDCWPFBTXGGGB3T02W",
"rpc:/ttn.lorawan.v3.GsNs/HandleUplink:01EXXA5CVDEXDFBPYXC0J01Q3E"],
	"received_at":"2021-02-07T03:56:59.397330003Z",
	"downlink_ack":{
"session_key_id":"AXd6GPmneD3dKVoArcS36g==",
"f_port":71,"f_cnt":36,"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H"]}}

 Found az:LockToken:
Exception of type 'Microsoft.Azure.Devices.Client.Exceptions.DeviceMessageLockLostException' was thrown.

03:56:59 TTN Uplink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 Port: 0
......
03:57:04 Ack: v3/application1@tenant1/devices/wisnodetest04/down/ack
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"},
"dev_eui":"60C5A8FFFE781691","join_eui":"70B3D57ED0000000","dev_addr":"26083BE1"},
"correlation_ids":[
"as:downlink:01EXX9B4RGSCJ4BN21GHPM85W5",
"as:up:01EXXA5K2FWGP9DGD7THWZ8HNR",
"az:LockToken:e2fef28c-fb1f-42cd-bb40-3ad8e6051da9",
"gs:conn:01EXRPTTFGFNTRGH7V8FTC3R0S",
"gs:up:host:01EXRPTTFTEXBNV87KZFYFWP5V",
"gs:uplink:01EXXA5JVDR102TKCWQ77P4YYF",
"ns:uplink:01EXXA5JVGNGMZN33FNT47G6PF",
"rpc:/ttn.lorawan.v3.GsNs/HandleUplink:01EXXA5JVGJFFQVEWX2M1XSFKK"],
"received_at":"2021-02-07T03:57:05.487910418Z","downlink_ack":{"session_key_id":"AXd6GPmneD3dKVoArcS36g==",
"f_port":72,"f_cnt":37,
"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL","correlation_ids":
["az:LockToken:e2fef28c-fb1f-42cd-bb40-3ad8e6051da9","as:downlink:01EXX9B4RGSCJ4BN21GHPM85W5"]}}

The sequence of messages is a bit odd: in the Azure DeviceClient ReceiveMessageHandler a downlink message is published, then a queued message is received, then a nack and finally an ack. The exception was because my client was trying to Complete the delivery of a message that had already been Abandoned.
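
One possible mitigation (a sketch, not necessarily how the gateway was changed) is to catch the DeviceMessageLockLostException around the Complete call:

using System.Threading.Tasks;

using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Exceptions;

static class DownlinkCompletion
{
    public static async Task CompleteIfLockStillHeldAsync(DeviceClient deviceClient, string lockToken)
    {
        try
        {
            await deviceClient.CompleteAsync(lockToken);
        }
        catch (DeviceMessageLockLostException)
        {
            // The message has already been completed, abandoned or rejected, or the lock has expired
        }
    }
}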