I have a C# Windows service that acts as a Kafka consumer, based on the Confluent Kafka library. It reads values from a Kafka topic and writes them to a file. When I start the Windows service, it always remains in "Starting" status. On the other hand, I can see that values are being written to the target file, but not at the required frequency (once per second). Additionally, the data written to the file is not current; it is always 30-40 seconds old. It seems that the service is running "partially" and with a delay, although it is not in "Running" status. How is that possible?
I have restarted the service several times, but the result is always the same. Nothing seems wrong on the Kafka side; there are hundreds of topics that can be consumed without problems. I have also restarted my server. My code does not throw any exceptions either. What could be the problem?
My code:
public void Read_from_Kafka()
{
    try
    {
        using (StreamWriter sw = File.AppendText(error_log))
        {
            sw.WriteLine("Start Kafka2 " + " " + Convert.ToString(DateTime.Now));
        }
        var config = new ConsumerConfig
        {
            BootstrapServers = "XXXXXXXX.XXXX.XXXX.XXX:XXXX",
            GroupId = "foo",
            AutoOffsetReset = AutoOffsetReset.Latest,
            SecurityProtocol = SecurityProtocol.Ssl,
            SslCaLocation = "C:\\Cert\\Kafka\\BOSCH-CA-DE.pem",
            SslCertificateLocation = "C:\\Cert\\Kafka\\TEF4.cert.pem",
            SslKeyLocation = "C:\\cert\\Kafka\\TEF4.key.pem",
            SslKeyPassword = "TEF4",
            EnableAutoCommit = false
        };
        CancellationTokenSource source = new CancellationTokenSource();
        CancellationToken cancellationToken = source.Token;
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe("TOPIC_NAME");
            while (var_true)
            {
                var consumeResult = consumer.Consume(cancellationToken);
                Kafka_message_total = consumeResult.Message.Value;
                Write_Tag_Values_to_File();
                System.Threading.Thread.Sleep(1000);
            }
            consumer.Close();
        }
    }
    catch (Exception ex)
    {
        using (StreamWriter sw = File.AppendText(error_log))
        {
            sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
        }
    }
}