Unbounded Collection based stream in Flink


Is it possible to create an unbounded collection-based stream in Flink? For example, if we add an element to a map, Flink should process it the way it would for a socket stream. The job should not exit once the initial elements have been read.

1 Answer

Answer by kkrugler

You can create a custom SourceFunction that never terminates (until cancel() is called) and emits elements as they appear. You'd want a class that looks something like this:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

class MyUnboundedSource extends RichParallelSourceFunction<MyType> {

    ...
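    // Starts as true so the loop in run() executes. A transient field would be
    // false after the source is deserialized on the task manager, and run()
    // would return immediately. volatile makes cancel() visible to run().
    private volatile boolean running = true;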
    ...

    @Override
    public void run(SourceContext<MyType> ctx) throws Exception {
        while (running) {
            // Call some method that returns the next record, if available.
            MyType record = getNextRecordOrNull();
            if (record != null) {
                ctx.collect(record);
            } else {
                // Nothing to emit yet; back off briefly before polling again.
                Thread.sleep(NO_DATA_SLEEP_TIME());
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

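Note that for this source to support at-least-once or exactly-once generation of records, you'd also need to save its state (for example, the current read position) in Flink's checkpoints, e.g. by implementing CheckpointedFunction and emitting records while holding ctx.getCheckpointLock().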
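
The answer leaves getNextRecordOrNull() undefined, so here is one way it could be backed by a collection that other threads add to. This is only a Flink-free sketch of the poll-or-sleep loop, not the actual source class: QueuePollingLoop, QueuePollingDemo, add(), stop() and runDemo() are hypothetical names, and a java.util.function.Consumer stands in for SourceContext.collect(). In a real job the queue would also have to live in the same JVM as the source task, which this sketch does not address.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Flink-free model of the source loop; Consumer stands in for SourceContext.collect(). */
class QueuePollingLoop<T> {
    // Thread-safe queue that producers can add to while the loop is running.
    private final Queue<T> pending = new ConcurrentLinkedQueue<>();
    private final long noDataSleepMillis;
    // volatile so a stop() from another thread is seen by the polling thread.
    private volatile boolean running = true;

    QueuePollingLoop(long noDataSleepMillis) {
        this.noDataSleepMillis = noDataSleepMillis;
    }

    /** Called by producers to make an element available to the loop. */
    void add(T element) {
        pending.add(element);
    }

    /** Plays the role of getNextRecordOrNull(): null means nothing is queued. */
    T getNextRecordOrNull() {
        return pending.poll();
    }

    /** Mirrors run(): emits elements as they appear, sleeps when the queue is empty. */
    void run(Consumer<T> out) throws InterruptedException {
        while (running) {
            T record = getNextRecordOrNull();
            if (record != null) {
                out.accept(record);
            } else {
                Thread.sleep(noDataSleepMillis);
            }
        }
    }

    /** Mirrors cancel(): asks the loop to exit after its current iteration. */
    void stop() {
        running = false;
    }
}

class QueuePollingDemo {
    /** Adds two elements at different times and returns what the loop emitted. */
    static List<String> runDemo() {
        QueuePollingLoop<String> loop = new QueuePollingLoop<>(5);
        List<String> seen = new CopyOnWriteArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                loop.run(seen::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            loop.add("a");
            while (seen.size() < 1) Thread.sleep(1); // wait until "a" is emitted
            loop.add("b");                           // added after the loop has idled
            while (seen.size() < 2) Thread.sleep(1);
            loop.stop();
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(seen);
    }
}
```

Calling QueuePollingDemo.runDemo() shows the behavior the question asks for: the loop keeps running on an empty queue and picks up "b" even though it is added after "a" has already been processed. Polling with a sleep matches the shape of the answer's code; a BlockingQueue with a timed poll() would be a reasonable alternative that avoids the fixed sleep.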