Following exception is thrown while debugging this specific code line. The execption is thrown after the specified time set (5 minutes) after exiting the kafka consume method.
private void OnCheckPartitionAssignmentTimerElapsed(object? state)
{
try
{
if (!_isConsumerClosed)
{
if (string.IsNullOrEmpty(kconsumer?.MemberId))
{
logger.Warning($"Consumer has left group, MemeberId: {kconsumer?.MemberId}. Cancelling the task...");
taskToken.Cancel();
}
}
}
catch (ObjectDisposedException e)
{
logger.Error($"Topics - {string.Join(",",kconsumer?.Subscription)} - {e}");
}
}
The same line of code throws exception in deployed environment.
Unhandled exception. System.ObjectDisposedException: handle is destroyed
at Confluent.Kafka.Impl.SafeKafkaHandle.ThrowIfHandleClosed()
at Confluent.Kafka.Impl.SafeKafkaHandle.GetSubscription()
at Confluent.Kafka.Consumer`2.get_Subscription()
at Kore.Analytics.Shared.Kafka.Consumer.SharedKafkaConsumer.SharedKafkaConsumer`2.OnCheckPartitionAssignmentTimerElapsed(Object state)
at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location ---
at System.Threading.TimerQueueTimer.Fire(Boolean isThreadPool)
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
at System.Threading.Thread.StartCallback()
Couldnt understand the reason behind this exception