The Things Network MQTT & Azure IoT Part1

Side by Side

In my last few posts I have built Proof of Concept(PoC) The Things Network(TTN) V3 Hypertext Transfer Protocol(HTTP) API Integration and Message Queue Telemetry Transport (MQTT) Data API Integrations.

While building these PoCs I have learnt a lot about the way that the TTN V3 RESTful and MQTT APIs work and this is the first in a series of posts about linking them together. My plan is to start with yet another .NetCore Console application which hosts both the MQTT and Azure IoT Hub DeviceClient (using the Advanced Message Queueing Protocol(AMQP)) client implementations. I’m using MQTTnet to build my data API client and used NSwag by Richo Suter to generate my RESTful client from the TTN provided swagger file.

In this PoC I’m using the commandlineParser NuGet package to the reduce the amount of code required to process command line parameters and make it more robust. This PoC has a lot of command line parameters which would have been painful to manually parse and validate.

public class CommandLineOptions
{
	[Option('u', "APIbaseURL", Required = false, HelpText = "TTN Restful API URL.")]
	public string ApiBaseUrl { get; set; }

	[Option('K', "APIKey", Required = true, HelpText = "TTN Restful API APIkey")]
	public string ApiKey { get; set; }

	[Option('P', "APIApplicationID", Required = true, HelpText = "TTN Restful API ApplicationID")]
	public string ApiApplicationID { get; set; }

	[Option('D', "DeviceListPageSize", Required = true, HelpText = "The size of the pages used to retrieve EndDevice configuration")]
	public int DevicePageSize { get; set; }

	[Option('S', "MQTTServerName", Required = true, HelpText = "TTN MQTT API server name")]
	public string MqttServerName { get; set; }

	[Option('A', "MQTTAccessKey", Required = true, HelpText = "TTN MQTT API access key")]
	public string MqttAccessKey { get; set; }

	[Option('Q', "MQTTApplicationID", Required = true, HelpText = "TTN MQTT API ApplicationID")]
	public string MqttApplicationID { get; set; }

	[Option('C', "MQTTClientName", Required = true, HelpText = "TTN MQTT API Client ID")]
	public string MqttClientID { get; set; }

	[Option('Z', "AzureIoTHubConnectionString", Required = true, HelpText = "Azure IoT Hub Connection string")]
	public string AzureIoTHubconnectionString { get; set; }
}

To keep things simple in this PoC I’m using an Azure IoT Hub specific (rather than a device specific connection string)

Azure IoT Hub Device shared access policy selection

After some trial and error I found the order of execution was important

  • Open MQTTnet connection to TTN host (but don’t configure any subscriptions)
  • Configure connection to TTN RESTful API
  • Retrieve list of V3EndDevices (paginated), then for each V3EndDevice
    • Open connection to Azure IoT Hub using command line connection string + TTN Device ID
    • Call DeviceClient.SetReceiveMessageHandlerAsync to specify ReceiveMessageCallback and additional context information for processing Azure IoT Hub downlink messages.
    • Store DeviceClient instance in ObjectCache using DeviceID as key
  • Configure the MQTTnet recived message handler
  • Subscribe to uplink messages from all the V3EndDevices in the specified application.
private static async Task ApplicationCore(CommandLineOptions options)
{
	MqttFactory factory = new MqttFactory();
	mqttClient = factory.CreateMqttClient();

#if DIAGNOSTICS
	Console.WriteLine($"baseURL: {options.ApiBaseUrl}");
	Console.WriteLine($"APIKey: {options.ApiKey}");
	Console.WriteLine($"ApplicationID: {options.ApiApplicationID}");
	Console.WriteLine($"AazureIoTHubconnectionString: {options.AzureIoTHubconnectionString}");
	Console.WriteLine();
#endif

	try
	{
		// First configure MQTT, open connection and wire up disconnection handler. 
		// Can't wire up MQTT received handler as at this stage AzureIoTHub devices not connected.
		mqttOptions = new MqttClientOptionsBuilder()
			.WithTcpServer(options.MqttServerName)
			.WithCredentials(options.MqttApplicationID, options.MqttAccessKey)
			.WithClientId(options.MqttClientID)
			.WithTls()
			.Build();

		mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClientDisconnected(e)));

		await mqttClient.ConnectAsync(mqttOptions);

		// Prepare the HTTP client to be used in the TTN device enumeration
		using (HttpClient httpClient = new HttpClient())
		{
			EndDeviceRegistryClient endDeviceRegistryClient = new EndDeviceRegistryClient(options.ApiBaseUrl, httpClient)
			{
				ApiKey = options.ApiKey
			};

			// Retrieve list of devices page by page
			V3EndDevices endDevices = await endDeviceRegistryClient.ListAsync(
				options.ApiApplicationID, 
				field_mask_paths: DevicefieldMaskPaths, 
				limit: options.DevicePageSize);
			if ((endDevices != null) && (endDevices.End_devices != null)) // If no devices returns null rather than empty list
			{
				foreach (V3EndDevice endDevice in endDevices.End_devices)
				{
					// Display the device info+attributes then connect device to Azure IoT Hub
#if DEVICE_FIELDS_MINIMUM
					Console.WriteLine($"EndDevice ID: {endDevice.Ids.Device_id}");
#else
					Console.WriteLine($"Device ID: {endDevice.Ids.Device_id} Name: {endDevice.Name} Description: {endDevice.Description}");
					Console.WriteLine($"  CreatedAt: {endDevice.Created_at:dd-MM-yy HH:mm:ss} UpdatedAt: {endDevice.Updated_at:dd-MM-yy HH:mm:ss}");
#endif

#if DEVICE_ATTRIBUTES_DISPLAY
					if (endDevice.Attributes != null)
					{
						Console.WriteLine("  EndDevice attributes");

						foreach (KeyValuePair<string, string> attribute in endDevice.Attributes)
						{
							Console.WriteLine($"    Key: {attribute.Key} Value: {attribute.Value}");
						}
					}
#endif
					try
					{
						DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(
							options.AzureIoTHubconnectionString, 
							endDevice.Ids.Device_id, 
							TransportType.Amqp_Tcp_Only);

						await deviceClient.OpenAsync();

						await deviceClient.SetReceiveMessageHandlerAsync(
							AzureIoTHubClientReceiveMessageHandler,
							new AzureIoTHubReceiveMessageHandlerContext()
							{
								DeviceId = endDevice.Ids.Device_id,
								ApplicationId = endDevice.Ids.Application_ids.Application_id,
							});

						DeviceClients.Add(endDevice.Ids.Device_id, deviceClient, cacheItemPolicy);
					}
					catch( Exception ex)
					{
						Console.WriteLine($"Azure IoT Hub OpenAsync failed {ex.Message}");
					}
				}
			}
		}

		// At this point all the AzureIoT Hub deviceClients setup and ready to go so can enable MQTT receive
		mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClientApplicationMessageReceived(e)));

		// This may shift to individual device subscriptions
		string uplinktopic = $"v3/{options.MqttApplicationID}/devices/+/up";

		await mqttClient.SubscribeAsync(uplinktopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
	}
	catch(Exception ex)
	{
		Console.WriteLine($"Main {ex.Message}");
		Console.WriteLine("Press any key to exit");
		Console.ReadLine();
		return;
	}

	while (!Console.KeyAvailable)
	{
		Console.Write(".");
		await Task.Delay(1000);
	}

	// Consider ways to mop up connections

	Console.WriteLine("Press any key to exit");
	Console.ReadLine();
}

