Random wanderings through Microsoft Azure esp. PaaS plumbing, the IoT bits, AI on Micro controllers, AI on Edge Devices, .NET nanoFramework, .NET Core on *nix and ML.NET+ONNX
The console application uses MQTTNet to connect to TTN. It subscribes to to the TTN application device uplink topic (did try subscribing to the uplink messages for all the devices in the application but this was to noisy), and the downlink message scheduled, sent and acknowledged topics. To send messages to the device I published them on the device downlink topic.
class Program
{
private static string payload;
static async Task Main(string[] args)
{
string filename;
string azureIoTHubconnectionString;
DeviceClient azureIoTHubClient;
if (args.Length != 2)
{
Console.WriteLine("[JOSN file] [AzureIoTHubConnectionString]");
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
return;
}
filename = args[0];
azureIoTHubconnectionString = args[1];
try
{
payload = File.ReadAllText(filename);
// Open up the connection
azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt_Tcp_Only);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt_WebSocket_Only);
await azureIoTHubClient.OpenAsync();
await azureIoTHubClient.SetMethodDefaultHandlerAsync(MethodCallbackDefault, null);
Timer MessageSender = new Timer(TimerCallback, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
}
}
public static async void TimerCallback(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
await azureIoTHubClient.SendEventAsync(message);
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
private static async Task<MethodResponse> MethodCallbackDefault(MethodRequest methodRequest, object userContext)
{
Console.WriteLine($"Default handler method {methodRequest.Name} was called.");
return new MethodResponse(200);
}
}
I configured an Azure IoT hub then used Azure IoT explorer to create a device and get the connections string for my application. After fixing up the application’s command line parameters I could see the timer code was successfully sending telemetry messages to my Azure IoT Hub. I also explored the different MQTT connections options TransportType.Mqtt, TransportType.Mqtt_Tcp_Only, and TransportType.Mqtt_WebSocket_Only which worked as expected.
MQTT Console application displaying sent telemetryAzure IoT Hub displaying received telemetry
I could also initiate Direct Method calls to my console application from Azure IoT explorer.
Azure IoT Explorer initiating a Direct Method MQTT console application displaying direct method call.
I then changed the protocol to AMQP
class Program
{
private static string payload;
static async Task Main(string[] args)
{
string filename;
string azureIoTHubconnectionString;
DeviceClient azureIoTHubClient;
Timer MessageSender;
if (args.Length != 2)
{
Console.WriteLine("[JOSN file] [AzureIoTHubConnectionString]");
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
return;
}
filename = args[0];
azureIoTHubconnectionString = args[1];
try
{
payload = File.ReadAllText(filename);
// Open up the connection
azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp_Tcp_Only);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp_WebSocket_Only);
await azureIoTHubClient.OpenAsync();
await azureIoTHubClient.SetMethodDefaultHandlerAsync(MethodCallbackDefault, null);
//MessageSender = new Timer(TimerCallbackAsync, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));
MessageSender = new Timer(TimerCallbackSync, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));
#if MESSAGE_PUMP
Console.WriteLine("Press any key to exit");
while (!Console.KeyAvailable)
{
await Task.Delay(100);
}
#else
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
#endif
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
}
}
public static async void TimerCallbackAsync(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
await azureIoTHubClient.SendEventAsync(message);
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
public static void TimerCallbackSync(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
azureIoTHubClient.SendEventAsync(message).GetAwaiter();
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
private static async Task<MethodResponse> MethodCallbackDefault(MethodRequest methodRequest, object userContext)
{
Console.WriteLine($"Default handler method {methodRequest.Name} was called.");
return new MethodResponse(200);
}
}
In the first version of my console application I could see the SendEventAsync method was getting called but was not returning
AMQP Console application displaying sent telemetry failure
Even though the SendEventAsync call was not returning the telemetry messages were making it to my Azure IoT Hub.
Azure IoT Hub displaying AMQP telemetry
When I tried to initiate a Direct Method call from Azure IoT Explorer it failed after a while with a timeout.
Azure IoT Explorer initiating a Direct Method
The first successful approach I tried was to change the Console.Readline to a “message pump” (flashbacks to Win32 API programming).
Console.WriteLine("Press any key to exit");
while (!Console.KeyAvailable)
{
await Task.Delay(100);
}
After some more experimentation I found that changing the timer method from asynchronous to synchronous also worked.
public static void TimerCallbackSync(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
azureIoTHubClient.SendEventAsync(message).GetAwaiter();
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
I also had to change the method declaration and modify the SendEventAsync call to use a GetAwaiter.
AMQP Console application displaying sent telemetryAzure IoT Hub displaying received telemetryAzure IoT Explorer initiating a Direct MethodMQTT console application displaying direct method call.
It took a while to figure out enough about what was going on so I could do a search with the right keywords (DeviceClient AMQP async await SendEventAsync) to confirm my suspicion that MQTT and AMQP clients did behave differently.
As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.
This PoC was to confirm that I could connect to the AllThingsTalkMQTT API then format topics and payloads correctly.
MQTTNet Console Client
The AllThingsTalk MQTT broker, username, and device ID are required command line parameters.
namespace devmobile.Mqtt.TestClient.AllThingsTalk
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
class Program
{
private static IMqttClient mqttClient = null;
private static IMqttClientOptions mqttOptions = null;
private static string server;
private static string username;
private static string deviceID;
static void Main(string[] args)
{
MqttFactory factory = new MqttFactory();
mqttClient = factory.CreateMqttClient();
if ((args.Length != 3))
{
Console.WriteLine("[MQTT Server] [UserName] [ClientID]");
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
return;
}
server = args[0];
username = args[1];
deviceID = args[2];
Console.WriteLine($"MQTT Server:{server} DeviceID:{deviceID}");
// AllThingsTalk formatted device state update topic
string topicD2C = $"device/{deviceID}/state";
mqttOptions = new MqttClientOptionsBuilder()
.WithTcpServer(server)
.WithCredentials(username, "HighlySecurePassword")
.WithClientId(deviceID)
.WithTls()
.Build();
mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
mqttClient.ConnectAsync(mqttOptions).Wait();
// AllThingsTalk formatted device command with wildcard topic
string topicC2D = $"device/{deviceID}/asset/+/command";
mqttClient.SubscribeAsync(topicC2D, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();
while (true)
{
JObject payloadJObject = new JObject();
double temperature = 22.0 + (DateTime.UtcNow.Millisecond / 1000.0);
temperature = Math.Round( temperature, 1 );
double humidity = 50 + (DateTime.UtcNow.Millisecond / 100.0);
humidity = Math.Round(humidity, 1);
JObject temperatureJObject = new JObject
{
{ "value", temperature }
};
payloadJObject.Add("Temperature", temperatureJObject);
JObject humidityJObject = new JObject
{
{ "value", humidity }
};
payloadJObject.Add("Humidity", humidityJObject);
string payload = JsonConvert.SerializeObject(payloadJObject);
Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");
var message = new MqttApplicationMessageBuilder()
.WithTopic(topicD2C)
.WithPayload(payload)
.WithAtMostOnceQoS()
// .WithAtLeastOnceQoS()
.Build();
Console.WriteLine("PublishAsync start");
mqttClient.PublishAsync(message).Wait();
Console.WriteLine("PublishAsync finish");
Thread.Sleep(15100);
}
}
private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
Console.WriteLine($"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
}
private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
{
Debug.WriteLine("Disconnected");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await mqttClient.ConnectAsync(mqttOptions);
}
catch (Exception ex)
{
Debug.WriteLine("Reconnect failed {0}", ex.Message);
}
}
}
The AllThingsTalk device configuration was relatively easy but I need to investigate “Gateway” functionality and configuration further.
Configuring an Asset
Configuration a watchdog to check for sensor data
Sending a command to an actuatorProcessing a command on the client
The ability to look at message payloads in the Debug tab would be very helpful when working out why a payload was not being processed as expected.
Asset debug information
Overall the AllThingsTalk configuration went fairly smoothly, though I need to investigate the “Gateway” configuration and functionality further. The way that assets are name by the system could make support in my MQTT Gateway more complex.
Application Insights logging with message unpackingApplication Insights logging message payload
Then in the last log entry the decoded message payload
/*
Copyright ® 2020 Feb devMobile Software, All Rights Reserved
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
Default URL for triggering event grid function in the local environment.
http://localhost:7071/runtime/webhooks/EventGrid?functionName=functionname
*/
namespace EventGridProcessorAzureIotHub
{
using System;
using System.IO;
using System.Reflection;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.WebJobs.Extensions.EventGrid;
using log4net;
using log4net.Config;
using Newtonsoft.Json;
public static class Telemetry
{
[FunctionName("Telemetry")]
public static void Run([EventGridTrigger]Microsoft.Azure.EventGrid.Models.EventGridEvent eventGridEvent, ExecutionContext executionContext )//, TelemetryClient telemetryClient)
{
ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
var logRepository = LogManager.GetRepository(Assembly.GetEntryAssembly());
XmlConfigurator.Configure(logRepository, new FileInfo(Path.Combine(executionContext.FunctionAppDirectory, "log4net.config")));
log.Info($"eventGridEvent.Data-{eventGridEvent}");
log.Info($"eventGridEvent.Data.ToString()-{eventGridEvent.Data.ToString()}");
IotHubDeviceTelemetryEventData iOThubDeviceTelemetryEventData = (IotHubDeviceTelemetryEventData)JsonConvert.DeserializeObject(eventGridEvent.Data.ToString(), typeof(IotHubDeviceTelemetryEventData));
log.Info($"iOThubDeviceTelemetryEventData.Body.ToString()-{iOThubDeviceTelemetryEventData.Body.ToString()}");
byte[] base64EncodedBytes = System.Convert.FromBase64String(iOThubDeviceTelemetryEventData.Body.ToString());
log.Info($"System.Text.Encoding.UTF8.GetString(-{System.Text.Encoding.UTF8.GetString(base64EncodedBytes)}");
}
}
}
Overall it took roughly half a page of code (mainly generated by a tool) to unpack and log the contents of an Azure IoT Hub EventGrid payload to Application Insights.
The size of the packets sent and the total device data appeared to map pretty well but I was also interested in the Transport Layer Security (TLS) and Messaging Queuing Telemetry Transport (MQTT) overheads.
Azure IoT Hub Metrics
To get an idea of the overheads I fired up LiveTcpUdpWatch by Nirsoft and noted down the traffic measure on port 8883.
Conenction LiveTcpUdpWatch main screen
Launching the MQTTNet client sending every 30 seconds resulted in traffic like this
So it looks like my very rough numbers are close to the numbers discussed in the above article. I need to explore the impact of keep-alive messages and other background operations.
I did notice that the .DeviceConnected and .DeviceDisconnected events did take a while to arrive. When I started the field gateway application on the Windows 10 IoT Core device I would get several DeviceTelemetry events before the DeviceConnected event arrived.
I was using Advanced Message Queueing Protocol (AMQP) so I modified the configuration file so I could try all the available options.
C# TransportType enumeration
namespace Microsoft.Azure.Devices.Client
{
//
// Summary:
// Transport types supported by DeviceClient - AMQP/TCP, HTTP 1.1, MQTT/TCP, AMQP/WS,
// MQTT/WS
public enum TransportType
{
//
// Summary:
// Advanced Message Queuing Protocol transport. Try Amqp over TCP first and fallback
// to Amqp over WebSocket if that fails
Amqp = 0,
//
// Summary:
// HyperText Transfer Protocol version 1 transport.
Http1 = 1,
//
// Summary:
// Advanced Message Queuing Protocol transport over WebSocket only.
Amqp_WebSocket_Only = 2,
//
// Summary:
// Advanced Message Queuing Protocol transport over native TCP only
Amqp_Tcp_Only = 3,
//
// Summary:
// Message Queuing Telemetry Transport. Try Mqtt over TCP first and fallback to
// Mqtt over WebSocket if that fails
Mqtt = 4,
//
// Summary:
// Message Queuing Telemetry Transport over Websocket only.
Mqtt_WebSocket_Only = 5,
//
// Summary:
// Message Queuing Telemetry Transport over native TCP only
Mqtt_Tcp_Only = 6
}
}
The first telemetry data arrived 00:57:18, the DeviceConnected arrived 01:01:28 so approximately a 4 minute delay, the DeviceDisconnected arrived within a minute of me shutting the device down.
The first telemetry data arrived 04:16:48, the DeviceConnected arrived 04:20:39 so approximately a 4 minute delay, the DeviceDisconnected arrived within a minute of me shutting the device down.
The first telemetry data arrived 04:05:36, DeviceConnected arrived 04:09:52 so approximately a 4 minute delay, the DeviceDisconnected arrived within a minute of me shutting the device down.
HTTP
I waited for 20 minutes and there wasn’t a DeviceConnected message which I sort of expected as HTTP is a connectionless protocol.
The first telemetry data arrived 01:11:33, the DeviceConnected arrived 01:11:25 so they arrived in order and within 10 seconds, the DeviceDisconnected arrived within a 15 seconds of me shutting the device down.
The first telemetry data arrived 04:42:15, the DeviceConnected arrived 04:42:06 so they arrived in order and within 10 seconds, the DeviceDisconnected arrived within a 20 seconds of me shutting device down.
The first telemetry data arrived 04:36:08, the DeviceConnected arrived 04:36:03 so they arrived in order and within 10 seconds, the DeviceDisconnected arrived within a 30 seconds of me shutting device down.
Summary
My LoRa sensors nodes are sending data roughly every minute which reduces the precision of the times.
It looks like for AMQP based messaging it can take 4-5 minutes for a Devices.DeviceConnected message to arrive, for based MQTT messaging it’s 5-10 seconds.
I have one an Azure IoT HubLoRa Telemetry Field Gateway running in my office and I wanted to process the data collected by the sensors around my property without using a Software as a Service(SaaS) Internet of Things (IoT) package.
Rather than lots of screen grabs of my configuration steps I figured people reading this series of posts would be able to figure the details out themselves.
I downloaded the JSON configuration file template from my Windows 10 device (which is created on first startup after installation) and configured the Azure IoT Hub connection string.
I then uploaded this to my Windows 10 IoT Core device and restarted the Azure IoT Hub Field gateway so it picked up the new settings.
I could then see on the device messages from sensor nodes being unpacked and uploaded to my Azure IoT Hub.
ETW logging on device
In the Azure IoT Hub metrics I graphed the number of devices connected and the number of telemetry messages sent and could see my device connect then start uploading telemetry.
Azure IoT Hub metrics
One of my customers uses Azure Event Grid for application integration and I wanted to explore using it in an IoT solution. The first step was to create an Event Grid Domain.
To confirm my event subscriptions were successful I previously found the “simplest” approach was to use an Azure storage queue endpoint. I had to create an Azure Storage Account with two Azure Storage Queues one for device connectivity (.DeviceConnected & .DeviceDisconnected) events and the other for device telemetry (.DeviceTelemetry) events.
I created a couple of other subscriptions so I could compare the different Event schemas (Event Grid Schema & Cloud Event Schema v1.0). At this stage I didn’t configure any Filters or Additional Features.
Azure IoT Hub Telemetry Event Metrics
I use Cerebrate Cerculean for monitoring and managing a couple of other customer projects so I used it to inspect the messages in the storage queues.
Without writing any code (I will script the configuration) I could upload sensor data to an Azure IoT Hub, subscribe to a selection of events the Azure IoT Hub publishes and then inspect them in an Azure Storage Queue.
I did notice that the .DeviceConnected and .DeviceDisconnected events did take a while to arrive. When I started the field gateway application on the device I would get several DeviceTelemetry events before the DeviceConnected event arrived.
As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.
This PoC was to confirm that I could connect to the Bosch IoT SuiteMQTT API then format topics and payloads correctly.
MQTTNet Console Client
The Bosch IoT Hub MQTT broker, username, password, and clientID are the required command line parameters. For this PoC I ran out of time to get cloud to device (C2D) messaging or any presentation functionality working.
/*
Copyright ® 2019 December devMobile Software, All Rights Reserved
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
A quick and dirty test client to explore how BoschIoT Suite MQTT connectivity works
*/
namespace devMobile.Mqtt.TestClient.BoschIoTSuite
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
class Program
{
private static IMqttClient mqttClient = null;
private static IMqttClientOptions mqttOptions = null;
private static string server;
private static string username;
private static string password;
private static string clientId;
static void Main(string[] args)
{
MqttFactory factory = new MqttFactory();
mqttClient = factory.CreateMqttClient();
if (args.Length != 4)
{
Console.WriteLine("[MQTT Server] [UserName] [Password] [ClientID]");
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
return;
}
server = args[0];
username = args[1];
password = args[2];
clientId = args[3];
mqttOptions = new MqttClientOptionsBuilder()
.WithTcpServer(server)
.WithCredentials(username, password)
.WithClientId(clientId)
.WithTls()
.Build();
mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
mqttClient.UseApplicationMessageReceivedHandler(new MqttApplicationMessageReceivedHandlerDelegate(e => MqttClient_ApplicationMessageReceived(e)));
mqttClient.ConnectAsync(mqttOptions).Wait();
string topicD2C = "telemetry";
while (true)
{
JObject payloadJObject = new JObject();
payloadJObject.Add("OfficeTemperature", "22." + DateTime.UtcNow.Millisecond.ToString());
payloadJObject.Add("OfficeHumidity", (DateTime.UtcNow.Second + 40).ToString());
string payload = JsonConvert.SerializeObject(payloadJObject);
Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");
var message = new MqttApplicationMessageBuilder()
.WithTopic(topicD2C)
.WithPayload(payload)
.WithAtMostOnceQoS() // Anthing but this causes timeout
.WithRetainFlag()
.Build();
Console.WriteLine("PublishAsync start");
mqttClient.PublishAsync(message).Wait();
Console.WriteLine("PublishAsync finish");
Thread.Sleep(30100);
}
}
private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
Console.WriteLine($"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic} Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
}
private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
{
Debug.WriteLine("Disconnected");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await mqttClient.ConnectAsync(mqttOptions);
}
catch (Exception ex)
{
Debug.WriteLine("Reconnect failed {0}", ex.Message);
}
}
}
}
The bosch IoT Hub device configuration was via a swagger API but I need to spend some more time figuring out how to configure the data analysis and presentation tools.
I adapted the steps in the IoT Hub Documentation for Sending Device Data using MQTT. The first step was to create a free Hub subscription.
IoT Hub Subscription
Then using the device registry swagger UI page to add a new device.
Device Registry Swagger UI
After a couple of failed attempts I worked out the format of the Authorisation details (I think the username format in the online documentation might be wrong)
Swagger UI Authorisation formQuerying the available devices
Of the 10+ SaaS IoT services I have setup the Bosch IoT Suite was the hardest to get working. I think this was becuase it is meant to be managed via the API from a in-house application. In a future post I’ll get configure the cloud to device messaging, plus analysis and display functionality.
As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.
This PoC was to confirm that I could connect to the walkaboutMQTT API then format topics and payloads correctly.
MQTTNet Console Client
The walkabout MQTT broker, username, API Key, and device ID are the required command line parameters. For this PoC I couldn’t get cloud to device (C2D) or Transport Layer Security(TLS) working so will have to do some more research.
namespace devmobile.Mqtt.TestClient.WolkAbout
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
class Program
{
private static IMqttClient mqttClient = null;
private static IMqttClientOptions mqttOptions = null;
private static string server;
private static string username;
private static string apiKey;
private static string clientID;
static void Main(string[] args)
{
MqttFactory factory = new MqttFactory();
mqttClient = factory.CreateMqttClient();
if ((args.Length != 4) )
{
Console.WriteLine("[MQTT Server] [UserName] [APIKey] [ClientID]");
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
return;
}
server = args[0];
username = args[1];
apiKey = args[2];
clientID = args[3];
Console.WriteLine($"MQTT Server:{server} Username:{username} ClientID:{clientID}");
// wolkabout formatted client state update topic
string topicD2C = $"readings/{username}/";
mqttOptions = new MqttClientOptionsBuilder()
.WithTcpServer(server)
.WithCredentials(username, apiKey)
.WithClientId(clientID)
//.WithTls()
.Build();
mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
mqttClient.ConnectAsync(mqttOptions).Wait();
while (true)
{
JObject payloadJObject = new JObject();
double temperature = 22.0 + (DateTime.UtcNow.Millisecond / 1000.0);
double humidity = 50 + (DateTime.UtcNow.Millisecond / 100.0);
payloadJObject.Add("Temperature", temperature);
payloadJObject.Add("Humidity", humidity);
string payload = JsonConvert.SerializeObject(payloadJObject);
Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");
var message = new MqttApplicationMessageBuilder()
.WithTopic(topicD2C)
.WithPayload(payload)
.WithAtLeastOnceQoS()
.Build();
Console.WriteLine("PublishAsync start");
mqttClient.PublishAsync(message).Wait();
Console.WriteLine("PublishAsync finish");
Thread.Sleep(30100);
}
}
private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
{
Debug.WriteLine("Disconnected");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await mqttClient.ConnectAsync(mqttOptions);
}
catch (Exception ex)
{
Debug.WriteLine("Reconnect failed {0}", ex.Message);
}
}
}
The walkabout device configuration was relatively easy but I need watch the instructional videos again to better understand the device and data semantics relationship.
Data semantics configurationDevices setupDevice SetupMy first dashboard
As I’m testing my Message Queue Telemetry Transport(MQTT) LoRa gateway I’m building a proof of concept(PoC) .Net core console application for each IoT platform I would like to support.
This PoC was to confirm that I could connect to the SmartWorks (formerly Carriots) MQTT API then format topics and payloads correctly.
MQTTNet Console Client
The SmartWorks MQTT broker, username, and device ID are the required command line parameters. I didn’t notice any configuration options for cloud to device (C2D) messaging which maybe due to my device configuration or the free trial I was using.
namespace devMobile.Mqtt.TestClient.SmartWorks
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
class Program
{
private static IMqttClient mqttClient = null;
private static IMqttClientOptions mqttOptions = null;
private static string server;
private static string username;
private static string clientId;
private static string commandTopic;
private static string groupname;
private static string feedname;
static void Main(string[] args)
{
MqttFactory factory = new MqttFactory();
mqttClient = factory.CreateMqttClient();
if (args.Length != 3)
{
Console.WriteLine("[MQTT Server] [UserName] [ClientID]");
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
return;
}
server = args[0];
username = args[1];
clientId = args[2];
mqttOptions = new MqttClientOptionsBuilder()
.WithTcpServer(server)
.WithCredentials(username, "")
.WithClientId(clientId)
.WithTls()
.Build();
mqttClient.UseDisconnectedHandler(new MqttClientDisconnectedHandlerDelegate(e => MqttClient_Disconnected(e)));
mqttClient.ConnectAsync(mqttOptions).Wait();
// Adafruit.IO format for topics which are called feeds
string topicD2C = $"{username}/streams";
while (true)
{
JObject payloadJObject = new JObject();
payloadJObject.Add("at", "now");
payloadJObject.Add("device", clientId);
payloadJObject.Add("protocol", "v2");
double temperature = 22.0 + (DateTime.UtcNow.Millisecond / 1000.0);
double humidity = 50 + (DateTime.UtcNow.Millisecond / 100.0);
JObject dataJObject = new JObject();
dataJObject.Add("OfficeTemperature", temperature);
dataJObject.Add("OfficeHumidity", humidity);
payloadJObject.Add("data", dataJObject);
string payload = JsonConvert.SerializeObject(payloadJObject);
Console.WriteLine($"Topic:{topicD2C} Payload:{payload}");
var message = new MqttApplicationMessageBuilder()
.WithTopic(topicD2C)
.WithPayload(payload)
.WithAtLeastOnceQoS()
.Build();
Console.WriteLine("PublishAsync start");
mqttClient.PublishAsync(message).Wait();
Console.WriteLine("PublishAsync finish");
Thread.Sleep(30100);
}
}
private static async void MqttClient_Disconnected(MqttClientDisconnectedEventArgs e)
{
Debug.WriteLine("Disconnected");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await mqttClient.ConnectAsync(mqttOptions);
}
catch (Exception ex)
{
Debug.WriteLine("Reconnect failed {0}", ex.Message);
}
}
}
}
The ThingsBoard device configuration was relatively easy with convenient buttons to copy the Device ID (Client ID in test client) and Access Token (UserName in test client). I need to revisit the Device and Group configuration to see if I can make the automatically generated names more user friendly.
Devices configuration
The Device configuration form has a tab which has a link for the “Data Streams” form which was useful for debugging.
Device configuration
I have emailed SmartWorks support about a free trial of their dashboard product as it is not available in the free trial.
Device data stream query form
Overall the initial configuration went smoothly but the lack of any dashboard functionality in the free trial was quite limiting.