My Azure IoT Hub messages have properties for the LoRaWAN port (required), confirmed (which defaults to false), priority (which defaults to Normal) and queue (which defaults to Replace). The priority and queue enumerations are defined in TTNcommon.cs.
I used the enumeration for message priority in the JSON payload and MQTT downlink message topic.
Initially when I published a message it wasn’t sent and there was no error. It was a while before I noticed that the queue setting was being converted to the text “Push” or “Replace” based on the enumeration value name (the priority value was in the JSON, which is case insensitive). I did wonder if the tenantId and ApplicationId were also case sensitive so I ensured consistent capitalisation with ToLower().
Back in 1986 in my second first year at the University of Canterbury I did “MATH131 Numerical Methods” which was a year of looking at why mathematics in FORTRAN, C, and Pascal sometimes didn’t return the result you were expecting…
/// <summary>
/// Appends a temperature reading for the given channel to the payload buffer.
/// </summary>
/// <param name="channel">Channel number written as the first byte of the entry.</param>
/// <param name="celsius">Temperature in degrees Celsius; stored as a signed 16 bit count of tenths of a degree.</param>
/// <exception cref="ApplicationException">The buffer has fewer than TemperatureSize bytes left.</exception>
public void TemperatureAdd(byte channel, float celsius)
{
    bool hasRoom = (index + TemperatureSize) <= buffer.Length;
    if (!hasRoom)
    {
        throw new ApplicationException("TemperatureAdd insufficent buffer capacity");
    }

    // Scale by the integer constant 10; the cast to short drops any fractional part.
    short tenths = (short)(celsius * 10);
    byte highByte = (byte)(tenths >> 8);
    byte lowByte = (byte)tenths;

    // Entry layout: channel, data type, value high byte, value low byte.
    buffer[index++] = channel;
    buffer[index++] = (byte)DataType.Temperature;
    buffer[index++] = highByte;
    buffer[index++] = lowByte;
}
After looking at the code I think the issue was most probably due to the representation of the constant 10 (int32), 10.0 (double), and 10.0f (single). To confirm my theory I modified the client to send the temperature with the calculation done with three different constants.
Visual Studio 2019 Debug output window. The Things Network (TTN) Message Queue Telemetry Transport (MQTT) client.
After some trial and error I settled on this C# code for my decoder
/// <summary>
/// Appends a temperature reading for the given channel to the payload buffer.
/// </summary>
/// <param name="channel">Channel number written as the first byte of the entry.</param>
/// <param name="celsius">Temperature in degrees Celsius; stored as a signed 16 bit count of tenths of a degree.</param>
/// <exception cref="ApplicationException">The buffer has fewer than TemperatureSize bytes left.</exception>
public void TemperatureAdd(byte channel, float celsius)
{
    if ((index + TemperatureSize) > buffer.Length)
    {
        throw new ApplicationException("TemperatureAdd insufficent buffer capacity");
    }

    // Round to the nearest tenth instead of truncating: a value such as 22.3 is not
    // exactly representable, so the product can land just below 223 and a bare cast
    // would encode 222 regardless of which constant type is used for the scaling.
    short val = (short)Math.Round(celsius * 10.0f);

    // Entry layout: channel, data type, value high byte, value low byte.
    buffer[index++] = channel;
    buffer[index++] = (byte)DataType.Temperature;
    buffer[index++] = (byte)(val >> 8);
    buffer[index++] = (byte)val;
}
I don’t think this is specifically an issue with TinyCLR V2, just with the number type used for the constant.
The next step was to enumerate all the EndDevices of a The Things Network(TTN) Application and display their attributes. I have to establish an Azure DeviceClient connection to an Azure IoT Hub for each TTN EndDevice to get downlink messages. To do this I will have to enumerate the TTN Applications in the instance then enumerate the LoRaWAN EndDevices.
// List every end device of the application and dump its identifiers and attributes to the console.
using (var httpClient = new HttpClient())
{
    var endDeviceRegistryClient = new EndDeviceRegistryClient(baseUrl, httpClient);
    endDeviceRegistryClient.ApiKey = apiKey;

    try
    {
#if FIELDS_MINIMUM
        // Believed to be the smallest field mask the integration can work with.
        var fieldMaskPathsDevice = new string[] { "attributes" };
#else
        var fieldMaskPathsDevice = new string[] { "name", "description", "attributes" };
#endif

        V3EndDevices endDevices = await endDeviceRegistryClient.ListAsync(applicationID, field_mask_paths: fieldMaskPathsDevice);

        // With no devices the call yields null rather than an empty list.
        var deviceList = endDevices?.End_devices;
        if (deviceList != null)
        {
            foreach (V3EndDevice endDevice in deviceList)
            {
#if FIELDS_MINIMUM
                Console.WriteLine($"EndDevice ID:{endDevice.Ids.Device_id}");
#else
                Console.WriteLine($"Device ID:{endDevice.Ids.Device_id} Name:{endDevice.Name} Description:{endDevice.Description}");
                Console.WriteLine($" CreatedAt: {endDevice.Created_at:dd-MM-yy HH:mm:ss} UpdatedAt: {endDevice.Updated_at:dd-MM-yy HH:mm:ss}");
#endif

                var deviceAttributes = endDevice.Attributes;
                if (deviceAttributes != null)
                {
                    Console.WriteLine(" EndDevice attributes");

                    foreach (KeyValuePair<string, string> entry in deviceAttributes)
                    {
                        Console.WriteLine($" Key: {entry.Key} Value: {entry.Value}");
                    }
                }

                Console.WriteLine();
            }
        }
    }
    catch (Exception ex)
    {
        // Console sample: report the failure and fall through to the exit prompt.
        Console.WriteLine(ex.Message);
    }

    Console.WriteLine("Press <enter> to exit");
    Console.ReadLine();
}
Like the applicationRegistryClient.ListAsync call the endDeviceRegistryClient.ListAsync also returns null rather than an empty list.
The next step was to enumerate The Things Network(TTN) Applications so I could connect only to the required Azure IoT hub(s). There would also be a single configuration setting for the client (establish a connection for every TTN application, or don’t establish a connection for any) and this could be overridden with a TTN application attribute
// Page size for the application listing is taken from the fourth command line argument.
long pageSize = long.Parse(args[3]);
Console.WriteLine($"Page size: {pageSize}");
Console.WriteLine();

