Airbnb Dataset – JSON Long Integer Issue

The Inside Airbnb London dataset has 87946 listings and the id column (which is the primary key) has a minimum value of 13913 and a maximum of 973895808066047620 in the database.

Back in the early 90s I used to live next to the Ealing Lawn Tennis Club in London.

I used “Ealing” as the SearchText for my initial testing and tried different page numbers and sizes.

Testing the search functionality with SwaggerUI

The listings search results JSON looked good but I missed one important detail…

[
  {
    "id": 32458423,
    "name": "Bed and breakfast in Ealing  · ★5.0 · 1 bedroom · 1 bed · 1 private bath",
    "listingURL": "https://www.airbnb.com/rooms/32458423"
  },
  {
    "id": 7905935,
    "name": "Guest suite in Ealing Broadway · ★4.93 · 1 bedroom · 1 bed · 1 private bath",
    "listingURL": "https://www.airbnb.com/rooms/7905935"
  },
  {
    "id": 5262733,
    "name": "Home in Ealing · ★4.97 · 1 bedroom · 1 shared bath",
    "listingURL": "https://www.airbnb.com/rooms/5262733"
  },
  {
    "id": 685148830257321200,
    "name": "Home in Ealing London · ★5.0 · 4 bedrooms · 8 beds · 2.5 baths",
    "listingURL": "https://www.airbnb.com/rooms/685148830257321258"
  },
  {
    "id": 10704599,
    "name": "Home in ealing, london · ★4.47 · 3 bedrooms · 4 beds · 1 bath",
    "listingURL": "https://www.airbnb.com/rooms/10704599"
  }
]

To “smoke test” the lookup functionality I tried a couple of the listing ids.

Swagger user interface successful lookup listing 7905935
Swagger user interface unsuccessful lookup of listing 685148830257321200

In both the HTTP GET method routing parameter and the response Data Transfer Object (DTO) the Airbnb listing Id is a long value (ulong might have been a better choice), which should have sufficient range.

string LookupByIdSql = @"SELECT Id, [Name], Listing_URL AS ListingURL
                     FROM ListingsHosts
                     WHERE id = @Id";

public record ListingLookupDto
{
   public long Id { get; set; }
   public string? Name { get; set; }
   public string? ListingURL { get; set; }
};

//...

app.MapGet("/Listing/Results/{id:long}", async (long id, IDapperContext dappperContext) =>
{
   using (var connection = dappperContext.ConnectionCreate())
   {
      ListingLookupDto result = await connection.QuerySingleOrDefaultWithRetryAsync<ListingLookupDto>(LookupByIdSql, new { id });
      if (result is null)
      {
         return Results.Problem($"Listing {id} not found", statusCode: StatusCodes.Status404NotFound);
      }

      return Results.Ok(result);
   }
})
.Produces<ListingLookupDto>(StatusCodes.Status200OK)
.Produces<ProblemDetails>(StatusCodes.Status404NotFound)
.WithOpenApi();

The id values in the search response and lookup DTOs were correct.

Visual Studio 2022 Debugger inspecting listing id value

I had missed the clue in the search response JSON: the listing id and the listingURL id didn’t match.

{
 "id": 685148830257321200,
 "name": "Home in Ealing London · ★5.0 · 4 bedrooms · 8 beds · 2.5 baths",
 "listingURL": "https://www.airbnb.com/rooms/685148830257321258"
},

The JavaScript Object Notation (JSON) Data Interchange Format (RFC 7159) specification for numbers explains the discrepancy.

This specification allows implementations to set limits on the range
and precision of numbers accepted. Since software that implements
IEEE 754-2008 binary64 (double precision) numbers [IEEE754] is
generally available and widely used, good interoperability can be
achieved by implementations that expect no more precision or range
than these provide, in the sense that implementations will
approximate JSON numbers within the expected precision.

Airbnb listing from listingURL
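
JavaScript, and any other consumer that parses JSON numbers as IEEE 754 doubles, can only represent integers exactly up to 2^53 - 1, so 685148830257321258 gets rounded to 685148830257321200 when SwaggerUI’s JavaScript parses the response. One way to sidestep this (a sketch only, not what the current API does) is to serialise the listing id as a string with a small System.Text.Json converter:

using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Writes long values as JSON strings so JavaScript clients don't lose precision
public class LongAsStringConverter : JsonConverter<long>
{
   public override long Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
      => reader.TokenType == JsonTokenType.String ? long.Parse(reader.GetString()!) : reader.GetInt64();

   public override void Write(Utf8JsonWriter writer, long value, JsonSerializerOptions options)
      => writer.WriteStringValue(value.ToString());
}

public record ListingLookupDto
{
   [JsonConverter(typeof(LongAsStringConverter))]
   public long Id { get; set; }
   public string? Name { get; set; }
   public string? ListingURL { get; set; }
};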

Airbnb Dataset – Querying the Raw Listings

My initial ASP.NET Core Minimal API exploration uses the Inside Airbnb London dataset, which has 87946 listings. The data is pretty “nasty” with lots of nullable and wide columns, so it took several attempts to import.

CREATE TABLE [dbo].[listingsRaw](
	[id] [bigint] NOT NULL,
	[listing_url] [nvarchar](50) NOT NULL,
	[scrape_id] [datetime2](7) NOT NULL,
	[last_scraped] [date] NOT NULL,
	[source] [nvarchar](50) NOT NULL,
	[name] [nvarchar](max) NOT NULL,
	[description] [nvarchar](max) NULL,
	[neighborhood_overview] [nvarchar](1050) NULL,
	[picture_url] [nvarchar](150) NULL,
	[host_id] [int] NOT NULL,
	[host_url] [nvarchar](50) NOT NULL,
	[host_name] [nvarchar](50) NULL,
	[host_since] [date] NULL,
	[host_location] [nvarchar](100) NULL,
	[host_about] [nvarchar](max) NULL,
	[host_response_time] [nvarchar](50) NULL,
	[host_response_rate] [nvarchar](50) NULL,
	[host_acceptance_rate] [nvarchar](50) NULL,
	[host_is_superhost] [bit] NULL,
	[host_thumbnail_url] [nvarchar](150) NULL,
	[host_picture_url] [nvarchar](150) NULL,
	[host_neighbourhood] [nvarchar](50) NULL,
	[host_listings_count] [int] NULL,
	[host_total_listings_count] [int] NULL,
	[host_verifications] [nvarchar](50) NOT NULL,
	[host_has_profile_pic] [bit] NULL,
	[host_identity_verified] [bit] NULL,
	[neighbourhood] [nvarchar](100) NULL,
	[neighbourhood_cleansed] [nvarchar](50) NOT NULL,
	[neighbourhood_group_cleansed] [nvarchar](1) NULL,
	[latitude] [float] NOT NULL,
	[longitude] [float] NOT NULL,
	[property_type] [nvarchar](50) NOT NULL,
	[room_type] [nvarchar](50) NOT NULL,
	[accommodates] [tinyint] NOT NULL,
	[bathrooms] [nvarchar](1) NULL,
	[bathrooms_text] [nvarchar](50) NULL,
	[bedrooms] [tinyint] NULL,
	[beds] [tinyint] NULL,
	[amenities] [nvarchar](max) NOT NULL,
	[price] [money] NOT NULL,
	[minimum_nights] [smallint] NOT NULL,
	[maximum_nights] [int] NOT NULL,
	[minimum_minimum_nights] [smallint] NULL,
	[maximum_minimum_nights] [int] NULL,
	[minimum_maximum_nights] [int] NULL,
	[maximum_maximum_nights] [int] NULL,
	[minimum_nights_avg_ntm] [float] NULL,
	[maximum_nights_avg_ntm] [float] NULL,
	[calendar_updated] [nvarchar](1) NULL,
	[has_availability] [bit] NOT NULL,
	[availability_30] [tinyint] NOT NULL,
	[availability_60] [tinyint] NOT NULL,
	[availability_90] [tinyint] NOT NULL,
	[availability_365] [smallint] NOT NULL,
	[calendar_last_scraped] [date] NOT NULL,
	[number_of_reviews] [smallint] NOT NULL,
	[number_of_reviews_ltm] [int] NOT NULL,
	[number_of_reviews_l30d] [tinyint] NOT NULL,
	[first_review] [date] NULL,
	[last_review] [date] NULL,
	[review_scores_rating] [float] NULL,
	[review_scores_accuracy] [float] NULL,
	[review_scores_cleanliness] [float] NULL,
	[review_scores_checkin] [float] NULL,
	[review_scores_communication] [float] NULL,
	[review_scores_location] [float] NULL,
	[review_scores_value] [float] NULL,
	[license] [nvarchar](max) NULL,
	[instant_bookable] [bit] NOT NULL,
	[calculated_host_listings_count] [int] NULL,
	[calculated_host_listings_count_entire_homes] [int] NOT NULL,
	[calculated_host_listings_count_private_rooms] [int] NOT NULL,
	[calculated_host_listings_count_shared_rooms] [int] NOT NULL,
	[reviews_per_month] [float] NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]

There are other data quality issues, e.g. the host information (host_id, host_name, host_since, host_*, etc.) is duplicated in each of a host’s listings, which will need to be tidied up.

Swagger user interface for Raw Listings search functionality.

I have implemented basic (“incomplete”) OpenAPI support for functionality and stress testing.

Swagger user interface parameterised search functionality.

The search results are paginated and individual listings can be retrieved using the Airbnb listing “id”.

const string SearchPaginatedSql = @"SELECT Uid, Id, [Name], neighbourhood
                     FROM listings
                     WHERE [Name] LIKE N'%' + @SearchText + N'%'
                     ORDER BY [Name]
                     OFFSET @PageSize * (@PageNumber - 1) ROWS FETCH NEXT @PageSize ROWS ONLY";

public record ListingListDto
{
   public long Id { get; set; }
   public string? Name { get; set; }
   public string? Neighbourhood { get; set; }
};

Swagger user interface search functionality with untyped response.

The first HTTP GET implementation returns an untyped result-set which was not very helpful.

app.MapGet("/Listing/Search", async (string searchText, int pageNumber, int pageSize, [FromServices] IDapperContext dappperContext) =>
{
   using (var connection = dappperContext.ConnectionCreate())
   {
      return await connection.QueryWithRetryAsync(SearchPaginatedSql, new { searchText, pageNumber, pageSize });
   }
})
.WithOpenApi();

Swagger user interface search functionality with typed response

The second HTTP GET implementation returns a typed result-set which improved the “usability” of clients generated from the OpenAPI definition file.

app.MapGet("/Listing/Search/Typed", async (string searchText, int pageNumber, int pageSize, [FromServices] IDapperContext dappperContext) =>
{
   using (var connection = dappperContext.ConnectionCreate())
   {
      return await connection.QueryWithRetryAsync<ListingListDto>(SearchPaginatedSql, new { searchText, pageNumber, pageSize });
   }
})
.Produces<IList<ListingListDto>>(StatusCodes.Status200OK)
.WithOpenApi();

The third HTTP GET method uses the Listing id specified in a routing parameter to look up a Listing.

string LookupByIdSql = @"SELECT Id, [Name], neighbourhood
                     FROM ListingsHosts
                     WHERE id = @Id";

public record ListingLookupDto
{
   public long Id { get; set; }
   public string? Name { get; set; }
   public string? Neighbourhood { get; set; }
};

Swagger user interface Listing lookup functionality

app.MapGet("/Listing/{id:long}", async (long id, IDapperContext dappperContext) =>
{
   using (var connection = dappperContext.ConnectionCreate())
   {
      ListingLookupDto result = await connection.QuerySingleOrDefaultWithRetryAsync<ListingLookupDto>(LookupByIdSql, new { id });
      if (result is null)
      {
         return Results.Problem($"Listing {id} not found", statusCode: StatusCodes.Status404NotFound);
      }

      return Results.Ok(result);
   }
})
.Produces<ListingLookupDto>(StatusCodes.Status200OK)
.Produces<ProblemDetails>(StatusCodes.Status404NotFound)
.WithOpenApi();

The lack of validation of the SearchText, PageSize and PageNumber parameters allowed users to enter invalid values, which caused searches to fail.

Swagger user interface search functionality with an invalid page number

My initial approach was to decorate the parameters of the ValidatedQuery method with DataAnnotations to ensure only valid values were accepted.

const byte SearchTextMinimumLength = 3;
const byte SearchTextMaximumLength = 20;
const byte PageNumberMinimum = 1;
const byte PageNumberMaximum = 100;
const byte PageSizeMinimum = 5;
const byte PageSizeMaximum = 50;

app.MapGet("/Listing/Search/ValidatedQuery", async (
   [FromQuery, Required, MinLength(SearchTextMinimumLength, ErrorMessage = "SearchTextMinimumLength"), MaxLength(SearchTextMaximumLength, ErrorMessage = "SearchTextMaximumLength")]
   string searchText,
   [FromQuery, Range(PageNumberMinimum, PageNumberMaximum, ErrorMessage = "PageNumberMinimum PageNumberMaximum")]
   int pageNumber,
   [FromQuery, Range(PageSizeMinimum, PageSizeMaximum, ErrorMessage = "PageSizeMinimum PageSizeMaximum")]
   int pageSize,
   [FromServices] IDapperContext dappperContext) =>
{
   using (var connection = dappperContext.ConnectionCreate())
   {
      return await connection.QueryWithRetryAsync<ListingListDto>(SearchPaginatedSql, new { searchText, pageNumber, pageSize });
   }
})
.Produces<IList<ListingListDto>>(StatusCodes.Status200OK)
.Produces<ProblemDetails>(StatusCodes.Status400BadRequest)
.WithOpenApi();

This wasn’t a great solution because the validation of the parameters was declared as part of the user interface and would have to be repeated everywhere listing search functionality was provided.

Swagger user interface search functionality with parameter validation

The final HTTP GET method uses DataAnnotations on the SearchParameters DTO and [AsParameters] to bind the query string values.

app.MapGet("/Listing/Search/Parameters", async ([AsParameters] SearchParameters searchParameters,
   [FromServices] IDapperContext dappperContext) =>
{
   using (var connection = dappperContext.ConnectionCreate())
   {
      return await connection.QueryWithRetryAsync<ListingListDto>(SearchPaginatedSql, new { searchText = searchParameters.SearchText, searchParameters.PageNumber, searchParameters.PageSize });
   }
})
.Produces<IList<ListingListDto>>(StatusCodes.Status200OK)
.Produces<ProblemDetails>(StatusCodes.Status400BadRequest)
.WithOpenApi();

public record SearchParameters
{
   // https://github.com/domaindrivendev/Swashbuckle.AspNetCore/issues/2658 possibly related?
   public const byte SearchTextMinimumLength = 3;
   public const byte SearchTextMaximumLength = 15;
   public const int PageNumberMinimum = 1;
   public const int PageNumberMaximum = 100;
   public const byte PageSizeMinimum = 5;
   public const byte PageSizeMaximum = 50;

   //[FromQuery, Required, MinLength(SearchTextMinimumLength, ErrorMessage = "SearchTextMinimumLength"), MaxLength(SearchTextMaximumLength, ErrorMessage = "SearchTextMaximumLength")]
   //[Required, MinLength(SearchTextMinimumLength, ErrorMessage = "SearchTextMinimumLength"), MaxLength(SearchTextMaximumLength, ErrorMessage = "SearchTextMaximumLength")]
   [MinLength(SearchTextMinimumLength, ErrorMessage = "SearchTextMinimumLength"), MaxLength(SearchTextMaximumLength, ErrorMessage = "SearchTextMaximumLength")]
   public string SearchText { get; set; }

   //[FromQuery, Range(PageNumberMinimum, PageNumberMaximum, ErrorMessage = "PageNumberMinimum PageNumberMaximum")]
   //[Required, Range(PageNumberMinimum, PageNumberMaximum, ErrorMessage = "PageNumberMinimum PageNumberMaximum")]
   [Range(PageNumberMinimum, PageNumberMaximum, ErrorMessage = "PageNumberMinimum PageNumberMaximum")]
   public int PageNumber { get; set; }

   [Range(PageSizeMinimum, PageSizeMaximum, ErrorMessage = "PageSizeMinimum PageSizeMaximum")]
   public int PageSize { get; set; }
}

Swagger user interface search functionality with parameter validation

These last two implementations worked, though the error messages I had embedded in the code were not displayed. I think this is related to this Swashbuckle issue.
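
One way to get the custom error messages back to the client (a sketch only, assuming .NET 7 endpoint filters, not something the project currently does) is to run the DataAnnotations validation in an endpoint filter chained onto the existing /Listing/Search/Parameters MapGet:

// Requires System.ComponentModel.DataAnnotations, System.Collections.Generic and System.Linq
.AddEndpointFilter(async (context, next) =>
{
   // Assumes SearchParameters is the first handler argument
   SearchParameters searchParameters = context.GetArgument<SearchParameters>(0);

   var validationResults = new List<ValidationResult>();

   if (!Validator.TryValidateObject(searchParameters, new ValidationContext(searchParameters), validationResults, validateAllProperties: true))
   {
      return Results.ValidationProblem(validationResults
         .GroupBy(v => v.MemberNames.FirstOrDefault() ?? string.Empty)
         .ToDictionary(g => g.Key, g => g.Select(v => v.ErrorMessage ?? "invalid").ToArray()));
   }

   return await next(context);
})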

There is also an issue looking up some listings with larger listing ids which will need some investigation.

Myriota Connector – Payload formatters revisited again

The myriota Azure IoT Hub Cloud Identity Translation Gateway payload formatters use compiled C# code to convert uplink/downlink packet payloads to JSON/byte array. While trying out different formatters I had “compile” and “evaluation” errors which would have been a lot easier to debug if there was more diagnostic information in the Azure Application Insights logging.

namespace PayloadFormatter // Additional namespace for shortening the interface name when used in formatter code
{
    using System;
    using System.Collections.Generic;

    using Newtonsoft.Json.Linq;

    public interface IFormatterUplink
    {
        public JObject Evaluate(IDictionary<string, string> properties, string terminalId, DateTime timestamp, byte[] payloadBytes);
    }

    public interface IFormatterDownlink
    {
        public byte[] Evaluate(IDictionary<string, string> properties, string terminalId, JObject? payloadJson, byte[] payloadBytes);
    }
}

An uplink payload formatter is loaded from an Azure Storage Blob, compiled with Oleg Shilo’s CS-Script, then cached in memory with Alastair Crabtree’s LazyCache.
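
UplinkGetAsync is roughly “LazyCache lookup, blob download, CS-Script compile”; a minimal sketch of that shape (field and setting names here are assumptions, not the connector’s actual code):

// Sketch only - assumes CSScriptLib, Azure.Storage.Blobs and LazyCache (IAppCache) usings/fields
public async Task<IFormatterUplink> UplinkGetAsync(string formatterFileName, CancellationToken cancellationToken)
{
    return await _payloadFormattersCache.GetOrAddAsync($"Uplink-{formatterFileName}", async () =>
    {
        BlobClient blobClient = new BlobClient(_connectionString, _uplinkContainer, formatterFileName);

        BlobDownloadResult downloadResult = await blobClient.DownloadContentAsync(cancellationToken);

        // CS-Script compiles the C# source and returns an instance implementing the interface
        return CSScript.Evaluator.LoadCode<PayloadFormatter.IFormatterUplink>(downloadResult.Content.ToString());
    });
}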

// Get the payload formatter from Azure Storage container, compile, and then cache binary.
IFormatterUplink formatterUplink;

try
{
   formatterUplink = await _payloadFormatterCache.UplinkGetAsync(context.PayloadFormatterUplink, cancellationToken);
}
catch (Azure.RequestFailedException aex)
{
   _logger.LogError(aex, "Uplink- PayloadID:{0} payload formatter load failed", payload.Id);

   return payload;
}
catch (NullReferenceException nex)
{
   _logger.LogError(nex, "Uplink- PayloadID:{id} formatter:{formatter} compilation failed missing interface", payload.Id, context.PayloadFormatterUplink);

   return payload;
}
catch (CSScriptLib.CompilerException cex)
{
   _logger.LogError(cex, "Uplink- PayloadID:{id} formatter:{formatter} compiler failed", payload.Id, context.PayloadFormatterUplink);

   return payload;
}
catch (Exception ex)
{
   _logger.LogError(ex, "Uplink- PayloadID:{id} formatter:{formatter} compilation failed", payload.Id, context.PayloadFormatterUplink);

   return payload;
}

If the Azure Storage blob is missing or the payload formatter code is incorrect an exception is thrown. I added specialised exception handlers for Azure.RequestFailedException, NullReferenceException and CSScriptLib.CompilerException to add more detail to the Azure Application Insights logging.

// Process the payload with configured formatter
Dictionary<string, string> properties = new Dictionary<string, string>();
JObject telemetryEvent;

try
{
   telemetryEvent = formatterUplink.Evaluate(properties, packet.TerminalId, packet.Timestamp, payloadBytes);
}
catch (Exception ex)
{
   _logger.LogError(ex, "Uplink- PayloadId:{0} TerminalId:{1} Value:{2} Bytes:{3} payload formatter evaluate failed", payload.Id, packet.TerminalId, packet.Value, Convert.ToHexString(payloadBytes));

   return payload;
}

if (telemetryEvent is null)
{
   _logger.LogError("Uplink- PayloadId:{0} TerminalId:{1} Value:{2} Bytes:{3} payload formatter evaluate failed returned null", payload.Id, packet.TerminalId, packet.Value, Convert.ToHexString(payloadBytes));

   return payload;
}

The Evaluate method can throw many different types of exception, so in the initial version only the “generic” exception is caught and logged.

using System;
using System.Collections.Generic;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string terminalId, DateTime timestamp, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        telemetryEvent.Add("Bytes", BitConverter.ToString(payloadBytes));
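        // Adding a property with the same name a second time makes JObject.Add throw, exercising the evaluation failure path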
        telemetryEvent.Add("Bytes", BitConverter.ToString(payloadBytes));

        return telemetryEvent;
    }
}

There are a number (which should grow over time) of test uplink/downlink payload formatters for testing different compile and execution failures.
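
For example, a formatter that compiles cleanly but doesn’t implement PayloadFormatter.IFormatterUplink should fail to load and be caught by one of the handlers above (a sketch of such a test case):

using System;
using System.Collections.Generic;

using Newtonsoft.Json.Linq;

// Deliberately does not implement PayloadFormatter.IFormatterUplink
public class FormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string terminalId, DateTime timestamp, byte[] payloadBytes)
    {
        return new JObject();
    }
}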

Azure IoT Storage Explorer container with sample formatter blobs.

I used Azure Storage Explorer to upload my test payload formatters to the uplink/downlink Azure Storage containers.

Myriota Connector – Uplink Payload formatters revisited

The myriota Azure IoT Hub Cloud Identity Translation Gateway payload formatters use compiled C# code to convert uplink packet payloads to JSON.

namespace PayloadFormatter
{
    using System.Collections.Generic;

    using Newtonsoft.Json.Linq;

    public interface IFormatterUplink
    {
        public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, JObject payloadJson, string payloadText, byte[] payloadBytes);
    }
...
}

The myriota uplink packet payload is only 20 bytes long, so it is very unlikely that the payloadText and payloadJson parameters would ever be populated, and I removed them from the interface. The uplink message handler interface has been updated and the code to convert (if possible) the payload bytes to text and then to JSON has been deleted.

namespace PayloadFormatter
{
    using System.Collections.Generic;

    using Newtonsoft.Json.Linq;

    public interface IFormatterUplink
    {
        public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, byte[] payloadBytes);
    }
...
}

All of the sample payload formatters have been updated to reflect the updated parameters. The sample Tracker.cs payload formatter unpacks a message from a Myriota Dev Kit running the Tracker sample and returns an Azure IoT Central compatible location telemetry payload.

/*
myriota tracker payload format

typedef struct {
  uint16_t sequence_number;
  int32_t latitude;   // scaled by 1e7, e.g. -891234567 (south 89.1234567)
  int32_t longitude;  // scaled by 1e7, e.g. 1791234567 (east 179.1234567)
  uint32_t time;      // epoch timestamp of last fix
} __attribute__((packed)) tracker_message; 

*/ 
using System;
using System.Collections.Generic;
using System.Globalization;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;


public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        if (payloadBytes is null)
        {
            return telemetryEvent;
        }

        telemetryEvent.Add("SequenceNumber", BitConverter.ToUInt16(payloadBytes));

        JObject location = new JObject();

        double latitude = BitConverter.ToInt32(payloadBytes, 2) / 10000000.0;
        location.Add("lat", latitude);

        double longitude = BitConverter.ToInt32(payloadBytes, 6) / 10000000.0;
        location.Add("lon", longitude);

        location.Add("alt", 0);

        telemetryEvent.Add("DeviceLocation", location);

        UInt32 packetimestamp = BitConverter.ToUInt32(payloadBytes, 10);

        DateTime fixAtUtc = DateTime.UnixEpoch.AddSeconds(packetimestamp);

        telemetryEvent.Add("FixAtUtc", fixAtUtc);

        properties.Add("iothub-creation-time-utc", fixAtUtc.ToString("s", CultureInfo.InvariantCulture));

        return telemetryEvent;
    }
}

If a message payload is text or JSON it can still be converted in the payload formatter.
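
For example, a formatter for a text or JSON payload might start with something like this (a sketch, with the usual try/catch around the parsing omitted):

// Sketch only - convert the raw payload bytes back to text, then JSON, inside a formatter
string payloadText = System.Text.Encoding.UTF8.GetString(payloadBytes);

JObject payloadJson = JObject.Parse(payloadText);

telemetryEvent.Add("Payload", payloadJson);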

Myriota – Uplink Payload formatters and caching

My myriota Azure IoT Hub Cloud Identity Translation Gateway payload formatters use C# code (compiled with CS-Script and cached with Alastair Crabtree’s LazyCache) to convert uplink packet payloads to JSON.

I have found that putting the C/C++ structure for the uplink payload at the top of the converter is really helpful.

/*
myriota tracker payload format

typedef struct {
  uint16_t sequence_number;
  int32_t latitude;   // scaled by 1e7, e.g. -891234567 (south 89.1234567)
  int32_t longitude;  // scaled by 1e7, e.g. 1791234567 (east 179.1234567)
  uint32_t time;      // epoch timestamp of last fix
} __attribute__((packed)) tracker_message; 

*/ 
using System;
using System.Collections.Generic;
using System.Globalization;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;


public class FormatterUplink : PayloadFormatter.IFormatterUplink
{
    public JObject Evaluate(IDictionary<string, string> properties, string application, string terminalId, DateTime timestamp, JObject payloadJson, string payloadText, byte[] payloadBytes)
    {
        JObject telemetryEvent = new JObject();

        telemetryEvent.Add("SequenceNumber", BitConverter.ToUInt16(payloadBytes));

        double latitude = BitConverter.ToInt32(payloadBytes, 2) / 10000000.0;
        telemetryEvent.Add("Latitude", latitude);

        double longitude = BitConverter.ToInt32(payloadBytes, 6) / 10000000.0;
        telemetryEvent.Add("Longitude", longitude);

        UInt32 packetimestamp = BitConverter.ToUInt32(payloadBytes, 10);
        DateTime lastFix = DateTime.UnixEpoch.AddSeconds(packetimestamp);

        properties.Add("iothub-creation-time-utc", lastFix.ToString("s", CultureInfo.InvariantCulture));

        return telemetryEvent;
    }
}

The sample Tracker.cs payload formatter unpacks a message from a Myriota Dev Kit running the Tracker sample and returns an Azure IoT Central compatible location telemetry payload.

BEWARE: I think the Azure IoT Central Position lat, lon + alt values might be case sensitive.

Azure IoT Explorer displaying Tracker.cs payload formatter output

The identity of the payload formatter to use is configured as part of the Destination webhook Uniform Resource Locator (URL).

Myriota Destination configuration application name URL configuration

namespace devMobile.IoT.MyriotaAzureIoTConnector.Connector.Models
{
    public class UplinkPayloadQueueDto
    {
        public string Application { get; set; }
        public string EndpointRef { get; set; }
        public DateTime PayloadReceivedAtUtc { get; set; }
        public DateTime PayloadArrivedAtUtc { get; set; }
        public QueueData Data { get; set; }
        public string Id { get; set; }
        public Uri CertificateUrl { get; set; }
        public string Signature { get; set; }
    }

    public class QueueData
    {
        public List<QueuePacket> Packets { get; set; }
    }

    public class QueuePacket
    {
        public string TerminalId { get; set; }

        public DateTime Timestamp { get; set; }

        public string Value { get; set; }
    }
}

A pair of Azure Blob Storage containers are used to store the uplink/downlink (coming soon) formatter files. The compiled payload formatters are cached with Uplink/Downlink + Application (from the UplinkPayloadQueueDto) as the key.

Azure IoT Storage Explorer uplink payload formatters

The default uplink and downlink formatters used when there is no payload formatter for “Application” are configured in the application settings.

Myriota device Uplink Serialisation

The Myriota Developer documentation has some sample webhook data payloads, so I used JSON2csharp to generate a Data Transfer Object (DTO) to deserialise the payload. The format of the message is a bit “odd”: the “Data” value contains an “escaped” JSON object.

{
  "EndpointRef": "ksnb8GB_TuGj:__jLfs2BQJ2d",
  "Timestamp": 1692928585,
  "Data": "{"Packets": [{"Timestamp": 1692927646796, "TerminalId": "0001020304", "Value": "00008c9512e624cce066adbae764cccccccccccc"}]}",
  "Id": "a5c1bffe-4b62-4233-bbe9-d4ecc4f8b6cb",
  "CertificateUrl": "https://security.myriota.com/data-13f7751f3c5df569a6c9c42a9ce73a8a.crt",
  "Signature": "FDJpQdWHwCY+tzCN/WvQdnbyjgu4BmP/t3cJIOEF11sREGtt7AH2L9vMUDji6X/lxWBYa4K8tmI0T914iPyFV36i+GtjCO4UHUGuFPJObCtiugVV8934EBM+824xgaeW8Hvsqj9eDeyJoXH2S6C1alcAkkZCVt0pUhRZSZZ4jBJGGEEQ1Gm+SOlYjC2exUOf0mCrI5Pct+qyaDHbtiHRd/qNGW0LOMXrB/9difT+/2ZKE1xvDv9VdxylXi7W0/mARCfNa0J6aWtQrpvEXJ5w22VQqKBYuj3nlGtL1oOuXCZnbFYFf4qkysPaXON31EmUBeB4WbZMyPaoyFK0wG3rwA=="
}

namespace devMobile.IoT.myriotaAzureIoTConnector.myriota.UplinkWebhook.Models
{
    public class UplinkPayloadWebDto
    {
        public string EndpointRef { get; set; }
        public long Timestamp { get; set; } 
        public string Data { get; set; } // Embedded JSON ?
        public string Id { get; set; }
        public string CertificateUrl { get; set; }
        public string Signature { get; set; }
    }
}

The UplinkWebhook controller “automagically” deserialises the message, then in code the embedded JSON is deserialised and “unpacked”, and finally the processed message is inserted into an Azure Storage queue.

namespace devMobile.IoT.myriotaAzureIoTConnector.myriota.UplinkWebhook.Controllers
{
    [Route("[controller]")]
    [ApiController]
    public class UplinkController : ControllerBase
    {
        private readonly Models.ApplicationSettings _applicationSettings;
        private readonly ILogger<UplinkController> _logger;
        private readonly QueueServiceClient _queueServiceClient;

        public UplinkController(IOptions<Models.ApplicationSettings> applicationSettings, QueueServiceClient queueServiceClient, ILogger<UplinkController> logger)
        {
            _applicationSettings = applicationSettings.Value;
            _queueServiceClient = queueServiceClient;
            _logger = logger;
        }

        [HttpPost]
        public async Task<IActionResult> Post([FromBody] Models.UplinkPayloadWebDto payloadWeb)
        {
            _logger.LogInformation("SendAsync queue name:{QueueName}", _applicationSettings.QueueName);

            QueueClient queueClient = _queueServiceClient.GetQueueClient(_applicationSettings.QueueName);

            var serializeOptions = new JsonSerializerOptions
            {
                WriteIndented = true,
                Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping
            };

            await queueClient.SendMessageAsync(Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payloadWeb, serializeOptions)));

            return this.Ok();
        }
    }
}
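
A minimal sketch of how the embedded “Data” JSON can be unpacked (not the connector’s actual code; the packet Timestamp is treated as a Unix epoch in milliseconds, as in the sample payload):

using System;
using System.Text.Json;

// Sketch only - unpack the "escaped" JSON embedded in the Data property
using (JsonDocument document = JsonDocument.Parse(payloadWeb.Data))
{
    foreach (JsonElement packet in document.RootElement.GetProperty("Packets").EnumerateArray())
    {
        string terminalId = packet.GetProperty("TerminalId").GetString();
        string value = packet.GetProperty("Value").GetString();
        DateTime timestamp = DateTimeOffset.FromUnixTimeMilliseconds(packet.GetProperty("Timestamp").GetInt64()).UtcDateTime;
    }
}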

The webhook application uses the QueueClientBuilderExtensions AddQueueServiceClient method (via AddAzureClients) so a QueueServiceClient can be injected into the webhook controller.

namespace devMobile.IoT.myriotaAzureIoTConnector.myriota.UplinkWebhook
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);

            // Add services to the container.
            builder.Services.AddControllers();

            builder.Services.AddApplicationInsightsTelemetry(i => i.ConnectionString = builder.Configuration.GetConnectionString("ApplicationInsights"));

            builder.Services.Configure<Models.ApplicationSettings>(builder.Configuration.GetSection("Application"));

            builder.Services.AddAzureClients(azureClient =>
            {
                azureClient.AddQueueServiceClient(builder.Configuration.GetConnectionString("AzureWebApi"));
            });

            var app = builder.Build();

            // Configure the HTTP request pipeline.

            app.UseHttpsRedirection();

            app.MapControllers();

            app.Run();
        }
    }
}

After debugging the application on my desktop with Telerik Fiddler I deployed the application to one of my Azure subscriptions.

Azure Resource Group for the myriota Azure IoT Connector
Adding a new Destination in the myriota device manager

As part of configuring a new device, test messages can be sent to the configured destinations.

Testing a new Destination in the myriota device manager

{
  "EndpointRef": "N_HlfTNgRsqe:uyXKvYTmTAO5",
  "Timestamp": 1563521870,
  "Data": "{"Packets": [{"Timestamp": 1563521870359,
    "TerminalId": "f74636ec549f9bde50cf765d2bcacbf9",
    "Value": "0101010101010101010101010101010101010101"}]}",
  "Id": "fe77e2c7-8e9c-40d0-8980-43720b9dab75",
  "CertificateUrl":    "https://security.myriota.com/data-13f7751f3c5df569a6c9c42a9ce73a8a.crt",
  "Signature": "k2OIBppMRmBT520rUlIvMxNg+h9soJYBhQhOGSIWGdzkppdT1Po2GbFr7jbg..."
}

The DTO generated with JSON2csharp needed some manual “tweaking” after examining how a couple of the sample messages were deserialised.

Azure Storage Explorer messages

I left the Myriota Developer Toolkit device (running the tracker sample) outside overnight and the following day I could see, with Azure Storage Explorer, a couple of messages in the Azure Storage Queue.

Myriota device configuration

For a couple of weeks a Myriota Developer Toolkit has been sitting under my desk and today I got some time to set up a device, register it, then upload some data.

Myriota Developer Toolkit

The first step was to download and install the Myriota Configurator so I could get the device registration information and install the tracker example application.

Using Windows File Explorer to “unblock” the downloaded file

After “unblocking” the zip file and upgrading my pip installation, the install script worked.

Myriota Configurator installation script

The application had to be run from the command line with “python MyriotaConfigurator.py”.

Myriota Configurator main menu
Myriota Configurator retrieving device registration code

On the device I’m using the Tracker sample application to generate some sample payloads.

Myriota Configurator downloading tracker sample to device

The next step was to “register” my device and configure the destination(s) for its messages.

Myriota Device Manager Device configuration

Once the device and device manager configuration were sorted, I put the Tracker out on the back lawn on top of a large flowerpot.

Device Manager Access Times

On the “Access Times” page I could see that there were several periods when a satellite was overhead and overnight a couple of messages were uploaded.

Swarm Space – Underlying Architecture sorted

After figuring out that calling an Azure HTTP Trigger function to load the cache wasn’t going to work reliably, I have revisited the architecture one last time and significantly refactored the SwarmSpaceAzureIoTConnector project.

Visual Studio 2022 solution

The application now has a StartUpService which loads the Azure DeviceClient cache (Lazy Cache) in the background as the application starts up. If an uplink message is received from a Swarm device before it has been loaded by the StartUpService, the DeviceClient information is cached and another connection to the Azure IoT Hub is not established.

...
using Microsoft.Azure.Functions.Extensions.DependencyInjection;

[assembly: FunctionsStartup(typeof(devMobile.IoT.SwarmSpaceAzureIoTConnector.Connector.StartUpService))]
namespace devMobile.IoT.SwarmSpaceAzureIoTConnector.Connector
{
...
    public class StartUpService : BackgroundService
    {
        private readonly ILogger<StartUpService> _logger;
        private readonly ISwarmSpaceBumblebeeHive _swarmSpaceBumblebeeHive;
        private readonly Models.ApplicationSettings _applicationSettings;
        private readonly IAzureDeviceClientCache _azureDeviceClientCache;

        public StartUpService(ILogger<StartUpService> logger, IAzureDeviceClientCache azureDeviceClientCache, ISwarmSpaceBumblebeeHive swarmSpaceBumblebeeHive, IOptions<Models.ApplicationSettings> applicationSettings)//, IOptions<Models.AzureIoTSettings> azureIoTSettings)
        {
            _logger = logger;
            _azureDeviceClientCache = azureDeviceClientCache;
            _swarmSpaceBumblebeeHive = swarmSpaceBumblebeeHive;
            _applicationSettings = applicationSettings.Value;
        }

        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            await Task.Yield();

            _logger.LogInformation("StartUpService.ExecuteAsync start");

            try
            {
                _logger.LogInformation("BumblebeeHiveCacheRefresh start");

                foreach (SwarmSpace.BumblebeeHiveClient.Device device in await _swarmSpaceBumblebeeHive.DeviceListAsync(cancellationToken))
                {
                    _logger.LogInformation("BumblebeeHiveCacheRefresh DeviceId:{DeviceId} DeviceName:{DeviceName}", device.DeviceId, device.DeviceName);

                    Models.AzureIoTDeviceClientContext context = new Models.AzureIoTDeviceClientContext()
                    {
                        OrganisationId = _applicationSettings.OrganisationId,
                        DeviceType = (byte)device.DeviceType,
                        DeviceId = (uint)device.DeviceId,
                    };

                    await _azureDeviceClientCache.GetOrAddAsync(context.DeviceId, context);
                }

                _logger.LogInformation("BumblebeeHiveCacheRefresh finish");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "StartUpService.ExecuteAsync error");

                throw;
            }

            _logger.LogInformation("StartUpService.ExecuteAsync finish");
        }
    }
}

The uplink and downlink payload formatters are stored in Azure Blob Storage, compiled (CS-Script) as they are loaded, then cached (Lazy Cache).

Azure Storage explorer displaying list of uplink payload formatter blobs.
Azure Storage explorer displaying list of downlink payload formatter blobs.

private async Task<IFormatterDownlink> DownlinkLoadAsync(int userApplicationId)
{
    BlobClient blobClient = new BlobClient(_payloadFormatterConnectionString, _applicationSettings.PayloadFormattersDownlinkContainer, $"{userApplicationId}.cs");

    if (!await blobClient.ExistsAsync())
    {
        _logger.LogInformation("PayloadFormatterDownlink- UserApplicationId:{0} Container:{1} not found using default:{2}", userApplicationId, _applicationSettings.PayloadFormattersDownlinkContainer, _applicationSettings.PayloadFormatterDownlinkBlobDefault);

        blobClient = new BlobClient(_payloadFormatterConnectionString, _applicationSettings.PayloadFormattersDownlinkContainer, _applicationSettings.PayloadFormatterDownlinkBlobDefault);
    }

    BlobDownloadResult downloadResult = await blobClient.DownloadContentAsync();

    return CSScript.Evaluator.LoadCode<PayloadFormatter.IFormatterDownlink>(downloadResult.Content.ToString());
}
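
The compiled formatter is then cached so the blob download and compile only happen once per user application; a sketch of what the cached accessor might look like (the method name, field name and cache key shape are assumptions):

// Sketch only - cache the compiled downlink formatter with LazyCache (IAppCache)
public async Task<IFormatterDownlink> DownlinkGetAsync(int userApplicationId)
{
    return await _payloadFormattersCache.GetOrAddAsync($"Downlink-{userApplicationId}", () => DownlinkLoadAsync(userApplicationId));
}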

The uplink and downlink formatters can be edited in Visual Studio 2022 with syntax highlighting (currently they have to be manually uploaded).

The SwarmSpaceBumblebeeHive module no longer has public login or logout methods.

    public interface ISwarmSpaceBumblebeeHive
    {
        public Task<ICollection<Device>> DeviceListAsync(CancellationToken cancellationToken);

        public Task SendAsync(uint organisationId, uint deviceId, byte deviceType, ushort userApplicationId, byte[] payload);
    }

The DeviceListAsync and SendAsync methods now call the BumblebeeHive login method after a configurable period of inactivity.

public async Task<ICollection<Device>> DeviceListAsync(CancellationToken cancellationToken)
{
    if ((_TokenActivityAtUtC + _bumblebeeHiveSettings.TokenValidFor) < DateTime.UtcNow)
    {
        await Login();
    }

    using (HttpClient httpClient = _httpClientFactory.CreateClient())
    {
        Client client = new Client(httpClient);

        client.BaseUrl = _bumblebeeHiveSettings.BaseUrl;

        httpClient.DefaultRequestHeaders.Add("Authorization", $"bearer {_token}");

        return await client.GetDevicesAsync(null, null, null, null, null, null, null, null, null, cancellationToken);
    }
}

I’m looking at building a webby user interface where users can interactively list, create, edit, and delete formatters with syntax highlighter support, and execute the formatters with sample payloads.

Swarm Space Azure IoT Connector Identity Translation Gateway Architecture

This approach uses most of the existing building blocks, and that’s it, no more changes.

Swarm Space – DeviceClient Cache warming with HTTPTrigger

For C2D messaging to work a device must have a DeviceClient “connection” established to the Azure IoT Hub, which is a problem for irregularly connected devices. Sometimes establishing a connection on the first D2C message is sufficient, especially for devices which only support D2C messaging. An Identity Translation Gateway establishes a connection for each device (see discussion about AMQP connection multiplexing) so that C2D messages can be sent immediately.
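
A minimal sketch of the per-device connection an identity translation gateway keeps open so C2D messages can be received as soon as they are sent (Microsoft.Azure.Devices.Client; the connection string is a placeholder, the connector actually connects via device connection strings or the Device Provisioning Service as shown below):

using Microsoft.Azure.Devices.Client;

// Sketch only - open a DeviceClient and register a cloud-to-device message handler
DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(deviceConnectionString, TransportType.Amqp_Tcp_Only);

await deviceClient.OpenAsync();

await deviceClient.SetReceiveMessageHandlerAsync(async (message, userContext) =>
{
    // Process the C2D message here, then complete it so it isn't redelivered
    await deviceClient.CompleteAsync(message);
}, null);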

I initially tried building a cache loader with a BackgroundService so that the DeviceClient cache would start loading as the application started, but interdependencies became a problem.

public partial class Connector
{
    [Function("BumblebeeHiveCacheRefresh")]
    public async Task<IActionResult> BumblebeeHiveCacheRefreshRun([HttpTrigger(AuthorizationLevel.Function, "get")] CancellationToken cancellationToken)
    {
        _logger.LogInformation("BumblebeeHiveCacheRefresh start");

        await _swarmSpaceBumblebeeHive.Login(cancellationToken);

        foreach (SwarmSpace.BumblebeeHiveClient.Device device in await _swarmSpaceBumblebeeHive.DeviceListAsync(cancellationToken))
        {
            _logger.LogInformation("BumblebeeHiveCacheRefresh DeviceId:{DeviceId} DeviceName:{DeviceName}", device.DeviceId, device.DeviceName);

            Models.AzureIoTDeviceClientContext context = new Models.AzureIoTDeviceClientContext()
            {
                // TODO seems a bit odd getting this from application settings
                OrganisationId = _applicationSettings.OrganisationId, 
                //UserApplicationId = device.UserApplicationId, deprecated
                DeviceType = (byte)device.DeviceType,
                DeviceId = (uint)device.DeviceId,
            };

            switch (_azureIoTSettings.ApplicationType)
            {
                case Models.ApplicationType.AzureIotHub:
                    switch (_azureIoTSettings.AzureIotHub.ConnectionType)
                    {
                        case Models.AzureIotHubConnectionType.DeviceConnectionString:
                             await _azureDeviceClientCache.GetOrAddAsync<DeviceClient>(device.DeviceId.ToString(), (ICacheEntry x) => AzureIoTHubDeviceConnectionStringConnectAsync(device.DeviceId.ToString(), context));
                            break;
                        case Models.AzureIotHubConnectionType.DeviceProvisioningService:
                             await _azureDeviceClientCache.GetOrAddAsync<DeviceClient>(device.DeviceId.ToString(), (ICacheEntry x) => AzureIoTHubDeviceProvisioningServiceConnectAsync(device.DeviceId.ToString(), context, _azureIoTSettings.AzureIotHub.DeviceProvisioningService));
                            break;
                        default:
                            _logger.LogError("Azure IoT Hub ConnectionType unknown {0}", _azureIoTSettings.AzureIotHub.ConnectionType);

                            throw new NotImplementedException("AzureIoT Hub unsupported ConnectionType");
                    }
                    break;

                case Models.ApplicationType.AzureIoTCentral:
                    await _azureDeviceClientCache.GetOrAddAsync<DeviceClient>(device.DeviceId.ToString(), (ICacheEntry x) => AzureIoTHubDeviceProvisioningServiceConnectAsync(device.DeviceId.ToString(), context, _azureIoTSettings.AzureIoTCentral.DeviceProvisioningService));
                break;

                default:
                    _logger.LogError("AzureIoT application type unknown {0}", _azureIoTSettings.ApplicationType);

                    throw new NotImplementedException("AzureIoT unsupported ApplicationType");
            }
        }

        _logger.LogInformation("BumblebeeHiveCacheRefresh finish");

        return new OkResult();
    }
}

The WEBSITE_WARMUP_PATH environment variable is used to call the Azure HTTPTrigger Function over HTTP, and this is secured with an x-functions-key header.

Azure Function App Configuration

In the short term, loading the cache with a call to an Azure HTTPTrigger Function works but may have timeout issues. When I ran the connector with my hundreds-of-devices simulator the function timed out every so often.

Swarm Space – Uplink with WebAPI Revisited again

After reviewing my ASP.NET Core WebAPI Swarm Space Delivery Method webhook implementation I have made a final round of changes.

There are now separate Data Transfer Objects (DTOs) for the uplink and queue message payloads, mainly because the UplinkPayloadQueueDto has additional fields for the client (based on the x-api-key) and when the webhook was called.

public class UplinkPayloadQueueDto
{
    public ulong PacketId { get; set; }
    public byte DeviceType { get; set; }
    public uint DeviceId { get; set; }
    public ushort UserApplicationId { get; set; }
    public uint OrganizationId { get; set; }
    public string Data { get; set; } = string.Empty;
    public byte Length { get; set; }
    public int Status { get; set; }
    public DateTime SwarmHiveReceivedAtUtc { get; set; }
    public DateTime UplinkWebHookReceivedAtUtc { get; set; }
    public string Client { get; set; } = string.Empty;
 }

public class UplinkPayloadWebDto
{
    public ulong PacketId { get; set; }
    public byte DeviceType { get; set; }
    public uint DeviceId { get; set; }
    public ushort UserApplicationId { get; set; }
    public uint OrganizationId { get; set; }
    public string Data { get; set; } = string.Empty;

    [Range(Constants.PayloadLengthMinimum, Constants.PayloadLengthMaximum)]
    public byte Len { get; set; }
    public int Status { get; set; }

    public DateTime HiveRxTime { get; set; }
}

I did consider using AutoMapper to copy the values from the UplinkPayloadWebDto to the UplinkPayloadQueueDto but the additional complexity/configuration required for one mapping wasn’t worth it.

The UplinkController has a single POST method, which has a JSON payload (FromBody) and a single header (FromHeader) “x-api-key” which is used to secure the method and identify the caller.

[HttpPost]
public async Task<IActionResult> Post([FromHeader(Name = "x-api-key")] string xApiKeyValue, [FromBody] Models.UplinkPayloadWebDto payloadWeb)
{
    if (!_applicationSettings.XApiKeys.TryGetValue(xApiKeyValue, out string apiKeyName))
    {
        _logger.LogWarning("Authentication unsuccessful X-API-KEY value:{xApiKeyValue}", xApiKeyValue);

        return this.Unauthorized("Unauthorized client");
    }

    _logger.LogInformation("Authentication successful X-API-KEY value:{apiKeyName}", apiKeyName);

    // Could have used AutoMapper but didn't seem worth it for one place
    Models.UplinkPayloadQueueDto payloadQueue = new()
    {
        PacketId = payloadWeb.PacketId,
        DeviceType = payloadWeb.DeviceType,
        DeviceId = payloadWeb.DeviceId,
        UserApplicationId = payloadWeb.UserApplicationId,
        OrganizationId = payloadWeb.OrganizationId,
        Data = payloadWeb.Data,
        Length = payloadWeb.Len,
        Status = payloadWeb.Status,
        SwarmHiveReceivedAtUtc = payloadWeb.HiveRxTime,
        UplinkWebHookReceivedAtUtc = DateTime.UtcNow,
        Client = apiKeyName,
    };

    _logger.LogInformation("SendAsync queue name:{QueueName}", _applicationSettings.QueueName);

    QueueClient queueClient = _queueServiceClient.GetQueueClient(_applicationSettings.QueueName);

    await queueClient.SendMessageAsync(Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payloadQueue)));

    return this.Ok();
 }

I’ve also used dependency injection (DI) to get a QueueClient just because “it’s always better with DI”.
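
The registration uses the same AddAzureClients / AddQueueServiceClient pattern shown earlier for the Myriota webhook (a sketch; the connection string name here is an assumption):

// Sketch only - register the QueueServiceClient so it can be injected into the UplinkController
builder.Services.AddAzureClients(azureClient =>
{
    azureClient.AddQueueServiceClient(builder.Configuration.GetConnectionString("UplinkQueueStorage"));
});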

Azure Web App Application settings with x-api-key configuration

The “x-api-key” values can also be updated without having to redeploy the application.