Output multiple tuples at the same time in an Apache Beam pipeline

I want to know whether there is any way to output multiple tuples at the same time, so that the steps for all 4 tuples start at once. Currently, in my batch pipeline, once firsttuple is output, the pipeline finishes the first tuple's operations before the others begin. I need to output all the tuples at the same time so that all of their downstream processes start together. I output from a single list, like below:

    // Assumes firsttuple, secondtuple, thirdtuple and forthtuple are the TupleTags
    // passed to ParDo.withOutputTags(...), and that accountingDTO is a
    // protobuf-style builder for the output message.
    for (int i = 0; i < listlinedto.size(); i++) {
      // Start each output from an empty list of line statuses; reusing the builder
      // without clearing it would carry statuses over from earlier iterations.
      accountingDTO
              .clearLineStatuses()
              .setFeedStatus(feedStatusDTO)
              .addAllLineStatuses(listlinedto.get(i));
      if (i == 0) {
        LOG.info("Size of first tuple:{}", listlinedto.get(i).size());
        c.output(firsttuple, accountingDTO.build());
      } else if (i == 1) {
        LOG.info("Size of second tuple:{}", listlinedto.get(i).size());
        c.output(secondtuple, accountingDTO.build());
      } else if (i == 2) {
        LOG.info("Size of third tuple:{}", listlinedto.get(i).size());
        c.output(thirdtuple, accountingDTO.build());
      } else if (i == 3) {
        LOG.info("Size of fourth tuple:{}", listlinedto.get(i).size());
        c.output(forthtuple, accountingDTO.build());
      }
    }
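
Because the four branches differ only in the tag they emit to, the if/else chain can be driven by an array of tags instead. The sketch below models that in plain Java rather than Beam: tags are strings instead of TupleTags, line statuses are strings, and the hypothetical emit callback stands in for c.output(tag, value). It emits every tagged output within one call; it does not by itself control when the downstream steps run.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class TaggedFanOut {
    // Hypothetical tag names; in Beam these would be TupleTag instances.
    static final String[] TAGS = {"firsttuple", "secondtuple", "thirdtuple", "forthtuple"};

    // Routes sub-list i to TAGS[i]. A fresh list is built for every output,
    // so statuses from earlier iterations are never carried over.
    static void fanOut(List<List<String>> listlinedto, BiConsumer<String, List<String>> emit) {
        if (listlinedto.size() > TAGS.length) {
            throw new IllegalArgumentException("more sub-lists than output tags");
        }
        for (int i = 0; i < listlinedto.size(); i++) {
            List<String> statuses = new ArrayList<>(listlinedto.get(i));
            emit.accept(TAGS[i], statuses);
        }
    }

    public static void main(String[] args) {
        List<List<String>> input = List.of(
                List.of("a1", "a2"), List.of("b1"), List.of("c1", "c2", "c3"), List.of("d1"));
        Map<String, List<String>> emitted = new LinkedHashMap<>();
        fanOut(input, emitted::put);
        emitted.forEach((tag, statuses) ->
                System.out.println(tag + " -> " + statuses.size()));
    }
}
```

Adding a fifth output then only means adding a tag to the array, not another branch.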

Answer by chamikara:

There's no way to control the exact timing of downstream steps in a Beam batch pipeline; the runner decides how and when each step is scheduled.

In your case, though, this may also depend on the structure of the pipeline. Does each of the else-if branches (which I assume are inside a DoFn) get called for a different input element? If so, can you restructure your pipeline so that all the branches are invoked for the same input element?
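
If the four lists currently arrive as separate input elements, one way to follow this suggestion is to key each part by something they share (a hypothetical feedId here) and group on it first, which is what a Beam GroupByKey does, so that a single processElement call receives all four parts and emits every tagged output. The plain-Java model below only illustrates that data flow; Part, feedId, groupByFeed and processGroup are illustrative names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

public class GroupThenFanOut {
    // Hypothetical element: one sub-list, its position (0..3), and the feed it belongs to.
    static final class Part {
        final String feedId;
        final int index;
        final List<String> statuses;

        Part(String feedId, int index, List<String> statuses) {
            this.feedId = feedId;
            this.index = index;
            this.statuses = statuses;
        }
    }

    static final String[] TAGS = {"firsttuple", "secondtuple", "thirdtuple", "forthtuple"};

    // Models a GroupByKey on feedId: all parts of one feed end up in one group.
    static Map<String, List<Part>> groupByFeed(List<Part> parts) {
        Map<String, List<Part>> groups = new TreeMap<>();
        for (Part p : parts) {
            groups.computeIfAbsent(p.feedId, k -> new ArrayList<>()).add(p);
        }
        return groups;
    }

    // Models one processElement call on a grouped element: every tag is emitted in this call.
    static void processGroup(List<Part> group, BiConsumer<String, List<String>> emit) {
        for (Part p : group) {
            emit.accept(TAGS[p.index], p.statuses);
        }
    }

    public static void main(String[] args) {
        List<Part> parts = List.of(
                new Part("feedA", 0, List.of("a1")),
                new Part("feedA", 1, List.of("b1", "b2")),
                new Part("feedA", 2, List.of("c1")),
                new Part("feedA", 3, List.of("d1")));
        for (Map.Entry<String, List<Part>> e : groupByFeed(parts).entrySet()) {
            // One call per feed, rather than one call per part.
            processGroup(e.getValue(), (tag, statuses) ->
                    System.out.println(e.getKey() + " " + tag + " " + statuses));
        }
    }
}
```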