Swarm Space – Asset Tracker Payload Formatter

After writing Swarm Space – Payload Formatter Debugging I then tested it creating a new payload formatter for my new Swarm Asset Tracker.

Swarm Asset Tracker device

The Swarm Asset Tracker has a slightly different payload to the Swarm Eval Kit which is detailed in the product manual.

Swarm Asset Tracker JSON payload

The first message sent shortly after I powered up the device had the latitude and longitude of Null Island

The Asset Tracker UserApplicationId is 65002 and the payload is similar to the Swarm Eval Kit. I created some message payloads (location of Christchurch Cathedral) for testing.

The JSON payload sent by my Swarm Asset Tracker

{
  "dt": 1677396395,
  "lt": -43.5333,
  "ln": 172.6333,
  "al": 25,
  "sp": 0,
  "hd": 126,
  "gj": 92,
  "gs": 1,
  "bv": 4103,
  "tp": 20,
  "rs": -110,
  "tr": -107,
  "ts": 3,
  "td": 1677396357,
  "hp": 166,
  "vp": 187,
  "tf": 36526
}

The Base64 representation of the payload sent by my Swarm Asset Tracker

ew0KICAiZHQiOiAxNjc3Mzk2Mzk1LA0KICAibHQiOiAtNDMuNTMzMywNCiAgImxuIjogMTcyLjYzMzMsDQogICJhbCI6IDI1LA0KICAic3AiOiAwLA0KICAiaGQiOiAxMjYsDQogICJnaiI6IDkyLA0KICAiZ3MiOiAxLA0KICAiYnYiOiA0MTAzLA0KICAidHAiOiAyMCwNCiAgInJzIjogLTExMCwNCiAgInRyIjogLTEwNywNCiAgInRzIjogMywNCiAgInRkIjogMTY3NzM5NjM1NywNCiAgImhwIjogMTY2LA0KICAidnAiOiAxODcsDQogICJ0ZiI6IDM2NTI2DQp9

The initial version of my payload formatter

using System;
using System.Collections.Generic;
using System.Globalization;
using Newtonsoft.Json.Linq;

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        if ((payloadText != "") && (payloadJson != null))
        {
            JObject location = new JObject();

            location.Add("lat", payloadJson.GetValue("lt"));
            location.Add("lon", payloadJson.GetValue("ln"));
            location.Add("alt", payloadJson.GetValue("al"));

            telemetryEvent.Add("DeviceLocation", location);
        }

        // Course & speed
        telemetryEvent.Add("Course", payloadJson.GetValue("hd"));
        telemetryEvent.Add("Speed", payloadJson.GetValue("sp"));

        // Battery voltage
        telemetryEvent.Add("BatteryVoltage", payloadJson.GetValue("bv"));

        // RSSI
        telemetryEvent.Add("RSSI", payloadJson.GetValue("rs"));

        properties.Add("iothub-creation-time-utc", DateTimeOffset.FromUnixTimeSeconds((long)payloadJson.GetValue("dt")).ToString("s", CultureInfo.InvariantCulture));

        return telemetryEvent;
    }
}

The PayloadFormatterMaintenanceApplication command line I used for testing my Swarm Asset Tracker payload formatter

The console output of my Swarm Asset Tracker payload formatter

The PayloadFormatterMaintenanceApplication is better than trying to debug a payload formatter in a staging/production environment.

Currently the payload formatters still have to be manually uploaded to the application’s Azure Blob Storage for final testing.

Swarm Space – Payload Formatter Debugging

After Swarm Space – Uplink Payload Formatters revisited I wrote a couple of payload formatters and they were easy to get wrong and the Azure Application Insights error messages were unhelpful.

namespace PayloadFormatter // Additional namespace for shortening interface when usage in formatter code
{
    using System.Collections.Generic;

    using Newtonsoft.Json.Linq;

    public interface IFormatterUplink
    {
        public JObject Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes);
    }

    public interface IFormatterDownlink
    {
        public byte[] Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes);
    }
}

The definitions of the uplink & downlink payload formatter evaluator interfaces have been updated and shifted to a new project.

Visual Studio 2022 Solution with payloadformatter maintenance application

I built a console application to help with developing and debugging uplink or downlink formatters. The application has a number of command line parameters which specify the formatter to be used, UserApplicationId, OrganizationId, DeviceType etc.

public class CommandLineOptions
{
    [Option('d', "Direction", Required = true, HelpText = "Test Uplink or DownLink formatter")]
	public string Direction { get; set; }

    [Option('p', "filename", HelpText = "Uplink or Downlink Payload file name")]
    public string PayloadFilename { get; set; } = string.Empty;

    [Option('o', "OrganisationId", Required = true, HelpText = "Organisation unique identifier")]
    public uint OrganizationId { get; set; }

    [Option('i', "DeviceId", Required = true, HelpText = "Device unique identitifer")]
    public uint DeviceId { get; set; }

    [Option('t', "DeviceType", Required = true, HelpText = "Device type number")]
    public byte DeviceType { get; set; }

    [Option('u', "UserApplicationId", Required = true, HelpText = "User Application Id")]
    public ushort UserApplicationId { get; set; }

    [Option('h', "SwarmHiveReceivedAtUtc", HelpText = "Swarm Hive received at time UTC")]
    public DateTime? SwarmHiveReceivedAtUtc { get; set; }

    [Option('w', "UplinkWebHookReceivedAtUtc", HelpText = "Webhook received at time UTC")]
    public DateTime? UplinkWebHookReceivedAtUtc { get; set; }

    [Option('s', "Status", HelpText = "Uplink local file system file name")]
    public byte? Status { get; set; }

