The Things Network HTTP Integration Part7

Queuing uplink messages

For my HTTP Integration I need to reliably forward messages to an Azure IoT Hub or Azure IoT Central. This solution needs to be robust and not lose any messages even when portions of the system are unavailable because of failures or sudden spikes in inbound traffic.

I added yet another controller, it receives an uplink messages from The Things Network(TTN) and puts them into an Azure Storage Queue.

[Route("[controller]")]
[ApiController]
public class Queued : ControllerBase
{
   private readonly string storageConnectionString;
   private readonly string queueName;
   private static readonly ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

   public Queued( IConfiguration configuration)
   {
      this.storageConnectionString = configuration.GetSection("AzureStorageConnectionString").Value;
      this.queueName = configuration.GetSection("UplinkQueueName").Value;
   }

   public string Index()
   {
      return "Queued move along nothing to see";
   }

   [HttpPost]
   public async Task<IActionResult> Post([FromBody] PayloadV5 payload)
   {
      string payloadFieldsUnpacked = string.Empty;

      // Check that the post data is good
      if (!this.ModelState.IsValid)
      {
         log.WarnFormat("QueuedController validation failed {0}", this.ModelState.Messages());

         return this.BadRequest(this.ModelState);
      }

      try
      {
         QueueClient queueClient = new QueueClient(storageConnectionString, queueName);

         await queueClient.CreateIfNotExistsAsync();

         await queueClient.SendMessageAsync(Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payload)));
      }
      catch( Exception ex)
      {
         log.Error("Unable to open/create queue or send message", ex);

         return this.Problem("Unable to open queue (creating if it doesn't exist) or send message", statusCode:500, title:"Uplink payload not sent" );
      }

      return this.Ok();
   }
}

An Azure Function with a Queue Trigger processes the messages and for this test pauses for 2 seconds (simulating a call to the Device Provisioning Service(DPS) ). It keeps track of the number of concurrent processing threads and when the first message for each device was received since the program was started.

public static class UplinkMessageProcessor
{
   static readonly ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
   static ConcurrentDictionary<string, PayloadV5> DevicesSeen = new ConcurrentDictionary<string, PayloadV5>();
   static int ConcurrentThreadCount = 0;

   [FunctionName("UplinkMessageProcessor")]
   public static void Run([QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")] string myQueueItem, Microsoft.Azure.WebJobs.ExecutionContext executionContext)
   {
      Interlocked.Increment(ref ConcurrentThreadCount);
      var logRepository = LogManager.GetRepository(Assembly.GetEntryAssembly());
      XmlConfigurator.Configure(logRepository, new FileInfo(Path.Combine(executionContext.FunctionAppDirectory, "log4net.config")));

      log.Info($"Uplink message function triggered: {myQueueItem}");

      PayloadV5 payloadMessage = (PayloadV5)JsonSerializer.Deserialize(myQueueItem, typeof(PayloadV5));
      PayloadV5 payload = (PayloadV5)DevicesSeen.GetOrAdd(payloadMessage.dev_id, payloadMessage);

      log.Info($"Uplink message DevEui:{payload.dev_id} Threads:{ConcurrentThreadCount} First:{payload.metadata.time} Current:{payloadMessage.metadata.time} PayloadRaw:{payload.payload_raw}");

      Thread.Sleep(2000);

      Interlocked.Decrement(ref ConcurrentThreadCount);
   }
}

To explore how this processing worked I sent 1000 uplink messages from my Seeeduino LoRaWAN devices which were buffered in a queue.

Azure storage Explorer 1000 queued messages
Application insights 1000 events

I processed 1000’s of messages with the Azure Function but every so often 10-20% of the logging messages wouldn’t turn up in the logs. I’m using Log4Net and I think it is most probably caused by not flushing the messages before the function finishes

The Things Network HTTP Integration Part5

First TTN payload to the cloud

For my HTTP Integration I need to securely forward messages to an Azure IoT Hub or Azure IoT Central. This “nasty” console application loads a sample The Things Network(TTN) message from a file, connects the specified Azure IOT Hub, reformats the payload and sends it.

I couldn’t use System.Text.Json to construct the message payload as JsonDocument is not modifable(Sept2020). I had to rewrite my code to use Json.Net from Newtonsoft instead.

static async Task Main(string[] args)
{
   string filename ;
   string azureIoTHubconnectionString;
   DeviceClient azureIoTHubClient;
   Payload payload;
   JObject telemetryDataPoint = new JObject();

...
   filename = args[0];
   azureIoTHubconnectionString = args[1];

   try
   {
      payload = JsonConvert.DeserializeObject<Payload>(File.ReadAllText(filename));

      JObject payloadFields = (JObject)payload.payload_fields;

      using (azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp))
      {
         await azureIoTHubClient.OpenAsync();

         foreach (JProperty child in payloadFields.Children())
         {
            EnumerateChildren(telemetryDataPoint, child);
         }
               
         using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryDataPoint))))
         {
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
            await azureIoTHubClient.SendEventAsync(message);
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
         }

         await azureIoTHubClient.CloseAsync();
      }
   }
   catch (Exception ex)
   {            
      Console.WriteLine(ex.Message);
   }

   Console.WriteLine("Press <enter> to exit");
   Console.ReadLine();
   return;
}

