ubidots MQTT LoRa Field Gateway

Back in April I started working on an MQTT LoRa Field gateway which was going to support a selection of different Software as a service(SaaS) Internet of Things(IoT) platforms.

After a long pause in development I have a working ubidots client and have 3 proof of concept (PoC) integrations for Adafruit.IO, AskSensors, and Losant. I am also working on Azure IoT Hub, Azure IoT Central and MyDevice Cayenne clients. The first iteration is focused on Device to Cloud (D2C) messaging in the next iteration I will add Cloud to Device where viable(C2D).

My applications use a lightweight, easy to implemented protocol which is intended for hobbyist and educational use rather than commercial applications (I have been working on a more secure version as yet another side project)

I have a number of sample Arduino with Dragino LoRa Shield for Arduino, MakerFabs Maduino, Dragino LoRa Mini Dev, M2M Low power Node and Netduino with Elecrow LoRa RFM95 Shield etc. clients. These work with both my platform specific (Adafruit.IO, Azure IoT Central) gateways and protocol specific field gateways.

Ubidots dashboard

When the application is first started it creates a minimal configuration file which should be downloaded, the missing information filled out, then uploaded using the File explorer in the Windows device portal.

{
  "MQTTUserName": "Ubidots generated usname here",
  "MQTTPassword": "NotVerySecure",
  "MQTTClientID": "MQTTLoRaGateway",
  "MQTTServer": "industrial.api.ubidots.com",
  "Address": "LoRaIoT1",
  "Frequency": 915000000.0,
  "MessageHandlerAssembly": "Mqtt.IoTCore.FieldGateway.LoRa.Ubidots",
  "PlatformSpecificConfiguration": ""
}

The application logs debugging information to the Windows 10 IoT Core ETW logging Microsoft-Windows-Diagnostics-LoggingChannel

MQTT LoRa Field Gateway with ubidots plugin generated telemetry
ubidots device management
ubidot managment

The message handler uploads all values in an inbound messages in one MQTT message using the ubidots MQTT message format

async void IMessageHandler.Rfm9XOnReceive(object sender, Rfm9XDevice.OnDataReceivedEventArgs e)
{
	LoggingFields processReceiveLoggingFields = new LoggingFields();
	JObject telemetryDataPoint = new JObject();
	char[] sensorReadingSeparators = { ',' };
	char[] sensorIdAndValueSeparators = { ' ' };

	processReceiveLoggingFields.AddString("PacketSNR", e.PacketSnr.ToString("F1"));
	processReceiveLoggingFields.AddInt32("PacketRSSI", e.PacketRssi);
	processReceiveLoggingFields.AddInt32("RSSI", e.Rssi);

	string addressBcdText = BitConverter.ToString(e.Address);
	processReceiveLoggingFields.AddInt32("DeviceAddressLength", e.Address.Length);
	processReceiveLoggingFields.AddString("DeviceAddressBCD", addressBcdText);

	string messageText;
	try
	{
		messageText = UTF8Encoding.UTF8.GetString(e.Data);
		processReceiveLoggingFields.AddString("MessageText", messageText);
	}
	catch (Exception ex)
	{
		processReceiveLoggingFields.AddString("Exception", ex.ToString());
		this.Logging.LogEvent("PayloadProcess failure converting payload to text", processReceiveLoggingFields, LoggingLevel.Warning);
		return;
	}

	// Chop up the CSV text
	string[] sensorReadings = messageText.Split(sensorReadingSeparators, StringSplitOptions.RemoveEmptyEntries);
	if (sensorReadings.Length < 1)
	{
		this.Logging.LogEvent("PayloadProcess payload contains no sensor readings", processReceiveLoggingFields, LoggingLevel.Warning);
		return;
	}

	// Chop up each sensor read into an ID & value
	foreach (string sensorReading in sensorReadings)
	{
		string[] sensorIdAndValue = sensorReading.Split(sensorIdAndValueSeparators, StringSplitOptions.RemoveEmptyEntries);
		// Check that there is an id & value
		if (sensorIdAndValue.Length != 2)
		{
			this.Logging.LogEvent("PayloadProcess payload invalid format", processReceiveLoggingFields, LoggingLevel.Warning);
			return;
		}

		string sensorId = sensorIdAndValue[0];
		string value = sensorIdAndValue[1];

		telemetryDataPoint.Add(addressBcdText + sensorId, Convert.ToDouble(value));
	}
	processReceiveLoggingFields.AddString("MQTTClientId", MqttClient.Options.ClientId);

	string stateTopic = string.Format(stateTopicFormat, MqttClient.Options.ClientId);

	try
	{
		var message = new MqttApplicationMessageBuilder()
			.WithTopic(stateTopic)
			.WithPayload(JsonConvert.SerializeObject(telemetryDataPoint))
			.WithAtLeastOnceQoS()
			.Build();
		Debug.WriteLine(" {0:HH:mm:ss} MQTT Client PublishAsync start", DateTime.UtcNow);
		await MqttClient.PublishAsync(message);
		Debug.WriteLine(" {0:HH:mm:ss} MQTT Client PublishAsync finish", DateTime.UtcNow);

		this.Logging.LogEvent("PublishAsync Ubidots payload", processReceiveLoggingFields, LoggingLevel.Information);
	}
	catch (Exception ex)
	{
		processReceiveLoggingFields.AddString("Exception", ex.ToString());
		this.Logging.LogEvent("PublishAsync Ubidots payload", processReceiveLoggingFields, LoggingLevel.Error);
	}
}