    [Option('c', "Client", HelpText = "Uplink local file system file name")]
    public string Client { get; set; } 
 }

The downlink formatter (similar approach for uplink) loads the sample file as an array of bytes, then tries to convert it to text, and finally to JSON. Then the formatter code is “compiled” and the executed with the file payload and command line parameters.

private static async Task DownlinkFormatterCore(CommandLineOptions options)
{
    Dictionary<string, string> properties = new Dictionary<string, string>();

    string formatterFolder = Path.Combine(Environment.CurrentDirectory, "downlink");
    Console.WriteLine($"Downlink- uplinkFormatterFolder: {formatterFolder}");

    string formatterFile = Path.Combine(formatterFolder, $"{options.UserApplicationId}.cs");
    Console.WriteLine($"Downlink- UserApplicationId: {options.UserApplicationId}");
    Console.WriteLine($"Downlink- Payload formatter file: {formatterFile}");

    PayloadFormatter.IFormatterDownlink evalulator;
    try
    {
        evalulator = CSScript.Evaluator.LoadFile<PayloadFormatter.IFormatterDownlink>(formatterFile);
     }
    catch (CSScriptLib.CompilerException cex)
    {
        Console.Write($"Loading or compiling file:{formatterFile} failed Exception:{cex}");
        return;
    }

    string payloadFilename = Path.Combine(formatterFolder, options.PayloadFilename);
    Console.WriteLine($"Downlink- payloadFilename:{payloadFilename}");
    byte[] uplinkBytes;

    try
    {
        uplinkBytes = File.ReadAllBytes(payloadFilename);
    }
    catch (DirectoryNotFoundException dex)
    {
        Console.WriteLine($"Uplink payload filename directory {formatterFolder} not found:{dex}");
        return;
    }
    catch (FileNotFoundException fnfex)
    {
        Console.WriteLine($"Uplink payload filename {payloadFilename} not found:{fnfex}");
        return;
    }
    catch (FormatException fex)
    {
        Console.WriteLine($"Uplink payload file invalid format {payloadFilename} not found:{fex}");
        return;
    }

    // See if payload can be converted to a string
    string uplinkText = string.Empty;
    try
    {
        uplinkText = Encoding.UTF8.GetString(uplinkBytes);
    }
    catch (FormatException fex)
    {
        Console.WriteLine("Encoding.UTF8.GetString failed:{0}", fex.Message);
    }

    // See if payload can be converted to JSON
    JObject uplinkJson;
    try
    {
        uplinkJson = JObject.Parse(uplinkText);
    }
    catch (JsonReaderException jrex)
    {
        Console.WriteLine("JObject.Parse failed Exception:{1}", jrex);

        uplinkJson = new JObject();
    }

    Console.WriteLine("Properties");
    foreach (var property in properties)
    {
        Console.WriteLine($"{property.Key}:{property.Value}");
    }

    // Transform the byte and optional text and JSON payload
    Byte[] payload;
    try
    {
        payload = evalulator.Evaluate(properties, options.OrganizationId, options.DeviceId, options.DeviceType, options.UserApplicationId, uplinkJson, uplinkText, uplinkBytes);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"evalulatorUplink.Evaluate failed Exception:{ex}");
        return;
    }

    Console.WriteLine("Payload");
    Console.WriteLine(Convert.ToBase64String(payload));
}

The sample JSON payload is what would be sent by Azure IoT Central to a device to configure the fan speed

Azure IoT Central M138 Breakout device template with the Fan Status command selected
{
  "FanStatus": 2
}

If the downlink payload formatter is compiled and executes successfully the Base64 representation output is displayed

using System;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
    public byte[] Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        byte? status = payloadJson.Value<byte?>("FanStatus");

        if ( status.HasValue ) 
        { 
            return new byte[] { status.Value };
        }

        return new byte[]{};
    }
}

If the downlink payload formatter syntax is incorrect e.g. { status.Value ; }; an error message with the line and column is displayed.

using System;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
    public byte[] Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        byte? status = payloadJson.Value<byte?>("FanStatus");

        if ( status.HasValue ) 
        {
            return new byte[] { status.Value ; };
        }

        return new byte[]{};
    }
}

If the downlink payload formatter syntax is correct but execution fails (in the example code division by zero) an error message is displayed.

using System;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;

public class FormatterDownlink : PayloadFormatter.IFormatterDownlink
{
    public byte[] Evaluate(IDictionary<string, string> properties, uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        byte? status = payloadJson.Value<byte?>("FanStatus");

        if ( status.HasValue ) 
        {
            int divideByZero = 10;

            divideByZero = divideByZero / 0;

            return new byte[] { status.Value };
        }

        return new byte[]{};
    }
}

The PayloadFormatterMaintenanceApplication makes it significantly easier to develop formatters. Currently the payload formatters have to be manually uploaded to the application’s Azure Blob Storage for final testing.

Swarm Space – Underlying Architecture sorted

After figuring out that calling an Azure Http Trigger function to load the cache wasn’t going to work reliably, I have revisited the architecture one last time and significantly refactored the SwarmSpaceAzuureIoTConnector project.

Visual Studio 2022 solution

The application now has a StartUpService which loads the Azure DeviceClient cache (Lazy Cache) in the background as the application starts up. If an uplink message is received from a SwarmDevice before, it has been loaded by the FunctionsStartup the DeviceClient information is cached and another connection to the Azure IoT Hub is not established.

...
using Microsoft.Azure.Functions.Extensions.DependencyInjection;