// Page through the collaborator's applications and print the attributes of every
// application that has the Azure integration enabled.
using (HttpClient httpClient = new HttpClient())
{
    ApplicationRegistryClient applicationRegistryClient = new ApplicationRegistryClient(baseUrl, httpClient)
    {
        ApiKey = apiKey
    };

    try
    {
        int page = 1;
        string[] fieldMaskPathsApplication = { "attributes" }; // think this is the bare minimum required for integration

        V3Applications applications = await applicationRegistryClient.ListAsync(collaborator, field_mask_paths: fieldMaskPathsApplication, limit: pageSize, page: page);

        // A null result (or null collection) is what ends the paging loop.
        while ((applications != null) && (applications.Applications != null))
        {
            Console.WriteLine($"Applications:{applications.Applications.Count} Page:{page} Page size:{pageSize}");

            foreach (V3Application application in applications.Applications)
            {
                // Start from the client wide default; an application attribute may override it.
                bool applicationIntegration = ApplicationAzureintegrationDefault;

                Console.WriteLine($"Application ID:{application.Ids.Application_id}");

                if (application.Attributes != null)
                {
                    string applicationAzureIntegrationValue;
                    bool parsedIntegration;

                    // bool.TryParse writes false to its out argument when parsing fails, so parse
                    // into a scratch variable: a malformed attribute must not override the default.
                    if (application.Attributes.TryGetValue(ApplicationAzureIntegrationField, out applicationAzureIntegrationValue)
                        && bool.TryParse(applicationAzureIntegrationValue, out parsedIntegration))
                    {
                        applicationIntegration = parsedIntegration;
                    }

                    if (applicationIntegration)
                    {
                        Console.WriteLine(" Application attributes");

                        foreach (KeyValuePair<string, string> attribute in application.Attributes)
                        {
                            Console.WriteLine($" Key: {attribute.Key} Value: {attribute.Value}");
                        }
                    }
                }

                Console.WriteLine();
            }

            page += 1;
            applications = await applicationRegistryClient.ListAsync(collaborator, field_mask_paths: fieldMaskPathsApplication, limit: pageSize, page: page);
        }
    }
    catch (Exception ex)
    {
        // Console sample: report the failure and fall through to the exit prompt.
        Console.WriteLine(ex.Message);
    }

    Console.WriteLine("Press <enter> to exit");
    Console.ReadLine();
}
I used the field_mask_paths parameter (don’t need created_at, updated_at, name etc.) to minimise the data returned to my client.
I was hoping that there would be a way to further “shape” the returned data, but in the NSwag generated code the construction of the URL with field_mask_paths, order, limit, and page parameters meant this appears not to be possible.
For each LoRaWAN client I have to have an open connection to the Azure IoT hub to get Cloud to Device (C2D) messages so I’m looking at using connection pooling to reduce the overall number of connections.
I think the Azure DeviceClient library supports up to 995 devices per connection and has quite a lot of additional functionality.
/// <summary>
/// contains Amqp Connection Pool settings for DeviceClient
/// </summary>
public sealed class AmqpConnectionPoolSettings
{
// Default idle timeout value: two minutes.
private static readonly TimeSpan s_defaultConnectionIdleTimeout = TimeSpan.FromMinutes(2);
// Backing field for the maximum pool size; set to DefaultPoolSize by the constructor.
private uint _maxPoolSize;
internal const uint MaxDevicesPerConnection = 995; // IotHub allows up to 999 tokens per connection. Setting the threshold just below that.
/// <summary>
/// The default size of the pool
/// </summary>
/// <remarks>
/// Allows up to 100,000 devices
/// (100 connections x 995 devices per connection = 99,500).
/// </remarks>
private const uint DefaultPoolSize = 100;
/// <summary>
/// The maximum value that can be used for the MaxPoolSize property
/// </summary>
/// <remarks>
/// Equal to ushort.MaxValue (65,535).
/// </remarks>
public const uint AbsoluteMaxPoolSize = ushort.MaxValue;
/// <summary>
/// Creates an instance of AmqpConnectionPoolSettings with default properties
/// </summary>
/// <remarks>
/// The maximum pool size starts at DefaultPoolSize and Pooling starts switched off.
/// </remarks>
public AmqpConnectionPoolSettings()
{
_maxPoolSize = DefaultPoolSize;
Pooling = false;
}
Whereas I think AMQPNetLite may support more, but will require me to implement more of the Azure IoT client interface
/// <summary>
/// The default maximum frame size used by the library.
/// </summary>
/// <remarks>
/// 64 * 1024 = 65,536.
/// </remarks>
public const uint DefaultMaxFrameSize = 64 * 1024;
// 8 * 1024 = 8,192 concurrent channels by default.
internal const ushort DefaultMaxConcurrentChannels = 8 * 1024;
// 256 * 1024 = 262,144 link handles by default.
internal const uint DefaultMaxLinkHandles = 256 * 1024;
// NOTE(review): presumably milliseconds (i.e. 90 seconds), by analogy with the constant below - confirm.
internal const uint DefaultHeartBeatInterval = 90000;
// Minimum heartbeat interval in milliseconds (5 seconds).
internal const uint MinimumHeartBeatIntervalMs = 5 * 1000;
I have got to do some more research to see which library is easier/requires more code/complex/scales better.
After reviewing the initial implementation I found I had to have one connection per The Things Network (TTN) device. To do this I first have to enumerate the LoRaWAN Devices for each Application in my instance. First I had to add the TTN APIKey to the application and device registry requests.
namespace devMobile.TheThingsNetwork.API
{
    /// <summary>
    /// Hand written half of the generated end device registry client; adds the TTN API key to outgoing requests.
    /// </summary>
    public partial class EndDeviceRegistryClient
    {
        /// <summary>
        /// API key sent as the bearer token on every request made by this client.
        /// </summary>
        public string ApiKey { set; get; }

