Flink: How do I create a sink that supports savepoints?


I use Flink (1.9.2) and MongoDB. I want to write a custom sink that outputs some messages to MongoDB for testing. But after I finished it and ran the job, the job cannot take a savepoint.

My sink:

public class MongoDBSink extends RichSinkFunction<GeneralizedMessage> implements CheckpointedFunction, CheckpointListener {
    private static final Logger logger = LoggerFactory.getLogger(MongoDBSink.class);

    private MongoCollection sinkCollection;
    private Random random = new Random();

    // init mongo connection.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        MongoConfigurationCenter mongoCenter = MongoConfigurationCenter.getInstance();
        MongoClient mongoClient = mongoCenter.getMongoClient();
        MongoDatabase database = mongoClient.getDatabase("xxxx");
        sinkCollection = database.getCollection(sinkCollectionName);
    }

    /** Writes the given value to the sink. This function is called for every record.*/
    @Override
    public void invoke(GeneralizedMessage value, Context context) throws Exception {
        // some logic
        sinkCollection.insertOne(doc);
        // Simulate a slow write: sleep 200-1199 ms for every record.
        Thread.sleep(200 + random.nextInt(1000));
    }



    /** checkpoint methods */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        logger.info("notifyCheckpointComplete");
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        logger.info("snapshotState");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        logger.info("initializeState");
    }
}

When I trigger a savepoint, the sink operator is never acknowledged.

[Image: ErrorMessage]

[Image: Logs]

I cannot find any errors in the JobManager or TaskManager logs. Everything looks fine, yet the savepoint fails.
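One thing that may be worth checking (this is a guess, not something the logs above confirm): in Flink 1.9 checkpoint barriers travel in-band with the records, so the sink can only acknowledge a savepoint after it has processed every record queued ahead of the barrier. With Thread.sleep(200 + random.nextInt(1000)) in invoke(), each record costs roughly 0.7 s on average. The plain-Java sketch below does only that arithmetic. The backlog of 1,000 records and the 10-minute timeout are assumptions for illustration, and BarrierDelayEstimate is a hypothetical helper, not a Flink class.

```java
import java.time.Duration;

/** Back-of-the-envelope estimate only; this is not Flink code and not a measurement. */
final class BarrierDelayEstimate {

    /** Mean of base + random.nextInt(bound); nextInt is uniform over 0..bound-1. */
    static double meanSleepMillis(int base, int bound) {
        return base + (bound - 1) / 2.0;
    }

    /** Time the sink needs to drain the records that sit ahead of the barrier. */
    static Duration drainTime(long queuedRecords, double meanSleepMillis) {
        return Duration.ofMillis(Math.round(queuedRecords * meanSleepMillis));
    }

    public static void main(String[] args) {
        // Taken from the sink above: Thread.sleep(200 + random.nextInt(1000)).
        double mean = meanSleepMillis(200, 1000);
        // Hypothetical backlog in front of the barrier (assumption).
        long queuedRecords = 1_000;
        // Assumed checkpoint timeout; verify the value configured for your job.
        Duration timeout = Duration.ofMinutes(10);

        Duration drain = drainTime(queuedRecords, mean);
        System.out.printf("mean sleep per record: %.1f ms%n", mean);
        System.out.println("time until the barrier is reached: " + drain);
        System.out.println("exceeds assumed timeout: " + (drain.compareTo(timeout) > 0));
    }
}
```

Under these assumed numbers the barrier would need more than 11 minutes to reach the sink, which is longer than the assumed timeout. If this is the cause, the savepoint would expire without any exception in the sink itself, which would match logs that look clean. Removing the sleep or raising the checkpoint timeout would be a quick way to test the hypothesis.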