[assembly: FunctionsStartup(typeof(devMobile.IoT.SwarmSpaceAzureIoTConnector.Connector.StartUpService))]
namespace devMobile.IoT.SwarmSpaceAzureIoTConnector.Connector
{
...
    public class StartUpService : BackgroundService
    {
        private readonly ILogger<StartUpService> _logger;
        private readonly ISwarmSpaceBumblebeeHive _swarmSpaceBumblebeeHive;
        private readonly Models.ApplicationSettings _applicationSettings;
        private readonly IAzureDeviceClientCache _azureDeviceClientCache;

        public StartUpService(ILogger<StartUpService> logger, IAzureDeviceClientCache azureDeviceClientCache, ISwarmSpaceBumblebeeHive swarmSpaceBumblebeeHive, IOptions<Models.ApplicationSettings> applicationSettings)//, IOptions<Models.AzureIoTSettings> azureIoTSettings)
        {
            _logger = logger;
            _azureDeviceClientCache = azureDeviceClientCache;
            _swarmSpaceBumblebeeHive = swarmSpaceBumblebeeHive;
            _applicationSettings = applicationSettings.Value;
        }

        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            await Task.Yield();

            _logger.LogInformation("StartUpService.ExecuteAsync start");

            try
            {
                _logger.LogInformation("BumblebeeHiveCacheRefresh start");

                foreach (SwarmSpace.BumblebeeHiveClient.Device device in await _swarmSpaceBumblebeeHive.DeviceListAsync(cancellationToken))
                {
                    _logger.LogInformation("BumblebeeHiveCacheRefresh DeviceId:{DeviceId} DeviceName:{DeviceName}", device.DeviceId, device.DeviceName);

                    Models.AzureIoTDeviceClientContext context = new Models.AzureIoTDeviceClientContext()
                    {
                        OrganisationId = _applicationSettings.OrganisationId,
                        DeviceType = (byte)device.DeviceType,
                        DeviceId = (uint)device.DeviceId,
                    };

                    await _azureDeviceClientCache.GetOrAddAsync(context.DeviceId, context);
                }

                _logger.LogInformation("BumblebeeHiveCacheRefresh finish");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "StartUpService.ExecuteAsync error");

                throw;
            }

            _logger.LogInformation("StartUpService.ExecuteAsync finish");
        }
    }
}

The uplink and downlink payload formatters are stored in Azure Blob Storage are compiled (CS-Script) as they are loaded then cached (Lazy Cache)

Azure Storage explorer displaying list of uplink payload formatter blobs.
Azure Storage explorer displaying list of downlink payload formatter blobs.
private async Task<IFormatterDownlink> DownlinkLoadAsync(int userApplicationId)
{
    BlobClient blobClient = new BlobClient(_payloadFormatterConnectionString, _applicationSettings.PayloadFormattersDownlinkContainer, $"{userApplicationId}.cs");

    if (!await blobClient.ExistsAsync())
    {
        _logger.LogInformation("PayloadFormatterDownlink- UserApplicationId:{0} Container:{1} not found using default:{2}", userApplicationId, _applicationSettings.PayloadFormattersUplinkContainer, _applicationSettings.PayloadFormatterUplinkBlobDefault);

        blobClient = new BlobClient(_payloadFormatterConnectionString, _applicationSettings.PayloadFormatterDownlinkBlobDefault, _applicationSettings.PayloadFormatterDownlinkBlobDefault);
    }

    BlobDownloadResult downloadResult = await blobClient.DownloadContentAsync();

    return CSScript.Evaluator.LoadCode<PayloadFormatter.IFormatterDownlink>(downloadResult.Content.ToString());
}

The uplink and downlink formatters can be edited in Visual Studio 2022 with syntax highlighting (currently they have to be manually uploaded).

The SwarmSpaceBumbleebeehive module no longer has public login or logout methods.

    public interface ISwarmSpaceBumblebeeHive
    {
        public Task<ICollection<Device>> DeviceListAsync(CancellationToken cancellationToken);

        public Task SendAsync(uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, byte[] payload);
    }

The DeviceListAsync and SendAsync methods now call the BumblebeeHive login method after configurable period of inactivity.

public async Task<ICollection<Device>> DeviceListAsync(CancellationToken cancellationToken)
{
        if ((_TokenActivityAtUtC + _bumblebeeHiveSettings.TokenValidFor) < DateTime.UtcNow)
        {
            await Login();
        }

        using (HttpClient httpClient = _httpClientFactory.CreateClient())
       {
            Client client = new Client(httpClient);

            client.BaseUrl = _bumblebeeHiveSettings.BaseUrl;

            httpClient.DefaultRequestHeaders.Add("Authorization", $"bearer {_token}");

            return await client.GetDevicesAsync(null, null, null, null, null, null, null, null, null, cancellationToken);
        }
}

I’m looking at building a webby user interface where users an interactivity list, create, edit, delete formatters with syntax highlighter support, and the executing the formatter with sample payloads.

Swarm Space Azure IoT Connector Identity Translation Gateway Architecture

This approach uses most of the existing building blocks, and that’s it no more changes.

Swarm Space – Uplink with Azure Functions or WebAPI

This post could have been much longer with more screen grabs and code snippets, so this is the “highlights package”. This post took a lot longer than I expected as building, testing locally, then deploying the different implementations was time consuming.

Swarm Space Connector Functions Projects

I built the projects to investigate the different options taking into account reliability, robustness, amount of code, performance (I think slow startup could be a problem). The code is very “plain” I used the default options, no copyright notices, default formatting, context sensitive error messages were used to add any required “using” statements, libraries etc.

The desktop emulator hosting the six functions

I also deployed the Azure Functions and ASP .NET Core WebAPI application to check there were no difference (beyond performance) in the way they worked. I included a “default” function (generated by the new project wizard) for reference while I was building the others.

