Azure IoT Central Connectivity Part4

The Things Network(TTN) Friendly Commands

I have built a several Proof of Concept(PoC) applications (Azure IoT Central Basic Telemetry, Basic Commands, and Request Commands) to explore to how an Azure IoT Central integration with TTN could work. This blog post is about how to configure queued and non queued Cloud to Device(C2D) Commands with request parameters so they should work with my TTN Message Queue Telemetry Transport(MQTT) Data API connector.

I have focused on commands with Analog values but the same approach should be valid for other parameter types like Boolean, Date, DateTime, Double, Duration, Enumeration, Float, Geopoint, Vector, Integer, Long, String, and Time.

Multiple versions of my Azure IoT Central templates

There was a lot of “trial and error” (26 template versions) required to figure out how to configure commands and queued commands so they can and used in TTN downlink payloads.

{
  "end_device_ids": {
    "device_id": "dev1",
    "application_ids": {
      "application_id": "app1"
    },
    "dev_eui": "4200000000000000",
    "join_eui": "4200000000000000",
    "dev_addr": "00E6F42A"
  },
  "correlation_ids": [
    "my-correlation-id",
    "..."
  ],
  "downlink_ack": {
    "session_key_id": "AWnj0318qrtJ7kbudd8Vmw==",
    "f_port": 15,
    "f_cnt": 11,
    "frm_payload": "....",
    "decoded_payload": 
    {
      "Value_0":"1.23"
      ...
    }
    "confirmed": true,
    "priority": "NORMAL",
    "correlation_ids": [
      "my-correlation-id",
      "..."
    ]
  }
}

My Azure IoT Central client application displays the generated message message including the decoded_payload field which is used by the built in Cayenned Low Power Protocol(LPP) decoder/encoder and other custom encoders/decoders.

Azure IoT Central commands for TTN/TTI integration

From the “Device Commands” form I can send commands and a queued commands which have float parameters or object parameters which contain one or more float values in a JSON payload.

For commands which call the methodHander which was been registered by calling SetMethodDefaultHandlerAsync the request payload can be JSON or plain text. If the payload is valid JSON it is “grafted”(couldn’t think of a better word) into the decoded_payload field. If the payload is not valid a JSON object with the method name as the “name” and the text payload as the value is added the decoded_payload.

private static async Task<MethodResponse> MethodCallbackDefaultHandler(MethodRequest methodRequest, object userContext)
{
   AzureIoTMethodHandlerContext receiveMessageHandlerConext = (AzureIoTMethodHandlerContext)userContext;

   Console.WriteLine($"Default handler method {methodRequest.Name} was called.");

   Console.WriteLine($"Payload:{methodRequest.DataAsJson}");
   Console.WriteLine();

   if (string.IsNullOrWhiteSpace(methodRequest.Name))
   {
      Console.WriteLine($"   Method Request Name null or white space");
      return new MethodResponse(400);
   }

   string payloadText = Encoding.UTF8.GetString(methodRequest.Data);
   if (string.IsNullOrWhiteSpace(payloadText))
   {
       Console.WriteLine($"   Payload null or white space");
       return new MethodResponse(400);
   }

   // At this point would check to see if Azure DeviceClient is in cache, this is so nasty
   if ( String.Compare( methodRequest.Name, "Analog_Output_1", true) ==0 )
   {
      Console.WriteLine($"   Device not found");
      return new MethodResponse(UTF8Encoding.UTF8.GetBytes("Device not found"), 404);
   }

   JObject payload;

   if (IsValidJSON(payloadText))
   {
      payload = JObject.Parse(payloadText);
   }
   else
   {
      payload = new JObject
      {
         { methodRequest.Name, payloadText }
      };
   }

   string downlinktopic = $"v3/{receiveMessageHandlerConext.ApplicationId}@{receiveMessageHandlerConext.TenantId}/devices/{receiveMessageHandlerConext.DeviceId}/down/push";

   DownlinkPayload downlinkPayload = new DownlinkPayload()
   {
      Downlinks = new List<Downlink>()
      {
         new Downlink()
         {
            Confirmed = false,
            //PayloadRaw = messageBody,
            PayloadDecoded = payload,
            Priority = DownlinkPriority.Normal,
            Port = 10,
            /*
            CorrelationIds = new List<string>()
            {
               methodRequest.LockToken
            }
            */
         }
      }
   };

   Console.WriteLine($"TTN Topic :{downlinktopic}");
   Console.WriteLine($"TTN downlink JSON :{JsonConvert.SerializeObject(downlinkPayload, Formatting.Indented)}");

   return new MethodResponse(200);
}
Configuration of unqueued Commands with a typed payload
The output of my test harness for a Command for a typed payload
Configuring fields of object payload(JSON)

A JSON request payload also supports downlink messages with more that one value.

The output of my test harness for a Command with an object payload(JSON)

For queued commands which call the ReceiveMessageHandler which has was registered by calling SetReceiveMessageHandler the request payload is JSON or plain text.

private async static Task ReceiveMessageHandler(Message message, object userContext)
{
   AzureIoTMessageHandlerContext receiveMessageHandlerConext = (AzureIoTMessageHandlerContext)userContext;

   Console.WriteLine($"ReceiveMessageHandler handler method was called.");

   Console.WriteLine($" Message ID:{message.MessageId}");
   Console.WriteLine($" Message Schema:{message.MessageSchema}");
   Console.WriteLine($" Correlation ID:{message.CorrelationId}");
   Console.WriteLine($" Lock Token:{message.LockToken}");
   Console.WriteLine($" Component name:{message.ComponentName}");
   Console.WriteLine($" To:{message.To}");
   Console.WriteLine($" Module ID:{message.ConnectionModuleId}");
   Console.WriteLine($" Device ID:{message.ConnectionDeviceId}");
   Console.WriteLine($" User ID:{message.UserId}");
   Console.WriteLine($" CreatedAt:{message.CreationTimeUtc}");
   Console.WriteLine($" EnqueuedAt:{message.EnqueuedTimeUtc}");
   Console.WriteLine($" ExpiresAt:{message.ExpiryTimeUtc}");
   Console.WriteLine($" Delivery count:{message.DeliveryCount}");
   Console.WriteLine($" InputName:{message.InputName}");
   Console.WriteLine($" SequenceNumber:{message.SequenceNumber}");

   foreach (var property in message.Properties)
   {
      Console.WriteLine($"   Key:{property.Key} Value:{property.Value}");
   }

   Console.WriteLine($" Content encoding:{message.ContentEncoding}");
   Console.WriteLine($" Content type:{message.ContentType}");
   string payloadText = Encoding.UTF8.GetString(message.GetBytes());
   Console.WriteLine($" Content:{payloadText}");
   Console.WriteLine();

   if (!message.Properties.ContainsKey("method-name"))
   {
      await receiveMessageHandlerConext.AzureIoTHubClient.RejectAsync(message);
      Console.WriteLine($"   Property method-name not found");
      return;
   }

   string methodName = message.Properties["method-name"];
   if (string.IsNullOrWhiteSpace( methodName))
   {
      await receiveMessageHandlerConext.AzureIoTHubClient.RejectAsync(message);
      Console.WriteLine($"   Property null or white space");
      return;
   }

   if (string.IsNullOrWhiteSpace(payloadText))
   {
      await receiveMessageHandlerConext.AzureIoTHubClient.RejectAsync(message);
      Console.WriteLine($"   Payload null or white space");
      return;
   }

   JObject payload;

   if (IsValidJSON(payloadText))
   {
      payload = JObject.Parse(payloadText);
   }
   else
   {
      payload = new JObject
      {
         { methodName, payloadText }
      };
   }

   string downlinktopic = $"v3/{receiveMessageHandlerConext.ApplicationId}@{receiveMessageHandlerConext.TenantId}/devices/{receiveMessageHandlerConext.DeviceId}/down/push";

   DownlinkPayload downlinkPayload = new DownlinkPayload()
   {
      Downlinks = new List<Downlink>()
      {
         new Downlink()
         {
            Confirmed = false,
            //PayloadRaw = messageBody,
            PayloadDecoded = payload,
            Priority = DownlinkPriority.Normal,
            Port = 10,
            CorrelationIds = new List<string>()
            {
               message.LockToken
            }
         }
      }
   };

   Console.WriteLine($"TTN Topic :{downlinktopic}");
   Console.WriteLine($"TTN downlink JSON :{JsonConvert.SerializeObject(downlinkPayload, Formatting.Indented)}");

   //await receiveMessageHandlerConext.AzureIoTHubClient.AbandonAsync(message); // message retries
   //await receiveMessageHandlerConext.AzureIoTHubClient.CompleteAsync(message);
   await receiveMessageHandlerConext.AzureIoTHubClient.CompleteAsync(message.LockToken);
   //await receiveMessageHandlerConext.AzureIoTHubClient.RejectAsync(message); // message gone no retry
}

