I'm writing a test to check whether the timeout of my Flink CEP pattern works correctly. I'm using Flinkspector for this, and I have the following test case:
```java
@Test
public void SameDoor_TwoStatuses_OneSecondTimeoutPattern() {
    // Arrange
    long now = new Date().getTime();
    DoorEvent event1 = new DoorEvent();
    event1.setId(123);
    event1.getDoor().setId(1);
    event1.getDoor().setStatus("statusaaaaaa");
    event1.setTimestamp(now);
    EventTimeInputBuilder<DoorEvent> builder = EventTimeInputBuilder.startWith(event1, event1.getTimestamp());
    DataStream<DoorEvent> stream = createTestStream(builder)
            .assignTimestampsAndWatermarks(new TestTimestampExtractor<DoorEvent>());

    // Act
    Pattern<DoorEvent, ?> pattern = StatusNotFollowedByAnotherStatusPattern.getPatternForSameDoor(
            1, "firstevent", "statusaaaaaa", "secondevent", "status2");
    PatternStream<DoorEvent> pStream = CEP.pattern(stream, pattern);
    // Timed-out partial matches end up as Left, complete matches as Right
    DataStream<Either<Integer, Tuple2<Integer, Integer>>> patterns =
            pStream.select(getEventIdOfTimeoutEvent(), selectEventIdsOfPatterns()).forward();
    patterns.print(); // prints Left(123)
    ExpectedRecords<Either<Integer, Tuple2<Integer, Integer>>> expectedRecords =
            new ExpectedRecords<Either<Integer, Tuple2<Integer, Integer>>>()
                    .expect(new Left<Integer, Tuple2<Integer, Integer>>(123));
    expectedRecords.refine().sameFrequency();

    // Assert
    assertStream(patterns, expectedRecords);
}

// Called for complete matches: returns the ids of the first and second event
private PatternSelectFunction<DoorEvent, Tuple2<Integer, Integer>> selectEventIdsOfPatterns() {
    return new PatternSelectFunction<DoorEvent, Tuple2<Integer, Integer>>() {
        private static final long serialVersionUID = 3830508947015151715L;

        @Override
        public Tuple2<Integer, Integer> select(Map<String, List<DoorEvent>> pattern) throws Exception {
            Tuple2<Integer, Integer> t = new Tuple2<Integer, Integer>();
            t.f0 = pattern.get("firstevent").get(0).getId();
            t.f1 = pattern.get("secondevent").get(0).getId();
            return t;
        }
    };
}

// Called for partial matches that did not complete within the pattern's time window
private PatternTimeoutFunction<DoorEvent, Integer> getEventIdOfTimeoutEvent() {
    return new PatternTimeoutFunction<DoorEvent, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer timeout(Map<String, List<DoorEvent>> pattern, long timeoutTimestamp) throws Exception {
            DoorEvent first = pattern.get("firstevent").get(0);
            System.out.println("Timeout triggered on eventstatus " + first.getDoor().getStatus());
            return first.getId();
        }
    };
}
```
My code does print the status `statusaaaaaa`, which is the status of the first event in the pattern, from within the `PatternTimeoutFunction`. The second status is not detected within the defined time period, so the timeout is triggered and adds an integer to the `patterns` stream. How do I specify in my `ExpectedRecords` that I expect an `Either` `Left` with a value of 123?
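A simplified model of the behaviour described above may help: in event time, a partial match that is not completed within the pattern's window is handed to the timeout function, and its result is emitted on the `Left` side of the `Either`, while complete matches go to the `Right` side. The stack trace in the edit shows the timeout being driven by a watermark (`processWatermark` → `onEventTime` → `emitTimedOutSequences`), and it presumably fires in this test because the watermark is advanced to its maximum once the finite test input is exhausted. The sketch is plain Java with no Flink dependency; `TimeoutToLeftModel`, its nested `Either` and `PartialMatch` classes, and the exact window boundary check are hypothetical and not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified model of a CEP operator with a timeout.
// Not Flink's implementation: it only mirrors the flow described in the question.
public class TimeoutToLeftModel {

    /** Minimal stand-in for an Either value (hypothetical, not Flink's class). */
    static final class Either {
        final boolean isLeft;
        final Object value;

        private Either(boolean isLeft, Object value) {
            this.isLeft = isLeft;
            this.value = value;
        }

        static Either left(Object value) { return new Either(true, value); }
        static Either right(Object value) { return new Either(false, value); }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Either)) return false;
            Either other = (Either) o;
            return isLeft == other.isLeft && Objects.equals(value, other.value);
        }
        @Override public int hashCode() { return Objects.hash(isLeft, value); }
        @Override public String toString() { return (isLeft ? "Left(" : "Right(") + value + ")"; }
    }

    /** A match that has seen "firstevent" and is still waiting for "secondevent". */
    static final class PartialMatch {
        final int firstEventId;
        final long startTimestamp;

        PartialMatch(int firstEventId, long startTimestamp) {
            this.firstEventId = firstEventId;
            this.startTimestamp = startTimestamp;
        }
    }

    private final long windowMillis;
    private final List<PartialMatch> pending = new ArrayList<>();
    private final List<Either> output = new ArrayList<>();

    TimeoutToLeftModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** The first event arrives: open a partial match. */
    void onFirstEvent(int eventId, long timestamp) {
        pending.add(new PartialMatch(eventId, timestamp));
    }

    /** The watermark advances: partial matches older than the window time out. */
    void onWatermark(long watermark) {
        Iterator<PartialMatch> it = pending.iterator();
        while (it.hasNext()) {
            PartialMatch m = it.next();
            // Whether the boundary is >= or > is an assumption of this model.
            if (watermark - m.startTimestamp >= windowMillis) {
                output.add(Either.left(m.firstEventId)); // plays the role of getEventIdOfTimeoutEvent()
                it.remove();
            }
        }
    }

    List<Either> output() {
        return output;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        TimeoutToLeftModel cep = new TimeoutToLeftModel(1_000L); // one-second window
        cep.onFirstEvent(123, now);       // event1 with status "statusaaaaaa"
        cep.onWatermark(now + 500L);      // still inside the window: nothing emitted
        System.out.println(cep.output()); // []
        cep.onWatermark(Long.MAX_VALUE);  // end of the bounded test input
        System.out.println(cep.output()); // [Left(123)]
    }
}
```

In this model the expected record is simply a `Left` holding the id of the first event, which matches what `patterns.print()` shows in the test.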
EDIT
The error I currently have is:
```
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark:
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.cep.operator.TimeoutKeyedCEPPatternOperator.emitTimedOutSequences(TimeoutKeyedCEPPatternOperator.java:77)
	at org.apache.flink.cep.operator.TimeoutKeyedCEPPatternOperator.advanceTime(TimeoutKeyedCEPPatternOperator.java:68)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:242)
	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
	... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
	... 18 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type extraction is not possible on Either type as it does not contain information about the 'left' type.
	at org.apache.flink.api.java.typeutils.EitherTypeInfoFactory.createTypeInfo(EitherTypeInfoFactory.java:37)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromFactory(TypeExtractor.java:1233)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForObject(TypeExtractor.java:2054)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getForObject(TypeExtractor.java:2044)
	at io.flinkspector.datastream.functions.TestSink.invoke(TestSink.java:82)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
	... 26 more
```
The problem is a bug in Flinkspector's `TestSink` function. The `TestSink` function tries to extract the generic type parameters of `Left` at runtime, which is not possible. Instead, this type information would have to be passed into the `TestSink` function when it is instantiated, so that the correct type serializer can be created. Please open a corresponding issue in the GitHub repository to let the developers know.
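The underlying reason is Java type erasure: an instance of a generic class does not carry its type arguments at runtime, so inspecting a single `Left` object (as `TypeExtractor.getForObject` does in the stack trace) cannot reveal what `L` and `R` were. The following plain-Java sketch illustrates this and the suggested fix of handing the type information to the sink at construction time. `Left`, `inspect`, and `TypedSink` are hypothetical stand-ins, not Flink or Flinkspector classes. In Flink the role of the `Class` tokens would be played by `TypeInformation` (for example built from a `TypeHint`), because a plain `Class` cannot describe a nested generic type such as `Tuple2<Integer, Integer>`.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    /** Hypothetical stand-in for Either.Left: only the left value exists at runtime. */
    static final class Left<L, R> {
        final L value;

        Left(L value) {
            this.value = value;
        }
    }

    /**
     * All a sink can learn from the element itself: its class and the class of
     * the value it holds. The type argument R has been erased and is not recoverable.
     */
    static String inspect(Object element) {
        Left<?, ?> left = (Left<?, ?>) element;
        return element.getClass().getSimpleName() + " holding " + left.value.getClass().getSimpleName();
    }

    /** Hypothetical sink that is told the element type up front instead of extracting it. */
    static final class TypedSink<L, R> {
        private final Class<L> leftType;
        private final Class<R> rightType;
        private final List<Object> received = new ArrayList<>();

        TypedSink(Class<L> leftType, Class<R> rightType) {
            this.leftType = leftType;
            this.rightType = rightType;
        }

        void invoke(Object element) {
            received.add(element); // no runtime type extraction needed
        }

        int count() {
            return received.size();
        }

        String describeType() {
            return "Either<" + leftType.getSimpleName() + ", " + rightType.getSimpleName() + ">";
        }
    }

    public static void main(String[] args) {
        Left<Integer, String> a = new Left<>(123);
        Left<Integer, Double> b = new Left<>(123);
        // Different declared right types, identical runtime class:
        System.out.println(a.getClass().equals(b.getClass())); // true
        System.out.println(inspect(a));                        // Left holding Integer

        TypedSink<Integer, String> sink = new TypedSink<>(Integer.class, String.class);
        sink.invoke(a);
        System.out.println(sink.describeType());               // Either<Integer, String>
    }
}
```

The design point is that the sink's creator, who statically knows the stream's element type, supplies it; the sink never has to guess it from a value.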