The function application with six functions in deployed to Azure

The “dynamic” type function worked but broke when the Javascript Object Notation(JSON) was invalid, or fields were missing, and it didn’t enforce the payload was correct.

namespace WebhookHttpTrigger
{
    public static class Dynamic
    {
        [FunctionName("Dynamic")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] dynamic input,
            ILogger log)
        {
            log.LogInformation($"C# HTTP Dynamic trigger function processed a request PacketId:{input.packetId}.");

            return new OkObjectResult("Hello, This HTTP triggered Dynamic function executed successfully.");
        }
    }
}
Dynamic Trigger function failing because PacketId format was valid (x in numeric field)

The “TypedAutomagic” function worked, it ensured the Javascript Object Notation(JSON) was valid, the payload format was correct but didn’t enforce the System.ComponentModel.DataAnnotations attributes.

namespace WebhookHttpTrigger
{
    public static class TypedAutomagic
    {
        [FunctionName("TypedAutomagic")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] UplinkPayload payload,
            ILogger log)
        {
            log.LogInformation($"C# HTTP trigger function typed TypedAutomagic UplinkPayload processed a request PacketId:{payload.PacketId}");

            return new OkObjectResult("Hello, This HTTP triggered automagic function executed successfully.");
        }
    }
}
Successful execution of TypedAutomagic function

The “TypedAutomagic” implementation also detected when the JSON property values in the payload couldn’t be deserialised successfully, but if the hiveRxTime was invalid the value was set to 1/1/0001 12:00:00 am.

TypedAutomagic hiveRxTime deserialisation failing

The “TypedDeserializeObject” function worked, it ensured the Javascript Object Notation(JSON) was valid, the payload format was correct but also didn’t enforce the System.ComponentModel.DataAnnotations attributes.

namespace WebhookHttpTrigger
{
    public static class TypedDeserializeObject
    {
        [FunctionName("TypedDeserializeObject")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] string httpPayload,
            ILogger log)
        {
            UplinkPayload uplinkPayload;

            try
            {
                uplinkPayload = JsonConvert.DeserializeObject<UplinkPayload>(httpPayload);
            }
            catch(Exception ex) 
            {
                log.LogWarning(ex, "JsonConvert.DeserializeObject failed");

                return new BadRequestObjectResult(ex.Message);
            }

            log.LogInformation($"C# HTTP trigger function typed DeserializeObject UplinkPayload processed a request PacketId:{uplinkPayload.PacketId}");

            return new OkObjectResult("Hello, This HTTP triggered DeserializeObject function executed successfully.");
        }
    }
}
TypedDeserializeObject function failing because PacketId format was valid (x in numeric field)
TypedDeserializeObject function failing because deviceId value is negative but datatype is unsigned
Successful execution of TypedDeserializeObject function

The “TypedDeserializeObjectAnnotations” function worked, it ensured the Javascript Object Notation (JSON) was valid, the payload format was correct and enforced the System.ComponentModel.DataAnnotations attributes.

namespace WebhookHttpTrigger
{
    public static class TypedDeserializeObjectAnnotations
    {
        [FunctionName("TypedDeserializeObjectAnnotations")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] string httpPayload,
            ILogger log)
        {
            UplinkPayload uplinkPayload;

            try
            {
                uplinkPayload = JsonConvert.DeserializeObject<UplinkPayload>(httpPayload);
            }
            catch (Exception ex)
            {
                log.LogWarning(ex, "JsonConvert.DeserializeObject failed");

                return new BadRequestObjectResult(ex.Message);
            }

            var context = new ValidationContext(uplinkPayload, serviceProvider: null, items: null);

            var results = new List<ValidationResult>();

            var isValid = Validator.TryValidateObject(uplinkPayload, context, results,true);

            if (!isValid)
            {
                log.LogWarning("Validator.TryValidateObject failed results:{results}", results);

                return new BadRequestObjectResult(results);
            }

            log.LogInformation($"C# HTTP trigger function typed DeserializeObject UplinkPayload processed a request PacketId:{uplinkPayload.PacketId}");

            return new OkObjectResult("Hello, This HTTP triggered DeserializeObject function executed successfully.");
        }
    }
}

I built an ASP .NET Core WebAPI version with two uplink method implementations, one which used dependency injection (DI) and the other that didn’t. I also added code to validate the deserialisation of HiveRxTimeUtc.

....
[HttpPost]
public async Task<IActionResult> Post([FromBody] UplinkPayload payload)
{
    if ( payload.HiveRxTimeUtc == DateTime.MinValue)
    {
        _logger.LogWarning("HiveRxTimeUtc validation failed");

        return this.BadRequest();
    }

    QueueClient queueClient = _queueServiceClient.GetQueueClient("uplink");

    await queueClient.SendMessageAsync(Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payload)));

    return this.Ok();
}
...
 [HttpPost]
public async Task<IActionResult> Post([FromBody] UplinkPayload payload)
{
    // Check that the post data is good
    if (!this.ModelState.IsValid)
    {
        _logger.LogWarning("QueuedController validation failed {0}", this.ModelState.ToString());

        return this.BadRequest(this.ModelState);
    }

    if ( payload.HiveRxTimeUtc == DateTime.MinValue)
    {
        _logger.LogWarning("HiveRxTimeUtc validation failed");

        return this.BadRequest();
    }

    try
    {
        QueueClient queueClient = new QueueClient(_configuration.GetConnectionString("AzureWebApi"), "uplink");

        //await queueClient.CreateIfNotExistsAsync();

        await queueClient.SendMessageAsync(Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payload)));
    }
    catch (Exception ex)
    {
        _logger.LogError(ex,"Unable to open/create queue or send message", ex);

        return this.Problem("Unable to open queue (creating if it doesn't exist) or send message", statusCode: 500, title: "Uplink payload not sent");
    }

    return this.Ok();
}