static void EnumerateChildren(JObject jobject, JToken token)
{
   if (token is JProperty property)
   {
      if (token.First is JValue)
      {
         jobject.Add(property.Name, property.Value);
      }
      else
      {
         JObject parentObject = new JObject();
         foreach (JToken token2 in token.Children())
         {
            EnumerateChildren(parentObject, token2);
            jobject.Add(property.Name, parentObject);
          }
     }
   }
   else
   {
      foreach (JToken token2 in token.Children())
      {
         EnumerateChildren(jobject, token2);
      }
   }
}

To connect to an Azure IoT Hub I copied the connection string from the portal.

Azure IoT Hub connection string components

Retrieving a connection string for a device connected to Azure IoT Central (without using the Device Provisioning Service(DPS)) is a bit more involved. There is a deprecated command line application dps-keygen which calls the DPS with a device ID , device SAS key and scope ID and returns a connection string.

Azure IoT Central Device Connection Information
Azure DPS-Keygen command-line

Using Azure IoT Explorer I could see reformatted JSON messages from my client application.

Azure IoT Explorer displaying message payload

These two approached are fine for testing but wouldn’t scale well and would be painful to use it there were 1000s, 100s or even 10s of devices.

Microsoft IoT Central dynamic payload desktop client

Unlike most of the Azure IoT Hub client examples the names and number of sensor values will only be known when messages received over the nRF24L01 wireless link are processed so the JSON message payload has to be constructed on the fly.

Using the Newtonsoft.Json NuGet package and Linq + JObject made this much easier than expected so I have added some code improve robustness.

/*

Copyright ® 2018 Jan devMobile Software, All Rights Reserved

THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY
KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR
PURPOSE.

You can do what you want with this code, acknowledgment would be nice.

http://www.devmobile.co.nz

*/
using System;
using System.Text;
using System.Threading;
using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace devMobile.IoT.MicrosoftIoTCentral.Desktop.DynamicPayload
{
   class Program
   {
      const string DeviceConnectionString = "YourDeviceConnectionStringFromIoTCentralGoesHere";
      const string TelemetryDataPointPropertyNameFormat = @"{0}-{1}";
      const double temperatureBase = 20.0;
      const double temperatureRange = 10.0;
      const double humidityBase = 70.0;
      const double humidityRange = 20.0;
      const double batteryVoltageBase = 3.00;
      const double batteryVoltageRange = -1.00;
      static readonly TimeSpan feedUpdateDelay = new TimeSpan(0, 0, 15);

      static void Main(string[] args)
      {
         DeviceClient Client = null;

         try
         {
            Console.WriteLine("Connecting to IoI hub");
            Client = DeviceClient.CreateFromConnectionString(DeviceConnectionString, TransportType.Amqp);
            Console.WriteLine(" Connected");
         }
         catch (Exception ex)
         {
            Console.WriteLine("Error connecting or sending data to IoT Central: {0}", ex.Message);
            return;
         }

         while (true)
         {
            // Then send simulated temperature, humidity & battery voltage data
            Random random = new Random();
            double temperature = temperatureBase + random.NextDouble() * temperatureRange;
            double humidity = humidityBase + random.NextDouble() * humidityRange;
            double batteryVoltage = batteryVoltageBase + random.NextDouble() * batteryVoltageRange;

            Console.WriteLine("Temperature {0}°C  Humidity {1}% Battery Voltage {2}V", temperature.ToString("F1"), humidity.ToString("F0"), batteryVoltage.ToString("F2"));

            // Populate the data point -
            JObject telemetryDataPoint = new JObject(); // This could be simplified but for field gateway will use this style

            string sensorDeviceSerialNumber = "0123456789ABCDEF"; // intentionally created and initialised at this level as sensor device will send over NRF24 link

            telemetryDataPoint.Add(string.Format(TelemetryDataPointPropertyNameFormat, sensorDeviceSerialNumber, "T"), temperature.ToString("F1"));
            telemetryDataPoint.Add(string.Format(TelemetryDataPointPropertyNameFormat, sensorDeviceSerialNumber, "H"), humidity.ToString("F0"));
            telemetryDataPoint.Add(string.Format(TelemetryDataPointPropertyNameFormat, sensorDeviceSerialNumber, "V"), batteryVoltage.ToString("F2"));

            string messageString = JsonConvert.SerializeObject(telemetryDataPoint);

            Console.WriteLine("{0:hh:mm:ss} > Sending telemetry: {1}", DateTime.Now, messageString);

            try
            {
               using (Message message = new Message(Encoding.ASCII.GetBytes(messageString)))
               {
                  Client.SendEventAsync(message).Wait();
                  Console.WriteLine(" Sent");
               }
            }
            catch (Exception ex)
            {
               Console.WriteLine("Error sending data to IoT Central: {0}", ex.Message);
            }

            Thread.Sleep(feedUpdateDelay);
         }
      }
   }
}

