Ingesting a JSON message into Event Hub using Python


I am trying to ingest a JSON message into Azure Event Hub. My problem is the size of the JSON message: Event Hub has a limit of 1 MB, and I have one big JSON message which consists of multiple JSON messages.

DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]

This DATA is just an example. DATA is already in JSON format, but it contains 10,000+ JSON events with the same format. I would like to ingest this JSON message into Event Hub.

Can anyone help me out? How can I ingest this one big message into Event Hub, by slicing it or some other way?

Thanks a lot!

Yoonsik

I tried slicing it, but the number of events within one JSON message is always different and very big...
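For reference, a generic way to slice a Python list into fixed-size chunks, regardless of how many events the message holds (the chunk_size of 3 here is just an illustration):

def chunks(items, chunk_size):
    # Yield consecutive slices of at most chunk_size elements each.
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]

for chunk in chunks(DATA, 3):
    print(len(chunk))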


1 Answer

Answered by Anupam Chand

The most efficient way is to split the huge message list into multiple batches and send the batches one by one to Event Hub. The size of each batch is determined by the size of the individual messages, keeping in mind that you can only send 1 MB in a single batch. Assuming the average size of a single message is 100 bytes, you get around 10K messages per batch. You can reduce this to 5,000-8,000 to be safe.
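For example, a rough way to derive a safe batch limit is to measure the serialized size of one sample message and divide it into the 1 MB cap. The 1,000,000-byte figure and the 30% headroom below are assumptions to cover per-event framing overhead, not exact Event Hub constants:

import json

sample = {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}

# Serialized size of one event in bytes, ignoring per-event framing overhead.
msg_bytes = len(json.dumps(sample).encode("utf-8"))

MAX_BATCH_BYTES = 1_000_000  # ~1 MB Event Hub batch limit
batch_limit = int(MAX_BATCH_BYTES / msg_bytes * 0.7)  # keep 30% headroom

print(msg_bytes, batch_limit)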

Below is a piece of code which breaks the original JSON array (DATA) into smaller batches and sends them one at a time to the Event Hub. You can adjust batch_limit to anything between 5,000 and 8,000. Since you said the number of messages in the array can be 10K+, you will be sending them in barely 2-3 batches.

import time
import asyncio
import json

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

CONNECTION_STR = 'Endpoint=sb://xxxxxxxxxxxx='
EVENTHUB_NAME = 'xxxxxxxxxxxxxx'

DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]
batch_limit = 2  # demo value; use 5000-8000 in practice

async def run():
    print('started')
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        eventhub_name=EVENTHUB_NAME
    )
    # Open the producer once and reuse it for every batch; entering the
    # context manager separately per batch would reuse a closed client.
    async with producer:
        batch_cnt = 0
        msg_array = []
        for DATA_msg in DATA:
            msg_array.append(DATA_msg)
            batch_cnt += 1
            if batch_cnt >= batch_limit:
                event_data_batch = await producer.create_batch()
                for msg in msg_array:
                    event_data_batch.add(EventData(json.dumps(msg)))
                await producer.send_batch(event_data_batch)
                print('sent a batch of messages')
                batch_cnt = 0
                msg_array = []
        # Flush whatever is left over as a final, smaller batch.
        if batch_cnt > 0:
            event_data_batch = await producer.create_batch()
            for msg in msg_array:
                event_data_batch.add(EventData(json.dumps(msg)))
            await producer.send_batch(event_data_batch)
            print('sent remaining messages as last batch')

start_time = time.time()
asyncio.run(run())
print("Sent messages in {} seconds.".format(time.time() - start_time))