In Telerik Fiddler I could see calls to the Azure Functions and the ASP .NET Core WebAPI were taking similar time to execute (Though I did see 5+ seconds) and the ASP .NET Core WebAPI appeared to take much longer to startup. (I did see 100+ seconds when I made four requests as the ASP .NET Core WebAPI was starting)

I’m going to use the ASP .NET Core WebAPI with dependency injection (DI) approach just because “it’s always better with DI”.

I noticed some other “oddness” while implementing then testing the Azure Http Trigger functions and ASP .NET Core WebAPI which I will cover off in some future posts.

Swarm Space – Underlying Architecture Revisited

After figuring out that calling a CS-Script uplink payload formatter inside an Azure Http Trigger function wasn’t going to work I needed a new architecture.

Swarm Space Azure IoT Connector Identity Translation Gateway Architecture

The new approach uses most of the existing building blocks but adds an Azure HTTP Trigger which receives the Swarm Space Bumble bee hive Webhook Delivery Method calls and writes them to an Azure Storage Queue.

Swarm Space Bumble bee hive Web Hook Delivery method

The uplink and downlink formatters are now called asynchronously so they have limited impact on the overall performance of the application.

Swarm Space – Uplink Payload Startup Problem

I initially noticed a couple of duplicate Swarm Space message PacketIds in Azure IoT Central.

Azure IoT Central with consecutive duplicate PacketIds

Then I started to pay more attention and noticed that duplicate PacketIds could be interleaved

Azure IoT Central with interleaved duplicate PacketIds

Shortly after noticing the interleaved PacketIds I checked the Delivery Method and found there were message delivery timeouts.

Swarm Space Delivery with method timeouts

In Azure Application Insights I could see that the UplinkController was taking up to 15 seconds to execute which was longer than the bumblebee hive delivery timeout.

Azure Application Insights displaying UplinkController metrics.

In Telerik Fiddler I could see calls to the UplinkController taking 16 seconds to execute. (I did see 30+ seconds)

Telerik Fiddler showing duration of Uplink controller calls

To see if the problem was loading CS-Script I added code to load a simple function as the application started. After averaging the duration over many executions there was little difference in the duration.

public interface IApplication
{
    public DateTime Startup(DateTime utcNow);
}
...
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
    await Task.Yield();

    _logger.LogInformation("StartUpService.ExecuteAsync start");
            
    // Force the loading and startup of CS Script evaluator
    dynamic application = CSScript.Evaluator
        .LoadCode(
                @"using System;
                public class Application : IApplication
                {
                    public DateTime Startup(DateTime utcNow)
                    {
                        return utcNow;
                    }
                }");

    DateTime result = application.Startup(DateTime.UtcNow);
            
    try
    {
        await _swarmSpaceBumblebeeHive.Login(cancellationToken);

       await _azureIoTDeviceClientCache.Load(cancellationToken);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "StartUpService.ExecuteAsync error");

        throw;
    }

    _logger.LogInformation("StartUpService.ExecuteAsync finish");
}

The Swarm Eval Kit uplink formatter (UserApplicationId 65535.cs) “unpacks” the uplink Javascript ObjectNotation(JSON) message, adds an Azure IoT Central compatible location which requires a number of libraries to be loaded.

using System;
using System.Globalization;
using System.Text;

using Microsoft.Azure.Devices.Client;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public Message Evaluate(int organisationId, int deviceId, int deviceType, int userApplicationId, JObject telemetryEvent, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        if ((payloadText != "") && (payloadJson != null))
        {
            JObject location = new JObject();

            location.Add("lat", payloadJson.GetValue("lt"));
            location.Add("lon", payloadJson.GetValue("ln"));
            location.Add("alt", payloadJson.GetValue("a"));

            telemetryEvent.Add("DeviceLocation", location);
        }

        Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent)));

        ioTHubmessage.Properties.Add("iothub-creation-time-utc", DateTimeOffset.FromUnixTimeSeconds((long)payloadJson.GetValue("d")).ToString("s", CultureInfo.InvariantCulture));

        return ioTHubmessage;
    }
}

I then added code to load the most complex uplink and downlink formatters as the application started. There was a significant reduction in the UplinkController execution durations, but it could still take more than 30 seconds.

try
{
    await _swarmSpaceBumblebeeHive.Login(cancellationToken);

    await _azureIoTDeviceClientCache.Load(cancellationToken);

    await _formatterCache.UplinkGetAsync(65535);

    await _formatterCache.DownlinkGetAsync(20);
}
catch (Exception ex)
{
    _logger.LogError(ex, "StartUpService.ExecuteAsync error");

    throw;
}

I then added detailed telemetry to the code and found that the duration (also variability) was a combination of Azure IoT Device Provisoning Service(DPS) registration, Azure IoT Hub connection establishment, CS-Script payload formatter loading/compilation/execution, application startup tasks and message uploading durations.

After much experimentation It looks like that “synchronously” calling the payload processing code from the Uplink controller is not a viable approach as the Swarm Space Bumblebee hive calls regularly timeout resulting in duplicate messages.

Swarm Space – Uplink Payload Formatters revisited

The approach used in Swarm Space–Uplink Payload Message Creation Time had significant limitations e.g. setting the iothub-creation-time-utc message property.

public interface IFormatterUplink
{
    public Message Evaluate(int organisationId, int deviceId, int deviceType, int userApplicationId, JObject telemetryEvent, JObject payloadJson, string payloadText, byte[] payloadBytes);
}

