Azure Event Grid esp-mqtt-arduino Client – Success

I still couldn’t figure out why my code was failing, so I turned logging up to 11 and noticed a couple of messages which didn’t make sense. The device was connecting then disconnecting, which indicated another problem. The Message Queuing Telemetry Transport (MQTT) specification includes a “feature” called Last Will and Testament (LWT), which a client can configure so that the MQTT broker publishes a message to a topic if the device disconnects unexpectedly.

I was looking at the code and noticed that LWT was being used and that the topic didn’t exist in my Azure Event Grid MQTT Broker namespace. When the LWT configuration was commented out the application worked.

void Mqtt5ClientESP32::begin(const char* uri, const char* client_id, const char* user, const char* pass, bool use_v5) {
  connected_ = false;
  insecure_ = false;
  cfg_.broker.address.uri = uri;
  if (client_id) cfg_.credentials.client_id = client_id;
  if (user)      cfg_.credentials.username  = user;
  if (pass)      cfg_.credentials.authentication.password = pass;

  cfg_.broker.verification.use_global_ca_store = false;
  cfg_.broker.verification.certificate = nullptr;
  cfg_.broker.verification.certificate_len = 0;
  cfg_.broker.verification.skip_cert_common_name_check = false;
  
/*
  cfg_.session.last_will.topic  = "devices/esp32/lwt";
  cfg_.session.last_will.msg    = "offline";
  cfg_.session.last_will.qos    = 1;
  cfg_.session.last_will.retain = true;
*/

cfg_.session.protocol_ver = 
#if CONFIG_MQTT_PROTOCOL_5
      use_v5 ? MQTT_PROTOCOL_V_5 : MQTT_PROTOCOL_V_3_1_1;
#else
      MQTT_PROTOCOL_V_3_1_1;
  (void)use_v5;  // MQTT v5 support disabled at build time
#endif
}

A SetLWT method was added (declaration and definition below) so that the LWT could be configured if required.

void SetLWT(const char *topic, const char *msg, int msg_len,int qos, int retain);
void Mqtt5ClientESP32::SetLWT(const char *topic, const char *msg, int msg_len,int qos, int retain){
   cfg_.session.last_will.topic  = topic;
   cfg_.session.last_will.msg    = msg;
   cfg_.session.last_will.msg_len= msg_len;
   cfg_.session.last_will.qos    = qos;
   cfg_.session.last_will.retain = retain;
}
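
One thing to watch with a setter like this is that it stores the pointers it is given, so the topic and message have to stay valid for as long as the configuration is in use. Below is a minimal sketch of a wrapper that keeps its own copies and rejects obviously invalid settings. LastWillConfig and LastWillHolder are hypothetical stand-ins for illustration, not part of esp-mqtt-arduino.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the last_will fields shown above.
struct LastWillConfig {
  const char* topic = nullptr;
  const char* msg = nullptr;
  int msg_len = 0;
  int qos = 0;
  int retain = 0;
};

// Keeps private copies of the topic and message so the pointers placed in
// the configuration stay valid for as long as this object is alive.
class LastWillHolder {
 public:
  // Returns false (leaving cfg untouched) for an empty topic or a QoS outside 0..2.
  bool apply(LastWillConfig& cfg, const std::string& topic,
             const std::string& msg, int qos, bool retain) {
    if (topic.empty() || qos < 0 || qos > 2) return false;
    topic_ = topic;
    msg_ = msg;
    cfg.topic = topic_.c_str();
    cfg.msg = msg_.c_str();
    cfg.msg_len = static_cast<int>(msg_.size());
    cfg.qos = qos;
    cfg.retain = retain ? 1 : 0;
    return true;
  }

 private:
  std::string topic_;
  std::string msg_;
};
```

The holder would need to live at least as long as the MQTT client, for example as a member alongside the configuration.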

Paying close attention to the logging I noticed “Subscribing to ssl/mqtt5” followed by “Subscribe request sent”.

I checked the sample application and found that if the connect was successful the application would then try and subscribe to a topic that didn’t exist.

mqtt.onConnected([]{
  Serial.println("[MQTT] Connected event");

  mqttReady = true;
/*
  Serial.println("[MQTT] Subscribing to ssl/mqtt5");
  if (mqtt.subscribe("ssl/mqtt5", 1, true)) {
    Serial.println("[MQTT] Subscribe request sent");
  } else {
    Serial.println("[MQTT] Subscribe request failed");
  }
*/
});

I commented out that code and the application started without any error messages.
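
Both problems came down to topics the broker didn’t know about, so a quick way of checking a topic against an MQTT topic filter can be handy when working out what will be accepted. This is a rough sketch of standard MQTT wildcard matching (+ for a single level, # for all remaining levels). topicMatchesFilter is a hypothetical helper, it ignores the special handling of topics starting with $, and I haven’t verified how closely Azure Event Grid topic templates follow these rules.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Splits a topic or filter into its '/'-separated levels.
static std::vector<std::string> splitLevels(const std::string& s) {
  std::vector<std::string> levels;
  std::string current;
  for (char c : s) {
    if (c == '/') {
      levels.push_back(current);
      current.clear();
    } else {
      current.push_back(c);
    }
  }
  levels.push_back(current);
  return levels;
}

// Returns true if topic matches filter using MQTT '+' and '#' wildcards.
bool topicMatchesFilter(const std::string& filter, const std::string& topic) {
  const std::vector<std::string> f = splitLevels(filter);
  const std::vector<std::string> t = splitLevels(topic);
  std::size_t i = 0;
  for (; i < f.size(); ++i) {
    if (f[i] == "#") return i == f.size() - 1;  // '#' is only valid as the last level
    if (i >= t.size()) return false;            // topic has fewer levels than the filter
    if (f[i] != "+" && f[i] != t[i]) return false;
  }
  return i == t.size();  // no unmatched topic levels left over
}
```

With a filter of devices/+/telemetry the topic devices/esp32/telemetry matches, while ssl/mqtt5 does not.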

Just to make sure, I checked that the message count in the Azure Storage Queue was increasing and that the payload client ID matched my device.

Yet again a couple of hours lost from my life which I can never get back.

Azure Event Grid esp-mqtt-arduino Client – Finding fail

I still couldn’t figure out why my code was failing, so I built a test harness which connected to the Wi-Fi, set the time using the Network Time Protocol (NTP), established a Transport Layer Security (TLS) connection with the Azure Event Grid MQTT Broker, then finally authenticated using client certificate authentication. Basically, it was “The joy of certs” without the Arduino PubSubClient library and with authentication.

/*
  Azure Event Grid MQTT Endpoint Probe with mTLS
  - Wi-Fi connect
  - SNTP time sync
  - DNS resolve
  - TCP reachability (port 8883)
  - TLS (server-only) handshake using CRT bundle (or custom CA)
  - TLS (mTLS) handshake with client certificate & private key

  Notes:
    - Client certificate must be PEM and match private key.
    - Private key must be PEM and UNENCRYPTED (no passphrase).
    - SNI uses HOSTNAME automatically; do NOT use raw IP.
*/
#include <Arduino.h>
#include <WiFi.h>
#include <WiFiClient.h>
#include <WiFiClientSecure.h>

#include <../constants.h>
#include <../secrets.h>

extern "C" {
  #include <lwip/netdb.h>
  #include <lwip/sockets.h>
  #include <lwip/inet.h>
  #include <lwip/errno.h>
  #include <time.h>
}
static const char* HOSTNAME  = "ThisIsNotTheMQTTBrokerYouAreLookingFor.newzealandnorth-1.ts.eventgrid.azure.net";
static const uint16_t PORT   = 8883;

// Time servers (for TLS validity window)
static const char* NTP_1 = "pool.ntp.org";
static const char* NTP_2 = "time.cloudflare.com";

static const char* errnoName(int e) {
  switch (e) {
    case 5:   return "EIO";
    case 101: return "ENETUNREACH";
    case 104: return "ECONNRESET";
    case 110: return "ETIMEDOUT";
    case 111: return "ECONNREFUSED";
    case 113: return "EHOSTUNREACH";
    default:  return "?";
  }
}


bool waitForWifi(uint32_t timeout_ms = 20000) {
  uint32_t start = millis();
  Serial.printf("[WiFi] Connecting to '%s'...\n", WIFI_SSID);
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  while (WiFi.status() != WL_CONNECTED && (millis() - start) < timeout_ms) {
    delay(250);
    Serial.print(".");
  }
  Serial.println();
  return WiFi.status() == WL_CONNECTED;
}


void syncTime() {
  configTime(0, 0, NTP_1, NTP_2);
  Serial.println("[NTP] Syncing time...");
  for (int i = 0; i < 20; ++i) {
    time_t now = time(nullptr);
    if (now > 1609459200) { // > Jan 1, 2021
      Serial.printf("[NTP] OK (unix=%ld)\n", (long)now);
      return;
    }
    delay(500);
  }
  Serial.println("[NTP] Time sync may have failed; continuing.");
}

bool probeDNS(const char* host, char outIp[16]) {
  struct addrinfo hints = {};
  hints.ai_family = AF_INET; // IPv4
  struct addrinfo* res = nullptr;

  Serial.printf("[DNS] Resolving %s...\n", host);
  int rc = getaddrinfo(host, NULL, &hints, &res);
  Serial.printf("[DNS] getaddrinfo rc=%d\n", rc);
  if (rc != 0 || !res) {
    Serial.println("[DNS] FAILED");
    return false;
  }
  struct sockaddr_in* sin = (struct sockaddr_in*)res->ai_addr;
  inet_ntop(AF_INET, &sin->sin_addr, outIp, 16);
  Serial.printf("[DNS] %s -> %s\n", host, outIp);
  freeaddrinfo(res);
  return true;
}


bool probeTCP(const char* host, uint16_t port, uint32_t timeout_ms = 5000) {
  WiFiClient cli;
  cli.setTimeout(timeout_ms);
  Serial.printf("[TCP] Connecting to %s:%u ...\n", host, port);
  if (!cli.connect(host, port)) {
    Serial.printf("[TCP] connect() FAILED\n");
    return false;
  }
  Serial.println("[TCP] Connected (no TLS). Closing (probe only).");
  cli.stop();
  return true;
}


bool probeTLS(const char* host, uint16_t port, uint32_t timeout_ms = 7000) {
  WiFiClientSecure tls;
  tls.setTimeout(timeout_ms);

  tls.setCACert(CA_ROOT_PEM);  

  Serial.printf("[TLS] Handshake to %s:%u ...\n", host, port);
  if (!tls.connect(host, port)) {
    int e = errno;
    Serial.printf("[TLS] connect() FAILED errno=%d (%s)\n", e, errnoName(e));
    return false;
  }
  Serial.println("[TLS] Handshake OK (server-only TLS)");
  tls.stop();
  return true;
}

bool probeMTLS(const char* host, uint16_t port, uint32_t timeout_ms = 8000) {
  WiFiClientSecure tls;
  tls.setTimeout(timeout_ms);

  tls.setCACert(CA_ROOT_PEM);
  tls.setCertificate(CLIENT_CERT_PEM);
  tls.setPrivateKey(CLIENT_KEY_PEM);

  Serial.printf("[mTLS] Handshake to %s:%u with client cert ...\n", host, port);
  if (!tls.connect(host, port)) {
    int e = errno;
    Serial.printf("[mTLS] connect() FAILED errno=%d (%s)\n", e, errnoName(e));
    Serial.println("[mTLS] If errno=ETIMEDOUT/ECONNRESET, server may be closing due to cert policy mismatch.");
    return false;
  }
  Serial.println("[mTLS] Handshake OK (client authenticated)");
  tls.stop();
  return true;
}

void setup() {
  Serial.begin(9600);
  delay(5000);
  Serial.println();
  Serial.println("==== Azure Event Grid MQTT Probe (mTLS) ====");

  WiFi.mode(WIFI_STA);

  if (!waitForWifi()) {
    Serial.println("[WiFi] FAILED to connect within timeout");
  } else {
    Serial.printf("[WiFi] Connected. IP=%s  RSSI=%d dBm\n",
                  WiFi.localIP().toString().c_str(), WiFi.RSSI());
  }

  // TLS sanity: time
  syncTime();

  // DNS
  char ip[16] = {0};
  bool dnsOk = probeDNS(HOSTNAME, ip);

  // TCP reachability
  bool tcpOk = probeTCP(HOSTNAME, PORT);

  // TLS (server-only)
  bool tlsOk = probeTLS(HOSTNAME, PORT);

  // TLS (mTLS with client cert/key)
  bool mtlsOk = probeMTLS(HOSTNAME, PORT);

  Serial.println("==== Summary ====");
  Serial.printf("DNS:  %s\n", dnsOk  ? "OK" : "FAILED");
  Serial.printf("TCP:  %s\n", tcpOk  ? "OK" : "FAILED");
  Serial.printf("TLS:  %s\n", tlsOk  ? "OK" : "FAILED");
  Serial.printf("mTLS: %s\n", mtlsOk ? "OK" : "FAILED");
  Serial.println("=================");

  Serial.println("If mTLS=FAILED, check: correct cert/key pair, chain/trust CA, and namespace mTLS policy.");
}

void loop() {
  delay(1000);
}

The test harness worked which meant the issue was with my “re-factoring” of the BasicMqtt5_cert example.
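
The value of a layered harness like this is that the first failing layer tells you where to look. A minimal sketch of that idea, outside the Arduino environment, is below. ProbeStage and firstFailingStage are hypothetical names, and unlike the harness above this version stops at the first failure rather than running every probe.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// One layer of the connectivity check, e.g. DNS, TCP, TLS or mTLS.
struct ProbeStage {
  std::string name;
  std::function<bool()> run;  // returns true when the layer is healthy
};

// Runs the stages in order and returns the name of the first one that fails,
// or an empty string if every stage succeeds. Later stages are skipped because
// they depend on the earlier ones.
std::string firstFailingStage(const std::vector<ProbeStage>& stages) {
  for (const ProbeStage& stage : stages) {
    if (!stage.run()) return stage.name;
  }
  return "";
}
```

On a device each run function would wrap one of the probe functions, for example a lambda calling probeTCP(HOSTNAME, PORT).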

Azure Event Grid esp-mqtt-arduino Client – Hours of fail

I wanted to get other Arduino-based clients (e.g. my SeeedStudio XiaoESP32S3) working with the Azure Event Grid MQTT Broker (with MQTT 5 support), so I installed the esp-mqtt-arduino library.

The library doesn’t support client authentication with certificates, so I added two methods, setClientCert and setClientKey, to the esp-mqtt-arduino.h and esp-mqtt-arduino.cpp files.

class Mqtt5ClientESP32 {
   public:
   Mqtt5ClientESP32();
   ~Mqtt5ClientESP32();
//...
  void useCrtBundle(bool enable = true);
  void setCACert(const char* cert, size_t len = 0);
  void setClientCert(const char* cert, size_t len = 0);
  void setClientKey(const char* key, size_t len = 0);  
  void setInsecure(bool enable = true);
  void setKeepAlive(uint16_t seconds);
private:
  //...
};

void Mqtt5ClientESP32::setClientCert(const char* cert, size_t len)
{
  insecure_ = false;
  cfg_.credentials.authentication.certificate = cert;
  if (cert) {
    cfg_.credentials.authentication.certificate_len = len ? len : strlen(cert) + 1;
  } else {
    cfg_.credentials.authentication.certificate_len = 0;
  }  
  cfg_.broker.verification.skip_cert_common_name_check = false;  
}

void Mqtt5ClientESP32::setClientKey(const char* key, size_t len)
{
  insecure_ = false;
  cfg_.credentials.authentication.key = key;
  if (key) {
    cfg_.credentials.authentication.key_len = len ? len : strlen(key) + 1;
  } else {
    cfg_.credentials.authentication.key_len = 0;
  } 
  cfg_.broker.verification.skip_cert_common_name_check = false;  
}
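
The strlen(...) + 1 in both setters is there because, as far as I can tell, the underlying TLS stack expects the length of a PEM buffer to include the terminating null. The length logic can be pulled out into one small helper, sketched below. pemBufferLength is a hypothetical name and not part of the library.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Returns the buffer length to pass for a PEM string.
// - A null pointer yields 0.
// - An explicit non-zero length is passed through unchanged (e.g. for DER data).
// - Otherwise the length of the null-terminated PEM text plus its terminator.
std::size_t pemBufferLength(const char* pem, std::size_t explicitLen) {
  if (pem == nullptr) return 0;
  return explicitLen != 0 ? explicitLen : std::strlen(pem) + 1;
}
```

Both setters could then use the same call, which avoids the two copies of the if/else drifting apart.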

I started with the basic_mqtt5_cert example, stripping it back to the bare minimum and hacking out all the certificate bundle support etc.

#include <WiFi.h>
#include <esp-mqtt-arduino.h>
#include <esp_log.h>
#include "sdkconfig.h"
#include "../secrets.h"
#include "../constants.h"

Mqtt5ClientESP32 mqtt;

volatile bool mqttReady = false;
volatile bool mqttSubscribed = false;
void setup() {
  Serial.begin(9600);
  delay(5000);
  Serial.setDebugOutput(true);
  Serial.println("[BOOT] Starting MQTT5 demo");

  esp_log_level_set("*", ESP_LOG_INFO);
  esp_log_level_set("MQTT_CLIENT", ESP_LOG_VERBOSE);

  WiFi.onEvent([](WiFiEvent_t event, WiFiEventInfo_t info){
    (void)info;
    Serial.printf("[WiFi event] id=%d\n", event);
  });

  Serial.printf("[WiFi] Connecting to %s\n", WIFI_SSID);
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);

  uint8_t attempts = 0;
  while (WiFi.status() != WL_CONNECTED) {
    Serial.printf("[WiFi] status=%d attempt=%u\n", WiFi.status(), attempts++);
    delay(500);
  }
  Serial.print("[WiFi] Connected, IP: ");
  Serial.println(WiFi.localIP());

  // Sync time for TLS
  Serial.println("[NTP] synchronising");
  configTime(0, 0, "pool.ntp.org", "time.nist.gov");
  Serial.print("*");
  while (time(nullptr) < 100000) {
    delay(500);
    Serial.print("*");
  }
  Serial.println("[NTP] synchronised");

  Serial.printf("[MQTT] Init broker %s as %s\n", MQTT_SERVER_URL,MQTT_CLIENTID);
  mqtt.begin(MQTT_SERVER_URL, MQTT_CLIENTID);
  mqtt.setKeepAlive(45);

  mqtt.setCACert(CA_ROOT_PEM); 
  mqtt.setClientCert(CLIENT_CERT_PEM);
  mqtt.setClientKey(CLIENT_KEY_PEM);
  mqtt.setInsecure(false);

  mqtt.onMessage([](const char* topic, size_t topic_len, const uint8_t* data, size_t len){
    Serial.printf("[MSG] %.*s => %.*s\n", (int)topic_len, topic, (int)len, (const char*)data);
  });
  mqtt.onConnected([]{
    Serial.println("[MQTT] Connected event");
    mqttReady = true;
    Serial.println("[MQTT] Subscribing to ssl/mqtt5");
    if (mqtt.subscribe("ssl/mqtt5", 1, true)) {
      Serial.println("[MQTT] Subscribe request sent");
    } else {
      Serial.println("[MQTT] Subscribe request failed");
    }
  });

  mqtt.onDisconnected([]{
    Serial.println("[MQTT] Disconnected event");
    mqttReady = false;
  });

  Serial.println("[MQTT] Connecting...");
  if (!mqtt.connect()) {
    Serial.println("[MQTT] Connect start failed");
  }
}

void loop() {
  static unsigned long lastPublishMs = 0;
  const unsigned long now = millis();

  if (mqttReady && (now - lastPublishMs) >= 60000) {
    const char* msg = "Hello from Arduino MQTT5 ESP32!";
    Serial.println("[MQTT] Publishing demo message");
    if (mqtt.publish(MQTT_TOPIC_PUBLISH, (const uint8_t*)msg, strlen(msg))) {
      Serial.println("[MQTT] Publish queued (next in ~60s)");
    } else {
      Serial.println("[MQTT] Publish failed");
    }
    lastPublishMs = now;
  }

  delay(10);
}

It was important to put the setClientCert and setClientKey calls after the mqtt.begin call because it resets the configuration.

void Mqtt5ClientESP32::begin(const char* uri, const char* client_id,
                             const char* user, const char* pass, bool use_v5) {
  connected_ = false;
  insecure_ = false;
  cfg_.broker.address.uri = uri;
  if (client_id) cfg_.credentials.client_id = client_id;
  if (user)      cfg_.credentials.username  = user;
  if (pass)      cfg_.credentials.authentication.password = pass;

  cfg_.broker.verification.use_global_ca_store = false;
  cfg_.broker.verification.certificate = nullptr;
  cfg_.broker.verification.certificate_len = 0;
  cfg_.broker.verification.skip_cert_common_name_check = false;
  
  cfg_.session.last_will.topic  = "devices/esp32/lwt";
  cfg_.session.last_will.msg    = "offline";
  cfg_.session.last_will.qos    = 1;
  cfg_.session.last_will.retain = true;

cfg_.session.protocol_ver = 
#if CONFIG_MQTT_PROTOCOL_5
      use_v5 ? MQTT_PROTOCOL_V_5 : MQTT_PROTOCOL_V_3_1_1;
#else
      MQTT_PROTOCOL_V_3_1_1;
  (void)use_v5;  // MQTT v5 support disabled at build time
#endif
}
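
The begin listing clears the broker verification certificate, which is why call order matters. A stripped-down model makes the effect easy to see. ToyMqttClient is a hypothetical class for illustration only and is not the esp-mqtt-arduino implementation.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified client that mimics the reset behaviour of begin().
class ToyMqttClient {
 public:
  void begin(const std::string& uri) {
    uri_ = uri;
    caCert_.clear();  // like the real begin(), this discards any earlier CA certificate
  }
  void setCACert(const std::string& pem) { caCert_ = pem; }
  bool hasCACert() const { return !caCert_.empty(); }

 private:
  std::string uri_;
  std::string caCert_;
};
```

Calling setCACert before begin leaves the client without a certificate, while calling it after begin keeps it.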

I tried increasing the log levels to get more debugging information, adding delays on startup to make it easier to see what was going on, and trying different protocol options.

After hours of trying I gave up.

Azure Event Grid nanoFramework Client – Publisher

Building a .NET nanoFramework application for testing Azure Event Grid MQTT Broker connectivity that would run on my Seeedstudio EdgeBox ESP100 and Seeedstudio Xiao ESP32S3 devices took a couple of hours. Most of that time was spent figuring out how to generate the certificate and elliptic curve private key

Create an elliptic curve private key

 openssl ecparam -name prime256v1 -genkey -noout -out device.key

Generate a certificate signing request

openssl req -new -key device.key -out device.csr -subj "/CN=device.example.com/O=YourOrg/OU=IoT"

Then use the intermediate certificate and key file from earlier to sign the request and generate the device certificate.

 openssl x509 -req -in device.csr -CA IntermediateCA.crt -CAkey IntermediateCA.key -CAcreateserial -out device.crt -days 365 -sha256

In this post I have assumed that the reader is familiar with configuring Azure Event Grid clients, client groups, topic spaces, permission bindings and routing.

The PEM encoded root CA certificate chain that is used to validate the server
public const string CA_ROOT_PEM = @"-----BEGIN CERTIFICATE-----
CN: CN = Microsoft Azure ECC TLS Issuing CA 03
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
CN: CN = DigiCert Global Root G3
-----END CERTIFICATE-----";

The PEM encoded certificate chain that is used to authenticate the device
public const string CLIENT_CERT_PEM_A = @"-----BEGIN CERTIFICATE-----
 CN=Self signed device certificate
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
 CN=Self signed Intermediate certificate
-----END CERTIFICATE-----";

The PEM encoded private key of the device
public const string CLIENT_KEY_PEM_A = @"-----BEGIN EC PRIVATE KEY-----
-----END EC PRIVATE KEY-----";
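
Pasting PEM text into string constants is error prone (a duplicated or missing header line is easy to miss), so a quick structural check before handing the text to the TLS stack can save some time. This is a minimal sketch in C++ to match the Arduino clients. countPemBlocks is a hypothetical helper and only checks that BEGIN/END markers pair up, not that the contents are valid.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Counts well-formed "-----BEGIN <label>-----" ... "-----END <label>-----" blocks.
// Returns -1 if the markers are unbalanced, out of order, or a BEGIN is repeated
// before its END.
int countPemBlocks(const std::string& pem, const std::string& label) {
  const std::string begin = "-----BEGIN " + label + "-----";
  const std::string end = "-----END " + label + "-----";
  int count = 0;
  std::size_t pos = 0;
  while (true) {
    const std::size_t b = pem.find(begin, pos);
    const std::size_t e = pem.find(end, pos);
    if (b == std::string::npos) {
      // No further BEGIN marker: a leftover END marker is an error.
      return e == std::string::npos ? count : -1;
    }
    if (e == std::string::npos || e < b) return -1;  // END missing or before BEGIN
    const std::size_t nextBegin = pem.find(begin, b + begin.size());
    if (nextBegin != std::string::npos && nextBegin < e) return -1;  // repeated BEGIN
    ++count;
    pos = e + end.size();
  }
}
```

A device certificate plus intermediate chain should give 2 for the label CERTIFICATE, and the private key should give 1 for EC PRIVATE KEY.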

My application was “inspired” by the .NET nanoFramework m2mqtt example.

public static void Main()
{
   int sequenceNumber = 0;
   MqttClient mqttClient = null;
   Thread.Sleep(1000); // Found this works around some issues with running immediately after a reset

   bool wifiConnected = false;
   Console.WriteLine("WiFi connecting...");
   do
   {
      // Attempt to connect using DHCP
      wifiConnected = WifiNetworkHelper.ConnectDhcp(Secrets.WIFI_SSID, Secrets.WIFI_PASSWORD, requiresDateTime: true);

      if (!wifiConnected)
      {
         Console.WriteLine($"Failed to connect. Error: {WifiNetworkHelper.Status}");
         if (WifiNetworkHelper.HelperException != null)
         {
            Console.WriteLine($"Exception: {WifiNetworkHelper.HelperException}");
         }

         Thread.Sleep(1000);
      }
   }
   while (!wifiConnected);
   Console.WriteLine("WiFi connected");

   var caCert = new X509Certificate(Constants.CA_ROOT_PEM);

   X509Certificate2 clientCert = null;
   try
   {
      clientCert = new X509Certificate2(Secrets.CLIENT_CERT_PEM_A, Secrets.CLIENT_KEY_PEM_A, string.Empty);
   }
   catch (Exception ex)
   {
      Console.WriteLine($"Client Certificate Exception: {ex.Message}");
   }

   mqttClient = new MqttClient(Secrets.MQTT_SERVER, Constants.MQTT_PORT, true, caCert, clientCert, MqttSslProtocols.TLSv1_2);

   mqttClient.ProtocolVersion = MqttProtocolVersion.Version_5;

   bool mqttConnected = false;
   Console.WriteLine("MQTT connecting...");
   do
   {
      try
      {
         // Regular connect
         var resultConnect = mqttClient.Connect(Secrets.MQTT_CLIENTID, Secrets.MQTT_USERNAME, Secrets.MQTT_PASSWORD);
         if (resultConnect != MqttReasonCode.Success)
         {
            Console.WriteLine($"MQTT ERROR connecting: {resultConnect}");
            Thread.Sleep(1000);
         }
         else
         {
            mqttConnected = true;
         }
      }
      catch (Exception ex)
      {
         Console.WriteLine($"MQTT ERROR Exception '{ex.Message}'");
         Thread.Sleep(1000);
      }
   }
   while (!mqttConnected);
   Console.WriteLine("MQTT connected...");

   mqttClient.MqttMsgPublishReceived += MqttMsgPublishReceived;
   mqttClient.MqttMsgSubscribed += MqttMsgSubscribed;
   mqttClient.MqttMsgUnsubscribed += MqttMsgUnsubscribed;
   mqttClient.ConnectionOpened += ConnectionOpened;
   mqttClient.ConnectionClosed += ConnectionClosed;
   mqttClient.ConnectionClosedRequest += ConnectionClosedRequest;

   string topicPublish = string.Format(MQTT_TOPIC_PUBLISH_FORMAT, Secrets.MQTT_CLIENTID);
   while (true)
   {
      Console.WriteLine("MQTT publish message start...");

      var payload = new MessagePayload() { ClientID = Secrets.MQTT_CLIENTID, Sequence = sequenceNumber++ };

      string jsonPayload = JsonSerializer.SerializeObject(payload);

      var result = mqttClient.Publish(topicPublish, Encoding.UTF8.GetBytes(jsonPayload), "application/json; charset=utf-8", null);

      Debug.WriteLine($"MQTT published ({result}): {jsonPayload}");

      Thread.Sleep(100);
   }
}

I then configured my client (Edgebox100Z) and updated the “secrets.cs” file

Azure Event Grid MQTT Broker Clients

The application connected to the Azure Event Grid MQTT broker and started publishing the JSON payload with the incrementing sequence number.

Visual Studio debugger output of JSON payload publishing

The published messages were “routed” to an Azure Storage Queue where they could be inspected with a tool like Azure Storage Explorer.

Azure Event Grid MQTT Broker metrics with messages published selected

I could see the application was working in the Azure Event Grid MQTT broker metrics because the number of messages published was increasing.

Azure Event Grid Arduino Client – Publisher

The Arduino application for testing Azure Event Grid MQTT Broker connectivity worked on my Seeedstudio EdgeBox ESP100 and Seeedstudio Xiao ESP32S3 devices, so the next step was to modify it to publish some messages.

The first version generated the JSON payload using snprintf, which was a bit “nasty”.

static uint32_t sequenceNumber = 0;

void loop() {
  mqttClient.loop();

  Serial.println("MQTT Publish start");

  char payloadBuffer[64];

  // sequenceNumber is unsigned, so print it with %lu and an explicit cast
  snprintf(payloadBuffer, sizeof(payloadBuffer), "{\"ClientID\":\"%s\", \"Sequence\": %lu}", MQTT_CLIENTID, (unsigned long)sequenceNumber++);

  Serial.println(payloadBuffer);

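  // Cast so the (payload, length) overload is selected; a char* payload would otherwise match (payload, retained)
  if (!mqttClient.publish(MQTT_TOPIC_PUBLISH, (const uint8_t*)payloadBuffer, strlen(payloadBuffer))) {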
    Serial.print("\nMQTT publish failed:");        
    Serial.println(mqttClient.state());    
  }
  Serial.println("MQTT Publish finish");

  delay(60000);
}
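
Part of what makes the snprintf approach “nasty” is that a long client ID silently truncates the JSON. snprintf returns the number of characters it wanted to write, so truncation is easy to detect. A minimal sketch, where buildPayload is a hypothetical helper rather than code from my sketch:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <string>

// Formats the JSON payload into buffer.
// Returns false if formatting failed or the output did not fit (it was truncated).
bool buildPayload(char* buffer, std::size_t size, const char* clientId,
                  std::uint32_t sequence) {
  const int written =
      std::snprintf(buffer, size, "{\"ClientID\":\"%s\",\"Sequence\":%lu}",
                    clientId, static_cast<unsigned long>(sequence));
  return written >= 0 && static_cast<std::size_t>(written) < size;
}
```

The caller can then skip the publish and log an error instead of sending broken JSON.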

I then configured my client (Edgebox100A) and updated the “secrets.h” file

Azure Event Grid MQTT Broker Clients

The application connected to the Azure Event Grid MQTT broker and started publishing the JSON payload with the incrementing sequence number.

Arduino IDE serial monitor output of JSON payload publishing

The second version generated the JSON payload using the ArduinoJson library.

static uint32_t sequenceNumber = 0;

void loop() {
  mqttClient.loop();

  Serial.println("MQTT Publish start");

  // Create a static JSON document with fixed size
  StaticJsonDocument<64> doc;

  doc["Sequence"] = sequenceNumber++;
  doc["ClientID"] = MQTT_CLIENTID;

  // Serialize JSON to a buffer
  char jsonBuffer[64];
  size_t n = serializeJson(doc, jsonBuffer);

  Serial.println(jsonBuffer);

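  // Cast so the (payload, length) overload is selected; a char* payload would otherwise match (payload, retained)
  if(!mqttClient.publish(MQTT_TOPIC_PUBLISH, (const uint8_t*)jsonBuffer, n))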
  {
    Serial.println(mqttClient.state());    
  }

  Serial.println("MQTT Publish finish");

  delay(2000);
}

I could see the application was working in the Azure Event Grid MQTT broker metrics because the number of messages published was increasing.

Azure Event Grid MQTT Broker metrics with messages published selected

The published messages were “routed” to an Azure Storage Queue where they can be inspected with a tool like Azure Storage Explorer.

Azure Storage Explorer displaying a message’s payload

The message payload is Base64 encoded, so I used Copilot to convert it to text.

Microsoft copilot decoding the Base64 payload
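
Decoding Base64 doesn’t really need an AI assistant. A minimal decoder for the standard alphabet is sketched below. base64Decode is a hypothetical helper, it skips any characters outside the alphabet and stops at the first padding character, so it is a debugging aid rather than a strict validator.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Decodes standard (RFC 4648) Base64 text into raw bytes.
std::string base64Decode(const std::string& input) {
  static const std::string alphabet =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
  std::string output;
  std::uint32_t accumulator = 0;  // holds the most recent 6-bit groups
  int bits = 0;                   // number of not-yet-emitted bits in accumulator
  for (char c : input) {
    if (c == '=') break;  // padding marks the end of the data
    const std::size_t index = alphabet.find(c);
    if (index == std::string::npos) continue;  // ignore whitespace and unknown characters
    accumulator = (accumulator << 6) | static_cast<std::uint32_t>(index);
    bits += 6;
    if (bits >= 8) {
      bits -= 8;
      output.push_back(static_cast<char>((accumulator >> bits) & 0xFF));
    }
  }
  return output;
}
```

Pasting the message text from Azure Storage Explorer into a call to this function should give back the JSON payload the device published.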

In this post I have assumed that the reader is familiar with configuring Azure Event Grid clients, client groups, topic spaces, permission bindings and routing.

Bonus: I also managed to slip in a reference to Copilot.

Azure Event Grid Arduino Client – The joy of certs

“Let’s start at the very beginning, A very good place to start”

The Azure Event Grid MQTT Broker server X509 certificate chain can be copied and pasted from the output of the openssl command

openssl s_client -connect YourNamespace.newzealandnorth-1.ts.eventgrid.azure.net:8883 -showcerts

A self-signed X509 root certificate which can sign intermediate X509 certificates and key file can be generated with a single openssl command.

openssl req -x509 -newkey rsa:4096 -keyout rootCA.key -out rootCA.crt -days 3650 -nodes -subj "/CN=devMobile  /O=devMobile.co.nz /C=NZ" -addext "basicConstraints=critical,CA:TRUE" -addext "keyUsage=critical,keyCertSign"

For a non-trivial system there should be a number of intermediate certificates. I have tried creating intermediate certificates for a device type, geography, application, customer and combinations of these. The first couple of times I got it wrong, so start with a field trial so that it isn’t so painful to go back and fix (beware the sunk cost fallacy).

openssl genrsa -out intermediate.key 4096

openssl req -new -key intermediate.key -out intermediate.csr -subj "/CN=intermediate  /O=devMobile.co.nz /C=NZ"

I found creating an intermediate certificate that could sign device certificates required a conf file for the basicConstraints and keyUsage configuration.

[ v3_intermediate_ca ]
basicConstraints = critical, CA:TRUE, pathlen:0
keyUsage = critical, keyCertSign
  • critical - The extension must be understood and processed by any application validating the certificate. If the application does not understand it, the certificate must be rejected.
  • CA:TRUE - This certificate is allowed to act as a Certificate Authority (CA), meaning it can sign other certificates.
  • pathlen:0 - This CA can only issue end-entity (leaf) certificates and cannot issue further intermediate CA certificates.
  • keyCertSign - The certificate can be used to sign other certificates (i.e., it’s a CA certificate).

openssl x509 -req -in intermediate.csr  -CA rootCA.crt -CAkey rootCA.key -CAcreateserial -out intermediate.crt -days 1825 -extfile intermediate_ext.cnf -extensions v3_intermediate_ca

Creating a device certificate is similar to the process for the intermediate certificate, but the device certificate doesn’t need to be able to sign other certificates.

openssl genrsa -out EdgeBox100A.key 4096

openssl req -new -key EdgeBox100A.key -out EdgeBox100A.csr -subj "/CN=EdgeBox100A"

openssl x509 -req -in EdgeBox100A.csr -CA intermediate.crt -CAkey intermediate.key -CAcreateserial -out EdgeBox100A.crt -days 365 

For production systems it is worth putting some thought into the Common Name (CN), Organizational Unit name (OU), Organization name (O), Locality name (L), State or Province name (ST) and Country name (C).

// Minimalist ESP32 + Event Grid MQTT (mTLS) with PubSubClient
// Copyright (c) November 2025, devMobile Software
#include <PubSubClient.h>
#include <WiFi.h>
#include <WiFiClientSecure.h>

#include "constants.h"
#include "secrets.h"

// --- Wi-Fi ---
//const char* WIFI_SSID     = "";
//const char* WIFI_PASSWORD = "";

// --- Event Grid MQTT ---
//const char* MQTT_SERVER = "";
const uint16_t MQTT_PORT = 8883;

//const char* MQTT_CLIENTID = "";
//const char* MQTT_USERNAME = "";
//const char* MQTT_PASSWORD = "";
//const char* MQTT_TOPIC_PUBLISH = "devices/";
//const char* MQTT_TOPIC_SUBSCRIBE = "devices/";

/*
// The certificate that is used to authenticate the MQTT Broker
const char CA_ROOT_PEM[] PROGMEM = R"PEM(
-----BEGIN CERTIFICATE-----
      Thumbprint: 56D955C849887874AA1767810366D90ADF6C8536
      CN: CN=Microsoft Azure ECC TLS Issuing CA 03
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
      Thumbprint: 7E04DE896A3E666D00E687D33FFAD93BE83D349E
      CN: CN=DigiCert Global Root G3
-----END CERTIFICATE-----
)PEM";

The certificate that is used to authenticate the device
static const char CLIENT_CERT_PEM[] PROGMEM = R"PEM(
-----BEGIN CERTIFICATE-----
 CN=Self signed device certificate
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
 CN=Self signed Intermediate certificate
-----END CERTIFICATE-----
)PEM";

 The PEM encoded private key of device
static const char CLIENT_KEY_PEM[] PROGMEM = R"PEM(
-----BEGIN PRIVATE KEY-----
-----END PRIVATE KEY-----
)PEM";
*/

WiFiClientSecure secureClient;
PubSubClient mqttClient(secureClient);

void setup() {
  Serial.begin(9600);
  delay(5000);
  Serial.println();

  // Connect to WiFi
  Serial.println("WiFi connecting");
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  Serial.print("*");
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print("*");
  }
  Serial.println("\nWiFi connected");

  // Sync time for TLS
  Serial.println("\nTime synchronising");
  configTime(0, 0, "pool.ntp.org", "time.nist.gov");
  Serial.print("*");
  while (time(nullptr) < 100000) {
    delay(500);
    Serial.print("*");
  }
  Serial.println("\nTime synchronised");

  Serial.println("\nValidating ServerFQDN-Certificate combination");
  secureClient.setCACert(CA_ROOT_PEM);

  Serial.println("TCP connecting");
  if (secureClient.connect(MQTT_SERVER, MQTT_PORT)) {
    Serial.println("\nTCP connected");
  } else {
    Serial.println("\nTCP connection failed");
    return;
  }

  secureClient.setCertificate(CLIENT_CERT_A_PEM);
  secureClient.setPrivateKey(CLIENT_KEY_A_PEM);

  mqttClient.setServer(MQTT_SERVER, MQTT_PORT);

  Serial.println("\nMQTT connecting");
  Serial.print("*");
  while (!mqttClient.connect(MQTT_CLIENTID, MQTT_USERNAME, MQTT_PASSWORD)) {
    Serial.println(mqttClient.state());
    delay(5000);
    Serial.print("*");
  }
  Serial.println("\nMQTT connected");
}

static uint32_t sequenceNumber = 0;

void loop() {
  mqttClient.loop();

  Serial.print("'.");
  delay(10000);
}

My Arduino Xiao ESP32S3 and EdgeBox-ESP-100-Industrial Edge Controller devices could connect to the local Wi-Fi, get the time and date using the Network Time Protocol (NTP), and validate the Azure Event Grid MQTT broker certificate. They could then connect to the Azure Event Grid MQTT broker with the client name specified in the subject name of their X509 certificates.

Establishing a connection to the Azure Event Grid MQTT broker often failed, which surprised me. Initially I didn’t have any retry logic, which meant I wasted quite a bit of time trying to debug failed connections.
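
The connect loop in the sketch retries every five seconds. One common alternative is exponential backoff with a cap, so a flaky connection recovers quickly while a broken configuration doesn’t hammer the broker. A minimal sketch of the delay calculation is below. backoffDelayMs and its parameters are hypothetical and the values are examples, not recommendations.

```cpp
#include <cassert>
#include <cstdint>

// Returns the delay before retry number `attempt` (0-based):
// baseMs doubled once per previous attempt, never exceeding maxMs.
std::uint32_t backoffDelayMs(std::uint32_t attempt, std::uint32_t baseMs,
                             std::uint32_t maxMs) {
  std::uint64_t delay = baseMs;  // 64-bit so the doubling cannot overflow
  for (std::uint32_t i = 0; i < attempt && delay < maxMs; ++i) {
    delay *= 2;
  }
  return static_cast<std::uint32_t>(delay < maxMs ? delay : maxMs);
}
```

In the connect loop this would replace the fixed delay, for example delay(backoffDelayMs(attempt++, 1000, 60000)).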

RAK7258 Local server and Message Queuing Telemetry Transport(MQTT)

This post was originally about getting the built in Network Server of my RAKWireless RAK7258 WisGate Edge Lite to connect to an Azure IoT Hub or Azure IoT Central. The RAK7258 had been connected to The Things Industries(TTI) network so I updated the firmware and checked the “mode” in the LoRaWAN Network settings.

RAK 7258 LoRaWAN Network settings

Azure IoT Hub is not a fully featured MQTT broker so I initially looked at running Eclipse Mosquitto or HiveMQ locally but this seemed like a lot of effort for a Proof of Concept(PoC).

RAK 7258 Network Server Global Integration settings

I have used MQTTNet in a few other projects (The Things Network(TTN) V3 Azure IoT Connector, The Things Network V2 MQTT SQL Connector, Windows 10 IoT Core MQTT Field gateway etc.) and there was a sample application which showed how to build a simple server so that became my preferred approach.

I then started exploring how applications and devices are provisioned in the RAK Network Server.

RAK 7258 Network Server applications list

The network server software has “unified” and “separate” “Device authentication mode”s and will “auto Add LoRa Device”s if enabled.

RAK 7258 Network Server Separate Application basic setup
RAK 7258 Network Server Separate Application device basic setup
RAK 7258 Network Server Unified Application device basic setup

Applications also have configurable payload formats(raw & LPP) and integrations (uplink messages plus join, ack, and device notifications etc.)

RAK7258 live device data display

In the sample server I could see how ValidatingConnectionAsync was used to check the clientID, username and password when a device connected. I just wanted to display messages and payloads without having to use an MQTT client and it looked like InterceptingPublishAsync was a possible solution.

But the search results were a bit sparse…

InterceptingPublishAsync + MQTTNet search results

After reading the MQTTNet documentation and some experimentation I could display the message payload (same as in the live device data display) in a “nasty” console application.

namespace devMobile.IoT.RAKWisgate.ServerBasic
{
   using System;
   using System.Threading.Tasks;

   using MQTTnet;
   using MQTTnet.Protocol;
   using MQTTnet.Server;

   public static class Program
   {
      static async Task Main(string[] args)
      {
         var mqttFactory = new MqttFactory();

         var mqttServerOptions = new MqttServerOptionsBuilder()
             .WithDefaultEndpoint()
             .Build();

         using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
         {
            mqttServer.InterceptingPublishAsync += e =>
            {
               Console.WriteLine($"Client:{e.ClientId} Topic:{e.ApplicationMessage.Topic} {e.ApplicationMessage.ConvertPayloadToString()}");

               return Task.CompletedTask;
            };

            mqttServer.ValidatingConnectionAsync += e =>
            {
               if (e.ClientId != "RAK Wisgate7258")
               {
                  e.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
               }

               if (e.Username != "ValidUser")
               {
                  e.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
               }

               if (e.Password != "TopSecretPassword")
               {
                  e.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
               }

               return Task.CompletedTask;
            };

            await mqttServer.StartAsync();

            Console.WriteLine("Press Enter to exit.");
            Console.ReadLine();

            await mqttServer.StopAsync();
         }
      }
   }
}
MQTTNet based console application displaying device payloads

The process of provisioning Applications and Devices is quite different (The use of the AppEUI/JoinEUI is odd) to The Things Network(TTN) and other platforms I have used so I will explore this some more in future post(s).

Device Provisioning Service(DPS) JsonData

While building my The Things Industries(TTI) V3 connector, which uses the Azure Device Provisioning Service(DPS), it bugged me that pretty much all of the samples formatted the JsonData property of the ProvisioningRegistrationAdditionalData (part of Plug n Play provisioning) by manually constructing a JSON object.

ProvisioningRegistrationAdditionalData provisioningRegistrationAdditionalData = new ProvisioningRegistrationAdditionalData()
{
   JsonData = $"{{\"modelId\": \"{modelId}\"}}"
};

result = await provClient.RegisterAsync(provisioningRegistrationAdditionalData);

I remembered seeing a sample where the DTDLV2 modelId was formatted by a library function and after a surprising amount of searching I found what I was looking for in the Azure-Samples repository.

The code for the CreateDpsPayload method

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Microsoft.Azure.Devices.Provisioning.Client.Extensions;

namespace Microsoft.Azure.Devices.Provisioning.Client.PlugAndPlay
{
    /// <summary>
    /// A helper class for formatting the DPS device registration payload, per plug and play convention.
    /// </summary>
    public static class PnpConvention
    {
        /// <summary>
        /// Create the DPS payload to provision a device as plug and play.
        /// </summary>
        /// <remarks>
        /// For more information on device provisioning service and plug and play compatibility,
        /// and PnP device certification, see <see href="https://docs.microsoft.com/en-us/azure/iot-pnp/howto-certify-device"/>.
        /// The DPS payload should be in the format:
        /// <code>
        /// {
        ///   "modelId": "dtmi:com:example:modelName;1"
        /// }
        /// </code>
        /// For information on DTDL, see <see href="https://github.com/Azure/opendigitaltwins-dtdl/blob/master/DTDL/v2/dtdlv2.md"/>
        /// </remarks>
        /// <param name="modelId">The Id of the model the device adheres to for properties, telemetry, and commands.</param>
        /// <returns>The DPS payload to provision a device as plug and play.</returns>
        public static string CreateDpsPayload(string modelId)
        {
            modelId.ThrowIfNullOrWhiteSpace(nameof(modelId));
            return $"{{\"modelId\":\"{modelId}\"}}";
        }
    }
}

With a couple of changes my code now uses the CreateDpsPayload method

using Microsoft.Azure.Devices.Provisioning.Client.PlugAndPlay;

...

using (var securityProvider = new SecurityProviderSymmetricKey(deviceId, deviceKey, null))
{
   using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
   {
      ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(
         Constants.AzureDpsGlobalDeviceEndpoint,
         deviceProvisiongServiceSettings.IdScope,
         securityProvider,
         transport);

      DeviceRegistrationResult result;

      if (!string.IsNullOrEmpty(modelId))
      {
         ProvisioningRegistrationAdditionalData provisioningRegistrationAdditionalData = new ProvisioningRegistrationAdditionalData()
         {
               JsonData = PnpConvention.CreateDpsPayload(modelId)
         };

         result = await provClient.RegisterAsync(provisioningRegistrationAdditionalData, stoppingToken);
      }
      else
      {
         result = await provClient.RegisterAsync(stoppingToken);
      }

      if (result.Status != ProvisioningRegistrationStatusType.Assigned)
      {
         _logger.LogError("Config-DeviceID:{0} Status:{1} RegisterAsync failed ", deviceId, result.Status);

         return false;
      }

      IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());

      deviceClient = DeviceClient.Create(result.AssignedHub, authentication, transportSettings);
   }
}
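
The CreateDpsPayload method shown above still builds the JSON with string interpolation, so a modelId containing a quote or backslash would not be escaped. A minimal sketch of another option, assuming System.Text.Json is available in the project, is to let a serialiser produce the payload. The DpsPayloadSketch class and CreateModelIdPayload method are hypothetical names for illustration and are not part of the Azure SDK.

```csharp
using System;
using System.Text.Json;

public static class DpsPayloadSketch
{
    // Hypothetical helper (not part of the Azure SDK): builds the
    // {"modelId": "..."} registration payload with a JSON serialiser
    // so that any special characters in the model id are escaped.
    public static string CreateModelIdPayload(string modelId)
    {
        if (string.IsNullOrWhiteSpace(modelId))
        {
            throw new ArgumentException("A model id is required.", nameof(modelId));
        }

        // The anonymous type's property name becomes the JSON property name.
        return JsonSerializer.Serialize(new { modelId });
    }

    public static void Main()
    {
        // Example model id taken from the PnpConvention remarks.
        Console.WriteLine(CreateModelIdPayload("dtmi:com:example:modelName;1"));
    }
}
```

The returned string could then be assigned to the JsonData property in the same way as the result of PnpConvention.CreateDpsPayload.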

TTI V3 Gateway Device Provisioning Service(DPS) Concurrent Requests

While debugging The Things Industries(TTI) V3 connector on my desktop I had noticed that using an Azure IoT Hub device connection string was quite a bit faster than using the Azure Device Provisioning Service(DPS). The Azure Webjob connector was executing the requests sequentially which made the duration of the DPS call even more apparent.

To reduce the impact of the RegisterAsync call duration this Proof of Concept(PoC) code uses the System.Threading.Tasks namespace to start each request as its own Task and then waits for all the requests to finish.

try
{
   int devicePage = 1;
   V3EndDevices endDevices = await endDeviceRegistryClient.ListAsync(
      applicationSetting.Key,
      field_mask_paths: Constants.DevicefieldMaskPaths,
      page: devicePage,
      limit: _programSettings.TheThingsIndustries.DevicePageSize,
      cancellationToken: stoppingToken);

   while ((endDevices != null) && (endDevices.End_devices != null)) // If no devices returns null rather than empty list
   {
      List<Task<bool>> tasks = new List<Task<bool>>();

      _logger.LogInformation("Config-ApplicationID:{0} start", applicationSetting.Key);

      foreach (V3EndDevice device in endDevices.End_devices)
      {
         if (DeviceAzureEnabled(device))
         {
            _logger.LogInformation("Config-ApplicationID:{0} DeviceID:{1} Device EUI:{2}", device.Ids.Application_ids.Application_id, device.Ids.Device_id, BitConverter.ToString(device.Ids.Dev_eui));

            tasks.Add(DeviceRegistration(device.Ids.Application_ids.Application_id,
                                       device.Ids.Device_id,
                                       _programSettings.ResolveDeviceModelId(device.Ids.Application_ids.Application_id, device.Attributes),
                                       stoppingToken));
         }
      }

      _logger.LogInformation("Config-ApplicationID:{0} Page:{1} processing start", applicationSetting.Key, devicePage);

      Task.WaitAll(tasks.ToArray(),stoppingToken);

      _logger.LogInformation("Config-ApplicationID:{0} Page:{1} processing finish", applicationSetting.Key, devicePage);

      endDevices = await endDeviceRegistryClient.ListAsync(
         applicationSetting.Key,
         field_mask_paths: Constants.DevicefieldMaskPaths,
         page: devicePage += 1,
         limit: _programSettings.TheThingsIndustries.DevicePageSize,
         cancellationToken: stoppingToken);
   }
   _logger.LogInformation("Config-ApplicationID:{0} finish", applicationSetting.Key);
}
catch (ApiException ex)
{
   _logger.LogError("Config-Application configuration API error:{0}", ex.StatusCode);
}

The connector application paginates the retrieval of device configuration from the TTI API and a Task is created for each device returned in a page. In the Application Insights Trace logging the duration of a single page of device registrations was approximately the duration of the longest call.

There will be a tradeoff between device page size (resource utilisation by many threads) and startup duration (too many sequential page operations) which will need to be explored.
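
One way to explore that tradeoff is to cap the number of registrations in flight independently of the page size. The sketch below is a minimal illustration using SemaphoreSlim and await Task.WhenAll (rather than the blocking Task.WaitAll). The ThrottledRegistrationSketch class, the RegisterDeviceAsync stand-in, its 200 ms delay, the device names and the concurrency limit of 3 are all assumptions made up for the example, not part of the connector.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRegistrationSketch
{
    // Stand-in for the DPS RegisterAsync call; the delay is invented for illustration.
    private static async Task<bool> RegisterDeviceAsync(string deviceId, CancellationToken cancellationToken)
    {
        await Task.Delay(200, cancellationToken);

        return true;
    }

    // Starts a task per device but lets at most maxConcurrency registrations run at once.
    public static async Task<bool[]> RegisterAllAsync(IEnumerable<string> deviceIds, int maxConcurrency, CancellationToken cancellationToken)
    {
        using var throttle = new SemaphoreSlim(maxConcurrency);

        IEnumerable<Task<bool>> tasks = deviceIds.Select(async deviceId =>
        {
            // Wait for a free slot before starting the registration.
            await throttle.WaitAsync(cancellationToken);
            try
            {
                return await RegisterDeviceAsync(deviceId, cancellationToken);
            }
            finally
            {
                throttle.Release();
            }
        });

        // Completes when every registration has finished, without blocking a thread.
        return await Task.WhenAll(tasks);
    }

    public static async Task Main()
    {
        string[] deviceIds = { "device1", "device2", "device3", "device4", "device5", "device6" };

        bool[] results = await RegisterAllAsync(deviceIds, 3, CancellationToken.None);

        Console.WriteLine($"Registered {results.Count(r => r)} of {results.Length} devices");
    }
}
```

With a limit like this the page size could be tuned for the TTI API while the concurrency limit is tuned separately for DPS.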

TTI V3 Gateway Device Provisioning Service(DPS) Performance

My The Things Industries(TTI) V3 connector is an Identity Translation Cloud Gateway which maps LoRaWAN devices to Azure IoT Hub devices. The connector creates a DeviceClient for each TTI LoRaWAN device and can use an Azure Device Connection string or the Azure Device Provisioning Service(DPS).

While debugging the connector on my desktop I had noticed that using a connection string was quite a bit faster than using DPS and I had assumed this was just happenstance. While doing some testing in the Azure North Europe data-center (Closer to TTI European servers) I grabbed some screen shots of the trace messages in Azure Application Insights as the TTI Connector Application was starting.

I only have six LoRaWAN devices configured in my TTI dev instance, but I repeated each test several times and the results were consistent so the request durations should be representative. My TTI Connector application, IoT Hub, DPS and Application Insights instances are all in the same Azure Region and Azure Resource Group so networking overheads shouldn’t be significant.

Azure IoT Hub Connection device connection string

Using an Azure IoT Hub Device Shared Access policy connection string, establishing a connection took less than a second.

My Azure DPS Instance

Using my own DPS instance to provide the connection string and then establishing a connection took between 3 and 7 seconds.

Azure IoT Central DPS

For my Azure IoT Central instance getting a connection string and establishing a connection took between 4 and 7 seconds.

The Azure DPS client code was copied from one of the sample applications so I have assumed it is “correct”.

using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
{
	ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create( 
		Constants.AzureDpsGlobalDeviceEndpoint,
		deviceProvisiongServiceSettings.IdScope,
		securityProvider,
		transport);

	DeviceRegistrationResult result;

	if (!string.IsNullOrEmpty(modelId))
	{
		ProvisioningRegistrationAdditionalData provisioningRegistrationAdditionalData = new ProvisioningRegistrationAdditionalData()
		{
			JsonData = $"{{\"modelId\": \"{modelId}\"}}"
		};

		result = await provClient.RegisterAsync(provisioningRegistrationAdditionalData, stoppingToken);
	}
	else
	{
		result = await provClient.RegisterAsync(stoppingToken);
	}

	if (result.Status != ProvisioningRegistrationStatusType.Assigned)
	{
		_logger.LogError("Config-DeviceID:{0} Status:{1} RegisterAsync failed ", deviceId, result.Status);

		return false;
	}

	IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());

	deviceClient = DeviceClient.Create(result.AssignedHub, authentication, transportSettings);
}

I need to investigate why getting a connection string from the DPS then connecting takes significantly longer (I appreciate that “behind the scenes” service calls may be required). This wouldn’t be an issue for individual devices connecting from different locations but for my Identity Translation Cloud gateway, which currently opens connections sequentially, this could be a problem when there are a large number of devices.

If the individual request durations can’t be reduced (using connection pooling etc.) I may have to spin up multiple threads so multiple devices can be connecting concurrently.