Consume kafka topic from Apache Beam

I'm trying to consume JSON events via Apache Beam and perform transformation and loading.

Since I'm in the development and R&D phase, I set up a local Kafka server and wrote a Python producer that generates sample JSON messages.
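For reference, a minimal sketch of such a producer, assuming the `kafka-python` package and a broker on `localhost:9092` (the topic name and event fields are illustrative, not from the original post):

```python
import json
import time


def make_event(i: int) -> dict:
    # Build a sample JSON event; the fields are illustrative only.
    return {"id": i, "ts": time.time(), "payload": f"event-{i}"}


def serialize(event: dict) -> bytes:
    # Kafka message values are bytes, so encode the JSON string.
    return json.dumps(event).encode("utf-8")


if __name__ == "__main__":
    # Requires `pip install kafka-python` and a broker on localhost:9092.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="localhost:9092",
                             value_serializer=serialize)
    for i in range(10):
        producer.send("topic", make_event(i))
        time.sleep(1)
    producer.flush()
```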

The issue is as below.

The code I'm trying:

import json
from typing import List, Optional

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


def run(beam_args: Optional[List[str]] = None) -> None:
    TOPIC = "topic"
    OUTPUT_FILE = "output.json"
    beam_options = PipelineOptions(beam_args, save_main_session=True)
    beam_options.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=beam_options) as pipeline:
        msg_kv_bytes = (pipeline | 'ReadData' >> ReadFromKafka(consumer_config=CONSUMER_CONFIG,
                                                               topics=[TOPIC],
                                                               ))

        def process_and_write_to_file(message):
            # Each element is a (key, value) pair of bytes.
            key, value = message
            json_data = json.loads(value)
            with open(OUTPUT_FILE, 'a') as output_file:
                output_file.write(json.dumps(json_data) + '\n')

        _ = msg_kv_bytes | 'ProcessAndWriteToFile' >> beam.Map(process_and_write_to_file)
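The post does not show `CONSUMER_CONFIG`; for a local setup it would typically look something like the following, using standard Kafka consumer property names (all values are illustrative):

```python
# Illustrative only -- adjust to your local broker setup.
CONSUMER_CONFIG = {
    "bootstrap.servers": "localhost:9092",  # local Kafka broker
    "group.id": "beam-consumer",            # consumer group name
    "auto.offset.reset": "earliest",        # read from the start of the topic
}
```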

Expected: messages are written to a file whenever new messages are received.

Results: the code runs without any errors or exceptions, but there is no console output and no output.json file is created.

Actions:

I changed the ReadFromKafka call to pass max_num_records=1, as below:

msg_kv_bytes = (pipeline | 'ReadData' >> ReadFromKafka(consumer_config=CONSUMER_CONFIG,
                                                       topics=[TOPIC],
                                                       max_num_records=1
                                                       ))

Results: once the producer generates a message, an output.json file is created, and then execution stops.

So my concern is why the code does not work according to the expected behavior.
