Random wanderings through Microsoft Azure esp. PaaS plumbing, the IoT bits, AI on Micro controllers, AI on Edge Devices, .NET nanoFramework, .NET Core on *nix and ML.NET+ONNX
Swarm Space Azure IoT Connector Identity Translation Gateway Architecture
The new approach uses most of the existing building blocks but adds an Azure HTTP Trigger which receives the Swarm Space Bumble bee hive Webhook Delivery Method calls and writes them to an Azure Storage Queue.
Swarm Space Bumble bee hive Web Hook Delivery method
The uplink and downlink formatters are now called asynchronously so they have limited impact on the overall performance of the application.
https://json2csharp.com/
// Root myDeserializedClass = JsonConvert.DeserializeObject<Root>(myJsonResponse);
public class Root
{
public int packetId { get; set; }
public int deviceType { get; set; }
public int deviceId { get; set; }
public int userApplicationId { get; set; }
public int organizationId { get; set; }
public string data { get; set; }
public int len { get; set; }
public int status { get; set; }
public DateTime hiveRxTime { get; set; }
}
*/
/// <summary>
/// Model the Delivery Webhook JSON body is deserialised into. Each property is mapped
/// to its camelCase JSON field with a <c>JsonProperty</c> attribute.
/// </summary>
public class UplinkPayload
{
    [JsonProperty("packetId")] public int PacketId { get; set; }

    [JsonProperty("deviceType")] public int DeviceType { get; set; }

    [JsonProperty("deviceId")] public int DeviceId { get; set; }

    [JsonProperty("userApplicationId")] public int UserApplicationId { get; set; }

    [JsonProperty("organizationId")] public int OrganizationId { get; set; }

    // The only field marked JsonRequired, so deserialisation fails when "data" is absent.
    [JsonProperty("data"), JsonRequired] public string Data { get; set; }

    [JsonProperty("len")] public int Len { get; set; }

    [JsonProperty("status")] public int Status { get; set; }

    [JsonProperty("hiveRxTime")] public DateTime HiveRxTime { get; set; }
}
This class is used to “automagically” deserialise Delivery Webhook payloads. There is also some additional payload validation which discards test messages (not certain this is a good idea) etc.
//---------------------------------------------------------------------------------
// Copyright (c) December 2022, devMobile Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//---------------------------------------------------------------------------------
namespace devMobile.IoT.SwarmSpace.AzureIoT.Connector.Controllers
{
    using System.Globalization;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.Devices.Client;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;

    /// <summary>
    /// Receives uplink webhook payloads over HTTP POST, validates them and forwards
    /// them as telemetry events using a <see cref="DeviceClient"/> from the device client cache.
    /// </summary>
    [ApiController]
    [Route("api/[controller]")]
    public class UplinkController : ControllerBase
    {
        private readonly ILogger<UplinkController> _logger;
        private readonly IAzureIoTDeviceClientCache _azureIoTDeviceClientCache;

        public UplinkController(ILogger<UplinkController> logger, IAzureIoTDeviceClientCache azureIoTDeviceClientCache)
        {
            _logger = logger;
            _azureIoTDeviceClientCache = azureIoTDeviceClientCache;
        }

        /// <summary>
        /// Validates an uplink payload and sends it to Azure IoT Hub.
        /// </summary>
        /// <param name="payload">Payload deserialised from the request body.</param>
        /// <returns>
        /// 200 OK when the payload was forwarded, or deliberately discarded (PacketId of 0, or empty data);
        /// 400 Bad Request when the user application id is outside the configured range.
        /// </returns>
        [HttpPost]
        public async Task<IActionResult> Uplink([FromBody] Models.UplinkPayload payload)
        {
            // Serialising the payload is wasted work unless debug logging is actually enabled.
            if (_logger.IsEnabled(LogLevel.Debug))
            {
                _logger.LogDebug("Payload {0}", JsonConvert.SerializeObject(payload, Formatting.Indented));
            }

            // A PacketId of 0 is treated as a simulated (test) message: acknowledge it but do not forward it.
            if (payload.PacketId == 0)
            {
                _logger.LogWarning("Uplink-payload simulated DeviceId:{DeviceId}", payload.DeviceId);

                return this.Ok();
            }

            if ((payload.UserApplicationId < Constants.UserApplicationIdMinimum) || (payload.UserApplicationId > Constants.UserApplicationIdMaximum))
            {
                _logger.LogWarning("Uplink-payload invalid User Application Id:{UserApplicationId}", payload.UserApplicationId);

                return this.BadRequest($"Invalid User Application Id {payload.UserApplicationId}");
            }

            // Empty payloads are acknowledged with 200 rather than rejected.
            if ((payload.Len < Constants.PayloadLengthMinimum) || string.IsNullOrEmpty(payload.Data))
            {
                _logger.LogWarning("Uplink-payload.Data is empty PacketId:{PacketId}", payload.PacketId);

                return this.Ok("payload.Data is empty");
            }

            // Format the identifiers once, culture-invariantly, so the cache key and the message
            // properties do not depend on the culture of the thread.
            string deviceId = payload.DeviceId.ToString(CultureInfo.InvariantCulture);
            string receivedAtUtc = payload.HiveRxTime.ToString("s", CultureInfo.InvariantCulture);

            Models.AzureIoTDeviceClientContext context = new Models.AzureIoTDeviceClientContext()
            {
                OrganisationId = payload.OrganizationId,
                UserApplicationId = payload.UserApplicationId,
                DeviceType = payload.DeviceType,
                DeviceId = payload.DeviceId,
            };

            DeviceClient deviceClient = await _azureIoTDeviceClientCache.GetOrAddAsync(deviceId, context);

            JObject telemetryEvent = new JObject
            {
                { "packetId", payload.PacketId },
                { "deviceType", payload.DeviceType },
                { "DeviceID", payload.DeviceId },
                { "organizationId", payload.OrganizationId },
                { "ApplicationId", payload.UserApplicationId },
                { "ReceivedAtUtc", receivedAtUtc },
                { "DataLength", payload.Len },
                { "Data", payload.Data },
                { "Status", payload.Status },
            };

            // Send the message to Azure IoT Hub. UTF-8 is used because ASCII encoding would
            // replace any non-ASCII character in the serialised JSON with '?'.
            using (Message ioTHubmessage = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
            {
                // Ensure the displayed time is the acquired time rather than the uploaded time.
                ioTHubmessage.Properties.Add("iothub-creation-time-utc", receivedAtUtc);
                ioTHubmessage.Properties.Add("OrganizationId", payload.OrganizationId.ToString(CultureInfo.InvariantCulture));
                ioTHubmessage.Properties.Add("ApplicationId", payload.UserApplicationId.ToString(CultureInfo.InvariantCulture));
                ioTHubmessage.Properties.Add("DeviceId", deviceId);
                ioTHubmessage.Properties.Add("deviceType", payload.DeviceType.ToString(CultureInfo.InvariantCulture));

                await deviceClient.SendEventAsync(ioTHubmessage);

                _logger.LogInformation("Uplink-DeviceID:{deviceId} SendEventAsync success", payload.DeviceId);
            }

            return this.Ok();
        }
    }
}
The webhook was configured to “acknowledge messages on successful delivery”. I then checked my Delivery Method configuration with a couple of “Test” messages.
My Swarm Space Eval Kit arrived mid-week and after some issues with jumper settings it started reporting position and status information.
'---------------------------------------------------------------------------------
' Copyright (c) November 2022, devMobile Software
'
' Licensed under the Apache License, Version 2.0 (the "License");
' you may Not use this file except in compliance with the License.
' You may obtain a copy of the License at
'
' http://www.apache.org/licenses/LICENSE-2.0
'
' Unless required by applicable law Or agreed to in writing, software
' distributed under the License Is distributed on an "AS IS" BASIS,
' WITHOUT WARRANTIES Or CONDITIONS OF ANY KIND, either express Or implied.
' See the License for the specific language governing permissions And
' limitations under the License.
'
'---------------------------------------------------------------------------------
Imports System.Threading
Imports Microsoft.Azure.WebJobs
Imports Microsoft.Extensions.Logging
''' <summary>
''' Timer triggered Azure Function which logs the next scheduled trigger time and
''' how many times the function has run in this process.
''' </summary>
Public Class TimerTrigger
    ' Shared across invocations; only modified through Interlocked so concurrent runs do not lose updates.
    Shared executionCount As Int32