When I initiated an Analog queued command the message handler was invoked with the name of the command capability (Analog_Output_2) in a message property called “method-name”. For a typed parameter the message content was a string representation of the value. For an object parameter the payload contains a JSON representation of the request field(s)

The output of my test harness for a Queued Command with a typed payload

A JSON request payload supports downlink message with more that one value.

The output of my test harness for a Queued Command with an object payload(JSON)

The choice of Value_0, Value_1 (I think they are float64 type) etc. for the decoded_payload is specified in the Cayenne LPP downlink decode/encoder source code.

The context information for both comments and queued commands provides additional information required to construct the MQTT topic for publishing the downlink messages.

For queued commands the correlation_id will contain the message.LockToken so that messages can be Abandoned, Completed or Rejected. The MQTT broker publishes a series of topics so the progress of the transmission of downlink message can be monitored.

If the device is not known the Abandon method will be called immediately. For command messages Completed will be called as soon as the message is “sent”

  • v3/{application id}@{tenant id}/devices/{device id}/down/queued
  • v3/{application id}@{tenant id}/devices/{device id}/down/sent
  • v3/{application id}@{tenant id}/devices/{device id}/down/ack
  • v3/{application id}@{tenant id}/devices/{device id}/down/nack
  • v3/{application id}@{tenant id}/devices/{device id}/down/failed

For queued messages the point in the delivery process where the Abandoned, Completed and Rejected methods will be called will be configurable.

Azure IoT Central Connectivity Part3

Request Commands

I have built a couple of proof of Concept(PoC) applications to explore the Basic Telemetry and Basic Command functionality of Azure IoT Central. This blog post is about queued and non queued Cloud to Device(C2D) Commands with request parameters.

I initially created an Azure IoT Central Device Template with command and telemetry device capabilities.

“Collapsed” Command Request template
Command Request Template digital commands

I tried typed request and object based parameters to explorer how an integration with The Things Network(TTN)/The Things Industries(TTI) using the Message Queue Telemetry Transport(MQTT) interface could work.

Object parameter schema designer

With object based parameters the request JSON could contain more than one value though the validation of user provided information didn’t appear to be as robust.

Object parameter schema definition

I “migrated” my third preconfigured device to the CommandRequest template to see how the commands with Request parameters interacted with my PoC application.

After “migrating” my device I went back and created a Template view so I could visualise the simulated telemetry from my PoC application and provide a way to initiate commands (Didn’t really need four command tiles as they all open the Device commands form).

CommandRequest device template default view

From the Device Commands form I could send commands and a queued commands which had analog or digital parameters.

Device Three Command Tab

When I initiated an Analog non-queued command the default method handler was invoked with the name of the command capability (Analog_Output_1) as the method name and the payload contained a JSON representation of the request values(s). With a typed parameter a string representation of the value was in the message payload. With a typed parameter a string representation of the value was in the message payload rather than JSON.

Console application displaying Analog request and Analog Request queued commands

When I initiated an Analog queued command the message handler was invoked with the name of the command capability (Analog_Output_2) in a message property called “method-name” and the payload contained a JSON representation of the request value(s). With a typed parameter a string representation of the value was in the message payload rather than JSON.

When I initiated a Digital non-queued command the default method handler was invoked with the name of the command capability (Digital_Output_1) as the method name and the payload contained a JSON representation of the request values(s). With a typed parameter a string representation of the value was in the message payload rather than JSON.

Console application displaying Digital request and Digital Request queued commands

When I initiated a Digital queued command the message handler was invoked with the name of the command capability(Digital_Output_2) in a message property called “method-name” and the payload contained a JSON representation of the request value(s). With a typed parameter a string representation of the value was in the message payload rather than JSON.

The validation of user input wasn’t as robust as I expected, with problems selecting checkboxes with a mouse when there were several Boolean fields. I often had to click on a nearby input field and use the TAB button to navigate to the desired checkbox. I also had problems with ISO 8601 format date validation as the built in Date Picker returned a month, day, year date which was not editable and wouldn’t pass validation.

The next logical step would be to look at commands with a Response parameter but as the MQTT interface is The Things Network(TTN) and The Things Industries(TTI) is asynchronous and devices reporting every 5 minutes to a couple of times a day there could be a significant delay between sending a message and receiving an optional delivery confirmation or response.

Azure IoT Central Connectivity Part2

Basic Commands

I have been struggling with making The Things Network(TTN) and The Things Industries(TTI) uplink/downlink messages work well Azure IoT Central. To explore different messaging approaches I have built a proof of Concept(PoC) application which simulates TTN/TTI connectivity to an Azure IoT Hub, or Azure IoT Central.

This blog post is about queued and non queued Cloud to Device(C2D) commands without request or response parameters. I have mostly used non queued commands in other projects (my Azure IoT Hub LoRa and RF24L01 gateways) to “Restart” devices etc..

The first step was to create an Azure IoT Central Device Template with command and telemetry device capabilities.

CommandBasic device template device with command & telemetry capabilities

I then “migrated” my second preconfigured device to the CommandBasic template.

Migrating a device to TelemetryBasic template

I then went back and created a Template view to visualise the telemetry from my console application and initiate commands.

CommandBasic device template default view

I modified the PoC application adding handlers for Methods (SetMethodDefaultHandlerAsync) and Messages (SetReceiveMessageHandlerAsync).

private static async Task ApplicationCore(CommandLineOptions options)
{
   DeviceClient azureIoTHubClient;
   Timer MessageSender;

   try
   {
      // Open up the connection
      azureIoTHubClient = DeviceClient.CreateFromConnectionString(options.AzureIoTHubconnectionString, TransportType.Amqp_Tcp_Only);

      await azureIoTHubClient.OpenAsync();
      await azureIoTHubClient.SetReceiveMessageHandlerAsync(ReceiveMessageHandler, azureIoTHubClient);
      await azureIoTHubClient.SetMethodHandlerAsync("Named", MethodCallbackNamedHandler, null);
      await azureIoTHubClient.SetMethodDefaultHandlerAsync(MethodCallbackDefaultHandler, null);

      MessageSender = new Timer(TimerCallbackAsync, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 2, 0));

      Console.WriteLine("Press any key to exit");
      while (!Console.KeyAvailable)
      {
         await Task.Delay(100);
      }
   }
   catch (Exception ex)
   {
      Console.WriteLine($"Main {ex.Message}");
      Console.WriteLine("Press <enter> to exit");
      Console.ReadLine();
   }
}

The method handler displays the method name and the message payload.

private static async Task<MethodResponse> MethodCallbackDefaultHandler(MethodRequest methodRequest, object userContext)
{
   Console.WriteLine($"Default handler method {methodRequest.Name} was called.");

   Console.WriteLine($"Payload:{methodRequest.DataAsJson}");
   Console.WriteLine();

   //return new MethodResponse(400);
   //return new MethodResponse(404);
   return new MethodResponse(200);
}

The message handler displays a selection the message properties, any attributes and the message payload.

 private async static Task ReceiveMessageHandler(Message message, object userContext)
{
   DeviceClient azureIoTHubClient = (DeviceClient)userContext;

   Console.WriteLine($"ReceiveMessageHandler handler method was called.");

   Console.WriteLine($" Message ID:{message.MessageId}");
   Console.WriteLine($" Message Schema:{message.MessageSchema}");
   Console.WriteLine($" Correlation ID:{message.CorrelationId}");
   Console.WriteLine($" Component name:{message.ComponentName}");
   Console.WriteLine($" To:{message.To}");
   Console.WriteLine($" Module ID:{message.ConnectionModuleId}");
   Console.WriteLine($" Device ID:{message.ConnectionDeviceId}");
   Console.WriteLine($" CreatedAt:{message.CreationTimeUtc}");
   Console.WriteLine($" EnqueuedAt:{message.EnqueuedTimeUtc}");
   Console.WriteLine($" ExpiresAt:{message.ExpiryTimeUtc}");
   Console.WriteLine($" Delivery count:{message.DeliveryCount}");
   Console.WriteLine($" InputName:{message.InputName}");
   Console.WriteLine($" SequenceNumber:{message.SequenceNumber}");

   foreach (var property in message.Properties)
   {
     Console.WriteLine($"   Key:{property.Key} Value:{property.Value}");
   }

   Console.WriteLine($" Content encoding:{message.ContentEncoding}");
   Console.WriteLine($" Content type:{message.ContentType}");
   Console.WriteLine($" Content:{Encoding.UTF8.GetString(message.GetBytes())}");
   Console.WriteLine();

   //await azureIoTHubClient.AbandonAsync(message); // message retries
   await azureIoTHubClient.CompleteAsync(message);
   //await azureIoTHubClient.RejectAsync(message); // message gone no retry
}

