I am trying to work with Debezium engine and postgresql connector to listen to my postgres DB changes. The configurations:
io.debezium.config.Configuration.create()
.with("name", "author-connector")
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/tmp/offsets.dat")
.with("offset.flush.interval.ms", "60000")
.with("database.hostname", "localhost")
.with("database.port", "5432")
.with("database.user", postgres)
.with("database.password", postgres)
.with("database.dbname", postgres)
.with("database.include.list", postgres)
.with("schema.include.list", "admin")
.with("table.include.list", "admin.connector")
.with("include.schema.changes", "true")
.with("database.server.name", "author-server")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", "/tmp/dbhistory.dat")
.with("plugin.name", "pgoutput")
.with("signal.enabled.channels", "source")
.with("topic.prefix", "admin")
.with("quarkus.http.port", "7001")
.with("signal.data.collection", "admin.signal_table")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/tmp/offsets.dat")
.with("offset.flush.interval.ms", "60000")
.build();
I am using debzium-embedded version: 2.4.1.Final. The connection is able to setup correctly.
But, any change (INSERT/UPDATE/DELETE) in admin.connector are somehow not getting captured.
Again if I am trying incremental snapshot using my signal table, I get the below error:
ractIncrementalSnapshotChangeEventSource : Schema not found for table 'admin.connector', known tables [admin.signal_table]
Finally figured out I only need to keep
schema.include.listwhich enables debezium to track all tables in the schema list. Thetable.include.listis not honoured in this case.