        /// <summary>
        /// Attaches the bearer token to the outgoing request.
        /// </summary>
        /// <param name="client">The HTTP client that will send the request; it is inspected but not modified.</param>
        /// <param name="request">The request to authorise.</param>
        /// <param name="url">The request URL (unused).</param>
        partial void PrepareRequest(System.Net.Http.HttpClient client, System.Net.Http.HttpRequestMessage request, string url)
        {
            // An Authorization header configured on the client, or one already on this request, takes precedence.
            if (client.DefaultRequestHeaders.Contains("Authorization") || request.Headers.Authorization != null)
            {
                return;
            }

            // Set the header on the request rather than on client.DefaultRequestHeaders: the defaults
            // are shared by every user of the HttpClient, must not be modified while requests are
            // outstanding, and would pin whichever key happened to be used first.
            request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", ApiKey);
        }
    }

    /// <summary>
    /// Hand written half of the generated application registry client; adds the TTN API key to outgoing requests.
    /// </summary>
    public partial class ApplicationRegistryClient
    {
        /// <summary>
        /// API key sent as the bearer token on every request made by this client.
        /// </summary>
        public string ApiKey { set; get; }

        /// <summary>
        /// Attaches the bearer token to the outgoing request.
        /// </summary>
        /// <param name="client">The HTTP client that will send the request; it is inspected but not modified.</param>
        /// <param name="request">The request to authorise.</param>
        /// <param name="url">The request URL (unused).</param>
        partial void PrepareRequest(System.Net.Http.HttpClient client, System.Net.Http.HttpRequestMessage request, string url)
        {
            // An Authorization header configured on the client, or one already on this request, takes precedence.
            if (client.DefaultRequestHeaders.Contains("Authorization") || request.Headers.Authorization != null)
            {
                return;
            }

            // Set the header on the request rather than on client.DefaultRequestHeaders: the defaults
            // are shared by every user of the HttpClient, must not be modified while requests are
            // outstanding, and would pin whichever key happened to be used first.
            request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", ApiKey);
        }
    }
}
The first step was to enumerate Applications and their attributes
// Choose which application fields to request from the registry.
#if FIELDS_MINIMUM
var fieldMaskPathsApplication = new string[] { "attributes" }; // believed to be the minimum the integration needs
#else
var fieldMaskPathsApplication = new string[] { "name", "description", "attributes" };
#endif

V3Applications applications = await applicationRegistryClient.ListAsync(collaborator, field_mask_paths: fieldMaskPathsApplication);

// With no applications the call yields null rather than an empty list.
var applicationList = applications?.Applications;
if (applicationList != null)
{
    foreach (V3Application application in applicationList)
    {
#if FIELDS_MINIMUM
        Console.WriteLine($"Application ID:{application.Ids.Application_id}");
#else
        Console.WriteLine($"Application ID:{application.Ids.Application_id} Name:{application.Name} Description:{application.Description}");
        Console.WriteLine($" CreatedAt: {application.Created_at:dd-MM-yy HH:mm:ss} UpdatedAt: {application.Updated_at:dd-MM-yy HH:mm:ss}");
#endif

        var applicationAttributes = application.Attributes;
        if (applicationAttributes != null)
        {
            Console.WriteLine(" Application attributes");

            foreach (KeyValuePair<string, string> entry in applicationAttributes)
            {
                Console.WriteLine($" Key: {entry.Key} Value: {entry.Value}");
            }
        }

        Console.WriteLine();
    }
}
}
The applicationRegistryClient.ListAsync call returns null rather than an empty list which tripped me up. I only found this when I deleted all the applications in my instance and started from scratch.
For my HTTP Integration I need to reliably forward messages to an Azure IoT Hub or Azure IoT Central. This solution needs to be robust and not lose any messages even when portions of the system are unavailable because of failures or sudden spikes in inbound traffic.
/// <summary>
/// Receives uplink payloads over HTTP and places them on an Azure Storage queue for later processing.
/// </summary>
[Route("[controller]")]
[ApiController]
public class Queued : ControllerBase
{
    private readonly string storageConnectionString;
    private readonly string queueName;
    private static readonly ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

