Debezium reads binlog newest if offset storage is empty or just created&

1.2k views Asked by At

I am running reading binlogs with Debezium, but when I start new reading thread it reads all create statements for table from the beginning, but I dont need them(op=c). I need to handle create/update/delete events that happens after I run code first time. And than work with correct offset (that stored in file "tmp/offsets.dat"), so how I can set initial configuration in this way? So the flow need to be the next one:

  • start reading(first time) -> take current(latest position from binlog and save it, work from here) and handle newest events
  • start reading(not the first run) -> take latest position from file and read data as usual

Here is my current configurations

  config = Configuration.empty().withSystemProperties(Function.identity()).edit()
            .with(MySqlConnectorConfig.SERVER_NAME, SERVER_NAME)
            .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "r")
            .with(MySqlConnectorConfig.HOSTNAME, HOSTNAME)
            .with(MySqlConnectorConfig.PORT, PORT)
            .with(MySqlConnectorConfig.USER, USER)
            .with(MySqlConnectorConfig.PASSWORD, PASSWORD)
            .with(MySqlConnectorConfig.TABLE_WHITELIST, TABLE_WHITELIST)
            .with(MySqlConnectorConfig.SERVER_ID, 100)
            // 
            .with(EmbeddedEngine.OFFSET_STORAGE, "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "tmp/offsets.dat")
            .with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
            .with(EmbeddedEngine.ENGINE_NAME, SERVER_NAME)
            //
            .with(MySqlConnectorConfig.DATABASE_HISTORY, "io.debezium.relational.history.FileDatabaseHistory")
            .with("database.history.file.filename", "tmp/dbhistory.dat")
            // Send JSON without schema
            .with("schemas.enable", false)
            .build();

and my.cnf values for binlogs

[mysqld]
log-bin=mysql-bin.log
server_id=100
binlog_row_image=full
binlog-format=row
expire_logs_days =10
0

There are 0 answers