I am using Apache Kafka with the Confluent Connect platform (v7.3.2) and the MongoDB Kafka Connector deployed as a sink connector to stream messages with two timestamp fields into a collection. These values arrive as strings in the format "yyyy-MM-ddTHH:mm:ss.fffff" (5 digits after the decimal point), and I'd like to query these records later by filtering on the timestamps.
So far I've been filtering by date ranges using MongoDB's ISODate type, which supports millisecond precision (3 digits after the decimal point), so I set up a TimestampConverter SMT in my sink connector. This is what the sink connector's config file looks like (ignore the commented lines):
name=mongo-sink
topics=topic-with-sink
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
# Message types
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
connection.uri=mongodb://192.168.41.3:27017
database=MarketData
collection=Mycollection
# Single Message Transform (SMT) to convert "TimeStamp" string to ISODate
#transforms=TimestampToIsoDate
#transforms.TimestampToIsoDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
#transforms.TimestampToIsoDate.target.type=Timestamp
#transforms.TimestampToIsoDate.field=TimeStamp
#transforms.TimestampToIsoDate.format=yyyy-MM-dd'T'HH:mm:ss.SSSSS
# Single Message Transform (SMT) to convert "LastUpdated" string to ISODate
transforms=TimestampToIsoDate
transforms.TimestampToIsoDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampToIsoDate.target.type=Timestamp
transforms.TimestampToIsoDate.field=LastUpdated
transforms.TimestampToIsoDate.format=yyyy-MM-dd'T'HH:mm:ss.SSSSS
## Document manipulation settings
key.projection.type=none
key.projection.list=
value.projection.type=none
value.projection.list=
field.renamer.mapping=[]
field.renamer.regex=[]
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
# Write configuration
delete.on.null.values=false
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
max.batch.size=0
rate.limiting.timeout=0
rate.limiting.every.n=0
# Change Data Capture handling
change.data.capture.handler=
# Topic override examples for the sourceB topic
topic.override.sourceB.collection=sourceB
topic.override.sourceB.document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
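For reference, this is roughly how I filter on the converted field afterwards (a sketch using the MongoDB Java driver; the connection details come from the config above, and the date bounds are placeholders, not values from my data):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.time.Instant;
import java.util.Date;

import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.gte;
import static com.mongodb.client.model.Filters.lt;

public class LastUpdatedRangeQuery {
    public static void main(String[] args) {
        // Connection, database, and collection taken from the connector config above
        try (MongoClient client = MongoClients.create("mongodb://192.168.41.3:27017")) {
            MongoCollection<Document> coll =
                    client.getDatabase("MarketData").getCollection("Mycollection");

            // Range filter on the field the SMT should store as a BSON date;
            // the bounds here are placeholders
            Bson filter = and(
                    gte("LastUpdated", Date.from(Instant.parse("2023-08-22T13:00:00Z"))),
                    lt("LastUpdated", Date.from(Instant.parse("2023-08-22T14:00:00Z"))));

            for (Document doc : coll.find(filter)) {
                System.out.println(doc.toJson());
            }
        }
    }
}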
But recently I noticed that the "LastUpdated" values produced by this converter differ from the values in the Kafka topic. Right now the Kafka topic holds this live "LastUpdated" value:
"LastUpdated":"2023-08-22T13:52:50.13426"
But in MongoDB it is saved as:
"lastUpdated": "2023-08-22T13:53:03.426Z"
That is a difference of about 13 seconds, and sometimes the difference is a minute or two. I can't figure out why, but it probably has nothing to do with time zones, since the offset isn't a whole number of hours. It looks like something happens during the timestamp conversion itself.
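To see what the conversion does with this pattern, I reproduced the parse with java.text.SimpleDateFormat, which, as far as I understand, is what TimestampConverter uses for its "format" property (a minimal sketch, not the connector's actual code path):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class PatternCheck {
    public static void main(String[] args) throws Exception {
        // Same pattern as the SMT's "format" property
        SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSS");
        parser.setTimeZone(TimeZone.getTimeZone("UTC"));

        // The live value from the topic shown above
        Date parsed = parser.parse("2023-08-22T13:52:50.13426");

        SimpleDateFormat printer = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        printer.setTimeZone(TimeZone.getTimeZone("UTC"));

        // Prints 2023-08-22T13:53:03.426Z: "S" is SimpleDateFormat's
        // millisecond field, so "SSSSS" reads the five digits as 13426
        // whole milliseconds (13.426 seconds) rather than as a fraction
        // of a second, and the lenient calendar rolls the excess into
        // the seconds and minutes.
        System.out.println(printer.format(parsed));
    }
}

The output matches what ends up in MongoDB, but I'm not sure whether the connector really takes this path.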
Any help would be appreciated.