    /// <summary>
    /// Reads the storage connection string and queue name from configuration.
    /// </summary>
    /// <param name="configuration">Application configuration providing the "AzureStorageConnectionString" and "UplinkQueueName" settings.</param>
    public Queued(IConfiguration configuration)
    {
        this.storageConnectionString = configuration.GetSection("AzureStorageConnectionString").Value;
        this.queueName = configuration.GetSection("UplinkQueueName").Value;
    }

    /// <summary>
    /// Returns a fixed placeholder string.
    /// </summary>
    public string Index()
    {
        return "Queued move along nothing to see";
    }

    /// <summary>
    /// Serialises the posted payload to JSON, Base64 encodes it and sends it to the uplink queue.
    /// </summary>
    /// <param name="payload">The uplink payload bound from the request body.</param>
    /// <returns>
    /// 200 when the message was queued, 400 when model validation failed, or a 500 problem
    /// response when the queue could not be opened/created or the send failed.
    /// </returns>
    [HttpPost]
    public async Task<IActionResult> Post([FromBody] PayloadV5 payload)
    {
        // Check that the post data is good
        if (!this.ModelState.IsValid)
        {
            log.WarnFormat("QueuedController validation failed {0}", this.ModelState.Messages());

            return this.BadRequest(this.ModelState);
        }

        try
        {
            QueueClient queueClient = new QueueClient(storageConnectionString, queueName);

            await queueClient.CreateIfNotExistsAsync();

            // The message body is the Base64 encoding of the UTF-8 JSON.
            await queueClient.SendMessageAsync(Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payload)));
        }
        catch (Exception ex)
        {
            log.Error("Unable to open/create queue or send message", ex);

            return this.Problem("Unable to open queue (creating if it doesn't exist) or send message", statusCode: 500, title: "Uplink payload not sent");
        }

        return this.Ok();
    }
}
An Azure Function with a Queue Trigger processes the messages and for this test pauses for 2 seconds (simulating a call to the Device Provisioning Service(DPS) ). It keeps track of the number of concurrent processing threads and when the first message for each device was received since the program was started.
/// <summary>
/// Queue triggered function that processes uplink messages, tracking how many invocations are
/// running concurrently and the first payload seen from each device since the process started.
/// </summary>
public static class UplinkMessageProcessor
{
    private static readonly ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