From the Device Commands tab I can could non queued and a queued commands

Device Two Commands tab

When I sent a non-queued command the default method handler was invoked with the name of the command capability (Digital_Output_0) as the method name and an empty payload. In the Azure IoT Central interface I couldn’t see any difference for successful (HTTP 200 OK) or failure (HTTP 400 Bad Request or HTTP 404 Not Found) responses. If the application was not running the command failed immediately.

Console application displaying non-queued call

With Azure IoT Explorer failure responses were visible.

Azure IoT Explorer show message with 404 response

When I sent a queued command the message handler was invoked with the name of the command capability(Digital_Output_1) in a message property called “method-name” and the payload contained only an “@” character.

Console application displaying queued call

If the application was not running the command was queued until the Console application was started. When the console application was running and AbandonAsync was called rather than CompleteAsync the message was retried 10 times. If RejectAsync was called rather than CompleteAsync the message was deleted from the queue and not retried. There didn’t appear to be any difference for the displayed Azure IoT Central or Azure IoT Hub explorer results when AbandonAsync or RejectAsync were called.

I also created a personal dashboard to visualise the telemetry data and initiate commands. The way the two commands were presented on the dashboard was quite limited so I will go back to the documentation and see what I missed

Azure IoT Central Connectivity Part1

Basic Telemetry

I have been struggling with making The Things Network(TTN) and The Things Industries(TTI) uplink/downlink messages Azure IoT Central compatible. To explore the messaging approaches used I have built a proof of Concept(PoC) application which simulates TTN/TTI connectivity to an Azure IoT Hub, or Azure IoT Central.

My “nasty” console application uses the Azure DeviceClient library (Advanced Message Queuing Protocol(AMQP) connectivity) to explore how to interface with Azure IoT Central. This first blog post is about to Device Cloud(D2C) telemetry

The first step was to create an Azure IoT Central Device Template with a selection of telemetry capabilities.

TelemetryBasic device template device capabilities

I then created a Plain old Common Language Runtime(CLR) object(PoCo) with Newtonsoft JSON library attributes to fine tune the serialisation/deserialation.

public class GPSPosition
{    
   [JsonProperty("lat")]
   public float Latitude { get; set; }
   [JsonProperty("lon")]
   public float Longitude { get; set; }
   [JsonProperty("alt")]
   public float Altitude { get; set; }
}

public class DigitialTelemetryPayload
{
   [JsonProperty("Digital_Input_0")]
   public bool DigitalInput { get; set; }

   [JsonProperty("Analog_Input_0")]
   public float AnalogInput { get; set; }

   [JsonProperty("GPS_0")]
   public GPSPosition GPSPosition { get; set; }
 }

I created five devices and generated their connection strings using the DPS individual enrollment functionality of one my other sample applications.

I then “migrated” the first device to my BasicTelemetry template

Migrating a device to TelemetryBasic template

I then went back and created a Template view to visualise the telemetry from my console application.

TelemetryBasic device template default view

Then I configured a preview device so the template view was populated with “realistic” data.

TelemetryBasic device template default view configuring a device as data source

The console application simulates a digital input (random true/false), analog input (random value between 0.0 and 1.0) and a Global Positioning System(GPS) location (Christchurch Anglican Cathedral with a random latitude, longitude and altitude offset) .

Basic Telemetry Console Application

The final step was to create an Azure IoT Central Personal dashboard to visualise the data from my simulated device.

Basic Telemetry Dashboard

Connecting a Device, creating a Device Template, Migrating the Device, and then displaying telemetry on a personal dashboard was a good introduction to interfacing with and configuring Azure IoT Central devices.

In other applications I have mapped “payload_fields” to an Azure IoT Central telemetry payload with minimal code.

{
   "app_id": "rak811wisnodetest",
   "dev_id": "seeeduinolorawan4",
   "hardware_serial": "1234567890123456",
   "port": 10,
   "counter": 1,
   "is_retry": true,
   "payload_raw": "AWcBEAFlAGQBAAEBAgAyAYgAqYgGIxgBJuw=",
   "payload_fields": {
      "analog_in_1": 0.5,
      "digital_in_1": 1,
      "gps_1": {
         "altitude": 755,
         "latitude": 4.34,
         "longitude": 40.22
      },
      "luminosity_1": 100,
      "temperature_1": 27.2
   },
   "metadata": {
      "time": "2020-08-28T10:41:04.496594225Z",
      "frequency": 923.4,
      "modulation": "LORA",
      "data_rate": "SF12BW125",
      "coding_rate": "4/5",
      "gateways": [
         {
            "gtw_id": "eui-b827ebfffe6c279d",
            "timestamp": 3971612260,
            "time": "2020-08-28T10:41:03.313471Z",
            "channel": 1,
            "rssi": -53,
            "snr": 11.2,
            "rf_chain": 0,
            "latitude": -43.49885,
            "longitude": 172.60095,
            "altitude": 25
         }
      ]
   },
   "downlink_url": "https://integrations.thethingsnetwork.org/ttn-eu/api/v2/down/rak811wisnodetest/azure-webapi-endpoint?key=ttn-account-v2.12345678901234567_12345_1234567-dduo"
}

This was a longish post with lots of screen shots so I don’t have to repeat core setup instructions in future posts.

The Things Network MQTT & Azure IoT Part3A

Cloud to Device with frm_payload no confirmation

An Azure IoT Hub supports three kinds for Cloud to Device(C2D) messaging and my gateway will initially support only Direct Methods and Cloud-to-device messages.

The first step was to add the The Things Network(TTN) V3 Tennant ID to the context information as it is required for the downlink Message Queue Telemetry Transport (MQTT) publish topic.

namespace devMobile.TheThingsNetwork.Models
{
   public class AzureIoTHubReceiveMessageHandlerContext
   {
      public string TenantId { get; set; }
      public string DeviceId { get; set; }
      public string ApplicationId { get; set; }
   }
}

The object is passed as the context parameter of the SetReceiveMessageHandlerAsync method.

try
{
	DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(
		options.AzureIoTHubconnectionString,
		endDevice.Ids.Device_id,
		TransportType.Amqp_Tcp_Only);

	await deviceClient.OpenAsync();

	AzureIoTHubReceiveMessageHandlerContext context = new AzureIoTHubReceiveMessageHandlerContext()
	{
		TenantId = options.Tenant,
		DeviceId = endDevice.Ids.Device_id,
		ApplicationId = options.ApiApplicationID,
	};

	await deviceClient.SetReceiveMessageHandlerAsync(AzureIoTHubClientReceiveMessageHandler, context);
	
	DeviceClients.Add(endDevice.Ids.Device_id, deviceClient, cacheItemPolicy);
}
catch( Exception ex)
{
	Console.WriteLine($"Azure IoT Hub OpenAsync failed {ex.Message}");
}

To send a message to a LoRaWAN device in addition to the payload, TTN needs the port number and optionally a confirmation required flag, message priority, queueing type and correlation ids.

With my implementation the confirmation required flag, message priority, and queueing type are Azure IoT Hub message properties and the messageid is used as a correlation id.

