Azure IoT Hub with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that my device could connect to the Microsoft Azure IoT Hub MQTT API then format topics and payloads correctly.

Azure IoT Hub MQTT Console Client

I had tried with a couple of different MQTT libraries from micro controllers and embedded devices without success. With the benefit of hindsight (plus this article) I think I had the SAS key format wrong.

The Azure IoT Hub MQTT broker requires only a server name (fully resolved CName), device ID and SAS Key.

   class Program
   {
      private static IMqttClient mqttClient = null;
      private static IMqttClientOptions mqttOptions = null;
      private static string server;
      private static string username;
      private static string password;
      private static string clientId;
      private static string topicD2C;
      private static string topicC2D;

      static void Main(string[] args)
      {
         MqttFactory factory = new MqttFactory();
         mqttClient = factory.CreateMqttClient();

         if (args.Length != 3)
         {
            Console.WriteLine("[AzureIoTHubHostName] [deviceID] [SASKey]");
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
            return;
         }

         server = args[0];
         clientId = args[1];
         sasKey= args[2];

         username = $"{server}/{clientId}/api-version=2018-06-30";
         topicD2C = $"devices/{clientId}/messages/events/";
         topicC2D = $"devices/{clientId}/messages/devicebound/#";

         Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientId}");

         mqttOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(server, 8883)
            .WithCredentials(username, sasKey)
            .WithClientId(clientId)
            .WithTls()
            .Build();

         mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
         mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
         mqttClient.ConnectAsync(mqttOptions).Wait();

         mqttClient.SubscribeAsync(topicC2D, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();

         while (true)
         {
            JObject payloadJObject = new JObject();

            payloadJObject.Add("OfficeTemperature", "22." + DateTime.UtcNow.Millisecond.ToString());
            payloadJObject.Add("OfficeHumidity", (DateTime.UtcNow.Second + 40).ToString());

            string payload = JsonConvert.SerializeObject(payloadJObject);
            Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");

            var message = new MqttApplicationMessageBuilder()
               .WithTopic(topicD2C)
               .WithPayload(payload)
               .WithAtLeastOnceQoS()
            .Build();

            Console.WriteLine("PublishAsync start");
            mqttClient.PublishAsync(message).Wait();
            Console.WriteLine("PublishAsync finish");

            Thread.Sleep(30100);
         }
      }

      private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
      {
         Console.WriteLine($"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
      }

      private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
      {
         Debug.WriteLine("Disconnected");
         await Task.Delay(TimeSpan.FromSeconds(5));

         try
         {
            await mqttClient.ConnectAsync(mqttOptions);
         }
         catch (Exception ex)
         {
            Debug.WriteLine("Reconnect failed {0}", ex.Message);
         }
      }
   }

Overall the initial configuration went smoothly after I figured out the required Quality of Service (QoS) settings, and the SAS Key format.

Using the approach described in the Microsoft documentation I manually generated the SAS Key.(In my Netduino samples I have code for generating a SAS Key in my HTTPS Azure IoT Hub Client)

Azure Device Explorer Device Management
Azure Device Explorer SAS Key Generator

Once I had the configuration correct I could see telemetry from the device and send it messages.

Azure Device Explorer Data View

In a future post I will upload data to the Azure IoT Central for display. Then explore using a “module” attached to a device which maybe useful for my field gateway.

Thingspeak with MQTTnet

As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the Thingspeak MQTT API then format topics and payloads correctly.

MQTT Console Client

The Thingspeak MQTT broker, username, password, and client ID, Channel, writeAPIKey, ClientID, channelData, are the minimum command line options required. The channelSubscribe, readAPIkey and field are optional.

   class Program
   {
      private static IMqttClient mqttClient = null;
      private static IMqttClientOptions mqttOptions = null;
      private static string server;
      private static string username;
      private static string password;
      private static string writeApiKey;
      private static string clientId;
      private static string channelData;
      private static string channelSubscribe;
      private static string readApiKey;
      private static string field;

      static void Main(string[] args)
      {
         MqttFactory factory = new MqttFactory();
         mqttClient = factory.CreateMqttClient();

         if ((args.Length != 6) && (args.Length != 8) && (args.Length != 9))
         {
            Console.WriteLine("[MQTT Server] [UserName] [Password] [WriteAPIKey] [ClientID] [Channel]");
            Console.WriteLine("[MQTT Server] [UserName] [Password] [WriteAPIKey] [ClientID] [Channel] [channelSubscribe] [ReadApiKey]");
            Console.WriteLine("[MQTT Server] [UserName] [Password] [WriteAPIKey] [ClientID] [Channel] [channelSubscribe] [ReadApiKey] [Field]");
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
            return;
         }

         server = args[0];
         username = args[1];
         password = args[2];
         writeApiKey = args[3];
         clientId = args[4];
         channelData = args[5];

         if (args.Length == 6)
         {
            Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientId} ChannelData:{channelData}");
         }

         if (args.Length == 8)
         {
            channelSubscribe = args[6];
            readApiKey = args[7];
            Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientId} ChannelData:{channelData} ChannelSubscribe:{channelSubscribe}");
         }

         if (args.Length == 9)
         {
            channelSubscribe = args[6];
            readApiKey = args[7];
            field = args[8];
            Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientId} ChannelData:{channelData} ChannelSubscribe:{channelSubscribe} Field:{field}");
         }

         mqttOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(server)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithTls()
            .Build();

         mqttClient.ConnectAsync(mqttOptions).Wait();

         if (args.Length == 8)
         {
            string topic = $"channels/{channelSubscribe}/subscribe/fields/+/{readApiKey}";
            
            Console.WriteLine($"Subscribe Topic:{topic}");

            mqttClient.SubscribeAsync(topic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce).Wait(); 
            mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
         }
         if (args.Length == 9)
         {
            string topic = $"channels/{channelSubscribe}/subscribe/fields/{field}/{readApiKey}";

            Console.WriteLine($"Subscribe Topic:{topic}");

            mqttClient.SubscribeAsync(topic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce).Wait();
            mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
         }

         mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));

         string topicTemperatureData = $"channels/{channelData}/publish/{writeApiKey}";

         Console.WriteLine();

         while (true)
         {
            string value = "field1=22." + DateTime.UtcNow.Millisecond.ToString() + "&field2=60." + DateTime.UtcNow.Millisecond.ToString();
            Console.WriteLine($"Publish Topic {topicTemperatureData}  Value {value}");

            var message = new MqttApplicationMessageBuilder()
               .WithTopic(topicTemperatureData)
               .WithPayload(value)
               .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
//               .WithRetainFlag()
            .Build();

            Console.WriteLine("PublishAsync start");
            mqttClient.PublishAsync(message).Wait();
            Console.WriteLine("PublishAsync finish");
            Console.WriteLine();

            Thread.Sleep(30100);
         }
      }

      private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
      {
         Console.WriteLine($"ApplicationMessageReceived ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Qos:{e.ApplicationMessage.QualityOfServiceLevel} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
         Console.WriteLine();
      }

      private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
      {
         Debug.WriteLine("Disconnected");
         await Task.Delay(TimeSpan.FromSeconds(5));

         try
         {
            await mqttClient.ConnectAsync(mqttOptions);
         }
         catch (Exception ex)
         {
            Debug.WriteLine("Reconnect failed {0}", ex.Message);
         }
      }
   }
Channel configuration

The ThingSpeak channel setup has many attributes (link to an external site, link to a GitHub repository, fields for elevation, latitude, longitude etc. ) but only 8 data fields which seems a bit limiting (some of my sensor nodes report more than 8 values).

Dashboard

The Thingspeak dashboard configuration was fairly simple, though the maximum of eight sequentially numbered of inputs (Fields) might require some mapping code.

Overall the initial configuration went smoothly after I figured out the required Quality of Service (QoS) settings, retain flag usage and the different APIKeys on the publish vs. subscribe topics.

I have not explored the advanced analysis enabled by the tight integration with MATLAB which could be quite an advantage for applications requiring that sort of functionality.