Uplink payload formatters now return a Microsoft.Azure.Azure.Devices.Client message object to the UplinkController.

...
JObject telemetryEvent = new JObject
{
    { "packetId", payload.PacketId},
    { "deviceType" , payload.DeviceType},
    { "DeviceID", payload.DeviceId },
    { "organizationId", payload.OrganizationId },
    { "UserApplicationId", payload.UserApplicationId},
    { "ReceivedAtUtc", payload.HiveRxTime.ToString("s", CultureInfo.InvariantCulture) },
    { "DataLength", payload.Len },
    { "Data", payload.Data },
    { "Status", payload.Status },
};

    _logger.LogDebug("Uplink-DeviceId:{0} PacketId:{1} TelemetryEvent before:{0}", payload.DeviceId, payload.PacketId, JsonConvert.SerializeObject(telemetryEvent, Formatting.Indented));

    using (Message ioTHubmessage = swarmSpaceFormatterUplink.Evaluate(payload.OrganizationId, payload.DeviceId, context.DeviceType, payload.UserApplicationId, telemetryEvent, payloadJson, payloadText, payloadBytes))
{
    _logger.LogDebug("Uplink-DeviceId:{0} PacketId:{1} TelemetryEvent after:{0}", payload.DeviceId, payload.PacketId, JsonConvert.SerializeObject(telemetryEvent, Formatting.Indented));

    ioTHubmessage.Properties.Add("PacketId", payload.PacketId.ToString());
    ioTHubmessage.Properties.Add("OrganizationId", payload.OrganizationId.ToString());
    ioTHubmessage.Properties.Add("UserApplicationId", payload.UserApplicationId.ToString());
    ioTHubmessage.Properties.Add("DeviceId", payload.DeviceId.ToString());
    ioTHubmessage.Properties.Add("deviceType", payload.DeviceType.ToString());

    await deviceClient.SendEventAsync(ioTHubmessage);

    _logger.LogInformation("Uplink-DeviceID:{deviceId} PacketId:{1} SendEventAsync success", payload.DeviceId, payload.PacketId);
}
...

The default uplink payload formatter (UserApplicationId 0.cs) returns a Microsoft.Azure.Azure.Devices.Client message object with a serialised TelemetryEvent payload.

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public Message Evaluate(int organisationId, int deviceId, int deviceType, int userApplicationId, JObject telemetryEvent, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent)));

        return ioTHubmessage;
    }
}

The Swarm Eval Kit uplink sample formatter (UserApplicationId 65535.cs) “unpacks” the uplink Javascript ObjectNotation(JSON) message, adds an Azure IoT Central compatible location to the TelemetryEvent and an “iothub-creation-time-utc” message property.

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public Message Evaluate(int organisationId, int deviceId, int deviceType, int userApplicationId, JObject telemetryEvent, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        if ((payloadText != "") && (payloadJson != null))
        {
            JObject location = new JObject();

            location.Add("lat", payloadJson.GetValue("lt"));
            location.Add("lon", payloadJson.GetValue("ln"));
            location.Add("alt", payloadJson.GetValue("a"));

            telemetryEvent.Add("DeviceLocation", location);
        }

        Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent)));

        ioTHubmessage.Properties.Add("iothub-creation-time-utc", DateTimeOffset.FromUnixTimeSeconds((long)payloadJson.GetValue("d")).ToString("s", CultureInfo.InvariantCulture));

        return ioTHubmessage;
    }
}

If the uplink formatter compilation or execution fails, a detailed exception message is logged to Azure Application Insights

Detailed compilation error message in Application Insights

I need to add some tools to make the creation, modification, deletion and debugging of downlink/uplink formatters easier.

Swarm Space – Uplink Payload Message Creation Time

The Swarm Space satellite constellation doesn’t have continuous coverage (Jan 2023) so messages sent when there is no coverage are queued (default 48hrs) by the Swarm M138 Modem for transmission when a satellite passes overhead.

Satellite Passes with gap in coverage from 16:18 to 18:42 highlighted

In the Swarm Hive Delivery Method messages from the Swarm Eval Kit and Swarm Tracker in my backyard arriving in “clusters”.

Swarm Hive Delivery Methods webhook calls.

The messages in each “cluster” were processed by a payload formatter then forwarded to Azure IoT Central for processing. All the messages in a cluster had similar event creation times which was “breaking” graphs and device tracking maps. After running the application locally using Telerik Fiddler to try different payloads I realised that the Microsoft.Azure.Azure.Devices.Client message iothub-creation-time-utc property was set to the when the message was received by Swarm Space infrastructure.

_logger.LogDebug("Uplink-DeviceId:{0} PacketId:{1} TelemetryEvent before:{0}", payload.DeviceId, payload.PacketId, JsonConvert.SerializeObject(telemetryEvent, Formatting.Indented));

telemetryEvent = swarmSpaceFormatterUplink.Evaluate(telemetryEvent, payload.Data, payloadBytes, payloadText, payloadJson);

_logger.LogDebug("Uplink-DeviceId:{0} PacketId:{1} TelemetryEvent after:{0}", payload.DeviceId, payload.PacketId, JsonConvert.SerializeObject(telemetryEvent, Formatting.Indented));

