// Registers MassTransit with an in-memory bus and a Kafka rider whose topic
// endpoint delivers CheckOrderStatus messages to CheckOrderStatusConsumer.
builder.Services.AddMassTransit(busRegistration =>
{
    busRegistration.UsingInMemory((busContext, busFactory) =>
    {
        busFactory.ConfigureEndpoints(busContext);
    });

    busRegistration.AddRider(rider =>
    {
        rider.AddConsumer<CheckOrderStatusConsumer>();

        rider.UsingKafka((riderContext, kafka) =>
        {
            kafka.Host("localhost:29092");

            // NOTE(review): the group id literal contains embedded double quotes — confirm that is intended.
            kafka.TopicEndpoint<CheckOrderStatus>("topic-name", "\"group-name-2\"", endpoint =>
            {
                // Kill switch is configured before the consumer is attached; values are kept as given
                // (activation threshold 1, trip threshold 1, restart timeout "s: 5" — presumably seconds).
                endpoint.UseKillSwitch(killSwitch =>
                {
                    killSwitch.SetActivationThreshold(1);
                    killSwitch.SetTripThreshold(1);
                    killSwitch.SetRestartTimeout(s: 5);
                });

                // Offset-related settings for this topic endpoint.
                endpoint.AutoOffsetReset = AutoOffsetReset.Earliest;
                endpoint.EnableAutoOffsetStore = false;

                // Uses the rider registration context to wire the consumer registered above.
                endpoint.ConfigureConsumer<CheckOrderStatusConsumer>(riderContext);
            });
        });
    });
});
/// <summary>
/// Consumer for <see cref="CheckOrderStatus"/> messages that logs delivery details
/// to the console and then always fails.
/// </summary>
public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{
    /// <summary>
    /// Writes the Kafka offset (only when a <see cref="KafkaConsumeContext"/> payload is
    /// available) and the message's sent time to the console, then throws.
    /// </summary>
    /// <param name="context">Consume context of the incoming message.</param>
    /// <exception cref="Exception">Always thrown after the console output is written.</exception>
    public async Task Consume(ConsumeContext<CheckOrderStatus> context)
    {
        var hasKafkaPayload = context.TryGetPayload<KafkaConsumeContext>(out var kafkaContext);
        if (hasKafkaPayload)
        {
            var offset = kafkaContext.Offset;
            Console.WriteLine($"Consume: {offset}");
        }

        var sentTime = context.SentTime.ToString();
        Console.WriteLine($"Consume: {sentTime}");

        // Unconditional failure, kept exactly as written; because the method is async,
        // the exception is reported through the returned Task.
        throw new Exception();
    }
}
How can I suppress the commit so that the offset does not advance to the next message when the consumer fails? I cannot process all of the Kafka items if they are committed and the offset moves forward.
I tried setting EnableAutoOffsetStore to false, but that did not help.