private async static Task AzureIoTHubClientReceiveMessageHandler(Message message, object userContext)
{
	bool confirmed;
	byte port;
	DownlinkPriority priority;
	string downlinktopic;

	try
	{
		AzureIoTHubReceiveMessageHandlerContext receiveMessageHandlerConext = (AzureIoTHubReceiveMessageHandlerContext)userContext;

		DeviceClient deviceClient = (DeviceClient)DeviceClients.Get(receiveMessageHandlerConext.DeviceId);
		if (deviceClient == null)
		{
			Console.WriteLine($" UplinkMessageReceived unknown DeviceID: {receiveMessageHandlerConext.DeviceId}");
			await deviceClient.RejectAsync(message);
			return;
		}

		using (message)
		{
			Console.WriteLine();
			Console.WriteLine();
			Console.WriteLine($"{DateTime.UtcNow:HH:mm:ss} Azure IoT Hub downlink message");
			Console.WriteLine($" ApplicationID: {receiveMessageHandlerConext.ApplicationId}");
			Console.WriteLine($" DeviceID: {receiveMessageHandlerConext.DeviceId}");
#if DIAGNOSTICS_AZURE_IOT_HUB
			Console.WriteLine($" Cached: {DeviceClients.Contains(receiveMessageHandlerConext.DeviceId)}");
			Console.WriteLine($" MessageID: {message.MessageId}");
			Console.WriteLine($" DeliveryCount: {message.DeliveryCount}");
			Console.WriteLine($" EnqueuedTimeUtc: {message.EnqueuedTimeUtc}");
			Console.WriteLine($" SequenceNumber: {message.SequenceNumber}");
			Console.WriteLine($" To: {message.To}");
#endif
			string messageBody = Encoding.UTF8.GetString(message.GetBytes());
			Console.WriteLine($" Body: {messageBody}");
#if DOWNLINK_MESSAGE_PROPERTIES_DISPLAY
			foreach (var property in message.Properties)
			{
				Console.WriteLine($"   Key:{property.Key} Value:{property.Value}");
			}
#endif
			if (!message.Properties.ContainsKey("Confirmed"))
			{
				Console.WriteLine(" UplinkMessageReceived missing confirmed property");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!bool.TryParse(message.Properties["Confirmed"], out confirmed))
			{
				Console.WriteLine(" UplinkMessageReceived confirmed property invalid");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!message.Properties.ContainsKey("Priority"))
			{
				Console.WriteLine(" UplinkMessageReceived missing priority property");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!Enum.TryParse(message.Properties["Priority"], true, out priority))
			{
				Console.WriteLine(" UplinkMessageReceived priority property invalid");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (priority == DownlinkPriority.Undefined)
			{
				Console.WriteLine(" UplinkMessageReceived priority property undefined value invalid");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!message.Properties.ContainsKey("Port"))
			{
				Console.WriteLine(" UplinkMessageReceived missing port number property");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!byte.TryParse( message.Properties["Port"], out port))
			{
				Console.WriteLine(" UplinkMessageReceived port number property invalid");
				await deviceClient.RejectAsync(message);
				return;
			}

			if ((port < Constants.PortNumberMinimum) || port > (Constants.PortNumberMaximum))
			{
				Console.WriteLine($" UplinkMessageReceived port number property invalid value must be between {Constants.PortNumberMinimum} and {Constants.PortNumberMaximum}");
				await deviceClient.RejectAsync(message);
				return;
			}

			if (!message.Properties.ContainsKey("Queue"))
			{
				Console.WriteLine(" UplinkMessageReceived missing queue property");
				await deviceClient.RejectAsync(message);
				return;
			}

			switch(message.Properties["Queue"].ToLower())
			{
				case "push":
					downlinktopic = $"v3/{receiveMessageHandlerConext.ApplicationId}@{receiveMessageHandlerConext.TenantId}/devices/{receiveMessageHandlerConext.DeviceId}/down/push";
					break;
				case "replace":
					downlinktopic = $"v3/{receiveMessageHandlerConext.ApplicationId}@{receiveMessageHandlerConext.TenantId}/devices/{receiveMessageHandlerConext.DeviceId}/down/replace";
					break;
				default:
					Console.WriteLine(" UplinkMessageReceived missing queue property invalid value");
					await deviceClient.RejectAsync(message);
					return;
               }

			DownlinkPayload Payload = new DownlinkPayload()
			{
				Downlinks = new List<Downlink>()
				{ 
					new Downlink()
					{
						Confirmed = confirmed,
						PayloadRaw = messageBody,
						Priority = priority,
						Port = port,
						CorrelationIds = new List<string>()
						{
							message.MessageId
						}
					}
				}
			};

			var mqttMessage = new MqttApplicationMessageBuilder()
					.WithTopic(downlinktopic)
					.WithPayload(JsonConvert.SerializeObject(Payload))
					.WithAtLeastOnceQoS()
					.Build();

			await mqttClient.PublishAsync(mqttMessage);

			// Need to look at confirmation requirement ack, nack maybe failed & sent
			await deviceClient.CompleteAsync(message);

			Console.WriteLine();
		}
	}
	catch (Exception ex)
	{
		Debug.WriteLine("UplinkMessageReceived failed: {0}", ex.Message);
	}
}

To “smoke test”” my implementation I used Azure IoT Explorer to send a C2D telemetry message

Azure IoT Hub Explorer send message form with payload and message properties

The PoC console application then forwarded the message to TTN using MQTT to be sent(which fails)

PoC application sending message then displaying result

The TTN live data display shows the message couldn’t be delivered because my test LoRaWAN device has not been activiated.

TTN Live Data display with message delivery failure

Now that my PoC application can receive and transmit message to devices I need to reconfigure my RAK Wisgate Developer D+ gateway and Seeeduino LoRaWAN and RAK Wisnode 7200 Track Lite devices on The Things Industries Network so I can test my approach with more realistic setup.

The Things Network MQTT & Azure IoT Part2

Uplink with decoded_payload & frm_payload

The next functionality added to my Proof of Concept(PoC) Azure IoT Hub, The Things Network(TTN) V3 Hypertext Transfer Protocol(HTTP) client API Integration, and Message Queue Telemetry Transport (MQTT) Data API Integration is sending of raw and decoded uplink messages to an Azure IoT Hub.

// At this point all the AzureIoT Hub deviceClients setup and ready to go so can enable MQTT receive
mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClientApplicationMessageReceived(e)));

// This may shift to individual device subscriptions
string uplinkTopic = $"v3/{options.MqttApplicationID}/devices/+/up";
await mqttClient.SubscribeAsync(uplinkTopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

//string queuedTopic = $"v3/{options.MqttApplicationID}/devices/+/queued";
//await mqttClient.SubscribeAsync(queuedTopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

The additional commented out subscriptions are for the processing of downlink messages

The MQTTNet received message handler uses the last segment of the topic to route messages to a method for processing

private static async void MqttClientApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
	if (e.ApplicationMessage.Topic.EndsWith("/up", StringComparison.InvariantCultureIgnoreCase))
	{
		await UplinkMessageReceived(e);
	}

	/*
	if (e.ApplicationMessage.Topic.EndsWith("/queued", StringComparison.InvariantCultureIgnoreCase))
	{
		await DownlinkMessageQueued(e);
	}
	...			
	*/
}

The UplinkMessageReceived method deserialises the message payload, retrieves device context information from the local ObjectCache, adds relevant uplink messages fields (including the raw payload), then if the message has been unpacked by a TTN Decoder, the message fields are added as well.

static async Task UplinkMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
	try
	{
		PayloadUplinkV3 payload = JsonConvert.DeserializeObject<PayloadUplinkV3>(e.ApplicationMessage.ConvertPayloadToString());
		string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
		string deviceId = payload.EndDeviceIds.DeviceId;
		int port = payload.UplinkMessage.Port;
...
		DeviceClient deviceClient = (DeviceClient)DeviceClients.Get(deviceId);
		if (deviceClient == null)
		{
			Console.WriteLine($" UplinkMessageReceived unknown DeviceID: {deviceId}");
			return;
		}

		JObject telemetryEvent = new JObject();
		telemetryEvent.Add("DeviceID", deviceId);
		telemetryEvent.Add("ApplicationID", applicationId);
		telemetryEvent.Add("Port", port);
		telemetryEvent.Add("PayloadRaw", payload.UplinkMessage.PayloadRaw);

		// If the payload has been unpacked in TTN backend add fields to telemetry event payload
		if (payload.UplinkMessage.PayloadDecoded != null)
		{
			EnumerateChildren(telemetryEvent, payload.UplinkMessage.PayloadDecoded);
		}

		// Send the message to Azure IoT Hub/Azure IoT Central
		using (Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
		{
			// Ensure the displayed time is the acquired time rather than the uploaded time. 
			//ioTHubmessage.Properties.Add("iothub-creation-time-utc", payloadObject.Metadata.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture));
			ioTHubmessage.Properties.Add("ApplicationId", applicationId);
			ioTHubmessage.Properties.Add("DeviceId", deviceId);
			ioTHubmessage.Properties.Add("port", port.ToString());

			await deviceClient.SendEventAsync(ioTHubmessage);
		}
	}
	catch( Exception ex)
	{
		Debug.WriteLine("UplinkMessageReceived failed: {0}", ex.Message);
	}
}

