This is a long post which covers some oddness I noticed when changing the protocol used by an Azure IoT Hub client from Message Queuing Telemetry Transport(MQTT) to Advanced Message Queuing Protocol (AMQP). I want to build a console application to test the pooling of AMQP connections so I started with an MQTT client written for another post.
class Program
{
private static string payload;
static async Task Main(string[] args)
{
string filename;
string azureIoTHubconnectionString;
DeviceClient azureIoTHubClient;
if (args.Length != 2)
{
Console.WriteLine("[JOSN file] [AzureIoTHubConnectionString]");
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
return;
}
filename = args[0];
azureIoTHubconnectionString = args[1];
try
{
payload = File.ReadAllText(filename);
// Open up the connection
azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt_Tcp_Only);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Mqtt_WebSocket_Only);
await azureIoTHubClient.OpenAsync();
await azureIoTHubClient.SetMethodDefaultHandlerAsync(MethodCallbackDefault, null);
Timer MessageSender = new Timer(TimerCallback, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
}
}
public static async void TimerCallback(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
await azureIoTHubClient.SendEventAsync(message);
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
private static async Task<MethodResponse> MethodCallbackDefault(MethodRequest methodRequest, object userContext)
{
Console.WriteLine($"Default handler method {methodRequest.Name} was called.");
return new MethodResponse(200);
}
}
I configured an Azure IoT hub then used Azure IoT explorer to create a device and get the connections string for my application. After fixing up the application’s command line parameters I could see the timer code was successfully sending telemetry messages to my Azure IoT Hub. I also explored the different MQTT connections options TransportType.Mqtt, TransportType.Mqtt_Tcp_Only, and TransportType.Mqtt_WebSocket_Only which worked as expected.
![](https://blog.devmobile.co.nz/wp-content/uploads/2020/10/mqttconsoletelemetry.jpg?w=978)
![](https://blog.devmobile.co.nz/wp-content/uploads/2020/10/mqttazureiotexplorertelemetry.jpg?w=1024)
I could also initiate Direct Method calls to my console application from Azure IoT explorer.
![](https://blog.devmobile.co.nz/wp-content/uploads/2020/10/mqttazureiotexplorerdirectmethod-1.jpg?w=1024)
![](https://blog.devmobile.co.nz/wp-content/uploads/2020/10/mqttconsoledirectmethod-1.jpg?w=980)
I then changed the protocol to AMQP
class Program
{
private static string payload;
static async Task Main(string[] args)
{
string filename;
string azureIoTHubconnectionString;
DeviceClient azureIoTHubClient;
Timer MessageSender;
if (args.Length != 2)
{
Console.WriteLine("[JOSN file] [AzureIoTHubConnectionString]");
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
return;
}
filename = args[0];
azureIoTHubconnectionString = args[1];
try
{
payload = File.ReadAllText(filename);
// Open up the connection
azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp_Tcp_Only);
//azureIoTHubClient = DeviceClient.CreateFromConnectionString(azureIoTHubconnectionString, TransportType.Amqp_WebSocket_Only);
await azureIoTHubClient.OpenAsync();
await azureIoTHubClient.SetMethodDefaultHandlerAsync(MethodCallbackDefault, null);
//MessageSender = new Timer(TimerCallbackAsync, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));
MessageSender = new Timer(TimerCallbackSync, azureIoTHubClient, new TimeSpan(0, 0, 10), new TimeSpan(0, 0, 10));
#if MESSAGE_PUMP
Console.WriteLine("Press any key to exit");
while (!Console.KeyAvailable)
{
await Task.Delay(100);
}
#else
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
#endif
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
}
}
public static async void TimerCallbackAsync(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
await azureIoTHubClient.SendEventAsync(message);
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
public static void TimerCallbackSync(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
azureIoTHubClient.SendEventAsync(message).GetAwaiter();
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
private static async Task<MethodResponse> MethodCallbackDefault(MethodRequest methodRequest, object userContext)
{
Console.WriteLine($"Default handler method {methodRequest.Name} was called.");
return new MethodResponse(200);
}
}
In the first version of my console application I could see the SendEventAsync method was getting called but was not returning
![](https://blog.devmobile.co.nz/wp-content/uploads/2020/10/amqpconsolenopump-1.jpg?w=978)
Even though the SendEventAsync call was not returning the telemetry messages were making it to my Azure IoT Hub.
![](https://blog.devmobile.co.nz/wp-content/uploads/2020/10/amqpazureiotexplorertelemetrynopump-1.jpg?w=1024)
When I tried to initiate a Direct Method call from Azure IoT Explorer it failed after a while with a timeout.
![](https://blog.devmobile.co.nz/wp-content/uploads/2020/10/amqpazureiotexplorerdirectmethodnopump.jpg?w=1024)
The first successful approach I tried was to change the Console.Readline to a “message pump” (flashbacks to Win32 API programming).
Console.WriteLine("Press any key to exit");
while (!Console.KeyAvailable)
{
await Task.Delay(100);
}
After some more experimentation I found that changing the timer method from asynchronous to synchronous also worked.
public static void TimerCallbackSync(object state)
{
DeviceClient azureIoTHubClient = (DeviceClient)state;
try
{
// I know having the payload as a global is a bit nasty but this is a demo..
using (Message message = new Message(Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(payload))))
{
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync start", DateTime.UtcNow);
azureIoTHubClient.SendEventAsync(message).GetAwaiter();
Console.WriteLine(" {0:HH:mm:ss} AzureIoTHubDeviceClient SendEventAsync finish", DateTime.UtcNow);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
I also had to change the method declaration and modify the SendEventAsync call to use a GetAwaiter.
![](https://blog.devmobile.co.nz/wp-content/uploads/2020/10/amqpconsoletelemetrymessagepump.jpg?w=978)
![](https://blog.devmobile.co.nz/wp-content/uploads/2020/10/amqpazureiotexplorertelemetrymessagepump.jpg?w=1024)
![](https://blog.devmobile.co.nz/wp-content/uploads/2020/10/amqpazureiotexplorerdirectmethodmessagepump.jpg?w=1024)
![](https://blog.devmobile.co.nz/wp-content/uploads/2020/10/amqpconsolemessagepumpdirectmethod.jpg?w=980)
It took a while to figure out enough about what was going on so I could do a search with the right keywords (DeviceClient AMQP async await SendEventAsync) to confirm my suspicion that MQTT and AMQP clients did behave differently.
For anyone who reads this post, I think this Github issue about task handling and blocking calls is most probably the answer (October 2020).