return DeviceClient.Create(result.AssignedHub,
authentication,
new ITransportSettings[]
{
new AmqpTransportSettings(TransportType.Amqp_Tcp_Only)
{
PrefetchCount = 0,
AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings()
{
Pooling = true,
}
}
}
);
My first attempt failed because I hadn’t specified “TransportType.Amqp_Tcp_Only”. Without it the AMQP implementation is allowed to fall back to other protocols which don’t support pooling.
Exception caused by not using TransportType.Amqp_Tcp_Only
I then deployed the updated code and ran my 1000-device stress test (note the different x-axis scales).
Number of connections with pooling
This confirmed what I found in the Azure.AMQP source code
/// <summary>
/// The default size of the pool
/// </summary>
/// <remarks>
/// Allows up to 100,000 devices
/// </remarks>
private const uint DefaultPoolSize = 100;
The console application uses MQTTNet to connect to TTN. It subscribes to the TTN application device uplink topic (I did try subscribing to the uplink messages for all the devices in the application but this was too noisy), and to the downlink message scheduled, sent and acknowledged topics. To send messages to the device I published them on the device downlink topic.
For a PoC the DIY cache was OK, but I wanted to replace it with something more robust like the .NET ObjectCache, which is in the System.Runtime.Caching namespace.
I started by replacing the ConcurrentDictionary declaration
static readonly ConcurrentDictionary<string, DeviceClient> DeviceClients = new ConcurrentDictionary<string, DeviceClient>();
Then, where there were compiler errors I updated the method call.
// See if the device has already been provisioned or is being provisioned on another thread.
if (DeviceClients.Add(registrationId, deviceContext, cacheItemPolicy))
{
log.LogInformation("RegID:{registrationId} Device provisioning start", registrationId);
...
One difference I found was that ObjectCache throws an exception if the value is null. I was using a null value to indicate that the Device Provisioning Service (DPS) process had been initiated on another thread and was underway.
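One way around the null restriction is to cache a small context object whose client field stays empty while provisioning is underway. This is only a minimal sketch: the DeviceContext shape, the TryBeginProvisioning helper and the one hour sliding expiration are assumptions for illustration, not the code from the function.

```csharp
using System;
using System.Runtime.Caching;

// Hypothetical placeholder stored in the cache instead of null.
public sealed class DeviceContext
{
    // Left null until the DPS registration has completed (assumed shape).
    public object DeviceClient { get; set; }
}

public static class ProvisioningCacheSketch
{
    // Returns true if this caller "won" and should start provisioning,
    // false if an entry for the registrationId is already in the cache.
    public static bool TryBeginProvisioning(ObjectCache cache, string registrationId)
    {
        var cacheItemPolicy = new CacheItemPolicy
        {
            SlidingExpiration = TimeSpan.FromHours(1) // assumed value
        };

        // ObjectCache.Add only inserts when the key is not already present.
        return cache.Add(registrationId, new DeviceContext(), cacheItemPolicy);
    }

    public static void Main()
    {
        ObjectCache cache = new MemoryCache("DeviceClients");

        Console.WriteLine(TryBeginProvisioning(cache, "device-1")); // True
        Console.WriteLine(TryBeginProvisioning(cache, "device-1")); // False
    }
}
```

The second call returns False because the placeholder is already cached, which is the same signal the null value used to give.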
The Azure Key Vault is intended for securing sensitive information like connection strings so I added one to my resource group.
Azure Key Vault overview and basic metrics
I wrote a wrapper which resolves configuration settings based on The Things Network (TTN) application identifier and port information in the uplink message payload. The resolve methods start by looking for configuration for the applicationId and port (separated by a "-"), then for the applicationId alone, and finally fall back to a default value. This functionality is used for Azure IoT Hub connection strings, DPS ID Scopes, and DPS Enrollment Group symmetric keys, and is also used to format the cache keys.
public class ApplicationConfiguration
{
const string DpsGlobaDeviceEndpointDefault = "global.azure-devices-provisioning.net";
private IConfiguration Configuration;
public void Initialise( )
{
// Check that KeyVault URI is configured in environment variables. Not a lot we can do if it isn't....
if (Configuration == null)
{
string keyVaultUri = Environment.GetEnvironmentVariable("KeyVaultURI");
if (string.IsNullOrWhiteSpace(keyVaultUri))
{
throw new ApplicationException("KeyVaultURI environment variable not set");
}
// Load configuration from KeyVault
Configuration = new ConfigurationBuilder()
.AddEnvironmentVariables()
.AddAzureKeyVault(keyVaultUri)
.Build();
}
}
public string DpsGlobaDeviceEndpointResolve()
{
string globaDeviceEndpoint = Configuration.GetSection("DPSGlobaDeviceEndpoint").Value;
if (string.IsNullOrWhiteSpace(globaDeviceEndpoint))
{
globaDeviceEndpoint = DpsGlobaDeviceEndpointDefault;
}
return globaDeviceEndpoint;
}
public string ConnectionStringResolve(string applicationId, int port)
{
// Check to see if there is application + port specific configuration
string connectionString = Configuration.GetSection($"AzureIotHubConnectionString-{applicationId}-{port}").Value;
if (!string.IsNullOrWhiteSpace(connectionString))
{
return connectionString;
}
// Check to see if there is application specific configuration, otherwise run with default
connectionString = Configuration.GetSection($"AzureIotHubConnectionString-{applicationId}").Value;
if (!string.IsNullOrWhiteSpace(connectionString))
{
return connectionString;
}
// get the default as not a specialised configuration
connectionString = Configuration.GetSection("AzureIotHubConnectionStringDefault").Value;
return connectionString;
}
public string DpsIdScopeResolve(string applicationId, int port)
{
// Check to see if there is application + port specific configuration
string idScope = Configuration.GetSection($"DPSIDScope-{applicationId}-{port}").Value;
if (!string.IsNullOrWhiteSpace(idScope))
{
return idScope;
}
// Check to see if there is application specific configuration, otherwise run with default
idScope = Configuration.GetSection($"DPSIDScope-{applicationId}").Value;
if (!string.IsNullOrWhiteSpace(idScope))
{
return idScope;
}
// get the default as not a specialised configuration
idScope = Configuration.GetSection("DPSIDScopeDefault").Value;
if (string.IsNullOrWhiteSpace(idScope))
{
throw new ApplicationException($"DPSIDScope configuration invalid");
}
return idScope;
}
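The three resolve steps repeat in each method, so they could be folded into one helper. The sketch below is an assumption about how that might look rather than the code I deployed: the Resolve helper is hypothetical, and an in-memory configuration stands in for Azure Key Vault so it can run locally.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

public static class ConfigurationFallbackSketch
{
    // Hypothetical helper: most specific key first, then less specific, then the default.
    public static string Resolve(IConfiguration configuration, string prefix, string applicationId, int port)
    {
        string[] keys =
        {
            $"{prefix}-{applicationId}-{port}", // application + port
            $"{prefix}-{applicationId}",        // application only
            $"{prefix}Default",                 // default
        };

        foreach (string key in keys)
        {
            string value = configuration[key];
            if (!string.IsNullOrWhiteSpace(value))
            {
                return value;
            }
        }

        return null; // caller decides whether a missing value is an error
    }

    public static void Main()
    {
        // In-memory values stand in for Key Vault secrets (made-up names and values).
        IConfiguration configuration = new ConfigurationBuilder()
            .AddInMemoryCollection(new Dictionary<string, string>
            {
                ["DPSIDScopeDefault"] = "scope-default",
                ["DPSIDScope-application1"] = "scope-app",
                ["DPSIDScope-application1-10"] = "scope-app-port",
            })
            .Build();

        Console.WriteLine(Resolve(configuration, "DPSIDScope", "application1", 10)); // scope-app-port
        Console.WriteLine(Resolve(configuration, "DPSIDScope", "application1", 20)); // scope-app
        Console.WriteLine(Resolve(configuration, "DPSIDScope", "application2", 10)); // scope-default
    }
}
```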
In the Azure Key Vault “Access Policies” I configured an “Application Access Policy” so my Azure TTNAzureIoTHubMessageV2Processor function identity could retrieve secrets.
Azure Key Vault Secrets
I kept on making typos in the secret names and types which was frustrating.
Azure Key Vault secret
While debugging in Visual Studio you may need to configure the Azure Identity so the application can access the Azure Key Vault.
class Program
{
private static string payload;
static async Task Main(string[] args)
{
string filename;
string azureIoTHubconnectionString;
DeviceClient azureIoTHubClient;
if (args.Length != 2)
{
Console.WriteLine("[JSON file] [AzureIoTHubConnectionString]");
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
return;
}
filename = args[0];
azureIoTHubconnectionString = args[1];
try
{
payload = File.ReadAllText(filename);
// Open up the connection
azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt_Tcp_Only);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt_WebSocket_Only);
await azureIoTHubClient.OpenAsync();
await azureIoTHubClient.SetMethodDefaultHandlerAsync(MethodCallbackDefault, null);
Timer MessageSender = new Timer(TimerCallback, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
}
}
public static async void TimerCallback(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
await azureIoTHubClient.SendEventAsync(message);
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
private static async Task<MethodResponse> MethodCallbackDefault(MethodRequest methodRequest, object userContext)
{
Console.WriteLine($"Default handler method {methodRequest.Name} was called.");
return new MethodResponse(200);
}
}
I configured an Azure IoT Hub, then used Azure IoT Explorer to create a device and get the connection string for my application. After fixing up the application’s command line parameters I could see the timer code was successfully sending telemetry messages to my Azure IoT Hub. I also explored the different MQTT connection options, TransportType.Mqtt, TransportType.Mqtt_Tcp_Only, and TransportType.Mqtt_WebSocket_Only, which all worked as expected.
MQTT Console application displaying sent telemetry
Azure IoT Hub displaying received telemetry
I could also initiate Direct Method calls to my console application from Azure IoT explorer.
Azure IoT Explorer initiating a Direct Method
MQTT console application displaying direct method call.
I then changed the protocol to AMQP.
class Program
{
private static string payload;
static async Task Main(string[] args)
{
string filename;
string azureIoTHubconnectionString;
DeviceClient azureIoTHubClient;
Timer MessageSender;
if (args.Length != 2)
{
Console.WriteLine("[JSON file] [AzureIoTHubConnectionString]");
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
return;
}
filename = args[0];
azureIoTHubconnectionString = args[1];
try
{
payload = File.ReadAllText(filename);
// Open up the connection
azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp_Tcp_Only);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp_WebSocket_Only);
await azureIoTHubClient.OpenAsync();
await azureIoTHubClient.SetMethodDefaultHandlerAsync(MethodCallbackDefault, null);
//MessageSender = new Timer(TimerCallbackAsync, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));
MessageSender = new Timer(TimerCallbackSync, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));
#if MESSAGE_PUMP
Console.WriteLine("Press any key to exit");
while (!Console.KeyAvailable)
{
await Task.Delay(100);
}
#else
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
#endif
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
}
}
public static async void TimerCallbackAsync(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
await azureIoTHubClient.SendEventAsync(message);
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
public static void TimerCallbackSync(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
azureIoTHubClient.SendEventAsync(message).GetAwaiter();
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
private static async Task<MethodResponse> MethodCallbackDefault(MethodRequest methodRequest, object userContext)
{
Console.WriteLine($"Default handler method {methodRequest.Name} was called.");
return new MethodResponse(200);
}
}
In the first version of my console application I could see the SendEventAsync method was getting called but was not returning.
AMQP Console application displaying sent telemetry failure
Even though the SendEventAsync call was not returning, the telemetry messages were making it to my Azure IoT Hub.
Azure IoT Hub displaying AMQP telemetry
When I tried to initiate a Direct Method call from Azure IoT Explorer it failed after a while with a timeout.
Azure IoT Explorer initiating a Direct Method
The first successful approach I tried was to change the Console.ReadLine to a “message pump” (flashbacks to Win32 API programming).
Console.WriteLine("Press any key to exit");
while (!Console.KeyAvailable)
{
await Task.Delay(100);
}
After some more experimentation I found that changing the timer method from asynchronous to synchronous also worked.
public static void TimerCallbackSync(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
azureIoTHubClient.SendEventAsync(message).GetAwaiter();
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
I also had to change the method declaration and modify the SendEventAsync call to use GetAwaiter().
AMQP Console application displaying sent telemetry
Azure IoT Hub displaying received telemetry
Azure IoT Explorer initiating a Direct Method
MQTT console application displaying direct method call.
It took a while to figure out enough about what was going on so I could do a search with the right keywords (DeviceClient AMQP async await SendEventAsync) to confirm my suspicion that MQTT and AMQP clients did behave differently.
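One detail worth keeping in mind with the synchronous timer callback is that GetAwaiter() on its own does not wait for the task; it only returns the awaiter, so the send is effectively fire-and-forget and the “finish” line is written straight away. Blocking until completion needs GetAwaiter().GetResult(). The sketch below shows the difference with a stand-in method (SlowSendAsync is hypothetical and just delays); it illustrates general .NET task behaviour and is not an explanation of why the AMQP client behaved differently.

```csharp
using System;
using System.Threading.Tasks;

public static class GetAwaiterSketch
{
    // Stand-in for DeviceClient.SendEventAsync; it only simulates a slow send.
    public static async Task SlowSendAsync()
    {
        await Task.Delay(500);
    }

    public static void Main()
    {
        Task pending = SlowSendAsync();

        // Returns a TaskAwaiter immediately; nothing waits for the send here.
        pending.GetAwaiter();
        Console.WriteLine($"After GetAwaiter(): IsCompleted = {pending.IsCompleted}");

        // Blocks the calling thread until the task has finished
        // (and rethrows any exception the task produced).
        pending.GetAwaiter().GetResult();
        Console.WriteLine($"After GetResult(): IsCompleted = {pending.IsCompleted}");
    }
}
```

With the half second delay the first line will normally report that the task has not completed, and the second line always reports that it has.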
My implementation was “inspired” by the myDevices C/C++ sample code. The first step was to allocate a buffer to store the byte-encoded values. I pre-allocated the buffer to try and reduce the impact of garbage collection. The code uses a manually incremented index into the buffer for performance reasons, and because of the inconsistent support for System.Collections.Generic and Language Integrated Query (LINQ) on my three embedded platforms. The maximum length of a message is limited by the coding rate, duty cycle and bandwidth of the LoRa channel.
public Encoder(byte bufferSize)
{
if ((bufferSize < BufferSizeMinimum) || ( bufferSize > BufferSizeMaximum))
{
throw new ArgumentException($"BufferSize must be between {BufferSizeMinimum} and {BufferSizeMaximum}", "bufferSize");
}
buffer = new byte[bufferSize];
}
For a simple data type like a digital input, a single byte (true or false) is used. The channel parameter is included so that multiple values of the same data type can be included in a message.
public void DigitalInputAdd(byte channel, bool value)
{
if ((index + DigitalInputSize) > buffer.Length)
{
throw new ApplicationException("DigitalInputAdd insufficient buffer capacity");
}
buffer[index++] = channel;
buffer[index++] = (byte)DataType.DigitalInput;
// I know this is fugly but it works on all platforms
if (value)
{
buffer[index++] = 1;
}
else
{
buffer[index++] = 0;
}
}
For more complex data types like a Global Positioning System (GPS) location (latitude, longitude and altitude), the values are converted to 32-bit signed integers and only 3 of the 4 bytes are used.
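To make the “3 of the 4 bytes” step concrete, here is a minimal sketch of packing one scaled value. The Write24 helper name, the 0.0001 degree scale factor and the most-significant-byte-first order are assumptions for illustration, so check them against the format the decoder expects.

```csharp
using System;

public static class GpsEncodingSketch
{
    // Hypothetical helper: writes the low three bytes of a 32-bit signed value,
    // most significant byte first, and returns the updated buffer index.
    public static int Write24(byte[] buffer, int index, int value)
    {
        buffer[index++] = unchecked((byte)(value >> 16));
        buffer[index++] = unchecked((byte)(value >> 8));
        buffer[index++] = unchecked((byte)value);
        return index;
    }

    public static void Main()
    {
        // Assumed scale: latitude stored in units of 0.0001 degrees.
        int latitude = (int)Math.Round(-43.5321 * 10000.0); // -435321

        byte[] buffer = new byte[3];
        Write24(buffer, 0, latitude);

        // Two's complement keeps the sign in the top bit of the first byte.
        Console.WriteLine(BitConverter.ToString(buffer)); // F9-5B-87
    }
}
```

Dropping the top byte is only safe while the scaled value fits in 24 bits (-8,388,608 to 8,388,607), which is comfortably true for latitude and longitude at this assumed scale.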