I have a C# Windows service that acts as a Kafka consumer, based on the Confluent Kafka library. It reads values from a Kafka topic and writes them to a file. When I start the Windows service, it always remains in the "Starting" status. On the other hand, I can see that values are being written to the target file, but not at the required frequency (once per second). Additionally, the data written to the file is not the current data; it is always 30-40 seconds old. It seems that the service is running only "partially" and with a delay, although it is not in the "Running" status. How is that possible?

I have restarted the service several times, but the result is always the same. Nothing seems unusual on the Kafka side; there are hundreds of topics that can be consumed without problems. I have also started my server. I also get no exceptions from my code. What could be the problem?

My code:

/// <summary>
/// Consumes messages from the Kafka topic, stores the most recent message value in
/// <c>Kafka_message_total</c>, and calls <c>Write_Tag_Values_to_File()</c> at most
/// once per second.
/// </summary>
/// <remarks>
/// The loop runs for as long as <c>var_true</c> is true, so this call blocks its thread.
/// NOTE(review): the caller is not visible here. If this is invoked directly from the
/// service's start callback, the service cannot finish starting; confirm that it is
/// run on a background thread or task instead.
/// Any exception is appended to the file named by <c>error_log</c> and not rethrown.
/// </remarks>
public void Read_from_Kafka()
    {
        try
        {
            using (StreamWriter sw = File.AppendText(error_log))
            {
                sw.WriteLine("Start Kafka2 " + " " + Convert.ToString(DateTime.Now));
            }

            // NOTE(review): the key password and certificate paths are hard-coded;
            // consider loading them from protected configuration.
            var config = new ConsumerConfig
            {
                BootstrapServers = "XXXXXXXX.XXXX.XXXX.XXX:XXXX",
                GroupId = "foo",
                AutoOffsetReset = AutoOffsetReset.Latest,
                SecurityProtocol = SecurityProtocol.Ssl,
                SslCaLocation = "C:\\Cert\\Kafka\\BOSCH-CA-DE.pem",
                SslCertificateLocation = "C:\\Cert\\Kafka\\TEF4.cert.pem",
                SslKeyLocation = "C:\\cert\\Kafka\\TEF4.key.pem",
                SslKeyPassword = "TEF4",
                EnableAutoCommit = false
            };

            using (CancellationTokenSource source = new CancellationTokenSource())
            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                CancellationToken cancellationToken = source.Token;

                consumer.Subscribe("TOPIC_NAME");

                // Throttle only the file write, never the consume call. Sleeping after
                // every message caps consumption at one message per second, so the
                // consumer falls behind the topic and the file shows stale values.
                TimeSpan writeInterval = TimeSpan.FromSeconds(1);
                System.Diagnostics.Stopwatch sinceLastWrite = System.Diagnostics.Stopwatch.StartNew();
                bool hasWritten = false;

                try
                {
                    while (var_true)
                    {
                        var consumeResult = consumer.Consume(cancellationToken);

                        // Skip results that carry no message.
                        if (consumeResult == null || consumeResult.Message == null)
                        {
                            continue;
                        }

                        // Always keep the newest value; older ones are superseded.
                        Kafka_message_total = consumeResult.Message.Value;

                        // Write the first value immediately, then at most once per interval.
                        if (!hasWritten || sinceLastWrite.Elapsed >= writeInterval)
                        {
                            Write_Tag_Values_to_File();
                            sinceLastWrite.Restart();
                            hasWritten = true;
                        }
                    }
                }
                finally
                {
                    // Leave the consumer group cleanly even when the loop exits through an exception.
                    consumer.Close();
                }
            }
        }
        catch (Exception ex)
        {
            using (StreamWriter sw = File.AppendText(error_log))
            {
                sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
            }
        }
    }
0

There are 0 answers