Siddhi CDC PostgreSQL app not producing Siddhi logs


I have created the following Siddhi CDC app to capture data changes in a PostgreSQL database table.

@App:name('post')
@source(type = 'cdc', url = 'jdbc:postgresql://postgres:5432/shipment_db',
    username = 'postgresuser', password = 'postgrespw',
    table.name = 'public.shipments', operation = 'insert',
    plugin.name = 'pgoutput', slot.name = 'postslot',
    @map(type = 'keyvalue', @attributes(shipment_id = 'shipment_id', order_id = 'order_id', date_created = 'date_created', status = 'status')))
define stream inputStream (shipment_id long, order_id long, date_created string, status string);

@sink(type = 'log')
define stream OutputStream (shipment_id long, date_created string);

@info(name = 'query1')
from inputStream
select shipment_id, date_created
insert into OutputStream;

I placed siddhi-io-cdc-2.0.12.jar and siddhi-core-5.1.21.jar in the ./files/bundles directory, and org.wso2.carbon.si.metrics.core-3.0.57.jar and postgresql-42.3.3.jar in the ./files/jars directory. I then built a Docker image named siddhiimgpostgres using the Dockerfile from https://siddhi.io/en/v5.1/docs/config-guide/#adding-to-siddhi-docker-microservice.

I used the following Docker command to run the Siddhi app:

docker run -it --net postgres-docker_default --rm -p 8006:8006 -v /home/me/siddhi-apps:/apps siddhiimgpostgres:tag1 -Dapps=/apps/post.siddhi 

These are the logs I got:

[2022-08-24 06:35:43,975]  INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} - Snapshot step 7 - Snapshotting data
[2022-08-24 06:35:43,976]  INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} -   Exporting data from table 'public.shipments'
[2022-08-24 06:35:43,976]  INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} -   For table 'public.shipments' using select statement: 'SELECT * FROM "public"."shipments"'
[2022-08-24 06:35:43,995]  INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} -   Finished exporting 11 records for table 'public.shipments'; total duration '00:00:00.019'
[2022-08-24 06:35:43,997]  INFO {io.debezium.pipeline.source.AbstractSnapshotChangeEventSource} - Snapshot - Final stage
[2022-08-24 06:35:43,998]  INFO {io.debezium.pipeline.ChangeEventSourceCoordinator} - Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='postgres_5432'db='shipment_db', lsn=LSN{0/16DCA30}, txId=592, timestamp=2022-08-24T06:35:43.994Z, snapshot=FALSE, schema=public, table=shipments], partition={server=postgres_5432}, lastSnapshotRecord=true, lastCompletelyProcessedLsn=null, lastCommitLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0]]]
[2022-08-24 06:35:44,001]  INFO {io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics} - Connected metrics set to 'true'
[2022-08-24 06:35:44,001]  INFO {io.debezium.pipeline.ChangeEventSourceCoordinator} - Starting streaming
[2022-08-24 06:35:44,001]  INFO {io.debezium.connector.postgresql.PostgresStreamingChangeEventSource} - Retrieved latest position from stored offset 'LSN{0/16DCA30}'
[2022-08-24 06:35:44,002]  INFO {io.debezium.connector.postgresql.connection.WalPositionLocator} - Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{0/16DCA30}'
[2022-08-24 06:35:44,002]  INFO {io.debezium.connector.postgresql.connection.PostgresReplicationConnection} - Initializing PgOutput logical decoder publication
[2022-08-24 06:35:44,017]  INFO {io.debezium.connector.postgresql.connection.PostgresConnection} - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/16DB220}, catalogXmin=585]
[2022-08-24 06:35:44,021]  INFO {io.debezium.jdbc.JdbcConnection} - Connection gracefully closed
[2022-08-24 06:35:44,072]  INFO {io.debezium.connector.postgresql.PostgresSchema} - REPLICA IDENTITY for 'public.shipments' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
[2022-08-24 06:35:44,073]  INFO {io.debezium.connector.postgresql.PostgresStreamingChangeEventSource} - Searching for WAL resume position

The logs only report how many records were exported from the table (11 in the snapshot step). Why am I not getting Siddhi log output showing the actual rows in the database table?

Thank you!

1 answer

Answered by Anusha Jayasundara

Does it capture data changes made after the server starts? If so, the cause is likely the snapshot mode [1] configured when the connection is created. By default it is set to initial, which takes a snapshot when the connection is established and then reads changed data from that position onward.

[1] https://debezium.io/documentation/reference/1.4/connectors/postgresql.html#postgresql-property-snapshot-mode
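To make the snapshot mode explicit rather than relying on the default, the Debezium property can probably be passed through the CDC source. The following is a minimal sketch, not a confirmed fix: it assumes the siddhi-io-cdc source accepts a connector.properties parameter that forwards Debezium settings as comma-separated key=value pairs, and the value never (skip the initial snapshot, stream changes only) is chosen purely for illustration. The other values listed in [1] can be substituted in the same place. Everything else is copied from the app in the question.

```siddhi
-- Sketch only: connector.properties is assumed to forward Debezium settings
-- as comma-separated key=value pairs. 'snapshot.mode=never' is an
-- illustrative value, not a recommendation from the answer above.
@App:name('post')

@source(type = 'cdc', url = 'jdbc:postgresql://postgres:5432/shipment_db',
    username = 'postgresuser', password = 'postgrespw',
    table.name = 'public.shipments', operation = 'insert',
    plugin.name = 'pgoutput', slot.name = 'postslot',
    connector.properties = 'snapshot.mode=never',
    @map(type = 'keyvalue', @attributes(shipment_id = 'shipment_id', order_id = 'order_id', date_created = 'date_created', status = 'status')))
define stream inputStream (shipment_id long, order_id long, date_created string, status string);

-- Log every event that reaches OutputStream.
@sink(type = 'log')
define stream OutputStream (shipment_id long, date_created string);

@info(name = 'query1')
from inputStream
select shipment_id, date_created
insert into OutputStream;
```

With the app running, inserting a new row into public.shipments is a quick way to check whether change events reach the log sink. The thread does not confirm whether rows read during the initial snapshot are ever delivered to a stream declared with operation = 'insert'.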