.NET Core web API + Dapper – MultiMapping

Shaping recordsets with SplitOn

Sometimes there is no easy way to build a “list of lists” using the contents of multiple database tables. I have run into this problem a few times especially when building webby services which query the database of a “legacy” (aka. production) system.

Retrieving a list of StockGroups and their StockItems from the World Wide Importers database was one of the better “real world” examples I could come up with.

SQL Server Management Studio Diagram showing relationships of tables

There is a fair bit of duplication (StockGroupID, StockGroupName) in the results set

SQL Server Management Studio StockItems-StockItemStockGroups-StockGroups query and results

There were 442 rows in the results set and 227 StockItems in the database so I ordered the query results by StockItemID and confirmed that there were many StockItems in several StockGroups.

public class StockItemListDtoV1
{
	public int Id { get; set; }

	public string Name { get; set; }

	public decimal RecommendedRetailPrice { get; set; }

	public decimal TaxRate { get; set; }
}

public class StockGroupStockItemsListDto
{
	StockGroupStockItemsListDto()
	{
		StockItems = new List<StockItemListDto>();
	}

	public int StockGroupID { get; set; }

	public string StockGroupName { get; set; }

	public List<StockItemListDto> StockItems { get; set; }
}

My initial version uses a Generic List for a StockGroup’s StockItems which is most probably not a good idea.

[Route("api/[controller]")]
[ApiController]
public class InvoiceQuerySplitOnController : ControllerBase
{
	private readonly string connectionString;
	private readonly ILogger<InvoiceQuerySplitOnController> logger;

	public InvoiceQuerySplitOnController(IConfiguration configuration, ILogger<InvoiceQuerySplitOnController> logger)
	{
		this.connectionString = configuration.GetConnectionString("WorldWideImportersDatabase");

		this.logger = logger;
	}