    // First payload seen per dev_id.
    private static readonly ConcurrentDictionary<string, PayloadV5> DevicesSeen = new ConcurrentDictionary<string, PayloadV5>();

    // Number of invocations currently inside Run; only updated through Interlocked.
    private static int ConcurrentThreadCount = 0;

    /// <summary>
    /// Processes one queued uplink message.
    /// </summary>
    /// <param name="myQueueItem">JSON text of the queued payload.</param>
    /// <param name="executionContext">Function execution context, used to locate log4net.config.</param>
    [FunctionName("UplinkMessageProcessor")]
    public static void Run([QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")] string myQueueItem, Microsoft.Azure.WebJobs.ExecutionContext executionContext)
    {
        Interlocked.Increment(ref ConcurrentThreadCount);

        try
        {
            var logRepository = LogManager.GetRepository(Assembly.GetEntryAssembly());
            XmlConfigurator.Configure(logRepository, new FileInfo(Path.Combine(executionContext.FunctionAppDirectory, "log4net.config")));

            log.Info($"Uplink message function triggered: {myQueueItem}");

            PayloadV5 payloadMessage = JsonSerializer.Deserialize<PayloadV5>(myQueueItem);

            // Returns the stored payload for a known device, or stores and returns this one for a new device.
            PayloadV5 payload = DevicesSeen.GetOrAdd(payloadMessage.dev_id, payloadMessage);

            log.Info($"Uplink message DevEui:{payload.dev_id} Threads:{ConcurrentThreadCount} First:{payload.metadata.time} Current:{payloadMessage.metadata.time} PayloadRaw:{payload.payload_raw}");

            // Pause for 2 seconds to stand in for a slow downstream call.
            Thread.Sleep(2000);
        }
        finally
        {
            // Always undo the increment, otherwise a malformed message (deserialisation failure,
            // missing dev_id or metadata) would leave the counter permanently too high.
            Interlocked.Decrement(ref ConcurrentThreadCount);
        }
    }
}
To explore how this processing worked I sent 1000 uplink messages from my Seeeduino LoRaWAN devices which were buffered in a queue.
Azure Storage Explorer 1000 queued messages. Application Insights 1000 events.
I processed 1000s of messages with the Azure Function but every so often 10-20% of the logging messages wouldn’t turn up in the logs. I’m using Log4Net and I think it is most probably caused by not flushing the messages before the function finishes.
For development and testing being able to provision an individual device is really useful, though for Azure IoT Central it is not easy (especially with the deprecation of DPS-KeyGen). With an Azure IoT Hub device connection strings are available in the portal which is convenient but not terribly scalable.
Initially the enrollment group had no registration records so I ran my command-line application to generate group enrollment keys for one of my devices.
Device registration before running my command line application
Then I ran the command-line application with my scopeID, registrationID (LoRaWAN deviceEUI) and the device group enrollment key I had generated in the previous step.
Registering a device and sending a message to my Azure IoT Hub
After running the command line application the device was visible in the enrollment group registration records.
Device registration after running my command line application
Provisioning a device with an individual enrollment has a different workflow. I had to run my command-line application with the RegistrationID, ScopeID, and one of the symmetric keys from the DPS individual enrollment device configuration.
DPS Individual enrollment configuration
A major downside to an individual enrollment is either the primary or the secondary symmetric key for the device has to be deployed on the device which could be problematic if the device has no secure storage.
With a group enrollment only the registration ID and the derived symmetric key have to be deployed on the device which is more secure.
Registering a device and sending a message to my Azure IoT Hub
In Azure IoT Explorer I could see messages from both my group and individually enrolled devices arriving at my Azure IoT hub
After some initial issues I found DPS was quite reliable and surprisingly easy to configure. I did find the DPS ProvisioningDeviceClient.RegisterAsync method sometimes took several seconds to execute which may have some ramifications when my application is doing this on demand.
To connect to an Azure IoT Hub I copied the connection string from the portal.
Azure IoT Hub connection string components
Retrieving a connection string for a device connected to Azure IoT Central (without using the Device Provisioning Service (DPS)) is a bit more involved. There is a deprecated command line application dps-keygen which calls the DPS with a device ID, device SAS key and scope ID and returns a connection string.
Azure IoT Central Device Connection Information. Azure DPS-Keygen command-line.
Using Azure IoT Explorer I could see reformatted JSON messages from my client application.
Azure IoT Explorer displaying message payload
These two approaches are fine for testing but wouldn’t scale well and would be painful to use if there were 1000s, 100s or even 10s of devices.
Unpacking the payload_fields property was causing me some issues. I tried many different approaches but they all failed.
/// <summary>
/// Uplink message body bound from the POST request; property names match the JSON field names.
/// </summary>
public class PayloadV4
{
public string app_id { get; set; }
public string dev_id { get; set; }
public string hardware_serial { get; set; }
public int port { get; set; }
public int counter { get; set; }
public bool is_retry { get; set; }
public string payload_raw { get; set; }
// Earlier attempts at typing payload_fields, kept for reference:
//public JsonObject payload_fields { get; set; }
//public JObject payload_fields { get; set; }
//public JToken payload_fields { get; set; }
//public JContainer payload_fields { get; set; }
//public dynamic payload_fields { get; set; }
// Declared as Object so the property can hold whatever the deserialiser produces for this field.
public Object payload_fields { get; set; }
public MetadataV4 metadata { get; set; }
public string downlink_url { get; set; }
}
I tried using the excellent JsonSubTypes library to build a polymorphic converter, which failed.
...
public class PolymorphicJsonConverter : JsonConverter
{
/// <summary>
/// Reads a JSON object and converts it to the payload field class named by its "type" property.
/// </summary>
/// <param name="reader">Reader positioned at the JSON object to convert.</param>
/// <param name="objectType">Requested target type (unused; the "type" property decides).</param>
/// <param name="existingValue">Existing value of the target (unused).</param>
/// <param name="serializer">Calling serializer (unused).</param>
/// <returns>The converted payload field, or null when the "type" property is missing or not recognised.</returns>
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
    JObject item = JObject.Load(reader);

    // A missing "type" property is treated like an unknown type instead of throwing.
    string type = item["type"]?.Value<string>();

    switch (type)
    {
        case "PayloadFieldDigitalInput":
            return item.ToObject<PayloadFieldDigitalInput>();

        // The original compared against "PayloadFieldDigitalInput" a second time, so this
        // branch could never be taken and digital outputs always came back as null.
        case "PayloadFieldDigitalOutput":
            return item.ToObject<PayloadFieldDigitalOutput>();

        // NOTE(review): the original mapped analog inputs to PayloadFieldDigitalOutput (and repeated
        // the same test in a second, unreachable branch that presumably was meant for analog outputs).
        // The mapping is preserved because the analog payload field types are not visible here -
        // confirm the intended target types.
        case "PayloadFieldAnalogInput":
            return item.ToObject<PayloadFieldDigitalOutput>();

        default:
            return null;
    }
}
...
}
It was about this point I figured that I was down a very deep rabbit hole and I should just embrace my “stupid”.
I realised I shouldn’t unpack the payload as the number of generated classes required and the complexity of other approaches was going to rapidly get out of hand. Using an Object and recursively traversing its contents with System.Text.Json looked like a viable approach.
/// <summary>
/// Details of one gateway entry in MetadataV4.gateways; property names match the JSON field names.
/// </summary>
public class GatewayV4
{
public string gtw_id { get; set; }
public ulong timestamp { get; set; }
public DateTime time { get; set; }
public int channel { get; set; }
public int rssi { get; set; }
public double snr { get; set; }
public int rf_chain { get; set; }
public double latitude { get; set; }
public double longitude { get; set; }
public int altitude { get; set; }
}
/// <summary>
/// Metadata section of an uplink payload, including the list of gateway entries; property names match the JSON field names.
/// </summary>
public class MetadataV4
{
// Kept as a string rather than parsed into a DateTime (GatewayV4.time, by contrast, is a DateTime).
public string time { get; set; }
public double frequency { get; set; }
public string modulation { get; set; }
public string data_rate { get; set; }
public string coding_rate { get; set; }
public List<GatewayV4> gateways { get; set; }
}
/// <summary>
/// Uplink message body bound from the POST request; property names match the JSON field names.
/// </summary>
public class PayloadV4
{
public string app_id { get; set; }
public string dev_id { get; set; }
public string hardware_serial { get; set; }
public int port { get; set; }
public int counter { get; set; }
public bool is_retry { get; set; }
public string payload_raw { get; set; }
// finally settled on an Object
// (the consuming controller casts it to JsonElement and walks its properties)
public Object payload_fields { get; set; }
public MetadataV4 metadata { get; set; }
public string downlink_url { get; set; }
}
So, I added yet another new controller to my application to deserialise the body of the POST from the TTN Application Integration.
/// <summary>
/// Receives an uplink payload and logs the contents of its payload_fields property.
/// </summary>
[Route("[controller]")]
[ApiController]
public class ClassSerialisationV4Fields : ControllerBase
{
    private static readonly ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

