The Things Network V3 MQTT Client Uplink

In preparation for the impending(delayed) deployment of The Things Network(TTN) V3 I wanted to build a new Message Queue Telemetry Transport(MQTT) integration. As per my usual approach I build a .Net Core console application which sends and receives messages

The console application uses MQTTNet to connect to TTN. It subscribes to to the TTN application device uplink topic (did try subscribing to the uplink messages for all the devices in the application, and the downlink message scheduled, sent and acknowledged topics.

I tried a lot of topic formats with and without wildcards to see which worked best

//downlinkTopic = $"v3/{applicationId}/devices/{deviceId}/down/push";
//uplinkTopic = $"v3/+";
//uplinkTopic = $"v3/#";
//uplinkTopic = $"v3/{applicationId}/+"; //exception
//uplinkTopic = $"v3/{applicationId}/*";
//uplinkTopic = $"v3/devices/+";
//uplinkTopic = $"v3/devices/#";
//uplinkTopic = $"v3/devices/+/events/+";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/+";
//uplinkTopic = $"v3/{applicationId}/devices/{deviceId}/events/update";
//uplinkTopic = $"v3/{applicationId}/devices/{deviceId}/events/create";
//uplinkTopic = $"v3/{applicationId}/devices/{deviceId}/events/delete";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/+";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/create";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/update";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/delete";
//uplinkTopic = $"v3/{applicationId}/devices/+/events/+";
//uplinkTopic = $"v3/{applicationId}/devices/{deviceId}/up";

string downlinkTopic = $"v3/{applicationId}/devices/{deviceId}/down/push";
string downlinkQueuedTopic = $"v3/{applicationId}/devices/{deviceId}/down/queued";
string downlinkSentTopic = $"v3/{applicationId}/devices/{deviceId}/down/sent";
string downlinkAckTopic = $"v3/{applicationId}/devices/{ deviceId}/down/ack";
string downlinkNakTopic = $"v3/{applicationId}/devices/{ deviceId}/down/nack";
string downlinkFailedTopic = $"v3/{applicationId}/devices/{deviceId}/down/sent";

I generated new classes from the ones provided in the documentation then added any obvious missing fields and fine tuned the data types by delving into the TTN V3 GO code.

The new messages payloads have significant differences to the V2 ones. I have refactored the generated classes to reduce the duplication of code and fix up datatypes e.g. int32 vs. ulong where JSON2Charp couldn’t infer the size of the number.

namespace devMobile.TheThingsNetwork.Models
{
   public class ApplicationIds
   {
      public string application_id { get; set; }
   }

   public class EndDeviceIds
   {
      public string device_id { get; set; }
      public ApplicationIds application_ids { get; set; }
      public string dev_eui { get; set; }
      public string join_eui { get; set; }
      public string dev_addr { get; set; }
   }
}

I wonder about the naming of the applicationIds class as it appears that it could only ever contain single applicationId.

I installed the tooling for GO support into Visual Studio Code and went looking for the uplink message definition which I think is in messages.pb.go (still learning go and how the TTN GO source is structured).

type ApplicationUplink struct {
	// Join Server issued identifier for the session keys used by this uplink.
	SessionKeyID []byte `protobuf:"bytes,1,opt,name=session_key_id,json=sessionKeyId,proto3" json:"session_key_id,omitempty"`
	FPort        uint32 `protobuf:"varint,2,opt,name=f_port,json=fPort,proto3" json:"f_port,omitempty"`
	FCnt         uint32 `protobuf:"varint,3,opt,name=f_cnt,json=fCnt,proto3" json:"f_cnt,omitempty"`
	// The frame payload of the uplink message.
	// The payload is still encrypted if the skip_payload_crypto field of the EndDevice
	// is true, which is indicated by the presence of the app_s_key field.
	FRMPayload []byte `protobuf:"bytes,4,opt,name=frm_payload,json=frmPayload,proto3" json:"frm_payload,omitempty"`
	// The decoded frame payload of the uplink message.
	// This field is set by the message processor that is configured for the end device (see formatters) or application (see default_formatters).
	DecodedPayload *types.Struct `protobuf:"bytes,5,opt,name=decoded_payload,json=decodedPayload,proto3" json:"decoded_payload,omitempty"`
	// Warnings generated by the message processor while decoding the frm_payload.
	DecodedPayloadWarnings []string `protobuf:"bytes,12,rep,name=decoded_payload_warnings,json=decodedPayloadWarnings,proto3" json:"decoded_payload_warnings,omitempty"`
	// A list of metadata for each antenna of each gateway that received this message.
	RxMetadata []*RxMetadata `protobuf:"bytes,6,rep,name=rx_metadata,json=rxMetadata,proto3" json:"rx_metadata,omitempty"`
	// Settings for the transmission.
	Settings TxSettings `protobuf:"bytes,7,opt,name=settings,proto3" json:"settings"`
	// Server time when the Network Server received the message.
	ReceivedAt time.Time `protobuf:"bytes,8,opt,name=received_at,json=receivedAt,proto3,stdtime" json:"received_at"`
	// The AppSKey of the current session.
	// This field is only present if the skip_payload_crypto field of the EndDevice
	// is true.
	// Can be used to decrypt uplink payloads and encrypt downlink payloads.
	AppSKey *KeyEnvelope `protobuf:"bytes,9,opt,name=app_s_key,json=appSKey,proto3" json:"app_s_key,omitempty"`
	// The last AFCntDown of the current session.
	// This field is only present if the skip_payload_crypto field of the EndDevice
	// is true.
	// Can be used with app_s_key to encrypt downlink payloads.
	LastAFCntDown uint32 `protobuf:"varint,10,opt,name=last_a_f_cnt_down,json=lastAFCntDown,proto3" json:"last_a_f_cnt_down,omitempty"`
	Confirmed     bool   `protobuf:"varint,11,opt,name=confirmed,proto3" json:"confirmed,omitempty"`
	// Consumed airtime for the transmission of the uplink message. Calculated by Network Server using the RawPayload size and the transmission settings.
	ConsumedAirtime *time.Duration `protobuf:"bytes,13,opt,name=consumed_airtime,json=consumedAirtime,proto3,stdduration" json:"consumed_airtime,omitempty"`
	// End device location metadata, set by the Application Server while handling the message.
	Locations            map[string]*Location `protobuf:"bytes,14,rep,name=locations,proto3" json:"locations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
	XXX_NoUnkeyedLiteral struct{}             `json:"-"`
	XXX_sizecache        int32                `json:"-"`
}

I also need to deploy some more gateways and devices to check that I haven’t missed any fields available in more realistic environments.

TTN V3 MQTT Console client

In the TTN Device data tab I could see messages being sent, to and received from from the simulated device.

TTN V3 MQTT Device Live Data

The next step is to get downlink messages working, then connect up a couple of gateways and trial with some real devices.

TTN V3 Application API Basic Paging Client

The next step was to enumerate The Things Network(TTN) Applications and their attributes. I’m planning on using attributes to manage which applications (and in future EndDevices) are enabled in my Advanced Message Queuing Protocol(AMQP) client.

In the code I have left the different paging implementations which I trialled but abandoned.

using (HttpClient httpClient = new HttpClient())
{
	ApplicationRegistryClient applicationRegistryClient = new ApplicationRegistryClient(baseUrl, httpClient)
	{
		ApiKey = apiKey
	};

	try
	{
		int page = 1;
		string[] fieldMaskPathsApplication = { "attributes" };

		V3Applications applications = await applicationRegistryClient.ListAsync(collaborator, field_mask_paths: fieldMaskPathsApplication, limit:pageSize, page: page);
		while ((applications != null) && (applications.Applications != null))
		{ 
			Console.WriteLine($"Applications:{applications.Applications.Count} Page:{page} Page size:{pageSize}");
			foreach (V3Application application in applications.Applications)
			{
				Console.WriteLine($"Application ID:{application.Ids.Application_id}"); 
				if (application.Attributes != null)
				{
					Console.WriteLine("  Application attributes");

					foreach (KeyValuePair<string, string> attribute in application.Attributes)
					{
						Console.WriteLine($"   Key: {attribute.Key} Value: {attribute.Value}");
					}
				}
				Console.WriteLine();
			}
			page += 1;
			applications = await applicationRegistryClient.ListAsync(collaborator, field_mask_paths: fieldMaskPathsApplication, limit: pageSize, page: page);
		}
	}   
}

For each LoraWAN client I have to have an open connection to the Azure IoT hub to get Cloud to Device (C2D) messages so I’m looking at using connection pooling to reduce the overall number of connections.

I think the Azure ClientDevice library supports up to 995 devices per connection and has quiet a lot of additional functionality.

/// <summary>
/// contains Amqp Connection Pool settings for DeviceClient
/// </summary>
public sealed class AmqpConnectionPoolSettings
{
   private static readonly TimeSpan s_defaultConnectionIdleTimeout = TimeSpan.FromMinutes(2);
    private uint _maxPoolSize;
    internal const uint MaxDevicesPerConnection = 995; // IotHub allows upto 999 tokens per connection. Setting the threshold just below that.

    /// <summary>
    /// The default size of the pool
    /// </summary>
    /// <remarks>
    /// Allows up to 100,000 devices
    /// </remarks>
    private const uint DefaultPoolSize = 100;

    /// <summary>
    /// The maximum value that can be used for the MaxPoolSize property
    /// </summary>
     public const uint AbsoluteMaxPoolSize = ushort.MaxValue;

    /// <summary>
    /// Creates an instance of AmqpConnecitonPoolSettings with default properties
    /// </summary>
    public AmqpConnectionPoolSettings()
    {
       _maxPoolSize = DefaultPoolSize;
       Pooling = false;
    }

Whereas I think AMQPNetLite may support more, but will require me to implement more of the Azure IoT client interface

/// <summary>
/// The default maximum frame size used by the library.
/// </summary>
public const uint DefaultMaxFrameSize = 64 * 1024;
internal const ushort DefaultMaxConcurrentChannels = 8 * 1024;
internal const uint DefaultMaxLinkHandles = 256 * 1024;
internal const uint DefaultHeartBeatInterval = 90000;
internal const uint MinimumHeartBeatIntervalMs = 5 * 1000;

I have got todo some more research to see which library is easier/requires more code/complex/scales better.

The Things Network Cayenne LPP Support

Uplink Encoding

In my applications the myDevices Cayenne Low power payload(LPP) uplink messages from my *duino devices are decoded by the built in The Things Network(TTN) decoder. I can also see the nicely formatted values in the device data view.

Downlink Encoding

I could successfully download raw data to the device but I found that manually unpacking it on the device was painful.

Raw data

I really want to send LPP formatted messages to my devices so I could use a standard LPP library. I initially populated the payload fields in the downlink message JSON. The TTN documentation appeared to indicate this was possible.

Download JSON payload format

Initially I tried a more complex data type because I was looking at downloading a location to the device.

Complex data type

I could see nicely formatted values in the device data view but they didn’t arrive at the device. I then tried simpler data type to see if the complex data type was an issue.

Simple Data Types

At this point I asked a few questions on the TTN forums and started to dig into the TTN source code.

Learning Go on demand

I had a look at the TTB Go code and learnt a lot as I figured out how the “baked in “encoder/decoder worked. I haven’t done any Go coding so it took a while to get comfortable with the syntax. The code my look a bit odd as a Pascal formatter was the closest I could get to Go.

In core/handler/cayennelpp/encoder.go there was

func (e *Encoder) Encode(fields map[string]interface{}, fPort uint8) ([]byte, bool, error) and func (d *Decoder) Decode(payload []byte, fPort uint8) (map[string]interface{}, bool, error)

Which was a positive sign…

Then in core/handler/convert_fields.go there are these two functions

> // ConvertFieldsUp converts the payload to fields using the application's payload formatter
> func (h *handler) ConvertFieldsUp(ctx ttnlog.Interface, _ *pb_broker.DeduplicatedUplinkMessage, appUp *types.UplinkMessage, dev *device.Device) error {
> 	// Find Application

and

> // ConvertFieldsDown converts the fields into a payload
> func (h *handler) ConvertFieldsDown(ctx ttnlog.Interface, appDown *types.DownlinkMessage, ttnDown *pb_broker.DownlinkMessage, _ *device.Device) error {

Then further down in the second function is this call

var encoder PayloadEncoder
	switch app.PayloadFormat {
	case application.PayloadFormatCustom:
		encoder = &CustomDownlinkFunctions{
			Encoder: app.CustomEncoder,
			Logger:  functions.Ignore,
		}
	case application.PayloadFormatCayenneLPP:
		encoder = &cayennelpp.Encoder{}
	default:
		return nil
	}var encoder PayloadEncoder
	switch app.PayloadFormat {
	case application.PayloadFormatCustom:
		encoder = &CustomDownlinkFunctions{
			Encoder: app.CustomEncoder,
			Logger:  functions.Ignore,
		}
	case application.PayloadFormatCayenneLPP:
		encoder = &cayennelpp.Encoder{}
	default:
		return nil
	}

Which I think calls

// Encode encodes the fields to CayenneLPP
func (e *Encoder) Encode(fields map[string]interface{}, fPort uint8) ([]byte, bool, error) {
	encoder := protocol.NewEncoder()
	for name, value := range fields {
		key, channel, err := parseName(name)
		if err != nil {
			continue
		}
		switch key {
		case valueKey:
			if val, ok := value.(float64); ok {
				encoder.AddPort(channel, float32(val))
			}
		}
	}
	return encoder.Bytes(), true, nil
}

Then right down at the very bottom of the call stack in keys.go

func parseName(name string) (string, uint8, error) {
	parts := strings.Split(name, "_")
	if len(parts) < 2 {
		return "", 0, errors.New("Invalid name")
	}
	key := strings.Join(parts[:len(parts)-1], "_")
	if key == "" {
		return "", 0, errors.New("Invalid key")
	}
	channel, err := strconv.Atoi(parts[len(parts)-1])
	if err != nil {
		return "", 0, err
	}
	if channel < 0 || channel > 255 {
		return "", 0, errors.New("Invalid range")
	}
	return key, uint8(channel), nil
}

At this point I started to hit the limits of my Go skills but with some trial and error I figured it out…

Executive Summary

The downlink payload values are sent as 2 byte floats with a sign bit, 100 multiplier. The fields have to be named “value_X” where X is is a byte value.

Dictionary<string, object> payloadFields = new Dictionary<string, object>();
payloadFields.Add(“value_0”, 0.0);
//00-00-00
payloadFields.Add(“value_1”, 1.0);
//01-00-64
payloadFields.Add(“value_2”, 2.0);
//02-00-C8
payloadFields.Add(“value_3”, 3.0);
//03-01-2C
payloadFields.Add(“value_4”, 4.0);
//04-01-90

payloadFields.Add(“value_0”, -0.0);
//00-00-00
payloadFields.Add(“value_1”, -1.0);
//01-FF-9C
payloadFields.Add(“value_2”, -2.0);
//02-FF-38
payloadFields.Add(“value_3”, -3.0);
//03-FE-D4
payloadFields.Add(“value_4”, -4.0);
//04-FE-70

I could see these arrive on my TinyCLR plus RAK811 device and could manually unpack them

The stream of bytes can be decoded on an Arduino using the electronic cats library (needs a small modification) with code this

byte data[] = {0xff,0x38} ; // bytes which represent -2 
float value = lpp.getValue( data, 2, 100, 1);
Serial.print("value:");
Serial.println(value);

It is possible to use the “baked” in Cayenne Encoder/Decoder to send payload fields to a device but I’m not certain is this is quite what myDevices/TTN intended.

The Things Network HTTP Integration Part13

Connection multiplexing

For the Proof of Concept(PoC) I had used a cache to store Azure IoT Hub connections to reduce the number of calls to the Device Provisioning Service(DPS).

Number of connections with no pooling

When stress testing with 1000’s of devices my program hit the host connection limit so I enabled Advanced Message Queuing Protocol(AMQP) connection pooling.

return DeviceClient.Create(result.AssignedHub,
                  authentication,
                  new ITransportSettings[]
                  {
                     new AmqpTransportSettings(TransportType.Amqp_Tcp_Only)
                     {
                        PrefetchCount = 0,
                        AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings()
                        {
                           Pooling = true,
                        }
                     }
                  }
               );

My first attempt failed as I hadn’t configured “TransportType.Amqp_Tcp_Only” which would have allowed the AMQP implementation to fallback to other protocols which don’t support pooling.

Exception caused by not using TransportType.Amqp_Tcp_Only

I then deployed the updated code and ran my 1000 device stress test (note the different x axis scales)

Number of connections with pooling

This confirmed what I found in the Azure.AMQP source code

/// <summary>
/// The default size of the pool
/// </summary>
/// <remarks>
/// Allows up to 100,000 devices
/// </remarks>
/// private const uint DefaultPoolSize = 100;

The Things Network V2 MQTT Client

Another option for I had been looking at for connecting an Azure IoT Hub and The Things Network(TTN) was a Message Queue Telemetry Transport(MQTT) integration.

To trial this approach I build a .Net Core console application which sent message to and received messages from an application running on a GHI Electronics TinyCLRV2 Fezduino with RakWireless Wisduino Evaluation Board(EVB).

The console application uses MQTTNet to connect to TTN. It subscribes to to the TTN application device uplink topic (did try subscribing to the uplink messages for all the devices in the application but this was to noisy), and the downlink message scheduled, sent and acknowledged topics. To send messages to the device I published them on the device downlink topic.

//string uplinktopic = $"{applicationId}/devices/+/up";
string uplinktopic = $"{applicationId}/devices/{deviceId}/up";
await mqttClient.SubscribeAsync(uplinktopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

string downlinkAcktopic = $"{applicationId}/devices/{deviceId}/events/down/acks";
await mqttClient.SubscribeAsync(downlinkAcktopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

string downlinkScheduledtopic = $"{applicationId}/devices/{deviceId}/events/down/scheduled";
await mqttClient.SubscribeAsync(downlinkScheduledtopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

string downlinkSenttopic = $"{applicationId}/devices/{deviceId}/events/down/sent";
await mqttClient.SubscribeAsync(downlinkSenttopic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

string downlinktopic = $"{applicationId}/devices/{deviceId}/down";

I used the classes from one of my earlier blog posts to deserialise the uplink message payload so I could display a subset of the fields.

MQTTNet based .Net Core console client
Things Network Device Data view

In the TTN Device data tab I could see messages being sent, to and received from from the device.

Visual Studio 2019 Tiny CLR debugger Output

In the Visual Studio 2019 debugger output window I could see messages being sent and received by the Fezduino.

Malformed TTN downlink payload

I had some problems with the downlink messages silently failing as the TTN sample payload JSON was malformed and I had copied it without noticing.

I have a working TTN HTTP Integration (uplink messages only) but have been exploring alternatives using TTN MQTT and Azure IoT Hub AMQP clients.

The next step is to build an Azure IoT Hub client (using native AMQP) then join them together.

The Things Network HTTP Integration Part12

Removing the DIY cache

For the Proof of Concept(PoC) I had written a simple cache using a ConcurrentDictionary to store Azure IoT Hub connections to reduce the number of calls to the Device Provisioning Service(DPS).

Device Provisioning Service calls in stress test

For a PoC the DIY cache was ok but I wanted to replace it with something more robust like the .Net ObjectCache which is in the System.Runtime.Caching namespace.

I started by replacing the ConcurrentDictionary declaration

static readonly ConcurrentDictionary<string, DeviceClient> DeviceClients = new ConcurrentDictionary<string, DeviceClient>();
     

With an ObjectCache declaration.

static readonly ObjectCache DeviceClients = MemoryCache.Default;
  

Then, where there were compiler errors I updated the method call.

// See if the device has already been provisioned or is being provisioned on another thread.
if (DeviceClients.Add(registrationId, deviceContext, cacheItemPolicy))
{
   log.LogInformation("RegID:{registrationId} Device provisioning start", registrationId);
...

One difference I found was that ObjectCache throws an exception if the value is null. I was using a null value to indicate that the Device Provisioning Service(DPS) process had been initiated on another thread and was underway.

I have been planning to add support for downlink messages so I added a new class to store the uplink (Azure IoT Hub DeviceClient) and downlink ( downlink_url in the uplink message) details.

 public class DeviceContext
   {
      public DeviceClient Uplink { get; set; }
      public Uri Downlink { get; set; }
   }

For the first version the only functionality I’m using is sliding expiration which is set to one day

CacheItemPolicy cacheItemPolicy = new CacheItemPolicy()
{
   SlidingExpiration = new TimeSpan(1, 0, 0, 0),
   //RemovedCallback
};

DeviceContext deviceContext = new DeviceContext()
{
   Uplink = null,
   Downlink = new Uri(payload.DownlinkUrl)
};

I didn’t have to make many changes and I’ll double check my implementation in the next round of stress and soak testing.

The Things Network HTTP Integration Part11

Moving Secrets to KeyVault

The application configuration file contained sensitive information like Device Provision Service(DPS) Group Enrollment Symmetric Keys and Azure IoT Hub connection strings which is OK for a proof of concept (PoC) but sub-optimal for production deployments.

"DeviceProvisioningService": {
      "GlobalDeviceEndpoint": "global.azure-devices-provisioning.net",
      "ScopeID": "",
      "EnrollmentGroupSymmetricKeyDefault": "TopSecretKey",
      "DeviceProvisioningPollingDelay": 500,
      "ApplicationEnrollmentGroupMapping": {
         "Application1": "TopSecretKey1",
         "Application2": "TopSecretKey2"
      }
   }

The Azure Key Vault is intended for securing sensitive information like connection strings so I added one to my resource group.

Azure Key Vault overview and basic metrics

I wrote a wrapper which resolves configuration settings based on the The Things Network(TTN) application identifier and port information in the uplink message payload. The resolve methods start by looking for configuration for the applicationId and port (separated by a – ), then the applicationId and then finally falling back to a default value. This functionality is used for AzureIoTHub connection strings, DPS IDScopes, DPS Enrollment Group Symmetric Keys, and is also used to format the cache keys.

public class ApplicationConfiguration
{
const string DpsGlobaDeviceEndpointDefault = "global.azure-devices-provisioning.net";

private IConfiguration Configuration;

public void Initialise( )
{
   // Check that KeyVault URI is configured in environment variables. Not a lot we can do if it isn't....
   if (Configuration == null)
   {
      string keyVaultUri = Environment.GetEnvironmentVariable("KeyVaultURI");
      if (string.IsNullOrWhiteSpace(keyVaultUri))
      {
         throw new ApplicationException("KeyVaultURI environment variable not set");
      }

      // Load configuration from KeyVault 
      Configuration = new ConfigurationBuilder()
         .AddEnvironmentVariables()
         .AddAzureKeyVault(keyVaultUri)
         .Build();
   }
}

public string DpsGlobaDeviceEndpointResolve()
{
   string globaDeviceEndpoint = Configuration.GetSection("DPSGlobaDeviceEndpoint").Value;
   if (string.IsNullOrWhiteSpace(globaDeviceEndpoint))
   {
      globaDeviceEndpoint = DpsGlobaDeviceEndpointDefault;
   }

   return globaDeviceEndpoint;
}

public string ConnectionStringResolve(string applicationId, int port)
{
   // Check to see if there is application + port specific configuration
   string connectionString = Configuration.GetSection($"AzureIotHubConnectionString-{applicationId}-{port}").Value;
   if (!string.IsNullOrWhiteSpace(connectionString))
   {
      return connectionString;
   }

   // Check to see if there is application specific configuration, otherwise run with default
   connectionString = Configuration.GetSection($"AzureIotHubConnectionString-{applicationId}").Value;
   if (!string.IsNullOrWhiteSpace(connectionString))
   {
      return connectionString;
   }

   // get the default as not a specialised configuration
   connectionString = Configuration.GetSection("AzureIotHubConnectionStringDefault").Value;

   return connectionString;
}

public string DpsIdScopeResolve(string applicationId, int port)
{
   // Check to see if there is application + port specific configuration
   string idScope = Configuration.GetSection($"DPSIDScope-{applicationId}-{port}").Value;
   if (!string.IsNullOrWhiteSpace(idScope))
   {
      return idScope;
   }

   // Check to see if there is application specific configuration, otherwise run with default
   idScope = Configuration.GetSection($"DPSIDScope-{applicationId}").Value;
   if (!string.IsNullOrWhiteSpace(idScope))
   {
      return idScope;
   }

   // get the default as not a specialised configuration
   idScope = Configuration.GetSection("DPSIDScopeDefault").Value;

   if (string.IsNullOrWhiteSpace(idScope))
   {
      throw new ApplicationException($"DPSIDScope configuration invalid");
   }

   return idScope;
}

The values of Azure function configuration settings are replaced by a reference to the secret in the Azure Key Vault.

Azure Function configuration value replacement

In the Azure Key Vault “Access Policies” I configured an “Application Access Policy” so my Azure TTNAzureIoTHubMessageV2Processor function identity could retrieve secrets.

Azure Key Vault Secrets

I kept on making typos in the secret names and types which was frustrating.

Azure Key Vault secret

While debugging in Visual Studio you may need to configure the Azure Identity so the application can access the Azure Key Vault.

Azure IoT Hub MQTT/AMQP oddness

This is a long post which covers some oddness I noticed when changing the protocol used by an Azure IoT Hub client from Message Queuing Telemetry Transport(MQTT) to Advanced Message Queuing Protocol (AMQP). I want to build a console application to test the pooling of AMQP connections so I started with an MQTT client written for another post.

class Program
{
   private static string payload;

   static async Task Main(string[] args)
   {
      string filename;
      string azureIoTHubconnectionString;
      DeviceClient azureIoTHubClient;

      if (args.Length != 2)
      {
         Console.WriteLine("[JOSN file] [AzureIoTHubConnectionString]");
         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
         return;
      }

      filename = args[0];
      azureIoTHubconnectionString = args[1];

      try
      {
         payload = File.ReadAllText(filename);

         // Open up the connection
         azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt);
         //azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt_Tcp_Only);
         //azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt_WebSocket_Only);

         await azureIoTHubClient.OpenAsync();

         await azureIoTHubClient.SetMethodDefaultHandlerAsync(MethodCallbackDefault, null);

         Timer MessageSender = new Timer(TimerCallback, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));


         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
      }
   }

   public static async void TimerCallback(object state)
   {
      DeviceClient azureIoTHubClient = (DeviceClient)state;

      try
      {
         // I know having the payload as a global is a bit nasty but this is a demo..
         using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
         {
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
            await azureIoTHubClient.SendEventAsync(message);
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
         }
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
      }
   }

   private static async Task<MethodResponse> MethodCallbackDefault(MethodRequest methodRequest, object userContext)
   {
      Console.WriteLine($"Default handler method {methodRequest.Name} was called.");

      return new MethodResponse(200);
   }
}

I configured an Azure IoT hub then used Azure IoT explorer to create a device and get the connections string for my application. After fixing up the application’s command line parameters I could see the timer code was successfully sending telemetry messages to my Azure IoT Hub. I also explored the different MQTT connections options TransportType.Mqtt, TransportType.Mqtt_Tcp_Only, and TransportType.Mqtt_WebSocket_Only which worked as expected.

MQTT Console application displaying sent telemetry
Azure IoT Hub displaying received telemetry

I could also initiate Direct Method calls to my console application from Azure IoT explorer.

Azure IoT Explorer initiating a Direct Method
MQTT console application displaying direct method call.

I then changed the protocol to AMQP

class Program
{
   private static string payload;

   static async Task Main(string[] args)
   {
      string filename;
      string azureIoTHubconnectionString;
      DeviceClient azureIoTHubClient;
      Timer MessageSender;

      if (args.Length != 2)
      {
         Console.WriteLine("[JOSN file] [AzureIoTHubConnectionString]");
         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
         return;
      }

      filename = args[0];
      azureIoTHubconnectionString = args[1];

      try
      {
         payload = File.ReadAllText(filename);

         // Open up the connection
         azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp);
         //azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp_Tcp_Only);
         //azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp_WebSocket_Only);

         await azureIoTHubClient.OpenAsync();

         await azureIoTHubClient.SetMethodDefaultHandlerAsync(MethodCallbackDefault, null);

         //MessageSender = new Timer(TimerCallbackAsync, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));
         MessageSender = new Timer(TimerCallbackSync, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));

#if MESSAGE_PUMP
         Console.WriteLine("Press any key to exit");
         while (!Console.KeyAvailable)
         {
            await Task.Delay(100);
         }
#else
         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
#endif
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
         Console.WriteLine("Press <enter> to exit");
         Console.ReadLine();
      }
   }

   public static async void TimerCallbackAsync(object state)
   {
      DeviceClient azureIoTHubClient = (DeviceClient)state;

      try
      {
         // I know having the payload as a global is a bit nasty but this is a demo..
         using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
         {
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
            await azureIoTHubClient.SendEventAsync(message);
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
         }
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
      }
   }

   public static void TimerCallbackSync(object state)
   {
      DeviceClient azureIoTHubClient = (DeviceClient)state;

      try
      {
         // I know having the payload as a global is a bit nasty but this is a demo..
         using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
         {
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
            azureIoTHubClient.SendEventAsync(message).GetAwaiter();
            Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
         }
      }
      catch (Exception ex)
      {
         Console.WriteLine(ex.Message);
      }
   }


   private static async Task<MethodResponse> MethodCallbackDefault(MethodRequest methodRequest, object userContext)
   {
      Console.WriteLine($"Default handler method {methodRequest.Name} was called.");

      return new MethodResponse(200);
   }
}

In the first version of my console application I could see the SendEventAsync method was getting called but was not returning

AMQP Console application displaying sent telemetry failure

Even though the SendEventAsync call was not returning the telemetry messages were making it to my Azure IoT Hub.

Azure IoT Hub displaying AMQP telemetry

When I tried to initiate a Direct Method call from Azure IoT Explorer it failed after a while with a timeout.

Azure IoT Explorer initiating a Direct Method

The first successful approach I tried was to change the Console.Readline to a “message pump” (flashbacks to Win32 API programming).

Console.WriteLine("Press any key to exit");
while (!Console.KeyAvailable)
{
   await Task.Delay(100);
}

After some more experimentation I found that changing the timer method from asynchronous to synchronous also worked.

public static void TimerCallbackSync(object state)
{
   DeviceClient azureIoTHubClient = (DeviceClient)state;

   try
   {
      // I know having the payload as a global is a bit nasty but this is a demo..
      using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
      {
         Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
         azureIoTHubClient.SendEventAsync(message).GetAwaiter();
         Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
      }
   }
   catch (Exception ex)
   {
      Console.WriteLine(ex.Message);
   }
}

I also had to change the method declaration and modify the SendEventAsync call to use a GetAwaiter.

AMQP Console application displaying sent telemetry
Azure IoT Hub displaying received telemetry
Azure IoT Explorer initiating a Direct Method
MQTT console application displaying direct method call.

It took a while to figure out enough about what was going on so I could do a search with the right keywords (DeviceClient AMQP async await SendEventAsync) to confirm my suspicion that MQTT and AMQP clients did behave differently.

For anyone who reads this post, I think this Github issue about task handling and blocking calls is most probably the answer (October 2020).

The Things Network HTTP Azure IoT Integration Soak Testing

I wanted to do some testing to make sure the application would reliably process messages from 1000’s of devices…

The first thing I learnt was “don’t forget to restart your Azure Function after deleting all the devices from the Azure IoT Hub” as the DeviceClients are cached. Also make sure you delete the devices from both your Azure Device Provisioning service(DPS) and Azure IoT Hub instances.

Applications Insights provisioning event tracking

The next “learning” was that if you forget to enable “always on” the caching won’t work and your application will call the DPS way more often than expected.

Azure Application “always on configuration

The next “learning” was if your soak test sends 24000 messages it will start to fail just after you go out to get a coffee because of the 8000 msgs/day limit on the free version of IoT Hub.

Azure IoT Hub Free tier 8000 messages/day limit

After these “learnings” the application appeared to be working and every so often a message would briefly appear in Azure Storage Explorer queue view.

Azure storage explorer view of uplink messages queue

The console test application simulated 1000 devices sending 24 messages every so often and took roughly 8 hours to complete.

Message generator finished

In the Azure IoT Hub telemetry 24000 messages had been received after roughly 8 hours confirming the test rig was working as expected.

The notch was another “learning”, if you go and do some gardening then after roughly 40 minutes of inactivity your desktop PC will go into power save mode and the test client will stop sending messages.

The caching of settings appeared to be work as there were only a couple of requests to my Azure Key Vault where sensitive information like connection strings, symmetric keys etc. are stored.

Memory consumption did look to bad and topped out at roughly 120M.

In the application logging you can see the 1000 calls to DPS at the beginning (the yellow dependency events) then the regular processing of messages.

Application Insights logging

Even with the “learnings” the testing went pretty well overall. I do need to run the test rig for longer and with even more simulated devices.

I think this should do

48K Telemetry messages

If you get lots of errors in the logs “Host thresholds exceeded: [Connections]…. might need to bump your plan to something a bit larger

The Things Network HTTP Azure IoT Hub Integration

This post provides an overview of the required Azure Device Provisioning Service(DPS) and Azure IoT Hub configuration to process The Things Network(TTN) HTTP integration uplink messages. I have assumed that the reader is already familiar with all these products. There is an overview of configuring TTN HTTP integration in my “Simplicating and securing the HTTP handler” post.

The first step is to configure a DPS Enrollment Group

DPS Group Enrollment blade

The scopeID and the primary/secondary key need to be configured in the appsettings.json file of uplink message processing Azure QueueTrigger function.

For more complex deployments the ApplicationEnrollmentGroupMapping configuration enables The Things Network(TTN) devices to be provisioned using different GroupEnrollment keys based on the applicationid in the first Uplink message which initiates provisoning.

"DeviceProvisioningService": {
      "GlobalDeviceEndpoint": "global.azure-devices-provisioning.net",
      "ScopeID": "",
      "EnrollmentGroupSymmetricKeyDefault": "TopSecretKey",
      "DeviceProvisioningPollingDelay": 500,
      "ApplicationEnrollmentGroupMapping": {
         "Application1": "TopSecretKey1",
         "Application2": "TopSecretKey2"
      }
   }

DPS Group Enrolment with no provisioned devices

Then as uplink messages from the TTN integration are processed devices are “automagically” created in the DPS.

Simultaneously devices are created in the Azure IoT Hub

Then shortly after telemetry events are available for applications to process or inspection with tools like Azure IoT Explorer.

In the telemetry event payload sent to the Azure IoT IoT Hub are some extra fields to help with debugging and tracing. The raw payload is also included so messages not decoded by TTN can be processed by the client application(s).

/ Assemble the JSON payload to send to Azure IoT Hub/Central.
log.LogInformation($"{messagePrefix} Payload assembly start");
JObject telemetryEvent = new JObject();
try
{
   JObject payloadFields = (JObject)payloadObect.payload_fields;
   telemetryEvent.Add("HardwareSerial", payloadObect.hardware_serial);
   telemetryEvent.Add("Retry", payloadObect.is_retry);
   telemetryEvent.Add("Counter", payloadObect.counter);
   telemetryEvent.Add("DeviceID", payloadObect.dev_id);
   telemetryEvent.Add("ApplicationID", payloadObect.app_id);
   telemetryEvent.Add("Port", payloadObect.port);
   telemetryEvent.Add("PayloadRaw", payloadObect.payload_raw);
   telemetryEvent.Add("ReceivedAtUTC", payloadObect.metadata.time);
 
   // If the payload has been unpacked in TTN backend add fields to telemetry event payload
   if (payloadFields != null)
   {
      foreach (JProperty child in payloadFields.Children())
      {
         EnumerateChildren(telemetryEvent, child);
      }
   }
}
catch (Exception ex)
{
   log.LogError(ex, $"{messagePrefix} Payload processing or Telemetry event assembly failed");
   throw;
}

Beware, the Azure Storage Account and storage queue names have a limited character set. This caused me problems several times when I used camel cased queue names etc.