When I was initially looking at Azure Deviceclient I would of had to have created a thread (which would have been blocked most of the time) for each device. This implementation issued was removed by the introduction of the DeviceClient SetReceiveMessageHandlerAsync method in release 1.33.0.

Currently the application just displays the Cloud to Device(C2D) message payload plus diagnostic information, and the CompleteAsync method is called so the message is dequeued.

private async static Task AzureIoTHubClientReceiveMessageHandler(Message message, object userContext)
{
	AzureIoTHubReceiveMessageHandlerContext receiveMessageHandlerConext = (AzureIoTHubReceiveMessageHandlerContext)userContext;

	DeviceClient deviceClient = (DeviceClient)DeviceClients.Get(receiveMessageHandlerConext.DeviceId);

	using (message)
	{
		Console.WriteLine();
		Console.WriteLine();
		Console.WriteLine($"{DateTime.UtcNow:HH:mm:ss} Azure IoT Hub downlink message");
		Console.WriteLine($" ApplicationID: {receiveMessageHandlerConext.ApplicationId}");
		Console.WriteLine($" DeviceID: {receiveMessageHandlerConext.DeviceId}");
#if DIAGNOSTICS_AZURE_IOT_HUB
		Console.WriteLine($" Cached: {DeviceClients.Contains(receiveMessageHandlerConext.DeviceId)}");
		Console.WriteLine($" MessageID: {message.MessageId}");
		Console.WriteLine($" DeliveryCount: {message.DeliveryCount}");
		Console.WriteLine($" EnqueuedTimeUtc: {message.EnqueuedTimeUtc}");
		Console.WriteLine($" SequenceNumber: {message.SequenceNumber}");
		Console.WriteLine($" To: {message.To}");
#endif
		string messageBody = Encoding.UTF8.GetString(message.GetBytes());
		Console.WriteLine($" Body: {messageBody}");
#if DOWNLINK_MESSAGE_PROPERTIES_DISPLAY
		foreach (var property in message.Properties)
		{
			Console.WriteLine($"   Key:{property.Key} Value:{property.Value}");
		}
#endif

		await deviceClient.CompleteAsync(message);

		Console.WriteLine();
	}
}

Currently the application just displays the Cloud to Device(D2C) message payload plus diagnostic information, displaying the payload fields if the message format has been configured and successfully processed.

private static void MqttClientApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
	if (e.ApplicationMessage.Topic.EndsWith("/up"))
	{
		PayloadUplinkV3 payload = JsonConvert.DeserializeObject<PayloadUplinkV3>(e.ApplicationMessage.ConvertPayloadToString());

		Console.WriteLine();
		Console.WriteLine();
		Console.WriteLine($"{DateTime.UtcNow:HH:mm:ss} TTN Uplink message");
#if DIAGNOSTICS_MQTT
		Console.WriteLine($" ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic}");
		Console.WriteLine($" Cached: {DeviceClients.Contains(payload.EndDeviceIds.DeviceId)}");
#endif
		Console.WriteLine($" ApplicationID: {payload.EndDeviceIds.ApplicationIds.ApplicationId}");
		Console.WriteLine($" DeviceID: {payload.EndDeviceIds.DeviceId}");
		Console.WriteLine($" Port: {payload.UplinkMessage.Port} ");
		Console.WriteLine($" Payload raw: {payload.UplinkMessage.PayloadRaw}");

		if (payload.UplinkMessage.PayloadDecoded != null)
		{
			Console.WriteLine($" Payload decoded: {payload.UplinkMessage.PayloadRaw}");
			EnumerateChildren(1, payload.UplinkMessage.PayloadDecoded);
		}

		Console.WriteLine();
	}
	else
	{
		Console.WriteLine($"{DateTime.UtcNow:HH:mm:ss} ClientId: {e.ClientId} Topic: {e.ApplicationMessage.Topic}");
	}
}
dotNet Core Console application displaying simulated uplink and downlink messages.
Simulating C2D messages with AzureIoTExplorer
Simulating D2C messages with TTN Device console

In the MQTT received message handler.

Console.WriteLine($" Cached: {DeviceClients.Contains(receiveMessageHandlerConext.DeviceId)}");

and Azure DeviceClient received message handler.

Console.WriteLine($" Cached: {DeviceClients.Contains(receiveMessageHandlerConext.DeviceId)}");

check that the specified TTN device ID is in the DeviceClients ObjectCache

The Things Network V2 MQTT SQL Connector

This code was written to solve a problem I had debugging and testing an application which processed data from sensors attached to The Things Network(TTN) and I figured others might find it useful.

As part of my series of TTN projects I wanted to verify that the data from a number of LoRaWAN sensors connected to TTN was reasonable and complete. I’m familiar with Microsoft SQL Server so I built a .Net Core console application which uses the TTN Message Queue Telemetry Transport(MQTT) Data API (so it can run alongside my existing TTN integration) to receive messages from the all devices in a TTN application and store them in a database for post processing.

The console application uses MQTTNet to connect to TTN MQTT Data API. It subscribes to an application device uplink topic, then uses a combination of Stackoverflow Dapper with Microsoft SQL Server tables and stored procedures to store the device data points. I re-generated the classes I had used in my other projects, added any obvious missing fields and fine tuned the data types by delving into the TTN V2 GO code.

The core of the application is in the MQTTNet application message received handler.

private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
   PayloadUplinkV2 payload;

   log.InfoFormat($"Receive Start Topic:{e.ApplicationMessage.Topic}");

   string connectionString = configuration.GetSection("TTNDatabase").Value;

   try
   {
      payload = JsonConvert.DeserializeObject<PayloadUplinkV2>(e.ApplicationMessage.ConvertPayloadToString());
   }
   catch (Exception ex)
   {
      log.Error("DeserializeObject failed", ex);
      return;
   }

   try
   {
      if (payload.PayloadFields != null)
      {
         var parameters = new DynamicParameters();

         EnumerateChildren(parameters, payload.PayloadFields);

         log.Debug($"Parameters:{parameters.ParameterNames.Aggregate((i, j) => i + ',' + j)}");

         foreach (string storedProcedure in storedProcedureMappings.Keys)
         {
            if (Enumerable.SequenceEqual(parameters.ParameterNames, storedProcedureMappings[storedProcedure].Split(',', StringSplitOptions.RemoveEmptyEntries), StringComparer.InvariantCultureIgnoreCase))
            {
               log.Info($"Payload fields processing with:{storedProcedure}");

               using (SqlConnection db = new SqlConnection(connectionString))
               {
                  parameters.Add("@ReceivedAtUtc", payload.Metadata.ReceivedAtUtc);
                  parameters.Add("@DeviceID", payload.DeviceId);
                  parameters.Add("@DeviceEui", payload.DeviceEui);
                  parameters.Add("@ApplicationID", payload.ApplicationId);
                  parameters.Add("@IsConfirmed", payload.IsConfirmed);
                  parameters.Add("@IsRetry", payload.IsRetry);
                  parameters.Add("@Port", payload.Port);

                  db.Execute(sql: storedProcedure, param: parameters, commandType: CommandType.StoredProcedure);
               }
            }
         }
      }
      else
      {
         foreach (string storedProcedure in storedProcedureMappings.Keys)
         {
            if (string.Compare(storedProcedureMappings[storedProcedure], "payload_raw", true) == 0)
            {
               log.Info($"Payload raw processing with:{storedProcedure}");

               using (SqlConnection db = new SqlConnection(connectionString))
               {
                  var parameters = new DynamicParameters();

                  parameters.Add("@ReceivedAtUtc", payload.Metadata.ReceivedAtUtc);
                  parameters.Add("@DeviceID", payload.DeviceId);
                  parameters.Add("@DeviceEui", payload.DeviceEui);
                  parameters.Add("@ApplicationID", payload.ApplicationId);
                  parameters.Add("@IsConfirmed", payload.IsConfirmed);
                  parameters.Add("@IsRetry", payload.IsRetry);
                  parameters.Add("@Port", payload.Port);
                  parameters.Add("@Payload", payload.PayloadRaw);

                  db.Execute(sql: storedProcedure, param: parameters, commandType: CommandType.StoredProcedure);
               }
            }
         }
      }
   }
   catch (Exception ex)
   {
      log.Error("Message processing failed", ex);
   }
}