    /// <summary>
    /// Returns a fixed placeholder string.
    /// </summary>
    public string Index()
    {
        return "move along nothing to see";
    }

    /// <summary>
    /// Walks the properties of payload_fields and logs one line per property, unpacking the
    /// gps_, accelerometer_ and gyrometer_ properties one level deeper.
    /// </summary>
    /// <param name="payload">The uplink payload bound from the request body.</param>
    /// <returns>
    /// 200 once the fields have been logged; 400 when model validation failed or when
    /// payload_fields is missing or is not a JSON object.
    /// </returns>
    [HttpPost]
    public IActionResult Post([FromBody] PayloadV4 payload)
    {
        // Check that the post data is good
        if (!this.ModelState.IsValid)
        {
            log.WarnFormat("ClassSerialisationV4Fields validation failed {0}", this.ModelState.Messages());

            return this.BadRequest(this.ModelState);
        }

        // payload_fields is declared as Object, so it is null when the field is absent and the
        // element may be something other than an object; either case used to throw and surface
        // as a 500.
        if (!(payload?.payload_fields is JsonElement jsonElement) || jsonElement.ValueKind != JsonValueKind.Object)
        {
            log.Warn("ClassSerialisationV4Fields payload_fields missing or not a JSON object");

            return this.BadRequest("payload_fields is missing or is not a JSON object");
        }

        var payloadFieldsUnpacked = new System.Text.StringBuilder();

        foreach (var property in jsonElement.EnumerateObject())
        {
            // Special handling for nested properties. Names are identifiers, so compare ordinally.
            bool hasNestedName = property.Name.StartsWith("gps_", System.StringComparison.Ordinal)
                || property.Name.StartsWith("accelerometer_", System.StringComparison.Ordinal)
                || property.Name.StartsWith("gyrometer_", System.StringComparison.Ordinal);

            // Only descend when the value really is an object; EnumerateObject throws otherwise.
            if (hasNestedName && property.Value.ValueKind == JsonValueKind.Object)
            {
                payloadFieldsUnpacked.Append($"Property Name:{property.Name}\r\n");

                foreach (var nestedProperty in property.Value.EnumerateObject())
                {
                    payloadFieldsUnpacked.Append($" Property Name:{nestedProperty.Name} Property Value:{nestedProperty.Value}\r\n");
                }
            }
            else
            {
                payloadFieldsUnpacked.Append($"Property Name:{property.Name} Property Value:{property.Value}\r\n");
            }
        }

        log.Info(payloadFieldsUnpacked.ToString());

        return this.Ok();
    }
}
In the body of the events in Azure Application Insights I could see messages and the format looked fine for simple payloads