The “automagic” provisioning of feeds does make setting up small scale systems easier, though I’m not certain how well it would scale.

Some of the fields weren’t obviously editable e.g.”√ĄPI Label” in device configuration which I only discovered by clicking on them..

The limitations of the free account meant I couldn’t evaluate ubidots in much depth but what was available appeared to be robust and reliable (Nov 2019).

Adafruit.IO MQTT LoRa Field Gateway

Back in April I started working on an MQTT LoRa Field gateway which was going to support a selection of different Software as a service(SaaS) Internet of Things(IoT) platforms.

After a long pause in development I have a working AdaFruit.IO client and have 3 proof of concept (PoC) integrations for AskSensors, Losant and Ubidots. I am also working on Azure IoT Hub, Azure IoT Central and MyDevice Cayenne clients. The first iteration is focused on Device to Cloud (D2C) messaging in the next iteration I will add Cloud to Device where viable(C2D).

My applications use a lightweight, easy to implemented protocol which is intended for hobbyist and educational use rather than commercial applications (I have been working on a more secure version as yet another side project)

I have a number of sample Arduino with Dragino LoRa Shield for Arduino, MakerFabs Maduino, Dragino LoRa Mini Dev, M2M Low power Node and Netduino with Elecrow LoRa RFM95 Shield etc. clients. These work with both my platform specific (Adafruit.IO, Azure IoT Central) gateways and protocol specific field gateways.

Maduino client dashboard

When the application is first started it creates a minimal configuration file which should be downloaded, the missing information filled out, then uploaded using the File explorer in the Windows device portal.

{
  "MQTTUserName": "AdaFruitIO user",
  "MQTTPassword": "AIO Key",
  "MQTTClientID": "MQTTLoRaGateway",
  "MQTTServer": "io.adafruit.com",
  "Address": "LoRaIoT1",
  "Frequency": 915000000.0,
  "MessageHandlerAssembly": "Mqtt.IoTCore.FieldGateway.LoRa.Adafruit",
  "PlatformSpecificConfiguration": "mqttloragateway"
}

The application logs debugging information to the Windows 10 IoT Core ETW logging Microsoft-Windows-Diagnostics-LoggingChannel

MQTT LoRa Gateway with Adafruit.IO plug-in

The SaaS platform specific interface has gained an additional parameter for platform specific configuration.

namespace devMobile.Mqtt.IoTCore.FieldGateway
{
	using System;
	using Windows.Foundation.Diagnostics;

	using devMobile.IoT.Rfm9x;
	using MQTTnet;
	using MQTTnet.Client;

