TTI V3 Connector Azure IoT Central Cloud to Device(C2D)

Handling Cloud to Device(D2C) Azure IoT Central messages (The Things Industries(TTI) downlink) is a bit more complex than Device To Cloud(D2C) messaging. The format of the command messages is reasonably well documented and I have already explored in detail with basic telemetry, basic commands, request commands, and The Things Industries Friendly commands and Digital Twin Definition Language(DTDL) support.

public class IoTHubApplicationSetting
{
	public string DtdlModelId { get; set; }
}

public class IoTHubSettings
{
	public string IoTHubConnectionString { get; set; } = string.Empty;

	public Dictionary<string, IoTHubApplicationSetting> Applications { get; set; }
}


public class DeviceProvisiongServiceApplicationSetting
{
	public string DtdlModelId { get; set; } = string.Empty;

	public string GroupEnrollmentKey { get; set; } = string.Empty;
}

public class DeviceProvisiongServiceSettings
{
	public string IdScope { get; set; } = string.Empty;

	public Dictionary<string, DeviceProvisiongServiceApplicationSetting> Applications { get; set; }
}


public class IoTCentralMethodSetting
{
	public byte Port { get; set; } = 0;

	public bool Confirmed { get; set; } = false;

	public Models.DownlinkPriority Priority { get; set; } = Models.DownlinkPriority.Normal;

	public Models.DownlinkQueue Queue { get; set; } = Models.DownlinkQueue.Replace;
}

public class IoTCentralSetting
{
	public Dictionary<string, IoTCentralMethodSetting> Methods { get; set; }
}

public class AzureIoTSettings
{
	public IoTHubSettings IoTHub { get; set; }

	public DeviceProvisiongServiceSettings DeviceProvisioningService { get; set; }

	public IoTCentralSetting IoTCentral { get; set; }
}

Azure IoT Central appears to have no support for setting message properties so the LoRaWAN port, confirmed flag, priority, and queuing so these a retrieved from configuration.

Azure Function Configuration
Models.Downlink downlink;
Models.DownlinkQueue queue;

string payloadText = Encoding.UTF8.GetString(message.GetBytes()).Trim();

if (message.Properties.ContainsKey("method-name"))
{
	#region Azure IoT Central C2D message processing
	string methodName = message.Properties["method-name"];

	if (string.IsNullOrWhiteSpace(methodName))
	{
		_logger.LogWarning("Downlink-DeviceID:{0} MessagedID:{1} LockToken:{2} method-name property empty", receiveMessageHandlerContext.DeviceId, message.MessageId, message.LockToken);

		await deviceClient.RejectAsync(message);
		return;
	}

	// Look up the method settings to get confirmed, port, priority, and queue
	if ((_azureIoTSettings == null) || (_azureIoTSettings.IoTCentral == null) || !_azureIoTSettings.IoTCentral.Methods.TryGetValue(methodName, out IoTCentralMethodSetting methodSetting))
	{
		_logger.LogWarning("Downlink-DeviceID:{0} MessagedID:{1} LockToken:{2} method-name:{3} has no settings", receiveMessageHandlerContext.DeviceId, message.MessageId, message.LockToken, methodName);
							
		await deviceClient.RejectAsync(message);
		return;
	}

	downlink = new Models.Downlink()
	{
		Confirmed = methodSetting.Confirmed,
		Priority = methodSetting.Priority,
		Port = methodSetting.Port,
		CorrelationIds = AzureLockToken.Add(message.LockToken),
	};

	queue = methodSetting.Queue;

	// Check to see if special case for Azure IoT central command with no request payload
	if (payloadText.IsPayloadEmpty())
	{
		downlink.PayloadRaw = "";
	}

	if (!payloadText.IsPayloadEmpty())
	{
		if (payloadText.IsPayloadValidJson())
		{
			downlink.PayloadDecoded = JToken.Parse(payloadText);
			}
		else
		{
			downlink.PayloadDecoded = new JObject(new JProperty(methodName, payloadText));
		}
	}

	logger.LogInformation("Downlink-IoT Central DeviceID:{0} Method:{1} MessageID:{2} LockToken:{3} Port:{4} Confirmed:{5} Priority:{6} Queue:{7}",
		receiveMessageHandlerContext.DeviceId,
		methodName,
		message.MessageId,
		message.LockToken,
		downlink.Port,
		downlink.Confirmed,
		downlink.Priority,
		queue);
	#endregion
}

The reboot command payload only contains an “@” so the TTTI payload will be empty, the minimum and maximum command payloads will contain only a numeric value which is added to the decoded payload with the method name, the combined minimum and maximum command has a JSON payload which is “grafted” into the decoded payload.

Azure IoT Central Device Template

TTI V3 Connector Azure IoT Central Device to Cloud(D2C)

This post is largely about adapting the output of The Things Industries(TTI) MyDevices Cayenne Low Power Protocol(LPP) payload formatter so that it can be injested by Azure IoT Central. The Azure function for processing TTI Uplink messages first deserialises the JSON payload discarding any LoRaWAN control messages and messages with empty payloads.

[Function("Uplink")]
public async Task<HttpResponseData> Uplink([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
	Models.PayloadUplink payload;
	var logger = executionContext.GetLogger("Queued");

	// Wrap all the processing in a try\catch so if anything blows up we have logged it.
	try
	{
		string payloadText = await req.ReadAsStringAsync();

		try
		{
			payload = JsonConvert.DeserializeObject<Models.PayloadUplink>(payloadText);
		}
		catch(JsonException ex)
		{
			logger.LogInformation(ex, "Uplink-Payload Invalid JSON:{0}", payloadText);

			return req.CreateResponse(HttpStatusCode.BadRequest);
		}

		if (payload == null)
		{
			logger.LogInformation("Uplink-Payload invalid:{0}", payloadText);

			return req.CreateResponse(HttpStatusCode.BadRequest);
		}

		string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
		string deviceId = payload.EndDeviceIds.DeviceId;

		if ((payload.UplinkMessage.Port == null) || (!payload.UplinkMessage.Port.HasValue) || (payload.UplinkMessage.Port.Value == 0))
		{
			logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Payload Raw:{2} Control message", applicationId, deviceId, payload.UplinkMessage.PayloadRaw);

			return req.CreateResponse(HttpStatusCode.UnprocessableEntity);
		}

		int port = payload.UplinkMessage.Port.Value;

		logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Port:{2} Payload Raw:{3}", applicationId, deviceId, port, payload.UplinkMessage.PayloadRaw);

		if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
		{
...		
		}

		JObject telemetryEvent = new JObject
		{
			{ "ApplicationID", applicationId },
			{ "DeviceID", deviceId },
			{ "Port", port },
			{ "Simulated", payload.Simulated },
			{ "ReceivedAtUtc", payload.UplinkMessage.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture) },
			{ "PayloadRaw", payload.UplinkMessage.PayloadRaw }
		};

		// If the payload has been decoded by payload formatter, put it in the message body.
		if (payload.UplinkMessage.PayloadDecoded != null)
		{
			EnumerateChildren(telemetryEvent, payload.UplinkMessage.PayloadDecoded);
		}

		// Send the message to Azure IoT Hub
		using (Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
		{
			// Ensure the displayed time is the acquired time rather than the uploaded time. 
			ioTHubmessage.Properties.Add("iothub-creation-time-utc", payload.UplinkMessage.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture));
			ioTHubmessage.Properties.Add("ApplicationId", applicationId);
			ioTHubmessage.Properties.Add("DeviceEUI", payload.EndDeviceIds.DeviceEui);
			ioTHubmessage.Properties.Add("DeviceId", deviceId);
			ioTHubmessage.Properties.Add("port", port.ToString());
			ioTHubmessage.Properties.Add("Simulated", payload.Simulated.ToString());

			await deviceClient.SendEventAsync(ioTHubmessage);

			logger.LogInformation("Uplink-DeviceID:{0} SendEventAsync success", payload.EndDeviceIds.DeviceId);
		}
	}
	catch (Exception ex)
	{
		logger.LogError(ex, "Uplink-Message processing failed");

		return req.CreateResponse(HttpStatusCode.InternalServerError);
	}

	return req.CreateResponse(HttpStatusCode.OK);
}