For messages with payload fields the code attempts to match the list of field names (there maybe more than one match) with the parameter list for stored procedures in the AppSettings.json file. The Enumerable.SequenceEqual uses a case insensitive comparison but order is important. I did consider sorting the two lists of parameters but wasn’t certain the added complexity was worth it.

{
   "TTNDatabase": "Server=DESKTOP-1234567;Initial Catalog=Rak7200TrackerTest;Persist Security Info=False;User ID=TopSecret;Password=TopSecret;Connection Timeout=30",
   "MqttServer": "eu.thethings.network",
   "MqttPassword": "ttn-account-TopSecret",
   "ApplicationId": "rak811wisnodetest",
   "MqttClientId": "TTNSQLClient",
   "StoredProcedureMappings": {
      "EnvironmentalSensorProcess": "relative_humidity_0,temperature_0",
      "PayloadRawProcess": "payload_raw",
      "WeatherSensorProcess": "barometric_pressure_0,temperature_0",
      "PositionReportProcess": "accelerometer_3x,accelerometer_3y,accelerometer_3z,analog_in_10,analog_in_11,analog_in_8,analog_in_9,gps_1altitude,gps_1latitude,gps_1longitude,gyrometer_5x,gyrometer_5y,gyrometer_5z"
   }
}

To reduce the scope for mistakes (especially with longer parameter lists) I usually copy them from the Log4Net RollingFileAppender file or ManagedColoredConsoleAppender console output.

Environmental sensor output with flat data format

I created a database table to store the temperature and humidity values.

