TTI V3 Gateway Azure IoT Central Digital Twin Definition Language(DTDL) support

Over the last couple of days I have added limited Digital Twin Definition Language(DTDLV2) support to my The Things Industries(TTI) V3 connector so that Azure IoT Central devices can be “zero touch” provisioned. For this blog post I used five Seeeduino LoRaWAN devices left over from another abandoned project.

The first step was to configure an Azure IoT Central enrollment group (ensure “Automatically connect devices in this group” is on) and copy the IDScope and Group Enrollment key to the appsettings.json file (see the sample file below for more detail).

Azure IoT Central Enrollment Group configuration

Then I created an Azure IoT Central template for the Seeeduino LoRaWAN devices, which are running software (developed with the Arduino tooling) that reads values from a Grove – Temperature&Humidity sensor. The naming of telemetry properties is specified by the Low Power Protocol(LPP) encoder/decoder (I checked the decoded payload in the TTI EndDevice “Live Data” tab).

Configuring Seeeduino LoRaWAN device template

Then I mapped the Azure IoT Central Device Group to my Azure IoT Central Enrollment Group

Associating Device Group with Group Enrollment configuration

The Device Template @Id can be configured as the “default” template for all the devices in a TTI application in the app.settings.json file.

{
...
   "ProgramSettings": {
      "Applications": {
...
      "seeeduinolorawan": {
        "AzureSettings": {
           "DeviceProvisioningServiceSettings": {
              "IdScope": "...",
              "GroupEnrollmentKey": "..."
            }
         },
         "DTDLModelId": "dtmi:ttnv3connectorclient:SeeeduinoLoRaWAN4cz;1",
         "MQTTAccessKey": "...",
         "DeviceIntegrationDefault": true,
         "DevicePageSize": 10
      }
   },
...

The Device Template @Id can also be set using a dtdlmodelid attribute in a TTI end device's settings, so devices can be individually configured.
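
The precedence between the two settings can be sketched as below. This is a minimal illustration rather than the connector's actual code: the ResolveModelId name and the dictionary of end device attributes are assumptions, and only the dtdlmodelid attribute name comes from the text above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: a per-device "dtdlmodelid" attribute wins,
// otherwise the application's default DTDLModelId is used.
static string ResolveModelId(
    IReadOnlyDictionary<string, string> deviceAttributes,
    string applicationDefaultModelId)
{
    if (deviceAttributes != null
        && deviceAttributes.TryGetValue("dtdlmodelid", out string modelId)
        && !string.IsNullOrWhiteSpace(modelId))
    {
        return modelId;
    }

    // May be null or empty, in which case no model ID would be sent to DPS
    return applicationDefaultModelId;
}
```

A device without the attribute simply inherits the application default, so only the exceptions need to be configured in TTI.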

TTI Application EndDevice dtdlmodelid attribute usage

At startup the TTI Gateway enumerates through the devices in each application configured in the app.settings.json. The Azure Device Provisioning Service(DPS) is used to retrieve each device’s connection string and configure it in Azure IoT Central if required.
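
The registration code later in this post takes a securityProvider but does not show where its key comes from. For a symmetric key enrollment group, the Azure DPS documentation describes deriving a per-device key as the HMAC-SHA256 of the registration ID, keyed with the group enrollment key. A minimal sketch of that derivation follows; the ComputeDerivedDeviceKey name is hypothetical, and using the TTI device ID as the registration ID is an assumption.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Derives a per-device symmetric key from the Base64 encoded group enrollment key.
static string ComputeDerivedDeviceKey(string groupEnrollmentKey, string registrationId)
{
    using var hmac = new HMACSHA256(Convert.FromBase64String(groupEnrollmentKey));

    byte[] hash = hmac.ComputeHash(Encoding.UTF8.GetBytes(registrationId));

    return Convert.ToBase64String(hash);
}
```

The derived key, rather than the group key itself, would then be the primary key passed to the symmetric key security provider.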

Azure IoT Central Device Group with no provisioned Devices
TTI Connector application connecting and provisioning EndDevices
Azure IoT Central devices mapped to an Azure IoT Central Template via the modelID

The ProvisioningRegistrationAdditionalData optional parameter of the DPS RegisterAsync method has a JsonData property which is used to specify the device ModelID.

using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
{
	ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create( 
		Constants.AzureDpsGlobalDeviceEndpoint,
		deviceProvisiongServiceSettings.IdScope,
		securityProvider,
		transport);

	DeviceRegistrationResult result;

	if (!string.IsNullOrEmpty(modelId))
	{
		ProvisioningRegistrationAdditionalData provisioningRegistrationAdditionalData = new ProvisioningRegistrationAdditionalData()
		{
			JsonData = $"{{\"modelId\": \"{modelId}\"}}"
		};

		result = await provClient.RegisterAsync(provisioningRegistrationAdditionalData, stoppingToken);
	}
	else
    {
		result = await provClient.RegisterAsync(stoppingToken);
	}

	if (result.Status != ProvisioningRegistrationStatusType.Assigned)
	{
		_logger.LogError("Config-DeviceID:{0} Status:{1} RegisterAsync failed ", deviceId, result.Status);

		return false;
	}

	IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());

	deviceClient = DeviceClient.Create(result.AssignedHub, authentication, transportSettings);
}
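
The JsonData value above is built with string interpolation, which works as long as the model ID contains no characters that need escaping. A hedged alternative is to let a serializer build the payload. The sketch below uses System.Text.Json (a swap from the Json.NET types used elsewhere in this post), and BuildModelIdPayload is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Builds the DPS additional data payload, escaping the model ID if required.
static string BuildModelIdPayload(string modelId)
{
    var payload = new Dictionary<string, string> { ["modelId"] = modelId };

    return JsonSerializer.Serialize(payload);
}
```

The result could be assigned to JsonData in place of the interpolated string.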

My implementation was “inspired” by the TemperatureController project in the PnP Device Samples.

Azure IoT Central Dashboard with Seeeduino LoRaWAN devices around my house that were “automagically” provisioned

I need to do some testing to confirm my code works reliably with both DPS and user provided connection strings. The RegisterAsync call is currently taking about four seconds which could be an issue for TTI applications with many devices.

TTI V3 Gateway Azure IoT Hub Support

After a couple of weeks’ work my The Things Industries(TTI) V3 gateway is in beta testing. For this blog post I have configured five Seeeduino LoRaWAN devices. My sensor nodes connect to an Azure IoT Hub with a Shared Access Signature(SAS) device policy connection string. I’m using Device Twin Explorer to display telemetry from, and send messages to, the sensor nodes. I have also configured Azure Stream Analytics and PowerBI to graph telemetry from the sensor nodes.

Device Twin Explorer displaying telemetry from one of the Seeeduino devices

My integration uses only queued messages as often they won’t be delivered to the sensor node immediately, especially if the sensor node only sends an uplink message every 30 minutes/hour/day.

The confirmed flag should be used with care as the Azure IoT Hub messages may expire before a delivery Ack/Nack/Failed is received from the TTI.

PowerBI graph of temperature and humidity in my garage over 24 hours

To send a downlink message, TTI needs a LoRaWAN port number (plus optional queue, confirmed and priority values) which is specified in the Azure IoT Hub message custom properties.

Device explorer displaying a raw payload message which has been confirmed delivered
TTI device live data tab displaying raw payload in downlink message information tab
Azure IoT Connector console application sending raw payload to sensor node with confirmation ack
Arduino monitor displaying received raw payload from TTI

If the Azure IoT Hub message payload is valid JSON it is copied into the payload decoded downlink message property. If it is not valid JSON it is assumed to be a Base64 encoded value and copied into the payload raw downlink message property.

try
{
	// Split over multiple lines in an attempt to improve readability. A valid JSON string should start/end with {/} for an object or [/] for an array
	if (!(payloadText.StartsWith("{") && payloadText.EndsWith("}"))
										&&
		(!(payloadText.StartsWith("[") && payloadText.EndsWith("]"))))
	{
		throw new JsonReaderException();
	}

	downlink.PayloadDecoded = JToken.Parse(payloadText);
}
catch (JsonReaderException)
{
	downlink.PayloadRaw = payloadText;
}

Like the Azure IoT Central JSON validation I had to add a check that the string started with a “{” and finished with a “}” (a JSON object) or started with a “[” and finished with a “]” (a JSON array) as part of the validation process.

Device explorer displaying a JSON payload message which has been confirmed delivered

I normally wouldn’t use exceptions for flow control but I can’t see a better way of doing this.
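
One way to keep the exception out of the main flow is to hide the check and the parse behind a Try-style helper. The sketch below is only an illustration: TryParseJsonContainer is a hypothetical name, it uses System.Text.Json rather than the Json.NET JToken in the code above, and it still catches a parse exception internally because the parser reports invalid JSON that way.

```csharp
using System;
using System.Text.Json;

// Returns true only when the text is a well formed JSON object or array.
static bool TryParseJsonContainer(string payloadText, out JsonElement element)
{
    element = default;

    string trimmed = payloadText?.Trim() ?? string.Empty;

    // Cheap pre-check: bare values such as 1.23 or true are valid JSON but are not wanted here
    bool looksLikeObject = trimmed.StartsWith('{') && trimmed.EndsWith('}');
    bool looksLikeArray = trimmed.StartsWith('[') && trimmed.EndsWith(']');
    if (!looksLikeObject && !looksLikeArray)
    {
        return false;
    }

    try
    {
        using JsonDocument document = JsonDocument.Parse(trimmed);

        // Clone so the element remains usable after the document is disposed
        element = document.RootElement.Clone();
        return true;
    }
    catch (JsonException)
    {
        return false;
    }
}
```

The caller then becomes a plain if/else: the decoded payload is set when the helper returns true, and the raw payload otherwise.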

TTI device live data tab displaying JSON payload in downlink message information tab
Azure IoT Connector console application sending JSON payload to sensor node with confirmation ack
Arduino monitor displaying received JSON payload from TTI

The built-in TTI decoder only supports downlink decoded payloads with property names “value_0” through “value_x”. Custom encoders may support other property names.

TTI V3 Gateway Azure IoT Central Support

After a couple of weeks’ work my The Things Industries(TTI) V3 gateway is in beta testing. For this blog post the client is a GHI Electronics Fezduino with a RAK811 LPWAN Evaluation Board(EVB). My test device was configured in Azure IoT Central by the Device Provisioning Service(DPS) and I then manually migrated the device to each of the four templates used in this post.

The first step was to display the temperature and barometric pressure values from the Seeedstudio Grove BMP180 attached to my sensor node.

Sensor node displaying temperature and barometric pressure values
Azure IoT Central temperature and barometric pressure telemetry configuration
Azure IoT Central Telemetry Dashboard displaying temperature and barometric pressure values

The next step was to configure a simple Azure IoT Central command to send to the sensor node. This was a queued request with no payload. An example of this sort of command would be a request for a sensor node to reboot or turn on an actuator.

My integration uses only offline queued commands as often messages won’t be delivered to the sensor node immediately, especially if the sensor node only sends a message every half hour/hour/day. The confirmed flag should be used with care as the Azure IoT Hub messages may expire before a delivery Ack/Nack/Failed is received from the TTI and it consumes downlink bandwidth.

if (message.Properties.ContainsKey("method-name"))
{
}

I determine an Azure IoT Hub message is an Azure IoT Central command by the presence of the “method-name” property. If the Azure IoT Central command does not have a request payload the Azure IoT Hub message payload will contain a single “@” character so the Azure IoT Connector sends a TTI downlink message with an empty raw payload via the TTI Data API(MQTT).

if (payloadText.CompareTo("@") != 0)
{
   .
}
else
{
   downlink.PayloadRaw = "";
}
Azure IoT Central command without a request payload value command configuration

To send a downlink message, TTI needs a LoRaWAN port number (plus optional queue, confirmed and priority values) which can’t be provided via the Azure IoT Central command setup so these values are configured in the app.settings file.

Each TTI application has zero or more Azure IoT Central command configurations which supply the port, confirmed, priority and queue settings.

  "ProgramSettings": {
    "Applications": {
      "application1": {
        "AzureSettings": {
          ...
        },
        "MQTTAccessKey": "...",
        "DeviceIntegrationDefault": false,
        "MethodSettings": {
          "Reboot": {
            "Port": 21,
            "Confirmed": true,
            "Priority": "normal",
            "Queue": "push"
          }
        }
      },
      "seeeduinolorawan": {
        "AzureSettings": {
        },
        "MQTTAccessKey": "...",
        "DeviceIntegrationDefault": true,
        "DevicePageSize": 10
      }
    },
    "TheThingsIndustries": {
...
   }
}
Azure IoT Central simple command dashboard
Azure IoT Central simple command initiation
Azure IoT TTI connector application sending a simple command to my sensor node
Sensor node displaying simple command information. Note the message payload is empty

The next step was to configure a more complex Azure IoT Central command to send to the sensor node. This was a queued request with a single value payload. An example of this sort of command could be setting the speed of a fan or the maximum temperature of a freezer for an out of band (OOB) notification to be sent.

Azure IoT Central single value command configuration
  "ProgramSettings": {
    "Applications": {
      "application1": {
        "AzureSettings": {
          ...
        },
        "MQTTAccessKey": "...",
        "DeviceIntegrationDefault": false,
        "MethodSettings": {
          "Reboot": {
            "Port": 21,
            "Confirmed": true,
            "Priority": "normal",
            "Queue": "push"
          },
          "value_0": {
            "Port": 30,
            "Confirmed": true,
            "Priority": "normal",
            "Queue": "push"
          },
          "value_1": {
            "Port": 30,
            "Confirmed": true,
            "Priority": "normal",
            "Queue": "push"
          }
        }
      },
      "seeeduinolorawan": {
        "AzureSettings": {
        },
        "MQTTAccessKey": "...",
        "DeviceIntegrationDefault": true,
        "DevicePageSize": 10
      }
    },
    "TheThingsIndustries": {
...
   }
}

The value_0 settings are for the minimum temperature and the value_1 settings are for the maximum temperature.

Azure IoT Central single value command initiation
Azure IoT TTI connector application sending a single value command to my sensor node
Sensor node displaying single value command information. There are two downlink messages and each payload contains a single value

The single value command payload contains the textual representation of the value e.g. “true”/”false” or “1.23” which are also valid JSON. This initially caused issues as I was trying to splice a single value into the decoded payload.

I had to add a check that the string started with a “{” and finished with a “}” (a JSON object) or started with a “[” and finished with a “]” (a JSON array) as part of the validation process.

For a single value command the payload decoded has a single property with the method-name value as the name and the payload as the value. For a command with a JSON payload the message payload is copied into the PayloadDecoded.

I normally wouldn’t use exceptions for flow control but I can’t see a better way of doing this.

	try
	{
		// Split over multiple lines to improve readability
		if (!(payloadText.StartsWith("{") && payloadText.EndsWith("}"))
									&&
			(!(payloadText.StartsWith("[") && payloadText.EndsWith("]"))))
		{
			throw new JsonReaderException();
		}

		downlink.PayloadDecoded = JToken.Parse(payloadText);
	}
	catch (JsonReaderException)
	{
		try
		{
			JToken value = JToken.Parse(payloadText);

			downlink.PayloadDecoded = new JObject(new JProperty(methodName, value));
		}
		catch (JsonReaderException)
		{
			downlink.PayloadDecoded = new JObject(new JProperty(methodName, payloadText));
		}
	}

The final step was to configure another Azure IoT Central command with a JSON payload to send to the sensor node. A “real-world” example of this sort of command would be setting the minimum and maximum temperatures of a freezer in a single downlink message.

Azure IoT Central JSON payload command setup
Azure IoT Central JSON payload command payload configuration
  "ProgramSettings": {
    "Applications": {
      "application1": {
        "AzureSettings": {
          ...
        },
        "MQTTAccessKey": "...",
        "DeviceIntegrationDefault": false,
        "MethodSettings": {
          "Reboot": {
            "Port": 21,
            "Confirmed": true,
            "Priority": "normal",
            "Queue": "push"
          },
          "value_0": {
            "Port": 30,
            "Confirmed": true,
            "Priority": "normal",
            "Queue": "push"
          },
          "value_1": {
            "Port": 30,
            "Confirmed": true,
            "Priority": "normal",
            "Queue": "push"
          },
          "TemperatureOOBAlertMinimumAndMaximum": {
            "Port": 30,
            "Confirmed": true,
            "Priority": "normal",
            "Queue": "push"
          }
        }
      },
      "seeeduinolorawan": {
        "AzureSettings": {
        },
        "MQTTAccessKey": "...",
        "DeviceIntegrationDefault": true,
        "DevicePageSize": 10
      }
    },
    "TheThingsIndustries": {
...
   }
}
Azure IoT Central JSON payload command initiation

Azure IoT TTI connector application sending a JSON payload command to my sensor node
Sensor node displaying JSON command information. There is a single payload which contains two values

The built-in TTI decoder only supports downlink decoded payloads with property names “value_0” through “value_x”, which results in some odd command names and JSON payload property names. (Custom encoders may support other property names.) Case sensitivity of some configuration values also tripped me up.

TTN V3 Gateway Downlink Broken

While adding Azure Device Provisioning Service (DPS) support to my The Things Industries(TTI)/The Things Network(TTN) Azure IoT Hub/Azure IoT Central Connector I broke Cloud to Device(C2D)/Downlink messaging. I had copied the Advanced Message Queuing Protocol(AMQP) connection pooling configuration code from my The Things Network Integration assuming it worked.

return DeviceClient.CreateFromConnectionString(connectionString, deviceId,
	new ITransportSettings[]
	{
		new AmqpTransportSettings(TransportType.Amqp_Tcp_Only)
		{
			PrefetchCount = 0,
			AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings()
			{
				Pooling = true,
			}
		}
	});

I hadn’t noticed this issue in my Azure IoT The Things Network Integration because I hadn’t built support for C2D messaging. After some trial and error I figured out the issue was the PrefetchCount initialisation.

return DeviceClient.CreateFromConnectionString(connectionString, deviceId,
	new ITransportSettings[]
	{
		new AmqpTransportSettings(TransportType.Amqp_Tcp_Only)
		{
			AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings()
			{
				Pooling = true,
			}
		}
	});

From the Azure Service Bus documentation (I couldn’t find anything specific to Azure IoT Hub):

Even though the Service Bus APIs do not directly expose such an option today, a lower-level AMQP protocol client can use the link-credit model to turn the “pull-style” interaction of issuing one unit of credit for each receive request into a “push-style” model by issuing a large number of link credits and then receive messages as they become available without any further interaction. Push is supported through the MessagingFactory.PrefetchCount or MessageReceiver.PrefetchCount property settings. When they are non-zero, the AMQP client uses it as the link credit.

In this context, it’s important to understand that the clock for the expiration of the lock on the message inside the entity starts when the message is taken from the entity, not when the message is put on the wire. Whenever the client indicates readiness to receive messages by issuing link credit, it is therefore expected to be actively pulling messages across the network and be ready to handle them. Otherwise the message lock may have expired before the message is even delivered. The use of link-credit flow control should directly reflect the immediate readiness to deal with available messages dispatched to the receiver.

In the Azure IoT Hub SDK the prefetch count is set to 50 (around line 57) and an exception is thrown if the value is less than zero (around line 90). There is also some information about tuning the prefetch value for Azure Service Bus.

The best explanation I could find was a GitHub issue which asked “What exactly does the PrefetchCount property control?”

“You are correct, the pre-fetch count is used to set the link credit over AMQP. What this signifies is the max. no. of messages that can be “in-flight” from the service to the client, at any given time. (This value defaults to 50 for the IoT Hub .NET client).
The client specifies its link-credit, that the service must respect. In simplest terms, any time the service sends a message to the client, it decrements the link credit, and will continue sending messages until linkCredit > 0. Once the client acknowledges the message, it will increment the link credit.”

In summary, if the PrefetchCount is set to zero on startup in my application the client never grants any link credit, so no messages will be sent to the client.

TTN V3 Gateway Azure Configuration Simplification

To reduce complexity the initial version of the V3 TTI gateway didn’t support the Azure Device Provisioning Service(DPS). In preparation for this I had included a DeviceProvisioningServiceSettings object in both the Application and AzureSettingsDefault sections.

After trialling a couple of different approaches I have removed the AzureSettingsDefault. If an application has a connection string configured, that is used; if it does not, the DPS configuration is used; and if neither is configured the application currently logs an error. In the future I will look at adding a configuration option so the application can optionally shut down.

{
  ...
  "ProgramSettings": {
    "Applications": {
      "application1": {
        "AzureSettings": {
          "IoTHubConnectionString": "HostName=TT...n1.azure-devices.net;SharedAccessKeyName=device;SharedAccessKey=Am...M=",
          "DeviceProvisioningServiceSettings": {
            "IdScope": "0n...3B",
            "GroupEnrollmentKey": "Kl...Y="
          }
        },
        "MQTTAccessKey": "NNSXS.HC...YQ",
        "DeviceIntegrationDefault": false,
        "DevicePageSize": 10
      },
      "seeeduinolorawan": {
        "AzureSettings": {
          "IoTHubConnectionString": "HostName=TT...n2.azure-devices.net;SharedAccessKeyName=device;SharedAccessKey=D2q...L8=",
          "DeviceProvisioningServiceSettings": {
            "IdScope": "0n...3B",
            "GroupEnrollmentKey": "Kl...Y="
          }
        },
        "MQTTAccessKey": "NNSXS.V44...42A",
        "DeviceIntegrationDefault": true,
        "DevicePageSize": 10
      }
    },

    "TheThingsIndustries": {
      "MqttServerName": "eu1.cloud.thethings.industries",
      "MqttClientId": "MQTTClient",
      "MqttAutoReconnectDelay": "00:00:05",
      "Tenant": "br...st",
      "ApiBaseUrl": "https://br..st.eu1.cloud.thethings.industries/api/v3",
      "ApiKey": "NNSXS.NR...SA",
      "Collaborator": "de...le",
      "DevicePageSize": 10,
      "DeviceIntegrationDefault": true
    }
  }
}
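
The selection order described above could be sketched as follows. This is a minimal illustration under stated assumptions: SelectConnectionMode and the returned mode names are hypothetical, and only the order (connection string, then DPS, then an error) comes from the text.

```csharp
using System;

// Decides how an application's devices should be connected to Azure.
static string SelectConnectionMode(string ioTHubConnectionString, string idScope, string groupEnrollmentKey)
{
    if (!string.IsNullOrWhiteSpace(ioTHubConnectionString))
    {
        return "ConnectionString";
    }

    if (!string.IsNullOrWhiteSpace(idScope) && !string.IsNullOrWhiteSpace(groupEnrollmentKey))
    {
        return "DeviceProvisioningService";
    }

    // Neither configured: the caller would log an error for this application
    return "None";
}
```

In the sample configuration above both applications have a connection string, so the DPS settings would be ignored.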

Falling back from application to default settings wasn’t easy to implement, explain or document.

TTN V3 Gateway Configuration, Deployment and Operation

After configuring, deploying and then operating my The Things Network(TTN) V2 gateway I have made some changes to my The Things Industries(TTI) V3 gateway.

TTI V3 Gateway running as a console application on my desktop

Azure IoT integration can be configured at the Device (TTN Device “azureintegration” attribute).

TTN Device AzureIntegration Attribute

Then falls back to the Application default (TTN application “azureintegrationdevicedefault” attribute).

TTN Application AzureIntegrationDeviceDefault attribute.

Then falls back to the “DeviceIntegrationDefault” setting for the Application, then finally to the “DeviceIntegrationDefault” setting for the webjob in the app.settings.json file.

{
  ...
  "ProgramSettings": {
    "Applications": {
      "application1": {
        "AzureSettings": {
          "IoTHubConnectionString": "HostName=TT...n1.azure-devices.net;SharedAccessKeyName=device;SharedAccessKey=Am...M=",
          "DeviceProvisioningServiceSettings": {
            "IdScope": "0n...3B",
            "GroupEnrollmentKey": "Kl...Y="
          }
        },
        "MQTTAccessKey": "NNSXS.HC...YQ",
        "DeviceIntegrationDefault": false,
        "DevicePageSize": 10
      },
      "seeeduinolorawan": {
        "AzureSettings": {
          "IoTHubConnectionString": "HostName=TT...n2.azure-devices.net;SharedAccessKeyName=device;SharedAccessKey=D2q...L8=",
          "DeviceProvisioningServiceSettings": {
            "IdScope": "0n...3B",
            "GroupEnrollmentKey": "Kl...Y="
          }
        },
        "MQTTAccessKey": "NNSXS.V44...42A",
        "DeviceIntegrationDefault": true,
        "DevicePageSize": 10
      }
    },

    "AzureSettingsDefault": {
      "IoTHubConnectionString": "HostName=TT...ors.azure-devices.net;SharedAccessKeyName=device;SharedAccessKey=yd...k=",
      "DeviceProvisioningServiceSettings": {
        "IdScope": "0n...3B",
        "GroupEnrollmentKey": "Kl...Y="
      }
    },

    "TheThingsIndustries": {
      "MqttServerName": "eu1.cloud.thethings.industries",
      "MqttClientId": "MQTTClient",
      "MqttAutoReconnectDelay": "00:00:05",
      "Tenant": "br...st",
      "ApiBaseUrl": "https://br..st.eu1.cloud.thethings.industries/api/v3",
      "ApiKey": "NNSXS.NR...SA",
      "Collaborator": "de...le",
      "DevicePageSize": 10,
      "DeviceIntegrationDefault": true
    }
  }
}

This approach is now used for most of the application settings to reduce the amount of configuration required for a small scale deployment.
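
The four-level fallback described above maps neatly onto a chain of null-coalescing operators. The sketch below is an illustration only: the function names are hypothetical, and it assumes the TTN attributes hold the text "true" or "false".

```csharp
using System;
using System.Collections.Generic;

// Reads a boolean attribute, returning null when it is missing or not a valid boolean.
static bool? ParseFlag(IReadOnlyDictionary<string, string> attributes, string name)
{
    if (attributes != null
        && attributes.TryGetValue(name, out string text)
        && bool.TryParse(text, out bool value))
    {
        return value;
    }

    return null;
}

// Device attribute, then application attribute, then the application setting,
// then the global setting from app.settings.json.
static bool ResolveAzureIntegration(
    IReadOnlyDictionary<string, string> deviceAttributes,
    IReadOnlyDictionary<string, string> applicationAttributes,
    bool? applicationSetting,
    bool globalSetting)
{
    return ParseFlag(deviceAttributes, "azureintegration")
        ?? ParseFlag(applicationAttributes, "azureintegrationdevicedefault")
        ?? applicationSetting
        ?? globalSetting;
}
```

Each level only has to be configured when it differs from the level below it, which is what keeps a small deployment's configuration short.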

To reduce complexity the initial version of the V3 TTI gateway doesn’t support Azure IoT Central and the Device Provisioning Service(DPS).

The Things Network MQTT & Azure IoT Part3B

MQTT Topics and case sensitivity

To send LoRaWAN messages using The Things Network Message Queue Telemetry Transport(MQTT) Data API with JSON payloads there are two topics

  • v3/{application id}@{tenant id}/devices/{device id}/down/push
  • v3/{application id}@{tenant id}/devices/{device id}/down/replace

My Azure IoT Hub messages have properties for the LoRaWAN port (required), confirmed (which defaults to false), priority (which defaults to Normal) and queue(which defaults to Replace). The priority and queue enumerations are defined in TTNcommon.cs.
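
Reading those properties with their defaults could look something like the sketch below. It is not the connector's code: TryReadDownlinkProperties is a hypothetical name, strings stand in for the enumerations in TTNcommon.cs, and the 1 to 223 port limits are an assumption based on the LoRaWAN application port range.

```csharp
using System;
using System.Collections.Generic;

// Reads the downlink settings from the message properties, applying defaults.
// Returns false when the port is missing or any supplied value is invalid.
static bool TryReadDownlinkProperties(
    IDictionary<string, string> properties,
    out (byte Port, bool Confirmed, string Priority, string Queue) downlink)
{
    downlink = default;

    // Port is required
    if (!properties.TryGetValue("Port", out string portText)
        || !byte.TryParse(portText, out byte port)
        || port < 1 || port > 223)
    {
        return false;
    }

    // Confirmed defaults to false
    bool confirmed = false;
    if (properties.TryGetValue("Confirmed", out string confirmedText)
        && !bool.TryParse(confirmedText, out confirmed))
    {
        return false;
    }

    // Priority defaults to normal and is upper-cased for the JSON payload (not validated here)
    string priority = properties.TryGetValue("Priority", out string priorityText)
        ? priorityText.Trim().ToUpperInvariant()
        : "NORMAL";

    // Queue defaults to replace and is lower-cased for the MQTT topic
    string queue = properties.TryGetValue("Queue", out string queueText)
        ? queueText.Trim().ToLowerInvariant()
        : "replace";
    if (queue != "push" && queue != "replace")
    {
        return false;
    }

    downlink = (port, confirmed, priority, queue);
    return true;
}
```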

I used the enumeration for message priority in the JSON payload and MQTT downlink message topic.

{
  "downlinks": [{
    "f_port": 15,
    "frm_payload": "vu8=",
    "priority": "NORMAL"
  }]
}
string downlinktopic = $"v3/{ApplicationId}@{TenantId}/devices/{DeviceId}/down/{Enum.GetName(typeof(DownlinkQueue), queue)}".ToLower();

Initially when I published a message it wasn’t sent and there was no error. It was a while before I noticed that the queue setting was being converted to the text “Push” or “Replace” based on the enumeration value name, while the MQTT topic expects lower case (the priority value was in the JSON, which is case insensitive). I did wonder if the TenantId and ApplicationId were also case sensitive, so I ensured consistent capitalisation with ToLower().
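
A small helper makes the lower-casing explicit and rejects anything other than the two queue names. This is a sketch under stated assumptions: BuildDownlinkTopic is a hypothetical name, and the queue is passed as text rather than as the DownlinkQueue enumeration.

```csharp
using System;

// Builds the MQTT downlink topic; MQTT topics are case sensitive so everything is lower-cased.
static string BuildDownlinkTopic(string applicationId, string tenantId, string deviceId, string queue)
{
    string normalisedQueue = (queue ?? string.Empty).Trim().ToLowerInvariant();

    if (normalisedQueue != "push" && normalisedQueue != "replace")
    {
        throw new ArgumentException("Queue must be push or replace", nameof(queue));
    }

    return $"v3/{applicationId}@{tenantId}/devices/{deviceId}/down/{normalisedQueue}".ToLowerInvariant();
}
```

For example, BuildDownlinkTopic("Application1", "Tenant", "Device1", "Push") gives v3/application1@tenant/devices/device1/down/push.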

The Things Network MQTT & Azure IoT Part3A

Cloud to Device with frm_payload no confirmation

An Azure IoT Hub supports three kinds of Cloud to Device(C2D) messaging and my gateway will initially support only Direct Methods and Cloud-to-device messages.

The first step was to add The Things Network(TTN) V3 Tenant ID to the context information as it is required for the downlink Message Queue Telemetry Transport (MQTT) publish topic.

namespace devMobile.TheThingsNetwork.Models
{
   public class AzureIoTHubReceiveMessageHandlerContext
   {
      public string TenantId { get; set; }
      public string DeviceId { get; set; }
      public string ApplicationId { get; set; }
   }
}

The object is passed as the context parameter of the SetReceiveMessageHandlerAsync method.

try
{
	DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(
		options.AzureIoTHubconnectionString,
		endDevice.Ids.Device_id,
		TransportType.Amqp_Tcp_Only);

	await deviceClient.OpenAsync();

	AzureIoTHubReceiveMessageHandlerContext context = new AzureIoTHubReceiveMessageHandlerContext()
	{
		TenantId = options.Tenant,
		DeviceId = endDevice.Ids.Device_id,
		ApplicationId = options.ApiApplicationID,
	};

	await deviceClient.SetReceiveMessageHandlerAsync(AzureIoTHubClientReceiveMessageHandler, context);
	
	DeviceClients.Add(endDevice.Ids.Device_id, deviceClient, cacheItemPolicy);
}
catch( Exception ex)
{
	Console.WriteLine($"Azure IoT Hub OpenAsync failed {ex.Message}");
}

To send a message to a LoRaWAN device in addition to the payload, TTN needs the port number and optionally a confirmation required flag, message priority, queueing type and correlation ids.

With my implementation the confirmation required flag, message priority, and queueing type are Azure IoT Hub message properties and the messageid is used as a correlation id.

private async static Task AzureIoTHubClientReceiveMessageHandler(Message message, object userContext)
{
	bool confirmed;
	byte port;
	DownlinkPriority priority;
	string downlinktopic;

	try
	{
		AzureIoTHubReceiveMessageHandlerContext receiveMessageHandlerConext = (AzureIoTHubReceiveMessageHandlerContext)userContext;

		DeviceClient deviceClient = (DeviceClient)DeviceClients.Get(receiveMessageHandlerConext.DeviceId);
		if (deviceClient == null)
		{
			// No cached client, so there is nothing to call RejectAsync on; log and give up
			Console.WriteLine($" UplinkMessageReceived unknown DeviceID: {receiveMessageHandlerConext.DeviceId}");
			return;
		}

		using (message)
		{
			Console.WriteLine();
			Console.WriteLine();
			Console.WriteLine($"{DateTime.UtcNow:HH:mm:ss} Azure IoT Hub downlink message");
			Console.WriteLine($" ApplicationID: {receiveMessageHandlerConext.ApplicationId}");
			Console.WriteLine($" DeviceID: {receiveMessageHandlerConext.DeviceId}");
#if DIAGNOSTICS_AZURE_IOT_HUB
			Console.WriteLine($" Cached: {DeviceClients.Contains(receiveMessageHandlerConext.DeviceId)}");
			Console.WriteLine($" MessageID: {message.MessageId}");
			Console.WriteLine($" DeliveryCount: {message.DeliveryCount}");
			Console.WriteLine($" EnqueuedTimeUtc: {message.EnqueuedTimeUtc}");
			Console.WriteLine($" SequenceNumber: {message.SequenceNumber}");
			Console.WriteLine($" To: {message.To}");
#endif
			string messageBody = Encoding.UTF8.GetString(message.GetBytes());
			Console.WriteLine($" Body: {messageBody}");
#if DOWNLINK_MESSAGE_PROPERTIES_DISPLAY
			foreach (var property in message.Properties)
			{
				Console.WriteLine($"   Key:{property.Key} Value:{property.Value}");
			}
#endif
			if (!message.Properties.ContainsKey("Confirmed"))
			{
				Console.WriteLine(" UplinkMessageReceived missing confirmed property");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!bool.TryParse(message.Properties["Confirmed"], out confirmed))
			{
				Console.WriteLine(" UplinkMessageReceived confirmed property invalid");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!message.Properties.ContainsKey("Priority"))
			{
				Console.WriteLine(" UplinkMessageReceived missing priority property");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!Enum.TryParse(message.Properties["Priority"], true, out priority))
			{
				Console.WriteLine(" UplinkMessageReceived priority property invalid");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (priority == DownlinkPriority.Undefined)
			{
				Console.WriteLine(" UplinkMessageReceived priority property undefined value invalid");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!message.Properties.ContainsKey("Port"))
			{
				Console.WriteLine(" UplinkMessageReceived missing port number property");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!byte.TryParse( message.Properties["Port"], out port))
			{
				Console.WriteLine(" UplinkMessageReceived port number property invalid");
				await deviceClient.RejectAsync(message);
				return;
			}

			if ((port < Constants.PortNumberMinimum) || port > (Constants.PortNumberMaximum))
			{
				Console.WriteLine($" UplinkMessageReceived port number property invalid value must be between {Constants.PortNumberMinimum} and {Constants.PortNumberMaximum}");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!message.Properties.ContainsKey("Queue"))
			{
				Console.WriteLine(" UplinkMessageReceived missing queue property");
				await deviceClient.RejectAsync(message);
				return;
			}

			switch(message.Properties["Queue"].ToLower())
			{
				case "push":
					downlinktopic = $"v3/{receiveMessageHandlerConext.ApplicationId}@{receiveMessageHandlerConext.TenantId}/devices/{receiveMessageHandlerConext.DeviceId}/down/push";
					break;
				case "replace":
					downlinktopic = $"v3/{receiveMessageHandlerConext.ApplicationId}@{receiveMessageHandlerConext.TenantId}/devices/{receiveMessageHandlerConext.DeviceId}/down/replace";
					break;
				default:
					Console.WriteLine(" UplinkMessageReceived missing queue property invalid value");
					await deviceClient.RejectAsync(message);
					return;
               }

			DownlinkPayload Payload = new DownlinkPayload()
			{
				Downlinks = new List<Downlink>()
				{ 
					new Downlink()
					{
						Confirmed = confirmed,
						PayloadRaw = messageBody,
						Priority = priority,
						Port = port,
						CorrelationIds = new List<string>()
						{
							message.MessageId
						}
					}
				}
			};

			var mqttMessage = new MqttApplicationMessageBuilder()
					.WithTopic(downlinktopic)
					.WithPayload(JsonConvert.SerializeObject(Payload))
					.WithAtLeastOnceQoS()
					.Build();

			await mqttClient.PublishAsync(mqttMessage);

			// Need to look at confirmation requirement ack, nack maybe failed & sent
			await deviceClient.CompleteAsync(message);

			Console.WriteLine();
		}
	}
	catch (Exception ex)
	{
		// Interpolated so ex.Message is not bound to the (message, category) overload
		Debug.WriteLine($"UplinkMessageReceived failed: {ex.Message}");
	}
}
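The property checks and topic selection in the handler above could be pulled into a couple of small helpers. This is only a minimal sketch: the DownlinkTopicBuilder class, its method names and the 1 to 223 port range are my assumptions (the values of Constants.PortNumberMinimum and Constants.PortNumberMaximum are not shown above), and the topic format is copied from the handler.

```csharp
using System;
using System.Collections.Generic;

public static class DownlinkTopicBuilder
{
    // Assumed limits (LoRaWAN application port range), not taken from the post's Constants class
    public const byte PortNumberMinimum = 1;
    public const byte PortNumberMaximum = 223;

    // True only when a "Port" property exists, parses as a byte and is within range
    public static bool TryGetPort(IDictionary<string, string> properties, out byte port)
    {
        port = 0;
        return properties.TryGetValue("Port", out string text)
            && byte.TryParse(text, out port)
            && port >= PortNumberMinimum
            && port <= PortNumberMaximum;
    }

    // Builds the push/replace downlink topic, rejecting any other queue value
    public static bool TryBuildTopic(string applicationId, string tenantId, string deviceId, string queue, out string topic)
    {
        string normalised = (queue ?? string.Empty).ToLowerInvariant();
        switch (normalised)
        {
            case "push":
            case "replace":
                topic = $"v3/{applicationId}@{tenantId}/devices/{deviceId}/down/{normalised}";
                return true;
            default:
                topic = null;
                return false;
        }
    }

    public static void Main()
    {
        // Hypothetical message properties, as they might be entered in Azure IoT Explorer
        var properties = new Dictionary<string, string> { ["Port"] = "10", ["Queue"] = "Push" };

        if (TryGetPort(properties, out byte port) &&
            TryBuildTopic("application1", "tenant1", "seeeduino1", properties["Queue"], out string topic))
        {
            Console.WriteLine($"port {port} -> {topic}");
        }
        else
        {
            Console.WriteLine("message would be rejected");
        }
    }
}
```

Keeping the validation separate from the MQTT and Azure IoT Hub calls would make it possible to test without a broker or a hub.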

To “smoke test” my implementation I used Azure IoT Explorer to send a cloud-to-device (C2D) message.

Azure IoT Explorer send message form with payload and message properties

The PoC console application then forwarded the message to TTN using MQTT for delivery to the device (which fails).

PoC application sending message then displaying result

The TTN live data display shows the message couldn’t be delivered because my test LoRaWAN device has not been activated.

TTN Live Data display with message delivery failure

Now that my PoC application can receive messages from and transmit messages to devices, I need to reconfigure my RAK Wisgate Developer D+ gateway and my Seeeduino LoRaWAN and RAK Wisnode 7200 Track Lite devices on The Things Industries Network so I can test my approach with a more realistic setup.

The Things Network MQTT & Azure IoT Part2

Uplink with decoded_payload & frm_payload

The next functionality added to my Proof of Concept (PoC) Azure IoT Hub, The Things Network (TTN) V3 Hypertext Transfer Protocol (HTTP) client API Integration, and Message Queue Telemetry Transport (MQTT) Data API Integration application is the sending of raw and decoded uplink messages to an Azure IoT Hub.

// At this point all the Azure IoT Hub deviceClients are set up and ready to go, so MQTT receive can be enabled
mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClientApplicationMessageReceived(e)));

// This may shift to individual device subscriptions
string uplinkTopic = $"v3/{options.MqttApplicationID}/devices/+/up";
await mqttClient.SubscribeAsync(uplinkTopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

//string queuedTopic = $"v3/{options.MqttApplicationID}/devices/+/queued";
//await mqttClient.SubscribeAsync(queuedTopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

The additional commented-out subscriptions are for the processing of downlink messages.

The MQTTNet received message handler uses the last segment of the topic to route messages to a method for processing

private static async void MqttClientApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
	if (e.ApplicationMessage.Topic.EndsWith("/up", StringComparison.InvariantCultureIgnoreCase))
	{
		await UplinkMessageReceived(e);
	}

	/*
	if (e.ApplicationMessage.Topic.EndsWith("/queued", StringComparison.InvariantCultureIgnoreCase))
	{
		await DownlinkMessageQueued(e);
	}
	...			
	*/
}
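As more topics are subscribed to, the chain of EndsWith checks could be replaced by a lookup on the last topic segment. The sketch below is only an illustration of that idea: TopicRouter, its handler table and the sample topics are hypothetical names of mine, and the handlers return strings rather than calling UplinkMessageReceived so the example runs without an MQTT broker.

```csharp
using System;
using System.Collections.Generic;

public static class TopicRouter
{
    // Hypothetical handlers keyed by the last topic segment (stand-ins for the real message processors)
    private static readonly Dictionary<string, Func<string, string>> Handlers =
        new Dictionary<string, Func<string, string>>(StringComparer.OrdinalIgnoreCase)
        {
            ["up"] = topic => $"uplink from {DeviceIdFromTopic(topic)}",
            ["queued"] = topic => $"downlink queued for {DeviceIdFromTopic(topic)}",
        };

    // Topics have the form v3/{application}/devices/{device}/..., so the device id is the fourth segment
    public static string DeviceIdFromTopic(string topic)
    {
        string[] segments = topic.Split('/');
        return segments.Length > 3 ? segments[3] : string.Empty;
    }

    // Routes on the text after the final '/' and ignores topics with no registered handler
    public static string Route(string topic)
    {
        string lastSegment = topic.Substring(topic.LastIndexOf('/') + 1);
        return Handlers.TryGetValue(lastSegment, out Func<string, string> handler)
            ? handler(topic)
            : $"ignored topic {topic}";
    }

    public static void Main()
    {
        Console.WriteLine(Route("v3/application1@tenant1/devices/seeeduino1/up"));
        Console.WriteLine(Route("v3/application1@tenant1/devices/seeeduino1/down/queued"));
        Console.WriteLine(Route("v3/application1@tenant1/devices/seeeduino1/join"));
    }
}
```

With only one or two topics the if statements in the handler are arguably simpler; a table like this starts to pay off once the queued, sent, ack, nack and failed downlink events are all being processed.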

The UplinkMessageReceived method deserialises the message payload, retrieves device context information from the local ObjectCache, and adds the relevant uplink message fields (including the raw payload). If the message has been unpacked by a TTN Decoder, the decoded fields are added as well.

static async Task UplinkMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
	try
	{
		PayloadUplinkV3 payload = JsonConvert.DeserializeObject<PayloadUplinkV3>(e.ApplicationMessage.ConvertPayloadToString());
		string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
		string deviceId = payload.EndDeviceIds.DeviceId;
		int port = payload.UplinkMessage.Port;
...
		DeviceClient deviceClient = (DeviceClient)DeviceClients.Get(deviceId);
		if (deviceClient == null)
		{
			Console.WriteLine($" UplinkMessageReceived unknown DeviceID: {deviceId}");
			return;
		}

		JObject telemetryEvent = new JObject();
		telemetryEvent.Add("DeviceID", deviceId);
		telemetryEvent.Add("ApplicationID", applicationId);
		telemetryEvent.Add("Port", port);
		telemetryEvent.Add("PayloadRaw", payload.UplinkMessage.PayloadRaw);

		// If the payload has been unpacked in TTN backend add fields to telemetry event payload
		if (payload.UplinkMessage.PayloadDecoded != null)
		{
			EnumerateChildren(telemetryEvent, payload.UplinkMessage.PayloadDecoded);
		}

		// Send the message to Azure IoT Hub/Azure IoT Central
		// UTF-8 so that any non-ASCII characters in decoded fields are not replaced with '?'
		using (Message ioTHubmessage = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
		{
			// Ensure the displayed time is the acquired time rather than the uploaded time. 
			//ioTHubmessage.Properties.Add("iothub-creation-time-utc", payloadObject.Metadata.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture));
			ioTHubmessage.Properties.Add("ApplicationId", applicationId);
			ioTHubmessage.Properties.Add("DeviceId", deviceId);
			ioTHubmessage.Properties.Add("port", port.ToString());

			await deviceClient.SendEventAsync(ioTHubmessage);
		}
	}
	catch (Exception ex)
	{
		// Interpolated so ex.Message is not bound to the (message, category) overload
		Debug.WriteLine($"UplinkMessageReceived failed: {ex.Message}");
	}
}

private static void EnumerateChildren(JObject jobject, JToken token)
{
	if (token is JProperty property)
	{
		if (token.First is JValue)
		{
			// Temporary dirty hack for Azure IoT Central compatibility
			if (token.Parent is JObject possibleGpsProperty)
			{
				if (possibleGpsProperty.Path.StartsWith("GPS_", StringComparison.OrdinalIgnoreCase))
				{
					if (string.Compare(property.Name, "Latitude", true) == 0)
					{
						jobject.Add("lat", property.Value);
					}
					if (string.Compare(property.Name, "Longitude", true) == 0)
					{
						jobject.Add("lon", property.Value);
					}
					if (string.Compare(property.Name, "Altitude", true) == 0)
					{
						jobject.Add("alt", property.Value);
					}
				}
			}
			jobject.Add(property.Name, property.Value);
		}
		else
		{
			JObject parentObject = new JObject();
			foreach (JToken token2 in token.Children())
			{
				EnumerateChildren(parentObject, token2);
			}
			// Add the nested object once, after its children have been processed
			jobject.Add(property.Name, parentObject);
		}
	}
	else
	{
		foreach (JToken token2 in token.Children())
		{
			EnumerateChildren(jobject, token2);
		}
	}
}

There is also some basic reformatting of the messages for Azure IoT Central: the latitude, longitude and altitude values of GPS_ fields are duplicated as lat, lon and alt.
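To show the effect of the GPS hack in isolation, here is a much simplified, non-recursive sketch that only looks at top-level properties. GpsAliasDemo, AddLocationAliases, CopyField and the sample payload are my own hypothetical names and data, not part of the connector code.

```csharp
using System;
using Newtonsoft.Json.Linq;

public static class GpsAliasDemo
{
    // Returns a copy of the decoded payload with lat/lon/alt added to any top-level "gps_" object
    public static JObject AddLocationAliases(JObject decodedPayload)
    {
        JObject result = (JObject)decodedPayload.DeepClone();

        foreach (JProperty property in result.Properties())
        {
            if (property.Value is JObject location &&
                property.Name.StartsWith("gps_", StringComparison.OrdinalIgnoreCase))
            {
                CopyField(location, "latitude", "lat");
                CopyField(location, "longitude", "lon");
                CopyField(location, "altitude", "alt");
            }
        }

        return result;
    }

    // Copies a field under a second name when it is present; the original field is kept
    private static void CopyField(JObject source, string from, string to)
    {
        JToken value = source.GetValue(from, StringComparison.OrdinalIgnoreCase);
        if (value != null)
        {
            source[to] = value.DeepClone();
        }
    }

    public static void Main()
    {
        // Hypothetical decoded payload, similar in shape to an LPP GPS field
        JObject decoded = JObject.Parse("{\"gps_1\":{\"latitude\":-43.5,\"longitude\":172.6,\"altitude\":25}}");

        Console.WriteLine(AddLocationAliases(decoded).ToString());
    }
}
```

As in EnumerateChildren, the original property names are kept alongside the short aliases, so anything consuming the longer names still works.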

TTN Simulate uplink message with GPS location payload.
Nasty console application processing uplink message
Message from LoRaWAN device displayed in Azure IoT Explorer

Currently the code has lots of diagnostic Console.WriteLine statements, doesn’t support downlink messages, has no Advanced Message Queuing Protocol (AMQP) client connection pooling, and can’t run as an Azure WebJob. I plan on adding these and a number of other features in future blog posts.

MQTTnet Azure Function Binding

It looks promising

I’m using MQTTnet to build my The Things Industries client and it looked like the amount of code for my Message Queue Telemetry Transport (MQTT) Data API Integration could be reduced by using the AzureFunction MQTT Binding by Kees Schollaart.

I used The Things Industries simulate uplink functionality for my initial testing

TTI uplink message simulator

The first version of the Azure function code proof of concept(PoC) was very compact

namespace MQTTnetAzureFunction
{
   using System;
   using System.Text;
   using Microsoft.Azure.WebJobs;
   using Microsoft.Extensions.Logging;

   using CaseOnline.Azure.WebJobs.Extensions.Mqtt;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Messaging;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Config;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings;

   using MQTTnet.Client.Options;
   using MQTTnet.Extensions.ManagedClient;

   public static class Subscribe
   {
      [FunctionName("UplinkMessageProcessor")]
      public static void UplinkMessageProcessor(
            [MqttTrigger("v3/application123456789012345/devices/+/up", ConnectionString = "TTNMQTTConnectionString")] IMqttMessage message,
            ILogger log)
      {
         var body = Encoding.UTF8.GetString(message.GetMessage());

         log.LogInformation($"Advanced: message from topic {message.Topic} \nbody: {body}");
      }
   }
}

I configured the TTNMQTTConnectionString in the application’s local.settings.json file

{
   "IsEncrypted": false,
   "Values": {
      "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=...",
      "AzureWebJobsDashboard": "DefaultEndpointsProtocol=https;AccountName=...",
      "FUNCTIONS_WORKER_RUNTIME": "dotnet",
      "TTNMQTTConnectionString": "Server=...;Username=application1@...;Password=..."
   }
}

This was a good start but I need to be able to configure the MQTT topic for deployments.

After looking at the binding source code, plus some trial and error based on the AdvancedConfiguration sample, I have a nasty PoC.

public static class Subscribe
{
   [FunctionName("UplinkMessageProcessor")]
   public static void UplinkMessageProcessor(
         [MqttTrigger(typeof(ExampleMqttConfigProvider), "v3/%TopicName%/devices/+/up")] IMqttMessage message,
         ILogger log)
   {
      var body = Encoding.UTF8.GetString(message.GetMessage());

      log.LogInformation($"Advanced: message from topic {message.Topic} \nbody: {body}");
   }
}

public class MqttConfigExample : CustomMqttConfig
{
  public override IManagedMqttClientOptions Options { get; }

  public override string Name { get; }

  public MqttConfigExample(string name, IManagedMqttClientOptions options)
  {
     Options = options;
     Name = name;
  }
}

public class ExampleMqttConfigProvider : ICreateMqttConfig
{
   public CustomMqttConfig Create(INameResolver nameResolver, ILogger logger)
   {
      var connectionString = new MqttConnectionString(nameResolver.Resolve("TTNMQTTConnectionString"), "CustomConfiguration");

      var options = new ManagedMqttClientOptionsBuilder()
             .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
             .WithClientOptions(new MqttClientOptionsBuilder()
                  .WithClientId(connectionString.ClientId.ToString())
                  .WithTcpServer(connectionString.Server, connectionString.Port)
                  .WithCredentials(connectionString.Username, connectionString.Password)
                  .Build())
             .Build();

      return new MqttConfigExample("CustomConnection", options);
   }
}

The TTNMQTTConnectionString and TopicName can be configured in the application’s local.settings.json file

{
   "IsEncrypted": false,
   "Values": {
      "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=...",
      "AzureWebJobsDashboard": "DefaultEndpointsProtocol=https;AccountName=...",
      "FUNCTIONS_WORKER_RUNTIME": "dotnet",
      "TTNMQTTConnectionString": "Server=...;Username=application1@...;Password=...",
      "TopicName": "application1@..."
   }
}
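The %TopicName% token in the trigger attribute is replaced with the value of the matching setting before the subscription is made. As a rough illustration of that substitution (my own sketch using a regular expression, not how the Functions runtime or the MQTT binding actually implement it), with TopicTemplate and Resolve being hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

public static class TopicTemplate
{
    // Replaces each %Name% token with the value of the setting called Name
    public static string Resolve(string template, IReadOnlyDictionary<string, string> settings)
    {
        return Regex.Replace(template, "%([^%]+)%", match =>
        {
            string name = match.Groups[1].Value;
            if (!settings.TryGetValue(name, out string value))
            {
                throw new KeyNotFoundException($"Setting '{name}' not found");
            }
            return value;
        });
    }

    public static void Main()
    {
        // Stand-in for the Values section of local.settings.json (tenant name is made up)
        var settings = new Dictionary<string, string> { ["TopicName"] = "application1@tenant1" };

        Console.WriteLine(Resolve("v3/%TopicName%/devices/+/up", settings));
    }
}
```

The MQTT wildcard (+) is left untouched because only text between percent signs is treated as a setting name.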

When run in the Azure Functions Core Tools, the simulated message properties and payload are displayed.

The message “The ‘UplinkMessageProcessor’ function is in error: Unable to configure binding ‘message’ of type ‘mqttTrigger’. This may indicate invalid function.json properties. Can’t figure out which ctor to call.” needs further investigation.