The Things Network MQTT & Azure IoT Part2

Uplink with decoded_payload & frm_payload

The next functionality added to my Proof of Concept(PoC) Azure IoT Hub, The Things Network(TTN) V3 Hypertext Transfer Protocol(HTTP) client API Integration, and Message Queue Telemetry Transport (MQTT) Data API Integration is the sending of raw and decoded uplink messages to an Azure IoT Hub.

// At this point all the AzureIoT Hub deviceClients setup and ready to go so can enable MQTT receive
mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClientApplicationMessageReceived(e)));

// This may shift to individual device subscriptions
string uplinkTopic = $"v3/{options.MqttApplicationID}/devices/+/up";
await mqttClient.SubscribeAsync(uplinkTopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

//string queuedTopic = $"v3/{options.MqttApplicationID}/devices/+/queued";
//await mqttClient.SubscribeAsync(queuedTopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

The additional commented out subscriptions are for the processing of downlink messages

The MQTTnet received message handler uses the last segment of the topic to route messages to a method for processing.

private static async void MqttClientApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
	if (e.ApplicationMessage.Topic.EndsWith("/up", StringComparison.InvariantCultureIgnoreCase))
	{
		await UplinkMessageReceived(e);
	}

	/*
	if (e.ApplicationMessage.Topic.EndsWith("/queued", StringComparison.InvariantCultureIgnoreCase))
	{
		await DownlinkMessageQueued(e);
	}
	...			
	*/
}

The UplinkMessageReceived method deserialises the message payload, retrieves device context information from the local ObjectCache, adds the relevant uplink message fields (including the raw payload), then, if the payload has been unpacked by a TTN decoder, adds the decoded fields as well.

static async Task UplinkMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
	try
	{
		PayloadUplinkV3 payload = JsonConvert.DeserializeObject<PayloadUplinkV3>(e.ApplicationMessage.ConvertPayloadToString());
		string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
		string deviceId = payload.EndDeviceIds.DeviceId;
		int port = payload.UplinkMessage.Port;
...
		DeviceClient deviceClient = (DeviceClient)DeviceClients.Get(deviceId);
		if (deviceClient == null)
		{
			Console.WriteLine($" UplinkMessageReceived unknown DeviceID: {deviceId}");
			return;
		}

		JObject telemetryEvent = new JObject();
		telemetryEvent.Add("DeviceID", deviceId);
		telemetryEvent.Add("ApplicationID", applicationId);
		telemetryEvent.Add("Port", port);
		telemetryEvent.Add("PayloadRaw", payload.UplinkMessage.PayloadRaw);

		// If the payload has been unpacked in TTN backend add fields to telemetry event payload
		if (payload.UplinkMessage.PayloadDecoded != null)
		{
			EnumerateChildren(telemetryEvent, payload.UplinkMessage.PayloadDecoded);
		}

		// Send the message to Azure IoT Hub/Azure IoT Central
		using (Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
		{
			// Ensure the displayed time is the acquired time rather than the uploaded time. 
			//ioTHubmessage.Properties.Add("iothub-creation-time-utc", payloadObject.Metadata.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture));
			ioTHubmessage.Properties.Add("ApplicationId", applicationId);
			ioTHubmessage.Properties.Add("DeviceId", deviceId);
			ioTHubmessage.Properties.Add("port", port.ToString());

			await deviceClient.SendEventAsync(ioTHubmessage);
		}
	}
	catch( Exception ex)
	{
		Debug.WriteLine("UplinkMessageReceived failed: {0}", ex.Message);
	}
}
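
I haven't included the PayloadUplinkV3 classes here; a cut-down sketch of their shape (the C# property names come from the code above, the class names are assumed to mirror them, and the JSON property names are the TTN V3 uplink message field names) is something like this:

// Cut-down sketch only - class names assumed, JSON property names from the TTN V3 uplink message format (Newtonsoft.Json / Newtonsoft.Json.Linq)
public class PayloadUplinkV3
{
	[JsonProperty("end_device_ids")]
	public EndDeviceIds EndDeviceIds { get; set; }

	[JsonProperty("uplink_message")]
	public UplinkMessage UplinkMessage { get; set; }
}

public class EndDeviceIds
{
	[JsonProperty("device_id")]
	public string DeviceId { get; set; }

	[JsonProperty("application_ids")]
	public ApplicationIds ApplicationIds { get; set; }
}

public class ApplicationIds
{
	[JsonProperty("application_id")]
	public string ApplicationId { get; set; }
}

public class UplinkMessage
{
	[JsonProperty("f_port")]
	public int Port { get; set; }

	[JsonProperty("frm_payload")]
	public string PayloadRaw { get; set; }

	[JsonProperty("decoded_payload")]
	public JObject PayloadDecoded { get; set; }
}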

private static void EnumerateChildren(JObject jobject, JToken token)
{
	if (token is JProperty property)
	{
		if (token.First is JValue)
		{
			// Temporary dirty hack for Azure IoT Central compatibility
			if (token.Parent is JObject possibleGpsProperty)
			{
				if (possibleGpsProperty.Path.StartsWith("GPS_", StringComparison.OrdinalIgnoreCase))
				{
					if (string.Compare(property.Name, "Latitude", true) == 0)
					{
						jobject.Add("lat", property.Value);
					}
					if (string.Compare(property.Name, "Longitude", true) == 0)
					{
						jobject.Add("lon", property.Value);
					}
					if (string.Compare(property.Name, "Altitude", true) == 0)
					{
						jobject.Add("alt", property.Value);
					}
				}
			}
			jobject.Add(property.Name, property.Value);
		}
		else
		{
			JObject parentObject = new JObject();
			foreach (JToken token2 in token.Children())
			{
				EnumerateChildren(parentObject, token2);
				jobject.Add(property.Name, parentObject);
			}
		}
	}
	else
	{
		foreach (JToken token2 in token.Children())
		{
			EnumerateChildren(jobject, token2);
		}
	}
}

There is also some basic reformatting of the messages for Azure IoT Central

TTN Simulate uplink message with GPS location payload.
Nasty console application processing uplink message
Message from LoRaWAN device displayed in Azure IoT Explorer

Currently the code has lots of diagnostic Console.WriteLine statements, doesn't support downlink messages, has no Advanced Message Queuing Protocol(AMQP) client connection pooling, can't run as an Azure WebJob, and is missing a number of other features which I plan to add in future blog posts.

MQTTnet Azure Function Binding

It looks promising

I’m using MQTTnet to build my The Things Industries client and it looked like the amount of code for my Message Queue Telemetry Transport (MQTT) Data API Integration could be reduced by using the AzureFunction MQTT Binding by Kees Schollaart.

I used The Things Industries simulate uplink functionality for my initial testing

TTI uplink message simulator

The first version of the Azure Function proof of concept(PoC) code was very compact

namespace MQTTnetAzureFunction
{
   using System;
   using System.Text;
   using Microsoft.Azure.WebJobs;
   using Microsoft.Extensions.Logging;

   using CaseOnline.Azure.WebJobs.Extensions.Mqtt;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Messaging;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Config;
   using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings;

   using MQTTnet.Client.Options;
   using MQTTnet.Extensions.ManagedClient;

   public static class Subscribe
   {
      [FunctionName("UplinkMessageProcessor")]
      public static void UplinkMessageProcessor(
            [MqttTrigger("v3/application123456789012345/devices/+/up", ConnectionString = "TTNMQTTConnectionString")] IMqttMessage message,
IMqttMessage message,
            ILogger log)
      {
         var body = Encoding.UTF8.GetString(message.GetMessage());

         log.LogInformation($"Advanced: message from topic {message.Topic} \nbody: {body}");
      }
   }
}

I configured the TTNMQTTConnectionString in the application’s local.settings.json file

{
    "IsEncrypted": false,
   "Values": {
      "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=...",
      "AzureWebJobsDashboard": "DefaultEndpointsProtocol=https;AccountName=...",
      "FUNCTIONS_WORKER_RUNTIME": "dotnet",
      "TTNMQTTConnectionString": "Server=...;Username=application1@...;Password=...",
   }
}

This was a good start but I need to be able to configure the MQTT topic for deployments.

After looking at the binding source code, plus some trial and error based on the AdvancedConfiguration sample, I have a nasty PoC

public static class Subscribe
{
   [FunctionName("UplinkMessageProcessor")]
   public static void UplinkMessageProcessor(
         [MqttTrigger(typeof(ExampleMqttConfigProvider), "v3/%TopicName%/devices/+/up")] IMqttMessage message,
         ILogger log)
   {
      var body = Encoding.UTF8.GetString(message.GetMessage());

      log.LogInformation($"Advanced: message from topic {message.Topic} \nbody: {body}");
   }
}

public class MqttConfigExample : CustomMqttConfig
{
  public override IManagedMqttClientOptions Options { get; }

  public override string Name { get; }

  public MqttConfigExample(string name, IManagedMqttClientOptions options)
  {
     Options = options;
     Name = name;
   }
}

public class ExampleMqttConfigProvider : ICreateMqttConfig
{
   public CustomMqttConfig Create(INameResolver nameResolver, ILogger logger)
   {
      var connectionString = new MqttConnectionString(nameResolver.Resolve("TTNMQTTConnectionString"), "CustomConfiguration");

      var options = new ManagedMqttClientOptionsBuilder()
             .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
             .WithClientOptions(new MqttClientOptionsBuilder()
                  .WithClientId(connectionString.ClientId.ToString())
                  .WithTcpServer(connectionString.Server, connectionString.Port)
                  .WithCredentials(connectionString.Username, connectionString.Password)
                  .Build())
             .Build();

      return new MqttConfigExample("CustomConnection", options);
   }
}

The TTNMQTTConnectionString and TopicName can be configured in the application’s local.settings.json file

{
    "IsEncrypted": false,
   "Values": {
      "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=...",
      "AzureWebJobsDashboard": "DefaultEndpointsProtocol=https;AccountName=...",
      "FUNCTIONS_WORKER_RUNTIME": "dotnet",
      "TTNMQTTConnectionString": "Server=...;Username=application1@...;Password=...",
      "TopicName": "application1@..."
   }
}

When run in the Azure Functions Core Tools the simulated message properties and payload are displayed

The message “The ‘UplinkMessageProcessor’ function is in error: Unable to configure binding ‘message’ of type ‘mqttTrigger’. This may indicate invalid function.json properties. Can’t figure out which ctor to call.” needs further investigation.

The Things Network MQTT & Azure IoT Part1

Side by Side

In my last few posts I have built Proof of Concept(PoC) The Things Network(TTN) V3 Hypertext Transfer Protocol(HTTP) API Integration and Message Queue Telemetry Transport (MQTT) Data API Integrations.

While building these PoCs I have learnt a lot about the way that the TTN V3 RESTful and MQTT APIs work and this is the first in a series of posts about linking them together. My plan is to start with yet another .NetCore Console application which hosts both the MQTT and Azure IoT Hub DeviceClient (using the Advanced Message Queuing Protocol(AMQP)) client implementations. I'm using MQTTnet to build my data API client and used NSwag by Rico Suter to generate my RESTful client from the TTN provided swagger file.

In this PoC I'm using the CommandLineParser NuGet package to reduce the amount of code required to process command line parameters and to make it more robust. This PoC has a lot of command line parameters which would have been painful to manually parse and validate.

public class CommandLineOptions
{
	[Option('u', "APIbaseURL", Required = false, HelpText = "TTN Restful API URL.")]
	public string ApiBaseUrl { get; set; }

	[Option('K', "APIKey", Required = true, HelpText = "TTN Restful API APIkey")]
	public string ApiKey { get; set; }

	[Option('P', "APIApplicationID", Required = true, HelpText = "TTN Restful API ApplicationID")]
	public string ApiApplicationID { get; set; }

	[Option('D', "DeviceListPageSize", Required = true, HelpText = "The size of the pages used to retrieve EndDevice configuration")]
	public int DevicePageSize { get; set; }

	[Option('S', "MQTTServerName", Required = true, HelpText = "TTN MQTT API server name")]
	public string MqttServerName { get; set; }

	[Option('A', "MQTTAccessKey", Required = true, HelpText = "TTN MQTT API access key")]
	public string MqttAccessKey { get; set; }

	[Option('Q', "MQTTApplicationID", Required = true, HelpText = "TTN MQTT API ApplicationID")]
	public string MqttApplicationID { get; set; }

	[Option('C', "MQTTClientName", Required = true, HelpText = "TTN MQTT API Client ID")]
	public string MqttClientID { get; set; }

	[Option('Z', "AzureIoTHubConnectionString", Required = true, HelpText = "Azure IoT Hub Connection string")]
	public string AzureIoTHubconnectionString { get; set; }
}
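
For completeness, a rough sketch of how the options get into ApplicationCore using the standard CommandLineParser calls (not necessarily the exact code in the PoC):

private static void Main(string[] args)
{
	// Parse and validate the command line, then hand the strongly typed options to ApplicationCore
	Parser.Default.ParseArguments<CommandLineOptions>(args)
		.WithParsed(options => ApplicationCore(options).GetAwaiter().GetResult())
		.WithNotParsed(errors => Console.WriteLine("Command line parameter parsing failed"));
}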

To keep things simple in this PoC I'm using an Azure IoT Hub specific (rather than a device specific) connection string.

Azure IoT Hub Device shared access policy selection
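
For reference, the two connection string formats (from the Azure IoT Hub documentation rather than this project) are roughly HostName=….azure-devices.net;SharedAccessKeyName=device;SharedAccessKey=… for the hub scoped one, which gets combined with the TTN Device ID in DeviceClient.CreateFromConnectionString, versus HostName=….azure-devices.net;DeviceId=…;SharedAccessKey=… which is tied to a single registered device and would have to be supplied per device.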

After some trial and error I found the order of execution was important

  • Open MQTTnet connection to TTN host (but don’t configure any subscriptions)
  • Configure connection to TTN RESTful API
  • Retrieve list of V3EndDevices (paginated), then for each V3EndDevice
    • Open connection to Azure IoT Hub using command line connection string + TTN Device ID
    • Call DeviceClient.SetReceiveMessageHandlerAsync to specify ReceiveMessageCallback and additional context information for processing Azure IoT Hub downlink messages.
    • Store DeviceClient instance in ObjectCache using DeviceID as key
  • Configure the MQTTnet received message handler
  • Subscribe to uplink messages from all the V3EndDevices in the specified application.
private static async Task ApplicationCore(CommandLineOptions options)
{
	MqttFactory factory = new MqttFactory();
	mqttClient = factory.CreateMqttClient();

#if DIAGNOSTICS
	Console.WriteLine($"baseURL: {options.ApiBaseUrl}");
	Console.WriteLine($"APIKey: {options.ApiKey}");
	Console.WriteLine($"ApplicationID: {options.ApiApplicationID}");
	Console.WriteLine($"AazureIoTHubconnectionString: {options.AzureIoTHubconnectionString}");
	Console.WriteLine();
#endif

	try
	{
		// First configure MQTT, open connection and wire up disconnection handler. 
		// Can't wire up MQTT received handler as at this stage AzureIoTHub devices not connected.
		mqttOptions = new MqttClientOptionsBuilder()
			.WithTcpServer(options.MqttServerName)
			.WithCredentials(options.MqttApplicationID, options.MqttAccessKey)
			.WithClientId(options.MqttClientID)
			.WithTls()
			.Build();

		mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClientDisconnected(e)));

		await mqttClient.ConnectAsync(mqttOptions);

		// Prepare the HTTP client to be used in the TTN device enumeration
		using (HttpClient httpClient = new HttpClient())
		{
			EndDeviceRegistryClient endDeviceRegistryClient = new EndDeviceRegistryClient(options.ApiBaseUrl, httpClient)
			{
				ApiKey = options.ApiKey
			};

			// Retrieve list of devices page by page
			V3EndDevices endDevices = await endDeviceRegistryClient.ListAsync(
				options.ApiApplicationID, 
				field_mask_paths: DevicefieldMaskPaths, 
				limit: options.DevicePageSize);
			if ((endDevices != null) && (endDevices.End_devices != null)) // If no devices returns null rather than empty list
			{
				foreach (V3EndDevice endDevice in endDevices.End_devices)
				{
					// Display the device info+attributes then connect device to Azure IoT Hub
#if DEVICE_FIELDS_MINIMUM
					Console.WriteLine($"EndDevice ID: {endDevice.Ids.Device_id}");
#else
					Console.WriteLine($"Device ID: {endDevice.Ids.Device_id} Name: {endDevice.Name} Description: {endDevice.Description}");
					Console.WriteLine($"  CreatedAt: {endDevice.Created_at:dd-MM-yy HH:mm:ss} UpdatedAt: {endDevice.Updated_at:dd-MM-yy HH:mm:ss}");
#endif

#if DEVICE_ATTRIBUTES_DISPLAY
					if (endDevice.Attributes != null)
					{
						Console.WriteLine("  EndDevice attributes");

						foreach (KeyValuePair<string, string> attribute in endDevice.Attributes)
						{
							Console.WriteLine($"    Key: {attribute.Key} Value: {attribute.Value}");
						}
					}
#endif
					try
					{
						DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(
							options.AzureIoTHubconnectionString, 
							endDevice.Ids.Device_id, 
							TransportType.Amqp_Tcp_Only);

						await deviceClient.OpenAsync();

						await deviceClient.SetReceiveMessageHandlerAsync(
							AzureIoTHubClientReceiveMessageHandler,
							new AzureIoTHubReceiveMessageHandlerContext()
							{
								DeviceId = endDevice.Ids.Device_id,
								ApplicationId = endDevice.Ids.Application_ids.Application_id,
							});

						DeviceClients.Add(endDevice.Ids.Device_id, deviceClient, cacheItemPolicy);
					}
					catch( Exception ex)
					{
						Console.WriteLine($"Azure IoT Hub OpenAsync failed {ex.Message}");
					}
				}
			}
		}

		// At this point all the AzureIoT Hub deviceClients setup and ready to go so can enable MQTT receive
		mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClientApplicationMessageReceived(e)));

		// This may shift to individual device subscriptions
		string uplinktopic = $"v3/{options.MqttApplicationID}/devices/+/up";

		await mqttClient.SubscribeAsync(uplinktopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
	}
	catch(Exception ex)
	{
		Console.WriteLine($"Main {ex.Message}");
		Console.WriteLine("Press any key to exit");
		Console.ReadLine();
		return;
	}

	while (!Console.KeyAvailable)
	{
		Console.Write(".");
		await Task.Delay(1000);
	}

	// Consider ways to mop up connections

	Console.WriteLine("Press any key to exit");
	Console.ReadLine();
}

When I was initially looking at the Azure DeviceClient I would have had to create a thread (which would have been blocked most of the time) for each device. This implementation issue was removed by the introduction of the DeviceClient SetReceiveMessageHandlerAsync method in release 1.33.0.

Currently the application just displays the Cloud to Device(C2D) message payload plus diagnostic information, and the CompleteAsync method is called so the message is dequeued.

private async static Task AzureIoTHubClientReceiveMessageHandler(Message message, object userContext)
{
	AzureIoTHubReceiveMessageHandlerContext receiveMessageHandlerConext = (AzureIoTHubReceiveMessageHandlerContext)userContext;

	DeviceClient deviceClient = (DeviceClient)DeviceClients.Get(receiveMessageHandlerConext.DeviceId);

	using (message)
	{
		Console.WriteLine();
		Console.WriteLine();
		Console.WriteLine($"{DateTime.UtcNow:HH:mm:ss} Azure IoT Hub downlink message");
		Console.WriteLine($" ApplicationID: {receiveMessageHandlerConext.ApplicationId}");
		Console.WriteLine($" DeviceID: {receiveMessageHandlerConext.DeviceId}");
#if DIAGNOSTICS_AZURE_IOT_HUB
		Console.WriteLine($" Cached: {DeviceClients.Contains(receiveMessageHandlerConext.DeviceId)}");
		Console.WriteLine($" MessageID: {message.MessageId}");
		Console.WriteLine($" DeliveryCount: {message.DeliveryCount}");
		Console.WriteLine($" EnqueuedTimeUtc: {message.EnqueuedTimeUtc}");
		Console.WriteLine($" SequenceNumber: {message.SequenceNumber}");
		Console.WriteLine($" To: {message.To}");
#endif
		string messageBody = Encoding.UTF8.GetString(message.GetBytes());
		Console.WriteLine($" Body: {messageBody}");
#if DOWNLINK_MESSAGE_PROPERTIES_DISPLAY
		foreach (var property in message.Properties)
		{
			Console.WriteLine($"   Key:{property.Key} Value:{property.Value}");
		}
#endif

		await deviceClient.CompleteAsync(message);

		Console.WriteLine();
	}
}

Currently the application just displays the Device to Cloud(D2C) message payload plus diagnostic information, displaying the decoded payload fields if a decoder has been configured and has successfully processed the message.

private static void MqttClientApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
	if (e.ApplicationMessage.Topic.EndsWith("/up"))
	{
		PayloadUplinkV3 payload = JsonConvert.DeserializeObject<PayloadUplinkV3>(e.ApplicationMessage.ConvertPayloadToString());

		Console.WriteLine();
		Console.WriteLine();
		Console.WriteLine($"{DateTime.UtcNow:HH:mm:ss} TTN Uplink message");
#if DIAGNOSTICS_MQTT
		Console.WriteLine($" ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic}");
		Console.WriteLine($" Cached: {DeviceClients.Contains(payload.EndDeviceIds.DeviceId)}");
#endif
		Console.WriteLine($" ApplicationID: {payload.EndDeviceIds.ApplicationIds.ApplicationId}");
		Console.WriteLine($" DeviceID: {payload.EndDeviceIds.DeviceId}");
		Console.WriteLine($" Port: {payload.UplinkMessage.Port} ");
		Console.WriteLine($" Payload raw: {payload.UplinkMessage.PayloadRaw}");

		if (payload.UplinkMessage.PayloadDecoded != null)
		{
			Console.WriteLine($" Payload decoded: {payload.UplinkMessage.PayloadRaw}");
			EnumerateChildren(1, payload.UplinkMessage.PayloadDecoded);
		}

		Console.WriteLine();
	}
	else
	{
		Console.WriteLine($"{DateTime.UtcNow:HH:mm:ss} ClientId: {e.ClientId} Topic: {e.ApplicationMessage.Topic}");
	}
}
dotNet Core Console application displaying simulated uplink and downlink messages.
Simulating C2D messages with AzureIoTExplorer
Simulating D2C messages with TTN Device console

In the MQTT received message handler

Console.WriteLine($" Cached: {DeviceClients.Contains(payload.EndDeviceIds.DeviceId)}");

and the Azure DeviceClient received message handler

Console.WriteLine($" Cached: {DeviceClients.Contains(receiveMessageHandlerConext.DeviceId)}");

the "Cached" diagnostic lines check that the specified TTN device ID is in the DeviceClients ObjectCache.
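
The DeviceClients cache is just a System.Runtime.Caching ObjectCache keyed by TTN Device ID; a sketch of the declarations (the sliding expiration value here is illustrative, not necessarily what the PoC uses):

// DeviceClient instances keyed by TTN Device ID so both message handlers can look them up
private static readonly ObjectCache DeviceClients = MemoryCache.Default;

// Illustrative policy only - the real application may use a different expiration strategy
private static readonly CacheItemPolicy cacheItemPolicy = new CacheItemPolicy()
{
	SlidingExpiration = new TimeSpan(1, 0, 0, 0),
};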

Cayenne Low Power Payload (LPP) Encoder

I originally started building my own Cayenne Low Power Payload(LPP) encoder because I could only find one other GitHub repository with a C# implementation. There hadn't been any updates for a while and I wasn't confident that I could make the code work on my nanoFramework and TinyCLR devices.

I started with the sample Mbed C++ code and did a largely mechanical conversion to C#. I then revisited some of the mathematics where floating point values were converted to integers.

The original C++ code (understandably) had some language specific approaches which didn’t map well into C#

uint8_t CayenneLPP::addTemperature(uint8_t channel, float celsius) {
    if ((cursor + LPP_TEMPERATURE_SIZE) > maxsize) {
        return 0;
    }
    int16_t val = celsius * 10;
    buffer[cursor++] = channel; 
    buffer[cursor++] = LPP_TEMPERATURE; 
    buffer[cursor++] = val >> 8; 
    buffer[cursor++] = val; 

    return cursor;
}

I then translated this code to C#

public void TemperatureAdd(byte channel, float celsius)
{
   if ((index + TemperatureSize) > buffer.Length)
   {
      throw new ApplicationException("TemperatureAdd insufficent buffer capacity");
   }

   short val = (short)(celsius * 10);

   buffer[index++] = channel;
   buffer[index++] = (byte)DataType.Temperature;
   buffer[index++] = (byte)(val >> 8);
   buffer[index++] = (byte)val;
}

One of my sensors was sending values with more decimal places than LPP supports and I noticed the values were not being rounded, e.g. 2.99 -> 2.9 rather than 3.0. So I revised my implementation to use Math.Round (which is supported by the nanoFramework and TinyCLR).
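
A quick illustration of the difference on the desktop runtime:

float celsius = 2.99f;

short truncated = (short)(celsius * 10.0f);          // 29 which decodes as 2.9
short rounded = (short)Math.Round(celsius * 10.0f);  // 30 which decodes as 3.0

The DigitalInputAdd method below shows the guard conditions that ended up in front of the Add methods: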

public void DigitalInputAdd(byte channel, bool value)
{
   #region Guard conditions
   if ((channel < Constants.ChannelMinimum) || (channel > Constants.ChannelMaximum))
   {
      throw new ArgumentException($"channel must be between {Constants.ChannelMinimum} and {Constants.ChannelMaximum}", "channel");
   }

   if ((index + Constants.DigitalInputSize) > buffer.Length)
   {
      throw new ApplicationException($"Datatype DigitalInput insufficent buffer capacity, {buffer.Length - index} bytes available");
   }
   #endregion

   buffer[index++] = channel;
   buffer[index++] = (byte)Enumerations.DataType.DigitalInput;

   // I know this is fugly but it works on all platforms
   if (value)
   {
      buffer[index++] = 1;
   }
   else
   {
     buffer[index++] = 0;
   }
 }

I then extracted out the channel and buffer size validation but I'm not certain this makes the code any more readable/understandable

public void DigitalInputAdd(byte channel, bool value)
{
   IsChannelNumberValid(channel);
   IsBufferSizeSufficient(Enumerations.DataType.DigitalInput);

   buffer[index++] = channel;
   buffer[index++] = (byte)Enumerations.DataType.DigitalInput;

   // I know this is fugly but it works on all platforms
   if (value)
   {
      buffer[index++] = 1;
   }
   else
   {
      buffer[index++] = 0;
   }
}
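
The two validation helpers aren't shown; a sketch of what they could look like (the names come from the calls above, the bodies and the DataTypeSize lookup are my assumptions):

private void IsChannelNumberValid(byte channel)
{
	if ((channel < Constants.ChannelMinimum) || (channel > Constants.ChannelMaximum))
	{
		throw new ArgumentException($"channel must be between {Constants.ChannelMinimum} and {Constants.ChannelMaximum}", "channel");
	}
}

private void IsBufferSizeSufficient(Enumerations.DataType dataType)
{
	// DataTypeSize is assumed to return the number of payload bytes the datatype occupies
	// (e.g. Constants.DigitalInputSize for DigitalInput)
	int size = DataTypeSize(dataType);

	if ((index + size) > buffer.Length)
	{
		throw new ApplicationException($"Datatype {dataType} insufficient buffer capacity, {buffer.Length - index} bytes available");
	}
}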

The code runs on .Net Core, nanoFramework, and TinyCLR V2; it just needs a few more unit tests and it will be ready for production. I started with an LPP encoder which I needed for one of my applications. I'm also working on an approach for a decoder which will run on all my target platforms with minimal modification or compile time directives.

floor, ceil, trunc and casting

I left a Wisnode Track Lite RAK7200 outside on the deck for a day and the way the positions “snapped” to a grid caught my attention. Based on the size of my property the grid looked to be roughly 10 x 10 meters

The sample Cayenne Low Power Payload Mbed C++ code uses a cast, which I think is the same as a floor.

uint8_t CayenneLPP::addGPS(uint8_t channel, float latitude, float longitude, float meters) {
    if ((cursor + LPP_GPS_SIZE) > maxsize) {
        return 0;
    }
    int32_t lat = latitude * 10000;
    int32_t lon = longitude * 10000;
    int32_t alt = meters * 100;
    
    buffer[cursor++] = channel; 
    buffer[cursor++] = LPP_GPS; 
 
    buffer[cursor++] = lat >> 16; 
    buffer[cursor++] = lat >> 8; 
    buffer[cursor++] = lat; 
    buffer[cursor++] = lon >> 16; 
    buffer[cursor++] = lon >> 8; 
    buffer[cursor++] = lon; 
    buffer[cursor++] = alt >> 16; 
    buffer[cursor++] = alt >> 8;
    buffer[cursor++] = alt;
 
    return cursor;
}

“These functions round x downwards to the nearest integer, returning that value as a double. Thus, floor (1.5) is 1.0 and floor (-1.5) is -2.0.”
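
Strictly speaking a cast truncates towards zero rather than flooring, which only differs for negative values (southern latitudes and western longitudes):

Console.WriteLine((int)(-1.5));           // -1 truncated towards zero
Console.WriteLine(Math.Truncate(-1.5));   // -1
Console.WriteLine(Math.Floor(-1.5));      // -2
Console.WriteLine((int)(1.5));            // 1 - for positive values they all agree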

In the C++ code the latitude and longitude values are truncated to four decimal places and the altitude to two decimal places. In my C# code I used Math.Round and I wondered what impact that could have…

public void GpsLocationAdd(byte channel, float latitude, float longitude, float altitude)
{
   IsChannelNumberValid(channel);
   IsBufferSizeSufficient(Enumerations.DataType.Gps);

   if ((latitude < Constants.LatitudeMinimum ) || (latitude > Constants.LatitudeMaximum))
   {
      throw new ArgumentException($"Latitude must be between {Constants.LatitudeMinimum} and {Constants.LatitudeMaximum}", "latitude");
   }

   if ((longitude < Constants.LongitudeMinimum) || (longitude > Constants.LongitudeMaximum))
   {
      throw new ArgumentException($"Longitude must be between {Constants.LongitudeMinimum} and {Constants.LongitudeMaximum}", "longitude");
   }

   if ((altitude < Constants.AltitudeMinimum) || (altitude > Constants.AltitudeMaximum))
   {
      throw new ArgumentException($"Altitude must be between {Constants.AltitudeMinimum} and {Constants.AltitudeMaximum}", "altitude");
   }

   int lat = (int)Math.Round(latitude * 10000.0f);
   int lon = (int)Math.Round(longitude * 10000.0f);
   int alt = (int)Math.Round(altitude * 100.0f);

   buffer[index++] = channel;
   buffer[index++] = (byte)Enumerations.DataType.Gps;

   buffer[index++] = (byte)(lat >> 16);
   buffer[index++] = (byte)(lat >> 8);
   buffer[index++] = (byte)lat;
   buffer[index++] = (byte)(lon >> 16);
   buffer[index++] = (byte)(lon >> 8);
   buffer[index++] = (byte)lon;
   buffer[index++] = (byte)(alt >> 16);
   buffer[index++] = (byte)(alt >> 8);
   buffer[index++] = (byte)alt;
}

Using the WGS84 World Geodetic System Distance Calculator to calculate the distances at the point where the Greenwich Meridian and the Equator cross (off the coast of Ghana), the theoretical maximum error is 15.69m.

I live in Christchurch New Zealand and the theoretical maximum error there is 13.6m. So, in summary, the LPP latitude and longitude values are most probably fine for tracking applications.
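
As a rough back of the envelope cross-check: one degree of latitude is roughly 111.3km, so the 0.0001° LPP step is about 11.1m. At the equator the worst case offset in both latitude and longitude is √(11.1² + 11.1²) ≈ 15.7m, and at Christchurch's latitude (≈43.5°S) the longitude step shrinks to about 11.1 × cos(43.5°) ≈ 8.1m, giving √(11.1² + 8.1²) ≈ 13.7m, which is close to the calculator's figures. Using Math.Round rather than truncation roughly halves the worst case.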

MATH131 Numerical methods was useful

Back in 1986 in my second first year at the University of Canterbury I did “MATH131 Numerical Methods” which was a year of looking at why mathematics in FORTRAN, C, and Pascal sometimes didn’t return the result you were expecting…

While testing my GHI Electronics TinyCLR2 RAK Wireless RAK811 LoRaWAN client I noticed the temperature numbers didn’t quite match…

Visual Studio 2019 Debug output window
The Things Network Device Application Data tab

I have implemented my own Cayenne Low Power Payload encoder in C# based on the sample Mbed C++ code

uint8_t CayenneLPP::addTemperature(uint8_t channel, float celsius) {
    if ((cursor + LPP_TEMPERATURE_SIZE) > maxsize) {
        return 0;
    }
    int16_t val = celsius * 10;
    buffer[cursor++] = channel; 
    buffer[cursor++] = LPP_TEMPERATURE; 
    buffer[cursor++] = val >> 8; 
    buffer[cursor++] = val; 

    return cursor;
}

My translation of that code to C#

public void TemperatureAdd(byte channel, float celsius)
{
   if ((index + TemperatureSize) > buffer.Length)
   {
      throw new ApplicationException("TemperatureAdd insufficent buffer capacity");
   }

   short val = (short)(celsius * 10);

   buffer[index++] = channel;
   buffer[index++] = (byte)DataType.Temperature;
   buffer[index++] = (byte)(val >> 8);
   buffer[index++] = (byte)val;
}

After looking at the code I think the issue was most probably due to the representation of the constant 10 (int32), 10.0 (double), and 10.0f (single). To confirm my theory I modified the client to send the temperature with the calculation done with three different constants.
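
A sketch of the three calculations (a hypothetical test method, not the exact code that ran on the device):

// Hypothetical test method - the same reading scaled with an int, a double and a float constant
public void TemperatureConstantTest(float celsius)
{
   short scaledInt = (short)(celsius * 10);      // int constant: 10 converted to float, single precision multiply
   short scaledDouble = (short)(celsius * 10.0); // double constant: celsius promoted to double precision
   short scaledFloat = (short)(celsius * 10.0f); // float constant: single precision throughout

   Debug.WriteLine($"int:{scaledInt} double:{scaledDouble} float:{scaledFloat}");
}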

Visual Studio 2019 Debug output window
The Things Network(TTN) Message Queue Telemetry Transport(MQTT) client

After some trial and error I settled on this C# code for my encoder

public void TemperatureAdd(byte channel, float celsius)
{
   if ((index + TemperatureSize) > buffer.Length)
   {
      throw new ApplicationException("TemperatureAdd insufficent buffer capacity");
   }

   short val = (short)(celsius * 10.0f);

   buffer[index++] = channel;
   buffer[index++] = (byte)DataType.Temperature;
   buffer[index++] = (byte)(val >> 8);
   buffer[index++] = (byte)val;
}

I don't think this is specifically an issue with TinyCLR V2, just with the number type used for the constant.

The Things Network V2 MQTT SQL Connector

This code was written to solve a problem I had debugging and testing an application which processed data from sensors attached to The Things Network(TTN) and I figured others might find it useful.

As part of my series of TTN projects I wanted to verify that the data from a number of LoRaWAN sensors connected to TTN was reasonable and complete. I'm familiar with Microsoft SQL Server so I built a .Net Core console application which uses the TTN Message Queue Telemetry Transport(MQTT) Data API (so it can run alongside my existing TTN integration) to receive messages from all the devices in a TTN application and store them in a database for post processing.

The console application uses MQTTnet to connect to the TTN MQTT Data API. It subscribes to the application's device uplink topic, then uses a combination of Stack Overflow's Dapper with Microsoft SQL Server tables and stored procedures to store the device data points. I re-generated the classes I had used in my other projects, added any obvious missing fields and fine-tuned the data types by delving into the TTN V2 Go code.
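
The MQTT connection setup isn't shown below; it follows the same MQTTnet pattern as my other projects, roughly like this (the TTN V2 uplink topic is {ApplicationId}/devices/+/up; TLS/port selection is left out):

MqttFactory factory = new MqttFactory();
IMqttClient mqttClient = factory.CreateMqttClient();

// Values come from the application settings file shown further down
var mqttOptions = new MqttClientOptionsBuilder()
   .WithTcpServer(configuration.GetSection("MqttServer").Value)
   .WithCredentials(configuration.GetSection("ApplicationId").Value, configuration.GetSection("MqttPassword").Value)
   .WithClientId(configuration.GetSection("MqttClientId").Value)
   .Build();

mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));

await mqttClient.ConnectAsync(mqttOptions);

// Uplink messages from all the devices in the application
string uplinkTopic = $"{configuration.GetSection("ApplicationId").Value}/devices/+/up";
await mqttClient.SubscribeAsync(uplinkTopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);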

The core of the application is in the MQTTnet application message received handler.

private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
   PayloadUplinkV2 payload;

   log.InfoFormat($"Receive Start Topic:{e.ApplicationMessage.Topic}");

   string connectionString = configuration.GetSection("TTNDatabase").Value;

   try
   {
      payload = JsonConvert.DeserializeObject<PayloadUplinkV2>(e.ApplicationMessage.ConvertPayloadToString());
   }
   catch (Exception ex)
   {
      log.Error("DeserializeObject failed", ex);
      return;
   }

   try
   {
      if (payload.PayloadFields != null)
      {
         var parameters = new DynamicParameters();

         EnumerateChildren(parameters, payload.PayloadFields);

         log.Debug($"Parameters:{parameters.ParameterNames.Aggregate((i, j) => i + ',' + j)}");

         foreach (string storedProcedure in storedProcedureMappings.Keys)
         {
            if (Enumerable.SequenceEqual(parameters.ParameterNames, storedProcedureMappings[storedProcedure].Split(',', StringSplitOptions.RemoveEmptyEntries), StringComparer.InvariantCultureIgnoreCase))
            {
               log.Info($"Payload fields processing with:{storedProcedure}");

               using (SqlConnection db = new SqlConnection(connectionString))
               {
                  parameters.Add("@ReceivedAtUtc", payload.Metadata.ReceivedAtUtc);
                  parameters.Add("@DeviceID", payload.DeviceId);
                  parameters.Add("@DeviceEui", payload.DeviceEui);
                  parameters.Add("@ApplicationID", payload.ApplicationId);
                  parameters.Add("@IsConfirmed", payload.IsConfirmed);
                  parameters.Add("@IsRetry", payload.IsRetry);
                  parameters.Add("@Port", payload.Port);

                  db.Execute(sql: storedProcedure, param: parameters, commandType: CommandType.StoredProcedure);
               }
            }
         }
      }
      else
      {
         foreach (string storedProcedure in storedProcedureMappings.Keys)
         {
            if (string.Compare(storedProcedureMappings[storedProcedure], "payload_raw", true) == 0)
            {
               log.Info($"Payload raw processing with:{storedProcedure}");

               using (SqlConnection db = new SqlConnection(connectionString))
               {
                  var parameters = new DynamicParameters();

                  parameters.Add("@ReceivedAtUtc", payload.Metadata.ReceivedAtUtc);
                  parameters.Add("@DeviceID", payload.DeviceId);
                  parameters.Add("@DeviceEui", payload.DeviceEui);
                  parameters.Add("@ApplicationID", payload.ApplicationId);
                  parameters.Add("@IsConfirmed", payload.IsConfirmed);
                  parameters.Add("@IsRetry", payload.IsRetry);
                  parameters.Add("@Port", payload.Port);
                  parameters.Add("@Payload", payload.PayloadRaw);

                  db.Execute(sql: storedProcedure, param: parameters, commandType: CommandType.StoredProcedure);
               }
            }
         }
      }
   }
   catch (Exception ex)
   {
      log.Error("Message processing failed", ex);
   }
}

For messages with payload fields the code attempts to match the list of field names (there may be more than one match) with the parameter list for stored procedures in the AppSettings.json file. The Enumerable.SequenceEqual uses a case insensitive comparison but order is important. I did consider sorting the two lists of parameters but wasn't certain the added complexity was worth it.

{
   "TTNDatabase": "Server=DESKTOP-1234567;Initial Catalog=Rak7200TrackerTest;Persist Security Info=False;User ID=TopSecret;Password=TopSecret;Connection Timeout=30",
   "MqttServer": "eu.thethings.network",
   "MqttPassword": "ttn-account-TopSecret",
   "ApplicationId": "rak811wisnodetest",
   "MqttClientId": "TTNSQLClient",
   "StoredProcedureMappings": {
      "EnvironmentalSensorProcess": "relative_humidity_0,temperature_0",
      "PayloadRawProcess": "payload_raw",
      "WeatherSensorProcess": "barometric_pressure_0,temperature_0",
      "PositionReportProcess": "accelerometer_3x,accelerometer_3y,accelerometer_3z,analog_in_10,analog_in_11,analog_in_8,analog_in_9,gps_1altitude,gps_1latitude,gps_1longitude,gyrometer_5x,gyrometer_5y,gyrometer_5z"
   }
}

To reduce the scope for mistakes (especially with longer parameter lists) I usually copy them from the Log4Net RollingFileAppender file or ManagedColoredConsoleAppender console output.

Environmental sensor output with flat data format

I created a database table to store the temperature and humidity values.

CREATE TABLE [dbo].[EnvironmentalSensorReport](
	[WeatherSensorReportUID] [UNIQUEIDENTIFIER] NOT NULL,
	[ReceivedAtUtC] [DATETIME] NOT NULL,
	[DeviceID] [NVARCHAR](32) NOT NULL,
	[DeviceEui] [NVARCHAR](32) NOT NULL,
	[ApplicationID] [NVARCHAR](32) NOT NULL,
	[IsConfirmed] [BIT] NOT NULL,
	[IsRetry] [BIT] NOT NULL,
	[Port] [SMALLINT] NOT NULL,
	[Temperature] [FLOAT] NOT NULL,
	[Humidity] [FLOAT] NOT NULL,
CONSTRAINT [PK_EnvironmentalSensorReport] PRIMARY KEY CLUSTERED 
(
	[WeatherSensorReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

ALTER TABLE [dbo].[EnvironmentalSensorReport] ADD  CONSTRAINT [DF_EnvironmentalSensorReport_EnvironmentalSensorReporttUID]  DEFAULT (NEWID()) FOR [WeatherSensorReportUID]
GO

The stored procedure must have the parameters @ReceivedAtUtc, @DeviceID, @DeviceEui, @ApplicationID, @IsRetry, @IsConfirmed and @Port. In this example the payload specific fields generated by the Cayenne Low Power Payload(LPP) decoder are @Temperature_0 and @relative_humidity_0.

CREATE PROCEDURE [dbo].[EnvironmentalSensorProcess]
   @ReceivedAtUtc AS DATETIME,
   @DeviceID AS NVARCHAR(32),
   @DeviceEui AS NVARCHAR(32),
   @ApplicationID AS NVARCHAR(32),
   @IsRetry AS BIT,
   @IsConfirmed AS BIT,
   @Port AS SMALLINT,
   @Temperature_0 AS FLOAT,
   @relative_humidity_0 AS FLOAT
AS
BEGIN
   SET NOCOUNT ON;
 
   INSERT INTO [dbo].[EnvironmentalSensorReport]
           ([ReceivedAtUtc]
           ,[DeviceID]
           ,[DeviceEui]
           ,[ApplicationID]
           ,[IsConfirmed]
           ,[IsRetry]
           ,[Port]
	   ,Temperature
	   ,Humidity)
   VALUES
   (
      @ReceivedAtUtc,
      @DeviceID,
      @DeviceEui,
      @ApplicationID,
      @IsConfirmed,
      @IsRetry,
      @Port,
      @Temperature_0,
      @relative_humidity_0)
END
Environmental sensor data displayed in SQL Server Management Studio(SSMS)

To store more complex nested payload fields (e.g. latitude, longitude and altitude values), I flattened the hierarchy.

private static void EnumerateChildren(DynamicParameters parameters, JToken token, string prefix ="")
{
   if (token is JProperty)
      if (token.First is JValue)
      {
         JProperty property = (JProperty)token;
         parameters.Add($"@{prefix}{property.Name}", property.Value.ToString());
      }
      else
      {
         JProperty property = (JProperty)token;
         prefix += property.Name;
      }

   foreach (JToken token2 in token.Children())
   {
      EnumerateChildren(parameters,token2, prefix);
   }
}
Unpacked LPP payload from GPS tracker displayed in TTN application data view
Flattened location, acceleration and rotation information
CREATE TABLE [dbo].[PositionReport](
      [PositionReportUID] [UNIQUEIDENTIFIER] NOT NULL,
      [ReceivedAtUtC] [DATETIME] NOT NULL,
      [DeviceID] [NVARCHAR](32) NOT NULL,
      [DeviceEui] [NVARCHAR](32) NOT NULL,
      [ApplicationID] [NVARCHAR](32) NOT NULL,
      [IsConfirmed] [BIT] NOT NULL,
      [IsRetry] [BIT] NOT NULL,
      [Port] [SMALLINT] NOT NULL,
      [Latitude] [FLOAT] NOT NULL,
      [Longitude] [FLOAT] NOT NULL,
      [Altitude] [FLOAT] NOT NULL,
 CONSTRAINT [PK_PositionReport] PRIMARY KEY CLUSTERED 
(
	[PositionReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

The database table above only stores the values of the fields I cared about.

CREATE PROCEDURE [dbo].[PositionReportProcess]
      @ReceivedAtUtc AS DATETIME,
      @DeviceID AS NVARCHAR(32),
      @DeviceEui AS NVARCHAR(32),
      @ApplicationID AS NVARCHAR(32),
      @IsRetry AS Bit,
      @IsConfirmed AS BIT,
      @Port AS SMALLINT,
      @accelerometer_3x AS FLOAT,
      @accelerometer_3y AS FLOAT,
      @accelerometer_3z AS FLOAT,
      @analog_in_8 AS FLOAT,
      @analog_in_9 AS FLOAT,
      @analog_in_10 AS FLOAT,
      @analog_in_11 AS FLOAT,
      @gps_1Latitude AS FLOAT,
      @gps_1Longitude AS FLOAT,
      @gps_1Altitude AS FLOAT,
      @gyrometer_5x  AS FLOAT, 
      @gyrometer_5y  AS FLOAT, 
      @gyrometer_5z  AS FLOAT 
AS
BEGIN
   SET NOCOUNT ON;

   INSERT INTO [dbo].[PositionReport]
      ([ReceivedAtUtc]
      ,[DeviceID]
      ,[DeviceEui]
      ,[ApplicationID]
      ,[IsConfirmed]
      ,[IsRetry]
      ,[Port]
      ,Latitude
      ,Longitude
      ,Altitude)
   VALUES
   (
      @ReceivedAtUtc,
      @DeviceID,
      @DeviceEui,
      @ApplicationID,
      @IsConfirmed,
      @IsRetry,
      @Port,
      @gps_1Latitude,
      @gps_1Longitude,
      @gps_1Altitude)
END

The stored procedure for storing the GPS tracker payload has to have parameters matching each payload field but some of the fields are not used.

Location data displayed in SQL Server Management Studio(SSMS)

For uplink messages with no payload fields the message processor looks for a stored procedure with a single parameter called "payload_raw" (there may be more than one match).

CREATE TABLE [dbo].[PayloadReport](
      [PayloadReportUID] [UNIQUEIDENTIFIER] NOT NULL,
      [ReceivedAtUtC] [DATETIME] NOT NULL,
      [DeviceID] [NVARCHAR](32) NOT NULL,
      [DeviceEui] [NVARCHAR](32) NOT NULL,
      [ApplicationID] [NVARCHAR](32) NOT NULL,
      [IsConfirmed] [BIT] NOT NULL,
      [IsRetry] [BIT] NOT NULL,
      [Port] [SMALLINT] NOT NULL,
      [Payload] [NVARCHAR](128) NOT NULL,
CONSTRAINT [PK_PayloadReport] PRIMARY KEY CLUSTERED 
(
      [PayloadReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

ALTER TABLE [dbo].[PayloadReport] ADD  CONSTRAINT [DF_PayloadReport_PositionReportUID]  DEFAULT (NEWID()) FOR [PayloadReportUID]
GO
ALTER PROCEDURE [dbo].[PayloadRawProcess]
      @ReceivedAtUtc AS DATETIME,
      @DeviceID AS NVARCHAR(32),
      @DeviceEui AS NVARCHAR(32),
      @ApplicationID AS NVARCHAR(32),
      @IsRetry AS Bit,
      @IsConfirmed AS BIT,
      @Port AS SMALLINT,
      @Payload AS NVARCHAR(128)
AS
BEGIN
      SET NOCOUNT ON;

      INSERT INTO [dbo].[PayloadReport]
         ([ReceivedAtUtc]
         ,[DeviceID]
         ,[DeviceEui]
         ,[ApplicationID]
         ,[IsConfirmed]
         ,[IsRetry]
         ,[Port]
         ,[Payload])
     VALUES(@ReceivedAtUtc,
         @DeviceID,
         @DeviceEui,
         @ApplicationID,
         @IsConfirmed,
         @IsRetry,
         @Port,
         @Payload)
END
Raw payload data displayed in SQL Server Management Studio(SSMS)

Initially the application just used Console.WriteLine for logging, then I added Log4Net because it would be useful to persist information about failures and so I could copy and paste parameter lists into the appSettings.json file.

To make the application more robust, adding retries with the Enterprise Library Transient Fault Handling and Configuration blocks, or Polly, on the Dapper Execute would be a good idea. It also shouldn't take much work to get the application to run in Microsoft Azure as a "headless" webapp.
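
For illustration (this isn't in the current code), a Polly retry wrapped around the Dapper Execute might look something like this:

// Retry the stored procedure call a few times with increasing delays for transient SQL failures
Policy
   .Handle<SqlException>()
   .WaitAndRetry(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)))
   .Execute(() => db.Execute(sql: storedProcedure, param: parameters, commandType: CommandType.StoredProcedure));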

Dapper supports a number of database platforms so in theory this application (with a little bit of effort) should be platform portable.