Queuing uplink messages
For my HTTP Integration I need to reliably forward messages to an Azure IoT Hub or Azure IoT Central. This solution needs to be robust and not lose any messages even when portions of the system are unavailable because of failures or sudden spikes in inbound traffic.
I added yet another controller, which receives uplink messages from The Things Network (TTN) and puts them into an Azure Storage Queue.
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Storage.Queues;
using log4net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;

// PayloadV5 and the ModelState.Messages() extension method are defined elsewhere in the project.
[Route("[controller]")]
[ApiController]
public class Queued : ControllerBase
{
    private readonly string storageConnectionString;
    private readonly string queueName;
    private static readonly ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

    public Queued(IConfiguration configuration)
    {
        // Both settings come from the application configuration
        this.storageConnectionString = configuration.GetSection("AzureStorageConnectionString").Value;
        this.queueName = configuration.GetSection("UplinkQueueName").Value;
    }

    public string Index()
    {
        return "Queued move along nothing to see";
    }

    [HttpPost]
    public async Task<IActionResult> Post([FromBody] PayloadV5 payload)
    {
        string payloadFieldsUnpacked = string.Empty;

        // Check that the post data is good
        if (!this.ModelState.IsValid)
        {
            log.WarnFormat("QueuedController validation failed {0}", this.ModelState.Messages());
            return this.BadRequest(this.ModelState);
        }

        try
        {
            // Create the queue on first use, then store the payload as Base64 encoded JSON
            QueueClient queueClient = new QueueClient(storageConnectionString, queueName);
            await queueClient.CreateIfNotExistsAsync();
            await queueClient.SendMessageAsync(Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payload)));
        }
        catch (Exception ex)
        {
            log.Error("Unable to open/create queue or send message", ex);
            return this.Problem("Unable to open queue (creating if it doesn't exist) or send message", statusCode: 500, title: "Uplink payload not sent");
        }

        return this.Ok();
    }
}
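The encoding step in the controller can be tried on its own. The sketch below is a minimal round trip, assuming a cut-down stand-in for PayloadV5 (SamplePayload and QueueEncoding are hypothetical names that are not part of my project). As far as I know the Queue Trigger binding decodes Base64 by default, so a function normally sees plain JSON; the Decode method is only needed by a consumer that reads the raw message text.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-in for PayloadV5, only here so the sketch compiles on its own.
public class SamplePayload
{
    public string dev_id { get; set; }
    public string payload_raw { get; set; }
}

public static class QueueEncoding
{
    // Same steps as the controller: object -> UTF-8 JSON bytes -> Base64 text.
    public static string Encode(SamplePayload payload) =>
        Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payload));

    // The reverse, for a consumer that receives the raw Base64 message text.
    public static SamplePayload Decode(string messageText) =>
        JsonSerializer.Deserialize<SamplePayload>(Convert.FromBase64String(messageText));
}
```

Calling QueueEncoding.Decode(QueueEncoding.Encode(payload)) should give back an object with the same field values as the one that went in.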
An Azure Function with a Queue Trigger processes the messages and, for this test, pauses for 2 seconds (simulating a call to the Device Provisioning Service (DPS)). It keeps track of the number of concurrent processing threads and when the first message from each device was received since the program started.
using System.Collections.Concurrent;
using System.IO;
using System.Reflection;
using System.Text.Json;
using System.Threading;
using log4net;
using log4net.Config;
using Microsoft.Azure.WebJobs;

public static class UplinkMessageProcessor
{
    static readonly ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
    static ConcurrentDictionary<string, PayloadV5> DevicesSeen = new ConcurrentDictionary<string, PayloadV5>();
    static int ConcurrentThreadCount = 0;

    [FunctionName("UplinkMessageProcessor")]
    public static void Run([QueueTrigger("%UplinkQueueName%", Connection = "AzureStorageConnectionString")] string myQueueItem, Microsoft.Azure.WebJobs.ExecutionContext executionContext)
    {
        Interlocked.Increment(ref ConcurrentThreadCount);
        try
        {
            var logRepository = LogManager.GetRepository(Assembly.GetEntryAssembly());
            XmlConfigurator.Configure(logRepository, new FileInfo(Path.Combine(executionContext.FunctionAppDirectory, "log4net.config")));
            log.Info($"Uplink message function triggered: {myQueueItem}");

            PayloadV5 payloadMessage = JsonSerializer.Deserialize<PayloadV5>(myQueueItem);
            // GetOrAdd returns the first payload stored for this device, not the current one
            PayloadV5 payload = DevicesSeen.GetOrAdd(payloadMessage.dev_id, payloadMessage);
            log.Info($"Uplink message DevEui:{payload.dev_id} Threads:{ConcurrentThreadCount} First:{payload.metadata.time} Current:{payloadMessage.metadata.time} PayloadRaw:{payload.payload_raw}");

            // Simulate a slow downstream call
            Thread.Sleep(2000);
        }
        finally
        {
            // Decrement even if deserialisation or logging throws, so the count stays accurate
            Interlocked.Decrement(ref ConcurrentThreadCount);
        }
    }
}
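The "first message per device" tracking relies on how ConcurrentDictionary.GetOrAdd behaves: only the first call for a key stores its value, and later calls return the stored one. A minimal sketch of that behaviour, using a timestamp instead of the full payload (FirstSeenTracker and Record are hypothetical names used for illustration only):

```csharp
using System;
using System.Collections.Concurrent;

public static class FirstSeenTracker
{
    // Key: device id, value: time of the first message seen since start-up.
    private static readonly ConcurrentDictionary<string, DateTime> firstSeen =
        new ConcurrentDictionary<string, DateTime>();

    // Returns the stored first-seen time; only the first call for a device stores its value.
    public static DateTime Record(string deviceId, DateTime messageTime) =>
        firstSeen.GetOrAdd(deviceId, messageTime);
}
```

Because the dictionary is static, the "first" times only cover the lifetime of the current process, which matches the "since the program was started" behaviour described above.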
To explore how this processing worked, I sent 1000 uplink messages from my Seeeduino LoRaWAN devices, which were buffered in the queue.


I processed thousands of messages with the Azure Function, but every so often 10-20% of the logging messages wouldn’t turn up in the logs. I’m using Log4Net, and I think this is most probably caused by buffered messages not being flushed before the function finishes.
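If flushing is the cause (I have not confirmed this), one thing to try is asking log4net to flush its appenders before each invocation returns. The sketch below is an untested idea, not a verified fix: RunWithFlush is a hypothetical wrapper, the 5 second timeout is an arbitrary assumption, and it relies on LogManager.Flush, which is only present in more recent log4net releases.

```csharp
using System;
using log4net;

public static class LoggingFlushSketch
{
    // Hypothetical wrapper: run the function body, then give buffered appenders time to write.
    public static void RunWithFlush(Action body)
    {
        try
        {
            body();
        }
        finally
        {
            // Assumed timeout of 5000 ms; Flush returns false if the appenders did not finish in time.
            bool flushed = LogManager.Flush(5000);
            if (!flushed)
            {
                Console.Error.WriteLine("log4net flush timed out");
            }
        }
    }
}
```

In the function this would mean moving the existing body of Run into the delegate passed to RunWithFlush, at the cost of a little extra latency on every invocation.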