	public interface IMessageHandler
	{
		void Initialise(LoggingChannel logging, IMqttClient mqttClient, Rfm9XDevice rfm9XDevice,string platformSpecificConfiguration);

		void Rfm9XOnReceive(object sender, Rfm9XDevice.OnDataReceivedEventArgs e);

		void MqttApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);

		void Rfm9xOnTransmit(object sender, Rfm9XDevice.OnDataTransmitedEventArgs e);
	}
}

This is used for the AdaFruit.IO GroupName so Adafruit.IO feed values are not all in a single group.

public class MessageHandler : IMessageHandler
	{
		private LoggingChannel Logging { get; set; }
		private IMqttClient MqttClient { get; set; }
		private Rfm9XDevice Rfm9XDevice { get; set; }
      private string PlatformSpecificConfiguration { get; set; }


      void IMessageHandler.Initialise(LoggingChannel logging, IMqttClient mqttClient, Rfm9XDevice rfm9XDevice, string platformSpecificConfiguration)
		{
			LoggingFields processInitialiseLoggingFields = new LoggingFields();

			this.Logging = logging;
			this.MqttClient = mqttClient;
			this.Rfm9XDevice = rfm9XDevice;
			this.PlatformSpecificConfiguration = platformSpecificConfiguration;
		}

		async void IMessageHandler.Rfm9XOnReceive(object sender, Rfm9XDevice.OnDataReceivedEventArgs e)
		{
			LoggingFields processReceiveLoggingFields = new LoggingFields();

			processReceiveLoggingFields.AddString("PacketSNR", e.PacketSnr.ToString("F1"));
			processReceiveLoggingFields.AddInt32("PacketRSSI", e.PacketRssi);
			processReceiveLoggingFields.AddInt32("RSSI", e.Rssi);

			string addressBcdText = BitConverter.ToString(e.Address);
			processReceiveLoggingFields.AddInt32("DeviceAddressLength", e.Address.Length);
			processReceiveLoggingFields.AddString("DeviceAddressBCD", addressBcdText);

			string payloadBcdText = BitConverter.ToString(e.Data);
			processReceiveLoggingFields.AddInt32("PayloadLength", e.Data.Length);
			processReceiveLoggingFields.AddString("DeviceAddressBCD", payloadBcdText);

			this.Logging.LogEvent("Rfm9XOnReceive", processReceiveLoggingFields, LoggingLevel.Information);
		}

		void IMessageHandler.MqttApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
		{
			LoggingFields processReceiveLoggingFields = new LoggingFields();

			processReceiveLoggingFields.AddString("ClientId", e.ClientId);
#if DEBUG
			processReceiveLoggingFields.AddString("Payload", e.ApplicationMessage.ConvertPayloadToString());
#endif
			processReceiveLoggingFields.AddString("QualityOfServiceLevel", e.ApplicationMessage.QualityOfServiceLevel.ToString());
			processReceiveLoggingFields.AddBoolean("Retain", e.ApplicationMessage.Retain);
			processReceiveLoggingFields.AddString("Topic", e.ApplicationMessage.Topic);

			this.Logging.LogEvent("MqttApplicationMessageReceived topic not processed", processReceiveLoggingFields, LoggingLevel.Error);
		}

		void IMessageHandler.Rfm9xOnTransmit(object sender, Rfm9XDevice.OnDataTransmitedEventArgs e)
		{
			this.Logging.LogMessage("Rfm9xOnTransmit", LoggingLevel.Information);
		}
	}
Adafruit.IO Group for a single field gateway
Group Setup