private static void EnumerateChildren(JObject jobject, JToken token)
{
	if (token is JProperty property)
	{
		if (token.First is JValue)
		{
			// Temporary dirty hack for Azure IoT Central compatibility
			if (token.Parent is JObject possibleGpsProperty)
			{
				if (possibleGpsProperty.Path.StartsWith("GPS_", StringComparison.OrdinalIgnoreCase))
				{
					if (string.Compare(property.Name, "Latitude", true) == 0)
					{
						jobject.Add("lat", property.Value);
					}
					if (string.Compare(property.Name, "Longitude", true) == 0)
					{
						jobject.Add("lon", property.Value);
					}
					if (string.Compare(property.Name, "Altitude", true) == 0)
					{
						jobject.Add("alt", property.Value);
					}
				}
			}
			jobject.Add(property.Name, property.Value);
		}
		else
		{
			JObject parentObject = new JObject();
			foreach (JToken token2 in token.Children())
			{
				EnumerateChildren(parentObject, token2);
				jobject.Add(property.Name, parentObject);
			}
		}
	}
	else
	{
		foreach (JToken token2 in token.Children())
		{
			EnumerateChildren(jobject, token2);
		}
	}
}

There is also some basic reformatting of the messages for Azure IoT Central

TTN Simulate uplink message with GPS location payload.
Nasty console application processing uplink message
Message from LoRaWAN device displayed in Azure IoT Explorer

Currently the code has a lots of diagnostic Console.Writeline statements, doesn’t support Uplink messages, has no Advanced Message Queuing Protocol(AMQP) client connection pooling, can’t run as an Azure Webjob, and a number of other features which I plan on adding in future blog posts.

MQTTnet Azure Function Binding

It Looks promising

I’m using MQTTnet to build my The Things Industries client and it looked like the amount of code for my Message Queue Telemetry Transport (MQTT) Data API Integration could be reduced by using the AzureFunction MQTT Binding by Kees Schollaart.

I used The Things Industries simulate uplink functionality for my initial testing

TTI uplink message simulator

The first version of the Azure function code proof of concept(PoC) was very compact

namespace MQTTnetAzureFunction
{
   using System;
   using System.Text;
   using Microsoft.Azure.WebJobs;
   using Microsoft.Extensions.Logging;

   using CaseOnline.Azure.WebJobs.Extensions.Mqtt;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Messaging;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Config;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings;

   using MQTTnet.Client.Options;
   using MQTTnet.Extensions.ManagedClient;

   public static class Subscribe
   {
      [FunctionName("UplinkMessageProcessor")]
      public static void UplinkMessageProcessor(
            [MqttTrigger("v3/application123456789012345/devices/+/up", ConnectionString = "TTNMQTTConnectionString")] IMqttMessage message,
IMqttMessage message,
            ILogger log)
      {
         var body = Encoding.UTF8.GetString(message.GetMessage());

         log.LogInformation($"Advanced: message from topic {message.Topic} \nbody: {body}");
      }
   }
}

I configured the TTNMQTTConnectionString in the application’s local.settings.json file

{
    "IsEncrypted": false,
   "Values": {
      "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=...",
      "AzureWebJobsDashboard": "DefaultEndpointsProtocol=https;AccountName=...",
      "FUNCTIONS_WORKER_RUNTIME": "dotnet",
      "TTNMQTTConnectionString": "Server=...;Username=application1@...;Password=...",
   }
}

This was a good start but I need to be able to configure the MQTT topic for deployments.

After looking at the binding source code plus some trial and error based on the AdvancedConfiguration sample I have a nasty PoC

public static class Subscribe
{
   [FunctionName("UplinkMessageProcessor")]
   public static void UplinkMessageProcessor(
         [MqttTrigger(typeof(ExampleMqttConfigProvider), "v3/%TopicName%/devices/+/up")] IMqttMessage message,
         ILogger log)
   {
      var body = Encoding.UTF8.GetString(message.GetMessage());

      log.LogInformation($"Advanced: message from topic {message.Topic} \nbody: {body}");
   }
}

public class MqttConfigExample : CustomMqttConfig
{
  public override IManagedMqttClientOptions Options { get; }

  public override string Name { get; }

  public MqttConfigExample(string name, IManagedMqttClientOptions options)
  {
     Options = options;
     Name = name;
   }
}

public class ExampleMqttConfigProvider : ICreateMqttConfig
{
   public CustomMqttConfig Create(INameResolver nameResolver, ILogger logger)
   {
      var connectionString = new MqttConnectionString(nameResolver.Resolve("TTNMQTTConnectionString"), "CustomConfiguration");

      var options = new ManagedMqttClientOptionsBuilder()
             .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
             .WithClientOptions(new MqttClientOptionsBuilder()
                  .WithClientId(connectionString.ClientId.ToString())
                  .WithTcpServer(connectionString.Server, connectionString.Port)
                  .WithCredentials(connectionString.Username, connectionString.Password)
                  .Build())
             .Build();

      return new MqttConfigExample("CustomConnection", options);
   }
}

The TTNMQTTConnectionString and TopicName can be configured in the application’s local.settings.json file

{
    "IsEncrypted": false,
   "Values": {
      "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=...",
      "AzureWebJobsDashboard": "DefaultEndpointsProtocol=https;AccountName=...",
      "FUNCTIONS_WORKER_RUNTIME": "dotnet",
      "TTNMQTTConnectionString": "Server=...;Username=application1@...;Password=...",
      "TopicName": "application1@..."
   }
}

When run in the Azure Functions Core Tools the simulated message properties and payload are displayed

The message “The ‘UplinkMessageProcessor’ function is in error: Unable to configure binding ‘message’ of type ‘mqttTrigger’. This may indicate invalid function.json properties. Can’t figure out which ctor to call.” needs further investigation.

The Things Network MQTT & Azure IoT Part1

Side by Side

In my last few posts I have built Proof of Concept(PoC) The Things Network(TTN) V3 Hypertext Transfer Protocol(HTTP) API Integration and Message Queue Telemetry Transport (MQTT) Data API Integrations.

While building these PoCs I have learnt a lot about the way that the TTN V3 RESTful and MQTT APIs work and this is the first in a series of posts about linking them together. My plan is to start with yet another .NetCore Console application which hosts both the MQTT and Azure IoT Hub DeviceClient (using the Advanced Message Queueing Protocol(AMQP)) client implementations. I’m using MQTTnet to build my data API client and used NSwag by Richo Suter to generate my RESTful client from the TTN provided swagger file.

In this PoC I’m using the commandlineParser NuGet package to the reduce the amount of code required to process command line parameters and make it more robust. This PoC has a lot of command line parameters which would have been painful to manually parse and validate.

public class CommandLineOptions
{
	[Option('u', "APIbaseURL", Required = false, HelpText = "TTN Restful API URL.")]
	public string ApiBaseUrl { get; set; }

	[Option('K', "APIKey", Required = true, HelpText = "TTN Restful API APIkey")]
	public string ApiKey { get; set; }

	[Option('P', "APIApplicationID", Required = true, HelpText = "TTN Restful API ApplicationID")]
	public string ApiApplicationID { get; set; }

	[Option('D', "DeviceListPageSize", Required = true, HelpText = "The size of the pages used to retrieve EndDevice configuration")]
	public int DevicePageSize { get; set; }

	[Option('S', "MQTTServerName", Required = true, HelpText = "TTN MQTT API server name")]
	public string MqttServerName { get; set; }

	[Option('A', "MQTTAccessKey", Required = true, HelpText = "TTN MQTT API access key")]
	public string MqttAccessKey { get; set; }

	[Option('Q', "MQTTApplicationID", Required = true, HelpText = "TTN MQTT API ApplicationID")]
	public string MqttApplicationID { get; set; }

	[Option('C', "MQTTClientName", Required = true, HelpText = "TTN MQTT API Client ID")]
	public string MqttClientID { get; set; }

	[Option('Z', "AzureIoTHubConnectionString", Required = true, HelpText = "Azure IoT Hub Connection string")]
	public string AzureIoTHubconnectionString { get; set; }
}

To keep things simple in this PoC I’m using an Azure IoT Hub specific (rather than a device specific connection string)

Azure IoT Hub Device shared access policy selection

After some trial and error I found the order of execution was important

  • Open MQTTnet connection to TTN host (but don’t configure any subscriptions)
  • Configure connection to TTN RESTful API
  • Retrieve list of V3EndDevices (paginated), then for each V3EndDevice
    • Open connection to Azure IoT Hub using command line connection string + TTN Device ID
    • Call DeviceClient.SetReceiveMessageHandlerAsync to specify ReceiveMessageCallback and additional context information for processing Azure IoT Hub downlink messages.
    • Store DeviceClient instance in ObjectCache using DeviceID as key
  • Configure the MQTTnet recived message handler
  • Subscribe to uplink messages from all the V3EndDevices in the specified application.
