The Things Network HTTP Azure IoT Integration Soak Testing

I wanted to do some testing to make sure the application would reliably process messages from 1000’s of devices…

The first thing I learnt was “don’t forget to restart your Azure Function after deleting all the devices from the Azure IoT Hub” as the DeviceClients are cached. Also make sure you delete the devices from both your Azure Device Provisioning service(DPS) and Azure IoT Hub instances.

Applications Insights provisioning event tracking

The next “learning” was that if you forget to enable “always on” the caching won’t work and your application will call the DPS way more often than expected.

Azure Application “always on configuration

The next “learning” was if your soak test sends 24000 messages it will start to fail just after you go out to get a coffee because of the 8000 msgs/day limit on the free version of IoT Hub.

Azure IoT Hub Free tier 8000 messages/day limit

After these “learnings” the application appeared to be working and every so often a message would briefly appear in Azure Storage Explorer queue view.

Azure storage explorer view of uplink messages queue

The console test application simulated 1000 devices sending 24 messages every so often and took roughly 8 hours to complete.

Message generator finished

In the Azure IoT Hub telemetry 24000 messages had been received after roughly 8 hours confirming the test rig was working as expected.

The notch was another “learning”, if you go and do some gardening then after roughly 40 minutes of inactivity your desktop PC will go into power save mode and the test client will stop sending messages.

The caching of settings appeared to be work as there were only a couple of requests to my Azure Key Vault where sensitive information like connection strings, symmetric keys etc. are stored.

Memory consumption did look to bad and topped out at roughly 120M.

In the application logging you can see the 1000 calls to DPS at the beginning (the yellow dependency events) then the regular processing of messages.

Application Insights logging

Even with the “learnings” the testing went pretty well overall. I do need to run the test rig for longer and with even more simulated devices.

The Things Network HTTP Azure IoT Central Integration

This post is an overview of the Azure IoT Central configuration required to process The Things Network(TTN) HTTP integration uplink messages. I have assumed that the reader is already reasonably familiar with these products. There is an overview of configuring TTN HTTP integration in my “Simplicating and securing the HTTP handler” post.

The first step is to copy the IDScope from the Device connection blade.

Device connection blade

Then copy one of the primary or secondary keys

For more complex deployment the ApplicationEnrollmentGroupMapping configuration enables The Things Network(TTN) devices to be provisioned using different GroupEnrollment keys based on the applicationid in the Uplink message which initiates their provisoning.

"DeviceProvisioningService": {
      "GlobalDeviceEndpoint": "global.azure-devices-provisioning.net",
      "IDScope": "",
      "EnrollmentGroupSymmetricKeyDefault": "TopSecretKey",
      "DeviceProvisioningPollingDelay": 500,
      "ApplicationEnrollmentGroupMapping": {
         "Application1": "TopSecretKey1",
         "Application2": "TopSecretKey2"
      }
   }

Shortly after the first uplink message from a TTN device is processed, it will listed in the “Unassociated devices” blade with the DevEUI as the Device ID.

Unassociated devices blade

The device can then be associated with an Azure IoT Central Device Template.

Unassociated devices blade showing recently associated device

The device template provides for the mapping of uplink message payload_fields to measurements. In this example the payload field has been generated by the TTN HTTP integration Cayenne Low Power Protocol(LPP) decoder. Many LoRaWAN devices use LPP to minimise the size of the network payload.

Azure IoT Central Device template blade

Once the device has been associated with a template a user friendly device name etc. can be configured.

Azure IoT Central Device properties blade

In the telemetry event payload sent to Azure IoT Central there are some extra fields to help with debugging and tracing.

// Assemble the JSON payload to send to Azure IoT Hub/Central.
log.LogInformation($"{messagePrefix} Payload assembly start");
JObject telemetryEvent = new JObject();
try
{
   JObject payloadFields = (JObject)payloadObect.payload_fields;
   telemetryEvent.Add("HardwareSerial", payloadObect.hardware_serial);
   telemetryEvent.Add("Retry", payloadObect.is_retry);
   telemetryEvent.Add("Counter", payloadObect.counter);
   telemetryEvent.Add("DeviceID", payloadObect.dev_id);
   telemetryEvent.Add("ApplicationID", payloadObect.app_id);
   telemetryEvent.Add("Port", payloadObect.port);
   telemetryEvent.Add("PayloadRaw", payloadObect.payload_raw);
   telemetryEvent.Add("ReceivedAtUTC", payloadObect.metadata.time);

   // If the payload has been unpacked in TTN backend add fields to telemetry event payload
   if (payloadFields != null)
   {
      foreach (JProperty child in payloadFields.Children())
      {
         EnumerateChildren(telemetryEvent, child);
      }
   }
}
catch (Exception ex)
{
   log.LogError(ex, $"{messagePrefix} Payload processing or Telemetry event assembly failed");
   throw;
}

Azure IoT Central has mapping functionality which can be used to display the location of a device.

Azure Device

The format of the location payload generated by the TTN LPP decoder is different to the one required by Azure IoT Central. I have added temporary code (“a cost effective modification to expedite deployment” aka. a hack) to format the TelemetryEvent payload so it can be processed.