If the message has been successfully decoded by a payload formatter the PayloadDecoded contents will be “grafted” into the Azure IoT Central Telemetry message.

TTI JSON GPS position format

The Azure IoT Central Location Telemetry messages have a slightly different format to the output of the TTI LPP Payload formatter so the payload has to be “post processed”.

private void EnumerateChildren(JObject jobject, JToken token)
{
	if (token is JProperty property)
	{
		if (token.First is JValue)
		{
			// Temporary dirty hack for Azure IoT Central compatibility
			if (token.Parent is JObject possibleGpsProperty)
			{
				// TODO Need to check if similar approach necessary accelerometer and gyro LPP payloads
				if (possibleGpsProperty.Path.StartsWith("GPS_", StringComparison.OrdinalIgnoreCase))
				{
					if (string.Compare(property.Name, "Latitude", true) == 0)
					{
						jobject.Add("lat", property.Value);
					}
					if (string.Compare(property.Name, "Longitude", true) == 0)
					{
						jobject.Add("lon", property.Value);
					}
					if (string.Compare(property.Name, "Altitude", true) == 0)
					{
						jobject.Add("alt", property.Value);
					}
				}
			}
			jobject.Add(property.Name, property.Value);
		}
		else
		{
			JObject parentObject = new JObject();
			foreach (JToken token2 in token.Children())
			{
				EnumerateChildren(parentObject, token2);
				jobject.Add(property.Name, parentObject);
			}
		}
	}
	else
	{
		foreach (JToken token2 in token.Children())
		{
			EnumerateChildren(jobject, token2);
		}
	}
}

I may have to extend this method for other LPP datatypes

“Post processed” TTI JSON GPS Position data suitable for Azure IoT Central

To test the telemetry message JSON I created an Azure IoT Central Device Template which had a “capability type” of Location.

Azure IoT Central Device Template with Location Capability

For initial development and testing I ran the function application in the desktop emulator and simulated TTI webhook calls with Telerik Fiddler and modified sample payloads. After some issues with iothub-creation-time-utc decoded telemetry messages were displayed in the Device Raw Data tab

Azure IoT Central Device Raw Data tab with successfully decoded GPS location payloads
Azure IoT Central map displaying with device location highlighted

This post uses a lot of the work done for my The Things Network V2 integration. I also found the first time a device connected to the Azure IoT Central Azure IoT hub (using the Azure IoT Central Device Provisioning Service(DPS) to get the connection string) there was always an exception.