private static async Task ApplicationCore(CommandLineOptions options)
{
	MqttFactory factory = new MqttFactory();
	mqttClient = factory.CreateMqttClient();

#if DIAGNOSTICS
	Console.WriteLine($"baseURL: {options.ApiBaseUrl}");
	Console.WriteLine($"APIKey: {options.ApiKey}");
	Console.WriteLine($"ApplicationID: {options.ApiApplicationID}");
	Console.WriteLine($"AazureIoTHubconnectionString: {options.AzureIoTHubconnectionString}");
	Console.WriteLine();
#endif

	try
	{
		// First configure MQTT, open connection and wire up disconnection handler. 
		// Can't wire up MQTT received handler as at this stage AzureIoTHub devices not connected.
		mqttOptions = new MqttClientOptionsBuilder()
			.WithTcpServer(options.MqttServerName)
			.WithCredentials(options.MqttApplicationID, options.MqttAccessKey)
			.WithClientId(options.MqttClientID)
			.WithTls()
			.Build();

		mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClientDisconnected(e)));

		await mqttClient.ConnectAsync(mqttOptions);

		// Prepare the HTTP client to be used in the TTN device enumeration
		using (HttpClient httpClient = new HttpClient())
		{
			EndDeviceRegistryClient endDeviceRegistryClient = new EndDeviceRegistryClient(options.ApiBaseUrl, httpClient)
			{
				ApiKey = options.ApiKey
			};

			// Retrieve list of devices page by page
			V3EndDevices endDevices = await endDeviceRegistryClient.ListAsync(
				options.ApiApplicationID, 
				field_mask_paths: DevicefieldMaskPaths, 
				limit: options.DevicePageSize);
			if ((endDevices != null) && (endDevices.End_devices != null)) // If no devices returns null rather than empty list
			{
				foreach (V3EndDevice endDevice in endDevices.End_devices)
				{
					// Display the device info+attributes then connect device to Azure IoT Hub
#if DEVICE_FIELDS_MINIMUM
					Console.WriteLine($"EndDevice ID: {endDevice.Ids.Device_id}");
#else
					Console.WriteLine($"Device ID: {endDevice.Ids.Device_id} Name: {endDevice.Name} Description: {endDevice.Description}");
					Console.WriteLine($"  CreatedAt: {endDevice.Created_at:dd-MM-yy HH:mm:ss} UpdatedAt: {endDevice.Updated_at:dd-MM-yy HH:mm:ss}");
#endif

#if DEVICE_ATTRIBUTES_DISPLAY
					if (endDevice.Attributes != null)
					{
						Console.WriteLine("  EndDevice attributes");

						foreach (KeyValuePair<string, string> attribute in endDevice.Attributes)
						{
							Console.WriteLine($"    Key: {attribute.Key} Value: {attribute.Value}");
						}
					}
#endif
					try
					{
						DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(
							options.AzureIoTHubconnectionString, 
							endDevice.Ids.Device_id, 
							TransportType.Amqp_Tcp_Only);

						await deviceClient.OpenAsync();

						await deviceClient.SetReceiveMessageHandlerAsync(
							AzureIoTHubClientReceiveMessageHandler,
							new AzureIoTHubReceiveMessageHandlerContext()
							{
								DeviceId = endDevice.Ids.Device_id,
								ApplicationId = endDevice.Ids.Application_ids.Application_id,
							});

						DeviceClients.Add(endDevice.Ids.Device_id, deviceClient, cacheItemPolicy);
					}
					catch( Exception ex)
					{
						Console.WriteLine($"Azure IoT Hub OpenAsync failed {ex.Message}");
					}
				}
			}
		}

		// At this point all the AzureIoT Hub deviceClients setup and ready to go so can enable MQTT receive
		mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClientApplicationMessageReceived(e)));

		// This may shift to individual device subscriptions
		string uplinktopic = $"v3/{options.MqttApplicationID}/devices/+/up";

		await mqttClient.SubscribeAsync(uplinktopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
	}
	catch(Exception ex)
	{
		Console.WriteLine($"Main {ex.Message}");
		Console.WriteLine("Press any key to exit");
		Console.ReadLine();
		return;
	}

	while (!Console.KeyAvailable)
	{
		Console.Write(".");
		await Task.Delay(1000);
	}

	// Consider ways to mop up connections

	Console.WriteLine("Press any key to exit");
	Console.ReadLine();
}

When I was initially looking at Azure Deviceclient I would of had to have created a thread (which would have been blocked most of the time) for each device. This implementation issued was removed by the introduction of the DeviceClient SetReceiveMessageHandlerAsync method in release 1.33.0.

Currently the application just displays the Cloud to Device(C2D) message payload plus diagnostic information, and the CompleteAsync method is called so the message is dequeued.

private async static Task AzureIoTHubClientReceiveMessageHandler(Message message, object userContext)
{
	AzureIoTHubReceiveMessageHandlerContext receiveMessageHandlerConext = (AzureIoTHubReceiveMessageHandlerContext)userContext;

	DeviceClient deviceClient = (DeviceClient)DeviceClients.Get(receiveMessageHandlerConext.DeviceId);

	using (message)
	{
		Console.WriteLine();
		Console.WriteLine();
		Console.WriteLine($"{DateTime.UtcNow:HH:mm:ss} Azure IoT Hub downlink message");
		Console.WriteLine($" ApplicationID: {receiveMessageHandlerConext.ApplicationId}");
		Console.WriteLine($" DeviceID: {receiveMessageHandlerConext.DeviceId}");
#if DIAGNOSTICS_AZURE_IOT_HUB
		Console.WriteLine($" Cached: {DeviceClients.Contains(receiveMessageHandlerConext.DeviceId)}");
		Console.WriteLine($" MessageID: {message.MessageId}");
		Console.WriteLine($" DeliveryCount: {message.DeliveryCount}");
		Console.WriteLine($" EnqueuedTimeUtc: {message.EnqueuedTimeUtc}");
		Console.WriteLine($" SequenceNumber: {message.SequenceNumber}");
		Console.WriteLine($" To: {message.To}");
#endif
		string messageBody = Encoding.UTF8.GetString(message.GetBytes());
		Console.WriteLine($" Body: {messageBody}");
#if DOWNLINK_MESSAGE_PROPERTIES_DISPLAY
		foreach (var property in message.Properties)
		{
			Console.WriteLine($"   Key:{property.Key} Value:{property.Value}");
		}
#endif

		await deviceClient.CompleteAsync(message);

		Console.WriteLine();
	}
}

Currently the application just displays the Cloud to Device(D2C) message payload plus diagnostic information, displaying the payload fields if the message format has been configured and successfully processed.

private static void MqttClientApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
	if (e.ApplicationMessage.Topic.EndsWith("/up"))
	{
		PayloadUplinkV3 payload = JsonConvert.DeserializeObject<PayloadUplinkV3>(e.ApplicationMessage.ConvertPayloadToString());

		Console.WriteLine();
		Console.WriteLine();
		Console.WriteLine($"{DateTime.UtcNow:HH:mm:ss} TTN Uplink message");
#if DIAGNOSTICS_MQTT
		Console.WriteLine($" ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic}");
		Console.WriteLine($" Cached: {DeviceClients.Contains(payload.EndDeviceIds.DeviceId)}");
#endif
		Console.WriteLine($" ApplicationID: {payload.EndDeviceIds.ApplicationIds.ApplicationId}");
		Console.WriteLine($" DeviceID: {payload.EndDeviceIds.DeviceId}");
		Console.WriteLine($" Port: {payload.UplinkMessage.Port} ");
		Console.WriteLine($" Payload raw: {payload.UplinkMessage.PayloadRaw}");

		if (payload.UplinkMessage.PayloadDecoded != null)
		{
			Console.WriteLine($" Payload decoded: {payload.UplinkMessage.PayloadRaw}");
			EnumerateChildren(1, payload.UplinkMessage.PayloadDecoded);
		}

		Console.WriteLine();
	}
	else
	{
		Console.WriteLine($"{DateTime.UtcNow:HH:mm:ss} ClientId: {e.ClientId} Topic: {e.ApplicationMessage.Topic}");
	}
}
dotNet Core Console application displaying simulated uplink and downlink messages.
Simulating C2D messages with AzureIoTExplorer
Simulating D2C messages with TTN Device console

In the MQTT received message handler.

Console.WriteLine($" Cached: {DeviceClients.Contains(receiveMessageHandlerConext.DeviceId)}");

and Azure DeviceClient received message handler.

Console.WriteLine($" Cached: {DeviceClients.Contains(receiveMessageHandlerConext.DeviceId)}");

check that the specified TTN device ID is in the DeviceClients ObjectCache

Cayenne Low Power Payload (LPP) Encoder

I originally started building my own Cayenne Low Power Protocol(LPP) encoder because I could only find one other Github repository with a C# implementation. There hadn’t been any updates for a while and I wasn’t confident that I could make the code work on my nanoFramework and TinyCLR devices.

