I use Flink (1.9.2) and MongoDB. I want to write a custom sink that outputs some messages to MongoDB for testing. But after I finished it and ran the job, the job cannot take a savepoint.
My sink:
    public class MongoDBSink extends RichSinkFunction<GeneralizedMessage> implements CheckpointedFunction, CheckpointListener {
        private static final Logger logger = LoggerFactory.getLogger(MongoDBSink.class);
        private MongoCollection sinkCollection;
        private Random random = new Random();

        // Initialize the MongoDB connection.
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            MongoConfigurationCenter mongoCenter = MongoConfigurationCenter.getInstance();
            MongoClient mongoClient = mongoCenter.getMongoClient();
            MongoDatabase database = mongoClient.getDatabase("xxxx");
            sinkCollection = database.getCollection(sinkCollectionName);
        }

        /** Writes the given value to the sink. This function is called for every record. */
        @Override
        public void invoke(GeneralizedMessage value, Context context) throws Exception {
            // some logic
            sinkCollection.insertOne(doc);
            // Wait for a random amount of time (200 to 1199 ms).
            Thread.sleep(200 + random.nextInt(1000));
        }

        /** Checkpoint methods. */
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            logger.info("notifyCheckpointComplete");
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            logger.info("snapshotState");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            logger.info("initializeState");
        }
    }
When I trigger a savepoint, the sink operator never acknowledges it.
Logs: I cannot find any errors in the JobManager or TaskManager logs. Everything seems fine, but the savepoint fails.