// Send the message to Azure IoT Hub
using (Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
{
   // Ensure the displayed time is the acquired time rather than the uploaded time. 
   ioTHubmessage.Properties.Add("iothub-creation-time-utc", payload.HiveRxTime.ToString("s", CultureInfo.InvariantCulture));
   ioTHubmessage.Properties.Add("PacketId", payload.PacketId.ToString());
   ioTHubmessage.Properties.Add("OrganizationId", payload.OrganizationId.ToString());
   ioTHubmessage.Properties.Add("ApplicationId", payload.UserApplicationId.ToString());
   ioTHubmessage.Properties.Add("DeviceId", payload.DeviceId.ToString());
   ioTHubmessage.Properties.Add("deviceType", payload.DeviceType.ToString());

   await deviceClient.SendEventAsync(ioTHubmessage);

   _logger.LogInformation("Uplink-DeviceID:{deviceId} SendEventAsync success", payload.DeviceId);
}

The Swarm Eval Kit uplink (JSON) message generated by the sample firmware “d” field is the number of seconds since the Unix Epoch that the message payload was constructed.

Swarm Hive Messages with “d” field in the JSON payload highlighted
Online Unix Epoch Convertor displaying Unix Epoch 1672561286 in NZDT and UTC time

The revised 65355.cs payload formatter adds an “iothub-creation-time-utc” field to the TelemetryEvent

using System;
using System.Globalization;

using Newtonsoft.Json.Linq;

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(JObject telemetryEvent, string payloadBase64, byte[] payloadBytes, string payloadText, JObject payloadJson)
    {
        if ((payloadText != "" ) && ( payloadJson != null))
        {
            JObject location = new JObject();

            location.Add("lat", payloadJson.GetValue("lt"));
            location.Add("lon", payloadJson.GetValue("ln"));
            location.Add("alt", payloadJson.GetValue("a"));

            telemetryEvent.Add("DeviceLocation", location);
        };

        telemetryEvent.Add("iothub-creation-time-utc", DateTimeOffset.FromUnixTimeSeconds((long)payloadJson.GetValue("d")).ToString("s", CultureInfo.InvariantCulture));

        return telemetryEvent;
    }
}

Which, if present is used to populate theMicrosoft.Azure.Azure.Devices.Client message iothub-creation-time-utc property

_logger.LogDebug("Uplink-DeviceId:{0} PacketId:{1} TelemetryEvent before:{0}", payload.DeviceId, payload.PacketId, JsonConvert.SerializeObject(telemetryEvent, Formatting.Indented));

telemetryEvent = swarmSpaceFormatterUplink.Evaluate(telemetryEvent, payload.Data, payloadBytes, payloadText, payloadJson);

.LogDebug("Uplink-DeviceId:{0} PacketId:{1} TelemetryEvent after:{0}", payload.DeviceId, payload.PacketId, JsonConvert.SerializeObject(telemetryEvent, Formatting.Indented));

Send the message to Azure IoT Hub
using (Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
{
   // Ensure the displayed time is the acquired time rather than the uploaded time. 
   ioTHubmessage.Properties.Add("PacketId", payload.PacketId.ToString());
   ioTHubmessage.Properties.Add("OrganizationId", payload.OrganizationId.ToString());
   ioTHubmessage.Properties.Add("UserApplicationId", payload.UserApplicationId.ToString());
   ioTHubmessage.Properties.Add("DeviceId", payload.DeviceId.ToString());
   ioTHubmessage.Properties.Add("deviceType", payload.DeviceType.ToString());

   if (telemetryEvent.ContainsKey("iothub-creation-time-utc"))
   {
      ioTHubmessage.Properties.Add("iothub-creation-time-utc",telemetryEvent.Value<string>("iothub-creation-time-utc"));
   }

   await deviceClient.SendEventAsync(ioTHubmessage);

   _logger.LogInformation("Uplink-DeviceID:{deviceId} SendEventAsync success", payload.DeviceId);
}

The Azure IoT Central message now had the correct timestamp and “event creation time” values.

AzureIoT Central “Raw Data” with valid timestamp and event creation times

I don’t think this is a good solution

The design of the payload formatters will have to be revisited

Swarm Space – Uplink Payload formatter caching and files

The payload formatters of my Azure IoT Hub Cloud Identity Translation Gateway use CS-Script and even a simple one was taking more than half a second to compile each time it was called.

using System;
using System.Globalization;

using Newtonsoft.Json.Linq;

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(JObject telemetryEvent, string payloadBase64, byte[] payloadBytes, string payloadText, JObject payloadJson)
    {
        if ((payloadText != "" ) && ( payloadJson != null))
        {
            JObject location = new JObject();

            location.Add("lat", payloadJson.GetValue("lt"));
            location.Add("lon", payloadJson.GetValue("ln"));
            location.Add("alt", payloadJson.GetValue("a"));

            telemetryEvent.Add("Location", location);
        };

        return telemetryEvent;
    }
}

The Swarm Eval Kit default message has a userApplicationId of 65335

{"ln":123.456,"si":0.0,"bi":0.2,"sv":0.152,"lt":-12.345,"bv":4.032,"d":1671704370,"n":2,"a":9.0,"s":1.0,"c":208.0,"r":-94,"ti":0.032}

The 65355.cs payload formatter adds an Azure IoT Central compatible location to the telemetry payload.

Azure IoT Central uplink telemetry message payload

The formatter files are currently part of the SwarmSpaceAzureIoTConnector project (moving to Azure Blob Storage) so are configured as “content” (bonus syntax highlighting works) and “copy if newer” so they are included in the deployment package.

Visual Studio 2022 Sample payload formatter

I used Alastair Crabtrees’s LazyCache to store compiled payload formatters with Uplink/Downlink + UserApplicationId as the cache key.

public class FormatterCache : IFormatterCache
{
    private readonly ILogger<FormatterCache> _logger;
    private readonly Models.ApplicationSettings _applicationSettings;
    private readonly static IAppCache _payloadFormatters = new CachingService();