The application produces very similar output to the basic desktop client

IoTCentralDashboardDynamicPayloadClient

Microsoft IoT Central basic desktop client

One of the replacement Internet of Things services which looked worth evaluating was Microsoft’s IoT Central. My first project was to build the simplest possible desktop client (.Net Core) which simulates a limited number of sensors (sensor names, value formats etc. configured in code) and only sends data to the cloud (no device management, control or provisioning capabilities).

The only required dependencies are the Newtonsoft.Json &  Microsoft.Azure.DevicesClient NuGet packages

/*

Copyright ® 2018 Jan devMobile Software, All Rights Reserved

THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY
KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR
PURPOSE
You can do what you want with this code, acknowledgement would be nice.
http://www.devmobile.co.nz

*/
using System;
using System.Text;
using System.Threading;
using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;

namespace devMobile.IoT.MicrosoftIoTCentral.Desktop.Basic
{
 class Program
 {
 private const string DeviceConnectionString = "YourDeviceConnectionStringFromIoTCentralGoesHere";
 const double temperatureBase = 20.0;
 const double temperatureRange = 10.0;
 const double humidityBase = 70.0;
 const double humidityRange = 20.0;
 const double batteryVoltageBase = 3.00;
 const double batteryVoltageRange = -1.00;
 static readonly TimeSpan feedUpdateDelay = new TimeSpan(0, 0, 15);

private class TelemetryDataPoint
 {
 [JsonProperty(PropertyName = "H")]
 public double Humidity { get; set; }
 [JsonProperty(PropertyName = "T")]
 public double Temperature { get; set; }
 [JsonProperty(PropertyName = "B")]
 public double BatteryVoltage { get; set; }
 }

static void Main(string[] args)
 {
 DeviceClient Client ;
 Random random = new Random();

try
 {
 Console.WriteLine("Connecting to IoI hub");
 Client = DeviceClient.CreateFromConnectionString(DeviceConnectionString, TransportType.Mqtt);

while (true)
 {
 double temperature = temperatureBase + random.NextDouble() * temperatureRange;
 double humidity = humidityBase + random.NextDouble() * humidityRange;
 double batteryVoltage = batteryVoltageBase + random.NextDouble() * batteryVoltageRange;

Console.WriteLine("Temperature {0}°C Humidity {1}% Battery Voltage {2}V", temperature.ToString("F1"), humidity.ToString("F0"), batteryVoltage.ToString("F2"));

// Populate the data point - this has a static structure and name which could be a problem for field gateway
 TelemetryDataPoint telemetryDataPoint = new TelemetryDataPoint()
 {
 BatteryVoltage = Math.Round(batteryVoltage, 2),
 Humidity = Math.Round(humidity, 0),
 Temperature = Math.Round( temperature, 1 )
 };

string messageString = JsonConvert.SerializeObject(telemetryDataPoint);
 Message message = new Message(Encoding.ASCII.GetBytes(messageString));

Console.WriteLine("{0:hh:mm:ss} > Sending telemetry: {1}", DateTime.Now, messageString);
 Client.SendEventAsync(message).Wait();
 Console.WriteLine(" Done");

Thread.Sleep(feedUpdateDelay);
 }
 }
 catch (Exception ex)
 {
 Console.WriteLine("Error connecting or sending data to IoT Central: {0}", ex.Message);
 Console.WriteLine("Press <ENTER> to exit");
 Console.ReadLine();
 }
 }
 }
}

I manually provisioned the device by copying the device connection string in the IoT Central dashboard

IoTCentralDashboardBasicClient

DesktopClientSimple

Simple dotNet Core 2 IoTCentral Client

A functional client in less than 100 lines of code with support for individual device configuration. For my FieldGateway I’m going to need more flexibility in the construction of telemetry payloads, device provisioning and configuration support.