How do I increment a field with the UpdateOneTimestampsStrategy write model strategy?


I want to repeatedly increment the field named value in a document such as {'_id': 'count', 'value': 0} by a given amount.

My MongoSinkConnector configuration has:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

I'm using a Python script to produce messages to the appropriate topic:

    self._aio_producer.produce(
        topic='mongo',
        value=json.dumps(
            {
                "_id": "count",
                "$inc": {"value": len(task['payload'].split(','))},
            }
        ),
    )

But I get this error on the Kafka Connect standalone process:

Failed to put into the sink the following records: [SinkRecord{kafkaOffset=8174, timestampType=CreateTime} ConnectRecord{topic='mongo', kafkaPartition=1, key=null, keySchema=null, value={_id=count, $inc={value=1}}, valueSchema=null, timestamp=1697153679938, headers=ConnectHeaders(headers=)}] 
(com.mongodb.kafka.connect.sink.MongoSinkTask:244) 
com.mongodb.kafka.connect.sink.dlq.WriteException: v=1, code=52, message=The dollar ($) prefixed field '$inc' in '$inc' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., details={} 

I've tried dropping the $inc part, but then the connector just replaces the document each time instead of incrementing the value. Is there a way to increment a value, or do I have to write my own custom class?


There are 2 answers

0
Maxxy On BEST ANSWER

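Currently, the way to do this is to create your own custom write model strategy (a class implementing WriteModelStrategy). The built-in strategies treat the record value as document data rather than as update operators, so an operator such as $inc has to be built inside the strategy. The update document can be assembled like this: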

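    // Fields written only when the upsert inserts a new document.
    // A_FIELD_NAME, aValue, etc. are placeholders for your own field names and BsonValues.
    BsonDocument setOnInsertFields =
        new BsonDocument().append(A_FIELD_NAME, aValue).append(B_FIELD_NAME, bValue);

    // Fields to increment; each value is the numeric amount to add, e.g. new BsonInt32(1).
    BsonDocument incFields =
        new BsonDocument()
            .append(C_COUNT_FIELD_NAME, cValue)
            .append(D_COUNT_FIELD_NAME, dValue);

    // Build the update document from update operators
    // such as $set, $setOnInsert, $inc, etc.
    BsonDocument newDocument =
        new BsonDocument().append("$setOnInsert", setOnInsertFields).append("$inc", incFields);

    // Inside createWriteModel, return it as an update rather than a replacement, e.g.
    // new UpdateOneModel<>(filter, newDocument, new UpdateOptions().upsert(true)),
    // where filter (a hypothetical name) matches the document's _id.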
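
With a custom strategy building the $inc on the sink side, the producer no longer needs to send operators in the message. A minimal Python sketch of the producer side, assuming the custom strategy reads the amount to add from a plain value field (the function name build_increment_message and that field contract are assumptions for illustration, not part of the connector):

```python
import json


def build_increment_message(payload: str, doc_id: str = "count") -> str:
    """Serialize a plain-data record that carries only the increment amount.

    Assumption: a custom write model strategy on the sink side reads
    "value" and applies it with $inc; the built-in strategies do not.
    """
    # Same arithmetic as in the question: one count per comma-separated item.
    delta = len(payload.split(","))
    return json.dumps({"_id": doc_id, "value": delta})


message = build_increment_message("a,b,c")
print(message)
```

The resulting string can be passed as value= to the produce call from the question; no dollar-prefixed keys reach the connector.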
0
Wernfried Domscheit On

In general, the $inc update itself should work fine. Here is the equivalent in mongosh (JavaScript):

task = { payload: 'a,b,c' }
db.collection.insertOne({ _id: 'count', value: 0 })

db.collection.updateOne(
   { _id: 'count' },
   { $inc: { value: task['payload'].split(',').length } }
)

db.collection.findOne()
=> { _id: 'count', value: 3 }

Have a look at the MongoDB Kafka connector documentation; perhaps the $inc operator is not supported by the sink connector.
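
To make the difference concrete, here is a toy Python model of the two behaviours described in the question. It uses plain dictionaries only and is not how MongoDB or the connector is implemented; apply_replace and apply_inc are hypothetical helper names. It shows why a replacement-style write keeps overwriting value while an $inc-style update accumulates it:

```python
def apply_replace(stored: dict, incoming: dict) -> dict:
    """Replacement-style write: incoming fields overwrite the stored ones."""
    return {**stored, **incoming}


def apply_inc(stored: dict, deltas: dict) -> dict:
    """$inc-style write: each delta is added to the stored number (missing counts as 0)."""
    updated = dict(stored)
    for field, delta in deltas.items():
        updated[field] = updated.get(field, 0) + delta
    return updated


doc = {"_id": "count", "value": 0}
replaced = doc
incremented = doc
for delta in (3, 1, 2):
    replaced = apply_replace(replaced, {"_id": "count", "value": delta})
    incremented = apply_inc(incremented, {"value": delta})

print(replaced)     # value holds only the last delta
print(incremented)  # value holds the running total
```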