I am reading binlogs with Debezium, but each time I start a new reading thread it re-reads all the create events (op=c) for the table from the beginning, and I don't need them. I only want to handle the create/update/delete events that happen after the first run of my code, and from then on continue from the stored offset (kept in the file "tmp/offsets.dat"). How can I set up the initial configuration this way? The flow should be:
- first run: take the current (latest) binlog position, save it, and handle only events from that point onwards
- subsequent runs: read the latest position from the offset file and continue as usual
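If it helps, my guess (not verified) is that the snapshot mode controls this. I was thinking of adding something like the following to the configuration below, assuming that "schema_only" makes the connector skip the initial row snapshot and start streaming from the current binlog position:

```java
// Assumption on my side: with snapshot.mode = schema_only the connector
// should capture only the table schemas on the very first start, then
// stream changes from the current binlog position (no initial row events).
.with(MySqlConnectorConfig.SNAPSHOT_MODE, "schema_only")
```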
Here is my current configuration:
config = Configuration.empty().withSystemProperties(Function.identity()).edit()
.with(MySqlConnectorConfig.SERVER_NAME, SERVER_NAME)
.with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "r")
.with(MySqlConnectorConfig.HOSTNAME, HOSTNAME)
.with(MySqlConnectorConfig.PORT, PORT)
.with(MySqlConnectorConfig.USER, USER)
.with(MySqlConnectorConfig.PASSWORD, PASSWORD)
.with(MySqlConnectorConfig.TABLE_WHITELIST, TABLE_WHITELIST)
.with(MySqlConnectorConfig.SERVER_ID, 100)
//
.with(EmbeddedEngine.OFFSET_STORAGE, "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "tmp/offsets.dat")
.with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
.with(EmbeddedEngine.ENGINE_NAME, SERVER_NAME)
//
.with(MySqlConnectorConfig.DATABASE_HISTORY, "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", "tmp/dbhistory.dat")
// Send JSON without schema
.with("schemas.enable", false)
.build();
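For completeness, this is roughly how I start the engine (a simplified sketch of my run code using the legacy EmbeddedEngine API; error handling and shutdown are omitted):

```java
import io.debezium.embedded.EmbeddedEngine;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Build the engine from the configuration above and hand each change
// event to a consumer; offsets are flushed to tmp/offsets.dat.
EmbeddedEngine engine = EmbeddedEngine.create()
        .using(config)
        .notifying(record -> {
            // handle create/update/delete events here
            System.out.println(record);
        })
        .build();

// The engine is a Runnable; run it on its own thread.
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
```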
And here are my my.cnf values for binlogs:
[mysqld]
log-bin=mysql-bin.log
server_id=100
binlog_row_image=full
binlog-format=row
expire_logs_days=10