The Things Network HTTP Integration Part10

Assembling the components

After a series of articles exploring how portions of the solution could be built, it was time to assemble the components.

I now had working code for receiving The Things Network (TTN) HTTP integration JSON messages with an Azure Function using an HTTPTrigger (secured with an APIKey) and then putting them into an Azure Storage Queue for processing. This code was intentionally kept as small and simple as possible so there was less to go wrong. The required configuration is also minimal.

HTTP Endpoint handler application

In the last couple of posts I had been building an Azure Function with a QueueTrigger to process the uplink messages. The function uses custom bindings so that the CloudQueueMessage can be accessed and so that the Azure Storage account and queue name are loaded from configuration. I’m still using classes generated by JSON2CSharp (with minimal modifications) for deserialising the payloads with JSON.Net.

The message processor Azure Function uses a ConcurrentDictionary to store DeviceClient objects constructed using the information returned by the Azure Device Provisioning Service (DPS). This is so the DPS doesn’t have to be called for the connection details for every message. (When the Azure Function is restarted the dictionary of DeviceClient objects has to be repopulated.) If there is a backlog of messages the message processor can process more than a dozen messages concurrently, so the telemetry events displayed in an application like Azure IoT Central can arrive out of order.
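The caching approach described above can be sketched in isolation. This is a minimal sketch rather than the code from this post: FakeDeviceClient, DeviceClientCache, GetOrProvisionAsync and the 50 ms delay are hypothetical stand-ins so the example runs without the Azure IoT SDK or a DPS instance. It also removes the placeholder entry if provisioning fails, which is an assumption about how a failure might be handled.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for DeviceClient so the sketch has no SDK dependency.
public sealed class FakeDeviceClient
{
    public FakeDeviceClient(string deviceId) { DeviceId = deviceId; }
    public string DeviceId { get; }
}

public static class DeviceClientCache
{
    static readonly ConcurrentDictionary<string, FakeDeviceClient> Clients =
        new ConcurrentDictionary<string, FakeDeviceClient>();

    static int provisioningCalls;

    // Number of times the simulated provisioning call has been made.
    public static int ProvisioningCalls => Volatile.Read(ref provisioningCalls);

    public static async Task<FakeDeviceClient> GetOrProvisionAsync(string registrationId, TimeSpan pollingDelay)
    {
        // TryAdd succeeds for exactly one caller per key. The null value is a
        // placeholder meaning "provisioning is in progress".
        if (Clients.TryAdd(registrationId, null))
        {
            try
            {
                Interlocked.Increment(ref provisioningCalls);
                await Task.Delay(50); // stand-in for the slow DPS registration call
                Clients[registrationId] = new FakeDeviceClient(registrationId);
            }
            catch
            {
                // Remove the placeholder so a later retry can provision again.
                Clients.TryRemove(registrationId, out _);
                throw;
            }
        }

        // Every caller reads the entry, polling while it is still the placeholder.
        while (true)
        {
            if (!Clients.TryGetValue(registrationId, out FakeDeviceClient client))
            {
                throw new InvalidOperationException($"No cache entry for {registrationId}");
            }

            if (client != null)
            {
                return client;
            }

            await Task.Delay(pollingDelay);
        }
    }
}
```

If twenty messages for the same device are processed at once, only the first caller performs the provisioning call and the others wait for the cached client to appear.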

The solution uses DPS Group Enrollment with Symmetric Key Attestation so Azure IoT Hub devices can be “automagically” created when a message from a new device is processed. The processing code is multi-threaded and relies on many error conditions being handled by the Azure Function retry mechanism. After a number of failed retries the messages are moved to a poison queue. Azure Storage Explorer is a good tool for viewing payloads and moving poison messages back to the processing queue.
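The concurrency and retry behaviour of a queue-triggered function is controlled in host.json. The fragment below is a sketch assuming version 2.x or later of the Functions runtime; the values shown are the documented defaults, not settings taken from this solution.

```json
{
  "version": "2.0",
  "extensions": {
    "queues": {
      "batchSize": 16,
      "newBatchThreshold": 8,
      "maxDequeueCount": 5,
      "visibilityTimeout": "00:00:00"
    }
  }
}
```

With these values the runtime fetches up to 16 messages at a time and fetches another batch once the number being processed drops to 8, so up to 24 messages can be in flight on one instance. A message that has failed maxDequeueCount times is moved to a queue named after the original queue with a "-poison" suffix.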

public static class UplinkMessageProcessor
{
   static readonly ConcurrentDictionary<string, DeviceClient> DeviceClients = new ConcurrentDictionary<string, DeviceClient>();

   [FunctionName("UplinkMessageProcessor")]
   public static async Task Run(
      [QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")]
      CloudQueueMessage cloudQueueMessage, // Used to get CloudQueueMessage.Id for logging
      Microsoft.Azure.WebJobs.ExecutionContext context,
      ILogger log)
   {
      PayloadV5 payloadObect;
      DeviceClient deviceClient = null;
      DeviceProvisioningServiceSettings deviceProvisioningServiceConfig;

      string environmentName = Environment.GetEnvironmentVariable("ENVIRONMENT");

      // Load configuration for DPS. Refactor approach and store securely...
      var configuration = new ConfigurationBuilder()
      .SetBasePath(context.FunctionAppDirectory)
      .AddJsonFile($"appsettings.json")
      .AddJsonFile($"appsettings.{environmentName}.json")
      .AddEnvironmentVariables()
      .Build();

      // Bind the DeviceProvisioningService configuration section to the settings class
      try
      {
         deviceProvisioningServiceConfig = configuration.GetSection("DeviceProvisioningService").Get<DeviceProvisioningServiceSettings>();
      }
      catch (Exception ex)
      {
         log.LogError(ex, $"Configuration loading failed");
         throw;
      }

      // Deserialise uplink message from Azure storage queue
      try
      {
         payloadObect = JsonConvert.DeserializeObject<PayloadV5>(cloudQueueMessage.AsString);
      }
      catch (Exception ex)
      {
         log.LogError(ex, $"MessageID:{cloudQueueMessage.Id} uplink message deserialisation failed");
         throw;
      }

      // Extract the device ID as it's used lots of places
      string registrationID = payloadObect.hardware_serial;

      // Construct the prefix used in all the logging
      string messagePrefix = $"MessageID: {cloudQueueMessage.Id} DeviceID:{registrationID} Counter:{payloadObect.counter} Application ID:{payloadObect.app_id}";
      log.LogInformation($"{messagePrefix} Uplink message device processing start");

      // See if the device has already been provisioned
      if (DeviceClients.TryAdd(registrationID, deviceClient))
      {
         log.LogInformation($"{messagePrefix} Device provisioning start");

         string enrollmentGroupSymmetricKey = deviceProvisioningServiceConfig.EnrollmentGroupSymmetricKeyDefault;

         // Use the enrollment group key mapped to the TTN application ID, falling back to the default key
         if (deviceProvisioningServiceConfig.ApplicationEnrollmentGroupMapping != null)
         {
            enrollmentGroupSymmetricKey = deviceProvisioningServiceConfig.ApplicationEnrollmentGroupMapping.GetValueOrDefault(payloadObect.app_id, deviceProvisioningServiceConfig.EnrollmentGroupSymmetricKeyDefault);
         }

         // Do DPS magic first time device seen
         await DeviceRegistration(log, messagePrefix, deviceProvisioningServiceConfig.GlobalDeviceEndpoint, deviceProvisioningServiceConfig.ScopeID, enrollmentGroupSymmetricKey, registrationID);
      }

      // Wait for the Device Provisioning Service to complete on this or other thread
      log.LogInformation($"{messagePrefix} Device provisioning polling start");
      if (!DeviceClients.TryGetValue(registrationID, out deviceClient))
      {
         log.LogError($"{messagePrefix} Device provisioning polling TryGet before while failed");

         throw new ApplicationException($"{messagePrefix} Device provisioning polling TryGet before while failed");
      }

      while (deviceClient == null)
      {
         log.LogInformation($"{messagePrefix} provisioning polling delay");
         await Task.Delay(deviceProvisioningServiceConfig.DeviceProvisioningPollingDelay);

         if (!DeviceClients.TryGetValue(registrationID, out deviceClient))
         {
            log.LogError($"{messagePrefix} Device provisioning polling TryGet while loop failed");

            throw new ApplicationException($"{messagePrefix} Device provisioning polling TryGet while loop failed");
         }
      }

      // Assemble the JSON payload to send to Azure IoT Hub/Central.
      log.LogInformation($"{messagePrefix} Payload assembly start");
      JObject telemetryEvent = new JObject();
      try
      {
         JObject payloadFields = (JObject)payloadObect.payload_fields;
         telemetryEvent.Add("HardwareSerial", payloadObect.hardware_serial);
         telemetryEvent.Add("Retry", payloadObect.is_retry);
         telemetryEvent.Add("Counter", payloadObect.counter);
         telemetryEvent.Add("DeviceID", payloadObect.dev_id);
         telemetryEvent.Add("ApplicationID", payloadObect.app_id);
         telemetryEvent.Add("Port", payloadObect.port);
         telemetryEvent.Add("PayloadRaw", payloadObect.payload_raw);
         telemetryEvent.Add("ReceivedAt", payloadObect.metadata.time);

         // If the payload has been unpacked in TTN backend add fields to telemetry event payload
         if (payloadFields != null)
         {
            foreach (JProperty child in payloadFields.Children())
            {
               EnumerateChildren(telemetryEvent, child);
            }
         }
      }
      catch (Exception ex)
      {
         if (DeviceClients.TryRemove(registrationID, out deviceClient))
         {
            log.LogWarning($"{messagePrefix} TryRemove payload assembly failed");
         }

         log.LogError(ex, $"{messagePrefix} Payload assembly failed");
         throw;
      }

      // Send the message to Azure IoT Hub/Azure IoT Central
      log.LogInformation($"{messagePrefix} Payload SendEventAsync start");
      try
      {
         // UTF-8 is used so non-ASCII characters in the JSON payload are not replaced with '?'
         using (Message ioTHubmessage = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
         {
            // Ensure the displayed time is the acquired time rather than the uploaded time, especially important for messages that end up in the poison queue
            ioTHubmessage.Properties.Add("iothub-creation-time-utc", payloadObect.metadata.time.ToString("s", CultureInfo.InvariantCulture));
            await deviceClient.SendEventAsync(ioTHubmessage);
         }
      }
      catch (Exception ex)
      {
         if (DeviceClients.TryRemove(registrationID, out deviceClient))
         {
            log.LogWarning($"{messagePrefix} TryRemove SendEventAsync failed");
         }

         log.LogError(ex, $"{messagePrefix} SendEventAsync failed");
         throw;
      }

      log.LogInformation($"{messagePrefix} Uplink message device processing completed");
   }
}
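Because messages can be processed out of order, the function stamps each one with the time TTN received it. The small sketch below shows what the "s" (sortable) format used for that property produces. CreationTimeProperty and Format are hypothetical names introduced for illustration.

```csharp
using System;
using System.Globalization;

public static class CreationTimeProperty
{
    // Name of the message property set by the function above.
    public const string PropertyName = "iothub-creation-time-utc";

    // Formats a timestamp with the sortable pattern yyyy-MM-ddTHH:mm:ss.
    // The pattern does not convert time zones or append an offset, so the
    // value passed in is assumed to already be UTC.
    public static string Format(DateTime receivedAtUtc)
    {
        return receivedAtUtc.ToString("s", CultureInfo.InvariantCulture);
    }
}
```

For example, `CreationTimeProperty.Format(new DateTime(2020, 9, 13, 8, 15, 30, DateTimeKind.Utc))` returns `2020-09-13T08:15:30`. Strings in this format sort in chronological order, and fractional seconds are dropped.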

There is also support for using a specific enrollment group based on the app_id in the uplink message payload.

"DeviceProvisioningService": {
      "GlobalDeviceEndpoint": "global.azure-devices-provisioning.net",
      "ScopeID": "",
      "EnrollmentGroupSymmetricKeyDefault": "TopSecretKey",
      "DeviceProvisioningPollingDelay": 500,
      "ApplicationEnrollmentGroupMapping": {
         "Application1": "TopSecretKey1",
         "Application2": "TopSecretKey2"
      }
   }
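The lookup this configuration supports can be sketched as follows. It is a minimal sketch, assuming the mapping binds to a Dictionary<string, string>. The settings class here is a cut-down, hypothetical version containing only the two properties the lookup needs, and EnrollmentGroupKeyResolver is a name introduced for illustration.

```csharp
using System.Collections.Generic;

// Hypothetical, cut-down settings class with a shape inferred from the JSON above.
public sealed class DeviceProvisioningServiceSettings
{
    public string EnrollmentGroupSymmetricKeyDefault { get; set; }
    public Dictionary<string, string> ApplicationEnrollmentGroupMapping { get; set; }
}

public static class EnrollmentGroupKeyResolver
{
    // Returns the enrollment group key mapped to the TTN application ID,
    // or the default key when there is no mapping for that application.
    public static string Resolve(DeviceProvisioningServiceSettings settings, string applicationId)
    {
        if (settings.ApplicationEnrollmentGroupMapping != null &&
            settings.ApplicationEnrollmentGroupMapping.TryGetValue(applicationId, out string key))
        {
            return key;
        }

        return settings.EnrollmentGroupSymmetricKeyDefault;
    }
}
```

With the sample configuration, an uplink from "Application1" would resolve to "TopSecretKey1", and an uplink from an application that is not listed would resolve to "TopSecretKey".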

In addition to the appsettings.json there is configuration for Application Insights, the uplink message queue name and the Azure Storage connection strings. The “Environment” setting is important as it specifies which environment-specific appsettings file should be loaded, for example when the code is being debugged.

TTN Integration uplink message processor configuration

The deployed solution consists of Azure IoT Hub and DPS instances plus two Azure Functions: one puts the messages from TTN into a queue and the other processes them. The Azure Functions are hosted in an Azure App Service plan.

Azure solution deployment

An Azure Storage account is used for the queue and for Azure Function synchronisation information, and Azure Application Insights is used to monitor the solution.

The Things Network HTTP Integration Part6

Provisioning Devices on demand.

For development and testing, being able to provision an individual device is really useful, though for Azure IoT Central it is not easy (especially with the deprecation of DPS-KeyGen). With an Azure IoT Hub, device connection strings are available in the portal, which is convenient but not terribly scalable.

Azure IoT Hub is integrated with, and Azure IoT Central forces the use of, the Device Provisioning Service (DPS), which is designed to support the management of thousands of devices.

My HTTP Integration for The Things Network (TTN) is intended to support many devices and integrate with Azure IoT Central, so I built yet another “nasty” console application to explore how the DPS works. The DPS also supports device attestation with a Trusted Platform Module (TPM) but this approach was not suitable for my application.

My command-line application supports individual and group enrollments with Symmetric Key Attestation and it can also generate group enrollment device keys.

class Program
{
   private const string GlobalDeviceEndpoint = "global.azure-devices-provisioning.net";

   static async Task Main(string[] args)
   {
      string registrationId;
...   
      registrationId = args[1];

      switch (args[0])
      {
         case "e":
         case "E":
            string scopeId = args[2];
            string symmetricKey = args[3];

            Console.WriteLine($"Enrollment RegistrationID:{registrationId} ScopeID:{scopeId}");
            await Enrollement(registrationId, scopeId, symmetricKey);
            break;
         case "k":
         case "K":
            string primaryKey = args[2];
            string secondaryKey = args[3];

            Console.WriteLine($"Enrollment Keys RegistrationID:{ registrationId}");
            GroupEnrollementKeys(registrationId, primaryKey, secondaryKey);
            break;
         default:
            Console.WriteLine("Unknown option");
            break;
      }
      Console.WriteLine("Press <enter> to exit");
      Console.ReadLine();
   }

   static async Task Enrollement(string registrationId, string scopeId, string symmetricKey)
   {
      try
      {
         using (var securityProvider = new SecurityProviderSymmetricKey(registrationId, symmetricKey, null))
         {
            using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
            {
               ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(GlobalDeviceEndpoint, scopeId, securityProvider, transport);

               DeviceRegistrationResult result = await provClient.RegisterAsync();

               Console.WriteLine($"Hub:{result.AssignedHub} DeviceID:{result.DeviceId} RegistrationID:{result.RegistrationId} Status:{result.Status}");
               if (result.Status != ProvisioningRegistrationStatusType.Assigned)
               {
                  // The device was not assigned to a hub, so there is nothing to connect to
                  Console.WriteLine($"Registration failed Status:{result.Status}");
                  return;
               }

               IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, securityProvider.GetPrimaryKey());

               using (DeviceClient iotClient = DeviceClient.Create(result.AssignedHub, authentication, TransportType.Amqp))
               {
                  Console.WriteLine("DeviceClient OpenAsync.");
                  await iotClient.OpenAsync().ConfigureAwait(false);
                  Console.WriteLine("DeviceClient SendEventAsync.");
                  await iotClient.SendEventAsync(new Message(Encoding.UTF8.GetBytes("TestMessage"))).ConfigureAwait(false);
                  Console.WriteLine("DeviceClient CloseAsync.");
                  await iotClient.CloseAsync().ConfigureAwait(false);
               }
            }
         }
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
      }
   }

   static void GroupEnrollementKeys(string registrationId, string primaryKey, string secondaryKey)
   {
      string primaryDeviceKey = ComputeDerivedSymmetricKey(Convert.FromBase64String(primaryKey), registrationId);
      string secondaryDeviceKey = ComputeDerivedSymmetricKey(Convert.FromBase64String(secondaryKey), registrationId);

      Console.WriteLine($"RegistrationID:{registrationId}");
      Console.WriteLine($" PrimaryDeviceKey:{primaryDeviceKey}");
      Console.WriteLine($" SecondaryDeviceKey:{secondaryDeviceKey}");
   }

   public static string ComputeDerivedSymmetricKey(byte[] masterKey, string registrationId)
   {
      using (var hmac = new HMACSHA256(masterKey))
      {
         return Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(registrationId)));
      }
   }
}

I have five Seeeduino LoRaWAN devices and a single Seeeduino LoRaWAN W/GPS device left over from another project, so I created a SeeeduinoLoRaWAN enrollment group.

DPS Enrollment Group configuration

Initially the enrollment group had no registration records so I ran my command-line application to generate group enrollment keys for one of my devices.

Device registration before running my command line application

Then I ran the command-line application with my scopeID, registrationID (LoRaWAN deviceEUI) and the device group enrollment key I had generated in the previous step.

Registering a device and sending a message to my Azure IoT Hub

After running the command-line application, the device was visible in the enrollment group registration records.

Device registration after running my command line application

Provisioning a device with an individual enrollment has a different workflow. I had to run my command-line application with the RegistrationID, ScopeID, and one of the symmetric keys from the DPS individual enrollment device configuration.

DPS Individual enrollment configuration

A major downside of an individual enrollment is that either the primary or the secondary symmetric key for the device has to be deployed on the device, which could be problematic if the device has no secure storage.

With a group enrollment, only the registration ID and the derived symmetric key have to be deployed on the device. This is more secure because the enrollment group key itself never has to be stored on a device.

Registering a device and sending a message to my Azure IoT Hub

In Azure IoT Explorer I could see messages from both my group and individually enrolled devices arriving at my Azure IoT Hub.

After some initial issues I found DPS was quite reliable and surprisingly easy to configure. I did find the DPS ProvisioningDeviceClient.RegisterAsync method sometimes took several seconds to execute, which may have some ramifications when my application is doing this on demand.