    ''' <summary>
    ''' Runs on the schedule "0 */1 * * * *" and logs the next trigger time and the execution count.
    ''' </summary>
    ''' <param name="myTimer">Timer information supplied by the trigger; its ScheduleStatus.Next value is logged.</param>
    ''' <param name="log">Logger the status message is written to.</param>
    <FunctionName("Timer")>
    Public Shared Sub Run(<TimerTrigger("0 */1 * * * *")> myTimer As TimerInfo, log As ILogger)
        ' Log the value returned by Interlocked.Increment; re-reading the shared field
        ' could report a count belonging to another, concurrent invocation.
        Dim currentCount As Int32 = Interlocked.Increment(executionCount)

        log.LogInformation("VB.Net .NET V6 TimerTrigger next trigger:{0} Execution count:{1}", myTimer.ScheduleStatus.Next, currentCount)
    End Sub
End Class
Visual Studio 2022 highlighting missing libraries
Visual Studio 2022 with additional function SDK references
The next step is to add the hosts.json (empty for a timer trigger) and local.settings.json files to configure the function.
Visual Studio 2022 Hosts.json file
Visual Studio 2022 showing hosts.json & local.settings.json
“Can’t determine project language from files. Please add one of [--csharp, --javascript, --typescript, --java, --powershell, --custom]”
Check “FUNCTIONS_WORKER_RUNTIME” in the local.settings.json file.
The baked in error logging doesn’t handle broken message formats very well. Look at the call stack or single step through the application to find the message format that is broken
Visual Studio 2022 editor with malformed message highlighted
There have been blog posts showing how to build Azure Percept integrations with Power BI, Azure Logic Apps, etc. with “zero code”. But what do you do if your Azure Percept based solution needs some “glue” to connect to other systems?
I work on a SmartAg computer vision based application that uses security cameras to monitor the flow of cattle through stockyards. It has to control some local hardware, display real-time dashboards, and integrate with an existing application so a “zero code” solution wouldn’t work.
The initial plan was to take the Azure Percept to a piggery to see if I could build a Proof of Concept(PoC) of a product that the CEO and I had been discussing for a couple of weeks.
But shortly after I started working on this series of blog posts New Zealand went into strict lockdown. Only essential shops like supermarkets and petrol stations were open, our groceries were being delivered, and schools were closed.
I needed a demonstration application which used props I could source from home and the local petrol station. In addition my teenage son’s school was closed so he could be the project “intern”.
While at the local petrol station to buy milk I observed that they had a large selection of confectionary so we decided to build a series of object detection models to count different types of chocolates.
In a retail scenario this could be counting products on shelves, pallets in a cold store, or at the SmartAg start-up I work for counting cattle in a yard.
Configuring The Test Environment
I have not included screen shots of the hardware configuration process as this has been covered by other bloggers. Though, for projects like this I always create a new resource group so I can easily delete all the resources so my Azure invoice doesn’t cause “bill shock”.
Azure Resource Group Creation blade
I also created the Azure IoT Hub before configuring the Percept device rather than via the Device provisioning process.
Azure Percept configuration assigning an Azure IoT Hub
The intern trialed different trays, camera orientations, and lighting as part of building a test rig on the living room floor. After some trial and error, he identified the optimal camera orientation (on top of the packing foam) and lighting (indirect sunlight with no shadows) for reliable inferencing. As this was a proof-of-concept project we limited the number of variables so we didn’t have to collect lots of images which the intern would then have to mark up.
Trialing image capture with M&M’s
Trialling Image capture with Cadbury Favourites
Azure Percept Studio + CustomVision.AI for capturing and marking up images
The intern then spent an afternoon drawing minimum bounding rectangles (MBRs) around the different chocolates in the images he had collected.
M&M Size issue
The intern then decided to focus on the chocolate bars after realising they were much easier and faster to markup than the M&Ms.
Cadbury Favourites images before markup
Training
The intern repeatedly trained the model adding additional images and adjusting parameters until the results were “good enough”.
Fine-tuning the Configuration
After using the test rig one evening we found the performance of the model wasn’t great, so the intern collected more images with different lighting, shadows, chocolate bar placements, and orientations to improve the accuracy of the inferencing.
Manual reviewing of object detection results.
Inspecting the Inferencing Results
After several iterations the accuracy of the chocolate bar object detection model was acceptable, so I wanted to examine the telemetry that was being streamed to my Azure IoT Hub.
In Azure Percept Studio I could view (in a limited way) inferencing telemetry and check the quality and format of the results.
Azure Percept Studio device telemetry
I use Azure IoT Explorer on other projects to configure devices, view telemetry from devices, send messages to devices, view and modify device twin JSON, etc. So I used it to inspect the inferencing results streamed to the Azure IoT Hub.
Azure IoT Explorer device telemetry
Summary
In an afternoon the intern had configured and trained a Custom Vision project for me that I could use to build some “low code” integrations.
Project “Learnings”
If the image capture delay is too short there will be images with hands.
Captured image with intern’s hands
Though, the untrained model did identify the hands
The intern also discovered that by including images with “not favourites” the robustness of the model improved.
Cadbury Favourites with M&Ms
When I had to collect some more images for a blog post, I found the intern had consumed quite a few of the “props” and left the wrappers in the bottom of the Azure Percept packaging.
/// <summary>
/// Settings stored per entry of <c>IoTHubSettings.Applications</c>.
/// </summary>
public class IoTHubApplicationSetting
{
// No default is assigned, so this is null until it is set.
public string DtdlModelId { get; set; }
}
/// <summary>
/// Azure IoT Hub settings: a connection string plus a dictionary of per-application settings.
/// </summary>
public class IoTHubSettings
{
// Defaults to an empty string rather than null.
public string IoTHubConnectionString { get; set; } = string.Empty;
// Not initialised here, so this is null until populated.
// NOTE(review): presumably keyed by application id - confirm against the configuration file.
public Dictionary<string, IoTHubApplicationSetting> Applications { get; set; }
}
/// <summary>
/// Settings stored per entry of <c>DeviceProvisiongServiceSettings.Applications</c>.
/// Both values default to an empty string.
/// </summary>
public class DeviceProvisiongServiceApplicationSetting
{
public string DtdlModelId { get; set; } = string.Empty;
public string GroupEnrollmentKey { get; set; } = string.Empty;
}
/// <summary>
/// Device Provisioning Service settings: the ID scope plus a dictionary of per-application settings.
/// </summary>
public class DeviceProvisiongServiceSettings
{
// Defaults to an empty string rather than null.
public string IdScope { get; set; } = string.Empty;
// Not initialised here, so this is null until populated.
// NOTE(review): presumably keyed by application id - confirm against the configuration file.
public Dictionary<string, DeviceProvisiongServiceApplicationSetting> Applications { get; set; }
}
/// <summary>
/// Downlink settings for one Azure IoT Central method: port, confirmed flag, priority and queue.
/// </summary>
public class IoTCentralMethodSetting
{
    // Port and Confirmed rely on the type defaults (0 and false).
    public byte Port { get; set; }

    public bool Confirmed { get; set; }

    public Models.DownlinkPriority Priority { get; set; } = Models.DownlinkPriority.Normal;

    public Models.DownlinkQueue Queue { get; set; } = Models.DownlinkQueue.Replace;
}
/// <summary>
/// Azure IoT Central settings: a dictionary of <see cref="IoTCentralMethodSetting"/> entries.
/// </summary>
public class IoTCentralSetting
{
// Not initialised here, so this is null until populated.
// NOTE(review): presumably keyed by the IoT Central method name - confirm against the configuration file.
public Dictionary<string, IoTCentralMethodSetting> Methods { get; set; }
}
/// <summary>
/// Root settings object grouping the IoT Hub, Device Provisioning Service and IoT Central settings.
/// None of the properties is initialised here, so each is null until populated.
/// </summary>
public class AzureIoTSettings
{
public IoTHubSettings IoTHub { get; set; }
public DeviceProvisiongServiceSettings DeviceProvisioningService { get; set; }
public IoTCentralSetting IoTCentral { get; set; }
}
Azure IoT Central appears to have no support for setting message properties, so the LoRaWAN port, confirmed flag, priority, and queuing settings are retrieved from configuration.
Azure Function Configuration
Models.Downlink downlink;
Models.DownlinkQueue queue;
string payloadText = Encoding.UTF8.GetString(message.GetBytes()).Trim();

// Messages carrying a "method-name" property are processed as Azure IoT Central commands.
if (message.Properties.ContainsKey("method-name"))
{
    #region Azure IoT Central C2D message processing
    string methodName = message.Properties["method-name"];
    if (string.IsNullOrWhiteSpace(methodName))
    {
        _logger.LogWarning("Downlink-DeviceID:{0} MessagedID:{1} LockToken:{2} method-name property empty", receiveMessageHandlerContext.DeviceId, message.MessageId, message.LockToken);

        await deviceClient.RejectAsync(message);
        return;
    }

    // Look up the method settings to get confirmed, port, priority, and queue.
    // Methods is checked for null as well, because the settings class does not initialise the dictionary.
    if ((_azureIoTSettings == null)
        || (_azureIoTSettings.IoTCentral == null)
        || (_azureIoTSettings.IoTCentral.Methods == null)
        || !_azureIoTSettings.IoTCentral.Methods.TryGetValue(methodName, out IoTCentralMethodSetting methodSetting))
    {
        _logger.LogWarning("Downlink-DeviceID:{0} MessagedID:{1} LockToken:{2} method-name:{3} has no settings", receiveMessageHandlerContext.DeviceId, message.MessageId, message.LockToken, methodName);

        await deviceClient.RejectAsync(message);
        return;
    }

    downlink = new Models.Downlink()
    {
        Confirmed = methodSetting.Confirmed,
        Priority = methodSetting.Priority,
        Port = methodSetting.Port,
        CorrelationIds = AzureLockToken.Add(message.LockToken),
    };

    queue = methodSetting.Queue;

    if (payloadText.IsPayloadEmpty())
    {
        // Special case for an Azure IoT Central command with no request payload.
        downlink.PayloadRaw = "";
    }
    else if (payloadText.IsPayloadValidJson())
    {
        // A JSON payload is used as the decoded payload as-is.
        downlink.PayloadDecoded = JToken.Parse(payloadText);
    }
    else
    {
        // Any other payload is wrapped in an object with the method name as the property name.
        downlink.PayloadDecoded = new JObject(new JProperty(methodName, payloadText));
    }

    // Uses _logger like the warnings above so that the block logs through a single logger.
    _logger.LogInformation("Downlink-IoT Central DeviceID:{0} Method:{1} MessageID:{2} LockToken:{3} Port:{4} Confirmed:{5} Priority:{6} Queue:{7}",
        receiveMessageHandlerContext.DeviceId,
        methodName,
        message.MessageId,
        message.LockToken,
        downlink.Port,
        downlink.Confirmed,
        downlink.Priority,
        queue);
    #endregion
}
The reboot command payload only contains an “@” so the TTI payload will be empty, the minimum and maximum command payloads will contain only a numeric value which is added to the decoded payload with the method name, the combined minimum and maximum command has a JSON payload which is “grafted” into the decoded payload.
TTI Webhook Integration running in desktop emulator
In the Visual Studio 2019 Debugger the exception text was “IsTransient = true” so I went and made a coffee and tried again.
Visual Studio 2019 Quickwatch displaying short form error message
The call was still failing so I dumped out the exception text so I had some key words to search for
Microsoft.Azure.Devices.Provisioning.Client.ProvisioningTransportException: AMQP transport exception
---> System.UnauthorizedAccessException: Sys
at Microsoft.Azure.Amqp.ExceptionDispatcher.Throw(Exception exception)
at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.Azure.Amqp.AmqpObject.OpenAsyncResult.End(IAsyncResult result)
at Microsoft.Azure.Amqp.AmqpObject.EndOpen(IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.HandleTransportOpened(IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.OnTransportOpenCompete(IAsyncResult result)
--- End of stack trace from previous location ---
at Microsoft.Azure.Devices.Provisioning.Client.Transport.AmqpClientConnection.OpenAsync(TimeSpan timeout, Boolean useWebSocket, X509Certificate2 clientCert, IWebProxy proxy, RemoteCertificateValidationCallback remoteCerificateValidationCallback)
at Microsoft.Azure.Devices.Provisioning.Client.Transport.ProvisioningTransportHandlerAmqp.RegisterAsync(ProvisioningTransportRegisterMessage message, TimeSpan timeout, CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at Microsoft.Azure.Devices.Provisioning.Client.Transport.ProvisioningTransportHandlerAmqp.RegisterAsync(ProvisioningTransportRegisterMessage message, TimeSpan timeout, CancellationToken cancellationToken)
at Microsoft.Azure.Devices.Provisioning.Client.Transport.ProvisioningTransportHandlerAmqp.RegisterAsync(ProvisioningTransportRegisterMessage message, CancellationToken cancellationToken)
at devMobile.IoT.TheThingsIndustries.AzureIoTHub.Integration.Uplink(HttpRequestData req, FunctionContext executionContext) in C:\Users\BrynLewis\source\repos\TTIV3AzureIoTConnector\TTIV3WebHookAzureIoTHubIntegration\TTIUplinkHandler.cs:line 245
I then tried another program which did use the Device Provisioning Service and it worked first time, so it was something wrong with the code.
// Register the device with the Device Provisioning Service using its symmetric key,
// then open a DeviceClient connection to the Azure IoT Hub it was assigned to.
using (var securityProvider = new SecurityProviderSymmetricKey(deviceId, deviceKey, null))
{
    using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
    {
        DeviceRegistrationResult result;

        ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(
            Constants.AzureDpsGlobalDeviceEndpoint,
            // The second argument must be the ID scope. Passing dpsApplicationSetting.GroupEnrollmentKey
            // here by mistake made RegisterAsync fail with a ProvisioningTransportException
            // wrapping an UnauthorizedAccessException.
            _azureIoTSettings.DeviceProvisioningService.IdScope,
            securityProvider,
            transport);

        try
        {
            result = await provClient.RegisterAsync();
        }
        catch (ProvisioningTransportException ex)
        {
            // Logged as an error because the request is failed with 401.
            logger.LogError(ex, "Uplink-DeviceID:{0} RegisterAsync failed IDScope and/or GroupEnrollmentKey invalid", deviceId);

            return req.CreateResponse(HttpStatusCode.Unauthorized);
        }

        if (result.Status != ProvisioningRegistrationStatusType.Assigned)
        {
            logger.LogError("Uplink-DeviceID:{0} Status:{1} RegisterAsync failed ", deviceId, result.Status);

            return req.CreateResponse(HttpStatusCode.FailedDependency);
        }

        // securityProvider is already a SecurityProviderSymmetricKey, so no cast is needed.
        IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, securityProvider.GetPrimaryKey());

        deviceClient = DeviceClient.Create(result.AssignedHub, authentication, TransportSettings);

        await deviceClient.OpenAsync();

        logger.LogInformation("Uplink-DeviceID:{0} Azure IoT Hub connected (Device Provisioning Service)", deviceId);
    }
}
I then carefully inspected my source code and worked back through the file history and realised I had accidentally replaced the IDScope with the GroupEnrollment setting so it was never going to work i.e. IsTransient != true. So, for the one or two other people who get this error message check your IDScope and GroupEnrollment key make sure they are the right variables and that values they contain are correct.
The Azure IoT Central Location Telemetry messages have a slightly different format to the output of the TTI LPP Payload formatter so the payload has to be “post processed”.
/// <summary>
/// Recursively copies the properties found under <paramref name="token"/> into <paramref name="jobject"/>.
/// Properties whose parent object path starts with "GPS_" and that are named Latitude, Longitude or
/// Altitude are additionally added as "lat", "lon" and "alt" for Azure IoT Central compatibility.
/// </summary>
/// <param name="jobject">Object the properties are added to.</param>
/// <param name="token">Token to walk; properties are copied, any other token only has its children walked.</param>
private void EnumerateChildren(JObject jobject, JToken token)
{
    if (!(token is JProperty property))
    {
        // Not a property: walk the children, adding to the same target object.
        foreach (JToken child in token.Children())
        {
            EnumerateChildren(jobject, child);
        }

        return;
    }

    if (!(token.First is JValue))
    {
        // The property holds a nested structure: build it in a new object, then attach that object once.
        // Adding after the loop avoids a duplicate-key exception should the loop ever run more than once.
        JObject nestedObject = new JObject();

        foreach (JToken child in token.Children())
        {
            EnumerateChildren(nestedObject, child);
        }

        jobject.Add(property.Name, nestedObject);

        return;
    }

    // Temporary dirty hack for Azure IoT Central compatibility
    // TODO Need to check if similar approach necessary accelerometer and gyro LPP payloads
    if ((token.Parent is JObject possibleGpsProperty) && possibleGpsProperty.Path.StartsWith("GPS_", StringComparison.OrdinalIgnoreCase))
    {
        // Ordinal comparison: a culture-sensitive compare does not match "LATITUDE" under Turkish casing rules.
        if (string.Equals(property.Name, "Latitude", StringComparison.OrdinalIgnoreCase))
        {
            jobject.Add("lat", property.Value);
        }
        else if (string.Equals(property.Name, "Longitude", StringComparison.OrdinalIgnoreCase))
        {
            jobject.Add("lon", property.Value);
        }
        else if (string.Equals(property.Name, "Altitude", StringComparison.OrdinalIgnoreCase))
        {
            jobject.Add("alt", property.Value);
        }
    }

    // The property is always added under its original name as well.
    jobject.Add(property.Name, property.Value);
}
I may have to extend this method for other LPP datatypes
“Post processed” TTI JSON GPS Position data suitable for Azure IoT Central
Azure IoT Central Device Template with Location Capability
For initial development and testing I ran the function application in the desktop emulator and simulated TTI webhook calls with Telerik Fiddler and modified sample payloads. After some issues with iothub-creation-time-utc decoded telemetry messages were displayed in the Device Raw Data tab
Azure IoT Central Device Raw Data tab with successfully decoded GPS location payloads
Azure IoT Central map displaying with device location highlighted
The TTI V3 Connector Minimalist Cloud to Device only required a port number, and there was no way to specify whether delivery of message had to be confirmed, the way the message was queued, or the priority of message delivery. Like the port number these optional settings can be specified in message properties.
If any of these properties are incorrect DeviceClient.RejectAsync is called which deletes the message from the device queue and indicates to the server that the message could not be processed.
/// <summary>
/// Handles a cloud-to-device message: validates the port, confirmed, priority and queue message
/// properties, builds a downlink payload and posts it to the webhook URL built from the handler context.
/// </summary>
/// <param name="message">Received message; it is disposed before the handler returns.</param>
/// <param name="userContext">Must be a <c>Models.AzureIoTHubReceiveMessageHandlerContext</c>.</param>
/// <remarks>
/// A message with an invalid property is rejected. Any exception is logged and not rethrown.
/// </remarks>
private async Task AzureIoTHubClientReceiveMessageHandler(Message message, object userContext)
{
    try
    {
        // The using block wraps the whole handler so the message is also disposed on the unknown-device return.
        using (message)
        {
            Models.AzureIoTHubReceiveMessageHandlerContext receiveMessageHandlerContext = (Models.AzureIoTHubReceiveMessageHandlerContext)userContext;

            if (!_DeviceClients.TryGetValue(receiveMessageHandlerContext.DeviceId, out DeviceClient deviceClient))
            {
                _logger.LogWarning("Downlink-DeviceID:{0} unknown", receiveMessageHandlerContext.DeviceId);
                return;
            }

            string payloadText = Encoding.UTF8.GetString(message.GetBytes()).Trim();

            if (!AzureDownlinkMessage.PortTryGet(message.Properties, out byte port))
            {
                _logger.LogWarning("Downlink-Port property is invalid");

                await deviceClient.RejectAsync(message);
                return;
            }

            if (!AzureDownlinkMessage.ConfirmedTryGet(message.Properties, out bool confirmed))
            {
                _logger.LogWarning("Downlink-Confirmed flag is invalid");

                await deviceClient.RejectAsync(message);
                return;
            }

            if (!AzureDownlinkMessage.PriorityTryGet(message.Properties, out Models.DownlinkPriority priority))
            {
                _logger.LogWarning("Downlink-Priority value is invalid");

                await deviceClient.RejectAsync(message);
                return;
            }

            if (!AzureDownlinkMessage.QueueTryGet(message.Properties, out Models.DownlinkQueue queue))
            {
                _logger.LogWarning("Downlink-Queue value is invalid");

                // Same overload as the other validation failures.
                await deviceClient.RejectAsync(message);
                return;
            }

            Models.Downlink downlink = new Models.Downlink()
            {
                Confirmed = confirmed,
                Priority = priority,
                Port = port,
                CorrelationIds = AzureLockToken.Add(message.LockToken),
            };

            // In this scenario a valid JSON string should start/end with {/} for an object or [/] for an array.
            // Ordinal comparisons keep the check independent of the current culture.
            bool looksLikeJsonObject = payloadText.StartsWith("{", StringComparison.Ordinal) && payloadText.EndsWith("}", StringComparison.Ordinal);
            bool looksLikeJsonArray = payloadText.StartsWith("[", StringComparison.Ordinal) && payloadText.EndsWith("]", StringComparison.Ordinal);

            if (looksLikeJsonObject || looksLikeJsonArray)
            {
                try
                {
                    downlink.PayloadDecoded = JToken.Parse(payloadText);
                }
                catch (JsonReaderException)
                {
                    // Looked like JSON but did not parse, so send it as a raw payload instead.
                    downlink.PayloadRaw = payloadText;
                }
            }
            else
            {
                downlink.PayloadRaw = payloadText;
            }

            _logger.LogInformation("Downlink-IoT Hub DeviceID:{0} MessageID:{1} LockToken:{2} Port:{3} Confirmed:{4} Priority:{5} Queue:{6}",
                receiveMessageHandlerContext.DeviceId,
                message.MessageId,
                message.LockToken,
                downlink.Port,
                downlink.Confirmed,
                downlink.Priority,
                queue);

            Models.DownlinkPayload downlinkPayload = new Models.DownlinkPayload()
            {
                Downlinks = new List<Models.Downlink>()
                {
                    downlink
                }
            };

            // ToLowerInvariant: culture-sensitive lower-casing can alter characters (e.g. 'I' under Turkish rules) and break the URL.
            string url = $"{receiveMessageHandlerContext.WebhookBaseURL}/{receiveMessageHandlerContext.ApplicationId}/webhooks/{receiveMessageHandlerContext.WebhookId}/devices/{receiveMessageHandlerContext.DeviceId}/down/{queue}".ToLowerInvariant();

            using (var client = new WebClient())
            {
                client.Headers.Add("Authorization", $"Bearer {receiveMessageHandlerContext.ApiKey}");

                // Await the upload rather than blocking a thread inside an async handler.
                await client.UploadStringTaskAsync(new Uri(url), JsonConvert.SerializeObject(downlinkPayload));
            }

            _logger.LogInformation("Downlink-DeviceID:{0} LockToken:{1} success", receiveMessageHandlerContext.DeviceId, message.LockToken);
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Downlink-ReceiveMessge processing failed");
    }
}
A correlation identifier containing the Message LockToken is added to the downlink payload.
Unconfirmed Downlink Messages
Azure IoT Explorer Cloud to Device sending an unconfirmed downlink message
For unconfirmed messages the TTI Connector calls the DeviceClient.CompleteAsync method (with the LockToken from the CorrelationIDs list) which deletes the message from the device queue.
/// <summary>
/// HTTP POST handler for "Queued" webhook calls. When the queued downlink is not confirmed,
/// the matching cloud-to-device message is completed using the lock token from the correlation ids.
/// </summary>
/// <param name="req">Request whose body is deserialised into a <c>Models.DownlinkQueuedPayload</c>.</param>
/// <param name="executionContext">Function context used to obtain the logger.</param>
/// <returns>
/// 200 on success; 400 when the body does not deserialise or has no lock token; 409 when the device
/// is unknown or the message lock has been lost; 500 when any other exception is thrown.
/// </returns>
[Function("Queued")]
public async Task<HttpResponseData> Queued([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
    var logger = executionContext.GetLogger("Queued");

    // Everything runs inside one try/catch so that any failure is logged.
    try
    {
        string requestBody = await req.ReadAsStringAsync();

        Models.DownlinkQueuedPayload queuedPayload = JsonConvert.DeserializeObject<Models.DownlinkQueuedPayload>(requestBody);
        if (queuedPayload == null)
        {
            logger.LogInformation("Queued-Payload {0} invalid", requestBody);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        string applicationId = queuedPayload.EndDeviceIds.ApplicationIds.ApplicationId;
        string deviceId = queuedPayload.EndDeviceIds.DeviceId;

        logger.LogInformation("Queued-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);

        if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
        {
            logger.LogInformation("Queued-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        // Only unconfirmed messages are completed at this point; confirmed ones are left untouched here.
        if (!queuedPayload.DownlinkQueued.Confirmed)
        {
            if (!AzureLockToken.TryGet(queuedPayload.DownlinkQueued.CorrelationIds, out string lockToken))
            {
                logger.LogWarning("Queued-DeviceID:{0} LockToken missing from payload:{1}", deviceId, requestBody);

                return req.CreateResponse(HttpStatusCode.BadRequest);
            }

            try
            {
                await deviceClient.CompleteAsync(lockToken);
            }
            catch (DeviceMessageLockLostException)
            {
                logger.LogWarning("Queued-CompleteAsync DeviceID:{0} LockToken:{1} timeout", deviceId, lockToken);

                return req.CreateResponse(HttpStatusCode.Conflict);
            }

            logger.LogInformation("Queued-DeviceID:{0} LockToken:{1} success", deviceId, lockToken);
        }
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Queued message processing failed");

        return req.CreateResponse(HttpStatusCode.InternalServerError);
    }

    return req.CreateResponse(HttpStatusCode.OK);
}
The Things Industries Live Data tab for an unconfirmed message-Queued
Azure Application Insights for an unconfirmed message
The Things Industries Live Data tab for an unconfirmed message-Sent
Confirmed Downlink Messages
Azure IoT Explorer Cloud to Device sending a confirmed downlink message
Azure Application Insights for a confirmed message
The Things Industries Live Data tab for a confirmed message-Sent
The Things Industries Live Data tab for a confirmed message-Ack
If message delivery succeeds the deviceClient.CompleteAsync method (with the LockToken from the CorrelationIDs list) is called which removes the message from the device queue.
/// <summary>
/// HTTP POST handler for "Ack" webhook calls. Completes the matching cloud-to-device message
/// using the lock token from the correlation ids.
/// </summary>
/// <param name="req">Request whose body is deserialised into a <c>Models.DownlinkAckPayload</c>.</param>
/// <param name="executionContext">Function context used to obtain the logger.</param>
/// <returns>
/// 200 on success; 400 when the body does not deserialise or has no lock token; 409 when the device
/// is unknown or the message lock has been lost; 500 when any other exception is thrown.
/// </returns>
[Function("Ack")]
public async Task<HttpResponseData> Ack([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
    // The logger category matches the function name (it was "Queued", copied from the Queued handler).
    var logger = executionContext.GetLogger("Ack");

    // Wrap all the processing in a try\catch so if anything blows up we have logged it.
    try
    {
        string payloadText = await req.ReadAsStringAsync();

        Models.DownlinkAckPayload payload = JsonConvert.DeserializeObject<Models.DownlinkAckPayload>(payloadText);
        if (payload == null)
        {
            logger.LogInformation("Ack-Payload {0} invalid", payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
        string deviceId = payload.EndDeviceIds.DeviceId;

        logger.LogInformation("Ack-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);

        if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
        {
            logger.LogInformation("Ack-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        if (!AzureLockToken.TryGet(payload.DownlinkAck.CorrelationIds, out string lockToken))
        {
            logger.LogWarning("Ack-DeviceID:{0} LockToken missing from payload:{1}", payload.EndDeviceIds.DeviceId, payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        try
        {
            await deviceClient.CompleteAsync(lockToken);
        }
        catch (DeviceMessageLockLostException)
        {
            logger.LogWarning("Ack-CompleteAsync DeviceID:{0} LockToken:{1} timeout", payload.EndDeviceIds.DeviceId, lockToken);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        logger.LogInformation("Ack-DeviceID:{0} LockToken:{1} success", payload.EndDeviceIds.DeviceId, lockToken);
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Ack message processing failed");

        return req.CreateResponse(HttpStatusCode.InternalServerError);
    }

    return req.CreateResponse(HttpStatusCode.OK);
}
Azure Application Insights for a confirmed message Ack
If message delivery fails the deviceClient.AbandonAsync method (with the LockToken from the CorrelationIDs list) is called which puts the downlink message back onto the device queue.
/// <summary>
/// HTTP POST handler for "Failed" webhook calls. Rejects the matching cloud-to-device message
/// using the lock token from the correlation ids.
/// </summary>
/// <param name="req">Request whose body is deserialised into a <c>Models.DownlinkFailedPayload</c>.</param>
/// <param name="executionContext">Function context used to obtain the logger.</param>
/// <returns>
/// 200 on success; 400 when the body does not deserialise or has no lock token; 409 when the device
/// is unknown or the message lock has been lost; 500 when any other exception is thrown.
/// </returns>
[Function("Failed")]
public async Task<HttpResponseData> Failed([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
    // The logger category matches the function name (it was "Queued", copied from the Queued handler).
    var logger = executionContext.GetLogger("Failed");

    // Wrap all the processing in a try\catch so if anything blows up we have logged it.
    try
    {
        string payloadText = await req.ReadAsStringAsync();

        Models.DownlinkFailedPayload payload = JsonConvert.DeserializeObject<Models.DownlinkFailedPayload>(payloadText);
        if (payload == null)
        {
            logger.LogInformation("Failed-Payload {0} invalid", payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
        string deviceId = payload.EndDeviceIds.DeviceId;

        logger.LogInformation("Failed-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);

        if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
        {
            logger.LogInformation("Failed-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        if (!AzureLockToken.TryGet(payload.DownlinkFailed.CorrelationIds, out string lockToken))
        {
            logger.LogWarning("Failed-DeviceID:{0} LockToken missing from payload:{1}", payload.EndDeviceIds.DeviceId, payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        try
        {
            // NOTE(review): the accompanying write-up describes this handler as calling AbandonAsync
            // (putting the message back on the device queue), but the code calls RejectAsync.
            // Confirm which behaviour is intended before changing the call.
            await deviceClient.RejectAsync(lockToken);
        }
        catch (DeviceMessageLockLostException)
        {
            logger.LogWarning("Failed-RejectAsync DeviceID:{0} LockToken:{1} timeout", payload.EndDeviceIds.DeviceId, lockToken);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        logger.LogInformation("Failed-DeviceID:{0} LockToken:{1} success", payload.EndDeviceIds.DeviceId, lockToken);
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Failed message processing failed");

        return req.CreateResponse(HttpStatusCode.InternalServerError);
    }

    return req.CreateResponse(HttpStatusCode.OK);
}
If message delivery is unsuccessful the deviceClient.RejectAsync method (with the LockToken from the CorrelationIDs list) is called which deletes the message from the device queue and indicates to the server that the message could not be processed.
/// <summary>
/// HTTP-triggered webhook for downlink "nack" notifications. Looks up the cached
/// <see cref="DeviceClient"/> for the device, extracts the Azure IoT Hub lock token from the
/// payload's correlation ids and rejects the cloud-to-device message.
/// </summary>
/// <param name="req">The incoming HTTP POST request; its body is the JSON nack payload.</param>
/// <param name="executionContext">Function execution context, used to obtain the logger.</param>
/// <returns>
/// 200 OK when the message was rejected; 400 BadRequest when the payload is unusable or carries
/// no lock token; 409 Conflict when there is no cached client for the device or the message lock
/// has been lost; 500 InternalServerError for any other failure.
/// </returns>
[Function("Nack")]
public async Task<HttpResponseData> Nack([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
    // Log under this function's own category (was "Queued", a copy/paste left-over).
    var logger = executionContext.GetLogger("Nack");

    // Wrap all the processing in a try\catch so if anything blows up we have logged it.
    try
    {
        string payloadText = await req.ReadAsStringAsync();

        Models.DownlinkNackPayload payload = JsonConvert.DeserializeObject<Models.DownlinkNackPayload>(payloadText);

        // A body that deserialises but lacks the device identifiers is a bad request, not a server error.
        if (payload?.EndDeviceIds?.ApplicationIds == null || string.IsNullOrEmpty(payload.EndDeviceIds.DeviceId))
        {
            logger.LogInformation("Nack-Payload {0} invalid", payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
        string deviceId = payload.EndDeviceIds.DeviceId;

        logger.LogInformation("Nack-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);

        if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
        {
            logger.LogInformation("Nack-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        // The lock token of the original cloud-to-device message travels in the correlation ids.
        if (payload.DownlinkNack == null || !AzureLockToken.TryGet(payload.DownlinkNack.CorrelationIds, out string lockToken))
        {
            logger.LogWarning("Nack-DeviceID:{0} LockToken missing from payload:{1}", deviceId, payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        try
        {
            await deviceClient.RejectAsync(lockToken);
        }
        catch (DeviceMessageLockLostException)
        {
            logger.LogWarning("Nack-RejectAsync DeviceID:{0} LockToken:{1} timeout", deviceId, lockToken);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        logger.LogInformation("Nack-DeviceID:{0} LockToken:{1} success", deviceId, lockToken);
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Nack message processing failed");

        return req.CreateResponse(HttpStatusCode.InternalServerError);
    }

    return req.CreateResponse(HttpStatusCode.OK);
}
The way Failed (AbandonAsync), Ack (CompleteAsync) and Nack (RejectAsync) messages are handled needs some more testing to confirm my understanding of the sequencing of TTI confirmed message delivery.
BEWARE
The use of Confirmed messaging with devices that send uplink messages irregularly can cause weird problems if the Azure IoT hub downlink message times out.
public partial class Webhooks
{
    /// <summary>
    /// HTTP-triggered webhook for uplink messages. Deserialises the payload, obtains (or creates,
    /// opens and caches) the <see cref="DeviceClient"/> for the device and forwards the uplink to
    /// Azure IoT Hub as a JSON telemetry event.
    /// </summary>
    /// <param name="req">The incoming HTTP POST request; its body is the JSON uplink payload.</param>
    /// <param name="executionContext">Function execution context, used to obtain the logger.</param>
    /// <returns>
    /// 200 OK when the telemetry was sent; 400 BadRequest when the payload is unusable or has no
    /// (or a zero) port; 404 NotFound when opening the device connection reports an unknown device;
    /// 409 Conflict when the new client could not be added to the cache; 500 InternalServerError
    /// for any other failure.
    /// </returns>
    [Function("Uplink")]
    public async Task<HttpResponseData> Uplink([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
    {
        var logger = executionContext.GetLogger("Uplink");

        // Wrap all the processing in a try\catch so if anything blows up we have logged it. Will need to specialise for connectivity failures etc.
        try
        {
            // Read the body exactly once; a second read after the stream has been consumed
            // would not give back the text needed for the "invalid payload" log entry.
            string payloadText = await req.ReadAsStringAsync();

            Models.PayloadUplink payload = JsonConvert.DeserializeObject<Models.PayloadUplink>(payloadText);

            // A body that deserialises but lacks the identifiers or the uplink message is a bad request.
            if (payload?.EndDeviceIds?.ApplicationIds == null || payload.UplinkMessage == null || string.IsNullOrEmpty(payload.EndDeviceIds.DeviceId))
            {
                logger.LogInformation("Uplink: Payload {0} invalid", payloadText);

                return req.CreateResponse(HttpStatusCode.BadRequest);
            }

            string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
            string deviceId = payload.EndDeviceIds.DeviceId;

            // Messages without a port, or with port 0, are treated as control messages and not forwarded.
            if (!payload.UplinkMessage.Port.HasValue || payload.UplinkMessage.Port.Value == 0)
            {
                logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Payload Raw:{2} Control nessage", applicationId, deviceId, payload.UplinkMessage.PayloadRaw);

                return req.CreateResponse(HttpStatusCode.BadRequest);
            }

            int port = payload.UplinkMessage.Port.Value;

            logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Port:{2} Payload Raw:{3}", applicationId, deviceId, port, payload.UplinkMessage.PayloadRaw);

            if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
            {
                logger.LogInformation("Uplink-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

                deviceClient = DeviceClient.CreateFromConnectionString(_configuration.GetConnectionString("AzureIoTHub"), deviceId);

                try
                {
                    await deviceClient.OpenAsync();
                }
                catch (DeviceNotFoundException)
                {
                    // The client never made it into the cache, so release it here.
                    deviceClient.Dispose();

                    logger.LogWarning("Uplink-Unknown DeviceID:{0}", deviceId);

                    return req.CreateResponse(HttpStatusCode.NotFound);
                }
                catch
                {
                    // Same for any other open failure; the outer handler logs and returns 500.
                    deviceClient.Dispose();

                    throw;
                }

                if (!_DeviceClients.TryAdd(deviceId, deviceClient))
                {
                    // Not cached, so nothing else will ever dispose this client.
                    deviceClient.Dispose();

                    logger.LogWarning("Uplink-TryAdd failed for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

                    return req.CreateResponse(HttpStatusCode.Conflict);
                }
            }

            JObject telemetryEvent = new JObject
            {
                { "ApplicationID", applicationId },
                { "DeviceID", deviceId },
                { "Port", port },
                { "PayloadRaw", payload.UplinkMessage.PayloadRaw }
            };

            // If the payload has been decoded by payload formatter, put it in the message body.
            if (payload.UplinkMessage.PayloadDecoded != null)
            {
                telemetryEvent.Add("PayloadDecoded", payload.UplinkMessage.PayloadDecoded);
            }

            // Send the message to Azure IoT Hub. UTF-8 keeps any non-ASCII characters in the
            // JSON intact (ASCII encoding would replace them with '?'); pure ASCII bodies are
            // byte-for-byte unchanged.
            using (Message ioTHubmessage = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
            {
                // Ensure the displayed time is the acquired time rather than the uploaded time.
                ioTHubmessage.Properties.Add("iothub-creation-time-utc", payload.UplinkMessage.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture));
                ioTHubmessage.Properties.Add("ApplicationId", applicationId);
                ioTHubmessage.Properties.Add("DeviceEUI", payload.EndDeviceIds.DeviceEui);
                ioTHubmessage.Properties.Add("DeviceId", deviceId);
                ioTHubmessage.Properties.Add("port", port.ToString());

                await deviceClient.SendEventAsync(ioTHubmessage);
            }
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Uplink message processing failed");

            return req.CreateResponse(HttpStatusCode.InternalServerError);
        }

        return req.CreateResponse(HttpStatusCode.OK);
    }
}
For initial development and testing I ran the function application in the desktop emulator and simulated TTI webhook calls with Telerik Fiddler and modified TTI sample payloads.
I then used Azure IoT Explorer to configure devices, view uplink traffic etc. When I connected to my Azure IoT Hub shortly after starting the application all the devices were disconnected.
Azure IoT Explorer – no connected devices
The SeeeduinoLoRaWAN devices report roughly every 15 minutes so it took a while for them all to connect (the SeeeduinoLoRaWAN4 & SeeeduinoLoRaWAN6 need to be repaired).