The message handler uploads all values in an inbound messages in one MQTT message using the AdaFruit.IO Group Feed format.

      async void IMessageHandler.Rfm9XOnReceive(object sender, Rfm9XDevice.OnDataReceivedEventArgs e)
      {
         LoggingFields processReceiveLoggingFields = new LoggingFields();
         char[] sensorReadingSeparators = { ',' };
         char[] sensorIdAndValueSeparators = { ' ' };

         processReceiveLoggingFields.AddString("PacketSNR", e.PacketSnr.ToString("F1"));
         processReceiveLoggingFields.AddInt32("PacketRSSI", e.PacketRssi);
         processReceiveLoggingFields.AddInt32("RSSI", e.Rssi);

         string addressBcdText = BitConverter.ToString(e.Address);
         processReceiveLoggingFields.AddInt32("DeviceAddressLength", e.Address.Length);
         processReceiveLoggingFields.AddString("DeviceAddressBCD", addressBcdText);

         string messageText;
         try
         {
            messageText = UTF8Encoding.UTF8.GetString(e.Data);
            processReceiveLoggingFields.AddString("MessageText", messageText);
         }
         catch (Exception ex)
         {
            processReceiveLoggingFields.AddString("Exception", ex.ToString());
            this.Logging.LogEvent("PayloadProcess failure converting payload to text", processReceiveLoggingFields, LoggingLevel.Warning);
            return;
         }

         // Chop up the CSV text
         string[] sensorReadings = messageText.Split(sensorReadingSeparators, StringSplitOptions.RemoveEmptyEntries);
         if (sensorReadings.Length < 1)
         {
            this.Logging.LogEvent("PayloadProcess payload contains no sensor readings", processReceiveLoggingFields, LoggingLevel.Warning);
            return;
         }

         JObject payloadJObject = new JObject();

         JObject feeds = new JObject();

         // Chop up each sensor read into an ID & value
         foreach (string sensorReading in sensorReadings)
         {
            string[] sensorIdAndValue = sensorReading.Split(sensorIdAndValueSeparators, StringSplitOptions.RemoveEmptyEntries);

            // Check that there is an id & value
            if (sensorIdAndValue.Length != 2)
            {
               this.Logging.LogEvent("PayloadProcess payload invalid format", processReceiveLoggingFields, LoggingLevel.Warning);
               return;
            }

            string sensorId = string.Concat(addressBcdText, sensorIdAndValue[0]);
            string value = sensorIdAndValue[1];

            feeds.Add(sensorId.ToLower(), value);
         }
         payloadJObject.Add("feeds", feeds);

         string topic = $"{MqttClient.Options.Credentials.Username}/groups/{PlatformSpecificConfiguration}";

         try
         {
            var message = new MqttApplicationMessageBuilder()
               .WithTopic(topic)
               .WithPayload(JsonConvert.SerializeObject(payloadJObject))
               .WithAtLeastOnceQoS()
               .Build();
            Debug.WriteLine(" {0:HH:mm:ss} MQTT Client PublishAsync start", DateTime.UtcNow);
            await MqttClient.PublishAsync(message);
            Debug.WriteLine(" {0:HH:mm:ss} MQTT Client PublishAsync finish", DateTime.UtcNow);

            this.Logging.LogEvent("PublishAsync Adafruit payload", processReceiveLoggingFields, LoggingLevel.Information);
         }
         catch (Exception ex)
         {
            processReceiveLoggingFields.AddString("Exception", ex.ToString());
            this.Logging.LogEvent("PublishAsync Adafruit payload", processReceiveLoggingFields, LoggingLevel.Error);
         }
      }

The casing of User names (Must match exactly) and Group/Feed names (must be lower case) tripped me up yet again. The “automagic” provisioning of feeds does make setting up small scale systems easier, though I’m not certain how well it would scale.

myDevices Cayenne with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the myDevices Cayenne MQTT API and format the topics and payloads correctly. The myDevices team have built many platform specific libraries that wrap the MQTT platform APIs to make integration for first timers easier (which is great). Though, as an experienced Bring Your Own Device(BYOD) client developer, I did find myself looking at the C/C++ code to figure out how to implement parts of my .Net test client.

The myDevices screen designer had “widgets” which generated commands for devices so I extended the test client implementation to see this worked.

The MQTT broker, username, password, client ID, channel number and optional subscription channel number are command line options.

class Program
{
	private static IMqttClient mqttClient = null;
	private static IMqttClientOptions mqttOptions = null;
	private static string server;
	private static string username;
	private static string password;
	private static string clientId;
	private static string channelData;
	private static string channelSubscribe;