Microsoft.Azure.Devices.Client.Exceptions.IotHubException: error(condition:com.microsoft:connection-closed-on-new-connection,description:Backend initiated disconnection.

TTI V3 Gateway Azure IoT Central first call exception

This exception occurs when the SetMethodDefaultHandlerAsync method is called which is a bit odd. This exception does not occur when I use Device Provisioning Service(DPS) and Azure IoT Hub instances I have provisioned.

TTI V3 Connector Cloud to Device(C2D)

The TTI V3 Connector Minimalist Cloud to Device only required a port number, and there was no way to specify whether delivery of message had to be confirmed, the way the message was queued, or the priority of message delivery. Like the port number these optional settings can be specified in message properties.

  • Confirmation – True/False
  • Queue – Push/Replace
  • Priority – Lowest/Low/BelowNormal/Normal/AboveNormal/High/Highest

If any of these properties are incorrect DeviceClient.RejectAsync is called which deletes the message from the device queue and indicates to the server that the message could not be processed.

private async Task AzureIoTHubClientReceiveMessageHandler(Message message, object userContext)
{
	try
	{
		Models.AzureIoTHubReceiveMessageHandlerContext receiveMessageHandlerContext = (Models.AzureIoTHubReceiveMessageHandlerContext)userContext;

		if (!_DeviceClients.TryGetValue(receiveMessageHandlerContext.DeviceId, out DeviceClient deviceClient))
		{
			_logger.LogWarning("Downlink-DeviceID:{0} unknown", receiveMessageHandlerContext.DeviceId);
			return;
		}

		using (message)
		{
			string payloadText = Encoding.UTF8.GetString(message.GetBytes()).Trim();

			if (!AzureDownlinkMessage.PortTryGet(message.Properties, out byte port))
			{
				_logger.LogWarning("Downlink-Port property is invalid");

				await deviceClient.RejectAsync(message);
				return;
			}

			if (!AzureDownlinkMessage.ConfirmedTryGet(message.Properties, out bool confirmed))
			{
				_logger.LogWarning("Downlink-Confirmed flag is invalid");

				await deviceClient.RejectAsync(message);
				return;
			}

			if (!AzureDownlinkMessage.PriorityTryGet(message.Properties, out Models.DownlinkPriority priority))
			{
				_logger.LogWarning("Downlink-Priority value is invalid");

				await deviceClient.RejectAsync(message);
				return;
			}

			if (!AzureDownlinkMessage.QueueTryGet(message.Properties, out Models.DownlinkQueue queue))
			{
				_logger.LogWarning("Downlink-Queue value is invalid");

				await deviceClient.RejectAsync(message.LockToken);
				return;
			}

			Models.Downlink downlink = new Models.Downlink()
			{
				Confirmed = confirmed,
				Priority = priority,
				Port = port,
				CorrelationIds = AzureLockToken.Add(message.LockToken),
			};

			// Split over multiple lines in an attempt to improve readability. In this scenario a valid JSON string should start/end with {/} for an object or [/] for an array
			if ((payloadText.StartsWith("{") && payloadText.EndsWith("}"))
													||
				((payloadText.StartsWith("[") && payloadText.EndsWith("]"))))
			{
				try
				{
					downlink.PayloadDecoded = JToken.Parse(payloadText);
				}
				catch (JsonReaderException)
				{
					downlink.PayloadRaw = payloadText;
				}
			}
			else
			{
				downlink.PayloadRaw = payloadText;
			}

			_logger.LogInformation("Downlink-IoT Hub DeviceID:{0} MessageID:{2} LockToken:{3} Port:{4} Confirmed:{5} Priority:{6} Queue:{7}",
				receiveMessageHandlerContext.DeviceId,
				message.MessageId,
				message.LockToken,
				downlink.Port,
				downlink.Confirmed,
				downlink.Priority,
				queue);

			Models.DownlinkPayload Payload = new Models.DownlinkPayload()
			{
				Downlinks = new List<Models.Downlink>()
				{
					downlink
				}
			};

			string url = $"{receiveMessageHandlerContext.WebhookBaseURL}/{receiveMessageHandlerContext.ApplicationId}/webhooks/{receiveMessageHandlerContext.WebhookId}/devices/{receiveMessageHandlerContext.DeviceId}/down/{queue}".ToLower();

			using (var client = new WebClient())
			{
				client.Headers.Add("Authorization", $"Bearer {receiveMessageHandlerContext.ApiKey}");

				client.UploadString(new Uri(url), JsonConvert.SerializeObject(Payload));
			}

			_logger.LogInformation("Downlink-DeviceID:{0} LockToken:{1} success", receiveMessageHandlerContext.DeviceId, message.LockToken);
		}
	}
	catch (Exception ex)
	{
		_logger.LogError(ex, "Downlink-ReceiveMessge processing failed");
	}
}

A correlation identifier containing the Message LockToken is added to the downlink payload.

Azure IoT Explorer Cloud to Device sending an unconfirmed downlink message

For unconfirmed messages The TTI Connector calls the DeviceClient.CompletedAsync method (with the LockToken from the CorrelationIDs list) which deletes the message from the device queue.

[Function("Queued")]
public async Task<HttpResponseData> Queued([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
	var logger = executionContext.GetLogger("Queued");

	// Wrap all the processing in a try\catch so if anything blows up we have logged it.
	try
	{
		string payloadText = await req.ReadAsStringAsync();

		Models.DownlinkQueuedPayload payload = JsonConvert.DeserializeObject<Models.DownlinkQueuedPayload>(payloadText);
		if (payload == null)
		{
			logger.LogInformation("Queued-Payload {0} invalid", payloadText);

			return req.CreateResponse(HttpStatusCode.BadRequest);
		}

		string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
		string deviceId = payload.EndDeviceIds.DeviceId;

		logger.LogInformation("Queued-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);

		if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
		{
			logger.LogInformation("Queued-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

			return req.CreateResponse(HttpStatusCode.Conflict);
		}

		// If the message is not confirmed "complete" it as soon as with network
		if (!payload.DownlinkQueued.Confirmed)
		{
			if (!AzureLockToken.TryGet(payload.DownlinkQueued.CorrelationIds, out string lockToken))
			{
				logger.LogWarning("Queued-DeviceID:{0} LockToken missing from payload:{1}", payload.EndDeviceIds.DeviceId, payloadText);

				return req.CreateResponse(HttpStatusCode.BadRequest);
			}

			try
			{
				await deviceClient.CompleteAsync(lockToken);
			}
			catch (DeviceMessageLockLostException)
			{
				logger.LogWarning("Queued-CompleteAsync DeviceID:{0} LockToken:{1} timeout", payload.EndDeviceIds.DeviceId, lockToken);

				return req.CreateResponse(HttpStatusCode.Conflict);
			}

			logger.LogInformation("Queued-DeviceID:{0} LockToken:{1} success", payload.EndDeviceIds.DeviceId, lockToken);
		}
	}
	catch (Exception ex)
	{
		logger.LogError(ex, "Queued message processing failed");

		return req.CreateResponse(HttpStatusCode.InternalServerError);
	}

	return req.CreateResponse(HttpStatusCode.OK);
}

The Things Industries Live Data tab for an unconfirmed message-Queued
Azure Application Insights for an unconfirmed message
The Things Industries Live Data tab for an unconfirmed message-Sent
Azure IoT Explorer Cloud to Device sending a confirmed downlink message
Azure Application Insights for a confirmed message
The Things Industries Live Data tab for a confirmed message-Sent
The Things Industries Live Data tab for a confirmed message-Ack

If message delivery succeeds the deviceClient.CompleteAsync method (with the LockToken from the CorrelationIDs list) is called which removes the message from the device queue.

[Function("Ack")]
public async Task<HttpResponseData> Ack([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
	var logger = executionContext.GetLogger("Queued");

	// Wrap all the processing in a try\catch so if anything blows up we have logged it.
	try
	{
		string payloadText = await req.ReadAsStringAsync();

		Models.DownlinkAckPayload payload = JsonConvert.DeserializeObject<Models.DownlinkAckPayload>(payloadText);
		if (payload == null)
		{
			logger.LogInformation("Ack-Payload {0} invalid", payloadText);

			return req.CreateResponse(HttpStatusCode.BadRequest);
		}

		string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
		string deviceId = payload.EndDeviceIds.DeviceId;

		logger.LogInformation("Ack-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);

		if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
		{
			logger.LogInformation("Ack-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

			return req.CreateResponse(HttpStatusCode.Conflict);
		}

		if (!AzureLockToken.TryGet(payload.DownlinkAck.CorrelationIds, out string lockToken))
		{
			logger.LogWarning("Ack-DeviceID:{0} LockToken missing from payload:{1}", payload.EndDeviceIds.DeviceId, payloadText);

			return req.CreateResponse(HttpStatusCode.BadRequest);
		}

		try
		{
			await deviceClient.CompleteAsync(lockToken);
		}
		catch (DeviceMessageLockLostException)
		{
			logger.LogWarning("Ack-CompleteAsync DeviceID:{0} LockToken:{1} timeout", payload.EndDeviceIds.DeviceId, lockToken);

			return req.CreateResponse(HttpStatusCode.Conflict);
		}

		logger.LogInformation("Ack-DeviceID:{0} LockToken:{1} success", payload.EndDeviceIds.DeviceId, lockToken);
	}
	catch (Exception ex)
	{
		logger.LogError(ex, "Ack message processing failed");

		return req.CreateResponse(HttpStatusCode.InternalServerError);
	}

	return req.CreateResponse(HttpStatusCode.OK);
}

Azure Application Insights for an confirmed message Ack

If message delivery fails the deviceClient.AbandonAsync method (with the LockToken from the CorrelationIDs list) is called which puts the downlink message back onto the device queue.

[Function("Failed")]
public async Task<HttpResponseData> Failed([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
	var logger = executionContext.GetLogger("Queued");

	// Wrap all the processing in a try\catch so if anything blows up we have logged it.
	try
	{
		string payloadText = await req.ReadAsStringAsync();

		Models.DownlinkFailedPayload payload = JsonConvert.DeserializeObject<Models.DownlinkFailedPayload>(payloadText);
		if (payload == null)
		{
			logger.LogInformation("Failed-Payload {0} invalid", payloadText);

			return req.CreateResponse(HttpStatusCode.BadRequest);
		}

		string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
		string deviceId = payload.EndDeviceIds.DeviceId;

		logger.LogInformation("Failed-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);

		if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
		{
			logger.LogInformation("Failed-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

			return req.CreateResponse(HttpStatusCode.Conflict);
		}

		if (!AzureLockToken.TryGet(payload.DownlinkFailed.CorrelationIds, out string lockToken))
		{
			logger.LogWarning("Failed-DeviceID:{0} LockToken missing from payload:{1}", payload.EndDeviceIds.DeviceId, payloadText);

			return req.CreateResponse(HttpStatusCode.BadRequest);
		}

		try
		{
			await deviceClient.RejectAsync(lockToken);
		}
		catch (DeviceMessageLockLostException)
		{
			logger.LogWarning("Failed-RejectAsync DeviceID:{0} LockToken:{1} timeout", payload.EndDeviceIds.DeviceId, lockToken);

			return req.CreateResponse(HttpStatusCode.Conflict);
		}

		logger.LogInformation("Failed-DeviceID:{0} LockToken:{1} success", payload.EndDeviceIds.DeviceId, lockToken);
	}
	catch (Exception ex)
	{
		logger.LogError(ex, "Failed message processing failed");

		return req.CreateResponse(HttpStatusCode.InternalServerError);
	}

	return req.CreateResponse(HttpStatusCode.OK);
}

If message delivery is unsuccessful the deviceClient.RejectAsync method (with the LockToken from the CorrelationIDs list) is called which deletes the message from the device queue and indicates to the server that the message could not be processed.

[Function("Nack")]
public async Task<HttpResponseData> Nack([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
{
	var logger = executionContext.GetLogger("Queued");

	// Wrap all the processing in a try\catch so if anything blows up we have logged it.
	try
	{
		string payloadText = await req.ReadAsStringAsync();

		Models.DownlinkNackPayload payload = JsonConvert.DeserializeObject<Models.DownlinkNackPayload>(payloadText);
		if (payload == null)
		{
			logger.LogInformation("Nack-Payload {0} invalid", payloadText);

			return req.CreateResponse(HttpStatusCode.BadRequest);
		}

		string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
		string deviceId = payload.EndDeviceIds.DeviceId;

		logger.LogInformation("Nack-ApplicationID:{0} DeviceID:{1} ", applicationId, deviceId);

		if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
		{
			logger.LogInformation("Nack-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

			return req.CreateResponse(HttpStatusCode.Conflict);
		}

		if (!AzureLockToken.TryGet(payload.DownlinkNack.CorrelationIds, out string lockToken))
		{
			logger.LogWarning("Nack-DeviceID:{0} LockToken missing from payload:{1}", payload.EndDeviceIds.DeviceId, payloadText);

			return req.CreateResponse(HttpStatusCode.BadRequest);
		}

		try
		{
			await deviceClient.RejectAsync(lockToken);
		}
		catch (DeviceMessageLockLostException)
		{
			logger.LogWarning("Nack-RejectAsync DeviceID:{0} LockToken:{1} timeout", payload.EndDeviceIds.DeviceId, lockToken);

			return req.CreateResponse(HttpStatusCode.Conflict);
		}

		logger.LogInformation("Nack-DeviceID:{0} LockToken:{1} success", payload.EndDeviceIds.DeviceId, lockToken);
	}
	catch (Exception ex)
	{
		logger.LogError(ex, "Nack message processing failed");

		return req.CreateResponse(HttpStatusCode.InternalServerError);
	}

	return req.CreateResponse(HttpStatusCode.OK);
}

The way message Failed(Abandon), Ack(CompleteAsync) and Nack(RejectAsync) are handled needs some more testing to confirm my understanding of the sequencing of TTI confirmed message delivery.

BEWARE

The use of Confirmed messaging with devices that send uplink messages irregularly can cause weird problems if the Azure IoT hub downlink message times out.

TTI V3 Connector Device Provisioning Service(DPS) support

The previous versions of my Things Network Industries(TTI) and The Things Network(TTN) connectors supported the Azure IoT Hub Device Provisioning Service(DPS) with Symmetric Key Attestation(SAS) to “automagically” setup the LoRaWAN devices in a TTI Application.(See my V2 Gateway DPS setup post for more detail).

Azure Device Provisioning Service configuring Azure IoT Hubs

I used an “evenly weighted distribution” to spread the devices across five Azure IoT Hubs.

Azure IoT Hub no registered devices

In the Azure Portal I configured the DPS ID Scope (AzureSettings:DeviceProvisioningServiceSettings:IdScope) and the Group Enrollment Key(AzureSettings:DeviceProvisioningServiceSettings:GroupEnrollmentKey) then saved the configuration which restarted the AppService.

Azure Portal AppService configration

The first time a device sent an uplink message the cache query fails and the RegisterAsync method of the ProvisioningDeviceClient is called to get a device connection string.

	logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Port:{2} Payload Raw:{3}", applicationId, deviceId, port, payload.UplinkMessage.PayloadRaw);

	if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
	{
		logger.LogInformation("Uplink-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

		// Check that only one of Azure Connection string or DPS is configured
		if (string.IsNullOrEmpty(_azureSettings.IoTHubConnectionString) && (_azureSettings.DeviceProvisioningServiceSettings == null))
		{
			logger.LogError("Uplink-Neither Azure IoT Hub connection string or Device Provisioning Service configured");

			return req.CreateResponse(HttpStatusCode.UnprocessableEntity);
		}

		// Check that only one of Azure Connection string or DPS is configured
		if (!string.IsNullOrEmpty(_azureSettings.IoTHubConnectionString) && (_azureSettings.DeviceProvisioningServiceSettings != null))
		{
			logger.LogError("Uplink-Both Azure IoT Hub connection string and Device Provisioning Service configured");

			return req.CreateResponse(HttpStatusCode.UnprocessableEntity);
		}

		// User Azure IoT Connection string if configured and Device Provisioning Service isn't
		if (!string.IsNullOrEmpty(_azureSettings.IoTHubConnectionString))
		{
			deviceClient = DeviceClient.CreateFromConnectionString(_azureSettings.IoTHubConnectionString, deviceId, transportSettings);

			try
			{
				await deviceClient.OpenAsync();
			}
			catch (DeviceNotFoundException)
			{
				logger.LogWarning("Uplink-Unknown DeviceID:{0}", deviceId);

				return req.CreateResponse(HttpStatusCode.NotFound);
			}
		}

		// Azure IoT Hub Device provisioning service if configured
		if (_azureSettings.DeviceProvisioningServiceSettings != null) 
		{
			string deviceKey;

			if ( string.IsNullOrEmpty(_azureSettings.DeviceProvisioningServiceSettings.IdScope) || string.IsNullOrEmpty(_azureSettings.DeviceProvisioningServiceSettings.GroupEnrollmentKey))
			{
				logger.LogError("Uplink-Device Provisioning Service requires ID Scope and Group Enrollment Key configured");

				return req.CreateResponse(HttpStatusCode.UnprocessableEntity);
			}

			using (var hmac = new HMACSHA256(Convert.FromBase64String(_azureSettings.DeviceProvisioningServiceSettings.GroupEnrollmentKey)))
			{
				deviceKey = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(deviceId)));
			}

			using (var securityProvider = new SecurityProviderSymmetricKey(deviceId, deviceKey, null))
			{
				using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
				{
					ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(
						Constants.AzureDpsGlobalDeviceEndpoint,
						_azureSettings.DeviceProvisioningServiceSettings.IdScope,
						securityProvider,
						transport);

					DeviceRegistrationResult result = await provClient.RegisterAsync();

					if (result.Status != ProvisioningRegistrationStatusType.Assigned)
					{
						_logger.LogError("Config-DeviceID:{0} Status:{1} RegisterAsync failed ", deviceId, result.Status);

						return req.CreateResponse(HttpStatusCode.FailedDependency);
					}

					IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());

					deviceClient = DeviceClient.Create(result.AssignedHub, authentication, transportSettings);

					await deviceClient.OpenAsync();
				}
			}
		}

		if (!_DeviceClients.TryAdd(deviceId, deviceClient))
		{
			logger.LogWarning("Uplink-TryAdd failed for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

			return req.CreateResponse(HttpStatusCode.Conflict);
		}

		Models.AzureIoTHubReceiveMessageHandlerContext context = new Models.AzureIoTHubReceiveMessageHandlerContext()
		{
			DeviceId = deviceId,
			ApplicationId = applicationId,
			WebhookId = _theThingsIndustriesSettings.WebhookId,
			WebhookBaseURL = _theThingsIndustriesSettings.WebhookBaseURL,
			ApiKey = _theThingsIndustriesSettings.ApiKey
		};

		await deviceClient.SetReceiveMessageHandlerAsync(AzureIoTHubClientReceiveMessageHandler, context);

		await deviceClient.SetMethodDefaultHandlerAsync(AzureIoTHubClientDefaultMethodHandler, context);
	}

	JObject telemetryEvent = new JObject
	{
		{ "ApplicationID", applicationId },
		{ "DeviceID", deviceId },
		{ "Port", port },
		{ "Simulated", payload.Simulated },
		{ "ReceivedAtUtc", payload.UplinkMessage.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture) },
		{ "PayloadRaw", payload.UplinkMessage.PayloadRaw }
	};

	// If the payload has been decoded by payload formatter, put it in the message body.
	if (payload.UplinkMessage.PayloadDecoded != null)
	{
		telemetryEvent.Add("PayloadDecoded", payload.UplinkMessage.PayloadDecoded);
	}

	// Send the message to Azure IoT Hub
	using (Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
	{
		// Ensure the displayed time is the acquired time rather than the uploaded time. 
		ioTHubmessage.Properties.Add("iothub-creation-time-utc", payload.UplinkMessage.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture));
		ioTHubmessage.Properties.Add("ApplicationId", applicationId);
		ioTHubmessage.Properties.Add("DeviceEUI", payload.EndDeviceIds.DeviceEui);
		ioTHubmessage.Properties.Add("DeviceId", deviceId);
		ioTHubmessage.Properties.Add("port", port.ToString());
		ioTHubmessage.Properties.Add("Simulated", payload.Simulated.ToString());

		await deviceClient.SendEventAsync(ioTHubmessage);

		logger.LogInformation("Uplink-DeviceID:{0} SendEventAsync success", payload.EndDeviceIds.DeviceId);
	}
}
catch (Exception ex)
{
	logger.LogError(ex, "Uplink-Message processing failed");

	return req.CreateResponse(HttpStatusCode.InternalServerError);
}

I used Telerik Fiddler and some sample payloads copied from my Azure Storage Queue sample to simulate many devices and the registrations were spread across my five Azure IoT Hubs.

DPS Device Registrations tab showing distribution of LoRaWAN Devices

I need to review the HTTP Error codes returned for different errors and ensure failures are handled robustly.

TTI V3 Connector Minimalist Cloud to Device(C2D)

In a previous version of my Things Network Industries(TTI) The Things Network(TTN) connector I queried the The Things Stack(TTS) Application Programing Interface(API) to get a list of Applications and their Devices. For a large number of Applications and/or Devices this process could take many 10’s of seconds. Application and Device creation and deletion then had to be tracked to keep the AzureDeviceClient connection list current, which added significant complexity.

In this version a downlink message can be sent to a device only after an uplink message. I’m looking at adding an Azure Function which initiates a connection to the configured Azure IoT Hub for the specified device to mitigate with this issue.

To send a TTN downlink message to a device the minimum required info is the LoRaWAN port number (specified in a Custom Property on the Azure IoT Hub cloud to device message), the device Id (from uplink message payload, which has been validated by a successful Azure IoT Hub connection) web hook id, web hook base URL, and an API Key (The Web Hook parameters are stored in the Connector configuration).

Azure IoT Explorer Clod to Device message with LoRaWAN Port number custom parameter

When a LoRaWAN device sends an Uplink message a session is established using Advanced Message Queuing Protocol(AMQP) so connections can be multiplexed)

I used Azure IoT Explorer to send Cloud to Device messages to the Azure IoT Hub (to initiate the sending of a downlink message to the Device by the Connector) after simulating a TTN uplink message with Telerik Fiddler and a modified TTN sample payload.

BEWARE – TTN URLs and Azure IoT Hub device identifiers are case sensitive

...
if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
{
   logger.LogInformation("Uplink-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

   deviceClient = DeviceClient.CreateFromConnectionString(_configuration.GetConnectionString("AzureIoTHub"), deviceId, 
                    new ITransportSettings[]
                    {
                        new AmqpTransportSettings(TransportType.Amqp_Tcp_Only)
                        {
                            AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings()
                            {
                                Pooling = true,
                            }
                        }
                    });

   try
   {
      await deviceClient.OpenAsync();
   }
   catch (DeviceNotFoundException)
   {
      logger.LogWarning("Uplink-Unknown DeviceID:{0}", deviceId);

      return req.CreateResponse(HttpStatusCode.NotFound);
   }

   if (!_DeviceClients.TryAdd(deviceId, deviceClient))
   {
      logger.LogWarning("Uplink-TryAdd failed for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

      return req.CreateResponse(HttpStatusCode.Conflict);
   }

   Models.AzureIoTHubReceiveMessageHandlerContext context = new Models.AzureIoTHubReceiveMessageHandlerContext()
   { 
      DeviceId = deviceId,
      ApplicationId = applicationId,
      WebhookId = _configuration.GetSection("TheThingsIndustries").GetSection("WebhookId").Value,
      WebhookBaseURL = _configuration.GetSection("TheThingsIndustries").GetSection("WebhookBaseURL").Value,
      ApiKey = _configuration.GetSection("TheThingsIndustries").GetSection("APiKey").Value,
   };      

   await deviceClient.SetReceiveMessageHandlerAsync(AzureIoTHubClientReceiveMessageHandler, context);

   await deviceClient.SetMethodDefaultHandlerAsync(AzureIoTHubClientDefaultMethodHandler, context);
 }

...

An Azure IoT Hub can invoke methods(synchronous) or send messages(asynchronous) to a device for processing. The Azure IoT Hub DeviceClient has two methods SetMethodDefaultHandlerAsync and SetReceiveMessageHandlerAsync which enable the processing of direct methods and messages.

private async Task<MethodResponse> AzureIoTHubClientDefaultMethodHandler(MethodRequest methodRequest, object userContext)
{
	if (methodRequest.DataAsJson != null)
	{
		_logger.LogWarning("AzureIoTHubClientDefaultMethodHandler name:{0} payload:{1}", methodRequest.Name, methodRequest.DataAsJson);
	}
	else
	{
		_logger.LogWarning("AzureIoTHubClientDefaultMethodHandler name:{0} payload:NULL", methodRequest.Name);
	}

	return new MethodResponse(404);
}

After some experimentation in previous TTN Connectors I found the synchronous nature of DirectMethods didn’t work well with LoRAWAN “irregular” connectivity so currently they are ignored.

public partial class Integration
{
	private async Task AzureIoTHubClientReceiveMessageHandler(Message message, object userContext)
	{
		try
		{
			Models.AzureIoTHubReceiveMessageHandlerContext receiveMessageHandlerContext = (Models.AzureIoTHubReceiveMessageHandlerContext)userContext;

			if (!_DeviceClients.TryGetValue(receiveMessageHandlerContext.DeviceId, out DeviceClient deviceClient))
			{
				_logger.LogWarning("Downlink-DeviceID:{0} unknown", receiveMessageHandlerContext.DeviceId);
				return;
			}

			using (message)
			{
				string payloadText = Encoding.UTF8.GetString(message.GetBytes()).Trim();

				if (!AzureDownlinkMessage.PortTryGet(message.Properties, out byte port))
				{
					_logger.LogWarning("Downlink-Port property is invalid");

					await deviceClient.RejectAsync(message);
					return;
				}

				// Split over multiple lines in an attempt to improve readability. In this scenario a valid JSON string should start/end with {/} for an object or [/] for an array
				if ((payloadText.StartsWith("{") && payloadText.EndsWith("}"))
														||
					((payloadText.StartsWith("[") && payloadText.EndsWith("]"))))
				{
					try
					{
						downlink.PayloadDecoded = JToken.Parse(payloadText);
					}
					catch (JsonReaderException)
					{
						downlink.PayloadRaw = payloadText;
					}
				}
				else
				{
					downlink.PayloadRaw = payloadText;
				}

				_logger.LogInformation("Downlink-IoT Hub DeviceID:{0} MessageID:{1} LockToken :{2} Port{3}",
					receiveMessageHandlerContext.DeviceId,
					message.MessageId,
		            message.LockToken,
					downlink.Port);

				Models.DownlinkPayload Payload = new Models.DownlinkPayload()
				{
					Downlinks = new List<Models.Downlink>()
					{
						downlink
					}
				};

				string url = $"{receiveMessageHandlerContext.WebhookBaseURL}/{receiveMessageHandlerContext.ApplicationId}/webhooks/{receiveMessageHandlerContext.WebhookId}/devices/{receiveMessageHandlerContext.DeviceId}/down/replace");

				using (var client = new WebClient())
				{
					client.Headers.Add("Authorization", $"Bearer {receiveMessageHandlerContext.ApiKey}");

					client.UploadString(new Uri(url), JsonConvert.SerializeObject(Payload));
				}

				_logger.LogInformation("Downlink-DeviceID:{0} LockToken:{1} success", receiveMessageHandlerContext.DeviceId, message.LockToken);
			}
		}
		catch (Exception ex)
		{
			_logger.LogError(ex, "Downlink-ReceiveMessge processing failed");
		}
	}
}

If the message body contains a valid JavaScript Object Notation(JSON) payload it is “spliced” into the DownLink message decoded_payload field otherwise the Base64 encoded frm_payload is populated.

The Things “Industries Live” data tab downlink message

The SetReceiveMessageHandlerAsync context information is used to construct a TTN downlink message payload and request URL(with default queuing, message priority and confirmation options)

Arduino Serial Monitor displaying Uplink and Downlink messages

TTI V3 Connector Minimalist Device to Cloud(D2C)

After pausing my Azure Storage Queued based approach I built a quick Proof of Concept(PoC) with an HTTPTrigger Azure Function. The application has a single endpoint for processing uplink messages which is called by a The Things Industries(TTI) Webhooks integration.

The Things Industries Application Webhook configuration
namespace devMobile.IoT.TheThingsIndustries.AzureIoTHub
{
	using System.Collections.Concurrent;
	using Microsoft.Azure.Devices.Client;
...

	public partial class Integration
	{
...
		private static readonly ConcurrentDictionary<string, DeviceClient> _DeviceClients = new ConcurrentDictionary<string, DeviceClient>();
...
	}
}

The connector uses a ConcurrentDictionary(indexed by TTI deviceID) to cache Azure IoT Hub DeviceClient instances.

public partial class Webhooks
{
	[Function("Uplink")]
	public async Task<HttpResponseData> Uplink([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req, FunctionContext executionContext)
	{
		var logger = executionContext.GetLogger("Uplink");

		// Wrap all the processing in a try\catch so if anything blows up we have logged it. Will need to specialise for connectivity failues etc.
		try
		{
			Models.PayloadUplink payload = JsonConvert.DeserializeObject<Models.PayloadUplink>(await req.ReadAsStringAsync());
			if (payload == null)
			{
				logger.LogInformation("Uplink: Payload {0} invalid", await req.ReadAsStringAsync());

				return req.CreateResponse(HttpStatusCode.BadRequest);
			}

			string applicationId = payload.EndDeviceIds.ApplicationIds.ApplicationId;
			string deviceId = payload.EndDeviceIds.DeviceId;

			if ((payload.UplinkMessage.Port == null ) || (!payload.UplinkMessage.Port.HasValue) || (payload.UplinkMessage.Port.Value == 0))
			{
				logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Payload Raw:{2} Control nessage", applicationId, deviceId, payload.UplinkMessage.PayloadRaw);

				return req.CreateResponse(HttpStatusCode.BadRequest);
			}

			int port = payload.UplinkMessage.Port.Value;

			logger.LogInformation("Uplink-ApplicationID:{0} DeviceID:{1} Port:{2} Payload Raw:{3}", applicationId, deviceId, port, payload.UplinkMessage.PayloadRaw);

			if (!_DeviceClients.TryGetValue(deviceId, out DeviceClient deviceClient))
			{
				logger.LogInformation("Uplink-Unknown device for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

				deviceClient = DeviceClient.CreateFromConnectionString(_configuration.GetConnectionString("AzureIoTHub"), deviceId);

				try
				{
					await deviceClient.OpenAsync();
				}
				catch (DeviceNotFoundException)
				{
					logger.LogWarning("Uplink-Unknown DeviceID:{0}", deviceId);

					return req.CreateResponse(HttpStatusCode.NotFound);
				}

				if (!_DeviceClients.TryAdd(deviceId, deviceClient))
				{
					logger.LogWarning("Uplink-TryAdd failed for ApplicationID:{0} DeviceID:{1}", applicationId, deviceId);

					return req.CreateResponse(HttpStatusCode.Conflict);
				}
			}

			JObject telemetryEvent = new JObject
			{
				{ "ApplicationID", applicationId },
				{ "DeviceID", deviceId },
				{ "Port", port },
				{ "PayloadRaw", payload.UplinkMessage.PayloadRaw }
			};

			// If the payload has been decoded by payload formatter, put it in the message body.
			if (payload.UplinkMessage.PayloadDecoded != null)
			{
				telemetryEvent.Add("PayloadDecoded", payload.UplinkMessage.PayloadDecoded);
			}

			// Send the message to Azure IoT Hub
			using (Message ioTHubmessage = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(telemetryEvent))))
			{
				// Ensure the displayed time is the acquired time rather than the uploaded time. 
				ioTHubmessage.Properties.Add("iothub-creation-time-utc", payload.UplinkMessage.ReceivedAtUtc.ToString("s", CultureInfo.InvariantCulture));
				ioTHubmessage.Properties.Add("ApplicationId", applicationId);
				ioTHubmessage.Properties.Add("DeviceEUI", payload.EndDeviceIds.DeviceEui);
				ioTHubmessage.Properties.Add("DeviceId", deviceId);
				ioTHubmessage.Properties.Add("port", port.ToString());

				await deviceClient.SendEventAsync(ioTHubmessage);
			}
		}
		catch (Exception ex)
		{
			logger.LogError(ex, "Uplink message processing failed");

			return req.CreateResponse(HttpStatusCode.InternalServerError);
		}

		return req.CreateResponse(HttpStatusCode.OK);
	}
}

For initial development and testing I ran the function application in the desktop emulator and simulated TTI webhook calls with Telerik Fiddler and modified TTI sample payloads.

Azure Functions Desktop development environment

I then deployed my function to Azure and configured the Azure IoT Hub connection string, Azure Application Insights key etc.

Azure Function configuration

I then used Azure IoT Explorer to configure devices, view uplink traffic etc. When I connected to my Azure IoT Hub shortly after starting the application all the devices were disconnected.

Azure IoT Explorer – no connected devices

The SeeeduinoLoRaWAN devices report roughly every 15 minutes so it took a while for them all to connect. (the SeeeduinoLoRaWAN4 & SeeeduinoLoRaWAN6 need to be repaired) .

Azure IoT Explorer – some connected devices

After a device had connected I could use Azure IoT Explorer to inspect the Seeeduino LoRaWAN device uplink message payloads.

Azure IoT Explorer displaying device telemetry

I also used Azure Application Insights to monitor the performance of the function and device activity.

Azure Application Insights displaying device telemetry

The Azure functions uplink message processor was then “soak tested” for a week without an issues.

Security Camera Azure IoT Hub Image upload

The final two projects of this series both upload images to the Azure Storage account associated with an Azure IoT Hub. One project uses a Timer to upload pictures with a configurable delay. The other uploads an image every time a General Purpose Input Output(GPIO) pin on the Raspberry PI3 is strobed.

Uniview IPC3635SB-ADZK-I0 Security camera test rig with Raspberry PI and PIR motion detector

I tried to keep the .Net Core 5 console applications as simple as possible, they download an image from the camera “snapshot” endpoint (In this case http://10.0.0.47:85/images/snapshot.jpg), save it to the local filesystem and then upload it.

The core of the two applications is the “upload” image method, which is called by a timer or GPIO pin EventHandler

private static async void ImageUpdateTimerCallback(object state)
{
	CommandLineOptions options = (CommandLineOptions)state;
	DateTime requestAtUtc = DateTime.UtcNow;

	// Just incase - stop code being called while retrival of the photo already in progress
	if (cameraBusy)
	{
		return;
	}
	cameraBusy = true;

	Console.WriteLine($"{requestAtUtc:yy-MM-dd HH:mm:ss} Image up load start");

	try
	{
		// First go and get the image file from the camera onto local file system
		using (var client = new WebClient())
		{
			NetworkCredential networkCredential = new NetworkCredential()
			{
				UserName = options.UserName,
				Password = options.Password
			};

			client.Credentials = networkCredential;

			await client.DownloadFileTaskAsync(new Uri(options.CameraUrl), options.LocalFilename);
		}

		// Then open the file ready to stream ito upto storage account associated with Azuure IoT Hub
		using (FileStream fileStreamSource = new FileStream(options.LocalFilename, FileMode.Open))
		{
			var fileUploadSasUriRequest = new FileUploadSasUriRequest
			{
				BlobName = string.Format("{0:yyMMdd}/{0:yyMMddHHmmss}.jpg", requestAtUtc)
			};

			// Get the plumbing sorted for where the file is going in Azure Storage
			FileUploadSasUriResponse sasUri = await azureIoTCentralClient.GetFileUploadSasUriAsync(fileUploadSasUriRequest);
			Uri uploadUri = sasUri.GetBlobUri();

			try
			{
				var blockBlobClient = new BlockBlobClient(uploadUri);

				var response = await blockBlobClient.UploadAsync(fileStreamSource, new BlobUploadOptions());

				var successfulFileUploadCompletionNotification = new FileUploadCompletionNotification()
				{
					// Mandatory. Must be the same value as the correlation id returned in the sas uri response
					CorrelationId = sasUri.CorrelationId,

					// Mandatory. Will be present when service client receives this file upload notification
					IsSuccess = true,

					// Optional, user defined status code. Will be present when service client receives this file upload notification
					StatusCode = 200,

					// Optional, user-defined status description. Will be present when service client receives this file upload notification
					StatusDescription = "Success"
				};

				await azureIoTCentralClient.CompleteFileUploadAsync(successfulFileUploadCompletionNotification);
			}
			catch (Exception ex)
			{
				Console.WriteLine($"Failed to upload file to Azure Storage using the Azure Storage SDK due to {ex}");

				var failedFileUploadCompletionNotification = new FileUploadCompletionNotification
				{
					// Mandatory. Must be the same value as the correlation id returned in the sas uri response
					CorrelationId = sasUri.CorrelationId,

					// Mandatory. Will be present when service client receives this file upload notification
					IsSuccess = false,

					// Optional, user-defined status code. Will be present when service client receives this file upload notification
					StatusCode = 500,

					// Optional, user defined status description. Will be present when service client receives this file upload notification
					StatusDescription = ex.Message
				};

				await azureIoTCentralClient.CompleteFileUploadAsync(failedFileUploadCompletionNotification);
			}
		}

		TimeSpan uploadDuration = DateTime.UtcNow - requestAtUtc;

		Console.WriteLine($"{requestAtUtc:yy-MM-dd HH:mm:ss} Image up load done. Duration:{uploadDuration.TotalMilliseconds:0.} mSec");
	}
	catch (Exception ex)
	{
		Console.WriteLine($"Camera image upload process failed {ex.Message}");
	}
	finally
	{
		cameraBusy = false;
	}
}

I have used Azure DeviceClient UploadToBlobAsync in other projects and it was a surprise to see it deprecated and replaced with GetFileUploadSasUriAsync and GetBlobUri with sample code from the development team.

string blobName = string.Format("{0:yyMMdd}/{0:yyMMddHHmmss}.jpg", requestAtUtc);

azureIoTCentralClient.UploadToBlobAsync(blobName, fileStreamSource);

It did seem to take a lot of code to implement what was previously a single line (I’m going try and find out why this method has been deprecated)

TImer application image uploader

Using Azure Storage Explorer I could view and download the images uploaded by the application(s) running on my development machine and Raspberry PI

Azure Storage Displaying most recent image uploaded by a RaspberryPI device

After confirming the program was working I used the excellent RaspberryDebugger to download the application and debug it on my Raspberry PI 3 running the Raspberry PI OS.

Now that the basics are working my plan is to figure out how to control the camera using Azure IoT Hub method calls, display live Real Time Streaming Protocol(RTSP) using Azure IoT Hub Device Streams, upload images to Azure Cognitive Services for processing and use ML.Net to process them locally.

Security Camera ONVIF Capabilities

The ONVIF specification standardises the network interface (the network layer) of network video products. It defines a communication framework based on relevant IETF and Web Services standards including security and IP configuration requirements.

After discovering a device the next step was to query it to determine its capabilities. I had some issues with .Net Core 5 application configuring the Windows Communication Foundation(WCF) to use Digest authentication (RFC2617) credentials on all bar the device management service client.

This .Net Core 5 console application queries the device management service (ONVID application programmers guide) to get the capabilities of the device then calls the media, imaging and pan tilt zoom services and displays the results.

I generated the client services using the Microsoft WCF Web Service Reference Provider.

Connected Services management dialog

The Uniform Resource Locators(URL) and namespace prefixes for each generated service are configured in the ConnectedService.json file.

First step configuring a WCF Service

Initially I used a devMobile.IoT.SecurityCameraClient prefix but after some experimentation changed to OnvifServices.

Second step configuring a WCF Service

For testing I selected “Generated Synchronous Operations” as they are easier to use in a console application while exploring the available functionality.

Third step configuring a WCF Service

The WSDL generated a number of warnings so I inspected the WSDL to see if the were easy to fix. I did consider copying the WSDL to my development box but it didn’t appear to be worth the effort.

SVCUtil warning messages about invalid Onvif WSDL

For this application I’m using the CommandLineParser NuGet package to parse and validate the client, username and password configured in the debugger tab.

Required Nuget packages
private static async Task ApplicationCore(CommandLineOptions options)
{
   Device deviceClient;
   ImagingPortClient imagingPortClient;
   MediaClient mediaClient;
   PTZClient panTiltZoomClient;

   var messageElement = new TextMessageEncodingBindingElement()
   {
      MessageVersion = MessageVersion.CreateVersion(EnvelopeVersion.Soap12, AddressingVersion.None),
      WriteEncoding = Encoding.UTF8
    };

    HttpTransportBindingElement httpTransportNoPassword = new HttpTransportBindingElement();
    CustomBinding bindingHttpNoPassword = new CustomBinding(messageElement, httpTransportNoPassword);
         
    HttpTransportBindingElement httpTransport = new HttpTransportBindingElement()
    {
       AuthenticationScheme = AuthenticationSchemes.Digest
    };
    CustomBinding bindingHttpPassword = new CustomBinding(messageElement, httpTransport);

    try
    {
       // Setup the imaging porting binding, use TLS, and ignore certificate errors
       deviceClient = new DeviceClient(bindingHttpNoPassword, new EndpointAddress($"http://{options.CameraUrl}/onvif/devicemgmt"));

       GetCapabilitiesResponse capabilitiesResponse = await deviceClient.GetCapabilitiesAsync(new GetCapabilitiesRequest(new CapabilityCategory[] { CapabilityCategory.All }));

       Console.WriteLine("Device capabilities");
       Console.WriteLine($"  Device: {capabilitiesResponse.Capabilities.Device.XAddr}");
       Console.WriteLine($"  Events: {capabilitiesResponse.Capabilities.Events.XAddr}"); // Not interested in events for V1
       Console.WriteLine($"  Imaging: {capabilitiesResponse.Capabilities.Imaging.XAddr}");
       Console.WriteLine($"  Media: {capabilitiesResponse.Capabilities.Media.XAddr}");
       Console.WriteLine($"  Pan Tilt Zoom: {capabilitiesResponse.Capabilities.PTZ.XAddr}");
       Console.WriteLine();
       ...
       Console.WriteLine($"Video Source Configuration");
       foreach (OnvifServices.Media.VideoSourceConfiguration videoSourceConfiguration in videoSourceConfigurations.Configurations)
      {
         Console.WriteLine($" Name: {videoSourceConfiguration.Name}");
         Console.WriteLine($" Token: {videoSourceConfiguration.token}");
         Console.WriteLine($" UseCount: {videoSourceConfiguration.UseCount}");
         Console.WriteLine($" Bounds: {videoSourceConfiguration.Bounds.x}:{videoSourceConfiguration.Bounds.y} {videoSourceConfiguration.Bounds.width}:{videoSourceConfiguration.Bounds.height}");
         Console.WriteLine($" View mode: {videoSourceConfiguration.ViewMode}");
      }
   }
   catch (Exception ex)
   {
      Console.WriteLine(ex.Message);
   }

   Console.WriteLine();
   Console.WriteLine("Press <enter> to exit");
   Console.ReadLine();
}

I had to do a bit of “null checking” as often if a feature wasn’t supported the root node was null. I need to get a selection of cameras (especially one with pan/tilt/zoom) to check that I’m processing the responses from the device correctly.

Console application output showing capabilities of Uniview device

After confirming the program was working on my development box I used the excellent RaspberryDebugger to download the application and run it on a Raspberry PI 3 running the Raspberry PI OS.

Security Camera ONVIF Discovery

The ONVIF specification standardises the network interface (the network layer) of network video products. It defines a communication framework based on relevant IETF and Web Services standards including security and IP configuration requirements. ONVIF uses Web Services Dynamic Discovery (WS-Discovery) to locate devices on the local network which operates over UDP port 3702 and uses IP multicast address 239.255.255.250.

The first issue was that WS-Discovery is not currently supported by the .Net Core Windows Communication Foundation(WCF) implementation CoreWCF(2021-08). So I built a proof of concept(PoC) client which used UDP to send and receive XML messages (WS-Discovery specification) to “probe” the local network.

My .Net Core 5 console application enumerates the host device’s network interfaces, then sends a “probe” message and waits for responses. The ONVID application programmers guide specifies the format of the “probe” request and response messages (One of the namespace prefixes in the sample is wrong). The client device can return its name and details of it’s capabilities in the response. Currently I only need the IP addresses of the cameras but if more information was required I would use the XML Serialisation functionality of .Net Core to generate the requests and unpack the responses.

class Program
{
	// From https://specs.xmlsoap.org/ws/2005/04/discovery/ws-discovery.pdf & http://www.onvif.org/wp-content/uploads/2016/12/ONVIF_WG-APG-Application_Programmers_Guide-1.pdf
	const string WSDiscoveryProbeMessages =
		"<?xml version = \"1.0\" encoding=\"UTF-8\"?>" +
		"<e:Envelope xmlns:e=\"http://www.w3.org/2003/05/soap-envelope\" " +
			"xmlns:w=\"http://schemas.xmlsoap.org/ws/2004/08/addressing\" " +
			"xmlns:d=\"http://schemas.xmlsoap.org/ws/2005/04/discovery\" " +
			"xmlns:dn=\"http://www.onvif.org/ver10/network/wsdl\"> " +
				"<e:Header>" +
					"<w:MessageID>uuid:{0}</w:MessageID>" +
					"<w:To e:mustUnderstand=\"true\">urn:schemas-xmlsoap-org:ws:2005:04:discovery</w:To> " +
					"<w:Action mustUnderstand=\"true\">http://schemas.xmlsoap.org/ws/2005/04/discovery/Probe</w:Action> " +
				"</e:Header> " +
				"<e:Body> " +
					"<d:Probe> " +
						"<d:Types>dn:NetworkVideoTransmitter</d:Types>" +
					"</d:Probe> " +
				"</e:Body> " +
		"</e:Envelope>";

	static async Task Main(string[] args)
	{
		List<UdpClient> udpClients = new List<UdpClient>();

		foreach (var networkInterface in NetworkInterface.GetAllNetworkInterfaces())
		{
			Console.WriteLine($"Name {networkInterface.Name}");
			foreach (var unicastAddress in networkInterface.GetIPProperties().UnicastAddresses)
			{
				if (unicastAddress.Address.AddressFamily == AddressFamily.InterNetwork)
				{
					var udpClient = new UdpClient(new IPEndPoint(unicastAddress.Address, 0)) { EnableBroadcast = true };

					udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, 5000);

					udpClients.Add(udpClient);
				}
			}
		}

	var multicastEndpoint = new IPEndPoint(IPAddress.Parse("239.255.255.250"), 3702);

		foreach (UdpClient udpClient in udpClients)
		{
			byte[] message = UTF8Encoding.UTF8.GetBytes(string.Format(WSDiscoveryProbeMessages, Guid.NewGuid().ToString()));

			try
			{
				await udpClient.SendAsync(message, message.Length, multicastEndpoint);

				IPEndPoint remoteEndPoint = null;

				while(true)
				{				
					message = udpClient.Receive(ref remoteEndPoint);

					Console.WriteLine($"IPAddress {remoteEndPoint.Address}");
					Console.WriteLine(UTF8Encoding.UTF8.GetString(message));

					Console.WriteLine();
				}
			}
			catch (SocketException sex)
			{
				Console.WriteLine($"Probe failed {sex.Message}");
			}
		}

		Console.WriteLine("Press enter to <exit>");
		Console.ReadKey();
	}
}

After confirming the program was working I used the excellent RaspberryDebugger to download the application and debug it on a Raspberry PI 3 running the Raspberry PI OS.

Security Camera HTTP Image download

As part of a contract a customer sent me a Uniview IPC3635SB-ADZK-I0 Security camera for a proof of concept(PoC) project. Before the PoC I wanted to explore the camera functionality in more depth, especially how to retrieve individual images from the camera, remotely control it’s zoom, focus, pan, tilt etc.. I’m trying to source a couple of other vendors’ security cameras with remotely controllable pan and tilt for testing.

Uniview IPC3635SB-ADZK-I0 Security camera

It appears that many cameras support retrieving the latest image with an HyperText Transfer Protocol (HTTP) GET so that looked like a good place to start. For the next couple of posts the camera will be sitting on the bookcase in my office looking through the window at the backyard.

Unv camera software live view of my backyard

One thing I did notice (then confirmed with Telerik Fiddler and in the camera configuration) was that the camera was configured to use Digest authentication(RFC 2069) which broke my initial attempt with a Universal Windows Platform(UWP) application.

Telerik Fiddler showing 401 authorisation challenge

My .Net Core 5 console application is as simple possible, it just downloads an image from the camera “snapshot” endpoint (In this case http://10.0.0.47:85/images/snapshot.jpg) and saves it to the local filesystem.

class Program
{
	static async Task Main(string[] args)
	{
		await Parser.Default.ParseArguments<CommandLineOptions>(args)
			.WithNotParsed(HandleParseError)
			.WithParsedAsync(ApplicationCore);
	}

	private static async Task ApplicationCore(CommandLineOptions options)
	{
		Console.WriteLine($"Camera:{options.CameraUrl} UserName:{options.UserName} filename:{options.Filename}");

		using (var client = new WebClient())
		{
			NetworkCredential networkCredential = new NetworkCredential()
			{
				UserName = options.UserName,
				Password = options.Password
			};

			client.Credentials = networkCredential;

			try
			{
				await client.DownloadFileTaskAsync(new Uri(options.CameraUrl), options.Filename);
			}
			catch (Exception ex)
			{
				Console.WriteLine($"File download failed {ex.Message}");
			}
		}

		Console.WriteLine("Press <enter> to exit");
		Console.ReadLine();
	}

	private static void HandleParseError(IEnumerable<Error> errors)
	{
		if (errors.IsVersion())
		{
			Console.WriteLine("Version Request");
			return;
		}

		if (errors.IsHelp())
		{
			Console.WriteLine("Help Request");
			return;
		}
		Console.WriteLine("Parser Fail");
	}
}

After confirming the program was working I used the excellent RaspberryDebugger to download the application and debug it on a Raspberry PI 3 running the Raspberry PI OS.

Visual Studio 2019 Debug Output showing application download process

Once the application had finished running on the device I wanted to check that the file was on the local filesystem. I used Putty to connect to the Raspberry PI then searched for LatestImage.jpg.

Linux find utility displaying the location of the downloaded file

I though about using a utility like scp to download the image file but decided (because I have been using Microsoft Window since WIndows 286) to install xrdp an open-source Remote Desktop Protocol(RDP) server so I could use a Windows 10 RDP client.

xrdp login screen
xrdp home screen
xrdp file manager display files in application deployment directory
Raspberry PI OS default image view

Now that the basics are working my plan is to figure out how to control the camera, display live video with the Real Time Streaming Protocol(RTSP) upload images to Azure Cognitive Services for processing and use ML.Net to process them locally.

This post was about selecting the tooling I’m comfortable with and configuring my development environment so they work well together. The next step will be using Open Network Video Interface Forum (ONVIF) to discover, determine the capabilities of and then control the camera (for this device just zoom and focus).