Message Transformation with cached transform binaries

The second prototype of “transforming” telemetry data used C# code complied and binary cached on demand. The HiveMQClient based application subscribes to topics (devices publishing environmental measurements) and then republishes them to multiple topics.

public class messageTransformer : IMessageTransformer
{
   public MQTT5PublishMessage[] Transform(MQTT5PublishMessage message)
   {
      if (message.Payload is null)
      {
         return [];
      }

      var payload = Encoding.UTF8.GetString(message.Payload);

      // Simple transformations: convert to both upper and lower case
      var toLower = new MQTT5PublishMessage
      {
         Topic = message.Topic,
         Payload = Encoding.UTF8.GetBytes(payload.ToLower()),
         QoS = QualityOfService.AtLeastOnceDelivery
      };

      var toUpper = new MQTT5PublishMessage
      {
         Topic = message.Topic,
         Payload = Encoding.UTF8.GetBytes(payload.ToUpper()),
         QoS = QualityOfService.AtLeastOnceDelivery
      };

      return [toLower, toUpper];
   }
}

The sample C# code (LowerUpper.cs) implements the IMessageTransformer interface and republishes both lower and upper case versions of the message.

Once the transformer had been loaded then compiled there was no noticeable difference between the application, loaded from constant string, and loaded from external file versions.

private static void OnMessageReceived(object? sender, HiveMQtt.Client.Events.OnMessageReceivedEventArgs e)
{
   HiveMQClient client = (HiveMQClient)sender!;

   Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} HiveMQ.receive start");
   Console.WriteLine($" Topic:{e.PublishMessage.Topic} QoS:{e.PublishMessage.QoS} Payload:{e.PublishMessage.PayloadAsString}");

   Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} HiveMQ.Publish start");
   foreach (string topic in _applicationSettings.PublishTopics.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries))
   {
      e.PublishMessage.Topic = string.Format(topic, _applicationSettings.ClientId);

      var transformer = _scriptEngine.GetTransformer();

      if (transformer is null)
      {
         Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} Transformer is null");
         return;
      }

      var transformedMessages = transformer.Transform(e.PublishMessage);
      if (transformedMessages is null)
      {
         Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} Transformer returned null");
         return;
      }

      if (transformedMessages.Length == 0)
      {
         Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} Transformer returned no messages");
         return;
      }

      foreach (MQTT5PublishMessage message in transformer.Transform(e.PublishMessage))
      {
         if (message is null)
         {
            Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} Transformer message is null");

            continue;
         }

         try
         {
            Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} Topic:{e.PublishMessage.Topic} HiveMQ Publish start ");

            var resultPublish = client.PublishAsync(message).GetAwaiter().GetResult();

            Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} Published:{resultPublish.QoS1ReasonCode} {resultPublish.QoS2ReasonCode}");
         }
         catch (Exception ex)
         {
            Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} HiveMQ Publish exception {ex.Message}");
         }
      }
   }
   Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} HiveMQ.Receive finish");
}

I used a local instance of NanoMQ: An Ultra-lightweight MQTT Broker for IoT Edge for testing

The MQTTX application subscribed to topics that devices (XiaoTandHandCO2A, XiaoTandHandCO2B etc.) and the simulated bridge (in my case DESKTOP-EN0QGL0) published.

Message Transformation in code with HiveMQ Client

The first prototype of “transforming” telemetry data used C# code complied with the application. The HiveMQClient based application subscribes to topics (devices publishing environmental measurements) and then republishes them to multiple topics.

private static void OnMessageReceived(object? sender, HiveMQtt.Client.Events.OnMessageReceivedEventArgs e)
{
   HiveMQClient client = (HiveMQClient)sender!;

   Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} HiveMQ.receive start");
   Console.WriteLine($" Topic:{e.PublishMessage.Topic} QoS:{e.PublishMessage.QoS} Payload:{e.PublishMessage.PayloadAsString}");

   Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} HiveMQ.Publish start");
   foreach (string topic in _applicationSettings.PublishTopics.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries))
   {
      e.PublishMessage.Topic = string.Format(topic, _applicationSettings.ClientId);

      Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} Topic:{e.PublishMessage.Topic} HiveMQ Publish start ");
      var resultPublish = client.PublishAsync(e.PublishMessage).GetAwaiter().GetResult();
      Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} Published:{resultPublish.QoS1ReasonCode} {resultPublish.QoS2ReasonCode}");
   }
   Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} HiveMQ.Receive finish");
}

The MQTTX application subscribed to topics that devices (XiaoTandHandCO2A, XiaoTandHandCO2B etc.) and the simulated bridge (DESKTOP-EN0QGL0) published.

The second prototype “transforms” the telemetry message payloads with C# code that is compiled (with CSScript) as the application starts. The application subscribes to the topics which devices publish (environmental measurements), transforms the payloads, and then republishes the transformed messages to “bridge” topics

private static void OnMessageReceived(object? sender, HiveMQtt.Client.Events.OnMessageReceivedEventArgs e)
{
   HiveMQClient client = (HiveMQClient)sender!;

   Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} HiveMQ.receive start");
   Console.WriteLine($" Topic:{e.PublishMessage.Topic} QoS:{e.PublishMessage.QoS} Payload:{e.PublishMessage.PayloadAsString}");

   Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} HiveMQ.Publish start");
   foreach (string topic in _applicationSettings.PublishTopics.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries))
   {
      e.PublishMessage.Topic = string.Format(topic, _applicationSettings.ClientId);

      foreach (MQTT5PublishMessage message in _evaluator.Transform(e.PublishMessage))
      {
         Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} Topic:{e.PublishMessage.Topic} HiveMQ Publish start ");
         var resultPublish = client.PublishAsync(message).GetAwaiter().GetResult();
         Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} Published:{resultPublish.QoS1ReasonCode} {resultPublish.QoS2ReasonCode}");
      }
   }
   Console.WriteLine($"{DateTime.UtcNow:yy-MM-dd HH:mm:ss:fff} HiveMQ.Receive finish");
}

// This code is compiled as the application starts up, it implements the IMessageTransformer interface
const string sampleTransformCode = @"
using System.Text;
using HiveMQtt.MQTT5.Types;

public class messageTransformer : devMobile.IoT.MqttTransformer.CSScriptLoopback.IMessageTransformer
{
   public MQTT5PublishMessage[] Transform(MQTT5PublishMessage message)
   {
      // Example: echo the payload to a new topic
      var payload = Encoding.UTF8.GetString(message.Payload);


      // Simple transformations: convert to uppercase or lowercase
      var toLower = new MQTT5PublishMessage
      {
         Topic = message.Topic,
         Payload = Encoding.UTF8.GetBytes(payload.ToLower()),
         QoS = QualityOfService.AtLeastOnceDelivery
      };

      var toUpper = new MQTT5PublishMessage
      {
         Topic = message.Topic,
         Payload = Encoding.UTF8.GetBytes(payload.ToUpper()),
         QoS = QualityOfService.AtLeastOnceDelivery
      };

      return new[] { toLower, toUpper };
   }
}";

The sample C# code implements the IMessageTransformer interface and republishes lower and upper case versions of the message.

public interface IMessageTransformer
{
   public MQTT5PublishMessage[] Transform(MQTT5PublishMessage mqttPublishMessage);
}
...
  _evaluator = CSScript.Evaluator.LoadCode<IMessageTransformer>(sampleTransformCode);

The SwarmSpace, and Myriota gateways both use an interface based approach to process uplink and downlink messages. Future versions will include support for isolating processing so that a rogue script can’t crash the application or reference unapproved assemblies