	[HttpGet]
	public async Task<ActionResult<IAsyncEnumerable<StockGroupStockItemsListDto>>> Get()
	{
		IEnumerable<StockGroupStockItemsListDto> response = null;

		try
		{
			using (SqlConnection db = new SqlConnection(this.connectionString))
			{
				var stockGroups = await db.QueryAsync<StockGroupStockItemsListDto, StockItemListDto, StockGroupStockItemsListDto>(
					sql: @"SELECT [StockGroups].[StockGroupID] as 'StockGroupID'" +
								",[StockGroups].[StockGroupName]" +
								",[StockItems].StockItemID as 'ID'" +
								",[StockItems].StockItemName as 'Name'" +
								",[StockItems].TaxRate" +
								",[StockItems].RecommendedRetailPrice " +
							"FROM [Warehouse].[StockGroups] " +
							"INNER JOIN[Warehouse].[StockItemStockGroups] ON ([StockGroups].[StockGroupID] = [StockItemStockGroups].[StockGroupID])" +
							"INNER JOIN[Warehouse].[StockItems] ON ([Warehouse].[StockItemStockGroups].[StockItemID] = [StockItems].[StockItemID])",
					(stockGroup, stockItem) =>
					{
						// Not certain I think using a List<> here is a good idea...
						stockGroup.StockItems.Add(stockItem);
						return stockGroup;
					},
				splitOn: "ID",
				commandType: CommandType.Text);

			response = stockGroups.GroupBy(p => p.StockGroupID).Select(g =>
			{
				var groupedStockGroup = g.First();
				groupedStockGroup.StockItems = g.Select(p => p.StockItems.Single()).ToList();
				return groupedStockGroup;
			});
		}
	}
	catch (SqlException ex)
	{
		logger.LogError(ex, "Retrieving S, Invoice Lines or Stock Item Transactions");

		return this.StatusCode(StatusCodes.Status500InternalServerError);
	}

	return this.Ok(response);
}

The MultiMapper syntax always trips me up and it usually takes a couple of attempts to get it to work.

List of StockGroups with StockItems

I have extended my DapperTransient module adding WithRetry versions of the 14 MultiMapper methods.

.NET Core web API + Dapper – QueryMultiple

Returning multiple recordsets

My current “day job” is building applications for managing portfolios of foreign currency instruments. A portfolio can contain many different types of instrument (Forwards, Options, Swaps etc.). One of the “optimisations” we use is retrieving all the different types of instruments in a portfolio with one stored procedure call.

SQL Server Management Studio Dependency viewer

The closest scenario I could come up with using the World Wide Importers database was retrieving a summary of all the information associated with an Invoice for display on a single screen.

CREATE PROCEDURE [Sales].[InvoiceSummaryGetV1](@InvoiceID as int)
AS
BEGIN

SELECT [InvoiceID]
--        ,[CustomerID]
--        ,[BillToCustomerID]
		,[OrderID]
		,[Invoices].[DeliveryMethodID]
		,[DeliveryMethodName]
--        ,[ContactPersonID]
--        ,[AccountsPersonID]
		,[SalespersonPersonID] as SalesPersonID
		,[SalesPerson].[PreferredName] as SalesPersonName
--        ,[PackedByPersonID]
		,[InvoiceDate]
		,[CustomerPurchaseOrderNumber]
		,[IsCreditNote]
		,[CreditNoteReason]
		,[Comments]
		,[DeliveryInstructions]
--        ,[InternalComments]
--        ,[TotalDryItems]
--        ,[TotalChillerItems]
		,[DeliveryRun]
		,[RunPosition] as DeliveryRunPosition
		,[ReturnedDeliveryData] as DeliveryData
		,[ConfirmedDeliveryTime] as DeliveredAt
		,[ConfirmedReceivedBy] as DeliveredTo
--        ,[LastEditedBy]
--        ,[LastEditedWhen]
	FROM [Sales].[Invoices]
	INNER JOIN [Application].[People] as SalesPerson ON (Invoices.[SalespersonPersonID] = [SalesPerson].[PersonID])
	INNER JOIN [Application].[DeliveryMethods] as DeliveryMethod ON (Invoices.[DeliveryMethodID] = DeliveryMethod.[DeliveryMethodID])
WHERE ([Invoices].[InvoiceID] = @InvoiceID)

SELECT [InvoiceLineID]
      ,[InvoiceID]
      ,[StockItemID]
      ,[Description] as StockItemDescription
      ,[InvoiceLines].[PackageTypeID]
	  ,[PackageType].[PackageTypeName]
      ,[Quantity]
      ,[UnitPrice]
      ,[TaxRate]
      ,[TaxAmount]
--      ,[LineProfit]
      ,[ExtendedPrice]
--      ,[LastEditedBy]
--      ,[LastEditedWhen]
	FROM [Sales].[InvoiceLines]
		INNER JOIN [Warehouse].[PackageTypes] as PackageType ON ([PackageType].[PackageTypeID] = [InvoiceLines].[PackageTypeID])
WHERE ([InvoiceLines].[InvoiceID] = @InvoiceID)

SELECT [StockItemTransactionID]
      ,[StockItemTransactions].[StockItemID]
      ,StockItem.[StockItemName] as StockItemName
      ,[StockItemTransactions].[TransactionTypeID]
      ,[TransactionType].[TransactionTypeName]
--      ,[CustomerID]
--      ,[InvoiceID]
--      ,[SupplierID]
--      ,[PurchaseOrderID]
      ,[TransactionOccurredWhen] as TransactionAt
      ,[Quantity]
--      ,[LastEditedBy]
--      ,[LastEditedWhen]
	FROM [Warehouse].[StockItemTransactions]
	INNER JOIN [Warehouse].[StockItems] as StockItem ON ([StockItemTransactions].StockItemID = [StockItem].StockItemID)
	INNER JOIN [Application].[TransactionTypes] as TransactionType ON ([StockItemTransactions].[TransactionTypeID] = TransactionType.[TransactionTypeID])
	WHERE ([StockItemTransactions].[InvoiceID] = @InvoiceID)

END

The stored procedure returns 3 recordsets, a “summary” of the Order, a summary of the associated OrderLines and a summary of the associated StockItemTransactions.

public async Task<ActionResult<Model.InvoiceSummaryGetDtoV1>>Get([Range(1, int.MaxValue, ErrorMessage = "Invoice id must greater than 0")] int id)
{
	Model.InvoiceSummaryGetDtoV1 response = null;

	try
	{
		using (SqlConnection db = new SqlConnection(this.connectionString))
		{
			var invoiceSummary = await db.QueryMultipleWithRetryAsync("[Sales].[InvoiceSummaryGetV1]", param: new { InvoiceId = id }, commandType: CommandType.StoredProcedure);

			response = await invoiceSummary.ReadSingleOrDefaultWithRetryAsync<Model.InvoiceSummaryGetDtoV1>();
			if (response == default)
			{
				logger.LogInformation("Invoice:{0} not found", id);

				return this.NotFound($"Invoice:{id} not found");
			}

			response.InvoiceLines = (await invoiceSummary.ReadWithRetryAsync<Model.InvoiceLineSummaryListDtoV1>()).ToArray();

			response.StockItemTransactions = (await invoiceSummary.ReadWithRetryAsync<Model.StockItemTransactionSummaryListDtoV1>()).ToArray();
		}
	}
	catch (SqlException ex)
	{
		logger.LogError(ex, "Retrieving Invoice, Invoice Lines or Stock Item Transactions");

		return this.StatusCode(StatusCodes.Status500InternalServerError);
	}

	return this.Ok(response);
}

I use Google Chrome, Mozilla Firefox, Microsoft Edgeium, and Opera but the screen capture was done with FireFox mainly because it formats the Java Script Object Notation(JSON) response payloads nicely.

FireFox displaying Invoice Summary response

I had to extend the DapperTransient module to add SqlMapper extension (plus all the different overloads) retry methods.

.NET Core web API + Dapper – Caching

Response Cache

In the beginning this was long long post about In-memory caching, distributed caching, Response caching, Response caching with middleware and Object reuse with ObjectPool. As I was re-reading the post before publishing it I came to the realisation that these different caching approaches didn’t require Dapper.

I started again, but kept the first section as it covers one of the simplest possible approaches to caching using the [ResponseCache] attribute and VaryByQueryKeys.

[HttpGet("Response")]
[ResponseCache(Duration = StockItemsListResponseCacheDuration)]
public async Task<ActionResult<IAsyncEnumerable<Model.StockItemListDtoV1>>> GetResponse()
{
	IEnumerable<Model.StockItemListDtoV1> response = null;

	logger.LogInformation("Response cache load");

	try
	{
		response = await dapper.QueryAsync<Model.StockItemListDtoV1>(sql: @"SELECT [StockItemID] as ""ID"", [StockItemName] as ""Name"", [RecommendedRetailPrice], [TaxRate] FROM [Warehouse].[StockItems]", commandType: CommandType.Text);
	}
	catch (SqlException ex)
	{
		logger.LogError(ex, "Retrieving list of StockItems");

		return this.StatusCode(StatusCodes.Status500InternalServerError);
	}

	return this.Ok(response);
}

[HttpGet("ResponseVarying")]
[ResponseCache(Duration = StockItemsListResponseCacheDuration, VaryByQueryKeys = new string[] { "id" })]
public async Task<ActionResult<Model.StockItemGetDtoV1>> Get([FromQuery(Name = "id"), Range(1, int.MaxValue, ErrorMessage = "Stock item id must greater than 0")] int id)
{
	Model.StockItemGetDtoV1 response = null;

	logger.LogInformation("Response cache varying load id:{0}", id);

	try
	{
		response = await dapper.QuerySingleOrDefaultAsync<Model.StockItemGetDtoV1>(sql: "[Warehouse].[StockItemsStockItemLookupV1]", param: new { stockItemId = id }, commandType: CommandType.StoredProcedure);
		if (response == default)
		{
			logger.LogInformation("StockItem:{0} not found", id);

			return this.NotFound($"StockItem:{id} not found");
		}
	}
	catch (SqlException ex)
	{
		logger.LogError(ex, "Looking up StockItem with Id:{0}", id);

		return this.StatusCode(StatusCodes.Status500InternalServerError);
	}

	return this.Ok(response);
}

I use Google Chrome, Mozilla Firefox, Microsoft Edgeium, and Opera but the screen captures have been done with FireFox mainly because it formats the Java Script Object Notation(JSON) response payloads nicely.

All the browsers appeared to respect the cache control headers but Firefox was the only one which did not initiate a new request when I pressed return in the Uniform Resource Locator(URL) field.

Firefox displaying list of stock items

I used Telerik Fiddler and FiddlerFox to capture the HTTP GET method request and response payloads.

Fiddler Fox extension details
Response payload for a list of StockItems with cache control headers highlighted
Firefox displaying single stock item
Response payload for a single StockItem with cache control headers highlighted

Dapper Cache

The Dapper Extensions Library has built in support for In-memory and Redis caching. The Dapper.Extensions Library extends Dapper’s functionality. It requires minimal configuration but I was tripped up by the default connection string requirement because I was using Dependency Injection

Dapper.Extensions NuGet package configuration

The configuration code in the application startup.cs supports in-memory and Redis caches.

// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
	services.AddControllers();

