I'm trying to consume JSON events with Apache Beam and perform the transformation and loading.
Since I'm still in the development/R&D phase, I set up a local Kafka server and wrote Python code that acts as a producer and generates sample JSON messages.
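For context, the producer looks roughly like this (a minimal sketch; the kafka-python library, the localhost:9092 broker address, and the payload fields are my assumptions about a typical local setup):

import json
import time

from kafka import KafkaProducer  # kafka-python (assumed client library)

# Broker address assumed to match the local Kafka server.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Send one sample JSON message per second (illustrative payload).
for i in range(10):
    producer.send("topic", {"id": i, "ts": time.time()})
    time.sleep(1)

producer.flush()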
The issue is as follows.
The code I'm trying:
import json
from typing import List, Optional

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Broker address assumed to match the local Kafka server.
CONSUMER_CONFIG = {"bootstrap.servers": "localhost:9092"}

def run(beam_args: Optional[List[str]] = None) -> None:
    TOPIC = "topic"
    OUTPUT_FILE = "output.json"
    beam_options = PipelineOptions(beam_args, save_main_session=True)
    beam_options.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=beam_options) as pipeline:
        msg_kv_bytes = pipeline | 'ReadData' >> ReadFromKafka(
            consumer_config=CONSUMER_CONFIG, topics=[TOPIC])

        def process_and_write_to_file(message):
            key, value = message
            json_data = json.loads(value)
            with open(OUTPUT_FILE, 'a') as output_file:
                output_file.write(json.dumps(json_data) + '\n')

        _ = msg_kv_bytes | 'ProcessAndWriteToFile' >> beam.Map(process_and_write_to_file)

if __name__ == "__main__":
    run()
Expected: messages are appended to output.json as new messages arrive on the topic.
Result: the code runs without any errors or exceptions, but there is no console output and no output.json file is created.
Actions:
I changed the ReadFromKafka call to include max_num_records=1, as below:
msg_kv_bytes = pipeline | 'ReadData' >> ReadFromKafka(
    consumer_config=CONSUMER_CONFIG,
    topics=[TOPIC],
    max_num_records=1)
Result: once the producer generates a message, the output.json file is created, and then the pipeline execution stops.
So my question is: why doesn't the original streaming code work according to the expected behavior?