Create Flink Datastream from Postgres table

713 views Asked by At

I am trying to process large streams of data (source = Kinesis stream) and sink in to Postgres DB. While doing so, I need to first join the incoming stream with some master data which is already present in the Postgres DB.

I am creating a keyed stream from incoming kinesis stream and using JDBC catalog creating a second stream using Flink table API. I have setup my Database sink as follows :

  public class PosgresSink extends RichSinkFunction <Clazz> implements CheckpointedFunction, CheckpointListener { .. }

So that every time Flinks makes a checkpoint sink gets triggered.

However when I do a join with the incoming stream from JDBC source, I am getting following INFO:

   org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source TableSourceScan(table ...
    ...
   (1/4) of job 3961ccdf0d2a7da504f61e094a74fa5f is not in state RUNNING but FINISHED instead. Aborting checkpoint

This is blocking my sink as checkpoints are getting aborted every time.

It seems that my JDBC source is getting finished quite early and when Flink tries to checkpoint, it does not find any running job and it aborts the checkpoint. There seems to be a limitation with Flink that it only checkpoints when all the operators/tasks are still running

https://issues.apache.org/jira/browse/FLINK-2491

I am setting up my JDBC stream as follows :

 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

// Register a JDBC catalog 

static JdbcCatalog registerJdbcCatalog(StreamTableEnvironment bsTableEnv)   {
   String name = "<>";
   String defaultDatabase = "<>";
   String username = "<>";
   String password = "<>";
   String baseUrl = "jdbc:postgresql://localhost:5432/";

   JdbcCatalog jdbcCatalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
   bsTableEnv.registerCatalog("catalogName", jdbcCatalog);
   bsTableEnv.useCatalog("catalogName");
   return jdbcCatalog;
 }

// get the table
Table table= bsTableEnv.from("table")

// create a data stream from table
DataStream<Table> myStream= bsTableEnv.toAppendStream(table, Table.class);

Is this correct understanding and is there a way around here ?

0

There are 0 answers