Now, I have the code below:

PCollection<String> input_data =
    pipeline
        .apply(PubsubIO
            .Read
            .withCoder(StringUtf8Coder.of())
            .named("ReadFromPubSub")
            .subscription("/subscriptions/project_name/subscription_name"));

There are 3 answers

jkff (BEST ANSWER):

Looks like you want to read some messages from Pub/Sub, convert each of them into multiple parts by splitting on space characters, and then feed the parts to the rest of your pipeline. No special configuration of PubsubIO is needed, because this is not a "reading data" problem; it's a "transforming data you have already read" problem. You simply need to insert a ParDo that takes your "composite" record and breaks it down the way you want, e.g.:

PCollection<String> input_data =
    pipeline
        .apply(PubsubIO
            .Read
            .withCoder(StringUtf8Coder.of())
            .named("ReadFromPubSub")
            .subscription("/subscriptions/project_name/subscription_name"))
        .apply(ParDo.of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            // Split each "composite" message on spaces and emit
            // every part as a separate element.
            String composite = c.element();
            for (String part : composite.split(" ")) {
              c.output(part);
            }
          }
        }));
Pieter De Maeyer:

I take it you mean that the data you want is spread across different elements of the PCollection and that you want to extract and group it somehow.

A possible approach is to write a DoFn that processes each String in the PCollection and outputs a key-value pair for each piece of data you want to group. You can then use the GroupByKey transform to bring all the related data together.

For example, suppose you have the following messages from Pub/Sub in your PCollection:

  1. User 1234 bought item A
  2. User 1234 bought item B

The DoFn outputs a key-value pair with the user ID as the key and the purchased item as the value (<1234, A>, <1234, B>). Using the GroupByKey transform, you group the two values together into one element, on which you can then perform further processing, as sketched below.
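A minimal sketch of that step, written against the same Dataflow SDK as the snippets above (KV, GroupByKey); the message format "User <id> bought item <x>", the fixed word positions, and the items_by_user name are assumptions for illustration:

PCollection<KV<String, Iterable<String>>> items_by_user =
    input_data
        .apply(ParDo.of(new DoFn<String, KV<String, String>>() {
          @Override
          public void processElement(ProcessContext c) {
            // Assumes messages like "User 1234 bought item A";
            // real code would validate the format first.
            String[] words = c.element().split(" ");
            c.output(KV.of(words[1], words[4]));  // <user id, item>
          }
        }))
        // Group all items bought by the same user into one element,
        // e.g. <"1234", ["A", "B"]>.
        .apply(GroupByKey.<String, String>create());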

This is a very common pattern in big data, known as MapReduce.

mr blobby:

You can output an Iterable<A> and then use Flatten to squash it. Unsurprisingly, this is termed flatMap in many next-generation data processing platforms, cf. Spark/Flink.
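A minimal sketch of that approach, again in the same Dataflow SDK style, reusing the space-separated messages from the accepted answer; the parts name is an assumption for illustration, and setCoder with IterableCoder plus a java.util.Arrays import are assumed for the intermediate collection:

PCollection<String> parts =
    input_data
        // Emit all parts of each message as one Iterable element.
        .apply(ParDo.of(new DoFn<String, Iterable<String>>() {
          @Override
          public void processElement(ProcessContext c) {
            c.output(Arrays.asList(c.element().split(" ")));
          }
        }))
        // Tell the SDK how to encode the intermediate Iterables.
        .setCoder(IterableCoder.of(StringUtf8Coder.of()))
        // Flatten.iterables() squashes PCollection<Iterable<String>>
        // back into a PCollection<String>, i.e. a flatMap.
        .apply(Flatten.<String>iterables());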