if (token.First is JValue)
{
   // Temporary dirty hack for Azure IoT Central compatibility
   if (token.Parent is JObject possibleGpsProperty)
   {
      if (possibleGpsProperty.Path.StartsWith("GPS", StringComparison.OrdinalIgnoreCase))
      {
         if (string.Compare(property.Name, "Latitude", true) == 0)
         {
            jobject.Add("lat", property.Value);
         }
         if (string.Compare(property.Name, "Longitude", true) == 0)
         {
            jobject.Add("lon", property.Value);
         }
         if (string.Compare(property.Name, "Altitude", true) == 0)
         {
            jobject.Add("alt", property.Value);
         }
      }
   }
   jobject.Add(property.Name, property.Value);
}

I need review the IoT Plug and Play specification documentation to see what other payload transformations maybe required.

I did observe that if a device had not reported its position the default location was zero degrees latitude and zero degrees longitude which is about 610 KM south of Ghana and 1080 KM west of Gabon in the Atlantic Ocean.

Azure IoT Central mapping default position

After configuring a device template, associating my devices with the template, and modifying each device’s properties I could create a dashboard to view the temperature and humidity information returned by my Seeeduino LoRaWAN devices.

Azure IoT Central dashboard

The Things Network HTTP Azure IoT Hub Integration

This post provides an overview of the required Azure Device Provisioning Service(DPS) and Azure IoT Hub configuration to process The Things Network(TTN) HTTP integration uplink messages. I have assumed that the reader is already familiar with all these products. There is an overview of configuring TTN HTTP integration in my “Simplicating and securing the HTTP handler” post.

The first step is to configure a DPS Enrollment Group

DPS Group Enrollment blade

The scopeID and the primary/secondary key need to be configured in the appsettings.json file of uplink message processing Azure QueueTrigger function.

For more complex deployments the ApplicationEnrollmentGroupMapping configuration enables The Things Network(TTN) devices to be provisioned using different GroupEnrollment keys based on the applicationid in the first Uplink message which initiates provisoning.

"DeviceProvisioningService": {
      "GlobalDeviceEndpoint": "global.azure-devices-provisioning.net",
      "ScopeID": "",
      "EnrollmentGroupSymmetricKeyDefault": "TopSecretKey",
      "DeviceProvisioningPollingDelay": 500,
      "ApplicationEnrollmentGroupMapping": {
         "Application1": "TopSecretKey1",
         "Application2": "TopSecretKey2"
      }
   }

DPS Group Enrolment with no provisioned devices

Then as uplink messages from the TTN integration are processed devices are “automagically” created in the DPS.

Simultaneously devices are created in the Azure IoT Hub

Then shortly after telemetry events are available for applications to process or inspection with tools like Azure IoT Explorer.

In the telemetry event payload sent to the Azure IoT IoT Hub are some extra fields to help with debugging and tracing. The raw payload is also included so messages not decoded by TTN can be processed by the client application(s).

/ Assemble the JSON payload to send to Azure IoT Hub/Central.
log.LogInformation($"{messagePrefix} Payload assembly start");
JObject telemetryEvent = new JObject();
try
{
   JObject payloadFields = (JObject)payloadObect.payload_fields;
   telemetryEvent.Add("HardwareSerial", payloadObect.hardware_serial);
   telemetryEvent.Add("Retry", payloadObect.is_retry);
   telemetryEvent.Add("Counter", payloadObect.counter);
   telemetryEvent.Add("DeviceID", payloadObect.dev_id);
   telemetryEvent.Add("ApplicationID", payloadObect.app_id);
   telemetryEvent.Add("Port", payloadObect.port);
   telemetryEvent.Add("PayloadRaw", payloadObect.payload_raw);
   telemetryEvent.Add("ReceivedAtUTC", payloadObect.metadata.time);
 
   // If the payload has been unpacked in TTN backend add fields to telemetry event payload
   if (payloadFields != null)
   {
      foreach (JProperty child in payloadFields.Children())
      {
         EnumerateChildren(telemetryEvent, child);
      }
   }
}
catch (Exception ex)
{
   log.LogError(ex, $"{messagePrefix} Payload processing or Telemetry event assembly failed");
   throw;
}

Beware, the Azure Storage Account and storage queue names have a limited character set. This caused me problems several times when I used camel cased queue names etc.

The Things Network HTTP Integration Part10

Assembling the components

After a series of articles exploring how portions of solution could be built

I now had working code for receiving The Things Network(TTN) HTTP integration JSON messages with an Azure Function using an HTTPTrigger. (secured with an APIKey) and then putting them into an Azure Storage Queue for processing. This code was intentionally kept as small and as simple as possible so there was less to go wrong. The required configuration is also minimal.

HTTP Endpoint handler application

In the last couple of posts I had been building an Azure Function with a QueueTrigger to process the uplink messages. The function used custom bindings so that the CloudQueueMessage could be accessed, and load the Azure Storage account plus queue name from configuration. I’m still using classes generated by JSON2CSharp (with minimal modifications) for deserialising the payloads with JSON.Net.

