Random wanderings through Microsoft Azure esp. PaaS plumbing, the IoT bits, AI on Micro controllers, AI on Edge Devices, .NET nanoFramework, .NET Core on *nix and ML.NET+ONNX
The Azure IoT Central Location Telemetry messages have a slightly different format to the output of the TTI LPP Payload formatter so the payload has to be “post processed”.
/// <summary>
/// Recursively walks <paramref name="token"/> and copies the properties it finds into
/// <paramref name="jobject"/>. Properties whose value is not a simple value are copied
/// into a new nested <see cref="JObject"/> added under the same property name.
/// </summary>
/// <remarks>
/// For value properties whose parent object's path starts with "GPS_", the Latitude,
/// Longitude and Altitude values are additionally added under the short names
/// "lat", "lon" and "alt" (Azure IoT Central compatibility hack). The original
/// property is still added under its own name.
/// </remarks>
/// <param name="jobject">Object that receives the copied properties.</param>
/// <param name="token">Token to walk.</param>
private void EnumerateChildren(JObject jobject, JToken token)
{
    if (token is JProperty property)
    {
        if (token.First is JValue)
        {
            // Temporary dirty hack for Azure IoT Central compatibility
            // TODO Need to check if similar approach necessary accelerometer and gyro LPP payloads
            if (token.Parent is JObject possibleGpsProperty
                && possibleGpsProperty.Path.StartsWith("GPS_", StringComparison.OrdinalIgnoreCase))
            {
                // Ordinal comparison: property names are identifiers, so the match must not
                // depend on the current culture (e.g. the Turkish dotted/dotless I).
                if (string.Equals(property.Name, "Latitude", StringComparison.OrdinalIgnoreCase))
                {
                    jobject.Add("lat", property.Value);
                }

                if (string.Equals(property.Name, "Longitude", StringComparison.OrdinalIgnoreCase))
                {
                    jobject.Add("lon", property.Value);
                }

                if (string.Equals(property.Name, "Altitude", StringComparison.OrdinalIgnoreCase))
                {
                    jobject.Add("alt", property.Value);
                }
            }

            jobject.Add(property.Name, property.Value);
        }
        else
        {
            JObject parentObject = new JObject();

            foreach (JToken token2 in token.Children())
            {
                EnumerateChildren(parentObject, token2);
            }

            // Added once, after the children have been copied; adding inside the loop
            // would throw on a duplicate key if there were ever more than one child.
            jobject.Add(property.Name, parentObject);
        }
    }
    else
    {
        foreach (JToken token2 in token.Children())
        {
            EnumerateChildren(jobject, token2);
        }
    }
}
I may have to extend this method for other LPP datatypes
“Post processed” TTI JSON GPS Position data suitable for Azure IoT Central
Azure IoT Central Device Template with Location Capability
For initial development and testing I ran the function application in the desktop emulator and simulated TTI webhook calls with Telerik Fiddler and modified sample payloads. After some issues with the iothub-creation-time-utc property, decoded telemetry messages were displayed in the Device Raw Data tab.
Azure IoT Central Device Raw Data tab with successfully decoded GPS location payloads
Azure IoT Central map displaying with device location highlighted
The TTI V3 Connector Minimalist Cloud to Device only required a port number, and there was no way to specify whether delivery of message had to be confirmed, the way the message was queued, or the priority of message delivery. Like the port number these optional settings can be specified in message properties.
If any of these properties are incorrect DeviceClient.RejectAsync is called which deletes the message from the device queue and indicates to the server that the message could not be processed.
/// <summary>
/// Handles an Azure IoT Hub cloud-to-device message by converting it into a TTI downlink
/// and posting it to the TTI webhook "down" endpoint for the device.
/// </summary>
/// <remarks>
/// The Port, Confirmed, Priority and Queue settings are read from the message properties;
/// if any of them cannot be parsed the message is rejected. The message LockToken is
/// carried in the downlink CorrelationIds. Any exception is logged and swallowed; the
/// message is not completed, abandoned or rejected in this method on that path.
/// </remarks>
/// <param name="message">The cloud-to-device message; disposed by this method.</param>
/// <param name="userContext">An <c>AzureIoTHubReceiveMessageHandlerContext</c> supplied at handler registration.</param>
private async Task AzureIoTHubClientReceiveMessageHandler(Message message, object userContext)
{
    try
    {
        Models.AzureIoTHubReceiveMessageHandlerContext receiveMessageHandlerContext = (Models.AzureIoTHubReceiveMessageHandlerContext)userContext;

        if (!_DeviceClients.TryGetValue(receiveMessageHandlerContext.DeviceId, out DeviceClient deviceClient))
        {
            _logger.LogWarning("Downlink-DeviceID:{0} unknown", receiveMessageHandlerContext.DeviceId);
            return;
        }

        using (message)
        {
            string payloadText = Encoding.UTF8.GetString(message.GetBytes()).Trim();

            if (!AzureDownlinkMessage.PortTryGet(message.Properties, out byte port))
            {
                _logger.LogWarning("Downlink-Port property is invalid");
                await deviceClient.RejectAsync(message);
                return;
            }

            if (!AzureDownlinkMessage.ConfirmedTryGet(message.Properties, out bool confirmed))
            {
                _logger.LogWarning("Downlink-Confirmed flag is invalid");
                await deviceClient.RejectAsync(message);
                return;
            }

            if (!AzureDownlinkMessage.PriorityTryGet(message.Properties, out Models.DownlinkPriority priority))
            {
                _logger.LogWarning("Downlink-Priority value is invalid");
                await deviceClient.RejectAsync(message);
                return;
            }

            if (!AzureDownlinkMessage.QueueTryGet(message.Properties, out Models.DownlinkQueue queue))
            {
                _logger.LogWarning("Downlink-Queue value is invalid");
                await deviceClient.RejectAsync(message);
                return;
            }

            Models.Downlink downlink = new Models.Downlink()
            {
                Confirmed = confirmed,
                Priority = priority,
                Port = port,
                CorrelationIds = AzureLockToken.Add(message.LockToken),
            };

            // In this scenario a valid JSON string should start/end with {/} for an object or [/] for an array.
            bool looksLikeJson =
                (payloadText.StartsWith("{") && payloadText.EndsWith("}"))
                || (payloadText.StartsWith("[") && payloadText.EndsWith("]"));

            if (looksLikeJson)
            {
                try
                {
                    downlink.PayloadDecoded = JToken.Parse(payloadText);
                }
                catch (JsonReaderException)
                {
                    // Not valid JSON after all, so send it as a raw payload.
                    downlink.PayloadRaw = payloadText;
                }
            }
            else
            {
                downlink.PayloadRaw = payloadText;
            }

            // Placeholders are numbered sequentially (the original template skipped {1}).
            _logger.LogInformation("Downlink-IoT Hub DeviceID:{0} MessageID:{1} LockToken:{2} Port:{3} Confirmed:{4} Priority:{5} Queue:{6}",
                receiveMessageHandlerContext.DeviceId,
                message.MessageId,
                message.LockToken,
                downlink.Port,
                downlink.Confirmed,
                downlink.Priority,
                queue);

            Models.DownlinkPayload payload = new Models.DownlinkPayload()
            {
                Downlinks = new List<Models.Downlink>()
                {
                    downlink
                }
            };

            // Invariant lower-casing so the URL does not depend on the host's current culture.
            string url = $"{receiveMessageHandlerContext.WebhookBaseURL}/{receiveMessageHandlerContext.ApplicationId}/webhooks/{receiveMessageHandlerContext.WebhookId}/devices/{receiveMessageHandlerContext.DeviceId}/down/{queue}".ToLowerInvariant();

            using (var client = new WebClient())
            {
                client.Headers.Add("Authorization", $"Bearer {receiveMessageHandlerContext.ApiKey}");

                // Awaited upload so the handler does not block a thread on the HTTP call.
                await client.UploadStringTaskAsync(new Uri(url), JsonConvert.SerializeObject(payload));
            }

            _logger.LogInformation("Downlink-DeviceID:{0} LockToken:{1} success", receiveMessageHandlerContext.DeviceId, message.LockToken);
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Downlink-ReceiveMessge processing failed");
    }
}
A correlation identifier containing the Message LockToken is added to the downlink payload.
Unconfirmed Downlink Messages
Azure IoT Explorer Cloud to Device sending an unconfirmed downlink message
For unconfirmed messages the TTI Connector calls the DeviceClient.CompleteAsync method (with the LockToken from the CorrelationIDs list) which deletes the message from the device queue.
/// <summary>
/// HTTP-triggered handler for the TTI "downlink queued" webhook. For an unconfirmed
/// downlink it completes the matching cloud-to-device message using the lock token
/// carried in the payload's correlation ids; confirmed downlinks are left untouched here.
/// </summary>
/// <returns>
/// 400 when the body cannot be deserialised or carries no lock token, 409 when the device
/// has no cached client or the message lock has been lost, 500 on any other exception,
/// otherwise 200.
/// </returns>
[Function("Queued")]
public async Task<HttpResponseData> Queued([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
    var log = executionContext.GetLogger("Queued");

    // Everything runs inside one try/catch so that any failure is at least logged.
    try
    {
        string body = await req.ReadAsStringAsync();

        Models.DownlinkQueuedPayload queued = JsonConvert.DeserializeObject<Models.DownlinkQueuedPayload>(body);
        if (queued == null)
        {
            log.LogInformation("Queued-Payload {0} invalid", body);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        string appId = queued.EndDeviceIds.ApplicationIds.ApplicationId;
        string devId = queued.EndDeviceIds.DeviceId;

        log.LogInformation("Queued-ApplicationID:{0} DeviceID:{1} ", appId, devId);

        bool deviceKnown = _DeviceClients.TryGetValue(devId, out DeviceClient client);
        if (!deviceKnown)
        {
            log.LogInformation("Queued-Unknown device for ApplicationID:{0} DeviceID:{1}", appId, devId);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        // An unconfirmed message is "completed" as soon as the network has queued it.
        bool isUnconfirmed = !queued.DownlinkQueued.Confirmed;
        if (isUnconfirmed)
        {
            bool hasToken = AzureLockToken.TryGet(queued.DownlinkQueued.CorrelationIds, out string token);
            if (!hasToken)
            {
                log.LogWarning("Queued-DeviceID:{0} LockToken missing from payload:{1}", queued.EndDeviceIds.DeviceId, body);

                return req.CreateResponse(HttpStatusCode.BadRequest);
            }

            try
            {
                await client.CompleteAsync(token);
            }
            catch (DeviceMessageLockLostException)
            {
                log.LogWarning("Queued-CompleteAsync DeviceID:{0} LockToken:{1} timeout", queued.EndDeviceIds.DeviceId, token);

                return req.CreateResponse(HttpStatusCode.Conflict);
            }

            log.LogInformation("Queued-DeviceID:{0} LockToken:{1} success", queued.EndDeviceIds.DeviceId, token);
        }
    }
    catch (Exception ex)
    {
        log.LogError(ex, "Queued message processing failed");

        return req.CreateResponse(HttpStatusCode.InternalServerError);
    }

    return req.CreateResponse(HttpStatusCode.OK);
}
The Things Industries Live Data tab for an unconfirmed message-Queued
Azure Application Insights for an unconfirmed message
The Things Industries Live Data tab for an unconfirmed message-Sent
Confirmed Downlink Messages
Azure IoT Explorer Cloud to Device sending a confirmed downlink message
Azure Application Insights for a confirmed message
The Things Industries Live Data tab for a confirmed message-Sent
The Things Industries Live Data tab for a confirmed message-Ack
If message delivery succeeds the deviceClient.CompleteAsync method (with the LockToken from the CorrelationIDs list) is called which removes the message from the device queue.
/// <summary>
/// HTTP-triggered handler for the TTI "downlink ack" webhook. Completes the matching
/// cloud-to-device message using the lock token carried in the payload's correlation ids.
/// </summary>
/// <returns>
/// 400 when the body cannot be deserialised or carries no lock token, 409 when the device
/// has no cached client or the message lock has been lost, 500 on any other exception,
/// otherwise 200.
/// </returns>
[Function("Ack")]
public async Task<HttpResponseData> Ack([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
    // Logger category matches the function name (it was previously copy-pasted as "Queued").
    var logger = executionContext.GetLogger("Ack");

    // Wrap all the processing in a try\catch so if anything blows up we have logged it.
    try
    {
        string payloadText = await req.ReadAsStringAsync();

        Models.DownlinkAckPayload payload = JsonConvert.DeserializeObject<Models.DownlinkAckPayload>(payloadText);
        if (payload == null)
        {
            logger.LogInformation("Ack-Payload {0} invalid", payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
        string deviceId = payload.EndDeviceIds.DeviceId;

        logger.LogInformation("Ack-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);

        if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
        {
            logger.LogInformation("Ack-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        if (!AzureLockToken.TryGet(payload.DownlinkAck.CorrelationIds, out string lockToken))
        {
            logger.LogWarning("Ack-DeviceID:{0} LockToken missing from payload:{1}", deviceId, payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        try
        {
            await deviceClient.CompleteAsync(lockToken);
        }
        catch (DeviceMessageLockLostException)
        {
            logger.LogWarning("Ack-CompleteAsync DeviceID:{0} LockToken:{1} timeout", deviceId, lockToken);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        logger.LogInformation("Ack-DeviceID:{0} LockToken:{1} success", deviceId, lockToken);
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Ack message processing failed");

        return req.CreateResponse(HttpStatusCode.InternalServerError);
    }

    return req.CreateResponse(HttpStatusCode.OK);
}
Azure Application Insights for a confirmed message Ack
If message delivery fails the deviceClient.AbandonAsync method (with the LockToken from the CorrelationIDs list) is called which puts the downlink message back onto the device queue.
/// <summary>
/// HTTP-triggered handler for the TTI "downlink failed" webhook. Rejects the matching
/// cloud-to-device message using the lock token carried in the payload's correlation ids.
/// </summary>
/// <returns>
/// 400 when the body cannot be deserialised or carries no lock token, 409 when the device
/// has no cached client or the message lock has been lost, 500 on any other exception,
/// otherwise 200.
/// </returns>
[Function("Failed")]
public async Task<HttpResponseData> Failed([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
    // Logger category matches the function name (it was previously copy-pasted as "Queued").
    var logger = executionContext.GetLogger("Failed");

    // Wrap all the processing in a try\catch so if anything blows up we have logged it.
    try
    {
        string payloadText = await req.ReadAsStringAsync();

        Models.DownlinkFailedPayload payload = JsonConvert.DeserializeObject<Models.DownlinkFailedPayload>(payloadText);
        if (payload == null)
        {
            logger.LogInformation("Failed-Payload {0} invalid", payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
        string deviceId = payload.EndDeviceIds.DeviceId;

        logger.LogInformation("Failed-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);

        if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
        {
            logger.LogInformation("Failed-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        if (!AzureLockToken.TryGet(payload.DownlinkFailed.CorrelationIds, out string lockToken))
        {
            logger.LogWarning("Failed-DeviceID:{0} LockToken missing from payload:{1}", deviceId, payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        try
        {
            // NOTE(review): the accompanying write-up describes Failed as calling AbandonAsync
            // (message goes back onto the device queue), but this code rejects the message.
            // Behaviour is left unchanged here; confirm which is intended.
            await deviceClient.RejectAsync(lockToken);
        }
        catch (DeviceMessageLockLostException)
        {
            logger.LogWarning("Failed-RejectAsync DeviceID:{0} LockToken:{1} timeout", deviceId, lockToken);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        logger.LogInformation("Failed-DeviceID:{0} LockToken:{1} success", deviceId, lockToken);
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Failed message processing failed");

        return req.CreateResponse(HttpStatusCode.InternalServerError);
    }

    return req.CreateResponse(HttpStatusCode.OK);
}
If message delivery is unsuccessful the deviceClient.RejectAsync method (with the LockToken from the CorrelationIDs list) is called which deletes the message from the device queue and indicates to the server that the message could not be processed.
/// <summary>
/// HTTP-triggered handler for the TTI "downlink nack" webhook. Rejects the matching
/// cloud-to-device message using the lock token carried in the payload's correlation ids.
/// </summary>
/// <returns>
/// 400 when the body cannot be deserialised or carries no lock token, 409 when the device
/// has no cached client or the message lock has been lost, 500 on any other exception,
/// otherwise 200.
/// </returns>
[Function("Nack")]
public async Task<HttpResponseData> Nack([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
    // Logger category matches the function name (it was previously copy-pasted as "Queued").
    var logger = executionContext.GetLogger("Nack");

    // Wrap all the processing in a try\catch so if anything blows up we have logged it.
    try
    {
        string payloadText = await req.ReadAsStringAsync();

        Models.DownlinkNackPayload payload = JsonConvert.DeserializeObject<Models.DownlinkNackPayload>(payloadText);
        if (payload == null)
        {
            logger.LogInformation("Nack-Payload {0} invalid", payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
        string deviceId = payload.EndDeviceIds.DeviceId;

        logger.LogInformation("Nack-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);

        if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
        {
            logger.LogInformation("Nack-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        if (!AzureLockToken.TryGet(payload.DownlinkNack.CorrelationIds, out string lockToken))
        {
            logger.LogWarning("Nack-DeviceID:{0} LockToken missing from payload:{1}", deviceId, payloadText);

            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        try
        {
            await deviceClient.RejectAsync(lockToken);
        }
        catch (DeviceMessageLockLostException)
        {
            logger.LogWarning("Nack-RejectAsync DeviceID:{0} LockToken:{1} timeout", deviceId, lockToken);

            return req.CreateResponse(HttpStatusCode.Conflict);
        }

        logger.LogInformation("Nack-DeviceID:{0} LockToken:{1} success", deviceId, lockToken);
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Nack message processing failed");

        return req.CreateResponse(HttpStatusCode.InternalServerError);
    }

    return req.CreateResponse(HttpStatusCode.OK);
}
The way message Failed(Abandon), Ack(CompleteAsync) and Nack(RejectAsync) are handled needs some more testing to confirm my understanding of the sequencing of TTI confirmed message delivery.
BEWARE
The use of Confirmed messaging with devices that send uplink messages irregularly can cause weird problems if the Azure IoT hub downlink message times out.
In the Azure Portal I configured the DPS ID Scope (AzureSettings:DeviceProvisioningServiceSettings:IdScope) and the Group Enrollment Key(AzureSettings:DeviceProvisioningServiceSettings:GroupEnrollmentKey) then saved the configuration which restarted the AppService.
Azure Portal AppService configuration
The first time a device sends an uplink message the cache query fails and the RegisterAsync method of the ProvisioningDeviceClient is called to get a device connection string.
logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Port:{2} Payload Raw:{3}", applicationId, deviceId, port, payload.UplinkMessage.PayloadRaw);

// No cached client for this device: connect it, using either the configured
// Azure IoT Hub connection string or the Device Provisioning Service (DPS).
if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
{
    logger.LogInformation("Uplink-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

    // Check that at least one of Azure Connection string or DPS is configured
    if (string.IsNullOrEmpty(_azureSettings.IoTHubConnectionString) && (_azureSettings.DeviceProvisioningServiceSettings == null))
    {
        logger.LogError("Uplink-Neither Azure IoT Hub connection string or Device Provisioning Service configured");

        return req.CreateResponse(HttpStatusCode.UnprocessableEntity);
    }

    // Check that only one of Azure Connection string or DPS is configured
    if (!string.IsNullOrEmpty(_azureSettings.IoTHubConnectionString) && (_azureSettings.DeviceProvisioningServiceSettings != null))
    {
        logger.LogError("Uplink-Both Azure IoT Hub connection string and Device Provisioning Service configured");

        return req.CreateResponse(HttpStatusCode.UnprocessableEntity);
    }

    // Use Azure IoT Connection string if configured and Device Provisioning Service isn't
    if (!string.IsNullOrEmpty(_azureSettings.IoTHubConnectionString))
    {
        deviceClient = DeviceClient.CreateFromConnectionString(_azureSettings.IoTHubConnectionString, deviceId, transportSettings);

        try
        {
            await deviceClient.OpenAsync();
        }
        catch (DeviceNotFoundException)
        {
            logger.LogWarning("Uplink-Unknown DeviceID:{0}", deviceId);

            // The client never made it into the cache, so release it here.
            deviceClient.Dispose();

            return req.CreateResponse(HttpStatusCode.NotFound);
        }
    }

    // Azure IoT Hub Device provisioning service if configured
    if (_azureSettings.DeviceProvisioningServiceSettings != null)
    {
        string deviceKey;

        if (string.IsNullOrEmpty(_azureSettings.DeviceProvisioningServiceSettings.IdScope) || string.IsNullOrEmpty(_azureSettings.DeviceProvisioningServiceSettings.GroupEnrollmentKey))
        {
            logger.LogError("Uplink-Device Provisioning Service requires ID Scope and Group Enrollment Key configured");

            return req.CreateResponse(HttpStatusCode.UnprocessableEntity);
        }

        // Per-device key: HMAC-SHA256 of the device id, keyed with the group enrollment key.
        using (var hmac = new HMACSHA256(Convert.FromBase64String(_azureSettings.DeviceProvisioningServiceSettings.GroupEnrollmentKey)))
        {
            deviceKey = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(deviceId)));
        }

        using (var securityProvider = new SecurityProviderSymmetricKey(deviceId, deviceKey, null))
        {
            using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
            {
                ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(
                    Constants.AzureDpsGlobalDeviceEndpoint,
                    _azureSettings.DeviceProvisioningServiceSettings.IdScope,
                    securityProvider,
                    transport);

                DeviceRegistrationResult result = await provClient.RegisterAsync();

                if (result.Status != ProvisioningRegistrationStatusType.Assigned)
                {
                    // Uses the function's logger, like every other log call in this method.
                    logger.LogError("Config-DeviceID:{0} Status:{1} RegisterAsync failed ", deviceId, result.Status);

                    return req.CreateResponse(HttpStatusCode.FailedDependency);
                }

                IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, securityProvider.GetPrimaryKey());

                deviceClient = DeviceClient.Create(result.AssignedHub, authentication, transportSettings);

                await deviceClient.OpenAsync();
            }
        }
    }

    if (!_DeviceClients.TryAdd(deviceId, deviceClient))
    {
        logger.LogWarning("Uplink-TryAdd failed for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

        // This client was not cached, so release it here.
        deviceClient.Dispose();

        return req.CreateResponse(HttpStatusCode.Conflict);
    }

    Models.AzureIoTHubReceiveMessageHandlerContext context = new Models.AzureIoTHubReceiveMessageHandlerContext()
    {
        DeviceId = deviceId,
        ApplicationId = applicationId,
        WebhookId = _theThingsIndustriesSettings.WebhookId,
        WebhookBaseURL = _theThingsIndustriesSettings.WebhookBaseURL,
        ApiKey = _theThingsIndustriesSettings.ApiKey
    };

    await deviceClient.SetReceiveMessageHandlerAsync(AzureIoTHubClientReceiveMessageHandler, context);

    await deviceClient.SetMethodDefaultHandlerAsync(AzureIoTHubClientDefaultMethodHandler, context);
}

JObject telemetryEvent = new JObject
{
    { "ApplicationID", applicationId },
    { "DeviceID", deviceId },
    { "Port", port },
    { "Simulated", payload.Simulated },
    { "ReceivedAtUtc", payload.UplinkMessage.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture) },
    { "PayloadRaw", payload.UplinkMessage.PayloadRaw }
};

// If the payload has been decoded by payload formatter, put it in the message body.
if (payload.UplinkMessage.PayloadDecoded != null)
{
    telemetryEvent.Add("PayloadDecoded", payload.UplinkMessage.PayloadDecoded);
}

// Send the message to Azure IoT Hub. UTF-8 (not ASCII) so that non-ASCII characters
// in the serialised JSON are not replaced with '?'.
using (Message ioTHubmessage = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
{
    // Ensure the displayed time is the acquired time rather than the uploaded time.
    ioTHubmessage.Properties.Add("iothub-creation-time-utc", payload.UplinkMessage.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture));
    ioTHubmessage.Properties.Add("ApplicationId", applicationId);
    ioTHubmessage.Properties.Add("DeviceEUI", payload.EndDeviceIds.DeviceEui);
    ioTHubmessage.Properties.Add("DeviceId", deviceId);
    ioTHubmessage.Properties.Add("port", port.ToString());
    ioTHubmessage.Properties.Add("Simulated", payload.Simulated.ToString());

    await deviceClient.SendEventAsync(ioTHubmessage);

    logger.LogInformation("Uplink-DeviceID:{0} SendEventAsync success", payload.EndDeviceIds.DeviceId);
}
}
catch (Exception ex)
{
    logger.LogError(ex, "Uplink-Message processing failed");

    return req.CreateResponse(HttpStatusCode.InternalServerError);
}
I used Telerik Fiddler and some sample payloads copied from my Azure Storage Queue sample to simulate many devices and the registrations were spread across my five Azure IoT Hubs.
DPS Device Registrations tab showing distribution of LoRaWAN Devices
I need to review the HTTP Error codes returned for different errors and ensure failures are handled robustly.
In this version a downlink message can be sent to a device only after an uplink message. I’m looking at adding an Azure Function which initiates a connection to the configured Azure IoT Hub for the specified device to mitigate this issue.
To send a TTN downlink message to a device the minimum required info is the LoRaWAN port number (specified in a Custom Property on the Azure IoT Hub cloud to device message), the device Id (from uplink message payload, which has been validated by a successful Azure IoT Hub connection) web hook id, web hook base URL, and an API Key (The Web Hook parameters are stored in the Connector configuration).
After some experimentation in previous TTN Connectors I found the synchronous nature of DirectMethods didn’t work well with LoRaWAN “irregular” connectivity so currently they are ignored.
public partial class Integration
{
    /// <summary>
    /// Handles an Azure IoT Hub cloud-to-device message by converting it into a TTI downlink
    /// and posting it to the TTI webhook "down/replace" endpoint for the device.
    /// </summary>
    /// <remarks>
    /// The LoRaWAN port is read from the message properties; the message is rejected if it
    /// cannot be parsed. Any exception is logged and swallowed.
    /// </remarks>
    /// <param name="message">The cloud-to-device message; disposed by this method.</param>
    /// <param name="userContext">An <c>AzureIoTHubReceiveMessageHandlerContext</c> supplied at handler registration.</param>
    private async Task AzureIoTHubClientReceiveMessageHandler(Message message, object userContext)
    {
        try
        {
            Models.AzureIoTHubReceiveMessageHandlerContext receiveMessageHandlerContext = (Models.AzureIoTHubReceiveMessageHandlerContext)userContext;

            if (!_DeviceClients.TryGetValue(receiveMessageHandlerContext.DeviceId, out DeviceClient deviceClient))
            {
                _logger.LogWarning("Downlink-DeviceID:{0} unknown", receiveMessageHandlerContext.DeviceId);
                return;
            }

            using (message)
            {
                string payloadText = Encoding.UTF8.GetString(message.GetBytes()).Trim();

                if (!AzureDownlinkMessage.PortTryGet(message.Properties, out byte port))
                {
                    _logger.LogWarning("Downlink-Port property is invalid");
                    await deviceClient.RejectAsync(message);
                    return;
                }

                // The listing used "downlink" without declaring it, so it is declared here.
                // NOTE(review): only Port is set because only the port is available at this
                // point; confirm whether other Downlink fields should be populated.
                Models.Downlink downlink = new Models.Downlink()
                {
                    Port = port,
                };

                // In this scenario a valid JSON string should start/end with {/} for an object or [/] for an array.
                bool looksLikeJson =
                    (payloadText.StartsWith("{") && payloadText.EndsWith("}"))
                    || (payloadText.StartsWith("[") && payloadText.EndsWith("]"));

                if (looksLikeJson)
                {
                    try
                    {
                        downlink.PayloadDecoded = JToken.Parse(payloadText);
                    }
                    catch (JsonReaderException)
                    {
                        // Not valid JSON after all, so send it as a raw payload.
                        downlink.PayloadRaw = payloadText;
                    }
                }
                else
                {
                    downlink.PayloadRaw = payloadText;
                }

                _logger.LogInformation("Downlink-IoT Hub DeviceID:{0} MessageID:{1} LockToken :{2} Port{3}",
                    receiveMessageHandlerContext.DeviceId,
                    message.MessageId,
                    message.LockToken,
                    downlink.Port);

                Models.DownlinkPayload Payload = new Models.DownlinkPayload()
                {
                    Downlinks = new List<Models.Downlink>()
                    {
                        downlink
                    }
                };

                // The stray ")" that followed this string in the listing has been removed.
                string url = $"{receiveMessageHandlerContext.WebhookBaseURL}/{receiveMessageHandlerContext.ApplicationId}/webhooks/{receiveMessageHandlerContext.WebhookId}/devices/{receiveMessageHandlerContext.DeviceId}/down/replace";

                using (var client = new WebClient())
                {
                    client.Headers.Add("Authorization", $"Bearer {receiveMessageHandlerContext.ApiKey}");

                    // Awaited upload so the handler does not block a thread on the HTTP call.
                    await client.UploadStringTaskAsync(new Uri(url), JsonConvert.SerializeObject(Payload));
                }

                _logger.LogInformation("Downlink-DeviceID:{0} LockToken:{1} success", receiveMessageHandlerContext.DeviceId, message.LockToken);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Downlink-ReceiveMessge processing failed");
        }
    }
}
public partial class Webhooks
{
    /// <summary>
    /// HTTP-triggered handler for the TTI uplink webhook. Forwards the uplink to Azure IoT Hub
    /// as a telemetry message, opening and caching a <c>DeviceClient</c> for the device
    /// the first time it is seen.
    /// </summary>
    /// <returns>
    /// 400 when the body cannot be deserialised or the uplink has no (or a zero) port,
    /// 404 when Azure IoT Hub does not know the device, 409 when the new client cannot be
    /// cached, 500 on any other exception, otherwise 200.
    /// </returns>
    [Function("Uplink")]
    public async Task<HttpResponseData> Uplink([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
    {
        var logger = executionContext.GetLogger("Uplink");

        // Wrap all the processing in a try\catch so if anything blows up we have logged it. Will need to specialise for connectivity failures etc.
        try
        {
            // Read the body once; a second read for the error log would normally return an
            // empty string because the request stream has already been consumed.
            string payloadText = await req.ReadAsStringAsync();

            Models.PayloadUplink payload = JsonConvert.DeserializeObject<Models.PayloadUplink>(payloadText);
            if (payload == null)
            {
                logger.LogInformation("Uplink: Payload {0} invalid", payloadText);

                return req.CreateResponse(HttpStatusCode.BadRequest);
            }

            string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
            string deviceId = payload.EndDeviceIds.DeviceId;

            // A missing or zero port is treated as a control message and is not forwarded.
            if (!payload.UplinkMessage.Port.HasValue || (payload.UplinkMessage.Port.Value == 0))
            {
                logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Payload Raw:{2} Control nessage", applicationId, deviceId, payload.UplinkMessage.PayloadRaw);

                return req.CreateResponse(HttpStatusCode.BadRequest);
            }

            int port = payload.UplinkMessage.Port.Value;

            logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Port:{2} Payload Raw:{3}", applicationId, deviceId, port, payload.UplinkMessage.PayloadRaw);

            if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
            {
                logger.LogInformation("Uplink-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

                deviceClient = DeviceClient.CreateFromConnectionString(_configuration.GetConnectionString("AzureIoTHub"), deviceId);

                try
                {
                    await deviceClient.OpenAsync();
                }
                catch (DeviceNotFoundException)
                {
                    logger.LogWarning("Uplink-Unknown DeviceID:{0}", deviceId);

                    // The client never made it into the cache, so release it here.
                    deviceClient.Dispose();

                    return req.CreateResponse(HttpStatusCode.NotFound);
                }

                if (!_DeviceClients.TryAdd(deviceId, deviceClient))
                {
                    logger.LogWarning("Uplink-TryAdd failed for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

                    // This client was not cached, so release it here.
                    deviceClient.Dispose();

                    return req.CreateResponse(HttpStatusCode.Conflict);
                }
            }

            JObject telemetryEvent = new JObject
            {
                { "ApplicationID", applicationId },
                { "DeviceID", deviceId },
                { "Port", port },
                { "PayloadRaw", payload.UplinkMessage.PayloadRaw }
            };

            // If the payload has been decoded by payload formatter, put it in the message body.
            if (payload.UplinkMessage.PayloadDecoded != null)
            {
                telemetryEvent.Add("PayloadDecoded", payload.UplinkMessage.PayloadDecoded);
            }

            // Send the message to Azure IoT Hub. UTF-8 (not ASCII) so that non-ASCII characters
            // in the serialised JSON are not replaced with '?'.
            using (Message ioTHubmessage = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
            {
                // Ensure the displayed time is the acquired time rather than the uploaded time.
                ioTHubmessage.Properties.Add("iothub-creation-time-utc", payload.UplinkMessage.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture));
                ioTHubmessage.Properties.Add("ApplicationId", applicationId);
                ioTHubmessage.Properties.Add("DeviceEUI", payload.EndDeviceIds.DeviceEui);
                ioTHubmessage.Properties.Add("DeviceId", deviceId);
                ioTHubmessage.Properties.Add("port", port.ToString());

                await deviceClient.SendEventAsync(ioTHubmessage);
            }
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Uplink message processing failed");

            return req.CreateResponse(HttpStatusCode.InternalServerError);
        }

        return req.CreateResponse(HttpStatusCode.OK);
    }
}
For initial development and testing I ran the function application in the desktop emulator and simulated TTI webhook calls with Telerik Fiddler and modified TTI sample payloads.
I then used Azure IoT Explorer to configure devices, view uplink traffic etc. When I connected to my Azure IoT Hub shortly after starting the application all the devices were disconnected.
Azure IoT Explorer – no connected devices
The SeeeduinoLoRaWAN devices report roughly every 15 minutes so it took a while for them all to connect (the SeeeduinoLoRaWAN4 & SeeeduinoLoRaWAN6 need to be repaired).
After some experimentation I realised that I had forgotten that the order of message processing was important, e.g. a TTI Queued message should be processed before the associated Ack. Out-of-order processing could (and did) happen because I had a queue for each message type, and in addition the Azure Queue Storage trigger binding would use parallel execution to process backlogs of messages. My approach caused issues with both intra- and inter-queue message ordering.
namespace devMobile.IoT.TheThingsIndustries.HttpInputStorageQueueOutput
{
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
[StorageAccount("AzureWebJobsStorage")]
public static class Webhooks
{
/// <summary>
/// HTTP-triggered function that replies 200 OK and returns the raw request body in the
/// <c>Name</c> property of the output binding type.
/// </summary>
[Function("Uplink")]
public static async Task<HttpTriggerUplinkOutputBindingType> Uplink([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext context)
{
    ILogger log = context.GetLogger("UplinkMessage");
    log.LogInformation("Uplink processed");

    // The response is created before the body is read, as in the original ordering.
    HttpResponseData okResponse = req.CreateResponse(HttpStatusCode.OK);
    string requestBody = await req.ReadAsStringAsync();

    var output = new HttpTriggerUplinkOutputBindingType();
    output.Name = requestBody;
    output.HttpReponse = okResponse;

    return output;
}
/// <summary>
/// Return type of the Uplink function: carries the request body for the "uplink"
/// queue output binding together with the HTTP response.
/// </summary>
public class HttpTriggerUplinkOutputBindingType
{
/// <summary>Request body text; annotated with the "uplink" queue output binding.</summary>
[QueueOutput("uplink")]
public string Name { get; set; }
/// <summary>HTTP response for the caller (the property name spelling is part of the existing interface).</summary>
public HttpResponseData HttpReponse { get; set; }
}
...
/// <summary>
/// HTTP-triggered function that replies 200 OK and returns the raw request body in the
/// <c>Name</c> property of the output binding type.
/// </summary>
[Function("Failed")]
public static async Task<HttpTriggerFailedOutputBindingType> Failed([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext context)
{
    ILogger log = context.GetLogger("Failed");
    log.LogInformation("Failed procssed");

    // The response is created before the body is read, as in the original ordering.
    HttpResponseData okResponse = req.CreateResponse(HttpStatusCode.OK);
    string requestBody = await req.ReadAsStringAsync();

    var output = new HttpTriggerFailedOutputBindingType();
    output.Name = requestBody;
    output.HttpReponse = okResponse;

    return output;
}
/// <summary>
/// Return type of the Failed function: carries the request body for the "failed"
/// queue output binding together with the HTTP response.
/// </summary>
public class HttpTriggerFailedOutputBindingType
{
/// <summary>Request body text; annotated with the "failed" queue output binding.</summary>
[QueueOutput("failed")]
public string Name { get; set; }
/// <summary>HTTP response for the caller (the property name spelling is part of the existing interface).</summary>
public HttpResponseData HttpReponse { get; set; }
}
}
}
Azure Functions Desktop Development environment running my functions
I used Telerik Fiddler with some sample payloads to test my application.
Telerik Fiddler Request Composer “posting” sample message to desktop endpoint
Once the functions were running reliably on my desktop, I created an Azure Service Plan, deployed the code, then generated an API Key for securing my HTTPTrigger endpoints.
Azure Functions Host Key configuration dialog
I then added a TTI Webhook Integration to my TTI SeeduinoLoRaWAN application, manually configured the endpoint, enabled the different messages I wanted to process and set the x-functions-key header.
TTI Application Webhook configuration
After a short delay I could see messages in the message uplink queue with Azure Storage Explorer
Azure Storage Explorer displaying content of my uplink queue
Building a new version of my TTIV3 Azure IoT connector is a useful learning exercise but I’m still deciding whether it is worth the effort as TTI has one now.
Earlier in the year I built Things Network(TTN)V2 and V3 connectors and after using these in production applications I have learnt a lot about what I had got wrong, less wrong and what I had got right.
Using a TTN V3MQTTApplication integration wasn’t a great idea. The management of state was very complex. The storage of application keys in an app.settings file made configuration easy but was bad for security.
The use of Azure Key Vault in the TTNV2 connector was a good approach, but the process of creation and updating of the settings needs to be easier.
Using TTN device registry as the “single source of truth” was a good decision as managing the amount of LoRaWAN network, application and device specific configuration in an Azure IoT Hub would be non-trivial.
As part of my “day job” I spend a lot of time working with C# and VB.Net 4.X “legacy” projects doing upgrades, bug fixes and moving applications to Azure. For the last couple of months I have been working on a project replacing Microsoft message queue (MSMQ) queues with Azure Storage Queues so the solution is easier to deploy in Azure.
The next phase of the project is to replace a number of Windows Services with Azure Queue Trigger and Timer Trigger functions. The aim is a series of small steps which we can test before deployment rather than major changes, hence the use of V1 Azure functions for the first release.
Silver Fox systems sells a Visual Studio extension which generates an HTTP Trigger VB.Net project. I needed Timer and Queue Trigger functions so I created C# examples and then used them to figure out how to build VB.Net equivalents
Visual Studio Solution Explorer
After quite a few failed attempts I found this sequence worked for me
Add a new VB.Net class library
Provide a name for new class library
Select target framework
Even though the target platform is not .NET 5.0 ignore this and continue.
Microsoft.NET.Sdk.Functions
Add the Microsoft.NET.Sdk.Functions NuGet package (make sure it is version 1.0.38)
At this point the project should compile but won’t do much, so update the class to look like the code below.
Imports System.Threading
Imports Microsoft.Azure.WebJobs
Imports Microsoft.Extensions.Logging
''' <summary>
''' Timer-triggered function that logs the next scheduled run time and a running
''' count of how many times it has been invoked.
''' </summary>
Public Class TimerTrigger
    ' Number of invocations so far; only ever modified through Interlocked.
    Shared executionCount As Int32

    ''' <summary>
    ''' Runs on the schedule given by the "0 */1 * * * *" expression.
    ''' </summary>
    ''' <param name="myTimer">Timer information; the next scheduled occurrence is read from its ScheduleStatus.</param>
    ''' <param name="log">Logger the status line is written to.</param>
    <FunctionName("Timer")>
    Public Shared Sub Run(<TimerTrigger("0 */1 * * * *")> myTimer As TimerInfo, log As ILogger)
        ' Log the value returned by Interlocked.Increment rather than re-reading the shared
        ' field, so the number reported always belongs to this invocation.
        Dim currentCount As Int32 = Interlocked.Increment(executionCount)

        log.LogInformation("VB.Net TimerTrigger next trigger:{0} Execution count:{1}", myTimer.ScheduleStatus.Next, currentCount)
    End Sub
End Class
Then add an empty host.json file (make sure “copy if newer” is configured in properties) to the project directory, then, depending on the deployment model, configure the AzureWebJobsStorage and AzureWebJobsDashboard connection strings via environment variables or a local.settings.json file.
Visual Studio Environment variables for AzureWebJobsStorage and AzureWebJobsDashboard connection strings
Imports System.IO
Imports System.Threading
Imports Microsoft.Azure.WebJobs
Imports Microsoft.Extensions.Logging
''' <summary>
''' Blob-triggered function that logs the name and size of each blob it is handed
''' and a running count of how many times it has been invoked.
''' </summary>
Public Class BlobTrigger
    ' Number of invocations so far; only ever modified through Interlocked.
    Shared executionCount As Int32

    ''' <summary>
    ''' Triggered for blobs matching the "notifications/{name}" path, using the
    ''' connection setting named "BlobEndPoint".
    ''' </summary>
    ''' <param name="payload">Stream over the blob content; only its Length is read.</param>
    ''' <param name="name">Value bound from the {name} segment of the blob path.</param>
    ''' <param name="log">Logger the status line is written to.</param>
    ' NOTE(review): another listing in this post also uses FunctionName "Notifications";
    ' confirm the two are not deployed in the same function app.
    <FunctionName("Notifications")>
    Public Shared Sub Run(<BlobTrigger("notifications/{name}", Connection:="BlobEndPoint")> payload As Stream, name As String, log As ILogger)
        ' Declared as a plain Sub: the body awaits nothing, and an Async Sub would hide
        ' exceptions from the caller.
        ' Log the value returned by Interlocked.Increment rather than re-reading the shared
        ' field, so the number reported always belongs to this invocation.
        Dim currentCount As Int32 = Interlocked.Increment(executionCount)

        log.LogInformation("VB.Net BlobTrigger processed blob name:{0} Size:{1} bytes Execution count:{2}", name, payload.Length, currentCount)
    End Sub
End Class
End Class
Imports System.Net
Imports System.Net.Http
Imports System.Threading
Imports Microsoft.Azure.WebJobs
Imports Microsoft.Azure.WebJobs.Extensions.Http
Imports Microsoft.Extensions.Logging
''' <summary>
''' HTTP-triggered function that logs the request method and a running invocation
''' count, then replies with HTTP 200.
''' </summary>
Public Class HttpTrigger
    ' Number of invocations so far; only ever modified through Interlocked.
    Shared executionCount As Int32

    ''' <summary>
    ''' Handles anonymous GET and POST requests.
    ''' </summary>
    ''' <param name="req">Incoming request; only its Method is read.</param>
    ''' <param name="log">Logger the status line is written to.</param>
    ''' <returns>An empty response with status code 200 (OK).</returns>
    <FunctionName("Notifications")>
    Public Shared Async Function Run(<HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route:=Nothing)> req As HttpRequestMessage, log As ILogger) As Task(Of HttpResponseMessage)
        ' Log the value returned by Interlocked.Increment rather than re-reading the shared
        ' field, so the number reported always belongs to this invocation.
        Dim currentCount As Int32 = Interlocked.Increment(executionCount)

        ' The template must NOT be an interpolated ($"...") string: with the $ prefix, {0} and {1}
        ' are evaluated as the literal numbers 0 and 1 and the arguments are never used.
        log.LogInformation("VB.Net HTTP trigger Execution count:{0} Method:{1}", currentCount, req.Method)

        Return New HttpResponseMessage(HttpStatusCode.OK)
    End Function
End Class
Imports System.Threading
Imports Microsoft.Azure.WebJobs
Imports Microsoft.Extensions.Logging
''' <summary>
''' Queue-triggered function used to observe parallel execution: it logs how many
''' invocations are currently in flight and how many have run in total.
''' </summary>
Public Class QueueTrigger
    ' Number of invocations currently inside ProcessQueueMessage.
    Shared ConcurrencyCount As Long
    ' Total number of invocations so far.
    Shared ExecutionCount As Long

    ''' <summary>
    ''' Triggered for messages on the "notifications" queue, using the connection
    ''' setting named "QueueEndpoint".
    ''' </summary>
    ''' <param name="message">Queue message text; it is only logged.</param>
    ''' <param name="log">Logger the status line is written to.</param>
    <FunctionName("Alerts")>
    Public Shared Sub ProcessQueueMessage(<QueueTrigger("notifications", Connection:="QueueEndpoint")> message As String, log As ILogger)
        ' Capture the values returned by Interlocked.Increment so the numbers logged belong
        ' to this invocation and are never torn reads of the shared Long fields.
        Dim currentConcurrency As Long = Interlocked.Increment(ConcurrencyCount)

        Try
            Dim currentExecution As Long = Interlocked.Increment(ExecutionCount)

            log.LogInformation("VB.Net Concurrency:{0} Message:{1} Execution count:{2}", currentConcurrency, message, currentExecution)

            ' Wait for a bit to force some concurrency
            Thread.Sleep(5000)
        Finally
            ' Always release the in-flight slot, even if logging or the wait throws,
            ' otherwise the concurrency figure would drift upwards permanently.
            Interlocked.Decrement(ConcurrencyCount)
        End Try
    End Sub
End Class
As well as counting the number of executions I also wanted to check that more than one instance was started to process messages when the queues had many messages. I added a “queues” section to the host.json file so I could tinker with the options.
Imports System.Threading
Imports Microsoft.Azure.WebJobs
Imports Microsoft.Extensions.Logging
''' <summary>
''' Timer-triggered function that logs the next scheduled run time and a running
''' count of how many times it has been invoked.
''' </summary>
Public Class TimerTrigger
    ' Number of invocations so far; only ever modified through Interlocked.
    Shared executionCount As Int32

    ''' <summary>
    ''' Runs on the schedule given by the "0 */1 * * * *" expression.
    ''' </summary>
    ''' <param name="myTimer">Timer information; the next scheduled occurrence is read from its ScheduleStatus.</param>
    ''' <param name="log">Logger the status line is written to.</param>
    <FunctionName("Timer")>
    Public Shared Sub Run(<TimerTrigger("0 */1 * * * *")> myTimer As TimerInfo, log As ILogger)
        ' Log the value returned by Interlocked.Increment rather than re-reading the shared
        ' field, so the number reported always belongs to this invocation.
        Dim currentCount As Int32 = Interlocked.Increment(executionCount)

        log.LogInformation("VB.Net TimerTrigger next trigger:{0} Execution count:{1}", myTimer.ScheduleStatus.Next, currentCount)
    End Sub
End Class
The source code for the C# and VB.Net functions is available on GitHub