	services.AddResponseCaching();

	services.AddDapperForMSSQL();

#if DAPPER_EXTENSIONS_CACHE_MEMORY
	services.AddDapperCachingInMemory(new MemoryConfiguration
	{
		AllMethodsEnableCache = false
	});
#endif
#if DAPPER_EXTENSIONS_CACHE_REDIS
	services.AddDapperCachingInRedis(new RedisConfiguration
	{
		AllMethodsEnableCache = false,
		KeyPrefix = Configuration.GetConnectionString("RedisKeyPrefix"),
		ConnectionString = Configuration.GetConnectionString("RedisConnection")
	}); 
#endif
	services.AddApplicationInsightsTelemetry();
}

The StockItemsCachingController was rewritten with the Dapper.Extensions QueryAsync and QuerySingleOrDefaultAsync methods.

[HttpGet("DapperMemory")]
public async Task<ActionResult<IAsyncEnumerable<Model.StockItemListDtoV1>>> GetDapper()
{
	List<Model.StockItemListDtoV1> response;

	logger.LogInformation("Dapper cache load");

	try
	{
		response = await dapper.QueryAsync<Model.StockItemListDtoV1>(
							sql: @"SELECT [StockItemID] as ""ID"", [StockItemName] as ""Name"", [RecommendedRetailPrice], [TaxRate] FROM [Warehouse].[StockItems]",
							commandType: CommandType.Text,
							enableCache: true,
							cacheExpire: TimeSpan.Parse(this.Configuration.GetValue<string>("DapperCachingDuration"))
					);

	}
	catch (SqlException ex)
	{
		logger.LogError(ex, "Retrieving list of StockItems");

		return this.StatusCode(StatusCodes.Status500InternalServerError);
	}

	return this.Ok(response);
}

[HttpGet("DapperMemoryVarying")]
public async Task<ActionResult<Model.StockItemGetDtoV1>> GetDapperVarying([FromQuery(Name = "id"), Range(1, int.MaxValue, ErrorMessage = "Stock item id must greater than 0")] int id)
{
	Model.StockItemGetDtoV1 response = null;

	logger.LogInformation("Dapper cache varying load id:{0}", id);

	try
	{
		response = await dapper.QuerySingleOrDefaultAsync<Model.StockItemGetDtoV1>(
					sql: "[Warehouse].[StockItemsStockItemLookupV1]",
					param: new { stockItemId = id },
					commandType: CommandType.StoredProcedure,
					cacheKey: $"StockItem:{id}",
					enableCache: true,
					cacheExpire: TimeSpan.Parse(this.Configuration.GetValue<string>("DapperCachingDuration"))
							);
		if (response == default)
		{
			logger.LogInformation("StockItem:{0} not found", id);

			return this.NotFound($"StockItem:{id} not found");
		}
	}
	catch (SqlException ex)
	{
		logger.LogError(ex, "Looking up StockItem with Id:{0}", id);

		return this.StatusCode(StatusCodes.Status500InternalServerError);
	}

	return this.Ok(response);
}

Both the Dapper.Extensions In-Memory and Redis cache reduced the number of database requests to the bare minimum. In a larger application the formatting of the cacheKey (cacheKey: “StockItems” & cacheKey: $”StockItem:{id}”) would be important to stop database query result collisions.

SQL Server Profiler displaying the list and single record requests.

I used Memurai which is a Microsoft Windows version of Redis for testing on my development machine before deploying to Microsoft Azure and using Azure Cache for Redis. Memurai runs as a Windows Service and supports master, replica, cluster node or sentinel roles.

Memurai running as a Windows Service on my development machine

When the Web API project was restarted the contents in-memory cache were lost. The Redis cache contents survive a restart and can be access from multiple clients.

The Dapper.Extensions Query, QueryAsync, QueryFirstOrDefaultAsync, QuerySingleOrDefault, QuerySingleOrDefaultAsync, QueryMultiple, QueryMultipleAsync, ExecuteReader, ExecuteReaderAsync, QueryPageAsync, QueryPageAsync, QueryPlainPage, QueryPlainPageAsync, Execute, ExecuteAsync, ExecuteScalar, ExecuteScalarAsync, BeginTransaction, CommitTransactionm and RollbackTransaction do not appear to a versions which “Retry” actions when there is a “Transient” failure. If there is no solution available I will build one using the approach in my DapperTransient module.