	static void Main(string[] args)
	{
		MqttFactory factory = new MqttFactory();
		mqttClient = factory.CreateMqttClient();

		if ((args.Length != 5) && (args.Length != 6))
		{
			Console.WriteLine("[MQTT Server] [UserName] [Password] [ClientID] [Channel]");
			Console.WriteLine("[MQTT Server] [UserName] [Password] [ClientID] [ChannelData] [ChannelSubscribe]");
			Console.WriteLine("Press <enter> to exit");
			Console.ReadLine();
			return;
		}

		server = args[0];
		username = args[1];
		password = args[2];
		clientId = args[3];
		channelData = args[4];

		if (args.Length == 5)
		{
			Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientId} ChannelData:{channelData}");
		}

		if (args.Length == 6)
		{
			channelSubscribe = args[5];
			Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientId} ChannelData:{channelData} ChannelSubscribe:{channelSubscribe}");
		}

		mqttOptions = new MqttClientOptionsBuilder()
			.WithTcpServer(server)
			.WithCredentials(username, password)
			.WithClientId(clientId)
			.WithTls()
			.Build();

		mqttClient.ConnectAsync(mqttOptions).Wait();

		if (args.Length == 6)
		{
			string topic = $"v1/{username}/things/{clientId}/cmd/{channelSubscribe}";

			Console.WriteLine($"Subscribe Topic:{topic}");
			mqttClient.SubscribeAsync(topic).Wait();
			// mqttClient.SubscribeAsync(topic, global::MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce).Wait(); 
			// Thought this might help with subscription but it didn't, looks like ACK might be broken in MQTTnet
			mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
		}
		mqttClient.Disconnected += MqttClient_Disconnected;

		string topicTemperatureData = $"v1/{username}/things/{clientId}/data/{channelData}";

		Console.WriteLine();

		while (true)
		{
			string value = "22." + DateTime.UtcNow.Millisecond.ToString();
			Console.WriteLine($"Publish Topic {topicTemperatureData}  Value {value}");

			var message = new MqttApplicationMessageBuilder()
				.WithTopic(topicTemperatureData)
				.WithPayload(value)
				.WithQualityOfServiceLevel(global::MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
				//.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce) // Causes publish to hang
				.WithRetainFlag()
				.Build();

			Console.WriteLine("PublishAsync start");

			mqttClient.PublishAsync(message).Wait();
			Console.WriteLine("PublishAsync finish");
			Console.WriteLine();

			Thread.Sleep(30100);
		}
	}

	private static void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
	{
		Console.WriteLine($"ApplicationMessageReceived ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Qos:{e.ApplicationMessage.QualityOfServiceLevel} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
		Console.WriteLine();
	}

	private static async void MqttClient_Disconnected(object sender, MqttClientDisconnectedEventArgs e)
	{
		Debug.WriteLine("Disconnected");
		await Task.Delay(TimeSpan.FromSeconds(5));

		try
		{
			await mqttClient.ConnectAsync(mqttOptions);
		}
		catch (Exception ex)
		{
			Debug.WriteLine("Reconnect failed {0}", ex.Message);
		}
	}
}

For this PoC I used the MQTTnet package which is available via NuGet. It appeared to be reasonably well supported and has had recent updates. There did appear to be some issues with myDevices Cayenne default quality of service (QoS) and the default QoS used by MQTTnet connections and also the acknowledgement of the receipt of published messages.

myDevices Cayenne .Net Core 2 client
Cayenne UI with graph, button and value widgets

Overall the initial configuration went ok, I found the dragging of widgets onto the overview screen had some issues (maybe the caching of control settings (I found my self refreshing the whole page every so often) and I couldn’t save a custom widget icon at all.

I put a button widget on the overview screen and associated it with a channel publication. The client received a message when the button was pressed

myDevices .Net Core 2 client displaying a received command message

But the button widget was disabled until the overview screen was manually refreshed.

Cayenne UI after button press

I need to revisit how I confirm that the actuator has been set to the desired value and the command completed.

Overall the myDevices Cayenne experience (April 2019) was a bit flaky with basic functionality like the saving of custom widget icons broken, updates of the real-time data viewer didn’t occur or were delayed, and there were other configuration screen update issues.