Thingspeak with MQTTnet

As I’m testing my Message Queue Telemetry Transport (MQTT) LoRa gateway, I’m building a proof of concept (PoC) .NET Core console application for each IoT platform I would like to support.

This PoC was to confirm that I could connect to the Thingspeak MQTT API then format topics and payloads correctly.

MQTT Console Client

The ThingSpeak MQTT broker, username, password, writeAPIKey, client ID and channelData are the minimum command line options required. The channelSubscribe, readAPIKey and field options are optional.

   /// <summary>
   /// Proof-of-concept console client that connects to an MQTT broker over TLS,
   /// optionally subscribes to a channel's field updates, and publishes a
   /// two-field payload to a channel in an endless loop.
   /// </summary>
   /// <remarks>
   /// Command line (positional):
   /// [MQTT Server] [UserName] [Password] [WriteAPIKey] [ClientID] [Channel]
   /// optionally followed by [channelSubscribe] [ReadApiKey] and then [Field].
   /// </remarks>
   class Program
   {
      // Pause between publishes, in milliseconds (value carried over from the original PoC).
      private const int PublishIntervalMilliseconds = 30100;

      private static IMqttClient mqttClient = null;
      private static IMqttClientOptions mqttOptions = null;
      private static string server;
      private static string username;
      private static string password;
      private static string writeApiKey;
      private static string clientId;
      private static string channelData;
      private static string channelSubscribe;
      private static string readApiKey;
      private static string field;

      // Topic subscribed to at start-up; null when no subscription was requested.
      // Kept so the reconnect handler can restore the subscription.
      private static string subscribeTopic;

      /// <summary>
      /// Parses the command line, connects to the broker, optionally subscribes,
      /// then publishes a sample payload forever.
      /// </summary>
      /// <param name="args">6, 8 or 9 positional arguments; see the class remarks.</param>
      static void Main(string[] args)
      {
         if ((args.Length != 6) && (args.Length != 8) && (args.Length != 9))
         {
            Console.WriteLine("[MQTT Server] [UserName] [Password] [WriteAPIKey] [ClientID] [Channel]");
            Console.WriteLine("[MQTT Server] [UserName] [Password] [WriteAPIKey] [ClientID] [Channel] [channelSubscribe] [ReadApiKey]");
            Console.WriteLine("[MQTT Server] [UserName] [Password] [WriteAPIKey] [ClientID] [Channel] [channelSubscribe] [ReadApiKey] [Field]");
            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();
            return;
         }

         server = args[0];
         username = args[1];
         password = args[2];
         writeApiKey = args[3];
         clientId = args[4];
         channelData = args[5];

         // Build the start-up summary once; the optional arguments only append to it.
         // The password and API keys are deliberately not echoed.
         string summary = $"MQTT Server:{server} Username:{username} ClientID:{clientId} ChannelData:{channelData}";

         if (args.Length >= 8)
         {
            channelSubscribe = args[6];
            readApiKey = args[7];
            summary += $" ChannelSubscribe:{channelSubscribe}";
         }

         if (args.Length == 9)
         {
            field = args[8];
            summary += $" Field:{field}";
         }

         Console.WriteLine(summary);

         MqttFactory factory = new MqttFactory();
         mqttClient = factory.CreateMqttClient();

         mqttOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(server)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithTls()
            .Build();

         mqttClient.ConnectAsync(mqttOptions).Wait();

         if (channelSubscribe != null)
         {
            // Without an explicit field the single-level wildcard "+" selects every field.
            string fieldFilter = field ?? "+";
            subscribeTopic = $"channels/{channelSubscribe}/subscribe/fields/{fieldFilter}/{readApiKey}";

            Console.WriteLine($"Subscribe Topic:{subscribeTopic}");

            // Install the handler BEFORE subscribing so a message delivered
            // immediately after the subscription is acknowledged is not lost.
            mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
            mqttClient.SubscribeAsync(subscribeTopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce).Wait();
         }

         mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));

         string topicTemperatureData = $"channels/{channelData}/publish/{writeApiKey}";

         Console.WriteLine();

         while (true)
         {
            // Sample payload: the current millisecond value is used as a changing fractional part.
            string value = "field1=22." + DateTime.UtcNow.Millisecond.ToString() + "&field2=60." + DateTime.UtcNow.Millisecond.ToString();
            Console.WriteLine($"Publish Topic {topicTemperatureData}  Value {value}");

            // The retain flag is intentionally not set (it was disabled in the original PoC).
            var message = new MqttApplicationMessageBuilder()
               .WithTopic(topicTemperatureData)
               .WithPayload(value)
               .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
               .Build();

            Console.WriteLine("PublishAsync start");
            try
            {
               mqttClient.PublishAsync(message).Wait();
               Console.WriteLine("PublishAsync finish");
            }
            catch (Exception ex)
            {
               // A publish attempted while the client is disconnected/reconnecting must not
               // terminate the loop; report it and try again on the next interval.
               // Wait() wraps failures in AggregateException, so surface the root cause.
               Console.WriteLine($"PublishAsync failed {ex.GetBaseException().Message}");
            }
            Console.WriteLine();

            Thread.Sleep(PublishIntervalMilliseconds);
         }
      }

      /// <summary>
      /// Writes the details of a received application message to the console.
      /// </summary>
      /// <param name="e">Event data supplied by the MQTT client for the received message.</param>
      private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
      {
         Console.WriteLine($"ApplicationMessageReceived ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Qos:{e.ApplicationMessage.QualityOfServiceLevel} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
         Console.WriteLine();
      }

      /// <summary>
      /// Waits five seconds after a disconnect, then tries to reconnect and, when a
      /// subscription was requested at start-up, to subscribe again.
      /// </summary>
      /// <remarks>
      /// async void is used because this is an event-style callback; every await that
      /// can fail is inside the try block so no exception escapes unobserved.
      /// </remarks>
      /// <param name="e">Event data supplied by the MQTT client for the disconnect.</param>
      private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
      {
         Debug.WriteLine("Disconnected");
         await Task.Delay(TimeSpan.FromSeconds(5));

         try
         {
            await mqttClient.ConnectAsync(mqttOptions);

            // NOTE(review): assumes the broker does not persist the session across
            // connections (no session option is set above), so the subscription is
            // re-established explicitly — confirm against the MQTTnet defaults.
            if (subscribeTopic != null)
            {
               await mqttClient.SubscribeAsync(subscribeTopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce);
            }
         }
         catch (Exception ex)
         {
            // Interpolate rather than pass ex.Message as a second string argument:
            // Debug.WriteLine(string, string) treats the second string as a category,
            // so a "{0}" placeholder would be printed literally.
            Debug.WriteLine($"Reconnect failed {ex.Message}");
         }
      }
   }
Channel configuration

The ThingSpeak channel setup has many attributes (link to an external site, link to a GitHub repository, fields for elevation, latitude, longitude, etc.) but only 8 data fields, which seems a bit limiting (some of my sensor nodes report more than 8 values).

Dashboard

The ThingSpeak dashboard configuration was fairly simple, though the maximum of eight sequentially numbered inputs (fields) might require some mapping code.

Overall the initial configuration went smoothly after I figured out the required Quality of Service (QoS) settings, retain flag usage and the different APIKeys on the publish vs. subscribe topics.

I have not explored the advanced analysis enabled by the tight integration with MATLAB which could be quite an advantage for applications requiring that sort of functionality.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.