Azure Device Provisioning Service(DPS) when transient isn’t

After some updates to my Device Provisioning Service(DPS) code the RegisterAsync method was exploding with an odd exception.

TTI Webhook Integration running in desktop emulator

In the Visual Studio 2019 Debugger the exception text was “IsTransient = true” so I went and made a coffee and tried again.

Visual Studio 2019 Quickwatch displaying short from error message

The call was still failing so I dumped out the exception text so I had some key words to search for

Microsoft.Azure.Devices.Provisioning.Client.ProvisioningTransportException: AMQP transport exception
 ---> System.UnauthorizedAccessException: Sys
   at Microsoft.Azure.Amqp.ExceptionDispatcher.Throw(Exception exception)
   at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.Azure.Amqp.AmqpObject.OpenAsyncResult.End(IAsyncResult result)
   at Microsoft.Azure.Amqp.AmqpObject.EndOpen(IAsyncResult result)
   at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.HandleTransportOpened(IAsyncResult result)
   at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.OnTransportOpenCompete(IAsyncResult result)
--- End of stack trace from previous location ---
   at Microsoft.Azure.Devices.Provisioning.Client.Transport.AmqpClientConnection.OpenAsync(TimeSpan timeout, Boolean useWebSocket, X509Certificate2 clientCert, IWebProxy proxy, RemoteCertificateValidationCallback remoteCerificateValidationCallback)
   at Microsoft.Azure.Devices.Provisioning.Client.Transport.ProvisioningTransportHandlerAmqp.RegisterAsync(ProvisioningTransportRegisterMessage message, TimeSpan timeout, CancellationToken cancellationToken)
   --- End of inner exception stack trace ---
   at Microsoft.Azure.Devices.Provisioning.Client.Transport.ProvisioningTransportHandlerAmqp.RegisterAsync(ProvisioningTransportRegisterMessage message, TimeSpan timeout, CancellationToken cancellationToken)
   at Microsoft.Azure.Devices.Provisioning.Client.Transport.ProvisioningTransportHandlerAmqp.RegisterAsync(ProvisioningTransportRegisterMessage message, CancellationToken cancellationToken)
   at devMobile.IoT.TheThingsIndustries.AzureIoTHub.Integration.Uplink(HttpRequestData req, FunctionContext executionContext) in C:\Users\BrynLewis\source\repos\TTIV3AzureIoTConnector\TTIV3WebHookAzureIoTHubIntegration\TTIUplinkHandler.cs:line 245

I tried a lot of keywords and went and looked at the source code on github

One of the many keyword searches

Another of the many keyword searches

I then tried another program which did used the Device provisioning Service and it worked first time so it was something wrong with the code.

using (var securityProvider = new SecurityProviderSymmetricKey(deviceId, deviceKey, null))
{
	using (var transport = new ProvisioningTransportHandlerAmqp(TransportFallbackType.TcpOnly))
	{
		DeviceRegistrationResult result;

		ProvisioningDeviceClient provClient = ProvisioningDeviceClient.Create(
			Constants.AzureDpsGlobalDeviceEndpoint,
			 dpsApplicationSetting.GroupEnrollmentKey, <<= Should be _azureIoTSettings.DeviceProvisioningService.IdScope,
			securityProvider,
			transport);

		try
		{
				result = await provClient.RegisterAsync();
		}
		catch (ProvisioningTransportException ex)
		{
			logger.LogInformation(ex, "Uplink-DeviceID:{0} RegisterAsync failed IDScope and/or GroupEnrollmentKey invalid", deviceId);

			return req.CreateResponse(HttpStatusCode.Unauthorized);
		}

		if (result.Status != ProvisioningRegistrationStatusType.Assigned)
		{
			_logger.LogError("Uplink-DeviceID:{0} Status:{1} RegisterAsync failed ", deviceId, result.Status);

			return req.CreateResponse(HttpStatusCode.FailedDependency);
		}

		IAuthenticationMethod authentication = new DeviceAuthenticationWithRegistrySymmetricKey(result.DeviceId, (securityProvider as SecurityProviderSymmetricKey).GetPrimaryKey());

		deviceClient = DeviceClient.Create(result.AssignedHub, authentication, TransportSettings);

		await deviceClient.OpenAsync();

		logger.LogInformation("Uplink-DeviceID:{0} Azure IoT Hub connected (Device Provisioning Service)", deviceId);
	}
}

I then carefully inspected my source code and worked back through the file history and realised I had accidentally replaced the IDScope with the GroupEnrollment setting so it was never going to work i.e. IsTransient != true. So, for the one or two other people who get this error message check your IDScope and GroupEnrollment key make sure they are the right variables and that values they contain are correct.

.NET Core web API + Dapper – Retries

Recovering from transient failures with Polly

It’s not uncommon for SQL Azure servers and databases to suffer from “transient failures”. In application logs I have seen these occur during scale up/down events, periods where my application’s performance has been temporarily impacted (but its throughput has not changed), which I assume has been some load balancing going on in the background and when network connectivity has been a bit flakey.

Microsoft has published guidance for building Microservices applications, troubleshooting common AzureSQL errors and improving the resilience of ADO.Net connections which cover different approaches in depth.

For many years I used the Microsoft Enterprise Library Transient Fault Handling Application Block (TOPAZ), then upgraded to the .Net Core Version built by Mo Chavoshi both of which have been retired.

