BatchSize option is not working in ChangeStream WatchAsync


I am using a ChangeStream to catch deletions made in my backend and send e-mails, which works fine. For bulk deletions I want to send only a few e-mails, so I set BatchSize, but somehow it is not working. As I understand it, the BatchSize setting determines how many changes are captured. I set BatchSize to 2, so when I delete 5 records from the collection I expect only 2 e-mails to be sent; however, all 5 e-mails are sent. Please help me fix this issue. Below is my code:

        public async Task RealtionalCollectionCollectionChange(CancellationToken cancellationToken)
        {
            var options = new ChangeStreamOptions
            {               
                FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
                BatchSize = 2
            };
            
            string logHistory = string.Empty;
            var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match("{operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");
            using (var cursor = await collection.WatchAsync(pipeline, options, cancellationToken))
            {
                
                while (await cursor.MoveNextAsync(cancellationToken))
                {                    
                    if (cancellationToken.IsCancellationRequested)
                    {
                        break;
                    }

                    foreach (var change in cursor.Current)
                    {
                        if (change.OperationType == ChangeStreamOperationType.Invalidate)
                        {
                            _logger.LogWarning("Change stream cursor has been invalidated");                            
                            break;
                        }

                        var key = change.DocumentKey.GetValue("_id").ToString();

                        switch (change.OperationType)
                        {
                            case ChangeStreamOperationType.Insert:
                                await InsertIntoHistoryCollection(change);
                                await TriggerEmail(change);
                                break;

                            case ChangeStreamOperationType.Delete:                                
                                _logger.LogInformation("{Key} has been deleted from Mongo DB", key);                                
                                // key is already a string, so no extra ToString() call is needed.
                                var filter = Builders<BsonDocument>.Filter.Eq("_id", ObjectId.Parse(key));
                                var document = await collectionHistory.Find(filter).FirstOrDefaultAsync();

                                try
                                {
                                    await _mailService.SendEmail(change, document, logHistory);
                                }
                                catch (Exception ex)
                                {
                                    _logger.LogError(ex, "An error occurred while sending email for {Key} for operation type {OperationType}", key, change.OperationType);
                                }
                                break;
                        }
                    }
                }
                // The using block disposes the cursor, so no explicit Dispose() call is needed here.
            }            
        }

Thanks, Lalitha.C
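
A note on the expectation described above: as far as I understand the cursor option, BatchSize only sets the maximum number of documents the server returns per batch; it does not limit how many change events are delivered in total. The sketch below is a minimal, driver-free illustration of that difference, not the MongoDB driver's implementation. `BatchSizeSketch`, `ToBatches`, and `CapNotifications` are hypothetical names introduced only for this example, and the cap shown is just one possible way to limit e-mails.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchSizeSketch
{
    // Splits events into batches of at most batchSize items, similar to how a
    // cursor hands results back one batch at a time. No event is dropped.
    public static List<List<string>> ToBatches(IEnumerable<string> events, int batchSize)
    {
        return events
            .Select((e, i) => new { Event = e, Index = i })
            .GroupBy(x => x.Index / batchSize)
            .Select(g => g.Select(x => x.Event).ToList())
            .ToList();
    }

    // Hypothetical cap (an assumption, not a driver feature): keep at most
    // maxMails events for notification and ignore the rest.
    public static List<string> CapNotifications(IEnumerable<string> events, int maxMails)
    {
        return events.Take(maxMails).ToList();
    }

    public static void Main()
    {
        var deletes = new[] { "id1", "id2", "id3", "id4", "id5" };

        // Batching with size 2 still yields all 5 events, in 3 batches.
        var batches = ToBatches(deletes, 2);
        Console.WriteLine(batches.Count);               // 3
        Console.WriteLine(batches.Sum(b => b.Count));   // 5

        // An explicit cap is what actually reduces the number of mails.
        Console.WriteLine(CapNotifications(deletes, 2).Count); // 2
    }
}
```

Under these assumptions, the loop in the question would still see all 5 delete events (just spread over several `cursor.Current` batches), so any limit on the number of e-mails would have to be applied in the handler itself.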
