I have created the following Siddhi CDC app to capture data changes in a PostgreSQL database table.
@App:name('post')
@source(type = 'cdc', url = 'jdbc:postgresql://postgres:5432/shipment_db',
    username = 'postgresuser', password = 'postgrespw',
    table.name = 'public.shipments', operation = 'insert', plugin.name = 'pgoutput', slot.name = 'postslot',
    @map(type = 'keyvalue', @attributes(shipment_id = 'shipment_id', order_id = 'order_id', date_created = 'date_created', status = 'status')))
define stream inputStream (shipment_id long, order_id long, date_created string, status string);
@sink(type = 'log')
define stream OutputStream (shipment_id long, date_created string);
@info(name = 'query1')
from inputStream
select shipment_id, date_created
insert into OutputStream;
I placed siddhi-io-cdc-2.0.12.jar and siddhi-core-5.1.21.jar in the ./files/bundles directory and org.wso2.carbon.si.metrics.core-3.0.57.jar and postgresql-42.3.3.jar in the ./files/jars directory. I then built a Docker image named siddhiimgpostgres using the Dockerfile described at https://siddhi.io/en/v5.1/docs/config-guide/#adding-to-siddhi-docker-microservice.
This is the Docker command I used to run the Siddhi app:
docker run -it --net postgres-docker_default --rm -p 8006:8006 -v /home/me/siddhi-apps:/apps siddhiimgpostgres:tag1 -Dapps=/apps/post.siddhi
These are the logs I got:
[2022-08-24 06:35:43,975] INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} - Snapshot step 7 - Snapshotting data
[2022-08-24 06:35:43,976] INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} - Exporting data from table 'public.shipments'
[2022-08-24 06:35:43,976] INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} - For table 'public.shipments' using select statement: 'SELECT * FROM "public"."shipments"'
[2022-08-24 06:35:43,995] INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} - Finished exporting 11 records for table 'public.shipments'; total duration '00:00:00.019'
[2022-08-24 06:35:43,997] INFO {io.debezium.pipeline.source.AbstractSnapshotChangeEventSource} - Snapshot - Final stage
[2022-08-24 06:35:43,998] INFO {io.debezium.pipeline.ChangeEventSourceCoordinator} - Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='postgres_5432'db='shipment_db', lsn=LSN{0/16DCA30}, txId=592, timestamp=2022-08-24T06:35:43.994Z, snapshot=FALSE, schema=public, table=shipments], partition={server=postgres_5432}, lastSnapshotRecord=true, lastCompletelyProcessedLsn=null, lastCommitLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0]]]
[2022-08-24 06:35:44,001] INFO {io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics} - Connected metrics set to 'true'
[2022-08-24 06:35:44,001] INFO {io.debezium.pipeline.ChangeEventSourceCoordinator} - Starting streaming
[2022-08-24 06:35:44,001] INFO {io.debezium.connector.postgresql.PostgresStreamingChangeEventSource} - Retrieved latest position from stored offset 'LSN{0/16DCA30}'
[2022-08-24 06:35:44,002] INFO {io.debezium.connector.postgresql.connection.WalPositionLocator} - Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{0/16DCA30}'
[2022-08-24 06:35:44,002] INFO {io.debezium.connector.postgresql.connection.PostgresReplicationConnection} - Initializing PgOutput logical decoder publication
[2022-08-24 06:35:44,017] INFO {io.debezium.connector.postgresql.connection.PostgresConnection} - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/16DB220}, catalogXmin=585]
[2022-08-24 06:35:44,021] INFO {io.debezium.jdbc.JdbcConnection} - Connection gracefully closed
[2022-08-24 06:35:44,072] INFO {io.debezium.connector.postgresql.PostgresSchema} - REPLICA IDENTITY for 'public.shipments' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
[2022-08-24 06:35:44,073] INFO {io.debezium.connector.postgresql.PostgresStreamingChangeEventSource} - Searching for WAL resume position
The only related output is the log line reporting how many records were exported from the table. Why am I not getting Siddhi log output for the rows that are in the database table?
Thank you!
Is it capturing changes made after the server has started? If yes, the issue is most likely the snapshot mode[1] configured when the connection is created. By default, it is set to 'initial', which takes a snapshot when the connection is established and then reads changed data from that position onward, so the rows already in the table are covered by the snapshot and only later changes are streamed.

[1] https://debezium.io/documentation/reference/1.4/connectors/postgresql.html#postgresql-property-snapshot-mode
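If you want to make the snapshot mode explicit in the app, or experiment with another value listed in [1], a minimal sketch follows. It assumes that the connector.properties parameter of siddhi-io-cdc forwards comma-separated key=value pairs to the underlying Debezium connector. The value 'initial' is only the documented default written out, so this version should behave like the original app until another mode (for example 'never' or 'always') is substituted.

```siddhi
@App:name('post')

-- Assumption: connector.properties passes 'key=value' pairs straight to Debezium.
-- 'initial' is Debezium's default snapshot.mode; replace it to try other modes.
@source(type = 'cdc', url = 'jdbc:postgresql://postgres:5432/shipment_db',
    username = 'postgresuser', password = 'postgrespw',
    table.name = 'public.shipments', operation = 'insert', plugin.name = 'pgoutput', slot.name = 'postslot',
    connector.properties = 'snapshot.mode=initial',
    @map(type = 'keyvalue', @attributes(shipment_id = 'shipment_id', order_id = 'order_id', date_created = 'date_created', status = 'status')))
define stream inputStream (shipment_id long, order_id long, date_created string, status string);

@sink(type = 'log')
define stream OutputStream (shipment_id long, date_created string);

-- Unchanged query: forwards each captured insert to the log sink.
@info(name = 'query1')
from inputStream
select shipment_id, date_created
insert into OutputStream;
```

A quick way to confirm the streaming side works is to insert a new row into public.shipments while the app is running; with the default mode, that row, rather than the 11 snapshotted ones, is what should appear in the log sink.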