Now I’m using The Polly Project which builds on the concepts of TOPAZ but has been thoroughly re-engineered with lots of extensibility, an active community and modern codebase. Inspired by Ben Hyrman and several other developers I have built a minimalist wrapper for the Dapper Async methods which detects transient errors using the same approach as the Entity Framework Core library.

public static Task<int> ExecuteWithRetryAsync(
			  this IDbConnection connection,
			  string sql,
			  object param = null,
			  IDbTransaction transaction = null,
			  int? commandTimeout = null,
			  CommandType? commandType = null) => RetryPolicy.ExecuteAsync(() => connection.ExecuteAsync(sql, param, transaction, commandTimeout, commandType));

I did think about retry functionality for async methods which returned object/dynamic but have only implemented strongly typed ones for the initial version.

[HttpGet]
public async Task<ActionResult<IAsyncEnumerable<Model.StockItemListDtoV1>>> Get()
{
	IEnumerable<Model.StockItemListDtoV1> response = null;

	try
	{
		using (SqlConnection db = new SqlConnection(this.connectionString))
		{
			response = await db.QueryWithRetryAsync<Model.StockItemListDtoV1>(sql: @"SELECT [StockItemID] as ""ID"", [StockItemName] as ""Name"", [RecommendedRetailPrice], [TaxRate] FROM [Warehouse].[StockItems]", commandType: CommandType.Text);
		}
	}
	catch (SqlException ex)
	{
		logger.LogError(ex, "Retrieving list of StockItems");

		return this.StatusCode(StatusCodes.Status500InternalServerError);
	}

	return this.Ok(response);
}

I have struggled to get reproduceable transient failures without pausing execution in the Visual Studio debugger and tinkering with variables or scaling up/down my databases (limit to how often this can be done) or unplugging the network cable at the wrong time.

The Things Network V2 MQTT SQL Connector

This code was written to solve a problem I had debugging and testing an application which processed data from sensors attached to The Things Network(TTN) and I figured others might find it useful.

As part of my series of TTN projects I wanted to verify that the data from a number of LoRaWAN sensors connected to TTN was reasonable and complete. I’m familiar with Microsoft SQL Server so I built a .Net Core console application which uses the TTN Message Queue Telemetry Transport(MQTT) Data API (so it can run alongside my existing TTN integration) to receive messages from the all devices in a TTN application and store them in a database for post processing.

The console application uses MQTTNet to connect to TTN MQTT Data API. It subscribes to an application device uplink topic, then uses a combination of Stackoverflow Dapper with Microsoft SQL Server tables and stored procedures to store the device data points. I re-generated the classes I had used in my other projects, added any obvious missing fields and fine tuned the data types by delving into the TTN V2 GO code.

The core of the application is in the MQTTNet application message received handler.

private static void MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
   PayloadUplinkV2 payload;

   log.InfoFormat($"Receive Start Topic:{e.ApplicationMessage.Topic}");

   string connectionString = configuration.GetSection("TTNDatabase").Value;

   try
   {
      payload = JsonConvert.DeserializeObject<PayloadUplinkV2>(e.ApplicationMessage.ConvertPayloadToString());
   }
   catch (Exception ex)
   {
      log.Error("DeserializeObject failed", ex);
      return;
   }

   try
   {
      if (payload.PayloadFields != null)
      {
         var parameters = new DynamicParameters();

         EnumerateChildren(parameters, payload.PayloadFields);

         log.Debug($"Parameters:{parameters.ParameterNames.Aggregate((i, j) => i + ',' + j)}");

         foreach (string storedProcedure in storedProcedureMappings.Keys)
         {
            if (Enumerable.SequenceEqual(parameters.ParameterNames, storedProcedureMappings[storedProcedure].Split(',', StringSplitOptions.RemoveEmptyEntries), StringComparer.InvariantCultureIgnoreCase))
            {
               log.Info($"Payload fields processing with:{storedProcedure}");

               using (SqlConnection db = new SqlConnection(connectionString))
               {
                  parameters.Add("@ReceivedAtUtc", payload.Metadata.ReceivedAtUtc);
                  parameters.Add("@DeviceID", payload.DeviceId);
                  parameters.Add("@DeviceEui", payload.DeviceEui);
                  parameters.Add("@ApplicationID", payload.ApplicationId);
                  parameters.Add("@IsConfirmed", payload.IsConfirmed);
                  parameters.Add("@IsRetry", payload.IsRetry);
                  parameters.Add("@Port", payload.Port);

                  db.Execute(sql: storedProcedure, param: parameters, commandType: CommandType.StoredProcedure);
               }
            }
         }
      }
      else
      {
         foreach (string storedProcedure in storedProcedureMappings.Keys)
         {
            if (string.Compare(storedProcedureMappings[storedProcedure], "payload_raw", true) == 0)
            {
               log.Info($"Payload raw processing with:{storedProcedure}");

               using (SqlConnection db = new SqlConnection(connectionString))
               {
                  var parameters = new DynamicParameters();

                  parameters.Add("@ReceivedAtUtc", payload.Metadata.ReceivedAtUtc);
                  parameters.Add("@DeviceID", payload.DeviceId);
                  parameters.Add("@DeviceEui", payload.DeviceEui);
                  parameters.Add("@ApplicationID", payload.ApplicationId);
                  parameters.Add("@IsConfirmed", payload.IsConfirmed);
                  parameters.Add("@IsRetry", payload.IsRetry);
                  parameters.Add("@Port", payload.Port);
                  parameters.Add("@Payload", payload.PayloadRaw);

                  db.Execute(sql: storedProcedure, param: parameters, commandType: CommandType.StoredProcedure);
               }
            }
         }
      }
   }
   catch (Exception ex)
   {
      log.Error("Message processing failed", ex);
   }
}