I started with the sample Mbed C code and did a largely mechanical conversion to C#. I then revisited some of the mathematics where floating point values were converted to an integer.

The original C++ code (understandably) had some language specific approaches which didn’t map well into C#

uint8_t CayenneLPP::addTemperature(uint8_t channel, float celsius) {
    if ((cursor + LPP_TEMPERATURE_SIZE) > maxsize) {
        return 0;
    }
    int16_t val = celsius * 10;
    buffer[cursor++] = channel; 
    buffer[cursor++] = LPP_TEMPERATURE; 
    buffer[cursor++] = val >> 8; 
    buffer[cursor++] = val; 

    return cursor;
}

I then translated this code to C#

public void TemperatureAdd(byte channel, float celsius)
{
   if ((index + TemperatureSize) > buffer.Length)
   {
      throw new ApplicationException("TemperatureAdd insufficent buffer capacity");
   }

   short val = (short)(celsius * 10);

   buffer[index++] = channel;
   buffer[index++] = (byte)DataType.Temperature;
   buffer[index++] = (byte)(val >> 8);
   buffer[index++] = (byte)val;
}

One of my sensors was sending values with more decimal places than LPP supported and I noticed the value was not getting rounded e.g. 2.99 ->2.9 not 3.0 etc. So I revised my implementation to use Math.Round (which is supported by the nanoFramework and TinyCLR).

public void DigitalInputAdd(byte channel, bool value)
{
   #region Guard conditions
   if ((channel < Constants.ChannelMinimum) || (channel > Constants.ChannelMaximum))
   {
      throw new ArgumentException($"channel must be between {Constants.ChannelMinimum} and {Constants.ChannelMaximum}", "channel");
   }

   if ((index + Constants.DigitalInputSize) > buffer.Length)
   {
      throw new ApplicationException($"Datatype DigitalInput insufficent buffer capacity, {buffer.Length - index} bytes available");
   }
   #endregion

   buffer[index++] = channel;
   buffer[index++] = (byte)Enumerations.DataType.DigitalInput;

   // I know this is fugly but it works on all platforms
   if (value)
   {
      buffer[index++] = 1;
   }
   else
   {
     buffer[index++] = 0;
   }
 }

I then extracted out the channel and buffer size validation but I’m not certain this makes the code anymore readable/understandable

public void DigitalInputAdd(byte channel, bool value)
{
   IsChannelNumberValid(channel);
   IsBufferSizeSufficient(Enumerations.DataType.DigitalInput);

   buffer[index++] = channel;
   buffer[index++] = (byte)Enumerations.DataType.DigitalInput;

   // I know this is fugly but it works on all platforms
   if (value)
   {
      buffer[index++] = 1;
   }
   else
   {
      buffer[index++] = 0;
   }
}

The code runs on netCore, nanoFramework, and TinyCLRV2 just needs a few more unit tests and it will be ready for production. I started with an LPP encoder which I needed for one of my applications. I’m also working an approach for a decoder which will run on all my target platforms with minimal modification or compile time directives.

The Things Network V2 MQTT SQL Connector

This code was written to solve a problem I had debugging and testing an application which processed data from sensors attached to The Things Network(TTN) and I figured others might find it useful.

As part of my series of TTN projects I wanted to verify that the data from a number of LoRaWAN sensors connected to TTN was reasonable and complete. I’m familiar with Microsoft SQL Server so I built a .Net Core console application which uses the TTN Message Queue Telemetry Transport(MQTT) Data API (so it can run alongside my existing TTN integration) to receive messages from the all devices in a TTN application and store them in a database for post processing.

The console application uses MQTTNet to connect to TTN MQTT Data API. It subscribes to an application device uplink topic, then uses a combination of Stackoverflow Dapper with Microsoft SQL Server tables and stored procedures to store the device data points. I re-generated the classes I had used in my other projects, added any obvious missing fields and fine tuned the data types by delving into the TTN V2 GO code.

The core of the application is in the MQTTNet application message received handler.

private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
   PayloadUplinkV2 payload;

   log.InfoFormat($"Receive Start Topic:{e.ApplicationMessage.Topic}");

   string connectionString = configuration.GetSection("TTNDatabase").Value;

   try
   {
      payload = JsonConvert.DeserializeObject<PayloadUplinkV2>(e.ApplicationMessage.ConvertPayloadToString());
   }
   catch (Exception ex)
   {
      log.Error("DeserializeObject failed", ex);
      return;
   }

   try
   {
      if (payload.PayloadFields != null)
      {
         var parameters = new DynamicParameters();

         EnumerateChildren(parameters, payload.PayloadFields);

         log.Debug($"Parameters:{parameters.ParameterNames.Aggregate((i, j) => i + ',' + j)}");

         foreach (string storedProcedure in storedProcedureMappings.Keys)
         {
            if (Enumerable.SequenceEqual(parameters.ParameterNames, storedProcedureMappings[storedProcedure].Split(',', StringSplitOptions.RemoveEmptyEntries), StringComparer.InvariantCultureIgnoreCase))
            {
               log.Info($"Payload fields processing with:{storedProcedure}");

               using (SqlConnection db = new SqlConnection(connectionString))
               {
                  parameters.Add("@ReceivedAtUtc", payload.Metadata.ReceivedAtUtc);
                  parameters.Add("@DeviceID", payload.DeviceId);
                  parameters.Add("@DeviceEui", payload.DeviceEui);
                  parameters.Add("@ApplicationID", payload.ApplicationId);
                  parameters.Add("@IsConfirmed", payload.IsConfirmed);
                  parameters.Add("@IsRetry", payload.IsRetry);
                  parameters.Add("@Port", payload.Port);

                  db.Execute(sql: storedProcedure, param: parameters, commandType: CommandType.StoredProcedure);
               }
            }
         }
      }
      else
      {
         foreach (string storedProcedure in storedProcedureMappings.Keys)
         {
            if (string.Compare(storedProcedureMappings[storedProcedure], "payload_raw", true) == 0)
            {
               log.Info($"Payload raw processing with:{storedProcedure}");

               using (SqlConnection db = new SqlConnection(connectionString))
               {
                  var parameters = new DynamicParameters();

                  parameters.Add("@ReceivedAtUtc", payload.Metadata.ReceivedAtUtc);
                  parameters.Add("@DeviceID", payload.DeviceId);
                  parameters.Add("@DeviceEui", payload.DeviceEui);
                  parameters.Add("@ApplicationID", payload.ApplicationId);
                  parameters.Add("@IsConfirmed", payload.IsConfirmed);
                  parameters.Add("@IsRetry", payload.IsRetry);
                  parameters.Add("@Port", payload.Port);
                  parameters.Add("@Payload", payload.PayloadRaw);

                  db.Execute(sql: storedProcedure, param: parameters, commandType: CommandType.StoredProcedure);
               }
            }
         }
      }
   }
   catch (Exception ex)
   {
      log.Error("Message processing failed", ex);
   }
}

For messages with payload fields the code attempts to match the list of field names (there maybe more than one match) with the parameter list for stored procedures in the AppSettings.json file. The Enumerable.SequenceEqual uses a case insensitive comparison but order is important. I did consider sorting the two lists of parameters but wasn’t certain the added complexity was worth it.

{
   "TTNDatabase": "Server=DESKTOP-1234567;Initial Catalog=Rak7200TrackerTest;Persist Security Info=False;User ID=TopSecret;Password=TopSecret;Connection Timeout=30",
   "MqttServer": "eu.thethings.network",
   "MqttPassword": "ttn-account-TopSecret",
   "ApplicationId": "rak811wisnodetest",
   "MqttClientId": "TTNSQLClient",
   "StoredProcedureMappings": {
      "EnvironmentalSensorProcess": "relative_humidity_0,temperature_0",
      "PayloadRawProcess": "payload_raw",
      "WeatherSensorProcess": "barometric_pressure_0,temperature_0",
      "PositionReportProcess": "accelerometer_3x,accelerometer_3y,accelerometer_3z,analog_in_10,analog_in_11,analog_in_8,analog_in_9,gps_1altitude,gps_1latitude,gps_1longitude,gyrometer_5x,gyrometer_5y,gyrometer_5z"
   }
}

To reduce the scope for mistakes (especially with longer parameter lists) I usually copy them from the Log4Net RollingFileAppender file or ManagedColoredConsoleAppender console output.

Environmental sensor output with flat data format

I created a database table to store the temperature and humidity values.

