Exception thrown while debugging kafka consumer

100 views Asked by At

Following exception is thrown while debugging this specific code line. The execption is thrown after the specified time set (5 minutes) after exiting the kafka consume method.

private void OnCheckPartitionAssignmentTimerElapsed(object? state)
    {
        try
        {
            if (!_isConsumerClosed)
            {
                if (string.IsNullOrEmpty(kconsumer?.MemberId))
                {
                    logger.Warning($"Consumer has left group, MemeberId: {kconsumer?.MemberId}. Cancelling the task...");
                    taskToken.Cancel();
                }
            }
        } 
        catch (ObjectDisposedException e)
        {
            logger.Error($"Topics - {string.Join(",",kconsumer?.Subscription)} - {e}");
        }
    }

enter image description here

The same line of code throws exception in deployed environment.

Unhandled exception. System.ObjectDisposedException: handle is destroyed
   at Confluent.Kafka.Impl.SafeKafkaHandle.ThrowIfHandleClosed()
   at Confluent.Kafka.Impl.SafeKafkaHandle.GetSubscription()
   at Confluent.Kafka.Consumer`2.get_Subscription()
   at Kore.Analytics.Shared.Kafka.Consumer.SharedKafkaConsumer.SharedKafkaConsumer`2.OnCheckPartitionAssignmentTimerElapsed(Object state)
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location ---
   at System.Threading.TimerQueueTimer.Fire(Boolean isThreadPool)
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
   at System.Threading.Thread.StartCallback()

Couldnt understand the reason behind this exception

0

There are 0 answers