For messages with payload fields the code attempts to match the list of field names (there maybe more than one match) with the parameter list for stored procedures in the AppSettings.json file. The Enumerable.SequenceEqual uses a case insensitive comparison but order is important. I did consider sorting the two lists of parameters but wasn’t certain the added complexity was worth it.

{
   "TTNDatabase": "Server=DESKTOP-1234567;Initial Catalog=Rak7200TrackerTest;Persist Security Info=False;User ID=TopSecret;Password=TopSecret;Connection Timeout=30",
   "MqttServer": "eu.thethings.network",
   "MqttPassword": "ttn-account-TopSecret",
   "ApplicationId": "rak811wisnodetest",
   "MqttClientId": "TTNSQLClient",
   "StoredProcedureMappings": {
      "EnvironmentalSensorProcess": "relative_humidity_0,temperature_0",
      "PayloadRawProcess": "payload_raw",
      "WeatherSensorProcess": "barometric_pressure_0,temperature_0",
      "PositionReportProcess": "accelerometer_3x,accelerometer_3y,accelerometer_3z,analog_in_10,analog_in_11,analog_in_8,analog_in_9,gps_1altitude,gps_1latitude,gps_1longitude,gyrometer_5x,gyrometer_5y,gyrometer_5z"
   }
}

To reduce the scope for mistakes (especially with longer parameter lists) I usually copy them from the Log4Net RollingFileAppender file or ManagedColoredConsoleAppender console output.

Environmental sensor output with flat data format

I created a database table to store the temperature and humidity values.

