Azure Event Grid esp-mqtt-arduino Client – Success

Still couldn’t figure out why my code was failing, so I turned the logging up to 11 and noticed a couple of messages which didn’t make sense. The device was connecting then disconnecting, which indicated another problem. The Message Queue Telemetry Transport (MQTT) specification has a “feature”, Last Will and Testament (LWT), which a client can configure so that the MQTT broker publishes a message to a topic if the device disconnects unexpectedly.

Looking at the code I noticed that an LWT was being configured and that the topic didn’t exist in my Azure Event Grid MQTT broker namespace. When the LWT configuration was commented out, the application worked.

void Mqtt5ClientESP32::begin(const char* uri, const char* client_id, const char* user, const char* pass, bool use_v5) {
  connected_ = false;
  insecure_ = false;
  cfg_.broker.address.uri = uri;
  if (client_id) cfg_.credentials.client_id = client_id;
  if (user)      cfg_.credentials.username  = user;
  if (pass)      cfg_.credentials.authentication.password = pass;

  cfg_.broker.verification.use_global_ca_store = false;
  cfg_.broker.verification.certificate = nullptr;
  cfg_.broker.verification.certificate_len = 0;
  cfg_.broker.verification.skip_cert_common_name_check = false;
  
  // LWT disabled - the topic didn't exist in the Azure Event Grid MQTT broker namespace
  /*
  cfg_.session.last_will.topic  = "devices/esp32/lwt";
  cfg_.session.last_will.msg    = "offline";
  cfg_.session.last_will.qos    = 1;
  cfg_.session.last_will.retain = true;
  */

  cfg_.session.protocol_ver =
#if CONFIG_MQTT_PROTOCOL_5
      use_v5 ? MQTT_PROTOCOL_V_5 : MQTT_PROTOCOL_V_3_1_1;
#else
      MQTT_PROTOCOL_V_3_1_1;
  (void)use_v5;  // MQTT v5 support disabled at build time
#endif
}

Two methods were added so that the LWT could be configured if required:

void SetLWT(const char *topic, const char *msg, int msg_len, int qos, int retain);

void Mqtt5ClientESP32::SetLWT(const char *topic, const char *msg, int msg_len, int qos, int retain) {
  cfg_.session.last_will.topic   = topic;
  cfg_.session.last_will.msg     = msg;
  cfg_.session.last_will.msg_len = msg_len;
  cfg_.session.last_will.qos     = qos;
  cfg_.session.last_will.retain  = retain;
}

Paying close attention to the logging, I noticed “Subscribing to ssl/mqtt5” followed by “Subscribe request sent”.

I checked the sample application and found that if the connect was successful it would then try to subscribe to a topic that didn’t exist.

mqtt.onConnected([]{
  Serial.println("[MQTT] Connected event");

  mqttReady = true;

  /*
  Serial.println("[MQTT] Subscribing to ssl/mqtt5");
  if (mqtt.subscribe("ssl/mqtt5", 1, true)) {
    Serial.println("[MQTT] Subscribe request sent");
  } else {
    Serial.println("[MQTT] Subscribe request failed");
  }
  */
});

I commented out that code and the application started without any error messages.

Just to make sure, I checked that the message count in the Azure Storage Queue was increasing and that the payload client ID matched my device.

Yet again, a couple of hours lost from my life which I can never get back.

Myriota Connector – UplinkMessageProcessor Queue Output Binding

The Azure Storage Queue Trigger Function that handles uplink messages in my Myriota Azure IoT Hub Cloud Identity Translation Gateway wasn’t handling “transient” vs. “permanent” failures well. Sometimes a “permanent” failure message would be retried several times by the Functions runtime before being moved to the poison queue.

After some experimentation, using an Azure Storage Queue output binding to move messages to the poison queue looked like a reasonable approach. (Though returning null to indicate the message should be removed from the queue was not obvious from the documentation.)

[Function("UplinkMessageProcessor")]
[QueueOutput(queueName: "uplink-poison", Connection = "UplinkQueueStorage")]
public async Task<Models.UplinkPayloadQueueDto> UplinkMessageProcessor([QueueTrigger(queueName: "uplink", Connection = "UplinkQueueStorage")] Models.UplinkPayloadQueueDto payload, CancellationToken cancellationToken)
{
...
   // Process each packet in the payload. Myriota docs say only one packet per payload but just in case...
   foreach (Models.QueuePacket packet in payload.Data.Packets)
   {
      // Lookup the device client in the cache or create a new one
      Models.DeviceConnectionContext context;

      try
      {
         context = await _deviceConnectionCache.GetOrAddAsync(packet.TerminalId, cancellationToken);
      }
      catch (DeviceNotFoundException dnfex)
      {
         _logger.LogError(dnfex, "Uplink- PayloadId:{0} TerminalId:{1} terminal not found", payload.Id, packet.TerminalId);

         return payload;
      }
      catch (Exception ex) // Maybe just send to poison queue or figure if transient error?
      {
         _logger.LogError(ex, "Uplink- PayloadId:{0} TerminalId:{1} ", payload.Id, packet.TerminalId);

         throw;
      }
...
         // Processing successful, message can be deleted by the QueueTrigger plumbing
         return null;
      }

After building and testing the Azure Storage Queue output binding implementation, I’m not certain that it is a good approach. The code is a bit “chunky” and I have had to implement more of the retry logic myself.

Myriota device Uplink Serialisation

The Myriota Developer documentation has some sample webhook data payloads, so I used JSON2csharp to generate a Data Transfer Object (DTO) to deserialise the payload. The format of the message is a bit “odd”: the “Data” value contains an “escaped” JSON object.

{
  "EndpointRef": "ksnb8GB_TuGj:__jLfs2BQJ2d",
  "Timestamp": 1692928585,
  "Data": "{\"Packets\": [{\"Timestamp\": 1692927646796, \"TerminalId\": \"0001020304\", \"Value\": \"00008c9512e624cce066adbae764cccccccccccc\"}]}",
  "Id": "a5c1bffe-4b62-4233-bbe9-d4ecc4f8b6cb",
  "CertificateUrl": "https://security.myriota.com/data-13f7751f3c5df569a6c9c42a9ce73a8a.crt",
  "Signature": "FDJpQdWHwCY+tzCN/WvQdnbyjgu4BmP/t3cJIOEF11sREGtt7AH2L9vMUDji6X/lxWBYa4K8tmI0T914iPyFV36i+GtjCO4UHUGuFPJObCtiugVV8934EBM+824xgaeW8Hvsqj9eDeyJoXH2S6C1alcAkkZCVt0pUhRZSZZ4jBJGGEEQ1Gm+SOlYjC2exUOf0mCrI5Pct+qyaDHbtiHRd/qNGW0LOMXrB/9difT+/2ZKE1xvDv9VdxylXi7W0/mARCfNa0J6aWtQrpvEXJ5w22VQqKBYuj3nlGtL1oOuXCZnbFYFf4qkysPaXON31EmUBeB4WbZMyPaoyFK0wG3rwA=="
}
namespace devMobile.IoT.myriotaAzureIoTConnector.myriota.UplinkWebhook.Models
{
    public class UplinkPayloadWebDto
    {
        public string EndpointRef { get; set; }
        public long Timestamp { get; set; } 
        public string Data { get; set; } // Embedded JSON ?
        public string Id { get; set; }
        public string CertificateUrl { get; set; }
        public string Signature { get; set; }
    }
}
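
The embedded JSON in the Data property needs a second deserialisation step; a minimal sketch of the packet DTOs (the class and property names here are my assumptions for illustration, the actual project DTOs may differ) could look like this:

using System.Collections.Generic;
using System.Text.Json;

namespace devMobile.IoT.myriotaAzureIoTConnector.myriota.UplinkWebhook.Models
{
    // Assumed shape of the escaped JSON carried in UplinkPayloadWebDto.Data
    public class UplinkPayloadDataDto
    {
        public List<PacketDto> Packets { get; set; }
    }

    public class PacketDto
    {
        public long Timestamp { get; set; }
        public string TerminalId { get; set; }
        public string Value { get; set; }
    }
}

The Data string can then be unpacked with JsonSerializer.Deserialize&lt;UplinkPayloadDataDto&gt;(payloadWeb.Data).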

The UplinkWebhook controller “automagically” deserialises the message, then in code the embedded JSON is deserialised and “unpacked” (as sketched above), and finally the processed message is inserted into an Azure Storage Queue.

namespace devMobile.IoT.myriotaAzureIoTConnector.myriota.UplinkWebhook.Controllers
{
    [Route("[controller]")]
    [ApiController]
    public class UplinkController : ControllerBase
    {
        private readonly Models.ApplicationSettings _applicationSettings;
        private readonly ILogger<UplinkController> _logger;
        private readonly QueueServiceClient _queueServiceClient;

        public UplinkController(IOptions<Models.ApplicationSettings> applicationSettings, QueueServiceClient queueServiceClient, ILogger<UplinkController> logger)
        {
            _applicationSettings = applicationSettings.Value;
            _queueServiceClient = queueServiceClient;
            _logger = logger;
        }

        [HttpPost]
        public async Task<IActionResult> Post([FromBody] Models.UplinkPayloadWebDto payloadWeb)
        {
            _logger.LogInformation("SendAsync queue name:{QueueName}", _applicationSettings.QueueName);

            QueueClient queueClient = _queueServiceClient.GetQueueClient(_applicationSettings.QueueName);

            var serializeOptions = new JsonSerializerOptions
            {
                WriteIndented = true,
                Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping
            };

            await queueClient.SendMessageAsync(Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payloadWeb, serializeOptions)));

            return this.Ok();
        }
    }
}

The webhook application uses the QueueClientBuilderExtensions AddQueueServiceClient method so that a QueueServiceClient can be injected into the webhook controller.

namespace devMobile.IoT.myriotaAzureIoTConnector.myriota.UplinkWebhook
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);

            // Add services to the container.
            builder.Services.AddControllers();

            builder.Services.AddApplicationInsightsTelemetry(i => i.ConnectionString = builder.Configuration.GetConnectionString("ApplicationInsights"));

            builder.Services.Configure<Models.ApplicationSettings>(builder.Configuration.GetSection("Application"));

            builder.Services.AddAzureClients(azureClient =>
            {
                azureClient.AddQueueServiceClient(builder.Configuration.GetConnectionString("AzureWebApi"));
            });

            var app = builder.Build();

            // Configure the HTTP request pipeline.

            app.UseHttpsRedirection();

            app.MapControllers();

            app.Run();
        }
    }
}

After debugging the application on my desktop with Telerik Fiddler, I deployed it to one of my Azure subscriptions.

Azure Resource Group for the myriota Azure IoT Connector
Adding a new Destination in the myriota device manager

As part of configuring a new device, test messages can be sent to the configured destinations.

Testing a new Destination in the myriota device manager
{
  "EndpointRef": "N_HlfTNgRsqe:uyXKvYTmTAO5",
  "Timestamp": 1563521870,
  "Data": "{\"Packets\": [{\"Timestamp\": 1563521870359, \"TerminalId\": \"f74636ec549f9bde50cf765d2bcacbf9\", \"Value\": \"0101010101010101010101010101010101010101\"}]}",
  "Id": "fe77e2c7-8e9c-40d0-8980-43720b9dab75",
  "CertificateUrl": "https://security.myriota.com/data-13f7751f3c5df569a6c9c42a9ce73a8a.crt",
  "Signature": "k2OIBppMRmBT520rUlIvMxNg+h9soJYBhQhOGSIWGdzkppdT1Po2GbFr7jbg..."
}

The DTO generated with JSON2csharp needed some manual “tweaking” after examining how a couple of the sample messages were deserialised.

Azure Storage Explorer messages

I left the Myriota Developer Toolkit device (running the tracker sample) outside overnight, and the following day I could see a couple of messages in the Azure Storage Queue with Azure Storage Explorer.

Myriota device configuration

For a couple of weeks the Myriota Developer Toolkit has been sitting under my desk, and today I got some time to set up a device, register it, and upload some data.

Myriota Developer Toolkit

The first step was to download and install the Myriota Configurator so I could get the device registration information and install the tracker example application.

Using Windows File Explorer to “unblock” the downloaded file

After “unblocking” the zip file and upgrading my pip install, the install script worked.

Myriota Configurator installation script

The application had to be run from the command line with “python MyriotaConfigurator.py”

Myriota Configurator main menu
Myriota Configurator retrieving device registration code

On the device I’m using the Tracker sample application to generate some sample payloads.

Myriota Configurator downloading tracker sample to device

The next step was to “register” my device and configure the destination(s) for its messages.

Myriota Device Manager Device configuration

Once the device and device manager configuration were sorted, I put the Tracker out on the back lawn on top of a large flowerpot.

Device Manager Access Times

On the “Access Times” page I could see that there were several periods when a satellite was overhead and overnight a couple of messages were uploaded.

Azure Functions Isolated Worker support for VB.Net 4.8

As part of my “day job” I spend a bit of time working with VB.Net 4.X “legacy” projects doing upgrades and bug fixes. Currently I am updating a number of Windows Service applications to run as Microsoft Azure Functions. With the release of Azure Functions runtime V4 Isolated Worker Process support for .NET Framework 4.8, this is the last post in my Azure Functions with VB.Net 4.X and Azure Functions with VB.Net on .NET Core V6 series.

I have published source code for Azure Storage BlobTrigger, HttpTrigger, Azure Storage QueueTrigger, and TimerTrigger projects.

Visual Studio Solution explorer Azure Functions projects

All of the examples now have a Program.vb which initialises the isolated worker host.

Namespace VBNet....TriggerIsolated
    Friend Class Program
        Public Shared Sub Main(ByVal args As String())
            Call FunctionsDebugger.Enable()

            Dim host = New HostBuilder().ConfigureFunctionsWorkerDefaults().Build()

            host.Run()
        End Sub
    End Class
End Namespace

All of the Isolated Worker process triggers displayed this message, which appeared to be benign:

Csproj not found in C:\Users\..\VBNetHttpTriggerIsolated\bin\Debug\net48 directory tree. Skipping user secrets file configuration.

There were a lot of articles about problems building Docker images, but the only relevant ones appeared to be about getting F# and other .NET Core languages to work in Azure Functions.

Namespace devMobile.Azure.VBNetBlobTriggerIsolated
    Public Class BlobTrigger
        Private ReadOnly _logger As ILogger

        Public Sub New(ByVal loggerFactory As ILoggerFactory)
            _logger = loggerFactory.CreateLogger(Of BlobTrigger)()
        End Sub

        <[Function]("vbnetblobtriggerisolated")>
        Public Sub Run(
        <BlobTrigger("vbnetblobtriggerisolated/{name}", Connection:="blobendpoint")> ByVal myBlob As String, ByVal name As String)

            _logger.LogInformation($"VB.Net NET 4.8 Isolated Blob trigger function Processed blob Name: {name}  Data: {myBlob}")
        End Sub
    End Class
End Namespace

I used Azure Storage Explorer to upload files containing Lorem Ipsum for testing the BlobTrigger.

Azure BlobTrigger function running in the desktop emulator
Azure BlobTrigger Function logging in Application Insights

I used Telerik Fiddler to POST messages to the desktop emulator and Azure endpoints.

Namespace VBNetHttpTriggerIsolated
    Public Class HttpTrigger
        Private Shared executionCount As Int32
        Private ReadOnly _logger As ILogger

        Public Sub New(ByVal loggerFactory As ILoggerFactory)
            _logger = loggerFactory.CreateLogger(Of HttpTrigger)()
        End Sub

        <[Function]("Notifications")>
        Public Function Run(
        <HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")> ByVal req As HttpRequestData) As HttpResponseData
            Interlocked.Increment(executionCount)

            _logger.LogInformation("VB.Net NET 4.8 Isolated HTTP trigger Execution count:{executionCount} Method:{req.Method}", executionCount, req.Method)

            Dim response = req.CreateResponse(HttpStatusCode.OK)
            response.Headers.Add("Content-Type", "text/plain; charset=utf-8")

            Return response
        End Function
    End Class
End Namespace
Azure HttpTrigger Function running in the desktop emulator
Azure HttpTrigger Function logging in Application Insights

I used Azure Storage Explorer to create messages for testing the QueueTrigger

Namespace devMobile.Azure.VBNetQueueTriggerIsolated

    Public Class QueueTrigger
        Private Shared _logger As ILogger
        Private Shared _concurrencyCount As Integer = 0
        Private Shared _executionCount As Integer = 0

        Public Sub New(ByVal loggerFactory As ILoggerFactory)
            _logger = loggerFactory.CreateLogger(Of QueueTrigger)()
        End Sub

        <[Function]("VBNetQueueTriggerIsolated")>
        Public Sub Run(
        <QueueTrigger("vbnetqueuetriggerisolated", Connection:="QueueEndpoint")> ByVal message As String)
            Interlocked.Increment(_concurrencyCount)
            Interlocked.Increment(_executionCount)

            _logger.LogInformation("VB.Net .NET 4.8 Isolated Queue Trigger Concurrency:{_concurrencyCount} ExecutionCount:{_executionCount} Message:{message}", _concurrencyCount, _executionCount, message)

            Interlocked.Decrement(_concurrencyCount)
        End Sub
    End Class
End Namespace
Azure QueueTrigger Function running in the desktop emulator
Azure QueueTrigger Function logging in Application Insights
Namespace devMobile.Azure.VBNetTimerTriggerIsolated
    Public Class TimerTrigger
        Private Shared _logger As ILogger
        Private Shared _executionCount As Integer = 0

        Public Sub New(ByVal loggerFactory As ILoggerFactory)
            _logger = loggerFactory.CreateLogger(Of TimerTrigger)()
        End Sub

        <[Function]("Timer")>
        Public Sub Run(
        <TimerTrigger("0 */1 * * * *")> ByVal myTimer As MyInfo)

            Interlocked.Increment(_executionCount)
            _logger.LogInformation("VB.Net Isolated TimerTrigger next trigger:{0} Execution count:{1}", myTimer.ScheduleStatus.Next, _executionCount)
        End Sub
    End Class
End Namespace
Azure TimerTrigger Function running in the desktop emulator
Azure TimerTrigger Function logging in Application Insights

The development, debugging and deployment of these functions took a lot of time. Initially Azure Application Insights didn’t work when the Azure Isolated Worker triggers were deployed to Azure. After some experimentation I found that Application Insights Connection Strings worked and Application Instrumentation Keys did not.

With Microsoft’s “we do not plan to evolve Visual Basic as a language” announcement, this should hopefully be my last post about VB.Net ever.

Swarm Space – Underlying Architecture sorted

After figuring out that calling an Azure HTTP Trigger function to load the cache wasn’t going to work reliably, I have revisited the architecture one last time and significantly refactored the SwarmSpaceAzureIoTConnector project.

Visual Studio 2022 solution

The application now has a StartUpService which loads the Azure DeviceClient cache (Lazy Cache) in the background as the application starts up. If an uplink message is received from a Swarm device before it has been loaded by the FunctionsStartup, the DeviceClient information is cached and another connection to the Azure IoT Hub is not established.

...
using Microsoft.Azure.Functions.Extensions.DependencyInjection;

[assembly: FunctionsStartup(typeof(devMobile.IoT.SwarmSpaceAzureIoTConnector.Connector.StartUpService))]
namespace devMobile.IoT.SwarmSpaceAzureIoTConnector.Connector
{
...
    public class StartUpService : BackgroundService
    {
        private readonly ILogger<StartUpService> _logger;
        private readonly ISwarmSpaceBumblebeeHive _swarmSpaceBumblebeeHive;
        private readonly Models.ApplicationSettings _applicationSettings;
        private readonly IAzureDeviceClientCache _azureDeviceClientCache;

        public StartUpService(ILogger<StartUpService> logger, IAzureDeviceClientCache azureDeviceClientCache, ISwarmSpaceBumblebeeHive swarmSpaceBumblebeeHive, IOptions<Models.ApplicationSettings> applicationSettings)//, IOptions<Models.AzureIoTSettings> azureIoTSettings)
        {
            _logger = logger;
            _azureDeviceClientCache = azureDeviceClientCache;
            _swarmSpaceBumblebeeHive = swarmSpaceBumblebeeHive;
            _applicationSettings = applicationSettings.Value;
        }

        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            await Task.Yield();

            _logger.LogInformation("StartUpService.ExecuteAsync start");

            try
            {
                _logger.LogInformation("BumblebeeHiveCacheRefresh start");

                foreach (SwarmSpace.BumblebeeHiveClient.Device device in await _swarmSpaceBumblebeeHive.DeviceListAsync(cancellationToken))
                {
                    _logger.LogInformation("BumblebeeHiveCacheRefresh DeviceId:{DeviceId} DeviceName:{DeviceName}", device.DeviceId, device.DeviceName);

                    Models.AzureIoTDeviceClientContext context = new Models.AzureIoTDeviceClientContext()
                    {
                        OrganisationId = _applicationSettings.OrganisationId,
                        DeviceType = (byte)device.DeviceType,
                        DeviceId = (uint)device.DeviceId,
                    };

                    await _azureDeviceClientCache.GetOrAddAsync(context.DeviceId, context);
                }

                _logger.LogInformation("BumblebeeHiveCacheRefresh finish");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "StartUpService.ExecuteAsync error");

                throw;
            }

            _logger.LogInformation("StartUpService.ExecuteAsync finish");
        }
    }
}

The uplink and downlink payload formatters are stored in Azure Blob Storage, compiled (with CS-Script) as they are loaded, and then cached (Lazy Cache).

Azure Storage explorer displaying list of uplink payload formatter blobs.
Azure Storage explorer displaying list of downlink payload formatter blobs.
private async Task<IFormatterDownlink> DownlinkLoadAsync(int userApplicationId)
{
    BlobClient blobClient = new BlobClient(_payloadFormatterConnectionString, _applicationSettings.PayloadFormattersDownlinkContainer, $"{userApplicationId}.cs");

    if (!await blobClient.ExistsAsync())
    {
        _logger.LogInformation("PayloadFormatterDownlink- UserApplicationId:{0} Container:{1} not found using default:{2}", userApplicationId, _applicationSettings.PayloadFormattersDownlinkContainer, _applicationSettings.PayloadFormatterDownlinkBlobDefault);

        blobClient = new BlobClient(_payloadFormatterConnectionString, _applicationSettings.PayloadFormattersDownlinkContainer, _applicationSettings.PayloadFormatterDownlinkBlobDefault);
    }

    BlobDownloadResult downloadResult = await blobClient.DownloadContentAsync();

    return CSScript.Evaluator.LoadCode<PayloadFormatter.IFormatterDownlink>(downloadResult.Content.ToString());
}
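
The Lazy Cache side of this isn’t shown above; a minimal sketch of how the compiled downlink formatters could be cached with GetOrAddAsync (the cache field, method name, and key prefix are my assumptions) is below:

private readonly IAppCache _payloadFormatterCache; // LazyCache IAppCache injected via DI (field name assumed)

private async Task<IFormatterDownlink> DownlinkGetAsync(int userApplicationId)
{
    // Compile the formatter only on a cache miss, subsequent calls reuse the cached compiled instance
    return await _payloadFormatterCache.GetOrAddAsync($"D{userApplicationId}", () => DownlinkLoadAsync(userApplicationId));
}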

The uplink and downlink formatters can be edited in Visual Studio 2022 with syntax highlighting (currently they have to be manually uploaded).

The SwarmSpaceBumblebeeHive module no longer has public login or logout methods.

    public interface ISwarmSpaceBumblebeeHive
    {
        public Task<ICollection<Device>> DeviceListAsync(CancellationToken cancellationToken);

        public Task SendAsync(uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, byte[] payload);
    }

The DeviceListAsync and SendAsync methods now call the BumblebeeHive Login method after a configurable period of inactivity.

public async Task<ICollection<Device>> DeviceListAsync(CancellationToken cancellationToken)
{
    if ((_TokenActivityAtUtC + _bumblebeeHiveSettings.TokenValidFor) < DateTime.UtcNow)
    {
        await Login();
    }

    using (HttpClient httpClient = _httpClientFactory.CreateClient())
    {
        Client client = new Client(httpClient);

        client.BaseUrl = _bumblebeeHiveSettings.BaseUrl;

        httpClient.DefaultRequestHeaders.Add("Authorization", $"bearer {_token}");

        return await client.GetDevicesAsync(null, null, null, null, null, null, null, null, null, cancellationToken);
    }
}

I’m looking at building a “webby” user interface where users can interactively list, create, edit, and delete formatters with syntax highlighter support, and execute a formatter against sample payloads.

Swarm Space Azure IoT Connector Identity Translation Gateway Architecture

This approach uses most of the existing building blocks, and that’s it: no more changes.

Swarm Space – Uplink with WebAPI Revisited again

After reviewing my ASP .NET Core WebAPI Swarm Space Delivery Method webhook implementation I have made a final round of changes.

There are now separate Data Transfer Objects (DTOs) for the uplink and queue message payloads, mainly because the UplinkPayloadQueueDto has additional fields for the client (based on the x-api-key) and for when the webhook was called.

public class UplinkPayloadQueueDto
{
    public ulong PacketId { get; set; }
    public byte DeviceType { get; set; }
    public uint DeviceId { get; set; }
    public ushort UserApplicationId { get; set; }
    public uint OrganizationId { get; set; }
    public string Data { get; set; } = string.Empty;
    public byte Length { get; set; }
    public int Status { get; set; }
    public DateTime SwarmHiveReceivedAtUtc { get; set; }
    public DateTime UplinkWebHookReceivedAtUtc { get; set; }
    public string Client { get; set; } = string.Empty;
 }

public class UplinkPayloadWebDto
{
    public ulong PacketId { get; set; }
    public byte DeviceType { get; set; }
    public uint DeviceId { get; set; }
    public ushort UserApplicationId { get; set; }
    public uint OrganizationId { get; set; }
    public string Data { get; set; } = string.Empty;

    [Range(Constants.PayloadLengthMinimum, Constants.PayloadLengthMaximum)]
    public byte Len { get; set; }
    public int Status { get; set; }

    public DateTime HiveRxTime { get; set; }
}

I did consider using AutoMapper to copy the values from the UplinkPayloadWebDto to the UplinkPayloadQueueDto, but the additional complexity/configuration required for one mapping wasn’t worth it.
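
For comparison, the AutoMapper configuration that was considered would have looked something like the sketch below (not used in the project); the same-named properties map by convention, so only Len/Length and HiveRxTime/SwarmHiveReceivedAtUtc need explicit configuration, and Client and UplinkWebHookReceivedAtUtc would still have to be set per request.

using AutoMapper;

public static class UplinkMappingSketch
{
    private static readonly IMapper Mapper = new MapperConfiguration(cfg =>
    {
        cfg.CreateMap<Models.UplinkPayloadWebDto, Models.UplinkPayloadQueueDto>()
            .ForMember(queue => queue.Length, opt => opt.MapFrom(web => web.Len))
            .ForMember(queue => queue.SwarmHiveReceivedAtUtc, opt => opt.MapFrom(web => web.HiveRxTime))
            .ForMember(queue => queue.Client, opt => opt.Ignore())                     // set from the x-api-key lookup
            .ForMember(queue => queue.UplinkWebHookReceivedAtUtc, opt => opt.Ignore()); // set to DateTime.UtcNow per request
    }).CreateMapper();

    public static Models.UplinkPayloadQueueDto ToQueueDto(Models.UplinkPayloadWebDto payloadWeb)
    {
        return Mapper.Map<Models.UplinkPayloadQueueDto>(payloadWeb);
    }
}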

The UplinkController has a single POST method, which takes a JSON payload (FromBody) and a single header (FromHeader), “x-api-key”, which is used to secure the method and identify the caller.

[HttpPost]
public async Task<IActionResult> Post([FromHeader(Name = "x-api-key")] string xApiKeyValue, [FromBody] Models.UplinkPayloadWebDto payloadWeb)
{
    if (!_applicationSettings.XApiKeys.TryGetValue(xApiKeyValue, out string apiKeyName))
    {
        _logger.LogWarning("Authentication unsuccessful X-API-KEY value:{xApiKeyValue}", xApiKeyValue);

        return this.Unauthorized("Unauthorized client");
    }

    _logger.LogInformation("Authentication successful X-API-KEY value:{apiKeyName}", apiKeyName);

    // Could have used AutoMapper but didn't seem worth it for one mapping
    Models.UplinkPayloadQueueDto payloadQueue = new()
    {
        PacketId = payloadWeb.PacketId,
        DeviceType = payloadWeb.DeviceType,
        DeviceId = payloadWeb.DeviceId,
        UserApplicationId = payloadWeb.UserApplicationId,
        OrganizationId = payloadWeb.OrganizationId,
        Data = payloadWeb.Data,
        Length = payloadWeb.Len,
        Status = payloadWeb.Status,
        SwarmHiveReceivedAtUtc = payloadWeb.HiveRxTime,
        UplinkWebHookReceivedAtUtc = DateTime.UtcNow,
        Client = apiKeyName,
    };

    _logger.LogInformation("SendAsync queue name:{QueueName}", _applicationSettings.QueueName);

    QueueClient queueClient = _queueServiceClient.GetQueueClient(_applicationSettings.QueueName);

    await queueClient.SendMessageAsync(Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payloadQueue)));

    return this.Ok();
 }

I’ve also used dependency injection (DI) to get a QueueServiceClient, just because “it’s always better with DI”.
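
The registration in this project’s startup isn’t shown here; it follows the same AddAzureClients pattern as the Myriota webhook above. A minimal sketch (the connection string name is an assumption):

builder.Services.AddAzureClients(azureClient =>
{
    // Registers a QueueServiceClient that can be injected into the UplinkController
    azureClient.AddQueueServiceClient(builder.Configuration.GetConnectionString("UplinkQueueStorage"));
});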

Azure Web App Application settings with x-api-key configuration

The “x-api-key” values can also be updated without having to redeploy the application.
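
The x-api-key lookup relies on a dictionary bound from configuration; a minimal sketch of the settings class (QueueName and XApiKeys appear in the controller code above, everything else is an assumption) could look like this:

using System.Collections.Generic;

public class ApplicationSettings
{
    // Azure Storage Queue the uplink payloads are written to
    public string QueueName { get; set; } = string.Empty;

    // Key is the x-api-key header value, value is the client name used for logging and the Client field
    public Dictionary<string, string> XApiKeys { get; set; } = new Dictionary<string, string>();
}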

Swarm Space – Underlying Architecture Revisited

After figuring out that calling a CS-Script uplink payload formatter inside an Azure HTTP Trigger function wasn’t going to work, I needed a new architecture.

Swarm Space Azure IoT Connector Identity Translation Gateway Architecture

The new approach uses most of the existing building blocks but adds an Azure HTTP Trigger function which receives the Swarm Space Bumblebee Hive Webhook Delivery Method calls and writes them to an Azure Storage Queue (a sketch follows below).

Swarm Space Bumble bee hive Web Hook Delivery method
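
A minimal sketch of such an HTTP Trigger (in-process model; the function name, queue name, and connection setting name are my assumptions) could look something like this:

using System.IO;
using System.Threading.Tasks;

using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

public static class UplinkWebhookFunction
{
    [FunctionName("UplinkWebhook")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest request,
        [Queue("uplink", Connection = "UplinkQueueStorage")] IAsyncCollector<string> uplinkQueue,
        ILogger logger)
    {
        // Queue the webhook body as-is, the payload formatters are applied later by the queue processor
        string payload = await new StreamReader(request.Body).ReadToEndAsync();

        await uplinkQueue.AddAsync(payload);

        logger.LogInformation("Uplink webhook payload queued");

        return new OkResult();
    }
}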

The uplink and downlink formatters are now called asynchronously so they have limited impact on the overall performance of the application.

Swarm Space – Azure IoT FromDevice with webhooks

The initial versions of the Swarm Space Azure Cloud Identity Gateway were based on my The Things Industries (TTI) Azure IoT Connector, which used six HTTP-triggered Azure Functions. My Swarm Space Azure IoT connector only has one webhook endpoint, so a .NET Core WebAPI with controllers appeared to be more practical. The first step was to get some sample JavaScript Object Notation (JSON) uplink message payloads with the SwarmSpace-From Device with Webhooks project.

{
  "packetId": 0,
  "deviceType": 1,
  "deviceId": 0,
  "userApplicationId": 0,
  "organizationId": 65760,
  "data": "VGhpcyBpcyBhIHRlc3QgbWVzc2FnZS4gVGhlIHBhY2tldElkIGFuZCBkZXZpY2VJZCBhcmUgbm90IHBvcHVsYXRlZCwgYnV0IHdpbGwgYmUgZm9yIGEgcmVhbCBtZXNzYWdlLg==",
  "len": 100,
  "status": 0,
  "hiveRxTime": "2022-11-29T04:52:06"
}

I used JSON2CSharp to generate an initial version of a Plain Old CLR (Common Language Runtime) Object (POCO) to deserialise the Delivery Webhook payload.

// https://json2csharp.com/
// Root myDeserializedClass = JsonConvert.DeserializeObject<Root>(myJsonResponse);
public class Root
{
    public int packetId { get; set; }
    public int deviceType { get; set; }
    public int deviceId { get; set; }
    public int userApplicationId { get; set; }
    public int organizationId { get; set; }
    public string data { get; set; }
    public int len { get; set; }
    public int status { get; set; }
    public DateTime hiveRxTime { get; set; }
}

I then “tweaked” the JSON2CSharp class:

 public class UplinkPayload
    {
        [JsonProperty("packetId")]
        public int PacketId { get; set; }

        [JsonProperty("deviceType")]
        public int DeviceType { get; set; }

        [JsonProperty("deviceId")]
        public int DeviceId { get; set; }

        [JsonProperty("userApplicationId")]
        public int UserApplicationId { get; set; }

        [JsonProperty("organizationId")]
        public int OrganizationId { get; set; }

        [JsonProperty("data")]
        [JsonRequired]
        public string Data { get; set; }

        [JsonProperty("len")]
        public int Len { get; set; }

        [JsonProperty("status")]
        public int Status { get; set; }

        [JsonProperty("hiveRxTime")]
        public DateTime HiveRxTime { get; set; }
    }

This class is used to “automagically” deserialise Delivery Webhook payloads. There is also some additional payload validation which discards test messages (not certain this is a good idea) etc.

//---------------------------------------------------------------------------------
// Copyright (c) December 2022, devMobile Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//---------------------------------------------------------------------------------
namespace devMobile.IoT.SwarmSpace.AzureIoT.Connector.Controllers
{
    using System.Globalization;
    using System.Text;
    using System.Threading.Tasks;

    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.Devices.Client;
    using Microsoft.Extensions.Logging;

    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;

    [ApiController]
    [Route("api/[controller]")]
    public class UplinkController : ControllerBase
    {
        private readonly ILogger<UplinkController> _logger;
        private readonly IAzureIoTDeviceClientCache _azureIoTDeviceClientCache;

        public UplinkController(ILogger<UplinkController> logger, IAzureIoTDeviceClientCache azureIoTDeviceClientCache)
        {
            _logger = logger;
            _azureIoTDeviceClientCache = azureIoTDeviceClientCache;
        }

        [HttpPost]
        public async Task<IActionResult> Uplink([FromBody] Models.UplinkPayload payload)
        {
            DeviceClient deviceClient;

            _logger.LogDebug("Payload {0}", JsonConvert.SerializeObject(payload, Formatting.Indented));

            if (payload.PacketId == 0)
            {
                _logger.LogWarning("Uplink-payload simulated DeviceId:{DeviceId}", payload.DeviceId);

                return this.Ok();
            }

            if ((payload.UserApplicationId < Constants.UserApplicationIdMinimum) || (payload.UserApplicationId > Constants.UserApplicationIdMaximum))
            {
                _logger.LogWarning("Uplink-payload invalid User Application Id:{UserApplicationId}", payload.UserApplicationId);

                return this.BadRequest($"Invalid User Application Id {payload.UserApplicationId}");
            }

            if ((payload.Len < Constants.PayloadLengthMinimum) || string.IsNullOrEmpty(payload.Data))
            {
                _logger.LogWarning("Uplink-payload.Data is empty PacketId:{PacketId}", payload.PacketId);

                return this.Ok("payload.Data is empty");
            }

            Models.AzureIoTDeviceClientContext context = new Models.AzureIoTDeviceClientContext()
            {
                OrganisationId = payload.OrganizationId,
                UserApplicationId = payload.UserApplicationId,
                DeviceType = payload.DeviceType,
                DeviceId = payload.DeviceId,
            };

            deviceClient = await _azureIoTDeviceClientCache.GetOrAddAsync(payload.DeviceId.ToString(), context);

            JObject telemetryEvent = new JObject
            {
                { "packetId", payload.PacketId},
                { "deviceType" , payload.DeviceType},
                { "DeviceID", payload.DeviceId },
                { "organizationId", payload.OrganizationId },
                { "ApplicationId", payload.UserApplicationId},
                { "ReceivedAtUtc", payload.HiveRxTime.ToString("s", CultureInfo.InvariantCulture) },
                { "DataLength", payload.Len },
                { "Data", payload.Data },
                { "Status", payload.Status },
            };

            // Send the message to Azure IoT Hub
            using (Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
            {
                // Ensure the displayed time is the acquired time rather than the uploaded time. 
                ioTHubmessage.Properties.Add("iothub-creation-time-utc", payload.HiveRxTime.ToString("s", CultureInfo.InvariantCulture));
                ioTHubmessage.Properties.Add("OrganizationId", payload.OrganizationId.ToString());
                ioTHubmessage.Properties.Add("ApplicationId", payload.UserApplicationId.ToString());
                ioTHubmessage.Properties.Add("DeviceId", payload.DeviceId.ToString());
                ioTHubmessage.Properties.Add("deviceType", payload.DeviceType.ToString());

                await deviceClient.SendEventAsync(ioTHubmessage);

                _logger.LogInformation("Uplink-DeviceID:{deviceId} SendEventAsync success", payload.DeviceId);
            }

            return this.Ok();
        }
    }
}

I initially debugged and tested the Uplink controller with Telerik Fiddler using sample payloads captured with the SwarmSpace-From Device with Webhooks project.

Using Telerik Fiddler to make test delivery webhook calls

I could then inspect the messages with Azure IoT Explorer as they arrived.

Azure IoT Explorer displaying a test message

The next step was to create a new Delivery Method

Swarm delivery webhook creation

Configured to call my Uplink controller endpoint.

Swarm delivery webhook configuration

The webhook was configured to “acknowledge messages on successful delivery”. I then checked my Delivery Method configuration with a couple of “Test” messages.

My Swarm Space Eval Kit arrived mid-week and, after some issues with jumper settings, it started reporting position and status information.

Swarm Eval Kit in my backyard

The first position was just off the coast of West Africa (Null Island).

Swarm Map centered on Null Island

After the Global Positioning System (GPS) receiver got a good fix, the location of the Eval Kit was in the middle of my backyard.

Azure IoT Explorer displaying payload with good latitude and longitude
Swarm Map displaying the location of my device (zoomed out)

Swarm Space – FromDevice with webhooks

I modified my TTI V3 Connector Azure Storage Queues project, which uses Azure Functions HTTP Triggers to put messages into Azure Storage Queues, to process Swarm FromDevice webhook messages.

The first step was to configure a webhook with the Swarm dashboard.

Swarm dashboard webhooks configuration

I configured the webhook to “acknowledge messages on successful delivery”, then checked my configuration with a couple of “Test” messages.

Swarm dashboard webhook configuration

The Swagger API documentation has methods for configuring endpoints which can be called by an application.

Swagger API Documentation for managing endpoints

I queued a couple of messages on my Satellite Transceiver Breakout, and shortly after the next satellite passed overhead they were visible in the Swarm Dashboard Messages tab.

Swarm Dashboard with test and live fromdevice messages

The messages were also delivered to an Azure Storage Queue, and I could view them with Azure Storage Explorer.

Azure Storage Explorer displaying a webhook message payload