CREATE TABLE [dbo].[EnvironmentalSensorReport](
	[WeatherSensorReportUID] [UNIQUEIDENTIFIER] NOT NULL,
	[ReceivedAtUtC] [DATETIME] NOT NULL,
	[DeviceID] [NVARCHAR](32) NOT NULL,
	[DeviceEui] [NVARCHAR](32) NOT NULL,
	[ApplicationID] [NVARCHAR](32) NOT NULL,
	[IsConfirmed] [BIT] NOT NULL,
	[IsRetry] [BIT] NOT NULL,
	[Port] [SMALLINT] NOT NULL,
	[Temperature] [FLOAT] NOT NULL,
	[Humidity] [FLOAT] NOT NULL,
CONSTRAINT [PK_EnvironmentalSensorReport] PRIMARY KEY CLUSTERED 
(
	[WeatherSensorReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

ALTER TABLE [dbo].[EnvironmentalSensorReport] ADD  CONSTRAINT [DF_EnvironmentalSensorReport_EnvironmentalSensorReporttUID]  DEFAULT (NEWID()) FOR [WeatherSensorReportUID]
GO

The stored procedure must have the parameters @ReceivedAtUtc, @DeviceID, @DeviceEui, @ApplicationID, @IsRetry, @IsConfirmed and @Port. In this example the payload specific fields generated by the Low Power Protocol(LPP) decoder are @Temperature_0 and @relative_humidity_0

CREATE PROCEDURE [dbo].[EnvironmentalSensorProcess]
   @ReceivedAtUtc AS DATETIME,
   @DeviceID AS NVARCHAR(32),
   @DeviceEui AS NVARCHAR(32),
   @ApplicationID AS NVARCHAR(32),
   @IsRetry AS BIT,
   @IsConfirmed AS BIT,
   @Port AS SMALLINT,
   @Temperature_0 AS FLOAT,
   @relative_humidity_0 AS FLOAT
AS
BEGIN
   SET NOCOUNT ON;
 
   INSERT INTO [dbo].[EnvironmentalSensorReport]
           ([PositionReportUID]
	   .[ReceivedAtUtc]
           ,[DeviceID]
           ,[DeviceEui]
           ,[ApplicationID]
           ,[IsConfirmed]
           ,[IsRetry]
           ,[Port]
	   ,Temperature
	   ,Humidity)
   VALUES
   (
      @ReceivedAtUtc,
      @DeviceID,
      @DeviceEui,
      @ApplicationID,
      @IsConfirmed,
      @IsRetry,
      @port,
      @Temperature_0,
      @relative_humidity_0)
END
Environmental sensor data displayed in SQL Server Management Studio(SSMS)

To store more complex nest payload fields (e.g. latitude, longitude and altitude values), I flattened the the hierarchy.

private static void EnumerateChildren(DynamicParameters parameters, JToken token, string prefix ="")
{
   if (token is JProperty)
      if (token.First is JValue)
      {
         JProperty property = (JProperty)token;
         parameters.Add($"@{prefix}{property.Name}", property.Value.ToString());
      }
      else
      {
         JProperty property = (JProperty)token;
         prefix += property.Name;
      }

   foreach (JToken token2 in token.Children())
   {
      EnumerateChildren(parameters,token2, prefix);
   }
}
Unpacked LPP payload from GPS tracker displayed in TTN application data view
Flattened location, acceleration and rotation information
CREATE TABLE [dbo].[PositionReport](
      [PositionReportUID] [UNIQUEIDENTIFIER] NOT NULL,
      [ReceivedAtUtC] [DATETIME] NOT NULL,
      [DeviceID] [NVARCHAR](32) NOT NULL,
      [DeviceEui] [NVARCHAR](32) NOT NULL,
      [ApplicationID] [NVARCHAR](32) NOT NULL,
      [IsConfirmed] [BIT] NOT NULL,
      [IsRetry] [BIT] NOT NULL,
      [Port] [SMALLINT] NOT NULL,
      [Latitude] [FLOAT] NOT NULL,
      [Longitude] [FLOAT] NOT NULL,
      [Altitude] [FLOAT] NOT NULL,
 CONSTRAINT [PK_PositionReport] PRIMARY KEY CLUSTERED 
(
	[PositionReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

I created a database table to store values of only the fields I cared about.

CREATE PROCEDURE [dbo].[PositionReportProcess]
      @ReceivedAtUtc AS DATETIME,
      @DeviceID AS NVARCHAR(32),
      @DeviceEui AS NVARCHAR(32),
      @ApplicationID AS NVARCHAR(32),
      @IsRetry AS Bit,
      @IsConfirmed AS BIT,
      @Port AS SMALLINT,
      @accelerometer_3x AS FLOAT,
      @accelerometer_3y AS FLOAT,
      @accelerometer_3z AS FLOAT,
      @analog_in_8 AS FLOAT,
      @analog_in_9 AS FLOAT,
      @analog_in_10 AS FLOAT,
      @analog_in_11 AS FLOAT,
      @gps_1Latitude AS FLOAT,
      @gps_1Longitude AS FLOAT,
      @gps_1Altitude AS FLOAT,
      @gyrometer_5x  AS FLOAT, 
      @gyrometer_5y  AS FLOAT, 
      @gyrometer_5z  AS FLOAT 
AS
BEGIN
   SET NOCOUNT ON;

   INSERT INTO [dbo].[PositionReport]
      ([PositionReportUID]
      .[ReceivedAtUtc]
      ,[DeviceID]
      ,[DeviceEui]
      ,[ApplicationID]
      ,[IsConfirmed]
      ,[IsRetry]
      ,[Port]
      ,Latitude
      ,Longitude
      ,Altitude)
   VALUES
   (
      @ReceivedAtUtc,
      @DeviceID,
      @DeviceEui,
      @ApplicationID,
      @IsConfirmed,
      @IsRetry,
      @port,
      @gps_1Latitude,
      @gps_1Longitude,
      @gps_1Altitude)
END

The stored procedure for storing the GPS tracker payload has to have parameters matching each payload field but some of the fields are not used.

Location data displayed in SQL Server Management Studio(SSMS)

For uplink messages with no payload fields the message processor looks for a stored procedure with a single parameter called “payload_raw”.(there maybe more than one match)

CREATE TABLE [dbo].[PayloadReport](
      [PayloadReportUID] [UNIQUEIDENTIFIER] NOT NULL,
      [ReceivedAtUtC] [DATETIME] NOT NULL,
      [DeviceID] [NVARCHAR](32) NOT NULL,
      [DeviceEui] [NVARCHAR](32) NOT NULL,
      [ApplicationID] [NVARCHAR](32) NOT NULL,
      [IsConfirmed] [BIT] NOT NULL,
      [IsRetry] [BIT] NOT NULL,
      [Port] [SMALLINT] NOT NULL,
      [Payload] [NVARCHAR](128) NOT NULL,
CONSTRAINT [PK_PayloadReport] PRIMARY KEY CLUSTERED 
(
      [PayloadReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

ALTER TABLE [dbo].[PayloadReport] ADD  CONSTRAINT [DF_PayloadReport_PositionReportUID]  DEFAULT (NEWID()) FOR [PayloadReportUID]
GO
ALTER PROCEDURE [dbo].[PayloadRawProcess]
      @ReceivedAtUtc AS DATETIME,
      @DeviceID AS NVARCHAR(32),
      @DeviceEui AS NVARCHAR(32),
      @ApplicationID AS NVARCHAR(32),
      @IsRetry AS Bit,
      @IsConfirmed AS BIT,
      @Port AS SMALLINT,
      @Payload AS NVARCHAR(128)
AS
BEGIN
      SET NOCOUNT ON;

      INSERT INTO [dbo].[PayloadReport]
         ([PositionReportUID]
         .[ReceivedAtUtc]
         ,[DeviceID]
         ,[DeviceEui]
         ,[ApplicationID]
         ,[IsConfirmed]
         ,[IsRetry]
         ,[Port]
         ,[Payload])
     VALUES(@ReceivedAtUtc,
         @DeviceID,
         @DeviceEui,
         @ApplicationID,
         @IsConfirmed,
         @IsRetry,
         @port,
         @Payload)
END
Raw payload data displayed in SQL Server Management Studio(SSMS)

Initially the application just used Console.Writeline for logging, then I added Log4Net because it would be useful to persist information about failures and so I could copy n paste parameter lists to the appSettings.json file.

To make the application more robust adding a retries with the Enterprise Library Transient Fault Handling and Configuration blocks or Polly on the Dapper Execute would be a good idea. It also would take much work to get the application to run in Microsoft Azure as a “headless” webapp.

Dapper supports a number of database platforms so in theory this application (with a little bit of effort) should be platform portable.

The Things Network V3 MQTT Client Uplink

In preparation for the impending(delayed) deployment of The Things Network(TTN) V3 I wanted to build a new Message Queue Telemetry Transport(MQTT) integration. As per my usual approach I build a .Net Core console application which sends and receives messages

The console application uses MQTTNet to connect to TTN. It subscribes to to the TTN application device uplink topic (did try subscribing to the uplink messages for all the devices in the application, and the downlink message scheduled, sent and acknowledged topics.

I tried a lot of topic formats with and without wildcards to see which worked best

//downlinkTopic = $"v3/{applicationId}/devices/{deviceId}/down/push";
//uplinkTopic = $"v3/+";
//uplinkTopic = $"v3/#";
//uplinkTopic = $"v3/{applicationId}/+"; //exception
//uplinkTopic = $"v3/{applicationId}/*";
//uplinkTopic = $"v3/devices/+";
//uplinkTopic = $"v3/devices/#";
//uplinkTopic = $"v3/devices/+/events/+";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/+";
//uplinkTopic = $"v3/{applicationId}/devices/{deviceId}/events/update";
//uplinkTopic = $"v3/{applicationId}/devices/{deviceId}/events/create";
//uplinkTopic = $"v3/{applicationId}/devices/{deviceId}/events/delete";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/+";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/create";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/update";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/delete";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/+";
//uplinkTopic = $"v3/{applicationId}/devices/{deviceId}/up";

string downlinkTopic = $"v3/{applicationId}/devices/{deviceId}/down/push";
string downlinkQueuedTopic = $"v3/{applicationId}/devices/{deviceId}/down/queued";
string downlinkSentTopic = $"v3/{applicationId}/devices/{deviceId}/down/sent";
string downlinkAckTopic = $"v3/{applicationId}/devices/{ deviceId}/down/ack";
string downlinkNakTopic = $"v3/{applicationId}/devices/{ deviceId}/down/nack";
string downlinkFailedTopic = $"v3/{applicationId}/devices/{deviceId}/down/sent";

I generated new classes from the ones provided in the documentation then added any obvious missing fields and fine tuned the data types by delving into the TTN V3 GO code.

The new messages payloads have significant differences to the V2 ones. I have refactored the generated classes to reduce the duplication of code and fix up datatypes e.g. int32 vs. ulong where JSON2Charp couldn’t infer the size of the number.

namespace devMobile.TheThingsNetwork.Models
{
   public class ApplicationIds
   {
      public string application_id { get; set; }
   }

   public class EndDeviceIds
   {
      public string device_id { get; set; }
      public ApplicationIds application_ids { get; set; }
      public string dev_eui { get; set; }
      public string join_eui { get; set; }
      public string dev_addr { get; set; }
   }
}

I wonder about the naming of the applicationIds class as it appears that it could only ever contain single applicationId.

I installed the tooling for GO support into Visual Studio Code and went looking for the uplink message definition which I think is in messages.pb.go (still learning go and how the TTN GO source is structured).

type ApplicationUplink struct {
	// Join Server issued identifier for the session keys used by this uplink.
	SessionKeyID []byte `protobuf:"bytes,1,opt,name=session_key_id,json=sessionKeyId,proto3" json:"session_key_id,omitempty"`
	FPort        uint32 `protobuf:"varint,2,opt,name=f_port,json=fPort,proto3" json:"f_port,omitempty"`
	FCnt         uint32 `protobuf:"varint,3,opt,name=f_cnt,json=fCnt,proto3" json:"f_cnt,omitempty"`
	// The frame payload of the uplink message.
	// The payload is still encrypted if the skip_payload_crypto field of the EndDevice
	// is true, which is indicated by the presence of the app_s_key field.
	FRMPayload []byte `protobuf:"bytes,4,opt,name=frm_payload,json=frmPayload,proto3" json:"frm_payload,omitempty"`
	// The decoded frame payload of the uplink message.
	// This field is set by the message processor that is configured for the end device (see formatters) or application (see default_formatters).
	DecodedPayload *types.Struct `protobuf:"bytes,5,opt,name=decoded_payload,json=decodedPayload,proto3" json:"decoded_payload,omitempty"`
	// Warnings generated by the message processor while decoding the frm_payload.
	DecodedPayloadWarnings []string `protobuf:"bytes,12,rep,name=decoded_payload_warnings,json=decodedPayloadWarnings,proto3" json:"decoded_payload_warnings,omitempty"`
	// A list of metadata for each antenna of each gateway that received this message.
	RxMetadata []*RxMetadata `protobuf:"bytes,6,rep,name=rx_metadata,json=rxMetadata,proto3" json:"rx_metadata,omitempty"`
	// Settings for the transmission.
	Settings TxSettings `protobuf:"bytes,7,opt,name=settings,proto3" json:"settings"`
	// Server time when the Network Server received the message.
	ReceivedAt time.Time `protobuf:"bytes,8,opt,name=received_at,json=receivedAt,proto3,stdtime" json:"received_at"`
	// The AppSKey of the current session.
	// This field is only present if the skip_payload_crypto field of the EndDevice
	// is true.
	// Can be used to decrypt uplink payloads and encrypt downlink payloads.
	AppSKey *KeyEnvelope `protobuf:"bytes,9,opt,name=app_s_key,json=appSKey,proto3" json:"app_s_key,omitempty"`
	// The last AFCntDown of the current session.
	// This field is only present if the skip_payload_crypto field of the EndDevice
	// is true.
	// Can be used with app_s_key to encrypt downlink payloads.
	LastAFCntDown uint32 `protobuf:"varint,10,opt,name=last_a_f_cnt_down,json=lastAFCntDown,proto3" json:"last_a_f_cnt_down,omitempty"`
	Confirmed     bool   `protobuf:"varint,11,opt,name=confirmed,proto3" json:"confirmed,omitempty"`
	// Consumed airtime for the transmission of the uplink message. Calculated by Network Server using the RawPayload size and the transmission settings.
	ConsumedAirtime *time.Duration `protobuf:"bytes,13,opt,name=consumed_airtime,json=consumedAirtime,proto3,stdduration" json:"consumed_airtime,omitempty"`
	// End device location metadata, set by the Application Server while handling the message.
	Locations            map[string]*Location `protobuf:"bytes,14,rep,name=locations,proto3" json:"locations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
	XXX_NoUnkeyedLiteral struct{}             `json:"-"`
	XXX_sizecache        int32                `json:"-"`
}

I also need to deploy some more gateways and devices to check that I haven’t missed any fields available in more realistic environments.

TTN V3 MQTT Console client

In the TTN Device data tab I could see messages being sent, to and received from from the simulated device.

TTN V3 MQTT Device Live Data

The next step is to get downlink messages working, then connect up a couple of gateways and trial with some real devices.

The Things Network V2 MQTT Client

Another option for I had been looking at for connecting an Azure IoT Hub and The Things Network(TTN) was a Message Queue Telemetry Transport(MQTT) integration.

To trial this approach I build a .Net Core console application which sent message to and received messages from an application running on a GHI Electronics TinyCLRV2 Fezduino with RakWireless Wisduino Evaluation Board(EVB).

The console application uses MQTTNet to connect to TTN. It subscribes to to the TTN application device uplink topic (did try subscribing to the uplink messages for all the devices in the application but this was to noisy), and the downlink message scheduled, sent and acknowledged topics. To send messages to the device I published them on the device downlink topic.

//string uplinktopic = $"{applicationId}/devices/+/up";
string uplinktopic = $"{applicationId}/devices/{deviceId}/up";
await mqttClient.SubscribeAsync(uplinktopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

string downlinkAcktopic = $"{applicationId}/devices/{deviceId}/events/down/acks";
await mqttClient.SubscribeAsync(downlinkAcktopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

string downlinkScheduledtopic = $"{applicationId}/devices/{deviceId}/events/down/scheduled";
await mqttClient.SubscribeAsync(downlinkScheduledtopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

string downlinkSenttopic = $"{applicationId}/devices/{deviceId}/events/down/sent";
await mqttClient.SubscribeAsync(downlinkSenttopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

string downlinktopic = $"{applicationId}/devices/{deviceId}/down";

I used the classes from one of my earlier blog posts to deserialise the uplink message payload so I could display a subset of the fields.

MQTTNet based .Net Core console client
Things Network Device Data view

In the TTN Device data tab I could see messages being sent, to and received from from the device.

Visual Studio 2019 Tiny CLR debugger Output

In the Visual Studio 2019 debugger output window I could see messages being sent and received by the Fezduino.

Malformed TTN downlink payload

I had some problems with the downlink messages silently failing as the TTN sample payload JSON was malformed and I had copied it without noticing.

I have a working TTN HTTP Integration (uplink messages only) but have been exploring alternatives using TTN MQTT and Azure IoT Hub AMQP clients.

The next step is to build an Azure IoT Hub client (using native AMQP) then join them together.

Azure IoT Hub MQTT/AMQP oddness

This is a long post which covers some oddness I noticed when changing the protocol used by an Azure IoT Hub client from Message Queuing Telemetry Transport(MQTT) to Advanced Message Queuing Protocol (AMQP). I want to build a console application to test the pooling of AMQP connections so I started with an MQTT client written for another post.

class Program
{
   private static string payload;

   static async Task Main(string[] args)
   {
      string filename;
      string azureIoTHubconnectionString;
      DeviceClient azureIoTHubClient;

      if (args.Length != 2)
      {
         Console.WriteLine("[JOSN file] [AzureIoTHubConnectionString]");
         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
         return;
      }

      filename = args[0];
      azureIoTHubconnectionString = args[1];

      try
      {
         payload = File.ReadAllText(filename);

         // Open up the connection
         azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt);
         //azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt_Tcp_Only);
         //azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt_WebSocket_Only);

         await azureIoTHubClient.OpenAsync();

         await azureIoTHubClient.SetMethodDefaultHandlerAsync(MethodCallbackDefault, null);

         Timer MessageSender = new Timer(TimerCallback, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));


         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
      }
   }

   public static async void TimerCallback(object state)
   {
      DeviceClient azureIoTHubClient = (DeviceClient)state;

      try
      {
         // I know having the payload as a global is a bit nasty but this is a demo..
         using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
         {
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
            await azureIoTHubClient.SendEventAsync(message);
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
         }
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
      }
   }

   private static async Task<MethodResponse> MethodCallbackDefault(MethodRequest methodRequest, object userContext)
   {
      Console.WriteLine($"Default handler method {methodRequest.Name} was called.");

      return new MethodResponse(200);
   }
}

I configured an Azure IoT hub then used Azure IoT explorer to create a device and get the connections string for my application. After fixing up the application’s command line parameters I could see the timer code was successfully sending telemetry messages to my Azure IoT Hub. I also explored the different MQTT connections options TransportType.Mqtt, TransportType.Mqtt_Tcp_Only, and TransportType.Mqtt_WebSocket_Only which worked as expected.

MQTT Console application displaying sent telemetry
Azure IoT Hub displaying received telemetry

I could also initiate Direct Method calls to my console application from Azure IoT explorer.

Azure IoT Explorer initiating a Direct Method
MQTT console application displaying direct method call.

I then changed the protocol to AMQP

class Program
{
   private static string payload;

   static async Task Main(string[] args)
   {
      string filename;
      string azureIoTHubconnectionString;
      DeviceClient azureIoTHubClient;
      Timer MessageSender;

      if (args.Length != 2)
      {
         Console.WriteLine("[JOSN file] [AzureIoTHubConnectionString]");
         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
         return;
      }

      filename = args[0];
      azureIoTHubconnectionString = args[1];

      try
      {
         payload = File.ReadAllText(filename);

         // Open up the connection
         azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp);
         //azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp_Tcp_Only);
         //azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp_WebSocket_Only);

         await azureIoTHubClient.OpenAsync();

         await azureIoTHubClient.SetMethodDefaultHandlerAsync(MethodCallbackDefault, null);

         //MessageSender = new Timer(TimerCallbackAsync, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));
         MessageSender = new Timer(TimerCallbackSync, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));

#if MESSAGE_PUMP
         Console.WriteLine("Press any key to exit");
         while (!Console.KeyAvailable)
         {
            await Task.Delay(100);
         }
#else
         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
#endif
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
      }
   }

   public static async void TimerCallbackAsync(object state)
   {
      DeviceClient azureIoTHubClient = (DeviceClient)state;

      try
      {
         // I know having the payload as a global is a bit nasty but this is a demo..
         using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
         {
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
            await azureIoTHubClient.SendEventAsync(message);
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
         }
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
      }
   }

   public static void TimerCallbackSync(object state)
   {
      DeviceClient azureIoTHubClient = (DeviceClient)state;

      try
      {
         // I know having the payload as a global is a bit nasty but this is a demo..
         using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
         {
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
            azureIoTHubClient.SendEventAsync(message).GetAwaiter();
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
         }
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
      }
   }


   private static async Task<MethodResponse> MethodCallbackDefault(MethodRequest methodRequest, object userContext)
   {
      Console.WriteLine($"Default handler method {methodRequest.Name} was called.");

      return new MethodResponse(200);
   }
}

In the first version of my console application I could see the SendEventAsync method was getting called but was not returning

AMQP Console application displaying sent telemetry failure

Even though the SendEventAsync call was not returning the telemetry messages were making it to my Azure IoT Hub.

Azure IoT Hub displaying AMQP telemetry

When I tried to initiate a Direct Method call from Azure IoT Explorer it failed after a while with a timeout.

Azure IoT Explorer initiating a Direct Method

The first successful approach I tried was to change the Console.Readline to a “message pump” (flashbacks to Win32 API programming).

Console.WriteLine("Press any key to exit");
while (!Console.KeyAvailable)
{
   await Task.Delay(100);
}

After some more experimentation I found that changing the timer method from asynchronous to synchronous also worked.

public static void TimerCallbackSync(object state)
{
   DeviceClient azureIoTHubClient = (DeviceClient)state;

   try
   {
      // I know having the payload as a global is a bit nasty but this is a demo..
      using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
      {
         Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
         azureIoTHubClient.SendEventAsync(message).GetAwaiter();
         Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
      }
   }
   catch (Exception ex)
   {
      Console.WriteLine(ex.Message);
   }
}

I also had to change the method declaration and modify the SendEventAsync call to use a GetAwaiter.

AMQP Console application displaying sent telemetry
Azure IoT Hub displaying received telemetry
Azure IoT Explorer initiating a Direct Method
MQTT console application displaying direct method call.

It took a while to figure out enough about what was going on so I could do a search with the right keywords (DeviceClient AMQP async await SendEventAsync) to confirm my suspicion that MQTT and AMQP clients did behave differently.

For anyone who reads this post, I think this Github issue about task handling and blocking calls is most probably the answer (October 2020).

The Things Network Client Part2

MQTT connectivity

In a previous post I couldn’t add a TTN V3EndDevice to an application (I’m going try again soon) using the REST API so I figured would try out the MQTT API. My aim was to get notifications of when a Device was created/updated/deleted in an Application.

After some tinkering with the format of MQTT usernames and passwords I can connect to my V3 instance and successfully subscribe to topics. But, currently(Aug 2020) I’m not receiving any messages when I create, update or delete a Device. I have tried different Quality of Service QoS settings etc. and I wonder if my topic names aren’t quite right.

.Net Core MQTT Client

I wanted notifications so I could “automagically” provision a device in an Azure IoT Hub (maybe with a tag indicating it’s an “orphan” so it is discoverable) or in Azure IoT Central when a Device was created in TTN.

This looked like a good approach as my Azure IoT Hub applications have other devices which are not connected via LoRaWAN, and there are many specialised LoRaWAN settings which would need to be validated, stored etc. by my software. (maybe TTN device templates would make this easier). The TTN software is pretty good at managing devices so why would I “re-invent the wheel”.

I built a “nasty” console application using MQTTNet so that I could figure out how to connect to my V3 setup and subscribe to topics.

namespace devMobile.TheThingsNetwork.MqttClient
{
   using System;
   using System.Diagnostics;
   using System.Threading;
   using System.Threading.Tasks;

   using MQTTnet;
   using MQTTnet.Client;
   using MQTTnet.Client.Disconnecting;
   using MQTTnet.Client.Options;
   using MQTTnet.Client.Receiving;
   using MQTTnet.Client.Subscribing;

   class Program
   {
      private static IMqttClient mqttClient = null;
      private static IMqttClientOptions mqttOptions = null;
      private static string server;
      private static string username;
      private static string password;
      private static string clientId;

      static async Task Main(string[] args)
      {
         MqttFactory factory = new MqttFactory();
         mqttClient = factory.CreateMqttClient();

         if (args.Length != 4)
         {
            Console.WriteLine("[MQTT Server] [UserName] [Password] [ClientID]");
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
            return;
         }

         server = args[0];
         username = args[1];
         password = args[2];
         clientId = args[3];

         mqttOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(server)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithTls()
            .Build();

         mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
         mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
         await mqttClient.ConnectAsync(mqttOptions);

         // Different topics I have tried
         string topic;
         topic = $"v3/{username}/devices/{clientId}/events/update";
         //topic = $"v3/{username}/devices/{clientId}/events/create";
         //topic = $"v3/{username}/devices/{clientId}/events/delete";
         //topic = $"v3/{username}/devices/+/events/+";
         //topic = $"v3/{username}/devices/+/events/create";
         //topic = $"v3/{username}/devices/+/events/update";
         //topic = $"v3/{username}/devices/+/events/delete";
         //topic = $"v3/{username}/devices/+/events/+";

         MqttClientSubscribeResult result;

         // Different QoS I have tried
         //result = await mqttClient.SubscribeAsync(topic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce);
         result = await mqttClient.SubscribeAsync(topic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
         //result = await mqttClient.SubscribeAsync(topic, MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce);

         Console.WriteLine("SubscribeAsync Result");
         foreach ( var resultItem in result.Items)
         {
            Console.WriteLine($"ResultCode:{resultItem.ResultCode} TopicFilter:{resultItem.TopicFilter}");
         }                     

         Console.WriteLine("Press any key to temrminate wait");
         while (!Console.KeyAvailable)
         {
            Console.Write(".");

            Thread.Sleep(30100);
         }

         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
         return;
      }

      private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
      {
         Console.WriteLine($"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
      }

      private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
      {
         Debug.WriteLine("Disconnected");
         await Task.Delay(TimeSpan.FromSeconds(5));

         try
         {
            await mqttClient.ConnectAsync(mqttOptions);
         }
         catch (Exception ex)
         {
            Debug.WriteLine("Reconnect failed {0}", ex.Message);
         }
      }
   }
}

I’m going to post some questions on the TTN forums and Slack community to see if what I’m trying to do is supported/possible.

I got some helpful responses on the TTN forums and it looks like what I want todo is not supported by the V3 stack (Aug2020) and I will have to use gRPC.

AllThingsTalk with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the AllThingsTalk MQTT API then format topics and payloads correctly.

MQTTNet Console Client

The AllThingsTalk MQTT broker, username, and device ID are required command line parameters.

namespace devmobile.Mqtt.TestClient.AllThingsTalk
{
	using System;
	using System.Diagnostics;
	using System.Threading;
	using System.Threading.Tasks;

	using MQTTnet;
	using MQTTnet.Client;
	using MQTTnet.Client.Disconnecting;
	using MQTTnet.Client.Options;
	using MQTTnet.Client.Receiving;

	using Newtonsoft.Json;
	using Newtonsoft.Json.Linq;

	class Program
	{
		private static IMqttClient mqttClient = null;
		private static IMqttClientOptions mqttOptions = null;
		private static string server;
		private static string username;
		private static string deviceID;

		static void Main(string[] args)
		{
			MqttFactory factory = new MqttFactory();
			mqttClient = factory.CreateMqttClient();

			if ((args.Length != 3))
			{
				Console.WriteLine("[MQTT Server] [UserName] [ClientID]");
				Console.WriteLine("Press <enter> to exit");
				Console.ReadLine();
				return;
			}

			server = args[0];
			username = args[1];
			deviceID = args[2];

			Console.WriteLine($"MQTT Server:{server} DeviceID:{deviceID}");

			// AllThingsTalk formatted device state update topic
			string topicD2C = $"device/{deviceID}/state";

			mqttOptions = new MqttClientOptionsBuilder()
				.WithTcpServer(server)
				.WithCredentials(username, "HighlySecurePassword")
				.WithClientId(deviceID)
				.WithTls()
				.Build();

			mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
			mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
			mqttClient.ConnectAsync(mqttOptions).Wait();

			// AllThingsTalk formatted device command with wildcard topic
			string topicC2D = $"device/{deviceID}/asset/+/command";

			mqttClient.SubscribeAsync(topicC2D, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();

			while (true)
			{
				JObject payloadJObject = new JObject();

				double temperature = 22.0 + (DateTime.UtcNow.Millisecond / 1000.0);
				temperature = Math.Round( temperature, 1 );
				double humidity = 50 + (DateTime.UtcNow.Millisecond / 100.0);
				humidity = Math.Round(humidity, 1);

				JObject temperatureJObject = new JObject
				{
					{ "value", temperature }
				};
				payloadJObject.Add("Temperature", temperatureJObject);

				JObject humidityJObject = new JObject
				{
					{ "value", humidity }
				};
				payloadJObject.Add("Humidity", humidityJObject);

				string payload = JsonConvert.SerializeObject(payloadJObject);
				Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");

				var message = new MqttApplicationMessageBuilder()
					.WithTopic(topicD2C)
					.WithPayload(payload)
					.WithAtMostOnceQoS()
//					.WithAtLeastOnceQoS()
					.Build();

				Console.WriteLine("PublishAsync start");
				mqttClient.PublishAsync(message).Wait();
				Console.WriteLine("PublishAsync finish");

				Thread.Sleep(15100);
			}
		}

		private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
		{
			Console.WriteLine($"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
		}

		private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
		{
			Debug.WriteLine("Disconnected");
			await Task.Delay(TimeSpan.FromSeconds(5));

			try
			{
				await mqttClient.ConnectAsync(mqttOptions);
			}
			catch (Exception ex)
			{
				Debug.WriteLine("Reconnect failed {0}", ex.Message);
			}
		}
	}

The AllThingsTalk device configuration was relatively easy but I need to investigate “Gateway” functionality and configuration further.

Configuring an Asset
Configuration a watchdog to check for sensor data
Sending a command to an actuator
Processing a command on the client

The ability to look at message payloads in the Debug tab would be very helpful when working out why a payload was not being processed as expected.

Asset debug information

Overall the AllThingsTalk configuration went fairly smoothly, though I need to investigate the “Gateway” configuration and functionality further. The way that assets are name by the system could make support in my MQTT Gateway more complex.

Azure IoT Hub, Event Grid to Application Insights

For a second Proof of Concept (PoC) I wanted to upload sensor data from my MQTT LoRa Telemetry Field Gateway to an Azure IoT Hub, then using Azure EventGrid subscribe to the stream of telemetry data events, logging the payloads in Azure Application Insights (the aim was minimal code so no database etc.).

The first step was to create and deploy a simple Azure Function for unpacking the telemetry event payload.

Azure IoT Hub Azure Function Handler

Then wire the Azure function to the Microsoft.Devices.Device.Telemetry Event Type

Azure IoT Hub Event Metrics

On the Windows 10 IoT Core device in the Event Tracing Window(ETW) logging on the device I could see LoRa messages arriving and being unpacked.

Windows 10 Device ETW showing message payload

Then in Application Insights after some mucking around with code I could see in a series of Trace statements the event payload as it was unpacked.

{"id":"29108ebf-e5d5-7b95-e739-7d9048209d53","topic":"/SUBSCRIPTIONS/12345678-9012-3456-7890-123456789012/RESOURCEGROUPS/AZUREIOTHUBEVENTGRIDAZUREFUNCTION/PROVIDERS/MICROSOFT.DEVICES/IOTHUBS/FIELDGATEWAYHUB",
"subject":"devices/MQTTNetClient",
"eventType":"Microsoft.Devices.DeviceTelemetry",
"eventTime":"2020-02-01T04:30:51.427Z",
"data":
{
 "properties":{},
"systemProperties":{"iothub-connection-device-id":"MQTTNetClient","iothub-connection-auth-method":"{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}",
"iothub-connection-auth-generation-id":"637149890997219611",
"iothub-enqueuedtime":"2020-02-01T04:30:51.427Z",
"iothub-message-source":"Telemetry"
},
"body":"eyJPZmZpY2VUZW1wZXJhdHVyZSI6IjIyLjUiLCJPZmZpY2VIdW1pZGl0eSI6IjkyIn0="
},
"dataVersion":"",
"metadataVersion":"1"
}
Application Insights logging with message unpacking
Application Insights logging message payload

Then in the last log entry the decoded message payload

/*
    Copyright ® 2020 Feb devMobile Software, All Rights Reserved
 
    MIT License

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE

    Default URL for triggering event grid function in the local environment.
    http://localhost:7071/runtime/webhooks/EventGrid?functionName=functionname
 */
namespace EventGridProcessorAzureIotHub
{
   using System;
   using System.IO;
   using System.Reflection;

   using Microsoft.Azure.WebJobs;
   using Microsoft.Azure.EventGrid.Models;
   using Microsoft.Azure.WebJobs.Extensions.EventGrid;

   using log4net;
   using log4net.Config;
   using Newtonsoft.Json;

   public static class Telemetry
    {
        [FunctionName("Telemetry")]
        public static void Run([EventGridTrigger]Microsoft.Azure.EventGrid.Models.EventGridEvent eventGridEvent, ExecutionContext executionContext )//, TelemetryClient telemetryClient)
        {
			ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

		   var logRepository = LogManager.GetRepository(Assembly.GetEntryAssembly());
			XmlConfigurator.Configure(logRepository, new FileInfo(Path.Combine(executionContext.FunctionAppDirectory, "log4net.config")));

         log.Info($"eventGridEvent.Data-{eventGridEvent}");

         log.Info($"eventGridEvent.Data.ToString()-{eventGridEvent.Data.ToString()}");

        IotHubDeviceTelemetryEventData iOThubDeviceTelemetryEventData = (IotHubDeviceTelemetryEventData)JsonConvert.DeserializeObject(eventGridEvent.Data.ToString(), typeof(IotHubDeviceTelemetryEventData));

         log.Info($"iOThubDeviceTelemetryEventData.Body.ToString()-{iOThubDeviceTelemetryEventData.Body.ToString()}");

         byte[] base64EncodedBytes = System.Convert.FromBase64String(iOThubDeviceTelemetryEventData.Body.ToString());

         log.Info($"System.Text.Encoding.UTF8.GetString(-{System.Text.Encoding.UTF8.GetString(base64EncodedBytes)}");
      }
	}
}

Overall it took roughly half a page of code (mainly generated by a tool) to unpack and log the contents of an Azure IoT Hub EventGrid payload to Application Insights.

Azure IoT Hub MQTT+TLS Overheads

An Azure IoT Hub has a series of metrics and one I had been using was “Total Device Data Usage”. To better understand what it was displaying I modified my Azure IoT Hub MQTT Test Application to display the size of the JOSN payload.

MQTTNet based client displaying payload length

The size of the packets sent and the total device data appeared to map pretty well but I was also interested in the Transport Layer Security (TLS) and Messaging Queuing Telemetry Transport (MQTT) overheads.

Azure IoT Hub Metrics

To get an idea of the overheads I fired up LiveTcpUdpWatch by Nirsoft and noted down the traffic measure on port 8883.

Conenction LiveTcpUdpWatch main screen

Launching the MQTTNet client sending every 30 seconds resulted in traffic like this

4179b - Establishing connection
4284b - 105b
4317b - 33b
4386b - 69b
4455b - 69b
4524b - 69b
4593b - 69b
4662b - 69b
4731b - 69b
4800b - 69b
4869b - 69b
4938b - 69b
5007b - 69b
5076b - 69b
5145b - 69b
5214b - 69b
5288b - 69b

So it looks like my very rough numbers are close to the numbers discussed in the above article. I need to explore the impact of keep-alive messages and other background operations.

Bosch IoT Suite with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the Bosch IoT Suite MQTT API then format topics and payloads correctly.

MQTTNet Console Client

The Bosch IoT Hub MQTT broker, username, password, and clientID are the required command line parameters. For this PoC I ran out of time to get cloud to device (C2D) messaging or any presentation functionality working.

/*
    Copyright ® 2019 December devMobile Software, All Rights Reserved
 
    MIT License

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE

	 A quick and dirty test client to explore how BoschIoT Suite MQTT connectivity works
 */
namespace devMobile.Mqtt.TestClient.BoschIoTSuite
{
   using System;
   using System.Diagnostics;
   using System.Threading;
   using System.Threading.Tasks;

   using MQTTnet;
   using MQTTnet.Client;
   using MQTTnet.Client.Disconnecting;
   using MQTTnet.Client.Options;
   using MQTTnet.Client.Receiving;
   using Newtonsoft.Json;
   using Newtonsoft.Json.Linq;

   class Program
   {
      private static IMqttClient mqttClient = null;
      private static IMqttClientOptions mqttOptions = null;
      private static string server;
      private static string username;
      private static string password;
      private static string clientId;

      static void Main(string[] args)
      {
         MqttFactory factory = new MqttFactory();
         mqttClient = factory.CreateMqttClient();

         if (args.Length != 4) 
         {
            Console.WriteLine("[MQTT Server] [UserName] [Password] [ClientID]");
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
            return;
         }

         server = args[0];
         username = args[1];
         password = args[2];
         clientId = args[3];

         mqttOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(server)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithTls()
            .Build();

         mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
         mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
         mqttClient.ConnectAsync(mqttOptions).Wait();

         string topicD2C = "telemetry";

         while (true)
         {
            JObject payloadJObject = new JObject();

            payloadJObject.Add("OfficeTemperature", "22." + DateTime.UtcNow.Millisecond.ToString());
            payloadJObject.Add("OfficeHumidity", (DateTime.UtcNow.Second + 40).ToString());

            string payload = JsonConvert.SerializeObject(payloadJObject);
            Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");

            var message = new MqttApplicationMessageBuilder()
               .WithTopic(topicD2C)
               .WithPayload(payload)
               .WithAtMostOnceQoS() // Anthing but this causes timeout
               .WithRetainFlag()
            .Build();

            Console.WriteLine("PublishAsync start");
            mqttClient.PublishAsync(message).Wait();
            Console.WriteLine("PublishAsync finish");

            Thread.Sleep(30100);
         }
      }

      private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
      {
         Console.WriteLine($"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
      }

      private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
      {
         Debug.WriteLine("Disconnected");
         await Task.Delay(TimeSpan.FromSeconds(5));

         try
         {
            await mqttClient.ConnectAsync(mqttOptions);
         }
         catch (Exception ex)
         {
            Debug.WriteLine("Reconnect failed {0}", ex.Message);
         }
      }
   }
}

The bosch IoT Hub device configuration was via a swagger API but I need to spend some more time figuring out how to configure the data analysis and presentation tools.

I adapted the steps in the IoT Hub Documentation for Sending Device Data using MQTT. The first step was to create a free Hub subscription.

IoT Hub Subscription

Then using the device registry swagger UI page to add a new device.

Device Registry Swagger UI

After a couple of failed attempts I worked out the format of the Authorisation details (I think the username format in the online documentation might be wrong)

Swagger UI Authorisation form
Querying the available devices

Of the 10+ SaaS IoT services I have setup the Bosch IoT Suite was the hardest to get working. I think this was becuase it is meant to be managed via the API from a in-house application. In a future post I’ll get configure the cloud to device messaging, plus analysis and display functionality.