CREATE TABLE [dbo].[EnvironmentalSensorReport](
	[WeatherSensorReportUID] [UNIQUEIDENTIFIER] NOT NULL,
	[ReceivedAtUtC] [DATETIME] NOT NULL,
	[DeviceID] [NVARCHAR](32) NOT NULL,
	[DeviceEui] [NVARCHAR](32) NOT NULL,
	[ApplicationID] [NVARCHAR](32) NOT NULL,
	[IsConfirmed] [BIT] NOT NULL,
	[IsRetry] [BIT] NOT NULL,
	[Port] [SMALLINT] NOT NULL,
	[Temperature] [FLOAT] NOT NULL,
	[Humidity] [FLOAT] NOT NULL,
CONSTRAINT [PK_EnvironmentalSensorReport] PRIMARY KEY CLUSTERED 
(
	[WeatherSensorReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

ALTER TABLE [dbo].[EnvironmentalSensorReport] ADD  CONSTRAINT [DF_EnvironmentalSensorReport_EnvironmentalSensorReporttUID]  DEFAULT (NEWID()) FOR [WeatherSensorReportUID]
GO

The stored procedure must have the parameters @ReceivedAtUtc, @DeviceID, @DeviceEui, @ApplicationID, @IsRetry, @IsConfirmed and @Port. In this example the payload specific fields generated by the Cayenne Low Power Protocol(LPP) decoder are @Temperature_0 and @relative_humidity_0

CREATE PROCEDURE [dbo].[EnvironmentalSensorProcess]
   @ReceivedAtUtc AS DATETIME,
   @DeviceID AS NVARCHAR(32),
   @DeviceEui AS NVARCHAR(32),
   @ApplicationID AS NVARCHAR(32),
   @IsRetry AS BIT,
   @IsConfirmed AS BIT,
   @Port AS SMALLINT,
   @Temperature_0 AS FLOAT,
   @relative_humidity_0 AS FLOAT
AS
BEGIN
   SET NOCOUNT ON;
 
   INSERT INTO [dbo].[EnvironmentalSensorReport]
           ([PositionReportUID]
	   .[ReceivedAtUtc]
           ,[DeviceID]
           ,[DeviceEui]
           ,[ApplicationID]
           ,[IsConfirmed]
           ,[IsRetry]
           ,[Port]
	   ,Temperature
	   ,Humidity)
   VALUES
   (
      @ReceivedAtUtc,
      @DeviceID,
      @DeviceEui,
      @ApplicationID,
      @IsConfirmed,
      @IsRetry,
      @port,
      @Temperature_0,
      @relative_humidity_0)
END
Environmental sensor data displayed in SQL Server Management Studio(SSMS)

To store more complex nest payload fields (e.g. latitude, longitude and altitude values), I flattened the the hierarchy.

private static void EnumerateChildren(DynamicParameters parameters, JToken token, string prefix ="")
{
   if (token is JProperty)
      if (token.First is JValue)
      {
         JProperty property = (JProperty)token;
         parameters.Add($"@{prefix}{property.Name}", property.Value.ToString());
      }
      else
      {
         JProperty property = (JProperty)token;
         prefix += property.Name;
      }

   foreach (JToken token2 in token.Children())
   {
      EnumerateChildren(parameters,token2, prefix);
   }
}
Unpacked LPP payload from GPS tracker displayed in TTN application data view
Flattened location, acceleration and rotation information
CREATE TABLE [dbo].[PositionReport](
      [PositionReportUID] [UNIQUEIDENTIFIER] NOT NULL,
      [ReceivedAtUtC] [DATETIME] NOT NULL,
      [DeviceID] [NVARCHAR](32) NOT NULL,
      [DeviceEui] [NVARCHAR](32) NOT NULL,
      [ApplicationID] [NVARCHAR](32) NOT NULL,
      [IsConfirmed] [BIT] NOT NULL,
      [IsRetry] [BIT] NOT NULL,
      [Port] [SMALLINT] NOT NULL,
      [Latitude] [FLOAT] NOT NULL,
      [Longitude] [FLOAT] NOT NULL,
      [Altitude] [FLOAT] NOT NULL,
 CONSTRAINT [PK_PositionReport] PRIMARY KEY CLUSTERED 
(
	[PositionReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

I created a database table to store values of only the fields I cared about.

CREATE PROCEDURE [dbo].[PositionReportProcess]
      @ReceivedAtUtc AS DATETIME,
      @DeviceID AS NVARCHAR(32),
      @DeviceEui AS NVARCHAR(32),
      @ApplicationID AS NVARCHAR(32),
      @IsRetry AS Bit,
      @IsConfirmed AS BIT,
      @Port AS SMALLINT,
      @accelerometer_3x AS FLOAT,
      @accelerometer_3y AS FLOAT,
      @accelerometer_3z AS FLOAT,
      @analog_in_8 AS FLOAT,
      @analog_in_9 AS FLOAT,
      @analog_in_10 AS FLOAT,
      @analog_in_11 AS FLOAT,
      @gps_1Latitude AS FLOAT,
      @gps_1Longitude AS FLOAT,
      @gps_1Altitude AS FLOAT,
      @gyrometer_5x  AS FLOAT, 
      @gyrometer_5y  AS FLOAT, 
      @gyrometer_5z  AS FLOAT 
AS
BEGIN
   SET NOCOUNT ON;

   INSERT INTO [dbo].[PositionReport]
      ([PositionReportUID]
      .[ReceivedAtUtc]
      ,[DeviceID]
      ,[DeviceEui]
      ,[ApplicationID]
      ,[IsConfirmed]
      ,[IsRetry]
      ,[Port]
      ,Latitude
      ,Longitude
      ,Altitude)
   VALUES
   (
      @ReceivedAtUtc,
      @DeviceID,
      @DeviceEui,
      @ApplicationID,
      @IsConfirmed,
      @IsRetry,
      @port,
      @gps_1Latitude,
      @gps_1Longitude,
      @gps_1Altitude)
END

The stored procedure for storing the GPS tracker payload has to have parameters matching each payload field but some of the fields are not used.

Location data displayed in SQL Server Management Studio(SSMS)

For uplink messages with no payload fields the message processor looks for a stored procedure with a single parameter called “payload_raw”.(there maybe more than one match)

CREATE TABLE [dbo].[PayloadReport](
      [PayloadReportUID] [UNIQUEIDENTIFIER] NOT NULL,
      [ReceivedAtUtC] [DATETIME] NOT NULL,
      [DeviceID] [NVARCHAR](32) NOT NULL,
      [DeviceEui] [NVARCHAR](32) NOT NULL,
      [ApplicationID] [NVARCHAR](32) NOT NULL,
      [IsConfirmed] [BIT] NOT NULL,
      [IsRetry] [BIT] NOT NULL,
      [Port] [SMALLINT] NOT NULL,
      [Payload] [NVARCHAR](128) NOT NULL,
CONSTRAINT [PK_PayloadReport] PRIMARY KEY CLUSTERED 
(
      [PayloadReportUID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

ALTER TABLE [dbo].[PayloadReport] ADD  CONSTRAINT [DF_PayloadReport_PositionReportUID]  DEFAULT (NEWID()) FOR [PayloadReportUID]
GO
ALTER PROCEDURE [dbo].[PayloadRawProcess]
      @ReceivedAtUtc AS DATETIME,
      @DeviceID AS NVARCHAR(32),
      @DeviceEui AS NVARCHAR(32),
      @ApplicationID AS NVARCHAR(32),
      @IsRetry AS Bit,
      @IsConfirmed AS BIT,
      @Port AS SMALLINT,
      @Payload AS NVARCHAR(128)
AS
BEGIN
      SET NOCOUNT ON;

      INSERT INTO [dbo].[PayloadReport]
         ([PositionReportUID]
         .[ReceivedAtUtc]
         ,[DeviceID]
         ,[DeviceEui]
         ,[ApplicationID]
         ,[IsConfirmed]
         ,[IsRetry]
         ,[Port]
         ,[Payload])
     VALUES(@ReceivedAtUtc,
         @DeviceID,
         @DeviceEui,
         @ApplicationID,
         @IsConfirmed,
         @IsRetry,
         @port,
         @Payload)
END
Raw payload data displayed in SQL Server Management Studio(SSMS)

Initially the application just used Console.Writeline for logging, then I added Log4Net because it would be useful to persist information about failures and so I could copy n paste parameter lists to the appSettings.json file.

To make the application more robust adding a retries with the Enterprise Library Transient Fault Handling and Configuration blocks or Polly on the Dapper Execute would be a good idea. It also would take much work to get the application to run in Microsoft Azure as a “headless” webapp.

Dapper supports a number of database platforms so in theory this application (with a little bit of effort) should be platform portable.

Enterprise Library V6 Data, Exception and Logging with Azure SDK 2.8

I have used the Enterprise library Blocks (which in different forms have been around since 2005) in quite a few projects. Individually the components are pretty good (not always best of breed) but they are well integrated and when used in the way which they were intended to be used work well.

I have just upgraded a client application to Visual Studio 2015 + .Net 4.5 + Enterprise Library V6 and some of the steps were not immediately obvious so hopefully this saves someone else some time. I have sample code for Azure Cloud Service Web and Worker roles.

For both web and worker roles I added the Azure Diagnostics listener to the listener config section of the enterprise library logging settings.

<loggingConfiguration name="" tracingEnabled="true" defaultCategory="General">
	<listeners>
    <add listenerDataType="Microsoft.Practices.EnterpriseLibrary.Logging.Configuration.SystemDiagnosticsTraceListenerData, Microsoft.Practices.EnterpriseLibrary.Logging, Version=6.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35&amp;amp;quot;
         type="Microsoft.WindowsAzure.Diagnostics.DiagnosticMonitorTraceListener, Microsoft.WindowsAzure.Diagnostics, Version=2.8.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"
         name="AzureDiagnosticTraceListener"/>
   </listeners>
...
</loggingConfiguration>

I then enabled diagnostics on the role and configured the transfer of logs.

Azure Diagnostics configuration dialog

Azure Diagnostic Configuration

This replaces the DiagnosticMonitorConfiguration based approach

DiagnosticMonitorConfiguration diagConfig = DiagnosticMonitor.GetDefaultInitialConfiguration();
diagConfig.Logs.ScheduledTransferLogLevelFilter = LogLevel.Verbose;

// Enable scheduled transfer
diagConfig.Directories.ScheduledTransferPeriod = TimeSpan.FromMinutes(1);
diagConfig.Logs.ScheduledTransferPeriod = TimeSpan.FromMinutes(1);

...
DiagnosticMonitor.Start(&amp;amp;quot;Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString&amp;amp;quot;, diagConfig);

For the web role I configured the exception and logging blocks in the Global.asax.cs file


protected void Application_Start()
{
   AreaRegistration.RegisterAllAreas();
   GlobalConfiguration.Configure(WebApiConfig.Register);
   FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
   RouteConfig.RegisterRoutes(RouteTable.Routes);
   BundleConfig.RegisterBundles(BundleTable.Bundles);

   // Load the Entlib logging block configuration
   LogWriterFactory logWriterFactory = new LogWriterFactory();
   LogWriter logWriter = logWriterFactory.Create();
   Logger.SetLogWriter(logWriter);

   // Load the Entlib Exception block configuration
   ExceptionPolicyFactory policyFactory = new ExceptionPolicyFactory();
   exManager = policyFactory.CreateManager();
}

For the worker role I configured the exception and logging blocks in the worker role startup

public override bool OnStart()
{
   // Set the maximum number of concurrent connections
   ServicePointManager.DefaultConnectionLimit = 12;
   ...
   LogWriterFactory logWriterFactory = new LogWriterFactory();
   LogWriter logWriter = logWriterFactory.Create();
   Logger.SetLogWriter(logWriter);
   ...
   return result;
}

Then in the webrole webapi2 API controllers you can use embedded SQL or call stored procedures with retries. (This sample code uses the Northwind database and default retry configuration)

public IEnumerable&amp;amp;lt;ProductDto&amp;amp;gt; Get()
{
var products = new List&amp;amp;lt;ProductDto&amp;amp;gt;();

WebApiApplication.exManager.Process(() =&amp;amp;gt;
{
Database db = new DatabaseProviderFactory().Create(&amp;amp;quot;NorthwindInstance&amp;amp;quot;);

RetryPolicy retry = new RetryPolicy&amp;amp;lt;SqlDatabaseTransientErrorDetectionStrategy&amp;amp;gt;(RetryStrategy.DefaultExponential);

var productAccessor = db.CreateSqlStringAccessor(
&amp;amp;quot;SELECT [ProductID],[ProductName],[QuantityPerUnit],[UnitPrice],[UnitsInStock],[Discontinued] FROM Products&amp;amp;quot;,
MapBuilder&amp;amp;lt;ProductDto&amp;amp;gt;
.MapAllProperties()
.Map(p =&amp;amp;gt; p.ID).ToColumn(&amp;amp;quot;ProductID&amp;amp;quot;)
.Map(p =&amp;amp;gt; p.Name).ToColumn(&amp;amp;quot;ProductName&amp;amp;quot;)
.Map(p =&amp;amp;gt; p.QuantityPerUnit).ToColumn(&amp;amp;quot;QuantityPerUnit&amp;amp;quot;)
.Map(p =&amp;amp;gt; p.UnitPrice).ToColumn(&amp;amp;quot;UnitPrice&amp;amp;quot;)
.Map(p =&amp;amp;gt; p.UnitsInStock).ToColumn(&amp;amp;quot;UnitsInStock&amp;amp;quot;)
.Map(p =&amp;amp;gt; p.Discontinued).ToColumn(&amp;amp;quot;Discontinued&amp;amp;quot;)
.Build());
products = retry.ExecuteAction(() =&amp;amp;gt;
{
return productAccessor.Execute().ToList();
});

}, &amp;amp;quot;ProductService&amp;amp;quot;);

return products;
}
public ProductDto Get(int id)
{
ProductDto productDto = null;

WebApiApplication.exManager.Process(() =&amp;amp;gt;
{
Database db = new DatabaseProviderFactory().Create(&amp;amp;quot;NorthwindInstance&amp;amp;quot;);

var productAccessor = db.CreateSqlStringAccessor(
&amp;amp;quot;SELECT [ProductID],[ProductName],[QuantityPerUnit],[UnitPrice],[UnitsInStock],[Discontinued] FROM Products WHERE [ProductID]=@ProductID&amp;amp;quot;,
new ProdductGetByProductIdParameterMapper(db),
MapBuilder&amp;amp;amp;lt;ProductDto&amp;amp;amp;gt;
.MapAllProperties()
.Map(p =&amp;amp;gt; p.ID).ToColumn(&amp;amp;quot;ProductID&amp;amp;quot;)
.Map(p =&amp;amp;gt; p.Name).ToColumn(&amp;amp;quot;ProductName&amp;amp;quot;)
.Map(p =&amp;amp;gt; p.QuantityPerUnit).ToColumn(&amp;amp;quot;QuantityPerUnit&amp;amp;quot;)
.Map(p =&amp;amp;gt; p.UnitPrice).ToColumn(&amp;amp;quot;UnitPrice&amp;amp;quot;)
.Map(p =&amp;amp;gt; p.UnitsInStock).ToColumn(&amp;amp;quot;UnitsInStock&amp;amp;quot;)
.Map(p =&amp;amp;gt; p.Discontinued).ToColumn(&amp;amp;quot;Discontinued&amp;amp;quot;)
.Build());

productDto = productAccessor.Execute(id).SingleOrDefault();

}, &amp;amp;quot;ProductService&amp;amp;quot;);

return productDto;
}