MassTransit Kafka. How can I suppress commit if I get exception in my consumer

44 views Asked by At
builder.Services.AddMassTransit(x =>
{
    x.UsingInMemory((context, config) => config.ConfigureEndpoints(context));
    x.AddRider(xx =>
    {
        xx.AddConsumer<CheckOrderStatusConsumer>();
        xx.UsingKafka((context, configurator) =>
        {
            configurator.Host("localhost:29092");
            configurator.TopicEndpoint<CheckOrderStatus>("topic-name", "\"group-name-2\"", e =>
            {
                e.UseKillSwitch(ks =>
                {
                    ks.SetActivationThreshold(1);
                    ks.SetTripThreshold(1);
                    ks.SetRestartTimeout(s: 5);
                });
                e.AutoOffsetReset = AutoOffsetReset.Earliest;
                e.EnableAutoOffsetStore = false;
                e.ConfigureConsumer<CheckOrderStatusConsumer>(context);
            });
        });
    });
});
public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{
    public async Task Consume(ConsumeContext<CheckOrderStatus> context)
    {
        if (context.TryGetPayload<KafkaConsumeContext>(out var ctx))
        {
            Console.WriteLine($"Consume: {ctx.Offset}");
        }

        Console.WriteLine($"Consume: {context.SentTime.ToString()}");
        throw new Exception();
    }
}

How can I suppress commit and don't tach offset on next step? I cannot prosessed all items of kafka if them commit and up offset.

I tryed use enableoffset false but thet not helped

0

There are 0 answers