Random wanderings through Microsoft Azure esp. PaaS plumbing, the IoT bits, AI on Micro controllers, AI on Edge Devices, .NET nanoFramework, .NET Core on *nix and ML.NET+ONNX
The Digital Twins Definition Language (DTDL) configuration used when a device is provisioned or when it connects is determined by the payload application, which is based on the Myriota Destination endpoint.
The Azure Function Configuration of Application to DTDL Model ID
BEWARE – The application name in ApplicationToDtdlModelIdMapping is case sensitive!
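The case sensitivity follows from how a .NET Dictionary<string, string> compares keys by default. The sketch below is only an illustration and assumes the mapping ends up in a dictionary; the application name and model ID are made up, and ApplicationMappingSketch is a hypothetical class, not part of the connector.

```csharp
using System;
using System.Collections.Generic;

public static class ApplicationMappingSketch
{
    // Hypothetical mapping; the application name and DTDL model ID are illustrative only.
    public static Dictionary<string, string> CreateMapping(IEqualityComparer<string> comparer)
    {
        return new Dictionary<string, string>(comparer)
        {
            { "tracker", "dtmi:example:tracker;1" }
        };
    }

    public static void Demo()
    {
        var caseSensitive = CreateMapping(StringComparer.Ordinal);
        var caseInsensitive = CreateMapping(StringComparer.OrdinalIgnoreCase);

        // Ordinal comparison: "Tracker" does not match the key "tracker".
        Console.WriteLine(caseSensitive.TryGetValue("Tracker", out _));

        // OrdinalIgnoreCase comparison: the same lookup succeeds.
        Console.WriteLine(caseInsensitive.TryGetValue("Tracker", out _));
    }
}
```

Supplying StringComparer.OrdinalIgnoreCase is one way to make the lookup tolerant of casing differences; whether that is appropriate depends on how the application names are defined upstream.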
Azure IoT Central Device Template Configuration
I used Azure IoT Central Device Template functionality to create my Azure Digital Twin definitions.
Azure IoT Function with Azure IoT Hub Device Provisioning Service(DPS) configuration
Symmetric key attestation with the Azure IoT Hub Device Provisioning Service(DPS) is performed using the same security tokens supported by Azure IoT Hubs to securely connect devices. The symmetric key of an enrollment group isn’t used directly by devices in the provisioning process. Instead, devices that provision through an enrollment group do so using a derived device key.
private async Task<DeviceClient> AzureIoTHubDeviceProvisioningServiceConnectAsync(string terminalId, object context)
{
DeviceClient deviceClient;
string deviceKey;
using (var hmac = new HMACSHA256(Convert.FromBase64String(_azureIoTSettings.AzureIoTHub.DeviceProvisioningService.GroupEnrollmentKey)))
{
deviceKey = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(terminalId)));
}
using (var securityProvider = new SecurityProviderSymmetricKey(terminalId, deviceKey, null))
{
using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
{
DeviceRegistrationResult result;
ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(
_azureIoTSettings.AzureIoTHub.DeviceProvisioningService.GlobalDeviceEndpoint,
_azureIoTSettings.AzureIoTHub.DeviceProvisioningService.IdScope,
securityProvider,
transport);
result = await provClient.RegisterAsync();
if (result.Status != ProvisioningRegistrationStatusType.Assigned)
{
_logger.LogWarning("Uplink-DeviceID:{0} RegisterAsync status:{1} failed ", terminalId, result.Status);
throw new ApplicationException($"Uplink-DeviceID:{terminalId} RegisterAsync status:{result.Status} failed");
}
IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());
deviceClient = DeviceClient.Create(result.AssignedHub, authentication, TransportSettings);
}
}
await deviceClient.OpenAsync();
return deviceClient;
}
The derived device key is a hash of the device’s registration ID and is computed using the symmetric key of the enrollment group. The device can then use its derived device key to sign the SAS token it uses to register with DPS.
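The key derivation can be tried in isolation. This is a minimal sketch of the same HMAC-SHA256 step used in the method above, pulled into a hypothetical helper (DerivedKeySketch is not part of the connector) so it can be run with a dummy enrollment key.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class DerivedKeySketch
{
    // Computes HMAC-SHA256 over the UTF-8 bytes of the registration ID,
    // keyed with the Base64-decoded enrollment group key, and returns it as Base64.
    public static string ComputeDerivedDeviceKey(string groupEnrollmentKey, string registrationId)
    {
        using (var hmac = new HMACSHA256(Convert.FromBase64String(groupEnrollmentKey)))
        {
            return Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(registrationId)));
        }
    }

    public static void Demo()
    {
        // Dummy key for illustration only; a real key comes from the DPS enrollment group.
        string dummyGroupKey = Convert.ToBase64String(new byte[64]);

        Console.WriteLine(ComputeDerivedDeviceKey(dummyGroupKey, "terminal-001"));
    }
}
```

Because the hash is computed over the exact bytes of the registration ID, the ID passed here has to match the one used for registration character for character, including casing.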
Azure Device Provisioning Service Adding Enrollment Group Attestation
Azure Device Provisioning Service Add Enrollment Group IoT Hub(s) selection.
Azure Device Provisioning Service Manager Enrollments
For initial development and testing I ran the function application in the desktop emulator and simulated Myriota Device Manager webhook calls with Azure Storage Explorer and modified sample payloads.
Azure Storage Explorer Storage Account Queued Messages
I then used Azure IoT Explorer to configure devices, view uplink traffic etc.
Azure IoT Explorer Devices
When I connected to my Azure IoT Hub shortly after starting the Myriota Azure IoT Connector Function my test devices started connecting as messages arrived.
public class IoTHubApplicationSetting
{
public string DtdlModelId { get; set; }
}
public class IoTHubSettings
{
public string IoTHubConnectionString { get; set; } = string.Empty;
public Dictionary<string, IoTHubApplicationSetting> Applications { get; set; }
}
public class DeviceProvisiongServiceApplicationSetting
{
public string DtdlModelId { get; set; } = string.Empty;
public string GroupEnrollmentKey { get; set; } = string.Empty;
}
public class DeviceProvisiongServiceSettings
{
public string IdScope { get; set; } = string.Empty;
public Dictionary<string, DeviceProvisiongServiceApplicationSetting> Applications { get; set; }
}
public class IoTCentralMethodSetting
{
public byte Port { get; set; } = 0;
public bool Confirmed { get; set; } = false;
public Models.DownlinkPriority Priority { get; set; } = Models.DownlinkPriority.Normal;
public Models.DownlinkQueue Queue { get; set; } = Models.DownlinkQueue.Replace;
}
public class IoTCentralSetting
{
public Dictionary<string, IoTCentralMethodSetting> Methods { get; set; }
}
public class AzureIoTSettings
{
public IoTHubSettings IoTHub { get; set; }
public DeviceProvisiongServiceSettings DeviceProvisioningService { get; set; }
public IoTCentralSetting IoTCentral { get; set; }
}
Azure IoT Central appears to have no support for setting message properties, so the LoRaWAN port, confirmed flag, priority, and queue are retrieved from configuration.
Azure Function Configuration
Models.Downlink downlink;
Models.DownlinkQueue queue;
string payloadText = Encoding.UTF8.GetString(message.GetBytes()).Trim();
if (message.Properties.ContainsKey("method-name"))
{
#region Azure IoT Central C2D message processing
string methodName = message.Properties["method-name"];
if (string.IsNullOrWhiteSpace(methodName))
{
_logger.LogWarning("Downlink-DeviceID:{0} MessageID:{1} LockToken:{2} method-name property empty", receiveMessageHandlerContext.DeviceId, message.MessageId, message.LockToken);
await deviceClient.RejectAsync(message);
return;
}
// Look up the method settings to get confirmed, port, priority, and queue
if ((_azureIoTSettings == null) || (_azureIoTSettings.IoTCentral == null) || !_azureIoTSettings.IoTCentral.Methods.TryGetValue(methodName, out IoTCentralMethodSetting methodSetting))
{
_logger.LogWarning("Downlink-DeviceID:{0} MessageID:{1} LockToken:{2} method-name:{3} has no settings", receiveMessageHandlerContext.DeviceId, message.MessageId, message.LockToken, methodName);
await deviceClient.RejectAsync(message);
return;
}
downlink = new Models.Downlink()
{
Confirmed = methodSetting.Confirmed,
Priority = methodSetting.Priority,
Port = methodSetting.Port,
CorrelationIds = AzureLockToken.Add(message.LockToken),
};
queue = methodSetting.Queue;
// Check to see if special case for Azure IoT central command with no request payload
if (payloadText.IsPayloadEmpty())
{
downlink.PayloadRaw = "";
}
if (!payloadText.IsPayloadEmpty())
{
if (payloadText.IsPayloadValidJson())
{
downlink.PayloadDecoded = JToken.Parse(payloadText);
}
else
{
downlink.PayloadDecoded = new JObject(new JProperty(methodName, payloadText));
}
}
logger.LogInformation("Downlink-IoT Central DeviceID:{0} Method:{1} MessageID:{2} LockToken:{3} Port:{4} Confirmed:{5} Priority:{6} Queue:{7}",
receiveMessageHandlerContext.DeviceId,
methodName,
message.MessageId,
message.LockToken,
downlink.Port,
downlink.Confirmed,
downlink.Priority,
queue);
#endregion
}
The reboot command payload only contains an “@”, so the TTI payload will be empty. The minimum and maximum command payloads contain only a numeric value, which is added to the decoded payload with the method name. The combined minimum and maximum command has a JSON payload, which is “grafted” into the decoded payload.
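The IsPayloadEmpty and IsPayloadValidJson helpers used in the listing are not shown, so the following is only a guess at how they might behave, based on the three cases described above. It uses System.Text.Json rather than the Json.NET types in the listing so that it has no package dependencies; PayloadExtensionsSketch and both method bodies are assumptions.

```csharp
using System;
using System.Text.Json;

public static class PayloadExtensionsSketch
{
    // Assumption: a command with no request payload arrives as "@" (or as blank text).
    public static bool IsPayloadEmpty(this string payloadText)
    {
        return string.IsNullOrWhiteSpace(payloadText) || payloadText.Trim() == "@";
    }

    // Assumption: only JSON objects and arrays count as JSON here, so a bare
    // number such as "10" is treated as plain text and wrapped with the method name.
    public static bool IsPayloadValidJson(this string payloadText)
    {
        if (string.IsNullOrWhiteSpace(payloadText))
        {
            return false;
        }

        string trimmed = payloadText.Trim();
        bool looksLikeObject = trimmed.StartsWith("{") && trimmed.EndsWith("}");
        bool looksLikeArray = trimmed.StartsWith("[") && trimmed.EndsWith("]");
        if (!looksLikeObject && !looksLikeArray)
        {
            return false;
        }

        try
        {
            // Parse only to validate; the document is disposed straight away.
            using (JsonDocument.Parse(trimmed))
            {
                return true;
            }
        }
        catch (JsonException)
        {
            return false;
        }
    }
}
```

With these rules the reboot payload “@” is empty, a numeric payload such as “10” is neither empty nor JSON, and an object payload is parsed as-is.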
TTI Webhook Integration running in desktop emulator
In the Visual Studio 2019 Debugger the exception text was “IsTransient = true” so I went and made a coffee and tried again.
Visual Studio 2019 QuickWatch displaying short form error message
The call was still failing so I dumped out the exception text so I had some keywords to search for.
Microsoft.Azure.Devices.Provisioning.Client.ProvisioningTransportException: AMQP transport exception
---> System.UnauthorizedAccessException: Sys
at Microsoft.Azure.Amqp.ExceptionDispatcher.Throw(Exception exception)
at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.Azure.Amqp.AmqpObject.OpenAsyncResult.End(IAsyncResult result)
at Microsoft.Azure.Amqp.AmqpObject.EndOpen(IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.HandleTransportOpened(IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.OnTransportOpenCompete(IAsyncResult result)
--- End of stack trace from previous location ---
at Microsoft.Azure.Devices.Provisioning.Client.Transport.AmqpClientConnection.OpenAsync(TimeSpan timeout, Boolean useWebSocket, X509Certificate2 clientCert, IWebProxy proxy, RemoteCertificateValidationCallback remoteCerificateValidationCallback)
at Microsoft.Azure.Devices.Provisioning.Client.Transport.ProvisioningTransportHandlerAmqp.RegisterAsync(ProvisioningTransportRegisterMessage message, TimeSpan timeout, CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at Microsoft.Azure.Devices.Provisioning.Client.Transport.ProvisioningTransportHandlerAmqp.RegisterAsync(ProvisioningTransportRegisterMessage message, TimeSpan timeout, CancellationToken cancellationToken)
at Microsoft.Azure.Devices.Provisioning.Client.Transport.ProvisioningTransportHandlerAmqp.RegisterAsync(ProvisioningTransportRegisterMessage message, CancellationToken cancellationToken)
at devMobile.IoT.TheThingsIndustries.AzureIoTHub.Integration.Uplink(HttpRequestData req, FunctionContext executionContext) in C:\Users\BrynLewis\source\repos\TTIV3AzureIoTConnector\TTIV3WebHookAzureIoTHubIntegration\TTIUplinkHandler.cs:line 245
I then tried another program which did use the Device Provisioning Service and it worked first time, so there was something wrong with my code.
using (var securityProvider = new SecurityProviderSymmetricKey(deviceId, deviceKey, null))
{
using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
{
DeviceRegistrationResult result;
ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(
Constants.AzureDpsGlobalDeviceEndpoint,
dpsApplicationSetting.GroupEnrollmentKey, // <<= Should be _azureIoTSettings.DeviceProvisioningService.IdScope
securityProvider,
transport);
try
{
result = await provClient.RegisterAsync();
}
catch (ProvisioningTransportException ex)
{
logger.LogInformation(ex, "Uplink-DeviceID:{0} RegisterAsync failed IDScope and/or GroupEnrollmentKey invalid", deviceId);
return req.CreateResponse(HttpStatusCode.Unauthorized);
}
if (result.Status != ProvisioningRegistrationStatusType.Assigned)
{
_logger.LogError("Uplink-DeviceID:{0} Status:{1} RegisterAsync failed ", deviceId, result.Status);
return req.CreateResponse(HttpStatusCode.FailedDependency);
}
IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());
deviceClient = DeviceClient.Create(result.AssignedHub, authentication, TransportSettings);
await deviceClient.OpenAsync();
logger.LogInformation("Uplink-DeviceID:{0} Azure IoT Hub connected (Device Provisioning Service)", deviceId);
}
}
I then carefully inspected my source code, worked back through the file history, and realised I had accidentally replaced the IDScope with the GroupEnrollmentKey setting, so it was never going to work, i.e. IsTransient != true. So, for the one or two other people who get this error message: check that your IDScope and GroupEnrollmentKey are the right variables and that the values they contain are correct.
The Azure IoT Central Location Telemetry messages have a slightly different format to the output of the TTI LPP Payload formatter so the payload has to be “post processed”.
private void EnumerateChildren(JObject jobject, JToken token)
{
if (token is JProperty property)
{
if (token.First is JValue)
{
// Temporary dirty hack for Azure IoT Central compatibility
if (token.Parent is JObject possibleGpsProperty)
{
// TODO Need to check if similar approach necessary accelerometer and gyro LPP payloads
if (possibleGpsProperty.Path.StartsWith("GPS_", StringComparison.OrdinalIgnoreCase))
{
if (string.Compare(property.Name, "Latitude", true) == 0)
{
jobject.Add("lat", property.Value);
}
if (string.Compare(property.Name, "Longitude", true) == 0)
{
jobject.Add("lon", property.Value);
}
if (string.Compare(property.Name, "Altitude", true) == 0)
{
jobject.Add("alt", property.Value);
}
}
}
jobject.Add(property.Name, property.Value);
}
else
{
JObject parentObject = new JObject();
foreach (JToken token2 in token.Children())
{
EnumerateChildren(parentObject, token2);
jobject.Add(property.Name, parentObject);
}
}
}
else
{
foreach (JToken token2 in token.Children())
{
EnumerateChildren(jobject, token2);
}
}
}
I may have to extend this method for other LPP datatypes
“Post processed” TTI JSON GPS Position data suitable for Azure IoT Central
Azure IoT Central Device Template with Location Capability
For initial development and testing I ran the function application in the desktop emulator and simulated TTI webhook calls with Telerik Fiddler and modified sample payloads. After some issues with iothub-creation-time-utc, decoded telemetry messages were displayed in the Device Raw Data tab.
Azure IoT Central Device Raw Data tab with successfully decoded GPS location payloads
Azure IoT Central map displaying with device location highlighted
The TTI V3 Connector Minimalist Cloud to Device only required a port number, and there was no way to specify whether delivery of message had to be confirmed, the way the message was queued, or the priority of message delivery. Like the port number these optional settings can be specified in message properties.
If any of these properties are incorrect DeviceClient.RejectAsync is called which deletes the message from the device queue and indicates to the server that the message could not be processed.
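The AzureDownlinkMessage helpers called in the handler below are not shown, so here is a rough sketch of what two of them might look like. The property names ("Port", "Confirmed"), the port range check, and the default for a missing Confirmed property are all assumptions; DownlinkPropertySketch is a hypothetical class.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public static class DownlinkPropertySketch
{
    // Assumption: the port is mandatory and must be a LoRaWAN application port (1 to 223).
    public static bool PortTryGet(IDictionary<string, string> properties, out byte port)
    {
        port = 0;

        if (!properties.TryGetValue("Port", out string value))
        {
            return false;
        }

        if (!byte.TryParse(value, NumberStyles.Integer, CultureInfo.InvariantCulture, out port))
        {
            return false;
        }

        return port >= 1 && port <= 223;
    }

    // Assumption: Confirmed is optional; a missing property means "unconfirmed",
    // while a value that is present but unparseable is an error.
    public static bool ConfirmedTryGet(IDictionary<string, string> properties, out bool confirmed)
    {
        confirmed = false;

        if (!properties.TryGetValue("Confirmed", out string value))
        {
            return true;
        }

        return bool.TryParse(value, out confirmed);
    }
}
```

Returning false only for values that are present but invalid keeps the optional settings optional, while still letting the handler reject a message that carries a malformed property.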
private async Task AzureIoTHubClientReceiveMessageHandler(Message message, object userContext)
{
try
{
Models.AzureIoTHubReceiveMessageHandlerContext receiveMessageHandlerContext = (Models.AzureIoTHubReceiveMessageHandlerContext)userContext;
if (!_DeviceClients.TryGetValue(receiveMessageHandlerContext.DeviceId, out DeviceClient deviceClient))
{
_logger.LogWarning("Downlink-DeviceID:{0} unknown", receiveMessageHandlerContext.DeviceId);
return;
}
using (message)
{
string payloadText = Encoding.UTF8.GetString(message.GetBytes()).Trim();
if (!AzureDownlinkMessage.PortTryGet(message.Properties, out byte port))
{
_logger.LogWarning("Downlink-Port property is invalid");
await deviceClient.RejectAsync(message);
return;
}
if (!AzureDownlinkMessage.ConfirmedTryGet(message.Properties, out bool confirmed))
{
_logger.LogWarning("Downlink-Confirmed flag is invalid");
await deviceClient.RejectAsync(message);
return;
}
if (!AzureDownlinkMessage.PriorityTryGet(message.Properties, out Models.DownlinkPriority priority))
{
_logger.LogWarning("Downlink-Priority value is invalid");
await deviceClient.RejectAsync(message);
return;
}
if (!AzureDownlinkMessage.QueueTryGet(message.Properties, out Models.DownlinkQueue queue))
{
_logger.LogWarning("Downlink-Queue value is invalid");
await deviceClient.RejectAsync(message.LockToken);
return;
}
Models.Downlink downlink = new Models.Downlink()
{
Confirmed = confirmed,
Priority = priority,
Port = port,
CorrelationIds = AzureLockToken.Add(message.LockToken),
};
// Split over multiple lines in an attempt to improve readability. In this scenario a valid JSON string should start/end with {/} for an object or [/] for an array
if ((payloadText.StartsWith("{") && payloadText.EndsWith("}"))
||
((payloadText.StartsWith("[") && payloadText.EndsWith("]"))))
{
try
{
downlink.PayloadDecoded = JToken.Parse(payloadText);
}
catch (JsonReaderException)
{
downlink.PayloadRaw = payloadText;
}
}
else
{
downlink.PayloadRaw = payloadText;
}
_logger.LogInformation("Downlink-IoT Hub DeviceID:{0} MessageID:{1} LockToken:{2} Port:{3} Confirmed:{4} Priority:{5} Queue:{6}",
receiveMessageHandlerContext.DeviceId,
message.MessageId,
message.LockToken,
downlink.Port,
downlink.Confirmed,
downlink.Priority,
queue);
Models.DownlinkPayload Payload = new Models.DownlinkPayload()
{
Downlinks = new List<Models.Downlink>()
{
downlink
}
};
string url = $"{receiveMessageHandlerContext.WebhookBaseURL}/{receiveMessageHandlerContext.ApplicationId}/webhooks/{receiveMessageHandlerContext.WebhookId}/devices/{receiveMessageHandlerContext.DeviceId}/down/{queue}".ToLower();
using (var client = new WebClient())
{
client.Headers.Add("Authorization", $"Bearer {receiveMessageHandlerContext.ApiKey}");
client.UploadString(new Uri(url), JsonConvert.SerializeObject(Payload));
}
_logger.LogInformation("Downlink-DeviceID:{0} LockToken:{1} success", receiveMessageHandlerContext.DeviceId, message.LockToken);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Downlink-ReceiveMessage processing failed");
}
}
A correlation identifier containing the Message LockToken is added to the downlink payload.
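The AzureLockToken helper is not listed in this post. One plausible shape for it, shown purely as a sketch, is to store the LockToken with a recognisable prefix so it can be picked out again when the correlation IDs come back in a webhook callback alongside any other entries. The prefix value and the AzureLockTokenSketch class are assumptions.

```csharp
using System;
using System.Collections.Generic;

public static class AzureLockTokenSketch
{
    // Hypothetical prefix used to recognise the lock token among other correlation IDs.
    private const string Prefix = "az:LockToken:";

    // Builds the correlation ID list for a downlink from the message lock token.
    public static List<string> Add(string lockToken)
    {
        return new List<string> { Prefix + lockToken };
    }

    // Finds the first prefixed entry and returns the lock token it carries.
    public static bool TryGet(IEnumerable<string> correlationIds, out string lockToken)
    {
        lockToken = string.Empty;

        if (correlationIds == null)
        {
            return false;
        }

        foreach (string correlationId in correlationIds)
        {
            if (correlationId != null && correlationId.StartsWith(Prefix, StringComparison.Ordinal))
            {
                lockToken = correlationId.Substring(Prefix.Length);
                return lockToken.Length > 0;
            }
        }

        return false;
    }
}
```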
Unconfirmed Downlink Messages
Azure IoT Explorer Cloud to Device sending an unconfirmed downlink message
For unconfirmed messages the TTI Connector calls the DeviceClient.CompleteAsync method (with the LockToken from the CorrelationIDs list), which deletes the message from the device queue.
[Function("Queued")]
public async Task<HttpResponseData> Queued([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
var logger = executionContext.GetLogger("Queued");
// Wrap all the processing in a try\catch so if anything blows up we have logged it.
try
{
string payloadText = await req.ReadAsStringAsync();
Models.DownlinkQueuedPayload payload = JsonConvert.DeserializeObject<Models.DownlinkQueuedPayload>(payloadText);
if (payload == null)
{
logger.LogInformation("Queued-Payload {0} invalid", payloadText);
return req.CreateResponse(HttpStatusCode.BadRequest);
}
string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
string deviceId = payload.EndDeviceIds.DeviceId;
logger.LogInformation("Queued-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);
if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
{
logger.LogInformation("Queued-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);
return req.CreateResponse(HttpStatusCode.Conflict);
}
// If the message is not confirmed, "complete" it as soon as it has been queued with the network
if (!payload.DownlinkQueued.Confirmed)
{
if (!AzureLockToken.TryGet(payload.DownlinkQueued.CorrelationIds, out string lockToken))
{
logger.LogWarning("Queued-DeviceID:{0} LockToken missing from payload:{1}", payload.EndDeviceIds.DeviceId, payloadText);
return req.CreateResponse(HttpStatusCode.BadRequest);
}
try
{
await deviceClient.CompleteAsync(lockToken);
}
catch (DeviceMessageLockLostException)
{
logger.LogWarning("Queued-CompleteAsync DeviceID:{0} LockToken:{1} timeout", payload.EndDeviceIds.DeviceId, lockToken);
return req.CreateResponse(HttpStatusCode.Conflict);
}
logger.LogInformation("Queued-DeviceID:{0} LockToken:{1} success", payload.EndDeviceIds.DeviceId, lockToken);
}
}
catch (Exception ex)
{
logger.LogError(ex, "Queued message processing failed");
return req.CreateResponse(HttpStatusCode.InternalServerError);
}
return req.CreateResponse(HttpStatusCode.OK);
}
The Things Industries Live Data tab for an unconfirmed message-Queued
Azure Application Insights for an unconfirmed message
The Things Industries Live Data tab for an unconfirmed message-Sent
Confirmed Downlink Messages
Azure IoT Explorer Cloud to Device sending a confirmed downlink message
Azure Application Insights for a confirmed message
The Things Industries Live Data tab for a confirmed message-Sent
The Things Industries Live Data tab for a confirmed message-Ack
If message delivery succeeds the deviceClient.CompleteAsync method (with the LockToken from the CorrelationIDs list) is called which removes the message from the device queue.
[Function("Ack")]
public async Task<HttpResponseData> Ack([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
var logger = executionContext.GetLogger("Ack");
// Wrap all the processing in a try\catch so if anything blows up we have logged it.
try
{
string payloadText = await req.ReadAsStringAsync();
Models.DownlinkAckPayload payload = JsonConvert.DeserializeObject<Models.DownlinkAckPayload>(payloadText);
if (payload == null)
{
logger.LogInformation("Ack-Payload {0} invalid", payloadText);
return req.CreateResponse(HttpStatusCode.BadRequest);
}
string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
string deviceId = payload.EndDeviceIds.DeviceId;
logger.LogInformation("Ack-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);
if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
{
logger.LogInformation("Ack-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);
return req.CreateResponse(HttpStatusCode.Conflict);
}
if (!AzureLockToken.TryGet(payload.DownlinkAck.CorrelationIds, out string lockToken))
{
logger.LogWarning("Ack-DeviceID:{0} LockToken missing from payload:{1}", payload.EndDeviceIds.DeviceId, payloadText);
return req.CreateResponse(HttpStatusCode.BadRequest);
}
try
{
await deviceClient.CompleteAsync(lockToken);
}
catch (DeviceMessageLockLostException)
{
logger.LogWarning("Ack-CompleteAsync DeviceID:{0} LockToken:{1} timeout", payload.EndDeviceIds.DeviceId, lockToken);
return req.CreateResponse(HttpStatusCode.Conflict);
}
logger.LogInformation("Ack-DeviceID:{0} LockToken:{1} success", payload.EndDeviceIds.DeviceId, lockToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Ack message processing failed");
return req.CreateResponse(HttpStatusCode.InternalServerError);
}
return req.CreateResponse(HttpStatusCode.OK);
}
Azure Application Insights for a confirmed message Ack
If message delivery fails the deviceClient.AbandonAsync method (with the LockToken from the CorrelationIDs list) is called which puts the downlink message back onto the device queue.
[Function("Failed")]
public async Task<HttpResponseData> Failed([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
var logger = executionContext.GetLogger("Failed");
// Wrap all the processing in a try\catch so if anything blows up we have logged it.
try
{
string payloadText = await req.ReadAsStringAsync();
Models.DownlinkFailedPayload payload = JsonConvert.DeserializeObject<Models.DownlinkFailedPayload>(payloadText);
if (payload == null)
{
logger.LogInformation("Failed-Payload {0} invalid", payloadText);
return req.CreateResponse(HttpStatusCode.BadRequest);
}
string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
string deviceId = payload.EndDeviceIds.DeviceId;
logger.LogInformation("Failed-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);
if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
{
logger.LogInformation("Failed-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);
return req.CreateResponse(HttpStatusCode.Conflict);
}
if (!AzureLockToken.TryGet(payload.DownlinkFailed.CorrelationIds, out string lockToken))
{
logger.LogWarning("Failed-DeviceID:{0} LockToken missing from payload:{1}", payload.EndDeviceIds.DeviceId, payloadText);
return req.CreateResponse(HttpStatusCode.BadRequest);
}
try
{
await deviceClient.AbandonAsync(lockToken);
}
catch (DeviceMessageLockLostException)
{
logger.LogWarning("Failed-AbandonAsync DeviceID:{0} LockToken:{1} timeout", payload.EndDeviceIds.DeviceId, lockToken);
return req.CreateResponse(HttpStatusCode.Conflict);
}
logger.LogInformation("Failed-DeviceID:{0} LockToken:{1} success", payload.EndDeviceIds.DeviceId, lockToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed message processing failed");
return req.CreateResponse(HttpStatusCode.InternalServerError);
}
return req.CreateResponse(HttpStatusCode.OK);
}
If message delivery is unsuccessful the deviceClient.RejectAsync method (with the LockToken from the CorrelationIDs list) is called which deletes the message from the device queue and indicates to the server that the message could not be processed.
[Function("Nack")]
public async Task<HttpResponseData> Nack([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
var logger = executionContext.GetLogger("Nack");
// Wrap all the processing in a try\catch so if anything blows up we have logged it.
try
{
string payloadText = await req.ReadAsStringAsync();
Models.DownlinkNackPayload payload = JsonConvert.DeserializeObject<Models.DownlinkNackPayload>(payloadText);
if (payload == null)
{
logger.LogInformation("Nack-Payload {0} invalid", payloadText);
return req.CreateResponse(HttpStatusCode.BadRequest);
}
string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
string deviceId = payload.EndDeviceIds.DeviceId;
logger.LogInformation("Nack-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);
if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
{
logger.LogInformation("Nack-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);
return req.CreateResponse(HttpStatusCode.Conflict);
}
if (!AzureLockToken.TryGet(payload.DownlinkNack.CorrelationIds, out string lockToken))
{
logger.LogWarning("Nack-DeviceID:{0} LockToken missing from payload:{1}", payload.EndDeviceIds.DeviceId, payloadText);
return req.CreateResponse(HttpStatusCode.BadRequest);
}
try
{
await deviceClient.RejectAsync(lockToken);
}
catch (DeviceMessageLockLostException)
{
logger.LogWarning("Nack-RejectAsync DeviceID:{0} LockToken:{1} timeout", payload.EndDeviceIds.DeviceId, lockToken);
return req.CreateResponse(HttpStatusCode.Conflict);
}
logger.LogInformation("Nack-DeviceID:{0} LockToken:{1} success", payload.EndDeviceIds.DeviceId, lockToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Nack message processing failed");
return req.CreateResponse(HttpStatusCode.InternalServerError);
}
return req.CreateResponse(HttpStatusCode.OK);
}
The way Failed (AbandonAsync), Ack (CompleteAsync) and Nack (RejectAsync) messages are handled needs some more testing to confirm my understanding of the sequencing of TTI confirmed message delivery.
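As a summary of the handling described in this post, the mapping from TTI webhook event to Azure IoT Hub message disposition can be written down as a small lookup. This is only a restatement of the prose, not code from the connector; the enum and class names are hypothetical, and the mapping itself is subject to the same "needs more testing" caveat.

```csharp
using System;

public enum DownlinkEvent { Queued, Ack, Nack, Failed }

public enum MessageDisposition { None, Complete, Abandon, Reject }

public static class DownlinkDispositionSketch
{
    // Maps a TTI downlink webhook event to the DeviceClient call described in the text.
    public static MessageDisposition For(DownlinkEvent downlinkEvent, bool confirmed)
    {
        switch (downlinkEvent)
        {
            case DownlinkEvent.Queued:
                // Unconfirmed messages are completed as soon as they are queued;
                // confirmed messages wait for an Ack, Nack or Failed event.
                return confirmed ? MessageDisposition.None : MessageDisposition.Complete;
            case DownlinkEvent.Ack:
                return MessageDisposition.Complete;
            case DownlinkEvent.Nack:
                return MessageDisposition.Reject;
            case DownlinkEvent.Failed:
                return MessageDisposition.Abandon;
            default:
                throw new ArgumentOutOfRangeException(nameof(downlinkEvent));
        }
    }
}
```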
BEWARE
The use of Confirmed messaging with devices that send uplink messages irregularly can cause weird problems if the Azure IoT hub downlink message times out.
public partial class Webhooks
{
[Function("Uplink")]
public async Task<HttpResponseData> Uplink([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
var logger = executionContext.GetLogger("Uplink");
// Wrap all the processing in a try\catch so if anything blows up we have logged it. Will need to specialise for connectivity failures etc.
try
{
// Read the request body once so the same text can be logged if deserialisation fails.
string payloadText = await req.ReadAsStringAsync();
Models.PayloadUplink payload = JsonConvert.DeserializeObject<Models.PayloadUplink>(payloadText);
if (payload == null)
{
logger.LogInformation("Uplink: Payload {0} invalid", payloadText);
return req.CreateResponse(HttpStatusCode.BadRequest);
}
string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
string deviceId = payload.EndDeviceIds.DeviceId;
if ((payload.UplinkMessage.Port == null ) || (!payload.UplinkMessage.Port.HasValue) || (payload.UplinkMessage.Port.Value == 0))
{
logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Payload Raw:{2} Control message", applicationId, deviceId, payload.UplinkMessage.PayloadRaw);
return req.CreateResponse(HttpStatusCode.BadRequest);
}
int port = payload.UplinkMessage.Port.Value;
logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Port:{2} Payload Raw:{3}", applicationId, deviceId, port, payload.UplinkMessage.PayloadRaw);
if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
{
logger.LogInformation("Uplink-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);
deviceClient = DeviceClient.CreateFromConnectionString(_configuration.GetConnectionString("AzureIoTHub"), deviceId);
try
{
await deviceClient.OpenAsync();
}
catch (DeviceNotFoundException)
{
logger.LogWarning("Uplink-Unknown DeviceID:{0}", deviceId);
return req.CreateResponse(HttpStatusCode.NotFound);
}
if (!_DeviceClients.TryAdd(deviceId, deviceClient))
{
logger.LogWarning("Uplink-TryAdd failed for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);
return req.CreateResponse(HttpStatusCode.Conflict);
}
}
JObject telemetryEvent = new JObject
{
{ "ApplicationID", applicationId },
{ "DeviceID", deviceId },
{ "Port", port },
{ "PayloadRaw", payload.UplinkMessage.PayloadRaw }
};
// If the payload has been decoded by payload formatter, put it in the message body.
if (payload.UplinkMessage.PayloadDecoded != null)
{
telemetryEvent.Add("PayloadDecoded", payload.UplinkMessage.PayloadDecoded);
}
// Send the message to Azure IoT Hub
using (Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
{
// Ensure the displayed time is the acquired time rather than the uploaded time.
ioTHubmessage.Properties.Add("iothub-creation-time-utc", payload.UplinkMessage.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture));
ioTHubmessage.Properties.Add("ApplicationId", applicationId);
ioTHubmessage.Properties.Add("DeviceEUI", payload.EndDeviceIds.DeviceEui);
ioTHubmessage.Properties.Add("DeviceId", deviceId);
ioTHubmessage.Properties.Add("port", port.ToString());
await deviceClient.SendEventAsync(ioTHubmessage);
}
}
catch (Exception ex)
{
logger.LogError(ex, "Uplink message processing failed");
return req.CreateResponse(HttpStatusCode.InternalServerError);
}
return req.CreateResponse(HttpStatusCode.OK);
}
}
For initial development and testing I ran the function application in the desktop emulator and simulated TTI webhook calls with Telerik Fiddler and modified TTI sample payloads.
I then used Azure IoT Explorer to configure devices, view uplink traffic etc. When I connected to my Azure IoT Hub shortly after starting the application all the devices were disconnected.
Azure IoT Explorer – no connected devices
The SeeeduinoLoRaWAN devices report roughly every 15 minutes so it took a while for them all to connect (the SeeeduinoLoRaWAN4 & SeeeduinoLoRaWAN6 need to be repaired).
ProvisioningRegistrationAdditionalData provisioningRegistrationAdditionalData = new ProvisioningRegistrationAdditionalData()
{
JsonData = $"{{\"modelId\": \"{modelId}\"}}"
};
result = await provClient.RegisterAsync(provisioningRegistrationAdditionalData);
I remembered seeing a sample where the DTDL V2 modelId was formatted by a library function, and after a surprising amount of searching I found what I was looking for in the Azure-Samples repository.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using Microsoft.Azure.Devices.Provisioning.Client.Extensions;
namespace Microsoft.Azure.Devices.Provisioning.Client.PlugAndPlay
{
/// <summary>
/// A helper class for formatting the DPS device registration payload, per plug and play convention.
/// </summary>
public static class PnpConvention
{
/// <summary>
/// Create the DPS payload to provision a device as plug and play.
/// </summary>
/// <remarks>
/// For more information on device provisioning service and plug and play compatibility,
/// and PnP device certification, see <see href="https://docs.microsoft.com/en-us/azure/iot-pnp/howto-certify-device"/>.
/// The DPS payload should be in the format:
/// <code>
/// {
/// "modelId": "dtmi:com:example:modelName;1"
/// }
/// </code>
/// For information on DTDL, see <see href="https://github.com/Azure/opendigitaltwins-dtdl/blob/master/DTDL/v2/dtdlv2.md"/>
/// </remarks>
/// <param name="modelId">The Id of the model the device adheres to for properties, telemetry, and commands.</param>
/// <returns>The DPS payload to provision a device as plug and play.</returns>
public static string CreateDpsPayload(string modelId)
{
modelId.ThrowIfNullOrWhiteSpace(nameof(modelId));
return $"{{\"modelId\":\"{modelId}\"}}";
}
}
}
With a couple of changes my code now uses the CreateDpsPayload method:
using Microsoft.Azure.Devices.Provisioning.Client.PlugAndPlay;
...
using (var securityProvider = new SecurityProviderSymmetricKey(deviceId, deviceKey, null))
{
using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
{
ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(
Constants.AzureDpsGlobalDeviceEndpoint,
deviceProvisiongServiceSettings.IdScope,
securityProvider,
transport);
DeviceRegistrationResult result;
if (!string.IsNullOrEmpty(modelId))
{
ProvisioningRegistrationAdditionalData provisioningRegistrationAdditionalData = new ProvisioningRegistrationAdditionalData()
{
JsonData = PnpConvention.CreateDpsPayload(modelId)
};
result = await provClient.RegisterAsync(provisioningRegistrationAdditionalData, stoppingToken);
}
else
{
result = await provClient.RegisterAsync(stoppingToken);
}
if (result.Status != ProvisioningRegistrationStatusType.Assigned)
{
_logger.LogError("Config-DeviceID:{0} Status:{1} RegisterAsync failed ", deviceId, result.Status);
return false;
}
IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());
deviceClient = DeviceClient.Create(result.AssignedHub, authentication, transportSettings);
}
}
To reduce the impact of the RegisterAsync call duration, this Proof of Concept (PoC) code uses the System.Threading.Tasks library to run each request as its own Task and then waits for all the requests to finish.
The connector application paginates the retrieval of device configuration from TTI API and a Task is created for each device returned in a page. In the Application Insights Trace logging the duration of a single page of device registrations was approximately the duration of the longest call.
There will be a tradeoff between device page size (resource utilisation by many concurrent requests) and startup duration (too many sequential page operations) which will need to be explored.
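One way to explore that tradeoff without tying concurrency to the page size is to cap the number of registrations in flight. The following is a minimal sketch of that idea using SemaphoreSlim and Task.WhenAll. It is not the connector's actual code: BoundedRegistration, RegisterAllAsync, the registerAsync delegate and maxConcurrency are all hypothetical names, and the delegate stands in for whatever method performs the DPS registration for one device.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class BoundedRegistration
{
    // Starts one Task per device ID, but allows at most maxConcurrency
    // calls to registerAsync to be running at any moment.
    public static async Task<IReadOnlyList<bool>> RegisterAllAsync(
        IEnumerable<string> deviceIds,
        Func<string, CancellationToken, Task<bool>> registerAsync,
        int maxConcurrency,
        CancellationToken stoppingToken)
    {
        using var throttle = new SemaphoreSlim(maxConcurrency);

        // ToList() forces every Task to be created (and started) now.
        var tasks = deviceIds.Select(async deviceId =>
        {
            // Wait for a free slot before calling the registration delegate.
            await throttle.WaitAsync(stoppingToken);
            try
            {
                return await registerAsync(deviceId, stoppingToken);
            }
            finally
            {
                // Always free the slot, even if registration throws.
                throttle.Release();
            }
        }).ToList();

        // Results come back in the same order as the device IDs.
        return await Task.WhenAll(tasks);
    }
}
```

With this shape the page size only controls how many devices are fetched per TTI API call, while maxConcurrency controls how many registrations run at once, so the two can be tuned separately.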
While debugging the connector on my desktop I had noticed that using a connection string was quite a bit faster than using DPS and I had assumed this was just happenstance. While doing some testing in the Azure North Europe data-center (closer to the TTI European servers) I grabbed some screenshots of the trace messages in Azure Application Insights as the TTI Connector Application was starting.
I only have six LoRaWAN devices configured in my TTI dev instance, but I repeated each test several times and the results were consistent, so the request durations should be representative. My TTI Connector application, IoT Hub, DPS and Application Insights instances are all in the same Azure Region and Azure Resource Group so networking overheads shouldn’t be significant.
Using my own DPS instance to provide the connection string and then establishing a connection took between 3 and 7 seconds.
Azure IoT Central DPS
For my Azure IoT Central instance getting a connection string and establishing a connection took between 4 and 7 seconds.
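To see how much of those seconds is spent in the DPS registration versus opening the IoT Hub connection, each step could be timed separately. The following is a minimal sketch of a generic timing wrapper built on Stopwatch. StepTimer and TimeAsync are hypothetical names, not part of the connector or the Azure SDK.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class StepTimer
{
    // Awaits the supplied step and returns its result together with
    // the elapsed wall-clock time for that step.
    public static async Task<(T Result, TimeSpan Elapsed)> TimeAsync<T>(Func<Task<T>> step)
    {
        var stopwatch = Stopwatch.StartNew();
        T result = await step();
        stopwatch.Stop();
        return (result, stopwatch.Elapsed);
    }
}
```

Wrapping the RegisterAsync call in TimeAsync and logging the Elapsed value alongside the device ID would put the per-step durations into the same Application Insights trace as the other messages.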
The Azure DPS client code was copied from one of the sample applications so I have assumed it is “correct”.
using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
{
ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(
Constants.AzureDpsGlobalDeviceEndpoint,
deviceProvisiongServiceSettings.IdScope,
securityProvider,
transport);
DeviceRegistrationResult result;
if (!string.IsNullOrEmpty(modelId))
{
ProvisioningRegistrationAdditionalData provisioningRegistrationAdditionalData = new ProvisioningRegistrationAdditionalData()
{
JsonData = $"{{\"modelId\": \"{modelId}\"}}"
};
result = await provClient.RegisterAsync(provisioningRegistrationAdditionalData, stoppingToken);
}
else
{
result = await provClient.RegisterAsync(stoppingToken);
}
if (result.Status != ProvisioningRegistrationStatusType.Assigned)
{
_logger.LogError("Config-DeviceID:{0} Status:{1} RegisterAsync failed ", deviceId, result.Status);
return false;
}
IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());
deviceClient = DeviceClient.Create(result.AssignedHub, authentication, transportSettings);
}
I need to investigate why getting a connection string from DPS and then connecting takes significantly longer (I appreciate that “behind the scenes” service calls may be required). This wouldn’t be an issue for individual devices connecting from different locations, but for my Identity Translation Cloud gateway, which currently opens connections sequentially, this could be a problem when there are a large number of devices.
If the duration of individual requests can’t be reduced (using connection pooling etc.) I may have to spin up multiple threads so multiple devices can connect concurrently.