Store single json from azure iot hub to datalake2

301 views Asked by At

I added iot hub and devices. All data from iot hub is saved to data lake 2 in json format. Works fine but if there are several messages at once from device, it is saved in a single json. It causes some troubles... Is there a way to save each message-event in a separate json? I've looked through settings of iot hub but found nothing.

1

There are 1 answers

2
Roman Kiss On BEST ANSWER

There is no such as settings for always forwarding a single message to the storage in the IoT Hub routing mechanism. Basically this requirement can be implemented by azure function either in the stream pipeline consumer (IoTHubTrigger) or in the event grid subscriber (EventGridTrigger).

Update:

The following is an example of the IoTHubTrigger function with an output blob binding to the container of the Data Lake Storage Gen2:

run.csx:

#r "Microsoft.Azure.EventHubs"
#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"

using System;
using System.IO;
using System.Text;
using System.Linq;
using Microsoft.Azure.EventHubs;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static async Task Run(EventData ed, CloudBlockBlob outputBlob, ILogger log)
{   
    //log.LogInformation($"DeviceId = {ed.SystemProperties["iothub-connection-device-id"]}\r\n{JObject.Parse(Encoding.ASCII.GetString(ed.Body))}");  

    var msg = new { 
        EnqueuedTimeUtc = ed.SystemProperties["iothub-enqueuedtime"],
        Properties = ed.Properties,
        SystemProperties = new {
          connectionDeviceId = ed.SystemProperties["iothub-connection-device-id"], 
          connectionAuthMethod = ed.SystemProperties["iothub-connection-auth-method"],
          connectionDeviceGenerationId = ed.SystemProperties["iothub-connection-auth-generation-id"],
          enqueuedTime = ed.SystemProperties["iothub-enqueuedtime"]   
        },
        Body = JObject.Parse(Encoding.ASCII.GetString(ed.Body))
    };

    byte[] buffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg));
    await outputBlob.UploadFromStreamAsync(new MemoryStream(buffer));

    await Task.CompletedTask;
}

function.json:

{
  "bindings": [
    {
      "name": "ed",
      "connection": "rk2020iot_IOTHUB",
      "eventHubName": "rk2020iot_IOTHUBNAME",
      "consumerGroup": "function",
      "cardinality": "one",
      "direction": "in",
      "type": "eventHubTrigger"
    },
    {
      "name": "outputBlob",
      "path": "iot/rk2020iot/{DateTime}.json",
      "connection": "rk2020datalake2_STORAGE",
      "direction": "out",
      "type": "blob"
    }
  ]
}