The message processor Azure Function uses a ConcurrentCollection to store AzureDeviceClient objects constructed using the information returned by the Azure Device Provisioning Service(DPS). This is so the DPS doesn’t have to be called for the connection details for every message.(When the Azure function is restarted the dictionary of DeviceClient objects has to be repopulated). If there is a backlog of messages the message processor can process more than a dozen messages concurrently so the telemetry events displayed in an application like Azure IoT Central can arrive out of order.

The solution uses DPS Group Enrollment with Symmetric Key Attestation so Azure IoT Hub devices can be “automagically” created when a message from a new device is processed. The processing code is multi-thread and relies on many error conditions being handled by the Azure Function retry mechanism. After a number of failed retries the messages are moved to a poison queue. Azure Storage Explorer is a good tool for viewing payloads and moving poison messages back to the processing queue.

public static class UplinkMessageProcessor
{
   static readonly ConcurrentDictionary<string, DeviceClient> DeviceClients = new ConcurrentDictionary<string, DeviceClient>();

   [FunctionName("UplinkMessageProcessor")]
   public static async Task Run(
      [QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")]
      CloudQueueMessage cloudQueueMessage, // Used to get CloudQueueMessage.Id for logging
      Microsoft.Azure.WebJobs.ExecutionContext context,
      ILogger log)
   {
      PayloadV5 payloadObect;
      DeviceClient deviceClient = null;
      DeviceProvisioningServiceSettings deviceProvisioningServiceConfig;

      string environmentName = Environment.GetEnvironmentVariable("ENVIRONMENT");

      // Load configuration for DPS. Refactor approach and store securely...
      var configuration = new ConfigurationBuilder()
      .SetBasePath(context.FunctionAppDirectory)
      .AddJsonFile($"appsettings.json")
      .AddJsonFile($"appsettings.{environmentName}.json")
      .AddEnvironmentVariables()
      .Build();

      // Load configuration for DPS. Refactor approach and store securely...
      try
      {
         deviceProvisioningServiceConfig = (DeviceProvisioningServiceSettings)configuration.GetSection("DeviceProvisioningService").Get<DeviceProvisioningServiceSettings>(); ;
      }
      catch (Exception ex)
      {
         log.LogError(ex, $"Configuration loading failed");
         throw;
      }

      // Deserialise uplink message from Azure storage queue
      try
      {
         payloadObect = JsonConvert.DeserializeObject<PayloadV5>(cloudQueueMessage.AsString);
      }
      catch (Exception ex)
      {
         log.LogError(ex, $"MessageID:{cloudQueueMessage.Id} uplink message deserialisation failed");
         throw;
      }

      // Extract the device ID as it's used lots of places
      string registrationID = payloadObect.hardware_serial;

      // Construct the prefix used in all the logging
      string messagePrefix = $"MessageID: {cloudQueueMessage.Id} DeviceID:{registrationID} Counter:{payloadObect.counter} Application ID:{payloadObect.app_id}";
      log.LogInformation($"{messagePrefix} Uplink message device processing start");

      // See if the device has already been provisioned
      if (DeviceClients.TryAdd(registrationID, deviceClient))
      {
         log.LogInformation($"{messagePrefix} Device provisioning start");

         string enrollmentGroupSymmetricKey = deviceProvisioningServiceConfig.EnrollmentGroupSymmetricKeyDefault;

         // figure out if custom mapping for TTN applicationID
         if (deviceProvisioningServiceConfig.ApplicationEnrollmentGroupMapping != null)
        {
            deviceProvisioningServiceConfig.ApplicationEnrollmentGroupMapping.GetValueOrDefault(payloadObect.app_id, deviceProvisioningServiceConfig.EnrollmentGroupSymmetricKeyDefault);
         }

         // Do DPS magic first time device seen
         await DeviceRegistration(log, messagePrefix, deviceProvisioningServiceConfig.GlobalDeviceEndpoint, deviceProvisioningServiceConfig.ScopeID, enrollmentGroupSymmetricKey, registrationID);
      }

      // Wait for the Device Provisioning Service to complete on this or other thread
      log.LogInformation($"{messagePrefix} Device provisioning polling start");
      if (!DeviceClients.TryGetValue(registrationID, out deviceClient))
      {
         log.LogError($"{messagePrefix} Device provisioning polling TryGet before while failed");

         throw new ApplicationException($"{messagePrefix} Device provisioning polling TryGet before while failed");
      }

      while (deviceClient == null)
      {
         log.LogInformation($"{messagePrefix} provisioning polling delay");
         await Task.Delay(deviceProvisioningServiceConfig.DeviceProvisioningPollingDelay);

         if (!DeviceClients.TryGetValue(registrationID, out deviceClient))
         {
            log.LogError($"{messagePrefix} Device provisioning polling TryGet while loop failed");

            throw new ApplicationException($"{messagePrefix} Device provisioning polling TryGet while loopfailed");
         }
      }

      // Assemble the JSON payload to send to Azure IoT Hub/Central.
      log.LogInformation($"{messagePrefix} Payload assembly start");
      JObject telemetryEvent = new JObject();
      try
      {
         JObject payloadFields = (JObject)payloadObect.payload_fields;
         telemetryEvent.Add("HardwareSerial", payloadObect.hardware_serial);
         telemetryEvent.Add("Retry", payloadObect.is_retry);
         telemetryEvent.Add("Counter", payloadObect.counter);
         telemetryEvent.Add("DeviceID", payloadObect.dev_id);
         telemetryEvent.Add("ApplicationID", payloadObect.app_id);
         telemetryEvent.Add("Port", payloadObect.port);
         telemetryEvent.Add("PayloadRaw", payloadObect.payload_raw);
         telemetryEvent.Add("ReceivedAt", payloadObect.metadata.time);

         // If the payload has been unpacked in TTN backend add fields to telemetry event payload
         if (payloadFields != null)
         {
            foreach (JProperty child in payloadFields.Children())
            {
               EnumerateChildren(telemetryEvent, child);
            }
         }
      }
      catch (Exception ex)
      {
         if (DeviceClients.TryRemove(registrationID, out deviceClient))
         {
            log.LogWarning($"{messagePrefix} TryRemove payload assembly failed");
         }

         log.LogError(ex, $"{messagePrefix} Payload assembly failed");
         throw;
      }

      // Send the message to Azure IoT Hub/Azure IoT Central
      log.LogInformation($"{messagePrefix} Payload SendEventAsync start");
      try
      {
         using (Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
         {
            // Ensure the displayed time is the acquired time rather than the uploaded time. esp. importan for messages that end up in poison queue
            ioTHubmessage.Properties.Add("iothub-creation-time-utc", payloadObect.metadata.time.ToString("s", CultureInfo.InvariantCulture));
            await deviceClient.SendEventAsync(ioTHubmessage);
         }
      }
      catch (Exception ex)
      {
         if (DeviceClients.TryRemove(registrationID, out deviceClient))
         {
            log.LogWarning($"{messagePrefix} TryRemove SendEventAsync failed");
         }

         log.LogError(ex, $"{messagePrefix} SendEventAsync failed");
         throw;
      }

   log.LogInformation($"{messagePrefix} Uplink message device processing completed");
   }
}

There is also support for using a specific GroupEnrollment based on the application_id in the uplink message payload.

"DeviceProvisioningService": {
      "GlobalDeviceEndpoint": "global.azure-devices-provisioning.net",
      "ScopeID": "",
      "EnrollmentGroupSymmetricKeyDefault": "TopSecretKey",
      "DeviceProvisioningPollingDelay": 500,
      "ApplicationEnrollmentGroupMapping": {
         "Application1": "TopSecretKey1",
         "Application2": "TopSecretKey2"
      }
   }

In addition to the appsettings.json there is configuration for application insights, uplink message queue name and Azure Storage connection strings. The “Environment” setting is important as it specifies what appsettings.json file should be used if code is being debugged etc..

TTN Integration uplink message processor configuration

The deployed solution application consists of Azure IoTHub and DPS instances. There are two Azure functions, one for putting the messages from the TTN into a queue the other is for processing them. The Azure Functions are hosted in an Azure AppService plan.

Azure solution deployment

An Azure Storage account is used for the queue and Azure Function synchronisation information and Azure Application Insights is used to monitor the solution.

The QueueTrigger function retry “rabbit hole”

My first couple of attempts at an Azure Queue Trigger Function which could do retries when an uplink message couldn’t be processed immediately(I didn’t want to throw an exception as this was just a transient issue) didn’t work. I wanted to return the uplink message to the Azure Storage Queue with the initial visibility set to a couple of seconds without throwing an exception.

I tried decorating the method with an Azure Storage Queue output binding but finally settled on the approach below. I can insert a single message into the storage queue and the application would start looping every minute.

public static class UplinkMessageProcessor
{
   const string RunTag = "Processor001";
   static int ConcurrentThreadCount = 0;
   static int MessagesProcessed = 0;

   [FunctionName("UplinkMessageProcessor")]
   public static void Run(
      [QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")] 
      CloudQueueMessage cloudQueueMessage, 
      IBinder binder, ILogger log)
   {
      try
      {
         Interlocked.Increment(ref ConcurrentThreadCount);
         Interlocked.Increment(ref MessagesProcessed);

         log.LogInformation($"{MessagesProcessed} {RunTag} Threads:{ConcurrentThreadCount}");

         CloudQueue outputQueue = binder.Bind<CloudQueue>(new QueueAttribute("%UplinkQueueName%"));

         CloudQueueMessage message = new CloudQueueMessage(cloudQueueMessage.AsString);

         outputQueue.AddMessage(message, initialVisibilityDelay: new TimeSpan(0, 1, 0));
    
         Thread.Sleep(2000);

         Interlocked.Decrement(ref ConcurrentThreadCount);
      }
      catch (Exception ex)
      {
         log.LogError(ex, "Processing of Uplink message failed");

         throw;
      }
   }
}

I used the binder.bind method to get the CloudQueue and CloudQueueMessage details so I could insert a hidden messages back into the queue.

The version of Azure Storage queue libraries used by the function bindings (Sep 2020) may cause some compile time warnings if you select the wrong NuGet package.

Hopefully this has enough keywords that someone trying todo the same thing finds it.

The Things Network HTTP Integration Part9

Simplicating and securing the HTTP handler

There was lots of code in nested classes for deserialising the The Things Network(TTN) JSON uplink messages in my WebAPI project. It looked a bit fragile and if the process failed uplink messages could be lost.

My first attempt at an Azure HTTP Trigger Function to handle an uplink message didn’t work. I had decorated the HTTP Trigger method with an Azure Storage Queue as the destination for the output.

public static class UplinkProcessor
{
   [FunctionName("UplinkProcessor")]
   [return: Queue("%UplinkQueueName%", Connection = "AzureStorageConnectionString")]
   public static async Task<IActionResult> Run([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest request, ILogger log)
   {
      string payload;

      log.LogInformation("C# HTTP trigger function processed a request.");

      using (StreamReader streamReader = new StreamReader(request.Body))
      {
         payload = await streamReader.ReadToEndAsync();
      }

      return new OkObjectResult(payload);
   }
}

When I returned OkObjectResult(object value) the message JSON was prefixed with “value”. This broke message deserialisation in the Azure queue trigger function for processing uplink events.

There were a couple of other versions which failed with encoding issues.

Invalid uplink event JSON
public static class UplinkProcessor
{
   [FunctionName("UplinkProcessor")]
   [return: Queue("%UplinkQueueName%", Connection = "AzureStorageConnectionString")]
   public static async Task<string> Run([HttpTrigger("post", Route = null)] HttpRequest request, ILogger log)
   {
      string payload;

      log.LogInformation("C# HTTP trigger function processed a request.");

      using (StreamReader streamReader = new StreamReader(request.Body))
      {
         payload = await streamReader.ReadToEndAsync();
      }

      return payload;
   }
}

I finally settled on returning a string, which with the benefit of hindsight was obvious.

Valid JSON message

By storing the raw uplink event JSON from TTN the application can recover if it they can’t deserialised, (message format has changed or generated class issues) The queue processor won’t be able to process the uplink event messages so they will end up in the poison message queue after being retried a few times.

I hadn’t added any security plumbing to the my other test application but I really did need to secure my uplink message endpoint in production (this functionality is disabled when running locally). Azure http trigger functions support host and method scope API key authorisation which integrates easily with TTN.

In the Azure management portal I generated a method scope API key.

Azure HTTP function API key management

I then added an x-functions-key header in the TTN application integration configuration and it worked second attempt due to a copy and past fail.

Things Network Application integration

To confirm my APIKey setup was correct I changed the header name and my requests started to fail with a 401 Unauthorised error.

After some experimentation it took less than two dozen lines of C# to create a secure endpoint to receive uplink messages and put them in an Azure Storage queue.

The Things Network HTTP Integration Part8

Logging and the start of simplification

While testing the processing of queued The Things Network(TTN) uplink messages I had noticed that some of the Azure Application Insights events from my Log4Net setup were missing. I could see the MessagesProcessed counter was correct but there weren’t enough events.

public static class UplinkMessageProcessor
{
   const string RunTag = "Log4Net001";
   static readonly ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
   static readonly ConcurrentDictionary<string, PayloadV5> DevicesSeen = new ConcurrentDictionary<string, PayloadV5>();
   static int ConcurrentThreadCount = 0;
   static int MessagesProcessed = 0;

   [FunctionName("UplinkMessageProcessor")]
   public static void Run([QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")] string myQueueItem, Microsoft.Azure.WebJobs.ExecutionContext executionContext)
   {
      try
      {
         var logRepository = LogManager.GetRepository(Assembly.GetEntryAssembly());
         XmlConfigurator.Configure(logRepository, new FileInfo(Path.Combine(executionContext.FunctionAppDirectory, "log4net.config")));

         PayloadV5 payloadMessage = (PayloadV5)JsonSerializer.Deserialize(myQueueItem, typeof(PayloadV5));
         PayloadV5 payload = (PayloadV5)DevicesSeen.GetOrAdd(payloadMessage.dev_id, payloadMessage);

         Interlocked.Increment(ref ConcurrentThreadCount);
         Interlocked.Increment(ref MessagesProcessed);

         log.Info($"{MessagesProcessed} {RunTag} DevEui:{payload.dev_id} Threads:{ConcurrentThreadCount} First:{payload.metadata.time} Current:{payloadMessage.metadata.time} PayloadRaw:{payload.payload_raw}");

         Thread.Sleep(2000);

         Interlocked.Decrement(ref ConcurrentThreadCount);
      }
      catch (Exception ex)
      {
         log.Error("Processing of Uplink message failed", ex);

         throw;
      }
   }
}
Log4Net in Application Insights Event viewer

I assume the missing events were because I wasn’t flushing at the end of the Run method. There was also a lot of “plumbing” code (including loading configuration files) to setup Log4Net.

I then built another Azure function using the Application Insights API

public static class UplinkMessageProcessor
{
   const string RunTag = "Insights001";
   static readonly ConcurrentDictionary<string, PayloadV5> DevicesSeen = new ConcurrentDictionary<string, PayloadV5>();
   static int ConcurrentThreadCount = 0;
   static int MessagesProcessed = 0;

   [FunctionName("UplinkMessageProcessor")]
   public static void Run([QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")] string myQueueItem, Microsoft.Azure.WebJobs.ExecutionContext executionContext)
   {
      using (TelemetryConfiguration telemetryConfiguration = TelemetryConfiguration.CreateDefault())
      {
         TelemetryClient telemetryClient = new TelemetryClient(telemetryConfiguration);

         try
         {
            PayloadV5 payloadMessage = (PayloadV5)JsonSerializer.Deserialize(myQueueItem, typeof(PayloadV5));
            PayloadV5 payload = (PayloadV5)DevicesSeen.GetOrAdd(payloadMessage.dev_id, payloadMessage);

            Interlocked.Increment(ref ConcurrentThreadCount);
            Interlocked.Increment(ref MessagesProcessed);

            telemetryClient.TrackEvent($"{MessagesProcessed} {RunTag} DevEui:{payload.dev_id} Threads:{ConcurrentThreadCount} First:{payload.metadata.time} Current:{payloadMessage.metadata.time} PayloadRaw:{payload.payload_raw}");

            Thread.Sleep(2000);

            Interlocked.Decrement(ref ConcurrentThreadCount);
         }
         catch (Exception ex)
         {
            telemetryClient.TrackException(ex);
            throw;
         }
      }
   }
}

Application Insights API in Application Insights Event viewer

I assume there were no missing events because the using statement was “flushing” every time the Run method completed. There was still a bit of “plumbing” code and which it would be good to get rid of.

When I generated Azure Function stubs there was an ILogger parameter which the Dependency Injection (DI) infrastructure setup.

public static class UplinkMessageProcessor
{
  const string RunTag = "Logger002";
   static readonly ConcurrentDictionary<string, PayloadV5> DevicesSeen = new ConcurrentDictionary<string, PayloadV5>();
   static int ConcurrentThreadCount = 0;
   static int MessagesProcessed = 0;

   [FunctionName("UplinkMessageProcessor")]
   public static void Run([QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")] string myQueueItem, ILogger log)
   {
      try
      {
            PayloadV5 payloadMessage = (PayloadV5)JsonSerializer.Deserialize(myQueueItem, typeof(PayloadV5));
         PayloadV5 payload = (PayloadV5)DevicesSeen.GetOrAdd(payloadMessage.dev_id, payloadMessage);

         Interlocked.Increment(ref ConcurrentThreadCount);
         Interlocked.Increment(ref MessagesProcessed);

         log.LogInformation($"{MessagesProcessed} {RunTag} DevEui:{payload.dev_id} Threads:{ConcurrentThreadCount} First:{payload.metadata.time} Current:{payloadMessage.metadata.time} PayloadRaw:{payload.payload_raw}");

         Thread.Sleep(2000);

         Interlocked.Decrement(ref ConcurrentThreadCount);
      }
      catch (Exception ex)
      { 
         log.LogError(ex,"Processing of Uplink message failed");

         throw;
      }
   }
}
ILogger in Application Insights Event viewer

This implementation had even less code and all the messages were visible in the Azure Application Insights event viewer.

All the Azure functions for logging

While built the Proof of Concept(PoC) implementations I added the configurable “runtag” so I could search for the messages relating to a session in the Azure Application Insights event viewer. The queue name and storage account were “automagically” loaded by the runtime which also reduced the amount of code.

[QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")]

At this point I had minimised the amount and complexity of the code required to process messages in the ttnuplinkmessages queue. Reducing the amount of “startup” required should make my QueueTrigger Azure function faster. But there was still a lot of boilerplate code for serialising the body of the message which added complexity.

At this point I realised I had a lot of code across multiple projects which had helped me breakdown the problem into manageable chunks but didn’t add a lot of value.

The Things Network HTTP Integration Part7

Queuing uplink messages

For my HTTP Integration I need to reliably forward messages to an Azure IoT Hub or Azure IoT Central. This solution needs to be robust and not lose any messages even when portions of the system are unavailable because of failures or sudden spikes in inbound traffic.

I added yet another controller, it receives an uplink messages from The Things Network(TTN) and puts them into an Azure Storage Queue.

[Route("[controller]")]
[ApiController]
public class Queued : ControllerBase
{
   private readonly string storageConnectionString;
   private readonly string queueName;
   private static readonly ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

   public Queued( IConfiguration configuration)
   {
      this.storageConnectionString = configuration.GetSection("AzureStorageConnectionString").Value;
      this.queueName = configuration.GetSection("UplinkQueueName").Value;
   }

   public string Index()
   {
      return "Queued move along nothing to see";
   }

   [HttpPost]
   public async Task<IActionResult> Post([FromBody] PayloadV5 payload)
   {
      string payloadFieldsUnpacked = string.Empty;

      // Check that the post data is good
      if (!this.ModelState.IsValid)
      {
         log.WarnFormat("QueuedController validation failed {0}", this.ModelState.Messages());

         return this.BadRequest(this.ModelState);
      }

      try
      {
         QueueClient queueClient = new QueueClient(storageConnectionString, queueName);

         await queueClient.CreateIfNotExistsAsync();

         await queueClient.SendMessageAsync(Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payload)));
      }
      catch( Exception ex)
      {
         log.Error("Unable to open/create queue or send message", ex);

         return this.Problem("Unable to open queue (creating if it doesn't exist) or send message", statusCode:500, title:"Uplink payload not sent" );
      }

      return this.Ok();
   }
}

An Azure Function with a Queue Trigger processes the messages and for this test pauses for 2 seconds (simulating a call to the Device Provisioning Service(DPS) ). It keeps track of the number of concurrent processing threads and when the first message for each device was received since the program was started.

public static class UplinkMessageProcessor
{
   static readonly ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
   static ConcurrentDictionary<string, PayloadV5> DevicesSeen = new ConcurrentDictionary<string, PayloadV5>();
   static int ConcurrentThreadCount = 0;

   [FunctionName("UplinkMessageProcessor")]
   public static void Run([QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")] string myQueueItem, Microsoft.Azure.WebJobs.ExecutionContext executionContext)
   {
      Interlocked.Increment(ref ConcurrentThreadCount);
      var logRepository = LogManager.GetRepository(Assembly.GetEntryAssembly());
      XmlConfigurator.Configure(logRepository, new FileInfo(Path.Combine(executionContext.FunctionAppDirectory, "log4net.config")));

      log.Info($"Uplink message function triggered: {myQueueItem}");

      PayloadV5 payloadMessage = (PayloadV5)JsonSerializer.Deserialize(myQueueItem, typeof(PayloadV5));
      PayloadV5 payload = (PayloadV5)DevicesSeen.GetOrAdd(payloadMessage.dev_id, payloadMessage);

      log.Info($"Uplink message DevEui:{payload.dev_id} Threads:{ConcurrentThreadCount} First:{payload.metadata.time} Current:{payloadMessage.metadata.time} PayloadRaw:{payload.payload_raw}");

      Thread.Sleep(2000);

      Interlocked.Decrement(ref ConcurrentThreadCount);
   }
}

To explore how this processing worked I sent 1000 uplink messages from my Seeeduino LoRaWAN devices which were buffered in a queue.

Azure storage Explorer 1000 queued messages
Application insights 1000 events

I processed 1000’s of messages with the Azure Function but every so often 10-20% of the logging messages wouldn’t turn up in the logs. I’m using Log4Net and I think it is most probably caused by not flushing the messages before the function finishes

The Things Network HTTP Integration Part6

Provisioning Devices on demand.

For development and testing being able to provision an individual device is really useful, though for Azure IoT Central it is not easy (especially with the deprecation of DPS-KeyGen). With an Azure IoT Hub device connection strings are available in the portal which is convenient but not terribly scalable.

Azure IoT Hub is integrated with, and Azure IoT Central forces the use of the Device Provisioning Service(DPS) which is designed to support the management of 1000’s of devices.

My HTTP Integration for The Things Network(TTN) is intended to support many devices and integrate with Azure IoT Central so I built yet another “nasty” console application to explore how the DPS works. The DPS also supports device attestation with a Trusted Platform Module(TPM) but this approach was not suitable for my application.

My command-line application supports individual and group enrollments with Symmetric Key Attestation and it can also generate group enrollment device keys.

class Program
{
   private const string GlobalDeviceEndpoint = "global.azure-devices-provisioning.net";

   static async Task Main(string[] args)
   {
      string registrationId;
...   
      registrationId = args[1];

      switch (args[0])
      {
         case "e":
         case "E":
            string scopeId = args[2];
            string symmetricKey = args[3];

            Console.WriteLine($"Enrolllment RegistrationID:{ registrationId} ScopeID:{scopeId}");
            await Enrollement(registrationId, scopeId, symmetricKey);
            break;
         case "k":
         case "K":
            string primaryKey = args[2];
            string secondaryKey = args[3];

            Console.WriteLine($"Enrollment Keys RegistrationID:{ registrationId}");
            GroupEnrollementKeys(registrationId, primaryKey, secondaryKey);
            break;
         default:
            Console.WriteLine("Unknown option");
            break;
      }
      Console.WriteLine("Press <enter> to exit");
      Console.ReadLine();
   }

   static async Task Enrollement(string registrationId, string scopeId, string symetricKey)
   {
      try
      {
         using (var securityProvider = new SecurityProviderSymmetricKey(registrationId, symetricKey, null))
         {
            using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
            {
               ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(GlobalDeviceEndpoint, scopeId, securityProvider, transport);

               DeviceRegistrationResult result = await provClient.RegisterAsync();

               Console.WriteLine($"Hub:{result.AssignedHub} DeviceID:{result.DeviceId} RegistrationID:{result.RegistrationId} Status:{result.Status}");
               if (result.Status != ProvisioningRegistrationStatusType.Assigned)
               {
                  Console.WriteLine($"DeviceID{ result.Status} already assigned");
               }

               IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());

               using (DeviceClient iotClient = DeviceClient.Create(result.AssignedHub, authentication, TransportType.Amqp))
               {
                  Console.WriteLine("DeviceClient OpenAsync.");
                  await iotClient.OpenAsync().ConfigureAwait(false);
                  Console.WriteLine("DeviceClient SendEventAsync.");
                  await iotClient.SendEventAsync(new Message(Encoding.UTF8.GetBytes("TestMessage"))).ConfigureAwait(false);
                  Console.WriteLine("DeviceClient CloseAsync.");
                  await iotClient.CloseAsync().ConfigureAwait(false);
               }
            }
         }
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
      }
   }

   static void GroupEnrollementKeys(string registrationId, string primaryKey, string secondaryKey)
   {
      string primaryDeviceKey = ComputeDerivedSymmetricKey(Convert.FromBase64String(primaryKey), registrationId);
      string secondaryDeviceKey = ComputeDerivedSymmetricKey(Convert.FromBase64String(secondaryKey), registrationId);

      Console.WriteLine($"RegistrationID:{registrationId}");
      Console.WriteLine($" PrimaryDeviceKey:{primaryDeviceKey}");
      Console.WriteLine($" SecondaryDeviceKey:{secondaryDeviceKey}");
   }

   public static string ComputeDerivedSymmetricKey(byte[] masterKey, string registrationId)
   {
      using (var hmac = new HMACSHA256(masterKey))
      {
         return Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(registrationId)));
      }
   }
}

I have five seeeduino LoRaWAN and a single Seeeduino LoRaWAN W/GPS device leftover from another project so I created a SeeeduinoLoRaWAN enrollment group.

DPS Enrollment Group configuration

Initially the enrollment group had no registration records so I ran my command-line application to generate group enrollment keys for one of my devices.

Device registration before running my command line application

Then I ran the command-line application with my scopeID, registrationID (LoRaWAN deviceEUI) and the device group enrollment key I had generated in the previous step.

Registering a device and sending a message to the my Azure IoT Hub

After running the command line application the device was visible in the enrollment group registration records.

Device registration after running my command line application

Provisioning a device with an individual enrollment has a different workflow. I had to run my command-line application with the RegistrationID, ScopeID, and one of the symmetric keys from the DPS individual enrollment device configuration.

DPS Individual enrollment configuration

A major downside to an individual enrollment is either the primary or the secondary symmetric key for the device has to be deployed on the device which could be problematic if the device has no secure storage.

With a group enrollment only the registration ID and the derived symmetric key have to be deployed on the device which is more secure.

Registering a device and sending a message to the my Azure IoT Hub

In Azure IoT Explorer I could see messages from both my group and individually enrolled devices arriving at my Azure IoT hub

After some initial issues I found DPS was quite reliable and surprisingly easy to configure. I did find the DPS ProvisioningDeviceClient.RegisterAsync method sometimes took several seconds to execute which may have some ramifications when my application is doing this on demand.

The Things Network HTTP Integration Part5

First TTN payload to the cloud

For my HTTP Integration I need to securely forward messages to an Azure IoT Hub or Azure IoT Central. This “nasty” console application loads a sample The Things Network(TTN) message from a file, connects the specified Azure IOT Hub, reformats the payload and sends it.

I couldn’t use System.Text.Json to construct the message payload as JsonDocument is not modifable(Sept2020). I had to rewrite my code to use Json.Net from Newtonsoft instead.

static async Task Main(string[] args)
{
   string filename ;
   string azureIoTHubconnectionString;
   DeviceClient azureIoTHubClient;
   Payload payload;
   JObject telemetryDataPoint = new JObject();

...
   filename = args[0];
   azureIoTHubconnectionString = args[1];

   try
   {
      payload = JsonConvert.DeserializeObject<Payload>(File.ReadAllText(filename));

      JObject payloadFields = (JObject)payload.payload_fields;

      using (azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp))
      {
         await azureIoTHubClient.OpenAsync();

         foreach (JProperty child in payloadFields.Children())
         {
            EnumerateChildren(telemetryDataPoint, child);
         }
               
         using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryDataPoint))))
         {
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
            await azureIoTHubClient.SendEventAsync(message);
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
         }

         await azureIoTHubClient.CloseAsync();
      }
   }
   catch (Exception ex)
   {            
      Console.WriteLine(ex.Message);
   }

   Console.WriteLine("Press <enter> to exit");
   Console.ReadLine();
   return;
}

static void EnumerateChildren(JObject jobject, JToken token)
{
   if (token is JProperty property)
   {
      if (token.First is JValue)
      {
         jobject.Add(property.Name, property.Value);
      }
      else
      {
         JObject parentObject = new JObject();
         foreach (JToken token2 in token.Children())
         {
            EnumerateChildren(parentObject, token2);
            jobject.Add(property.Name, parentObject);
          }
     }
   }
   else
   {
      foreach (JToken token2 in token.Children())
      {
         EnumerateChildren(jobject, token2);
      }
   }
}

To connect to an Azure IoT Hub I copied the connection string from the portal.

Azure IoT Hub connection string components

Retrieving a connection string for a device connected to Azure IoT Central (without using the Device Provisioning Service(DPS)) is a bit more involved. There is a deprecated command line application dps-keygen which calls the DPS with a device ID , device SAS key and scope ID and returns a connection string.

Azure IoT Central Device Connection Information
Azure DPS-Keygen command-line

Using Azure IoT Explorer I could see reformatted JSON messages from my client application.

Azure IoT Explorer displaying message payload

These two approached are fine for testing but wouldn’t scale well and would be painful to use it there were 1000s, 100s or even 10s of devices.