AllThingsTalk with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the AllThingsTalk MQTT API then format topics and payloads correctly.

MQTTNet Console Client

The AllThingsTalk MQTT broker, username, and device ID are required command line parameters.

namespace devmobile.Mqtt.TestClient.AllThingsTalk
{
	using System;
	using System.Diagnostics;
	using System.Threading;
	using System.Threading.Tasks;

	using MQTTnet;
	using MQTTnet.Client;
	using MQTTnet.Client.Disconnecting;
	using MQTTnet.Client.Options;
	using MQTTnet.Client.Receiving;

	using Newtonsoft.Json;
	using Newtonsoft.Json.Linq;

	class Program
	{
		private static IMqttClient mqttClient = null;
		private static IMqttClientOptions mqttOptions = null;
		private static string server;
		private static string username;
		private static string deviceID;

		static void Main(string[] args)
		{
			MqttFactory factory = new MqttFactory();
			mqttClient = factory.CreateMqttClient();

			if ((args.Length != 3))
			{
				Console.WriteLine("[MQTT Server] [UserName] [ClientID]");
				Console.WriteLine("Press <enter> to exit");
				Console.ReadLine();
				return;
			}

			server = args[0];
			username = args[1];
			deviceID = args[2];

			Console.WriteLine($"MQTT Server:{server} DeviceID:{deviceID}");

			// AllThingsTalk formatted device state update topic
			string topicD2C = $"device/{deviceID}/state";

			mqttOptions = new MqttClientOptionsBuilder()
				.WithTcpServer(server)
				.WithCredentials(username, "HighlySecurePassword")
				.WithClientId(deviceID)
				.WithTls()
				.Build();

			mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
			mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
			mqttClient.ConnectAsync(mqttOptions).Wait();

			// AllThingsTalk formatted device command with wildcard topic
			string topicC2D = $"device/{deviceID}/asset/+/command";

			mqttClient.SubscribeAsync(topicC2D, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();

			while (true)
			{
				JObject payloadJObject = new JObject();

				double temperature = 22.0 + (DateTime.UtcNow.Millisecond / 1000.0);
				temperature = Math.Round( temperature, 1 );
				double humidity = 50 + (DateTime.UtcNow.Millisecond / 100.0);
				humidity = Math.Round(humidity, 1);

				JObject temperatureJObject = new JObject
				{
					{ "value", temperature }
				};
				payloadJObject.Add("Temperature", temperatureJObject);

				JObject humidityJObject = new JObject
				{
					{ "value", humidity }
				};
				payloadJObject.Add("Humidity", humidityJObject);

				string payload = JsonConvert.SerializeObject(payloadJObject);
				Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");

				var message = new MqttApplicationMessageBuilder()
					.WithTopic(topicD2C)
					.WithPayload(payload)
					.WithAtMostOnceQoS()
//					.WithAtLeastOnceQoS()
					.Build();

				Console.WriteLine("PublishAsync start");
				mqttClient.PublishAsync(message).Wait();
				Console.WriteLine("PublishAsync finish");

				Thread.Sleep(15100);
			}
		}

		private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
		{
			Console.WriteLine($"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
		}

		private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
		{
			Debug.WriteLine("Disconnected");
			await Task.Delay(TimeSpan.FromSeconds(5));

			try
			{
				await mqttClient.ConnectAsync(mqttOptions);
			}
			catch (Exception ex)
			{
				Debug.WriteLine("Reconnect failed {0}", ex.Message);
			}
		}
	}

The AllThingsTalk device configuration was relatively easy but I need to investigate “Gateway” functionality and configuration further.

Configuring an Asset
Configuration a watchdog to check for sensor data
Sending a command to an actuator
Processing a command on the client

The ability to look at message payloads in the Debug tab would be very helpful when working out why a payload was not being processed as expected.

Asset debug information

Overall the AllThingsTalk configuration went fairly smoothly, though I need to investigate the “Gateway” configuration and functionality further. The way that assets are name by the system could make support in my MQTT Gateway more complex.

Bosch IoT Suite with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the Bosch IoT Suite MQTT API then format topics and payloads correctly.

MQTTNet Console Client

The Bosch IoT Hub MQTT broker, username, password, and clientID are the required command line parameters. For this PoC I ran out of time to get cloud to device (C2D) messaging or any presentation functionality working.

/*
    Copyright ® 2019 December devMobile Software, All Rights Reserved
 
    MIT License

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE

	 A quick and dirty test client to explore how BoschIoT Suite MQTT connectivity works
 */
namespace devMobile.Mqtt.TestClient.BoschIoTSuite
{
   using System;
   using System.Diagnostics;
   using System.Threading;
   using System.Threading.Tasks;

   using MQTTnet;
   using MQTTnet.Client;
   using MQTTnet.Client.Disconnecting;
   using MQTTnet.Client.Options;
   using MQTTnet.Client.Receiving;
   using Newtonsoft.Json;
   using Newtonsoft.Json.Linq;

   class Program
   {
      private static IMqttClient mqttClient = null;
      private static IMqttClientOptions mqttOptions = null;
      private static string server;
      private static string username;
      private static string password;
      private static string clientId;

      static void Main(string[] args)
      {
         MqttFactory factory = new MqttFactory();
         mqttClient = factory.CreateMqttClient();

         if (args.Length != 4) 
         {
            Console.WriteLine("[MQTT Server] [UserName] [Password] [ClientID]");
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
            return;
         }

         server = args[0];
         username = args[1];
         password = args[2];
         clientId = args[3];

         mqttOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(server)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithTls()
            .Build();

         mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
         mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
         mqttClient.ConnectAsync(mqttOptions).Wait();

         string topicD2C = "telemetry";

         while (true)
         {
            JObject payloadJObject = new JObject();

            payloadJObject.Add("OfficeTemperature", "22." + DateTime.UtcNow.Millisecond.ToString());
            payloadJObject.Add("OfficeHumidity", (DateTime.UtcNow.Second + 40).ToString());

            string payload = JsonConvert.SerializeObject(payloadJObject);
            Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");

            var message = new MqttApplicationMessageBuilder()
               .WithTopic(topicD2C)
               .WithPayload(payload)
               .WithAtMostOnceQoS() // Anthing but this causes timeout
               .WithRetainFlag()
            .Build();

            Console.WriteLine("PublishAsync start");
            mqttClient.PublishAsync(message).Wait();
            Console.WriteLine("PublishAsync finish");

            Thread.Sleep(30100);
         }
      }

      private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
      {
         Console.WriteLine($"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
      }

      private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
      {
         Debug.WriteLine("Disconnected");
         await Task.Delay(TimeSpan.FromSeconds(5));

         try
         {
            await mqttClient.ConnectAsync(mqttOptions);
         }
         catch (Exception ex)
         {
            Debug.WriteLine("Reconnect failed {0}", ex.Message);
         }
      }
   }
}

The bosch IoT Hub device configuration was via a swagger API but I need to spend some more time figuring out how to configure the data analysis and presentation tools.

I adapted the steps in the IoT Hub Documentation for Sending Device Data using MQTT. The first step was to create a free Hub subscription.

IoT Hub Subscription

Then using the device registry swagger UI page to add a new device.

Device Registry Swagger UI

After a couple of failed attempts I worked out the format of the Authorisation details (I think the username format in the online documentation might be wrong)

Swagger UI Authorisation form
Querying the available devices

Of the 10+ SaaS IoT services I have setup the Bosch IoT Suite was the hardest to get working. I think this was becuase it is meant to be managed via the API from a in-house application. In a future post I’ll get configure the cloud to device messaging, plus analysis and display functionality.

wolkabout with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the walkabout MQTT API then format topics and payloads correctly.

MQTTNet Console Client

The walkabout MQTT broker, username, API Key, and device ID are the required command line parameters. For this PoC I couldn’t get cloud to device (C2D) or Transport Layer Security(TLS) working so will have to do some more research.

namespace devmobile.Mqtt.TestClient.WolkAbout
{
   using System;
   using System.Diagnostics;
   using System.Threading;
   using System.Threading.Tasks;

   using MQTTnet;
   using MQTTnet.Client;
   using MQTTnet.Client.Disconnecting;
   using MQTTnet.Client.Options;

   using Newtonsoft.Json;
   using Newtonsoft.Json.Linq;

   class Program
   {
      private static IMqttClient mqttClient = null;
      private static IMqttClientOptions mqttOptions = null;
      private static string server;
      private static string username;
      private static string apiKey;
      private static string clientID;

      static void Main(string[] args)
      {
         MqttFactory factory = new MqttFactory();
         mqttClient = factory.CreateMqttClient();

         if ((args.Length != 4) )
            {
            Console.WriteLine("[MQTT Server] [UserName] [APIKey] [ClientID]");
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
            return;
         }

         server = args[0];
         username = args[1];
         apiKey = args[2];
         clientID = args[3];

         Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientID}");

         // wolkabout formatted client state update topic
         string topicD2C = $"readings/{username}/";

         mqttOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(server)
            .WithCredentials(username, apiKey)
            .WithClientId(clientID)
            //.WithTls()
            .Build();

         mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
         mqttClient.ConnectAsync(mqttOptions).Wait();

         while (true)
         {
            JObject payloadJObject = new JObject();

            double temperature = 22.0 + (DateTime.UtcNow.Millisecond / 1000.0);
            double humidity = 50 + (DateTime.UtcNow.Millisecond / 100.0);

            payloadJObject.Add("Temperature", temperature);
            payloadJObject.Add("Humidity", humidity);

            string payload = JsonConvert.SerializeObject(payloadJObject);
            Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");

            var message = new MqttApplicationMessageBuilder()
               .WithTopic(topicD2C)
               .WithPayload(payload)
               .WithAtLeastOnceQoS()
               .Build();

            Console.WriteLine("PublishAsync start");
            mqttClient.PublishAsync(message).Wait();
            Console.WriteLine("PublishAsync finish");

            Thread.Sleep(30100);
         }
      }

      private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
      {
         Debug.WriteLine("Disconnected");
         await Task.Delay(TimeSpan.FromSeconds(5));

         try
         {
            await mqttClient.ConnectAsync(mqttOptions);
         }
         catch (Exception ex)
         {
            Debug.WriteLine("Reconnect failed {0}", ex.Message);
         }
      }
   }

The walkabout device configuration was relatively easy but I need watch the instructional videos again to better understand the device and data semantics relationship.

Data semantics configuration
Devices setup
Device Setup
My first dashboard

SmartWorks with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the SmartWorks (formerly Carriots) MQTT API then format topics and payloads correctly.

MQTTNet Console Client

The SmartWorks MQTT broker, username, and device ID are the required command line parameters. I didn’t notice any configuration options for cloud to device (C2D) messaging which maybe due to my device configuration or the free trial I was using.

namespace devMobile.Mqtt.TestClient.SmartWorks
{
   using System;
   using System.Diagnostics;
   using System.Threading;
   using System.Threading.Tasks;

   using MQTTnet;
   using MQTTnet.Client;
   using MQTTnet.Client.Disconnecting;
   using MQTTnet.Client.Options;
   using MQTTnet.Client.Receiving;
   using Newtonsoft.Json;
   using Newtonsoft.Json.Linq;

   class Program
   {
      private static IMqttClient mqttClient = null;
      private static IMqttClientOptions mqttOptions = null;
      private static string server;
      private static string username;
      private static string clientId;
      private static string commandTopic;
      private static string groupname;
      private static string feedname;

      static void Main(string[] args)
      {
         MqttFactory factory = new MqttFactory();
         mqttClient = factory.CreateMqttClient();

         if (args.Length != 3) 
         {
            Console.WriteLine("[MQTT Server] [UserName] [ClientID]");
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
            return;
         }

         server = args[0];
         username = args[1];
         clientId = args[2];

         mqttOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(server)
            .WithCredentials(username, "")
            .WithClientId(clientId)
            .WithTls()
            .Build();

         mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
         mqttClient.ConnectAsync(mqttOptions).Wait();

         // Adafruit.IO format for topics which are called feeds
         string topicD2C = $"{username}/streams";

         while (true)
         {
            JObject payloadJObject = new JObject();

            payloadJObject.Add("at", "now");
            payloadJObject.Add("device", clientId);
            payloadJObject.Add("protocol", "v2");

            double temperature = 22.0 + (DateTime.UtcNow.Millisecond / 1000.0);
            double humidity = 50 + (DateTime.UtcNow.Millisecond / 100.0);

            JObject dataJObject = new JObject();
            dataJObject.Add("OfficeTemperature", temperature);
            dataJObject.Add("OfficeHumidity", humidity);

            payloadJObject.Add("data", dataJObject);

            string payload = JsonConvert.SerializeObject(payloadJObject);
            Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");

            var message = new MqttApplicationMessageBuilder()
               .WithTopic(topicD2C)
               .WithPayload(payload)
               .WithAtLeastOnceQoS()
            .Build();

            Console.WriteLine("PublishAsync start");
            mqttClient.PublishAsync(message).Wait();
            Console.WriteLine("PublishAsync finish");

            Thread.Sleep(30100);
         }
      }

      private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
      {
         Debug.WriteLine("Disconnected");
         await Task.Delay(TimeSpan.FromSeconds(5));

         try
         {
            await mqttClient.ConnectAsync(mqttOptions);
         }
         catch (Exception ex)
         {
            Debug.WriteLine("Reconnect failed {0}", ex.Message);
         }
      }
   }
}

The ThingsBoard device configuration was relatively easy with convenient buttons to copy the Device ID (Client ID in test client) and Access Token (UserName in test client). I need to revisit the Device and Group configuration to see if I can make the automatically generated names more user friendly.

Devices configuration

The Device configuration form has a tab which has a link for the “Data Streams” form which was useful for debugging.

Device configuration

I have emailed SmartWorks support about a free trial of their dashboard product as it is not available in the free trial.

Device data stream query form

Overall the initial configuration went smoothly but the lack of any dashboard functionality in the free trial was quite limiting.

ThingsBoard with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the ThingsBoard MQTT API then format topics and payloads correctly.

MQTTNet Console Client

The ThingsBoard MQTT broker, username, and client ID are the minimum command line options required with the CommandTopic optional.

   class Program
   {
      private static IMqttClient mqttClient = null;
      private static IMqttClientOptions mqttOptions = null;
      private static string server;
      private static string username;
      private static string clientId;
      private const string telemetryTopic = "v1/devices/me/telemetry";
      private static string commandTopic;

      static void Main(string[] args)
      {
         MqttFactory factory = new MqttFactory();
         mqttClient = factory.CreateMqttClient();

         if ((args.Length != 3) && (args.Length != 4))
         {
            Console.WriteLine("[MQTT Server] [UserName] [ClientID]");
            Console.WriteLine("[MQTT Server] [UserName] [ClientID] [CommandTopic]");
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
            return;
         }

         server =  args[0];
         username = args[1];
         clientId =  args[2];
         
         if (args.Length == 3)
         {
            Console.WriteLine($"MQTT Server:{server} ClientID:{clientId}");
         }

         if (args.Length == 4)
         {
            commandTopic = args[3];
            Console.WriteLine($"MQTT Server:{server} ClientID:{clientId} CommandTopic:{commandTopic}");
         }

         mqttOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(server)
            .WithCredentials(username, "")
            .WithClientId(clientId)
            //.WithTls() blows up if this enabled, need to do more research on certificate config.
            .Build();

         mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
         mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
         mqttClient.ConnectAsync(mqttOptions).Wait();

         if (args.Length == 4)
         {
            mqttClient.SubscribeAsync(commandTopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();
         }

         while (true)
         {
            JObject payloadJObject = new JObject();

            payloadJObject.Add("OfficeTemperature", "22." + DateTime.UtcNow.Millisecond.ToString());
            payloadJObject.Add("OfficeHumidity", (DateTime.UtcNow.Second + 40).ToString());

            string payload = JsonConvert.SerializeObject(payloadJObject);
            Console.WriteLine($"Topic:{telemetryTopic} Payload:{payload}");

            var message = new MqttApplicationMessageBuilder()
               .WithTopic(telemetryTopic)
               .WithPayload(payload)
               .WithAtLeastOnceQoS()
            .Build();

            Console.WriteLine("PublishAsync start");
            mqttClient.PublishAsync(message).Wait();
            Console.WriteLine("PublishAsync finish");

            Thread.Sleep(30100);
         }
      }

      private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
      {
         Console.WriteLine($"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
      }

      private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
      {
         Debug.WriteLine("Disconnected");
         await Task.Delay(TimeSpan.FromSeconds(5));

         try
         {
            await mqttClient.ConnectAsync(mqttOptions);
         }
         catch (Exception ex)
         {
            Debug.WriteLine("Reconnect failed {0}", ex.Message);
         }
      }

The ThingsBoard device configuration was relatively easy with convenient buttons to copy the Device ID (Client ID in test client) and Access Token (UserName in test client). After looking at the source code for some of the other samples I figured out the ThingsBoard MQTT setup does not use the password field.

Device setup

The Device configuration form has a tab which displays the last telemetry which was useful for debugging. (I need to investigate the MQTT API support for claiming devices)

Device Telemetry display

To get telemetry data displayed on my dashboard I “added” it to my dashboard from the Entity configuration form. (I need to spend some more time watching the video tutorials to understand this process).

Device Entity View Setup

The dashboard designer had a number of “widgets” and what appeared to be the ability to add custom ones. Most forms also had “customer” option which appeared to be for multi-tenant support.

Selecting a dashboard graph widget
Configuring the y Axis Range for graph widget
Dashboard display home office Humidity & Temperature Information

Overall the initial configuration went smoothly after I figured out that the password was not required, and that Transport Layer Security(TLS) required some additional configuration.

Azure IoT Hub with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that my device could connect to the Microsoft Azure IoT Hub MQTT API then format topics and payloads correctly.

Azure IoT Hub MQTT Console Client

I had tried with a couple of different MQTT libraries from micro controllers and embedded devices without success. With the benefit of hindsight (plus this article) I think I had the SAS key format wrong.

The Azure IoT Hub MQTT broker requires only a server name (fully resolved CName), device ID and SAS Key.

   class Program
   {
      private static IMqttClient mqttClient = null;
      private static IMqttClientOptions mqttOptions = null;
      private static string server;
      private static string username;
      private static string password;
      private static string clientId;
      private static string topicD2C;
      private static string topicC2D;

      static void Main(string[] args)
      {
         MqttFactory factory = new MqttFactory();
         mqttClient = factory.CreateMqttClient();

         if (args.Length != 3)
         {
            Console.WriteLine("[AzureIoTHubHostName] [deviceID] [SASKey]");
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
            return;
         }

         server = args[0];
         clientId = args[1];
         sasKey= args[2];

         username = $"{server}/{clientId}/api-version=2018-06-30";
         topicD2C = $"devices/{clientId}/messages/events/";
         topicC2D = $"devices/{clientId}/messages/devicebound/#";

         Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientId}");

         mqttOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(server, 8883)
            .WithCredentials(username, sasKey)
            .WithClientId(clientId)
            .WithTls()
            .Build();

         mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
         mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
         mqttClient.ConnectAsync(mqttOptions).Wait();

         mqttClient.SubscribeAsync(topicC2D, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();

         while (true)
         {
            JObject payloadJObject = new JObject();

            payloadJObject.Add("OfficeTemperature", "22." + DateTime.UtcNow.Millisecond.ToString());
            payloadJObject.Add("OfficeHumidity", (DateTime.UtcNow.Second + 40).ToString());

            string payload = JsonConvert.SerializeObject(payloadJObject);
            Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");

            var message = new MqttApplicationMessageBuilder()
               .WithTopic(topicD2C)
               .WithPayload(payload)
               .WithAtLeastOnceQoS()
            .Build();

            Console.WriteLine("PublishAsync start");
            mqttClient.PublishAsync(message).Wait();
            Console.WriteLine("PublishAsync finish");

            Thread.Sleep(30100);
         }
      }

      private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
      {
         Console.WriteLine($"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
      }

      private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
      {
         Debug.WriteLine("Disconnected");
         await Task.Delay(TimeSpan.FromSeconds(5));

         try
         {
            await mqttClient.ConnectAsync(mqttOptions);
         }
         catch (Exception ex)
         {
            Debug.WriteLine("Reconnect failed {0}", ex.Message);
         }
      }
   }

Overall the initial configuration went smoothly after I figured out the required Quality of Service (QoS) settings, and the SAS Key format.

Using the approach described in the Microsoft documentation I manually generated the SAS Key.(In my Netduino samples I have code for generating a SAS Key in my HTTPS Azure IoT Hub Client)

Azure Device Explorer Device Management
Azure Device Explorer SAS Key Generator

Once I had the configuration correct I could see telemetry from the device and send it messages.

Azure Device Explorer Data View

In a future post I will upload data to the Azure IoT Central for display. Then explore using a “module” attached to a device which maybe useful for my field gateway.

Thingspeak with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the Thingspeak MQTT API then format topics and payloads correctly.

MQTT Console Client

The Thingspeak MQTT broker, username, password, and client ID, Channel, writeAPIKey, ClientID, channelData, are the minimum command line options required. The channelSubscribe, readAPIkey and field are optional.

   class Program
   {
      private static IMqttClient mqttClient = null;
      private static IMqttClientOptions mqttOptions = null;
      private static string server;
      private static string username;
      private static string password;
      private static string writeApiKey;
      private static string clientId;
      private static string channelData;
      private static string channelSubscribe;
      private static string readApiKey;
      private static string field;

      static void Main(string[] args)
      {
         MqttFactory factory = new MqttFactory();
         mqttClient = factory.CreateMqttClient();

         if ((args.Length != 6) && (args.Length != 8) && (args.Length != 9))
         {
            Console.WriteLine("[MQTT Server] [UserName] [Password] [WriteAPIKey] [ClientID] [Channel]");
            Console.WriteLine("[MQTT Server] [UserName] [Password] [WriteAPIKey] [ClientID] [Channel] [channelSubscribe] [ReadApiKey]");
            Console.WriteLine("[MQTT Server] [UserName] [Password] [WriteAPIKey] [ClientID] [Channel] [channelSubscribe] [ReadApiKey] [Field]");
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
            return;
         }

         server = args[0];
         username = args[1];
         password = args[2];
         writeApiKey = args[3];
         clientId = args[4];
         channelData = args[5];

         if (args.Length == 6)
         {
            Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientId} ChannelData:{channelData}");
         }

         if (args.Length == 8)
         {
            channelSubscribe = args[6];
            readApiKey = args[7];
            Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientId} ChannelData:{channelData} ChannelSubscribe:{channelSubscribe}");
         }

         if (args.Length == 9)
         {
            channelSubscribe = args[6];
            readApiKey = args[7];
            field = args[8];
            Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientId} ChannelData:{channelData} ChannelSubscribe:{channelSubscribe} Field:{field}");
         }

         mqttOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(server)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithTls()
            .Build();

         mqttClient.ConnectAsync(mqttOptions).Wait();

         if (args.Length == 8)
         {
            string topic = $"channels/{channelSubscribe}/subscribe/fields/+/{readApiKey}";
            
            Console.WriteLine($"Subscribe Topic:{topic}");

            mqttClient.SubscribeAsync(topic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce).Wait(); 
            mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
         }
         if (args.Length == 9)
         {
            string topic = $"channels/{channelSubscribe}/subscribe/fields/{field}/{readApiKey}";

            Console.WriteLine($"Subscribe Topic:{topic}");

            mqttClient.SubscribeAsync(topic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce).Wait();
            mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
         }

         mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));

         string topicTemperatureData = $"channels/{channelData}/publish/{writeApiKey}";

         Console.WriteLine();

         while (true)
         {
            string value = "field1=22." + DateTime.UtcNow.Millisecond.ToString() + "&field2=60." + DateTime.UtcNow.Millisecond.ToString();
            Console.WriteLine($"Publish Topic {topicTemperatureData}  Value {value}");

            var message = new MqttApplicationMessageBuilder()
               .WithTopic(topicTemperatureData)
               .WithPayload(value)
               .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
//               .WithRetainFlag()
            .Build();

            Console.WriteLine("PublishAsync start");
            mqttClient.PublishAsync(message).Wait();
            Console.WriteLine("PublishAsync finish");
            Console.WriteLine();

            Thread.Sleep(30100);
         }
      }

      private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
      {
         Console.WriteLine($"ApplicationMessageReceived ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Qos:{e.ApplicationMessage.QualityOfServiceLevel} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
         Console.WriteLine();
      }

      private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
      {
         Debug.WriteLine("Disconnected");
         await Task.Delay(TimeSpan.FromSeconds(5));

         try
         {
            await mqttClient.ConnectAsync(mqttOptions);
         }
         catch (Exception ex)
         {
            Debug.WriteLine("Reconnect failed {0}", ex.Message);
         }
      }
   }
Channel configuration

The ThingSpeak channel setup has many attributes (link to an external site, link to a GitHub repository, fields for elevation, latitude, longitude etc. ) but only 8 data fields which seems a bit limiting (some of my sensor nodes report more than 8 values).

Dashboard

The Thingspeak dashboard configuration was fairly simple, though the maximum of eight sequentially numbered of inputs (Fields) might require some mapping code.

Overall the initial configuration went smoothly after I figured out the required Quality of Service (QoS) settings, retain flag usage and the different APIKeys on the publish vs. subscribe topics.

I have not explored the advanced analysis enabled by the tight integration with MATLAB which could be quite an advantage for applications requiring that sort of functionality.

Ask Sensors with MQTTnet

After a 6 month pause I’m back working on my Message Queue Telemetry Transport(MQTT) LoRa gateway.

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the Ask Sensors MQTT API then format topics and payloads correctly.

Console test application

The MQTT broker, username, password, and client ID are command line options.

class Program
{
	private static IMqttClient mqttClient = null;
	private static IMqttClientOptions mqttOptions = null;
	private static string server;
	private static string username;
	private static string apiKey;
	private static string clientID;

	static void Main(string[] args)
	{
		MqttFactory factory = new MqttFactory();
		mqttClient = factory.CreateMqttClient();
		bool heatPumpOn = false;

		if (args.Length != 4)
		{
			Console.WriteLine("[MQTT Server] [UserName] [APIKey] [ClientID]");
			Console.WriteLine("Press <enter> to exit");
			Console.ReadLine();
			return;
		}

		server = args[0];
		username = args[1];
		apiKey = args[2];
		clientID = args[3];

		Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientID}");

		mqttOptions = new MqttClientOptionsBuilder()
			.WithTcpServer(server)
			.WithCredentials(username, "")
			.WithClientId(clientID)
			//.WithTls() // This is a bit of a worry
			.Build();

		mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
		mqttClient.Disconnected += MqttClient_Disconnected;
		mqttClient.ConnectAsync(mqttOptions).Wait();

		// AskSensors formatted client state update topic
		string stateTopic = $"{username}/{apiKey}";

		while (true)
		{
			string payloadText;
			double temperature = 22.0 + (DateTime.UtcNow.Millisecond / 1000.0);
			double humidity = 50 + (DateTime.UtcNow.Millisecond / 100.0);
			double speed = 10 + (DateTime.UtcNow.Millisecond / 100.0);
			Console.WriteLine($"Topic:{stateTopic} Temperature:{temperature:0.00} Humidity:{humidity:0} HeatPumpOn:{heatPumpOn}");

			// First JSON attempt didn't work
			payloadText = @"{""Humidity"":55}";

            // Second attempt worked
            payloadText = $"module1=22";

            // Third attempt with "real" values injected
            payloadText = $"module1={temperature}&m2={humidity}";

            var message = new MqttApplicationMessageBuilder()
					.WithTopic(stateTopic)
					.WithPayload(payloadText)
					.WithQualityOfServiceLevel(global::MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
				   .WithExactlyOnceQoS()
				   //.WithAtLeastOnceQoS()
				   //.WithRetainFlag()
				   .Build();

				Console.WriteLine("PublishAsync start");
				mqttClient.PublishAsync(message).Wait();
				Console.WriteLine("PublishAsync finish");

				Thread.Sleep(30100);
			}
		}

	private static async void MqttClient_Disconnected(object sender, MqttClientDisconnectedEventArgs e)
	{
		Debug.WriteLine("Disconnected");
		await Task.Delay(TimeSpan.FromSeconds(5));

		try
		{
			await mqttClient.ConnectAsync(mqttOptions);
		}
		catch (Exception ex)
		{
			Debug.WriteLine("Reconnect failed {0}", ex.Message);
		}
	}

The Ask Sensors screen designer has 8 different types of Graph (line, bar, scatter, gauge, table, binary. digital, map)

Ask sensors dashboard configuration was fairly simple, though sequential numbering of inputs (modules) might require some mapping code.

Overall the initial configuration went smoothly after I figured out the payload format (not JSON), though the functionality (of a free subscription) did appear to be quite limited.

Since I first started building my MQTT gateway there have been several breaking updates to the MQTTNet API which so I will have to refresh all the applications in my solution.

Ubidots with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the ubidots MQTT API then format the topics and payloads correctly. The ubidots screen designer has “variables” (both actual sensors & synthetic calculated ones) which present as topics so I built a client which could subscribe to these.

.Net Core V2 MQTTnet client

The MQTT broker, username, password, and client ID are command line options.

class Program
{
	private static IMqttClient mqttClient = null;
	private static IMqttClientOptions mqttOptions = null;
	private static string server;
	private static string username;
	private static string deviceLabel;

	static void Main(string[] args)
	{
		MqttFactory factory = new MqttFactory();
		mqttClient = factory.CreateMqttClient();
		bool heatPumpOn = false;

		if (args.Length != 3)
		{
			Console.WriteLine("[MQTT Server] [UserName] [Password] [ClientID]");
			Console.WriteLine("Press <enter> to exit");
			Console.ReadLine();
			return;
		}

		server = args[0];
		username = args[1];
		deviceLabel = args[2];

		Console.WriteLine($"MQTT Server:{server} Username:{username} DeviceLabel:{deviceLabel}");

		mqttOptions = new MqttClientOptionsBuilder()
			.WithTcpServer(server)
			.WithCredentials(username, "NotVerySecret")
			.WithClientId(deviceLabel)
			.WithTls()
			.Build();

		mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
		mqttClient.Disconnected += MqttClient_Disconnected;
		mqttClient.ConnectAsync(mqttOptions).Wait();

		// Setup a subscription for commands sent to client
		string commandTopic = $"/v1.6/devices/{deviceLabel}/officetemperaturedesired/lv";
		mqttClient.SubscribeAsync(commandTopic).GetAwaiter().GetResult();

		//// Ubidots formatted client state update topic
		string stateTopic = $"/v1.6/devices/{deviceLabel}";

		while (true)
		{
			string payloadText;
			double temperature = 22.0 + (DateTime.UtcNow.Millisecond / 1000.0);
			double humidity = 50 + (DateTime.UtcNow.Millisecond / 100.0);
			double speed = 10 + (DateTime.UtcNow.Millisecond / 100.0);
			Console.WriteLine($"Topic:{stateTopic} Temperature:{temperature:0.00} Humidity:{humidity:0} HeatPumpOn:{heatPumpOn}");

			// First attempt which worked
			//payloadText = @"{""OfficeTemperature"":22.5}";

			// Second attempt to work out data format with "real" values injected
			//payloadText = @"{ ""officetemperature"":"+ temperature.ToString("F2") + @",""officehumidity"":" + humidity.ToString("F0") + @"}";

			// Third attempt with Jobject which sort of worked but number serialisation was sub optimal
			JObject payloadJObject = new JObject(); 
			payloadJObject.Add("OfficeTemperature", temperature.ToString("F2"));
			payloadJObject.Add("OfficeHumidity", humidity.ToString("F0"));

			if (heatPumpOn)
			{
				payloadJObject.Add("HeatPumpOn", 1);
			}
			else
			{
				payloadJObject.Add("HeatPumpOn", 0);
			}
			heatPumpOn = !heatPumpOn;
			payloadText = JsonConvert.SerializeObject(payloadJObject);

			/*
			// Forth attempt with JOBject, timestamps and gps 
			JObject payloadJObject = new JObject();
			JObject context = new JObject();
			context.Add("lat", "-43.5309325");
			context.Add("lng", "172.637119");// Christchurch Cathederal
			//context.Add("timestamp", ((DateTimeOffset)(DateTime.UtcNow)).ToUnixTimeSeconds()); // This field is optional and can be commented out
			JObject position = new JObject();
			position.Add("context", context);
			position.Add("value", "0");
			payloadJObject.Add("postion", position);
			payloadText = JsonConvert.SerializeObject(payloadJObject);
			*/

			var message = new MqttApplicationMessageBuilder()
				.WithTopic(stateTopic)
				.WithPayload(payloadText)
				.WithQualityOfServiceLevel(global::MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
			//.WithExactlyOnceQoS()// With ubidots this caused the publish to hang
			.WithAtLeastOnceQoS()
			.WithRetainFlag() 
			.Build();

			Console.WriteLine("PublishAsync start");
			mqttClient.PublishAsync(message).Wait();
			Console.WriteLine("PublishAsync finish");

			Thread.Sleep(30100);
		}
	}

	private static void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
	{
		Console.WriteLine($"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
	}

	private static async void MqttClient_Disconnected(object sender, MqttClientDisconnectedEventArgs e)
	{
		Debug.WriteLine("Disconnected");
		await Task.Delay(TimeSpan.FromSeconds(5));

		try
		{
			await mqttClient.ConnectAsync(mqttOptions);
		}
		catch (Exception ex)
		{
			Debug.WriteLine("Reconnect failed {0}", ex.Message);
		}
	}
}

For this PoC I used the MQTTnet package which is available via NuGet. It appeared to be reasonably well supported and has had recent updates.

Variable configuration with device location map

Overall the initial configuration went smoothly, I found the dragging of blocks onto the dashboard and configuring them worked as expected.

The configuration of a “synthetic” variable (converting a temperature to Fahrenheit for readers from the Unites States of America, Myanmar & Liberia ) took a couple of goes to get right.

I may have missed something (April 2019) but the lack of boolean datatype variables was a bit odd.

Synthetic (calculated) variable configuration

I put a slider control on my test dashboard, associated it with a variable and my client reliably received messages when the slider was moved.

Dashboard with slider for desired temperature

Overall the Ubidots experience was pretty good and I’m going to spend some more time working with the device, data, users configurations to see how well it works for a “real-world” project.

I found (April 2019) that to get MQTTS going I had to install a Ubidots provided certificate

MQTT with TLS guidance and certificate download link

When my .Net Core application didn’t work I tried one my MQTT debugging tools and they didn’t work either with the Ubitdots MQTT brokers. The Ubidots forum people were quite helpful, but making it not necessary to install a certificate or making it really obvious in the documentation that this was required would be a good thing.