    public FormatterCache(ILogger<FormatterCache>logger, IOptions<Models.ApplicationSettings> applicationSettings)
    {
        _logger = logger;
        _applicationSettings = applicationSettings.Value;
    }

    public async Task<IFormatterUplink> UplinkGetAsync(int userApplicationId)
    {
        IFormatterUplink payloadFormatterUplink = await _payloadFormatters.GetOrAddAsync<PayloadFormatter.IFormatterUplink>($"U{userApplicationId}", (ICacheEntry x) => UplinkLoadAsync(userApplicationId), memoryCacheEntryOptions);

        return payloadFormatterUplink;
    }

    private async Task<IFormatterUplink> UplinkLoadAsync(int userApplicationId)
    {
        string payloadformatterFilePath = $"{_applicationSettings.PayloadFormattersUplinkFilePath}\\{userApplicationId}.cs";

        if (!File.Exists(payloadformatterFilePath))
        {
            _logger.LogInformation("PayloadFormatterUplink- UserApplicationId:{0} PayloadFormatterPath:{1} not found using default:{2}", userApplicationId, payloadformatterFilePath, _applicationSettings.PayloadFormatterUplinkDefault);

            return CSScript.Evaluator.LoadFile<PayloadFormatter.IFormatterUplink>(_applicationSettings.PayloadFormatterUplinkDefault);
        }

        _logger.LogInformation("PayloadFormatterUplink- UserApplicationId:{0} loading PayloadFormatterPath:{1}", userApplicationId, payloadformatterFilePath);

        return CSScript.Evaluator.LoadFile<PayloadFormatter.IFormatterUplink>(payloadformatterFilePath);
    }
...
}

The default uplink and downlink formatters are configured in application settings and are used when a UserApplicationId specific formatter is not configured.

Fiddler Composer illustrating compiled formatter timings before and after caching

Swarm Space – Uplink Payload formatter Proof of Concept(PoC)

My Azure IoT Hub Cloud Identity Translation Gateway will support the translation of Base64 encoded uplink payloads to Javascript Object Notation (JSON) so they can be processed by Azure IoT Hub client applications and Azure IoT Central. This PoC uses CS-Script by Oleg Shilo to transform the Swarm Eval Kit Base64 encoded JSON uplink messages.

Swarm Hive message list with a message payload

A sample decoded (JSON) Swarm Eval Kit uplink message

{"ln":123.456,"si":0.0,"bi":0.2,"sv":0.152,"lt":-12.345,"bv":4.032,"d":1671704370,"n":2,"a":9.0,"s":1.0,"c":208.0,"r":-94,"ti":0.032}

A Webhook Delivery method forwards uplink messages to my Azure IoT Hub Cloud Identity Translation Gateway.

Swarm Hive Delivery configuration with recent uplink messages

My first hard-coded payload formatter adds an Azure IoT Central compatible location to the telemetry event payload.

const string codeSwarmSpaceUplinkFormatterCode = @"
   using Newtonsoft.Json.Linq;

   public class UplinkFormatter : PayloadFormatter.ISwarmSpaceFormatterUplink
   {
       public JObject Evaluate(JObject telemetryEvent, string payloadBase64, byte[] payloadBytes, string payloadText, JObject payloadJson)
       {
           if ((payloadText != """" ) && ( payloadJson != null))
           {
               JObject location = new JObject() ;

               location.Add(""Lat"", payloadJson.GetValue(""lt""));
               location.Add(""Lon"", payloadJson.GetValue(""ln""));
               location.Add(""Alt"", payloadJson.GetValue(""a""));

               telemetryEvent.Add( ""location"", location);
           };

           return telemetryEvent;
       }
   }";
}

The PayloadFormatter namespace was added to reduce the length of the payload formatter C# interface declarations.

namespace PayloadFormatter 
{
    using Newtonsoft.Json.Linq;

    public interface ISwarmSpaceFormatterUplink
    {
        public JObject Evaluate(JObject telemetry, string payloadBase64, byte[] payloadBytes, string payloadText, JObject payloadJson);
    }

    public interface ISwarmSpaceFormatterDownlink
    {
        public string Evaluate(JObject payloadJson, string payloadText, byte[] payloadBytes, string payloadBase64);
    }
}

namespace devMobile.IoT.SwarmSpace.AzureIoT.Connector
{
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;

    using CSScriptLib;

    using PayloadFormatter;

    public interface ISwarmSpaceFormatterCache
    {
        public Task<ISwarmSpaceFormatterUplink> PayloadFormatterGetOrAddAsync(int userApplicationId);

    }

    public class SwarmSpaceFormatterCache : ISwarmSpaceFormatterCache
    {
        private readonly ILogger<SwarmSpaceFormatterCache> _logger;

        public SwarmSpaceFormatterCache(ILogger<SwarmSpaceFormatterCache>logger)
        {
            _logger = logger;
        }

        public async Task<ISwarmSpaceFormatterUplink> PayloadFormatterGetOrAddAsync(int deviceId)
        {
            return CSScript.Evaluator.LoadCode<PayloadFormatter.ISwarmSpaceFormatterUplink>(codeSwarmSpaceUplinkFormatterCode);
        }
...
}

The parameters of the formatter are Base64 encoded, textual and a Newtonsoft JObject representations of the uplink payload and a telemetry event populated with some uplink message metadata.

Azure IoT Central uplink telemetry message payload

The initial “compile” of an uplink formatter was taking approximately 2.1 seconds so they will be “compiled” on demand and cached in a Dictionary with the UserApplicationId as the key. A default uplink formatter will be used when a UserApplicationId specific uplink formatter is not configured.