We are using a few side inputs to access data that is used to filter out elements in our pipelines. The source of the side input data is a Redis instance in Google Cloud Memorystore. The jobs are deployed from generated pipeline templates via the Dataflow service APIs.
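For context, the Redis and refresh settings arrive as templated ValueProvider options; the interface below is reconstructed from the getters used in the snippets (the interface name itself is illustrative):

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.ValueProvider;

    public interface RulesPipelineOptions extends DataflowPipelineOptions {
      ValueProvider<String> getRedisHost();
      void setRedisHost(ValueProvider<String> value);

      ValueProvider<String> getRedisPort();
      void setRedisPort(ValueProvider<String> value);

      ValueProvider<String> getRulesRefreshDuration();
      void setRulesRefreshDuration(ValueProvider<String> value);
    }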
The side input view is generated as below:
    PCollectionView<Map<String, String>> map = p
        .apply("Generate a sequence for extracting resource_id-> map",
            GenerateSequence.from(0).withRate(1,
                Duration.standardSeconds(Long.parseLong(options.getRulesRefreshDuration().get()))))
        .apply("Get all rules from Redis for resourceId-> map",
            ParDo.of(new GetAllRulesFromRedis(
                Integer.parseInt(options.getRedisPort().get()),
                options.getRedisHost().get(), statsdHost, statsdPort)))
        .apply("Transform all Rules into Id-> map",
            ParDo.of(new TransformRulesAsJourneyIdToMetadataMap(
                statsdHost, statsdPort, Rule.CONDITION_CHECK)))
        .apply("Window Id->map",
            Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .discardingFiredPanes())
        .apply("singleton view of Id -> map", View.asSingleton());
The view is then accessed as below:
    PCollection<UserData> computed = ...;
    PCollection<UserData> computeWithSideInput = computed
        .apply("applying sideinput",
            ParDo.of(new DoFn(...))
                .withSideInputs(map));
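Inside the DoFn the view is read with ProcessContext.sideInput(...). A minimal sketch of the access pattern (the class name, getResourceId(), and the empty-map guard are illustrative, not our exact code):

    import java.util.Map;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.PCollectionView;

    class FilterWithRules extends DoFn<UserData, UserData> {
      private final PCollectionView<Map<String, String>> rulesView;

      FilterWithRules(PCollectionView<Map<String, String>> rulesView) {
        this.rulesView = rulesView;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        // Reads the singleton map for the element's window.
        Map<String, String> rules = c.sideInput(rulesView);
        if (rules == null || rules.isEmpty()) {
          // Guard for the empty-view case described in the second issue below.
          return;
        }
        if (rules.containsKey(c.element().getResourceId())) {
          c.output(c.element());
        }
      }
    }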
The pipelines run on Google Cloud Dataflow, and we have been noticing two kinds of issues:
- Despite the trigger on the side input collection firing a pane as soon as the first element arrives, we are seeing the error below. The strange thing is that the errors do not occur across all the workers in the job, and we have also seen the issue clear up after a new deployment, only to resurface later. (Our reading of the fix the message suggests is sketched after this list.)
    java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
        at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:434)
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:524)
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:493)
        at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:2051)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:119)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:613)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
        at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
        at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1450)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1125)
        at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
- Secondly, we have noticed that the side input sometimes contains no data even though the underlying data exists in Redis. The side input view is a map; when this occurs, the map comes back with no keys and our code breaks while accessing it. (The empty-map guard in the DoFn sketch above is the workaround we have been considering.)
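For reference, the error message suggests Combine.globally().asSingleton(); the actual method on Combine.Globally is asSingletonView(). A minimal sketch of how we understand that fix, applied to the snippet above (the "keep the last map seen" merge is our assumption, since a pane can contain several maps):

    // Hypothetical rework of the view; the upstream steps are unchanged.
    PCollectionView<Map<String, String>> map = p
        .apply(...) // same GenerateSequence / Redis / transform / window steps as above
        .apply("Combine into singleton view",
            Combine.globally((Iterable<Map<String, String>> maps) -> {
              Map<String, String> latest = new HashMap<>();
              for (Map<String, String> m : maps) {
                latest = m; // keep the last map seen in this pane
              }
              return latest;
            }).asSingletonView());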
Has anyone faced similar issues? It would help greatly if someone could shed some light on this seemingly non-deterministic behaviour of side inputs created as singletons.