Apache Beam Python: GroupByKey with Kafka IO streaming data

I'm trying to create fixed windows of 10 seconds using Apache Beam 2.23 with Kafka as the data source. The window seems to fire for every record, even when I set an AfterProcessingTime trigger with a delay of 15, and if I add a GroupByKey it throws the following error:

KeyError: 0 [while running '[17]: FixedWindow']
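
For reference, a 10-second fixed window assigns each element to the half-open interval [start, start + 10) that contains its timestamp. The sketch below reproduces that arithmetic in plain Python, assuming the default window offset of 0. `fixed_window` is a hypothetical helper, not a Beam API; it is only a way to check which window a record stamped by `AddTimestampFn` should land in.

```python
def fixed_window(timestamp, size=10, offset=0):
    """Return the [start, end) bounds of the fixed window containing `timestamp`.

    Hypothetical helper that mirrors the fixed-window assignment rule
    (start = timestamp - (timestamp - offset) % size); not part of Beam.
    """
    start = timestamp - (timestamp - offset) % size
    return start, start + size


# With one record every 2 seconds, five records share each 10-second window.
for ts in (1600000000, 1600000002, 1600000008, 1600000010):
    print(ts, fixed_window(ts))
```

In the pipeline itself, a processing-time trigger is not set through `allowed_lateness`; it is passed to `beam.WindowInto` through its `trigger` and `accumulation_mode` arguments, for example `trigger=Repeatedly(AfterProcessingTime(15))` with `accumulation_mode=AccumulationMode.DISCARDING`, all importable from `apache_beam.transforms.trigger`. Without an explicit trigger, a window fires when the watermark passes its end.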

Data simulation:

import time

from kafka import KafkaProducer

producer = KafkaProducer()  # defaults to a broker on localhost:9092
id_val = 1001
while True:
    message = {'id_val': str(id_val), 'sensor_1': 10}
    # Cycle the sensor id through 1001, 1002, 1003.
    id_val = id_val + 1 if id_val < 1003 else 1001
    time.sleep(2)
    print(time.time())
    # Sends the dict's Python repr (not JSON), hence ast.literal_eval in the pipeline.
    producer.send('test', str(message).encode())
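
Because `GroupByKey` groups `(key, value)` pairs, each parsed message has to become such a pair before grouping; a bare dict is not a key-value pair. A minimal sketch, assuming the Kafka source yields `(kafka_key, kafka_value)` tuples whose value is the `str(message)` payload sent above, and that events should be grouped by `id_val`. `to_key_value` is a hypothetical helper name.

```python
import ast


def to_key_value(record):
    """Turn one (kafka_key, kafka_value) record into an (id_val, event) pair.

    Hypothetical helper; assumes the value is the str(dict) sent by the simulator.
    """
    _, raw = record
    # ast.literal_eval only accepts str (or an AST node), so decode bytes first.
    if isinstance(raw, bytes):
        raw = raw.decode()
    # The simulator sends str(dict), so literal_eval (not json) reads it back.
    event = ast.literal_eval(raw)
    # GroupByKey needs 2-tuples: key each event by its sensor id.
    return event['id_val'], event


record = (None, str({'id_val': '1001', 'sensor_1': 10}).encode())
print(to_key_value(record))
```

In the Beam snippet, something like `beam.Map(to_key_value)` could stand in for the `ParseEventFn` step, so that `GroupByKey` receives pairs such as `('1001', {...})` instead of bare dicts.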

Beam snippet:


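import ast
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from beam_nuggets.io import kafkaio  # assumed: KafkaConsume from the beam-nuggets package

# PrintFn and kafka_config are defined elsewhere (not shown in the question).


class AddTimestampFn(beam.DoFn):
    def process(self, element):
        # Stamp each element with the current processing time, in seconds.
        timestamp = int(time.time())
        yield beam.window.TimestampedValue(element, timestamp)


pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True

# Pass the streaming options to the pipeline that is actually run.
with beam.Pipeline(options=pipeline_options) as p:
    lines = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
    groups = (
        lines
        | 'ParseEventFn' >> beam.Map(lambda x: ast.literal_eval(x[1]))
        | 'Add timestamp' >> beam.ParDo(AddTimestampFn())
        | 'After timestamp add' >> beam.ParDo(PrintFn("timestamp add"))
        | 'FixedWindow' >> beam.WindowInto(
            beam.window.FixedWindows(10), allowed_lateness=30)  # 10-second windows
        | 'Group' >> beam.GroupByKey()
        | 'After group' >> beam.ParDo(PrintFn("after group")))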
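
To see what a windowed `GroupByKey` should emit once its input is keyed, the sketch below models the semantics in plain Python: it buckets `(key, value, timestamp)` triples by 10-second window, then by key. It is only an illustration under the simulator's cadence (one record every 2 seconds, ids cycling 1001 to 1003) and deliberately ignores triggers, watermarks and lateness; `window_and_group` is a hypothetical name, not a Beam API.

```python
from collections import defaultdict


def window_and_group(elements, size=10):
    """Group (key, value, timestamp) triples by fixed window, then by key.

    Returns {(window_start, window_end): {key: [values, ...]}}, a rough model
    of WindowInto(FixedWindows(size)) followed by GroupByKey().
    """
    result = defaultdict(lambda: defaultdict(list))
    for key, value, ts in elements:
        start = ts - ts % size
        result[(start, start + size)][key].append(value)
    # Convert the nested defaultdicts to plain dicts for readability.
    return {window: dict(groups) for window, groups in result.items()}


# Simulated input: one record every 2 s, ids cycling 1001, 1002, 1003.
ids = ['1001', '1002', '1003']
elements = [(ids[i % 3], 10, 2 * i) for i in range(10)]
for window, groups in sorted(window_and_group(elements).items()):
    print(window, groups)
```

In an actual pipeline, each firing would emit one `(key, iterable_of_values)` pair per key and window rather than a nested dict.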

What am I doing wrong here? I've just started using Beam, so it could be something really silly.