CREATE TABLE [dbo].[EnvironmentalSensorReport](
	[WeatherSensorReportUID] [UNIQUEIDENTIFIER] NOT NULL,
	[ReceivedAtUtC] [DATETIME] NOT NULL,
	[DeviceID] [NVARCHAR](32) NOT NULL,
	[DeviceEui] [NVARCHAR](32) NOT NULL,
	[ApplicationID] [NVARCHAR](32) NOT NULL,
	[IsConfirmed] [BIT] NOT NULL,
	[IsRetry] [BIT] NOT NULL,
	[Port] [SMALLINT] NOT NULL,
	[Temperature] [FLOAT] NOT NULL,
	[Humidity] [FLOAT] NOT NULL,
CONSTRAINT [PK_EnvironmentalSensorReport] PRIMARY KEY CLUSTERED 
(
	[WeatherSensorReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

ALTER TABLE [dbo].[EnvironmentalSensorReport] ADD  CONSTRAINT [DF_EnvironmentalSensorReport_EnvironmentalSensorReporttUID]  DEFAULT (NEWID()) FOR [WeatherSensorReportUID]
GO

The stored procedure must have the parameters @ReceivedAtUtc, @DeviceID, @DeviceEui, @ApplicationID, @IsRetry, @IsConfirmed and @Port. In this example the payload specific fields generated by the Cayenne Low Power Protocol(LPP) decoder are @Temperature_0 and @relative_humidity_0

CREATE PROCEDURE [dbo].[EnvironmentalSensorProcess]
   @ReceivedAtUtc AS DATETIME,
   @DeviceID AS NVARCHAR(32),
   @DeviceEui AS NVARCHAR(32),
   @ApplicationID AS NVARCHAR(32),
   @IsRetry AS BIT,
   @IsConfirmed AS BIT,
   @Port AS SMALLINT,
   @Temperature_0 AS FLOAT,
   @relative_humidity_0 AS FLOAT
AS
BEGIN
   SET NOCOUNT ON;
 
   INSERT INTO [dbo].[EnvironmentalSensorReport]
           ([PositionReportUID]
	   .[ReceivedAtUtc]
           ,[DeviceID]
           ,[DeviceEui]
           ,[ApplicationID]
           ,[IsConfirmed]
           ,[IsRetry]
           ,[Port]
	   ,Temperature
	   ,Humidity)
   VALUES
   (
      @ReceivedAtUtc,
      @DeviceID,
      @DeviceEui,
      @ApplicationID,
      @IsConfirmed,
      @IsRetry,
      @port,
      @Temperature_0,
      @relative_humidity_0)
END
Environmental sensor data displayed in SQL Server Management Studio(SSMS)

To store more complex nest payload fields (e.g. latitude, longitude and altitude values), I flattened the the hierarchy.

private static void EnumerateChildren(DynamicParameters parameters, JToken token, string prefix ="")
{
   if (token is JProperty)
      if (token.First is JValue)
      {
         JProperty property = (JProperty)token;
         parameters.Add($"@{prefix}{property.Name}", property.Value.ToString());
      }
      else
      {
         JProperty property = (JProperty)token;
         prefix += property.Name;
      }

   foreach (JToken token2 in token.Children())
   {
      EnumerateChildren(parameters,token2, prefix);
   }
}
Unpacked LPP payload from GPS tracker displayed in TTN application data view
Flattened location, acceleration and rotation information
CREATE TABLE [dbo].[PositionReport](
      [PositionReportUID] [UNIQUEIDENTIFIER] NOT NULL,
      [ReceivedAtUtC] [DATETIME] NOT NULL,
      [DeviceID] [NVARCHAR](32) NOT NULL,
      [DeviceEui] [NVARCHAR](32) NOT NULL,
      [ApplicationID] [NVARCHAR](32) NOT NULL,
      [IsConfirmed] [BIT] NOT NULL,
      [IsRetry] [BIT] NOT NULL,
      [Port] [SMALLINT] NOT NULL,
      [Latitude] [FLOAT] NOT NULL,
      [Longitude] [FLOAT] NOT NULL,
      [Altitude] [FLOAT] NOT NULL,
 CONSTRAINT [PK_PositionReport] PRIMARY KEY CLUSTERED 
(
	[PositionReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

I created a database table to store values of only the fields I cared about.

CREATE PROCEDURE [dbo].[PositionReportProcess]
      @ReceivedAtUtc AS DATETIME,
      @DeviceID AS NVARCHAR(32),
      @DeviceEui AS NVARCHAR(32),
      @ApplicationID AS NVARCHAR(32),
      @IsRetry AS Bit,
      @IsConfirmed AS BIT,
      @Port AS SMALLINT,
      @accelerometer_3x AS FLOAT,
      @accelerometer_3y AS FLOAT,
      @accelerometer_3z AS FLOAT,
      @analog_in_8 AS FLOAT,
      @analog_in_9 AS FLOAT,
      @analog_in_10 AS FLOAT,
      @analog_in_11 AS FLOAT,
      @gps_1Latitude AS FLOAT,
      @gps_1Longitude AS FLOAT,
      @gps_1Altitude AS FLOAT,
      @gyrometer_5x  AS FLOAT, 
      @gyrometer_5y  AS FLOAT, 
      @gyrometer_5z  AS FLOAT 
AS
BEGIN
   SET NOCOUNT ON;

   INSERT INTO [dbo].[PositionReport]
      ([PositionReportUID]
      .[ReceivedAtUtc]
      ,[DeviceID]
      ,[DeviceEui]
      ,[ApplicationID]
      ,[IsConfirmed]
      ,[IsRetry]
      ,[Port]
      ,Latitude
      ,Longitude
      ,Altitude)
   VALUES
   (
      @ReceivedAtUtc,
      @DeviceID,
      @DeviceEui,
      @ApplicationID,
      @IsConfirmed,
      @IsRetry,
      @port,
      @gps_1Latitude,
      @gps_1Longitude,
      @gps_1Altitude)
END

The stored procedure for storing the GPS tracker payload has to have parameters matching each payload field but some of the fields are not used.

Location data displayed in SQL Server Management Studio(SSMS)

For uplink messages with no payload fields the message processor looks for a stored procedure with a single parameter called “payload_raw”.(there maybe more than one match)

CREATE TABLE [dbo].[PayloadReport](
      [PayloadReportUID] [UNIQUEIDENTIFIER] NOT NULL,
      [ReceivedAtUtC] [DATETIME] NOT NULL,
      [DeviceID] [NVARCHAR](32) NOT NULL,
      [DeviceEui] [NVARCHAR](32) NOT NULL,
      [ApplicationID] [NVARCHAR](32) NOT NULL,
      [IsConfirmed] [BIT] NOT NULL,
      [IsRetry] [BIT] NOT NULL,
      [Port] [SMALLINT] NOT NULL,
      [Payload] [NVARCHAR](128) NOT NULL,
CONSTRAINT [PK_PayloadReport] PRIMARY KEY CLUSTERED 
(
      [PayloadReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

ALTER TABLE [dbo].[PayloadReport] ADD  CONSTRAINT [DF_PayloadReport_PositionReportUID]  DEFAULT (NEWID()) FOR [PayloadReportUID]
GO
ALTER PROCEDURE [dbo].[PayloadRawProcess]
      @ReceivedAtUtc AS DATETIME,
      @DeviceID AS NVARCHAR(32),
      @DeviceEui AS NVARCHAR(32),
      @ApplicationID AS NVARCHAR(32),
      @IsRetry AS Bit,
      @IsConfirmed AS BIT,
      @Port AS SMALLINT,
      @Payload AS NVARCHAR(128)
AS
BEGIN
      SET NOCOUNT ON;

      INSERT INTO [dbo].[PayloadReport]
         ([PositionReportUID]
         .[ReceivedAtUtc]
         ,[DeviceID]
         ,[DeviceEui]
         ,[ApplicationID]
         ,[IsConfirmed]
         ,[IsRetry]
         ,[Port]
         ,[Payload])
     VALUES(@ReceivedAtUtc,
         @DeviceID,
         @DeviceEui,
         @ApplicationID,
         @IsConfirmed,
         @IsRetry,
         @port,
         @Payload)
END
Raw payload data displayed in SQL Server Management Studio(SSMS)

Initially the application just used Console.Writeline for logging, then I added Log4Net because it would be useful to persist information about failures and so I could copy n paste parameter lists to the appSettings.json file.

To make the application more robust adding a retries with the Enterprise Library Transient Fault Handling and Configuration blocks or Polly on the Dapper Execute would be a good idea. It also would take much work to get the application to run in Microsoft Azure as a “headless” webapp.

Dapper supports a number of database platforms so in theory this application (with a little bit of effort) should be platform portable.