How to store failed messages in batch in kafka?

49 views Asked by At

I am publishing kafka messages in batch using produce method. In case message gets failed to publish, I am storing the entire batch in database table. Currently, the issue I am facing is that eg. if there are 1000 messages in a batch then these 1000 messages are stored 1000 times in database.Can someone help me to figure out how can store that 1000 number of messages just one time in database ? Note: I don't want to use await in this method.

public bool PublishToKafka(List<GenericData> recordList)
{
    Task.Run(async () =>
    {
        schemaId = await _cachedSchemaRegistryClient.RegisterSchemaAsync(_kafkatable.KafkaTopic + "-value", GenericData._SCHEMA.ToString()).ConfigureAwait(false);
    }).GetAwaiter().GetResult();
    
    var serializer = new AvroSerializer<GenericData>(_cachedSchemaRegistryClient, _avroSerializerConfig).AsSyncOverAsync();

    using (IProducer<Null, GenericData> producer = new ProducerBuilder<Null, GenericData>(_producerConfig).SetValueSerializer(serializer).Build())
    {
        try
        {
            int avroRecordsCount = 0;
            foreach (GenericData genericData in recordList)
            {
                avroRecordsCount++;
                producer.Produce(_kafkatable.KafkaTopic, new Message<Null, GenericData>
                {
                    Value = genericData,
                    Headers = new Headers { new Header(SchemaIdHeader, BitConverter.GetBytes(schemaId)) }
                }, deliveryReport =>
                {
                    if (deliveryReport.Status == PersistenceStatus.NotPersisted)
                    {
                        Logs.Export.Err("Failed message delivery: error reason: {0} ", deliveryReport.Error.Reason);
                        byte[] bytes = ToBytes(recordList);
                        StoreClass.StoreData(bytes);
                    }
                });
                
                producer.Flush();
            }
            return true;

        }
    }
    return false;
}
0

There are 0 answers