I am running an xUnit functional test within a docker-compose stack based on Debian 3.1-buster with Confluent Kafka .NET Client v1.5.3 connecting to broker confluentinc/cp-kafka:6.0.1. I am fairly new to Kafka.

The architecture is illustrated below:

[architecture diagram]

I am testing with xUnit and have a class fixture that starts an in-process generic Kestrel host for the lifetime of the test collection/class. I am using an in-process generic host because I have an additional SignalR service which uses websockets. From what I understand, the WebApplicationFactory is in-memory only and does not use network sockets.

The generic host contains a Kafka producer and consumer. The producer is a singleton service that produces using the Produce method. The consumer is a BackgroundService that runs a Consume loop with a cancellation token (see listing further below). The consumer has the following configuration:

  • EnableAutoCommit: true
  • EnableAutoOffsetStore: false
  • AutoOffsetReset: AutoOffsetReset.Latest

There is a single consumer, and the topic has 3 partitions. The broker setting group.initial.rebalance.delay.ms is configured as 1000ms.

The test spawns a thread that sends an event to trigger the producer to post data onto the Kafka topic. The test then waits for a time delay via ManualResetEvent to allow time for the consumer to process the topic data.

Problem: Consumer is Blocking

When I run the test within a docker-compose environment I can see from the logs (included below) that:

  • The producer and consumer are connected to the same broker and topic
  • The producer sends the data to the topic but the consumer is blocking

The xUnit test and the in-process Kestrel host run within a docker-compose service on the same network as the kafka service. The Kafka producer is able to successfully post data onto the Kafka topic, as demonstrated by the logs below.

I have created an additional docker-compose service that runs a python client consumer. This uses a poll loop to consume data posted while running the test. Data is consumed by the Python client.

Does anyone have any ideas why the consumer would be blocking within this environment to assist with fault finding? Would the wait performed in the xUnit test block the in-process Kestrel host started by the xUnit fixture?
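On the second question, a minimal sketch (not taken from the project; `WaitDemo` and `CountWhileWaitingAsync` are hypothetical names) suggests that awaiting a delay in the test should not, by itself, stop a dedicated thread from running. It assumes the consumer loop runs on its own `Thread`, as in the consumer listing further below, and uses a simple counter as a stand-in for that loop:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitDemo
{
    // Starts a dedicated worker thread (a stand-in for the consumer loop),
    // awaits a delay the way the test does, and returns how many iterations
    // the worker completed while the caller was waiting.
    public static async Task<long> CountWhileWaitingAsync(TimeSpan wait)
    {
        long iterations = 0;
        using var cts = new CancellationTokenSource();

        var worker = new Thread(() =>
        {
            while (!cts.IsCancellationRequested)
            {
                Interlocked.Increment(ref iterations);
                Thread.Sleep(10);
            }
        });
        worker.Start();

        // The "test" waits here; the worker thread is not suspended by this.
        await Task.Delay(wait);

        cts.Cancel();
        worker.Join();
        return Interlocked.Read(ref iterations);
    }
}
```

If the returned count is greater than zero, the worker kept running during the wait. This only covers a dedicated thread; it says nothing about work that depends on the thread pool.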

If I run the Kestrel host locally on macOS Catalina 10.15.7 connecting to Kafka (image: lensesio/fast-data-dev:2.5.1-L0) in docker-compose, it produces and consumes successfully.

Update - Works with lensesio image

The local docker-compose that works uses the docker image lensesio/fast-data-dev:2.5.1-L0. This uses Apache Kafka 2.5.1 and Confluent components 5.5.1. I have also tried:

  • Downgrading to Confluent Kafka images 5.5.1
  • Upgrading the .NET Confluent Client to 1.5.3

The result remains the same: the producer produces fine, but the consumer blocks.

What is the difference between the lensesio/fast-data-dev:2.5.1-L0 configuration and the confluentinc/cp-kafka images that would cause the blocking?

I have appended the working docker-compose configuration to the end of this question.

Update - Works for the confluentinc/cp-kafka image when group.initial.rebalance.delay.ms is 0ms

Originally group.initial.rebalance.delay.ms was 1000ms, the same as the lensesio/fast-data-dev:2.5.1-L0 image. The 1000ms setting on the confluentinc/cp-kafka image exhibits the blocking behaviour.

If I change group.initial.rebalance.delay.ms to 0ms, then no blocking occurs with the confluentinc/cp-kafka image.
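One possible explanation, offered as an assumption rather than a confirmed diagnosis: with AutoOffsetReset.Latest and no committed offsets, a consumer that is assigned its partitions after the message was produced starts at the end of the log and never sees that message. A longer rebalance delay would make that ordering more likely. The sketch below models only that ordering with an in-memory list; `PartitionLog` and `LatestResetDemo` are hypothetical and not part of Confluent.Kafka.

```csharp
using System;
using System.Collections.Generic;

// Toy model of a single partition: an append-only list where index == offset.
public sealed class PartitionLog
{
    private readonly List<string> _messages = new List<string>();

    // Appends a message and returns the offset it was written at.
    public long Produce(string value)
    {
        _messages.Add(value);
        return _messages.Count - 1;
    }

    // Start position for a group with no committed offset and a "latest"
    // reset policy: the offset of the next message to be written.
    public long LatestStartOffset() => _messages.Count;

    // Everything readable from the given start offset onwards.
    public IReadOnlyList<string> ReadFrom(long startOffset)
    {
        var result = new List<string>();
        for (var i = (int)startOffset; i < _messages.Count; i++)
            result.Add(_messages[i]);
        return result;
    }
}

public static class LatestResetDemo
{
    // The consumer receives its assignment only after the message was produced.
    public static int MessagesSeenWhenAssignedAfterProduce()
    {
        var log = new PartitionLog();
        log.Produce("motion-event");            // written at offset 0
        var start = log.LatestStartOffset();    // consumer starts at offset 1
        return log.ReadFrom(start).Count;       // nothing left to read
    }

    // The consumer receives its assignment before the message is produced.
    public static int MessagesSeenWhenAssignedBeforeProduce()
    {
        var log = new PartitionLog();
        var start = log.LatestStartOffset();    // consumer starts at offset 0
        log.Produce("motion-event");            // written at offset 0
        return log.ReadFrom(start).Count;       // the message is read
    }
}
```

This appears to match the test output further below, where the message is delivered to partition 2 at offset 0 and the consumer then starts fetching partition 2 at offset 1.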

Does the lensesio/fast-data-dev:2.5.1-L0 image offer better performance in a docker-compose development environment when used with the confluent-kafka-dotnet client?

Test

[Fact]
public async Task MotionDetectionEvent_Processes_Data()
{
    var m = new ManualResetEvent(false);

    // spawn a thread to publish a message and wait for 14 seconds
    var thread = new Thread(async () =>
    {
        await _Fixture.Host.MqttClient.PublishAsync(_Fixture.Data.Message);

        // allow time for kafka to consume event and process
        Console.WriteLine($"TEST THREAD IS WAITING FOR 14 SECONDS");
        await Task.Run(() => Task.Delay(14000));
        Console.WriteLine($"TEST THREAD IS COMPLETED WAITING FOR 14 SECONDS");

        m.Set();
    });
    thread.Start();

    // wait for the thread to have completed
    await Task.Run(() => { m.WaitOne(); });

    // TO DO, ASSERT DATA AVAILABLE ON S3 STORAGE ETC.
}
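A fixed 14-second wait makes the test slow and sensitive to timing. As a sketch only, a polling helper could return as soon as a condition holds. `TestWait.UntilAsync` is a hypothetical helper, and the condition passed to it (for example, a flag set once the consumer has handled a message) is assumed rather than part of the test above.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class TestWait
{
    // Polls `condition` until it returns true or `timeout` elapses.
    // Returns true if the condition was met, false if the wait timed out.
    public static async Task<bool> UntilAsync(
        Func<bool> condition, TimeSpan timeout, TimeSpan pollInterval)
    {
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < timeout)
        {
            if (condition()) return true;
            await Task.Delay(pollInterval);
        }
        // One last check at the deadline.
        return condition();
    }
}
```

In the test this might replace the thread and ManualResetEvent with a single awaited call whose result is asserted, with the 14 seconds kept as the upper bound.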

Test Output - Producer has produced data onto the topic but consumer has not consumed

Test generic host example
SettingsFile::GetConfigMetaData ::: Directory for executing assembly :: /Users/simon/Development/Dotnet/CamFrontEnd/Tests/Temp/WebApp.Test.Host/bin/Debug/netcoreapp3.1
SettingsFile::GetConfigMetaData ::: Executing assembly :: WebApp.Testing.Utils, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null
AutofacTestHost is using settings file /Users/simon/Development/Dotnet/CamFrontEnd/Tests/Temp/WebApp.Test.Host/bin/Debug/netcoreapp3.1/appsettings.Local.json
info: WebApp.Mqtt.MqttService[0]
      Mqtt Settings :: mqtt://mqtt:*********@localhost:1883
info: WebApp.Mqtt.MqttService[0]
      Mqtt Topic :: shinobi/+/+/trigger
info: WebApp.S3.S3Service[0]
      Minio client created for endpoint localhost:9000
info: WebApp.S3.S3Service[0]
      minio://accesskey:12345678abcdefgh@localhost:9000
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
      Starting async initialization
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
      Starting async initialization for WebApp.Kafka.Admin.KafkaAdminService
info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Admin service trying to create Kafka Topic...
info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Topic::eventbus, ReplicationCount::1, PartitionCount::3
info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Bootstrap Servers::localhost:9092
info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Admin service successfully created topic eventbus
info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Kafka Consumer thread started
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
      Async initialization for WebApp.Kafka.Admin.KafkaAdminService completed
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
      Async initialization completed
info: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[0]
      User profile is available. Using '/Users/simon/.aspnet/DataProtection-Keys' as key repository; keys will not be encrypted at rest.
info: WebApp.Kafka.ProducerService[0]
      ProducerService constructor called
info: WebApp.Kafka.SchemaRegistry.Serdes.JsonDeserializer[0]
      Kafka Json Deserializer Constructed
info: WebApp.Kafka.ConsumerService[0]
      Kafka consumer listening to camera topics =>
info: WebApp.Kafka.ConsumerService[0]
      Camera Topic :: shinobi/RHSsYfiV6Z/xi5cncrNK6/trigger
info: WebApp.Kafka.ConsumerService[0]
      Camera Topic :: shinobi/group/monitor/trigger
%7|1607790673.462|INIT|rdkafka#consumer-3| [thrd:app]: librdkafka v1.5.3 (0x10503ff) rdkafka#consumer-3 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING CC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
info: WebApp.Kafka.ConsumerService[0]
      Kafka consumer created => Name :: rdkafka#consumer-3
%7|1607790673.509|SUBSCRIBE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": subscribe to new subscription of 1 topics (join state init)
%7|1607790673.509|REBALANCE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group" is rebalancing in state init (join-state init) without assignment: unsubscribe
info: WebApp.Kafka.ConsumerService[0]
      Kafka consumer has subscribed to topic eventbus
info: WebApp.Kafka.ConsumerService[0]
      Kafka is waiting to consume...
info: WebApp.Mqtt.MqttService[0]
      MQTT managed client connected
info: Microsoft.Hosting.Lifetime[0]
      Now listening on: http://127.0.0.1:65212
info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Production
info: Microsoft.Hosting.Lifetime[0]
      Content root path: /Users/simon/Development/Dotnet/CamFrontEnd/Tests/Temp/WebApp.Test.Host/bin/Debug/netcoreapp3.1/
MQTT HAS PUBLISHED...SPAWNING TEST THREAD TO WAIT
TEST THREAD IS WAITING FOR 14 SECONDS
info: WebApp.S3.S3Service[0]
      Loading json into JSON DOM and updating 'img' property with key 2d8e2438-e674-4d71-94ac-e54df0143a29
info: WebApp.S3.S3Service[0]
      Extracting UTF8 bytes from base64
info: WebApp.S3.S3Service[0]
      Updated JSON payload with img: 2d8e2438-e674-4d71-94ac-e54df0143a29, now uploading 1.3053922653198242 MB to S3 storage
%7|1607790674.478|JOIN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": postponing join until up-to-date metadata is available
%7|1607790674.483|REJOIN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": subscription updated from metadata change: rejoining group
%7|1607790674.483|REBALANCE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group" is rebalancing in state up (join-state init) without assignment: group rejoin
%7|1607790674.483|JOIN|rdkafka#consumer-3| [thrd:main]: 127.0.0.1:9092/1: Joining group "consumer-group" with 1 subscribed topic(s)
%7|1607790674.541|JOIN|rdkafka#consumer-3| [thrd:main]: 127.0.0.1:9092/1: Joining group "consumer-group" with 1 subscribed topic(s)
info: WebApp.S3.S3Service[0]
      Converting modified payload back to UTF8 bytes for Kafka processing
info: WebApp.Kafka.ProducerService[0]
      Produce topic : eventbus, key : shinobi/group/monitor/trigger, value : System.Byte[]
info: WebApp.Kafka.ProducerService[0]
      Delivered message to eventbus [[2]] @0
%7|1607790675.573|ASSIGNOR|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": "range" assignor run for 1 member(s)
%7|1607790675.588|ASSIGN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": new assignment of 3 partition(s) in join state wait-sync
%7|1607790675.588|OFFSET|rdkafka#consumer-3| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 3/3 partition(s)
%7|1607790675.717|FETCH|rdkafka#consumer-3| [thrd:main]: Partition eventbus [0] start fetching at offset 0
%7|1607790675.719|FETCH|rdkafka#consumer-3| [thrd:main]: Partition eventbus [1] start fetching at offset 0
%7|1607790675.720|FETCH|rdkafka#consumer-3| [thrd:main]: Partition eventbus [2] start fetching at offset 1


        ** EXPECT SOME CONSUMER DATA HERE - INSTEAD IT IS BLOCKING WITH confluent/cp-kafka image **


TEST THREAD IS COMPLETED WAITING FOR 14 SECONDS
Timer Elapsed
Shutting down generic host
info: Microsoft.Hosting.Lifetime[0]
      Application is shutting down...
info: WebApp.Mqtt.MqttService[0]
      Mqtt managed client disconnected
info: WebApp.Kafka.ConsumerService[0]
      The Kafka consumer thread has been cancelled
info: WebApp.Kafka.ConsumerService[0]
      Kafka Consumer background service disposing
%7|1607790688.191|CLOSE|rdkafka#consumer-3| [thrd:app]: Closing consumer
%7|1607790688.191|CLOSE|rdkafka#consumer-3| [thrd:app]: Waiting for close events
%7|1607790688.191|REBALANCE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group" is rebalancing in state up (join-state started) with assignment: unsubscribe
%7|1607790688.191|UNASSIGN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": unassigning 3 partition(s) (v5)
%7|1607790688.191|LEAVE|rdkafka#consumer-3| [thrd:main]: 127.0.0.1:9092/1: Leaving group
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Consumer closed
%7|1607790688.201|DESTROY|rdkafka#consumer-3| [thrd:app]: Terminating instance (destroy flags NoConsumerClose (0x8))
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Closing consumer
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Disabling and purging temporary queue to quench close events
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Consumer closed
%7|1607790688.201|DESTROY|rdkafka#consumer-3| [thrd:main]: Destroy internal
%7|1607790688.201|DESTROY|rdkafka#consumer-3| [thrd:main]: Removing all topics
info: WebApp.Mqtt.MqttService[0]
      Disposing Mqtt Client
info: WebApp.Kafka.ProducerService[0]
      Flushing remaining messages to produce...
info: WebApp.Kafka.ProducerService[0]
      Disposing Kafka producer...
info: WebApp.S3.S3Service[0]
      Disposing of resources
Stopping...

Kafka Consumer

using System;
using System.Threading;
using System.Threading.Tasks;

using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.AspNetCore.SignalR;

using WebApp.Data;
using WebApp.Kafka.Config;
using WebApp.Realtime.SignalR;


namespace WebApp.Kafka
{
    public delegate IConsumer<string, MotionDetection> ConsumerFactory(
        KafkaConfig config,
        IAsyncDeserializer<MotionDetection> serializer
    );

    public class ConsumerService : BackgroundService, IDisposable
    {
        private KafkaConfig _config;
        private readonly IConsumer<string, MotionDetection> _kafkaConsumer;
        private ILogger<ConsumerService> _logger;
        private IHubContext<MotionHub, IMotion> _messagerHubContext;
        private IAsyncDeserializer<MotionDetection> _serializer { get; }

        public ConsumerFactory _factory { get; set; }


        // Using SignalR with background services:
        // https://learn.microsoft.com/en-us/aspnet/core/signalr/background-services?view=aspnetcore-2.2
        public ConsumerService(
            IOptions<KafkaConfig> config,
            ConsumerFactory factory,
            IHubContext<MotionHub, IMotion> messagerHubContext,
            IAsyncDeserializer<MotionDetection> serializer,
            ILogger<ConsumerService> logger
        )
        {
            if (config is null)
                throw new ArgumentNullException(nameof(config));

            _config = config.Value;
            _factory = factory ?? throw new ArgumentNullException(nameof(factory));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _messagerHubContext = messagerHubContext ?? throw new ArgumentNullException(nameof(messagerHubContext));
            _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));

            // enforced configuration
            _config.Consumer.EnableAutoCommit = true; // allow consumer to autocommit offsets
            _config.Consumer.EnableAutoOffsetStore = false; // allow control over which offsets stored
            _config.Consumer.AutoOffsetReset = AutoOffsetReset.Latest; // if no offsets committed for topic for consumer group, default to latest   
            _config.Consumer.Debug = "consumer";

            _logger.LogInformation("Kafka consumer listening to camera topics =>");
            foreach (var topic in _config.MqttCameraTopics) { _logger.LogInformation($"Camera Topic :: {topic}"); }

            _kafkaConsumer = _factory(_config, _serializer);
            _logger.LogInformation($"Kafka consumer created => Name :: {_kafkaConsumer.Name}");
        }

        protected override Task ExecuteAsync(CancellationToken cancellationToken)
        {
            new Thread(() => StartConsumerLoop(cancellationToken)).Start();
            return Task.CompletedTask;
        }

        private void StartConsumerLoop(CancellationToken cancellationToken)
        {
            _kafkaConsumer.Subscribe(_config.Topic.Name);
            _logger.LogInformation($"Kafka consumer has subscribed to topic {_config.Topic.Name}");


            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    _logger.LogInformation("Kafka is waiting to consume...");
                    var consumerResult = _kafkaConsumer.Consume(cancellationToken);
                    _logger.LogInformation("Kafka Consumer consumed message => {Value}", consumerResult.Message.Value);

                    if (_config.MqttCameraTopics.Contains(consumerResult.Message.Key))
                    {
                        // we need to consider here security for auth, only want for user
                        // await _messagerHubContext.Clients.All.ReceiveMotionDetection(consumerResult.Message.Value);
                        _logger.LogInformation("Kafka Consumer dispatched message to SignalR");

                        // instruct background thread to commit this offset
                        _kafkaConsumer.StoreOffset(consumerResult);
                    }
                }
                catch (OperationCanceledException)
                {
                    _logger.LogInformation("The Kafka consumer thread has been cancelled");
                    break;
                }
                catch (ConsumeException ce)
                {
                    _logger.LogError($"Consume error: {ce.Error.Reason}");

                    if (ce.Error.IsFatal)
                    {
                        // https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#fatal-consumer-errors
                        _logger.LogError(ce, ce.Message);
                        break;
                    }
                }
                catch (Exception e)
                {
                    _logger.LogError(e, $"Unexpected exception while consuming motion detection {e}");
                    break;
                }
            }
        }


        public override void Dispose()
        {
            _logger.LogInformation("Kafka Consumer background service disposing");
            _kafkaConsumer.Close();
            _kafkaConsumer.Dispose();

            base.Dispose();
        }
    }
}
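To check whether the start offset at assignment time is what hides the message, one option for a test-only consumer is to override the start position when partitions are assigned. The sketch below is an assumption about how that could look, not the project's ConsumerFactory: `AssignmentOverrides` and `BuildTestConsumer` are hypothetical names, and `byte[]` values are used in place of MotionDetection to keep the example free of project types. `SetPartitionsAssignedHandler` and `Offset.Beginning` come from Confluent.Kafka.

```csharp
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

public static class AssignmentOverrides
{
    // Maps every assigned partition to the beginning of its log.
    public static IEnumerable<TopicPartitionOffset> FromBeginning(
        List<TopicPartition> partitions) =>
        partitions
            .Select(tp => new TopicPartitionOffset(tp, Offset.Beginning))
            .ToList();

    // Builds a consumer that starts each assigned partition from the
    // beginning, regardless of committed offsets or the reset policy.
    // The caller supplies the config (bootstrap servers, group id, ...).
    public static IConsumer<string, byte[]> BuildTestConsumer(ConsumerConfig config) =>
        new ConsumerBuilder<string, byte[]>(config)
            .SetPartitionsAssignedHandler((consumer, partitions) => FromBeginning(partitions))
            .Build();
}
```

If the message is consumed with this override in place, that would point at join timing combined with AutoOffsetReset.Latest rather than at a blocked consumer. Rereading from the beginning is rarely what a production consumer wants, so this is meant for diagnosis only.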

Kestrel Host Configuration

/// <summary>
/// Build the server, with Autofac IOC.
/// </summary>
protected override IHost BuildServer(HostBuilder builder)
{
    // build the host instance
    return new HostBuilder()
    .UseServiceProviderFactory(new AutofacServiceProviderFactory())
    .ConfigureLogging(logging =>
    {
        logging.ClearProviders();
        logging.AddConsole();
        logging.AddFilter("Microsoft.AspNetCore.SignalR", LogLevel.Information);
    })
    .ConfigureWebHost(webBuilder =>
    {
        webBuilder.ConfigureAppConfiguration((context, cb) =>
        {
            cb.AddJsonFile(ConfigMetaData.SettingsFile, optional: false)
            .AddEnvironmentVariables();
        })
        .ConfigureServices(services =>
        {
            services.AddHttpClient();
        })
        .UseStartup<TStartup>()
        .UseKestrel()
        .UseUrls("http://127.0.0.1:0");
    }).Build();
}

docker-compose stack

---
version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - camnet
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: WARN

  kafka:
    image: confluentinc/cp-kafka:6.0.1
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    networks:
      - camnet
    ports:
      - "9092:9092"
      - "19092:19092"
    environment:
      CONFLUENT_METRICS_ENABLE: 'false'
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_BROKER_ID: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 1000
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_LOG4J_LOGGERS: "org.apache.zookeeper=WARN,org.apache.kafka=WARN,kafka=WARN,kafka.cluster=WARN,kafka.controller=WARN,kafka.coordinator=WARN,kafka.log=WARN,kafka.server=WARN,kafka.zookeeper=WARN,state.change.logger=WARN"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      
  mqtt:
    container_name: mqtt
    image: eclipse-mosquitto:1.6.9
    ports:
      - "8883:8883"
      - "1883:1883"
      - "9901:9001"
    environment:
      - MOSQUITTO_USERNAME=${MQTT_USER}
      - MOSQUITTO_PASSWORD=${MQTT_PASSWORD}
    networks:
      - camnet
    volumes:
      - ./Mqtt/Config/mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ./Mqtt/Certs/localCA.crt:/mosquitto/config/ca.crt
      - ./Mqtt/Certs/server.crt:/mosquitto/config/server.crt
      - ./Mqtt/Certs/server.key:/mosquitto/config/server.key

  minio:
    container_name: service-minio
    image: dcs3spp/minio:version-1.0.2
    ports:
      - "127.0.0.1:9000:9000"
    environment:
      - MINIO_BUCKET=images
      - MINIO_ACCESS_KEY=${MINIO_USER}
      - MINIO_SECRET_KEY=${MINIO_PASSWORD}
    networks:
      - camnet

networks:
  camnet:

Works with the lensesio/fast-data-dev image. Why?

version: "3.8"

services:
  kafka:
    image: lensesio/fast-data-dev:2.5.1-L0
    container_name: kafka
    networks:
      - camnet
    ports:
      - 2181:2181 # zookeeper
      - 3030:3030 # ui
      - 9092:9092 # broker
      - 8081:8081 # schema registry
      - 8082:8082 # rest proxy
      - 8083:8083 # kafka connect
    environment:
      - ADV_HOST=127.0.0.1
      - SAMPLEDATA=0
      - REST_PORT=8082
      - FORWARDLOGS=0
      - RUNTESTS=0
      - DISABLE_JMX=1
      - CONNECTORS=${CONNECTOR}
      - WEB_PORT=3030
      - DISABLE=hive-1.1

  mqtt:
    container_name: mqtt
    image: eclipse-mosquitto:1.6.9
    ports:
      - "8883:8883"
      - "1883:1883"
      - "9901:9001"
    environment:
      - MOSQUITTO_USERNAME=${MQTT_USER}
      - MOSQUITTO_PASSWORD=${MQTT_PASSWORD}
    networks:
      - camnet
    volumes:
      - ./Mqtt/Config/mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ./Mqtt/Certs/localCA.crt:/mosquitto/config/ca.crt
      - ./Mqtt/Certs/server.crt:/mosquitto/config/server.crt
      - ./Mqtt/Certs/server.key:/mosquitto/config/server.key

  minio:
    container_name: service-minio
    image: dcs3spp/minio:version-1.0.2
    ports:
      - "127.0.0.1:9000:9000"
    environment:
      - MINIO_BUCKET=images
      - MINIO_ACCESS_KEY=${MINIO_USER}
      - MINIO_SECRET_KEY=${MINIO_PASSWORD}
    networks:
      - camnet

networks:
  camnet: