Downlink messages NahYeah

While running my The Things Industries (TTI) gateway I noticed an exception in the logs every so often:

Exception of type 'Microsoft.Azure.Devices.Client.Exceptions.DeviceMessageLockLostException' was thrown.

My client subscribes to Message Queuing Telemetry Transport (MQTT) topics (using MQTTnet) for each TTI Application and establishes a connection (using an Azure DeviceClient) to an Azure IoT Hub for each TTI Device.

  • v3/{application id}@{tenant id}/devices/{device id}/up
  • v3/{application id}@{tenant id}/devices/{device id}/down/queued
  • v3/{application id}@{tenant id}/devices/{device id}/down/sent
  • v3/{application id}@{tenant id}/devices/{device id}/down/ack
  • v3/{application id}@{tenant id}/devices/{device id}/down/nack
  • v3/{application id}@{tenant id}/devices/{device id}/down/failed

The application subscribes to the queued, ack, nack, and failed topics so the progress of a downlink message can be monitored. For downlink messages a correlation_id prefixed with “az:LockToken:” carries the message.LockToken so that the message can be Abandoned, Completed or Rejected in the MQTT receive messageHandler.
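As a minimal sketch of the lookup described above (the class and method names here are hypothetical, not taken from my client), the LockToken can be pulled out of the correlation_ids list like this:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CorrelationIds
{
   // Prefix used to tag the Azure IoT Hub lock token in the TTI correlation_ids.
   private const string LockTokenPrefix = "az:LockToken:";

   // Returns the lock token, or null when no correlation id carries the prefix.
   public static string FindLockToken(IEnumerable<string> correlationIds)
   {
      string match = correlationIds?.FirstOrDefault(
         id => id != null && id.StartsWith(LockTokenPrefix, StringComparison.Ordinal));

      return match?.Substring(LockTokenPrefix.Length);
   }
}
```

Given the correlation ids from the log below, FindLockToken would return the GUID portion, which is what the Azure DeviceClient needs to settle the message.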

Below is the logging from my application for an odd sequence of messages

*****Nothing much happening for a couple of hours; each . represents approx 1 second. Wisnode 4 sends roughly every 5 minutes

.....................................................................................................................................................................................................................................................................................................................
03:36:08 TTN Uplink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 Port: 5
.....................................................................................................................................................................................................................................................................................................................
03:41:18 TTN Uplink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 Port: 5
...........................................................................
03:42:34 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: 57ea0fad-b6b3-492e-b194-10c4ff3e53cb
 Body: vu8=

*****I then started sending 5 messages to Wisnode 5 same payload vu8=, port 71 thru 75 

***** 71 Queued
03:42:34 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"}},
	"correlation_ids":[
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H"],
	"downlink_queued":{"f_port":71,"frm_payload":"vu8=","confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H"]}}
...
03:42:37 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: e2fef28c-fb1f-42cd-bb40-3ad8e6051da9
 Body: vu8=
.

***** 72 Queued
03:42:38 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"}},
	"correlation_ids":[
"az:LockToken:e2fef28c-fb1f-42cd-bb40-3ad8e6051da9",
"as:downlink:01EXX9B4RGSCJ4BN21GHPM85W5"],
	"downlink_queued":{"f_port":72,"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:e2fef28c-fb1f-42cd-bb40-3ad8e6051da9",
"as:downlink:01EXX9B4RGSCJ4BN21GHPM85W5"]}}
...
03:42:41 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: 70d61d71-9b24-44d2-b54b-7cc08da4d072
 Body: vu8=

***** 73 Queued
03:42:41 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"}},
	"correlation_ids":[
"az:LockToken:70d61d71-9b24-44d2-b54b-7cc08da4d072","as:downlink:01EXX9B800WF7FEP56J3EZ3M8A"],
	"downlink_queued":{"f_port":73,"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:70d61d71-9b24-44d2-b54b-7cc08da4d072",
"as:downlink:01EXX9B800WF7FEP56J3EZ3M8A"]}}
...

***** 74 Queued
03:42:45 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: 12537728-de4a-4489-ace5-92923e49b8e4
 Body: vu8=
.
03:42:45 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"}},
	"correlation_ids":[
"az:LockToken:12537728-de4a-4489-ace5-92923e49b8e4",
"as:downlink:01EXX9BBWA2YNCN2DFE5FC3BP3"],
	"downlink_queued":{
"f_port":74,"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:12537728-de4a-4489-ace5-92923e49b8e4",
"as:downlink:01EXX9BBWA2YNCN2DFE5FC3BP3"]}}
...

***** 75 Queued
03:42:48 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: 388efc11-4514-406e-8147-9109289095f4
 Body: vu8=

03:42:49 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"}},
	"correlation_ids":[
"az:LockToken:388efc11-4514-406e-8147-9109289095f4",
"as:downlink:01EXX9BFCM2G51EPYNWGDWPS0N"],
	"downlink_queued":{"f_port":75,"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:388efc11-4514-406e-8147-9109289095f4",
"as:downlink:01EXX9BFCM2G51EPYNWGDWPS0N"]}}

***** Waiting for Wisnode
..........................................................................................................................................................................
03:47:18 TTN Uplink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 Port: 5

***** Waiting for Wisnode again, I think there might have been such a long delay because TTI didn't get
..........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
***** 71 Nack'd
03:56:52 Nack: v3/application1@tenant1/devices/wisnodetest04/down/nack
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"},
	"dev_eui":"60C5A8FFFE781691","join_eui":"70B3D57ED0000000","dev_addr":"26083BE1"},
	"correlation_ids":[
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H",
"as:up:01EXXA572VHN7X7G5KFTHBQPNG",
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"gs:conn:01EXRPTTFGFNTRGH7V8FTC3R0S",
"gs:up:host:01EXRPTTFTEXBNV87KZFYFWP5V",
"gs:uplink:01EXXA56VPK14XG5S8JB9Q0V0X",
"ns:uplink:01EXXA56VYCHGGPPN1K77REMNM",
"rpc:/ttn.lorawan.v3.GsNs/HandleUplink:01EXXA56VRG6811HRCF803VJ34"],
	"received_at":"2021-02-07T03:56:53.211893610Z",
	"downlink_nack":{
"session_key_id":"AXd6GPmneD3dKVoArcS36g==",
"f_port":71,"f_cnt":35,
"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H"]}}

 Found az:LockToken:

03:56:52 TTN Uplink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 Port: 5

03:56:52 Azure IoT Hub downlink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 LockToken: 856f5a9b-bc37-435c-8de9-19d2213999f8
 Body: vu8=

03:56:53 Queued: v3/application1@tenant1/devices/wisnodetest04/down/queued
 payload: {
"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"},
	"correlation_ids":[
"az:LockToken:856f5a9b-bc37-435c-8de9-19d2213999f8",
"as:downlink:01EXXA57JJWWYEDX3Z55TNSTP5"],
	"downlink_queued":{"f_port":71,
"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":
["az:LockToken:856f5a9b-bc37-435c-8de9-19d2213999f8",
"as:downlink:01EXXA57JJWWYEDX3Z55TNSTP5"]}}

......
***** 71 Ack'd
03:56:58 Ack: v3/application1@tenant1/devices/wisnodetest04/down/ack
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"},
	"dev_eui":"60C5A8FFFE781691","join_eui":"70B3D57ED0000000","dev_addr":"26083BE1"},
	"correlation_ids":[
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H",
"as:up:01EXXA5D45E77S19TXEV1E4GAJ",
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"gs:conn:01EXRPTTFGFNTRGH7V8FTC3R0S",
"gs:up:host:01EXRPTTFTEXBNV87KZFYFWP5V",
"gs:uplink:01EXXA5CV73THH2RKEAC2T9MDP",
"ns:uplink:01EXXA5CVDCWPFBTXGGGB3T02W",
"rpc:/ttn.lorawan.v3.GsNs/HandleUplink:01EXXA5CVDEXDFBPYXC0J01Q3E"],
	"received_at":"2021-02-07T03:56:59.397330003Z",
	"downlink_ack":{
"session_key_id":"AXd6GPmneD3dKVoArcS36g==",
"f_port":71,"f_cnt":36,"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL",
	"correlation_ids":[
"az:LockToken:57ea0fad-b6b3-492e-b194-10c4ff3e53cb",
"as:downlink:01EXX9B1CA4DB68PKCDAK4SS4H"]}}

 Found az:LockToken:
Exception of type 'Microsoft.Azure.Devices.Client.Exceptions.DeviceMessageLockLostException' was thrown.

03:56:59 TTN Uplink message
 ApplicationID: application1
 DeviceID: wisnodetest04
 Port: 0
......
03:57:04 Ack: v3/application1@tenant1/devices/wisnodetest04/down/ack
 payload: {"end_device_ids":{"device_id":"wisnodetest04","application_ids":{"application_id":"application1"},
"dev_eui":"60C5A8FFFE781691","join_eui":"70B3D57ED0000000","dev_addr":"26083BE1"},
"correlation_ids":[
"as:downlink:01EXX9B4RGSCJ4BN21GHPM85W5",
"as:up:01EXXA5K2FWGP9DGD7THWZ8HNR",
"az:LockToken:e2fef28c-fb1f-42cd-bb40-3ad8e6051da9",
"gs:conn:01EXRPTTFGFNTRGH7V8FTC3R0S",
"gs:up:host:01EXRPTTFTEXBNV87KZFYFWP5V",
"gs:uplink:01EXXA5JVDR102TKCWQ77P4YYF",
"ns:uplink:01EXXA5JVGNGMZN33FNT47G6PF",
"rpc:/ttn.lorawan.v3.GsNs/HandleUplink:01EXXA5JVGJFFQVEWX2M1XSFKK"],
"received_at":"2021-02-07T03:57:05.487910418Z","downlink_ack":{"session_key_id":"AXd6GPmneD3dKVoArcS36g==",
"f_port":72,"f_cnt":37,
"frm_payload":"vu8=",
"confirmed":true,"priority":"NORMAL","correlation_ids":
["az:LockToken:e2fef28c-fb1f-42cd-bb40-3ad8e6051da9","as:downlink:01EXX9B4RGSCJ4BN21GHPM85W5"]}}

The sequence of messages is a bit odd: in the Azure DeviceClient ReceiveMessageHandler a downlink message is published, then a queued message is received, then a nack and finally an ack. The exception was thrown because my client was trying to Complete the delivery of a message that had already been Abandoned, so its LockToken was no longer valid.
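One possible guard, sketched here with a hypothetical LockTokenTracker class (it is not part of my client or of the Azure SDK), is to remember which LockTokens have already been settled so that a late ack for an abandoned message is ignored rather than passed on to the DeviceClient:

```csharp
using System.Collections.Concurrent;

public class LockTokenTracker
{
   // Lock tokens that have already been Completed, Abandoned or Rejected.
   // Assumption: entries are never evicted here; a real implementation would expire them.
   private readonly ConcurrentDictionary<string, bool> settled =
      new ConcurrentDictionary<string, bool>();

   // Returns true only for the first caller that claims a given lock token,
   // so the ack, nack and failed handlers cannot settle the same message twice.
   public bool TryClaim(string lockToken)
   {
      return settled.TryAdd(lockToken, true);
   }
}
```

In the nack handler the client would call TryClaim before abandoning the message, and in the ack handler it would call TryClaim before completing it. With the sequence logged above the nack for port 71 claims the token first, so the later ack with the same token is skipped.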

Application Insights & Configuration

As part of my The Things Industries (TTI) integration my current approach is to use an Azure web job and configure the Azure App Service host so it doesn’t get shut down after a period of inactivity. This is so my application won’t have to repeatedly use the TTI API to request the Application and Device configuration information to reload the cache (I'm still not certain if this is going to be implemented with a ConcurrentDictionary or ObjectCache).

namespace devMobile.TheThingsNetwork.WorkerService
{
   using System.Collections.Generic;

   public class AzureDeviceProvisiongServiceSettings
   {
      public string IdScope { get; set; }
      public string GroupEnrollmentKey { get; set; }
   }

   public class AzureSettings
   {
      public string IoTHubConnectionString { get; set; }
      public AzureDeviceProvisiongServiceSettings DeviceProvisioningServiceSettings { get; set; }
   }

   public class ApplicationSetting
   {
      public AzureSettings AzureSettings { get; set; }

      public string MQTTAccessKey { get; set; }

      public byte? ApplicationPageSize { get; set; }

      public bool? DeviceIntegrationDefault { get; set; }
      public byte? DevicePageSize { get; set; }
   }

   public class TheThingsIndustries
   {
      public string MqttServerName { get; set; }
      public string MqttClientName { get; set; }

      public string Tennant { get; set; }
      public string ApiBaseUrl { get; set; }
      public string ApiKey { get; set; }

      public bool ApplicationIntegrationDefault { get; set; }
      public byte ApplicationPageSize { get; set; }

      public bool DeviceIntegrationDefault { get; set; }
      public byte DevicePageSize { get; set; }
   }

   public class ProgramSettings
   {
      public TheThingsIndustries TheThingsIndustries { get; set; }

      public AzureSettings AzureSettingsDefault { get; set; }

      public Dictionary<string, ApplicationSetting> Applications { get; set; }
   }
}

The amount of configuration required to support multiple TTI Applications containing many Devices is also starting to get out of hand.

I need to subscribe to Message Queuing Telemetry Transport (MQTT) topics (using MQTTnet) for each Application and establish a connection (using an Azure DeviceClient) for each TTI Device to the configured Azure IoT Hub(s).

  • v3/{application id}@{tenant id}/devices/{device id}/up
  • v3/{application id}@{tenant id}/devices/{device id}/down/queued
  • v3/{application id}@{tenant id}/devices/{device id}/down/sent
  • v3/{application id}@{tenant id}/devices/{device id}/down/ack
  • v3/{application id}@{tenant id}/devices/{device id}/down/nack
  • v3/{application id}@{tenant id}/devices/{device id}/down/failed
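A small sketch of how the topic names above could be built from configuration. The TtiTopics class is hypothetical, and it assumes the MQTT single-level wildcard "+" is used in place of the device id to cover every device in an application:

```csharp
using System.Collections.Generic;

public static class TtiTopics
{
   // Builds the uplink and downlink topic names for one application.
   // deviceId defaults to "+", the MQTT single-level wildcard.
   public static IReadOnlyList<string> ForApplication(string applicationId, string tenantId, string deviceId = "+")
   {
      string prefix = $"v3/{applicationId}@{tenantId}/devices/{deviceId}";

      return new[]
      {
         $"{prefix}/up",
         $"{prefix}/down/queued",
         $"{prefix}/down/sent",
         $"{prefix}/down/ack",
         $"{prefix}/down/nack",
         $"{prefix}/down/failed",
      };
   }
}
```

Keeping the template in one place means the tenant and application identifiers only have to come from the settings classes once.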

The Azure DeviceClient has to be configured and OpenAsync called just before/after subscribing to the TTI Application /up topic so the SendEventAsync method can be called to send messages to the configured Azure IoT Hub(s). For downlink messages the SetReceiveMessageHandler method will need to be called just before/after subscribing to the ../down/queued, ../down/sent, ../down/ack, ../down/nack and ../down/failed downlink topics.

The order in which the Application and Device configuration is downloaded is important, so that downlink messages can be sent and uplink messages received as soon as possible (and no messages are lost). I have considered making the download process multi-threaded so API calls are made concurrently but I’m not certain the additional complexity would be worth it, especially in initial versions.

I’m also currently not certain about how to register my program for Application and Device registry changes so it doesn’t have to be restarted when configuration changes. I have also considered reverting to an HTTP Integration so that I could use Azure Storage queues to buffer uplink and downlink messages. This may also introduce ordering issues when multiple threads are created for Azure Queue Trigger functions to process a message backlog.

For debugging the application and monitoring in production I was planning on using the Apache Log4Net library but now I’m not certain the additional configuration complexity and dependencies are worth it. The built-in Microsoft.Extensions.Logging library with Azure Application Insights integration looks like a “lightweight” alternative with sufficient functionality.

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
   while (!stoppingToken.IsCancellationRequested)
   {
      _logger.LogDebug("Debug worker running at: {time}", DateTimeOffset.Now);
      _logger.LogInformation("Info worker running at: {time}", DateTimeOffset.Now);
      _logger.LogWarning("Warning worker running at: {time}", DateTimeOffset.Now);
      _logger.LogError("Error running at: {time}", DateTimeOffset.Now);

      using (_logger.BeginScope("TheThingsIndustries configuration"))
      {
         _logger.LogInformation("Tennant: {0}", _programSettings.TheThingsIndustries.Tennant);
         _logger.LogInformation("ApiBaseUrl: {0}", _programSettings.TheThingsIndustries.ApiBaseUrl);
         _logger.LogInformation("ApiKey: {0}", _programSettings.TheThingsIndustries.ApiKey);

         _logger.LogInformation("ApplicationPageSize: {0}", _programSettings.TheThingsIndustries.ApplicationPageSize);
         _logger.LogInformation("DevicePageSize: {0}", _programSettings.TheThingsIndustries.DevicePageSize);

         _logger.LogInformation("ApplicationIntegrationDefault: {0}", _programSettings.TheThingsIndustries.ApplicationIntegrationDefault);
         _logger.LogInformation("DeviceIntegrationDefault: {0}", _programSettings.TheThingsIndustries.DeviceIntegrationDefault);

         _logger.LogInformation("MQTTServerName: {0}", _programSettings.TheThingsIndustries.MqttServerName);
         _logger.LogInformation("MQTTClientName: {0}", _programSettings.TheThingsIndustries.MqttClientName);
      }

      using (_logger.BeginScope("Azure default configuration"))
      {
         if (_programSettings.AzureSettingsDefault.IoTHubConnectionString != null)
         {
            _logger.LogInformation("AzureSettingsDefault.IoTHubConnectionString: {0}", _programSettings.AzureSettingsDefault.IoTHubConnectionString);
         }

         if (_programSettings.AzureSettingsDefault.DeviceProvisioningServiceSettings != null)
         {
            _logger.LogInformation("AzureSettings.DeviceProvisioningServiceSettings.IdScope: {0}", _programSettings.AzureSettingsDefault.DeviceProvisioningServiceSettings.IdScope);
            _logger.LogInformation("AzureSettings.DeviceProvisioningServiceSettings.GroupEnrollmentKey: {0}", _programSettings.AzureSettingsDefault.DeviceProvisioningServiceSettings.GroupEnrollmentKey);
         }
      }
    
      foreach (var application in _programSettings.Applications)
      {
         using (_logger.BeginScope(new[] { new KeyValuePair<string, object>("Application", application.Key)}))
         {
            _logger.LogInformation("MQTTAccessKey: {0} ", application.Value.MQTTAccessKey);

            if (application.Value.ApplicationPageSize.HasValue)
            {
               _logger.LogInformation("ApplicationPageSize: {0} ", application.Value.ApplicationPageSize.Value);
            }

            if (application.Value.DeviceIntegrationDefault.HasValue)
            {
               _logger.LogInformation("DeviceIntegation: {0} ", application.Value.DeviceIntegrationDefault.Value);
            }

            if (application.Value.DevicePageSize.HasValue)
            {
               _logger.LogInformation("DevicePageSize: {0} ", application.Value.DevicePageSize.Value);
            }

            if (application.Value.AzureSettings.IoTHubConnectionString != null)
            {
               _logger.LogInformation("AzureSettings.IoTHubConnectionString: {0} ", application.Value.AzureSettings.IoTHubConnectionString);
            }

            if (application.Value.AzureSettings.DeviceProvisioningServiceSettings != null)
            {
               _logger.LogInformation("AzureSettings.DeviceProvisioningServiceSettings.IdScope: {0} ", application.Value.AzureSettings.DeviceProvisioningServiceSettings.IdScope);
               _logger.LogInformation("AzureSettings.DeviceProvisioningServiceSettings.GroupEnrollmentKey: {0} ", application.Value.AzureSettings.DeviceProvisioningServiceSettings.GroupEnrollmentKey);
            }
         }
      }

      await Task.Delay(300000, stoppingToken);
   }
}

The logging information formatting is sufficiently readable when running locally

Extensive use of the BeginScope method to include additional meta-data on logged records should make debugging easier.

This long post is to explain some of my design decisions and which ones are still to be decided

MQTTnet Azure Function Binding

It looks promising

I’m using MQTTnet to build my The Things Industries client and it looked like the amount of code for my Message Queue Telemetry Transport (MQTT) Data API Integration could be reduced by using the AzureFunction MQTT Binding by Kees Schollaart.

I used The Things Industries simulate uplink functionality for my initial testing

TTI uplink message simulator

The first version of the Azure function code proof of concept(PoC) was very compact

namespace MQTTnetAzureFunction
{
   using System;
   using System.Text;
   using Microsoft.Azure.WebJobs;
   using Microsoft.Extensions.Logging;

   using CaseOnline.Azure.WebJobs.Extensions.Mqtt;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Messaging;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Config;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings;

   using MQTTnet.Client.Options;
   using MQTTnet.Extensions.ManagedClient;

   public static class Subscribe
   {
      [FunctionName("UplinkMessageProcessor")]
      public static void UplinkMessageProcessor(
            [MqttTrigger("v3/application123456789012345/devices/+/up", ConnectionString = "TTNMQTTConnectionString")] IMqttMessage message,
            ILogger log)
      {
         var body = Encoding.UTF8.GetString(message.GetMessage());

         log.LogInformation($"Advanced: message from topic {message.Topic} \nbody: {body}");
      }
   }
}

I configured the TTNMQTTConnectionString in the application’s local.settings.json file

{
   "IsEncrypted": false,
   "Values": {
      "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=...",
      "AzureWebJobsDashboard": "DefaultEndpointsProtocol=https;AccountName=...",
      "FUNCTIONS_WORKER_RUNTIME": "dotnet",
      "TTNMQTTConnectionString": "Server=...;Username=application1@...;Password=..."
   }
}

This was a good start but I need to be able to configure the MQTT topic for deployments.

After looking at the binding source code plus some trial and error based on the AdvancedConfiguration sample I have a nasty PoC

public static class Subscribe
{
   [FunctionName("UplinkMessageProcessor")]
   public static void UplinkMessageProcessor(
         [MqttTrigger(typeof(ExampleMqttConfigProvider), "v3/%TopicName%/devices/+/up")] IMqttMessage message,
         ILogger log)
   {
      var body = Encoding.UTF8.GetString(message.GetMessage());

      log.LogInformation($"Advanced: message from topic {message.Topic} \nbody: {body}");
   }
}

public class MqttConfigExample : CustomMqttConfig
{
   public override IManagedMqttClientOptions Options { get; }

   public override string Name { get; }

   public MqttConfigExample(string name, IManagedMqttClientOptions options)
   {
      Options = options;
      Name = name;
   }
}

public class ExampleMqttConfigProvider : ICreateMqttConfig
{
   public CustomMqttConfig Create(INameResolver nameResolver, ILogger logger)
   {
      var connectionString = new MqttConnectionString(nameResolver.Resolve("TTNMQTTConnectionString"), "CustomConfiguration");

      var options = new ManagedMqttClientOptionsBuilder()
             .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
             .WithClientOptions(new MqttClientOptionsBuilder()
                  .WithClientId(connectionString.ClientId.ToString())
                  .WithTcpServer(connectionString.Server, connectionString.Port)
                  .WithCredentials(connectionString.Username, connectionString.Password)
                  .Build())
             .Build();

      return new MqttConfigExample("CustomConnection", options);
   }
}

The TTNMQTTConnectionString and TopicName can be configured in the application’s local.settings.json file

{
    "IsEncrypted": false,
   "Values": {
      "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=...",
      "AzureWebJobsDashboard": "DefaultEndpointsProtocol=https;AccountName=...",
      "FUNCTIONS_WORKER_RUNTIME": "dotnet",
      "TTNMQTTConnectionString": "Server=...;Username=application1@...;Password=...",
      "TopicName": "application1@..."
   }
}

When run in the Azure Functions Core Tools the simulated message properties and payload are displayed

The message “The ‘UplinkMessageProcessor’ function is in error: Unable to configure binding ‘message’ of type ‘mqttTrigger’. This may indicate invalid function.json properties. Can’t figure out which ctor to call.” needs further investigation.

The Things Network Cayenne LPP Support

Uplink Encoding

In my applications the myDevices Cayenne Low Power Payload (LPP) uplink messages from my *duino devices are decoded by the built-in The Things Network (TTN) decoder. I can also see the nicely formatted values in the device data view.

Downlink Encoding

I could successfully download raw data to the device but I found that manually unpacking it on the device was painful.

Raw data

I really want to send LPP formatted messages to my devices so I could use a standard LPP library. I initially populated the payload fields in the downlink message JSON. The TTN documentation appeared to indicate this was possible.

Download JSON payload format

Initially I tried a more complex data type because I was looking at downloading a location to the device.

Complex data type

I could see nicely formatted values in the device data view but they didn’t arrive at the device. I then tried a simpler data type to see if the complex data type was the issue.

Simple Data Types

At this point I asked a few questions on the TTN forums and started to dig into the TTN source code.

Learning Go on demand

I had a look at the TTN Go code and learnt a lot as I figured out how the “baked in” encoder/decoder worked. I haven’t done any Go coding so it took a while to get comfortable with the syntax. The code may look a bit odd as a Pascal formatter was the closest I could get to Go.

In core/handler/cayennelpp/encoder.go there was

func (e *Encoder) Encode(fields map[string]interface{}, fPort uint8) ([]byte, bool, error)

and

func (d *Decoder) Decode(payload []byte, fPort uint8) (map[string]interface{}, bool, error)

Which was a positive sign…

Then in core/handler/convert_fields.go there are these two functions

> // ConvertFieldsUp converts the payload to fields using the application's payload formatter
> func (h *handler) ConvertFieldsUp(ctx ttnlog.Interface, _ *pb_broker.DeduplicatedUplinkMessage, appUp *types.UplinkMessage, dev *device.Device) error {
> 	// Find Application

and

> // ConvertFieldsDown converts the fields into a payload
> func (h *handler) ConvertFieldsDown(ctx ttnlog.Interface, appDown *types.DownlinkMessage, ttnDown *pb_broker.DownlinkMessage, _ *device.Device) error {

Then further down in the second function is this call

var encoder PayloadEncoder
	switch app.PayloadFormat {
	case application.PayloadFormatCustom:
		encoder = &CustomDownlinkFunctions{
			Encoder: app.CustomEncoder,
			Logger:  functions.Ignore,
		}
	case application.PayloadFormatCayenneLPP:
		encoder = &cayennelpp.Encoder{}
	default:
		return nil
	}

Which I think calls

// Encode encodes the fields to CayenneLPP
func (e *Encoder) Encode(fields map[string]interface{}, fPort uint8) ([]byte, bool, error) {
	encoder := protocol.NewEncoder()
	for name, value := range fields {
		key, channel, err := parseName(name)
		if err != nil {
			continue
		}
		switch key {
		case valueKey:
			if val, ok := value.(float64); ok {
				encoder.AddPort(channel, float32(val))
			}
		}
	}
	return encoder.Bytes(), true, nil
}

Then right down at the very bottom of the call stack in keys.go

func parseName(name string) (string, uint8, error) {
	parts := strings.Split(name, "_")
	if len(parts) < 2 {
		return "", 0, errors.New("Invalid name")
	}
	key := strings.Join(parts[:len(parts)-1], "_")
	if key == "" {
		return "", 0, errors.New("Invalid key")
	}
	channel, err := strconv.Atoi(parts[len(parts)-1])
	if err != nil {
		return "", 0, err
	}
	if channel < 0 || channel > 255 {
		return "", 0, errors.New("Invalid range")
	}
	return key, uint8(channel), nil
}

At this point I started to hit the limits of my Go skills but with some trial and error I figured it out…

Executive Summary

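The downlink payload values are sent as two-byte signed integers (two's complement, most significant byte first) containing the value multiplied by 100, each preceded by a channel byte. The fields have to be named “value_X” where X is a byte value (0 to 255).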

Dictionary<string, object> payloadFields = new Dictionary<string, object>();
payloadFields.Add("value_0", 0.0);
//00-00-00
payloadFields.Add("value_1", 1.0);
//01-00-64
payloadFields.Add("value_2", 2.0);
//02-00-C8
payloadFields.Add("value_3", 3.0);
//03-01-2C
payloadFields.Add("value_4", 4.0);
//04-01-90

// Negative values were sent as a separate payload (Dictionary keys must be unique)
payloadFields = new Dictionary<string, object>();
payloadFields.Add("value_0", -0.0);
//00-00-00
payloadFields.Add("value_1", -1.0);
//01-FF-9C
payloadFields.Add("value_2", -2.0);
//02-FF-38
payloadFields.Add("value_3", -3.0);
//03-FE-D4
payloadFields.Add("value_4", -4.0);
//04-FE-70
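The byte patterns in the comments above can be reproduced with a short sketch. The DownlinkValueCodec class is hypothetical, and rounding to the nearest hundredth is an assumption on my part (the TTN encoder may truncate instead):

```csharp
using System;

public static class DownlinkValueCodec
{
   // Encodes one "value_X" field as: channel byte, then the value multiplied by 100
   // as a signed 16-bit integer, most significant byte first.
   public static byte[] Encode(byte channel, double value)
   {
      // Assumption: round to nearest; throws OverflowException outside -327.68..327.67.
      short scaled = checked((short)Math.Round(value * 100.0));

      return new[] { channel, (byte)((scaled >> 8) & 0xFF), (byte)(scaled & 0xFF) };
   }

   // Reverses the two data bytes back into the original value.
   public static double Decode(byte high, byte low)
   {
      short scaled = unchecked((short)((high << 8) | low));

      return scaled / 100.0;
   }
}
```

For example, BitConverter.ToString(DownlinkValueCodec.Encode(3, 3.0)) gives 03-01-2C and BitConverter.ToString(DownlinkValueCodec.Encode(2, -2.0)) gives 02-FF-38, matching the payloads listed above.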

I could see these arrive on my TinyCLR plus RAK811 device and could manually unpack them

The stream of bytes can be decoded on an Arduino using the Electronic Cats library (needs a small modification) with code like this

byte data[] = {0xff,0x38} ; // bytes which represent -2 
float value = lpp.getValue( data, 2, 100, 1);
Serial.print("value:");
Serial.println(value);

It is possible to use the “baked in” Cayenne Encoder/Decoder to send payload fields to a device but I’m not certain if this is quite what myDevices/TTN intended.

The Things Network HTTP Integration Part13

Connection multiplexing

For the Proof of Concept(PoC) I had used a cache to store Azure IoT Hub connections to reduce the number of calls to the Device Provisioning Service(DPS).

Number of connections with no pooling

When stress testing with 1000’s of devices my program hit the host connection limit so I enabled Advanced Message Queuing Protocol(AMQP) connection pooling.

return DeviceClient.Create(result.AssignedHub,
                  authentication,
                  new ITransportSettings[]
                  {
                     new AmqpTransportSettings(TransportType.Amqp_Tcp_Only)
                     {
                        PrefetchCount = 0,
                        AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings()
                        {
                           Pooling = true,
                        }
                     }
                  }
               );

My first attempt failed because I hadn’t specified “TransportType.Amqp_Tcp_Only”. Without it the AMQP implementation is allowed to fall back to other protocols which don’t support pooling.

Exception caused by not using TransportType.Amqp_Tcp_Only

I then deployed the updated code and ran my 1000 device stress test (note the different x axis scales)

Number of connections with pooling

This confirmed what I found in the Azure.AMQP source code

/// <summary>
/// The default size of the pool
/// </summary>
/// <remarks>
/// Allows up to 100,000 devices
/// </remarks>
private const uint DefaultPoolSize = 100;

The Things Network HTTP Integration Part12

Removing the DIY cache

For the Proof of Concept(PoC) I had written a simple cache using a ConcurrentDictionary to store Azure IoT Hub connections to reduce the number of calls to the Device Provisioning Service(DPS).

Device Provisioning Service calls in stress test

For a PoC the DIY cache was ok but I wanted to replace it with something more robust like the .Net ObjectCache which is in the System.Runtime.Caching namespace.

I started by replacing the ConcurrentDictionary declaration

static readonly ConcurrentDictionary<string, DeviceClient> DeviceClients = new ConcurrentDictionary<string, DeviceClient>();
     

With an ObjectCache declaration.

static readonly ObjectCache DeviceClients = MemoryCache.Default;
  

Then, where there were compiler errors I updated the method call.

// See if the device has already been provisioned or is being provisioned on another thread.
if (DeviceClients.Add(registrationId, deviceContext, cacheItemPolicy))
{
   log.LogInformation("RegID:{registrationId} Device provisioning start", registrationId);
...

One difference I found was that ObjectCache throws an exception if the value is null. I was using a null value to indicate that the Device Provisioning Service(DPS) process had been initiated on another thread and was underway.

I have been planning to add support for downlink messages, so I added a new class to store the uplink (Azure IoT Hub DeviceClient) and downlink (downlink_url in the uplink message) details.

public class DeviceContext
{
   public DeviceClient Uplink { get; set; }
   public Uri Downlink { get; set; }
}

For the first version the only functionality I’m using is sliding expiration which is set to one day

CacheItemPolicy cacheItemPolicy = new CacheItemPolicy()
{
   SlidingExpiration = new TimeSpan(1, 0, 0, 0),
   //RemovedCallback
};

DeviceContext deviceContext = new DeviceContext()
{
   Uplink = null,
   Downlink = new Uri(payload.DownlinkUrl)
};
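
The "claim the key, then provision" step described above can be sketched in isolation. This is a minimal sketch rather than the actual implementation: DeviceContextSketch, DeviceCacheSketch and TryClaim are hypothetical names, and Uplink is typed as object so the example doesn't need the Azure IoT SDK. It relies on ObjectCache.Add returning false when the key is already present, and uses a non-null placeholder object because the cache rejects null values.

```csharp
using System;
using System.Runtime.Caching;

// Hypothetical stand-in for the DeviceContext class; Uplink is an object
// so this sketch compiles without the Azure IoT SDK.
public class DeviceContextSketch
{
   public object Uplink { get; set; }
   public Uri Downlink { get; set; }
}

public static class DeviceCacheSketch
{
   // Returns true if this caller added the entry and should start provisioning,
   // false if another caller has already added an entry for the registrationId.
   public static bool TryClaim(ObjectCache cache, string registrationId, Uri downlink)
   {
      CacheItemPolicy policy = new CacheItemPolicy()
      {
         SlidingExpiration = TimeSpan.FromDays(1),
      };

      // A non-null placeholder: Uplink stays null until provisioning completes.
      DeviceContextSketch placeholder = new DeviceContextSketch()
      {
         Uplink = null,
         Downlink = downlink,
      };

      return cache.Add(registrationId, placeholder, policy);
   }
}
```

The first call for a given registrationId returns true and later calls return false until the entry expires or is removed, so only one thread starts the DPS process.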

I didn’t have to make many changes and I’ll double check my implementation in the next round of stress and soak testing.

The Things Network HTTP Integration Part11

Moving Secrets to KeyVault

The application configuration file contained sensitive information like Device Provisioning Service (DPS) Group Enrollment Symmetric Keys and Azure IoT Hub connection strings, which is OK for a proof of concept (PoC) but sub-optimal for production deployments.

"DeviceProvisioningService": {
      "GlobalDeviceEndpoint": "global.azure-devices-provisioning.net",
      "ScopeID": "",
      "EnrollmentGroupSymmetricKeyDefault": "TopSecretKey",
      "DeviceProvisioningPollingDelay": 500,
      "ApplicationEnrollmentGroupMapping": {
         "Application1": "TopSecretKey1",
         "Application2": "TopSecretKey2"
      }
   }

The Azure Key Vault is intended for securing sensitive information like connection strings so I added one to my resource group.

Azure Key Vault overview and basic metrics

I wrote a wrapper which resolves configuration settings based on The Things Network (TTN) application identifier and port information in the uplink message payload. The resolve methods start by looking for configuration for the applicationId and port (separated by a hyphen), then for the applicationId alone, and finally fall back to a default value. This functionality is used for Azure IoT Hub connection strings, DPS ID Scopes and DPS Enrollment Group Symmetric Keys, and is also used to format the cache keys.

public class ApplicationConfiguration
{
const string DpsGlobaDeviceEndpointDefault = "global.azure-devices-provisioning.net";

private IConfiguration Configuration;

public void Initialise( )
{
   // Check that KeyVault URI is configured in environment variables. Not a lot we can do if it isn't....
   if (Configuration == null)
   {
      string keyVaultUri = Environment.GetEnvironmentVariable("KeyVaultURI");
      if (string.IsNullOrWhiteSpace(keyVaultUri))
      {
         throw new ApplicationException("KeyVaultURI environment variable not set");
      }

      // Load configuration from KeyVault 
      Configuration = new ConfigurationBuilder()
         .AddEnvironmentVariables()
         .AddAzureKeyVault(keyVaultUri)
         .Build();
   }
}

public string DpsGlobaDeviceEndpointResolve()
{
   string globaDeviceEndpoint = Configuration.GetSection("DPSGlobaDeviceEndpoint").Value;
   if (string.IsNullOrWhiteSpace(globaDeviceEndpoint))
   {
      globaDeviceEndpoint = DpsGlobaDeviceEndpointDefault;
   }

   return globaDeviceEndpoint;
}

public string ConnectionStringResolve(string applicationId, int port)
{
   // Check to see if there is application + port specific configuration
   string connectionString = Configuration.GetSection($"AzureIotHubConnectionString-{applicationId}-{port}").Value;
   if (!string.IsNullOrWhiteSpace(connectionString))
   {
      return connectionString;
   }

   // Check to see if there is application specific configuration, otherwise run with default
   connectionString = Configuration.GetSection($"AzureIotHubConnectionString-{applicationId}").Value;
   if (!string.IsNullOrWhiteSpace(connectionString))
   {
      return connectionString;
   }

   // get the default as not a specialised configuration
   connectionString = Configuration.GetSection("AzureIotHubConnectionStringDefault").Value;

   return connectionString;
}

public string DpsIdScopeResolve(string applicationId, int port)
{
   // Check to see if there is application + port specific configuration
   string idScope = Configuration.GetSection($"DPSIDScope-{applicationId}-{port}").Value;
   if (!string.IsNullOrWhiteSpace(idScope))
   {
      return idScope;
   }

   // Check to see if there is application specific configuration, otherwise run with default
   idScope = Configuration.GetSection($"DPSIDScope-{applicationId}").Value;
   if (!string.IsNullOrWhiteSpace(idScope))
   {
      return idScope;
   }

   // get the default as not a specialised configuration
   idScope = Configuration.GetSection("DPSIDScopeDefault").Value;

   if (string.IsNullOrWhiteSpace(idScope))
   {
      throw new ApplicationException($"DPSIDScope configuration invalid");
   }

   return idScope;
}
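
The resolve methods above all follow the same three-step lookup, which can be sketched on its own. This is an illustration only, assuming the settings have already been loaded into a dictionary; SettingResolverSketch and Resolve are hypothetical names and not part of the wrapper.

```csharp
using System.Collections.Generic;

public static class SettingResolverSketch
{
   // Tries "{name}-{applicationId}-{port}", then "{name}-{applicationId}",
   // then "{name}Default". Returns null if none of them has a value.
   public static string Resolve(IReadOnlyDictionary<string, string> settings, string name, string applicationId, int port)
   {
      string[] keys =
      {
         $"{name}-{applicationId}-{port}",
         $"{name}-{applicationId}",
         $"{name}Default",
      };

      foreach (string key in keys)
      {
         if (settings.TryGetValue(key, out string value) && !string.IsNullOrWhiteSpace(value))
         {
            return value;
         }
      }

      return null;
   }
}
```

With only "DPSIDScope-application1" and "DPSIDScopeDefault" configured, a message from application1 on any port resolves to the application-specific value and a message from any other application resolves to the default.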

The values of Azure function configuration settings are replaced by a reference to the secret in the Azure Key Vault.

Azure Function configuration value replacement

In the Azure Key Vault “Access Policies” I configured an “Application Access Policy” so my Azure TTNAzureIoTHubMessageV2Processor function identity could retrieve secrets.

Azure Key Vault Secrets

I kept on making typos in the secret names and types which was frustrating.

Azure Key Vault secret

While debugging in Visual Studio you may need to configure the Azure Identity so the application can access the Azure Key Vault.

The Things Network HTTP Integration Part10

Assembling the components

After a series of articles exploring how portions of the solution could be built, I now had working code for receiving The Things Network (TTN) HTTP integration JSON messages with an Azure Function using an HTTPTrigger (secured with an APIKey) and then putting them into an Azure Storage Queue for processing. This code was intentionally kept as small and as simple as possible so there was less to go wrong. The required configuration is also minimal.

HTTP Endpoint handler application

In the last couple of posts I had been building an Azure Function with a QueueTrigger to process the uplink messages. The function used custom bindings so that the CloudQueueMessage could be accessed, and load the Azure Storage account plus queue name from configuration. I’m still using classes generated by JSON2CSharp (with minimal modifications) for deserialising the payloads with JSON.Net.

The message processor Azure Function uses a ConcurrentDictionary to store Azure DeviceClient objects constructed using the information returned by the Azure Device Provisioning Service (DPS). This is so the DPS doesn’t have to be called for the connection details for every message. (When the Azure Function is restarted the dictionary of DeviceClient objects has to be repopulated.) If there is a backlog of messages the message processor can process more than a dozen messages concurrently, so the telemetry events displayed in an application like Azure IoT Central can arrive out of order.

The solution uses DPS Group Enrollment with Symmetric Key Attestation so Azure IoT Hub devices can be “automagically” created when a message from a new device is processed. The processing code is multi-threaded and relies on many error conditions being handled by the Azure Function retry mechanism. After a number of failed retries the messages are moved to a poison queue. Azure Storage Explorer is a good tool for viewing payloads and moving poison messages back to the processing queue.

public static class UplinkMessageProcessor
{
   static readonly ConcurrentDictionary<string, DeviceClient> DeviceClients = new ConcurrentDictionary<string, DeviceClient>();

   [FunctionName("UplinkMessageProcessor")]
   public static async Task Run(
      [QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")]
      CloudQueueMessage cloudQueueMessage, // Used to get CloudQueueMessage.Id for logging
      Microsoft.Azure.WebJobs.ExecutionContext context,
      ILogger log)
   {
      PayloadV5 payloadObect;
      DeviceClient deviceClient = null;
      DeviceProvisioningServiceSettings deviceProvisioningServiceConfig;

      string environmentName = Environment.GetEnvironmentVariable("ENVIRONMENT");

      // Load configuration for DPS. Refactor approach and store securely...
      var configuration = new ConfigurationBuilder()
      .SetBasePath(context.FunctionAppDirectory)
      .AddJsonFile($"appsettings.json")
      .AddJsonFile($"appsettings.{environmentName}.json")
      .AddEnvironmentVariables()
      .Build();

      // Bind the DeviceProvisioningService configuration section to a settings object
      try
      {
         deviceProvisioningServiceConfig = configuration.GetSection("DeviceProvisioningService").Get<DeviceProvisioningServiceSettings>();
      }
      catch (Exception ex)
      {
         log.LogError(ex, $"Configuration loading failed");
         throw;
      }

      // Deserialise uplink message from Azure storage queue
      try
      {
         payloadObect = JsonConvert.DeserializeObject<PayloadV5>(cloudQueueMessage.AsString);
      }
      catch (Exception ex)
      {
         log.LogError(ex, $"MessageID:{cloudQueueMessage.Id} uplink message deserialisation failed");
         throw;
      }

      // Extract the device ID as it's used lots of places
      string registrationID = payloadObect.hardware_serial;

      // Construct the prefix used in all the logging
      string messagePrefix = $"MessageID: {cloudQueueMessage.Id} DeviceID:{registrationID} Counter:{payloadObect.counter} Application ID:{payloadObect.app_id}";
      log.LogInformation($"{messagePrefix} Uplink message device processing start");

      // See if the device has already been provisioned
      if (DeviceClients.TryAdd(registrationID, deviceClient))
      {
         log.LogInformation($"{messagePrefix} Device provisioning start");

         string enrollmentGroupSymmetricKey = deviceProvisioningServiceConfig.EnrollmentGroupSymmetricKeyDefault;

         // Use the enrollment group key mapped to the TTN applicationID if there is one, otherwise keep the default
         if (deviceProvisioningServiceConfig.ApplicationEnrollmentGroupMapping != null)
         {
            enrollmentGroupSymmetricKey = deviceProvisioningServiceConfig.ApplicationEnrollmentGroupMapping.GetValueOrDefault(payloadObect.app_id, deviceProvisioningServiceConfig.EnrollmentGroupSymmetricKeyDefault);
         }

         // Do DPS magic first time device seen
         await DeviceRegistration(log, messagePrefix, deviceProvisioningServiceConfig.GlobalDeviceEndpoint, deviceProvisioningServiceConfig.ScopeID, enrollmentGroupSymmetricKey, registrationID);
      }

      // Wait for the Device Provisioning Service to complete on this or other thread
      log.LogInformation($"{messagePrefix} Device provisioning polling start");
      if (!DeviceClients.TryGetValue(registrationID, out deviceClient))
      {
         log.LogError($"{messagePrefix} Device provisioning polling TryGet before while failed");

         throw new ApplicationException($"{messagePrefix} Device provisioning polling TryGet before while failed");
      }

      while (deviceClient == null)
      {
         log.LogInformation($"{messagePrefix} provisioning polling delay");
         await Task.Delay(deviceProvisioningServiceConfig.DeviceProvisioningPollingDelay);

         if (!DeviceClients.TryGetValue(registrationID, out deviceClient))
         {
            log.LogError($"{messagePrefix} Device provisioning polling TryGet while loop failed");

            throw new ApplicationException($"{messagePrefix} Device provisioning polling TryGet while loop failed");
         }
      }

      // Assemble the JSON payload to send to Azure IoT Hub/Central.
      log.LogInformation($"{messagePrefix} Payload assembly start");
      JObject telemetryEvent = new JObject();
      try
      {
         JObject payloadFields = (JObject)payloadObect.payload_fields;
         telemetryEvent.Add("HardwareSerial", payloadObect.hardware_serial);
         telemetryEvent.Add("Retry", payloadObect.is_retry);
         telemetryEvent.Add("Counter", payloadObect.counter);
         telemetryEvent.Add("DeviceID", payloadObect.dev_id);
         telemetryEvent.Add("ApplicationID", payloadObect.app_id);
         telemetryEvent.Add("Port", payloadObect.port);
         telemetryEvent.Add("PayloadRaw", payloadObect.payload_raw);
         telemetryEvent.Add("ReceivedAt", payloadObect.metadata.time);

         // If the payload has been unpacked in TTN backend add fields to telemetry event payload
         if (payloadFields != null)
         {
            foreach (JProperty child in payloadFields.Children())
            {
               EnumerateChildren(telemetryEvent, child);
            }
         }
      }
      catch (Exception ex)
      {
         if (DeviceClients.TryRemove(registrationID, out deviceClient))
         {
            log.LogWarning($"{messagePrefix} TryRemove payload assembly failed");
         }

         log.LogError(ex, $"{messagePrefix} Payload assembly failed");
         throw;
      }

      // Send the message to Azure IoT Hub/Azure IoT Central
      log.LogInformation($"{messagePrefix} Payload SendEventAsync start");
      try
      {
         using (Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
         {
            // Ensure the displayed time is the acquired time rather than the uploaded time, especially important for messages that end up in the poison queue
            ioTHubmessage.Properties.Add("iothub-creation-time-utc", payloadObect.metadata.time.ToString("s", CultureInfo.InvariantCulture));
            await deviceClient.SendEventAsync(ioTHubmessage);
         }
      }
      catch (Exception ex)
      {
         if (DeviceClients.TryRemove(registrationID, out deviceClient))
         {
            log.LogWarning($"{messagePrefix} TryRemove SendEventAsync failed");
         }

         log.LogError(ex, $"{messagePrefix} SendEventAsync failed");
         throw;
      }

      log.LogInformation($"{messagePrefix} Uplink message device processing completed");
   }
}
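
The provisioning logic in the function above boils down to "the first thread to add the key does the provisioning, everyone else polls until the value is no longer null". A stripped-down sketch of that pattern, with a string standing in for the DeviceClient and a hypothetical provisionAsync delegate standing in for the DPS call, might look like this (ProvisionOnceSketch and GetOrProvisionAsync are illustrative names only):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ProvisionOnceSketch
{
   // A null value means "provisioning is underway on some thread".
   static readonly ConcurrentDictionary<string, string> Clients = new ConcurrentDictionary<string, string>();

   public static async Task<string> GetOrProvisionAsync(string registrationId, Func<string, Task<string>> provisionAsync, int pollingDelayMs)
   {
      // Only the caller that wins TryAdd runs the (expensive) provisioning step.
      if (Clients.TryAdd(registrationId, null))
      {
         try
         {
            Clients[registrationId] = await provisionAsync(registrationId);
         }
         catch
         {
            // Remove the placeholder so a retry can provision again.
            Clients.TryRemove(registrationId, out _);
            throw;
         }
      }

      // Every caller, including the winner, polls until a non-null value is present.
      while (true)
      {
         if (!Clients.TryGetValue(registrationId, out string client))
         {
            throw new InvalidOperationException($"Provisioning of {registrationId} failed on another thread");
         }

         if (client != null)
         {
            return client;
         }

         await Task.Delay(pollingDelayMs);
      }
   }
}
```

If the placeholder disappears while a caller is polling, the provisioning attempt failed elsewhere, so the sketch throws and leaves the retry to the caller, much as the function above leaves it to the Azure Function retry mechanism.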

There is also support for using a specific Group Enrollment based on the app_id in the uplink message payload.

"DeviceProvisioningService": {
      "GlobalDeviceEndpoint": "global.azure-devices-provisioning.net",
      "ScopeID": "",
      "EnrollmentGroupSymmetricKeyDefault": "TopSecretKey",
      "DeviceProvisioningPollingDelay": 500,
      "ApplicationEnrollmentGroupMapping": {
         "Application1": "TopSecretKey1",
         "Application2": "TopSecretKey2"
      }
   }

In addition to the appsettings.json there is configuration for Application Insights, the uplink message queue name and Azure Storage connection strings. The “Environment” setting is important as it specifies which appsettings.{environment}.json file should be used, e.g. when the code is being debugged.

TTN Integration uplink message processor configuration

The deployed solution consists of Azure IoT Hub and DPS instances plus two Azure Functions: one puts the messages from TTN into a queue and the other processes them. The Azure Functions are hosted in an Azure App Service plan.

Azure solution deployment

An Azure Storage account is used for the queue and Azure Function synchronisation information and Azure Application Insights is used to monitor the solution.

Azure Function Log4Net configuration Revisited

In a previous post I showed how I configured Apache Log4Net and Azure Application Insights to work with an Azure Function. This is the code updated to .NET Core 3.1.

With the different versions of the libraries involved (Early April 2020) this was what I found worked for me so YMMV.

Initially the logging to Application Insights wasn’t working even though it was configured in the ApplicationInsights.config file. After some experimentation I found setting the APPINSIGHTS_INSTRUMENTATIONKEY environment variable was the only way I could get it to work.

namespace ApplicationInsightsAzureFunctionLog4NetClient
{
   using System;
   using System.IO;
   using System.Reflection;
   using log4net;
   using log4net.Config;
   using Microsoft.ApplicationInsights;
   using Microsoft.ApplicationInsights.Extensibility;
   using Microsoft.Azure.WebJobs;

   public static class ApplicationInsightsTimer
   {
      [FunctionName("ApplicationInsightsTimerLog4Net")]
      public static void Run([TimerTrigger("0 */1 * * * *")]TimerInfo myTimer, ExecutionContext executionContext)
      {
         ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

         using (TelemetryConfiguration telemetryConfiguration = TelemetryConfiguration.CreateDefault())
         {
            TelemetryClient telemetryClient = new TelemetryClient(telemetryConfiguration);

            // Load the log4net configuration from the function app directory
            var logRepository = LogManager.GetRepository(Assembly.GetEntryAssembly());
            XmlConfigurator.Configure(logRepository, new FileInfo(Path.Combine(executionContext.FunctionAppDirectory, "log4net.config")));

            log.Debug("This is a Log4Net Debug message");
            log.Info("This is a Log4Net Info message");
            log.Warn("This is a Log4Net Warning message");
            log.Error("This is a Log4Net Error message");
            log.Fatal("This is a Log4Net Fatal message");

            telemetryClient.Flush();
         }
      }
   }
}

I did notice that there were a number of exceptions which warrant further investigation.

'func.exe' (CoreCLR: clrhost): Loaded 'C:\Users\BrynLewis\source\repos\AzureApplicationInsightsClients\ApplicationInsightsAzureFunctionLog4NetClient\bin\Debug\netcoreapp3.1\bin\log4net.dll'. 
Exception thrown: 'System.IO.FileNotFoundException' in System.Private.CoreLib.dll
Exception thrown: 'System.IO.FileNotFoundException' in System.Private.CoreLib.dll
Exception thrown: 'System.IO.FileNotFoundException' in System.Private.CoreLib.dll
Exception thrown: 'System.IO.FileNotFoundException' in System.Private.CoreLib.dll
Exception thrown: 'System.IO.FileNotFoundException' in System.Private.CoreLib.dll
Exception thrown: 'System.IO.FileNotFoundException' in System.Private.CoreLib.dll
'func.exe' (CoreCLR: clrhost): Loaded 'C:\Users\BrynLewis\AppData\Local\AzureFunctionsTools\Releases\2.47.1\cli_x64\System.Xml.XmlDocument.dll'. 
'func.exe' (CoreCLR: clrhost): Loaded 'C:\Users\BrynLewis\source\repos\AzureApplicationInsightsClients\ApplicationInsightsAzureFunctionLog4NetClient\bin\Debug\netcoreapp3.1\bin\Microsoft.ApplicationInsights.Log4NetAppender.dll'. 
'func.exe' (CoreCLR: clrhost): Loaded 'C:\Users\BrynLewis\AppData\Local\AzureFunctionsTools\Releases\2.47.1\cli_x64\System.Reflection.TypeExtensions.dll'. 
Application Insights Telemetry: {"name":"Microsoft.ApplicationInsights.64b1950b90bb46aaa36c26f5dce0cad3.Message","time":"2020-04-09T09:22:33.2274370Z","iKey":"1234567890123-1234-12345-123456789012","tags":{"ai.cloud.roleInstance":"DESKTOP-C9IPNQ1","ai.operation.id":"bc6c4d10cebd954c9d815ad06add2582","ai.operation.parentId":"|bc6c4d10cebd954c9d815ad06add2582.d8fa83b88b175348.","ai.operation.name":"ApplicationInsightsTimerLog4Net","ai.location.ip":"0.0.0.0","ai.internal.sdkVersion":"log4net:2.13.1-12554","ai.internal.nodeName":"DESKTOP-C9IPNQ1"},"data":{"baseType":"MessageData","baseData":{"ver":2,"message":"This is a Log4Net Info message","severityLevel":"Information","properties":{"Domain":"NOT AVAILABLE","InvocationId":"91063ef9-70d0-4318-a1e0-e49ade07c51b","ThreadName":"14","ClassName":"?","LogLevel":"Information","ProcessId":"15824","Category":"Function.ApplicationInsightsTimerLog4Net","MethodName":"?","Identity":"NOT AVAILABLE","FileName":"?","LoggerName":"ApplicationInsightsAzureFunctionLog4NetClient.ApplicationInsightsTimer","LineNumber":"?"}}}}

The latest code for my Azure Function Log4Net to Application Insights sample is available here.

Azure IoT Hub, Event Grid to Application Insights

For a second Proof of Concept (PoC) I wanted to upload sensor data from my MQTT LoRa Telemetry Field Gateway to an Azure IoT Hub, then using Azure EventGrid subscribe to the stream of telemetry data events, logging the payloads in Azure Application Insights (the aim was minimal code so no database etc.).

The first step was to create and deploy a simple Azure Function for unpacking the telemetry event payload.

Azure IoT Hub Azure Function Handler

Then wire the Azure Function to the Microsoft.Devices.DeviceTelemetry Event Type

Azure IoT Hub Event Metrics

On the Windows 10 IoT Core device, the Event Tracing for Windows (ETW) logging showed LoRa messages arriving and being unpacked.

Windows 10 Device ETW showing message payload

Then in Application Insights after some mucking around with code I could see in a series of Trace statements the event payload as it was unpacked.

{"id":"29108ebf-e5d5-7b95-e739-7d9048209d53","topic":"/SUBSCRIPTIONS/12345678-9012-3456-7890-123456789012/RESOURCEGROUPS/AZUREIOTHUBEVENTGRIDAZUREFUNCTION/PROVIDERS/MICROSOFT.DEVICES/IOTHUBS/FIELDGATEWAYHUB",
"subject":"devices/MQTTNetClient",
"eventType":"Microsoft.Devices.DeviceTelemetry",
"eventTime":"2020-02-01T04:30:51.427Z",
"data":
{
 "properties":{},
"systemProperties":{"iothub-connection-device-id":"MQTTNetClient","iothub-connection-auth-method":"{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}",
"iothub-connection-auth-generation-id":"637149890997219611",
"iothub-enqueuedtime":"2020-02-01T04:30:51.427Z",
"iothub-message-source":"Telemetry"
},
"body":"eyJPZmZpY2VUZW1wZXJhdHVyZSI6IjIyLjUiLCJPZmZpY2VIdW1pZGl0eSI6IjkyIn0="
},
"dataVersion":"",
"metadataVersion":"1"
}
Application Insights logging with message unpacking
Application Insights logging message payload

Then in the last log entry the decoded message payload

/*
    Copyright © 2020 Feb devMobile Software, All Rights Reserved
 
    MIT License

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE

    Default URL for triggering event grid function in the local environment.
    http://localhost:7071/runtime/webhooks/EventGrid?functionName=functionname
 */
namespace EventGridProcessorAzureIotHub
{
   using System;
   using System.IO;
   using System.Reflection;

   using Microsoft.Azure.WebJobs;
   using Microsoft.Azure.EventGrid.Models;
   using Microsoft.Azure.WebJobs.Extensions.EventGrid;

   using log4net;
   using log4net.Config;
   using Newtonsoft.Json;

   public static class Telemetry
   {
      [FunctionName("Telemetry")]
      public static void Run([EventGridTrigger]Microsoft.Azure.EventGrid.Models.EventGridEvent eventGridEvent, ExecutionContext executionContext)
      {
         ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

         // Load the log4net configuration from the function app directory
         var logRepository = LogManager.GetRepository(Assembly.GetEntryAssembly());
         XmlConfigurator.Configure(logRepository, new FileInfo(Path.Combine(executionContext.FunctionAppDirectory, "log4net.config")));

         log.Info($"eventGridEvent.Data-{eventGridEvent}");

         log.Info($"eventGridEvent.Data.ToString()-{eventGridEvent.Data.ToString()}");

         // Deserialise the event data so the telemetry body can be accessed
         IotHubDeviceTelemetryEventData iOThubDeviceTelemetryEventData = (IotHubDeviceTelemetryEventData)JsonConvert.DeserializeObject(eventGridEvent.Data.ToString(), typeof(IotHubDeviceTelemetryEventData));

         log.Info($"iOThubDeviceTelemetryEventData.Body.ToString()-{iOThubDeviceTelemetryEventData.Body.ToString()}");

         // The telemetry body is Base64 encoded
         byte[] base64EncodedBytes = System.Convert.FromBase64String(iOThubDeviceTelemetryEventData.Body.ToString());

         log.Info($"System.Text.Encoding.UTF8.GetString(-{System.Text.Encoding.UTF8.GetString(base64EncodedBytes)}");
      }
   }
}

Overall it took roughly half a page of code (mainly generated by a tool) to unpack and log the contents of an Azure IoT Hub EventGrid payload to Application Insights.
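
The final decoding step can be tried on its own, without an Azure Function or EventGrid. The sketch below applies the same two calls as the function to the "body" value from the sample event above; TelemetryBodySketch and DecodeBody are hypothetical names used only for this illustration.

```csharp
using System;
using System.Text;

public static class TelemetryBodySketch
{
   // The "body" of the EventGrid telemetry event is the Base64 encoded,
   // UTF-8 JSON payload sent by the device.
   public static string DecodeBody(string base64Body)
   {
      byte[] bytes = Convert.FromBase64String(base64Body);

      return Encoding.UTF8.GetString(bytes);
   }
}
```

Calling DecodeBody with "eyJPZmZpY2VUZW1wZXJhdHVyZSI6IjIyLjUiLCJPZmZpY2VIdW1pZGl0eSI6IjkyIn0=" returns {"OfficeTemperature":"22.5","OfficeHumidity":"92"}.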