Kafka producer client is not able to connect to schema registry


I am trying to run a job on Google Dataproc that pushes a record to a Kafka topic. It uses a Schema Registry for schema validation and serialization. Representative code is below:

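import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
// Config, config, SomeClass and Logger come from the project (imports not shown)

case class KafkaProducerExample(envConfig: Config, isLocal: Boolean) {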

  @transient
  lazy val logger: Logger = Logger.getLogger(classOf[KafkaProducerExample])

  def test_send(): Unit = {

    val srAuth = envConfig.Kafka.schemaRegAuthInfo

    val schemaRegistryServer: String = if (isLocal) "http://localhost:8081" else
      envConfig.Kafka.registry

    val schemaUrl = s"$schemaRegistryServer/subjects/topic_name-value/versions/1"
    println(schemaUrl)
    val apiKey = srAuth.get.split(":")(0)
    val apiSecret = srAuth.get.split(":")(1)

    // Set up the request headers (the Confluent Cloud credentials are passed via basic auth below)
    val headers_1 = Map(
      "Content-Type" -> "application/vnd.schemaregistry.v1+json"
    )

    // Send a GET request to retrieve the schema
    val response = requests.get(schemaUrl, auth = Tuple2(apiKey, apiSecret), headers = headers_1, verifySslCerts = false)

    if (response.statusCode == 200) {
      val schema = response.text()
      println("Retrieved schema:")
      println(schema)
    } else {
      println(s"Failed to retrieve schema. Status code: ${response.statusCode}")
      println(response.text())
    }

    val kafkaProps = config.loadKafkaProducerProperties(envConfig, logger, isLocal)
    // Replace with your Kafka topic
    val topic = "topic_name"

    // SomeClass is an avro generated case class
    val bre_form_record = SomeClass(id = "123",
      batch_id = "123"
    )

    // Create Kafka producer
    val producer = new KafkaProducer[String, GenericRecord](kafkaProps)

    val avroRecord = new org.apache.avro.generic.GenericData.Record(SomeClass.SCHEMA$)
    avroRecord.put("id", "123")
    avroRecord.put("batch_id", "123")

    val record = new ProducerRecord[String, GenericRecord](topic, "key", avroRecord)

    producer.send(record)
    producer.close()
  }
}

The GET request works and returns the schema from the Schema Registry, but the Kafka producer client fails with the following error (truncated):

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401

These are the relevant libraries in build.sbt:

 "org.apache.kafka" % "kafka-clients" % "3.4.0",
 "org.apache.avro" % "avro" % "1.11.0",
 "io.confluent" % "kafka-avro-serializer" % "7.1.1"

What could be the issue here?
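
One difference between the two calls may be relevant: the manual GET request passes the API key and secret explicitly, while the Avro serializer inside the producer opens its own connection to the Schema Registry and only sees what is in the producer properties. Below is a minimal sketch of the settings those properties might need, assuming `loadKafkaProducerProperties` (not shown) does not already set them. `SchemaRegistryAuthSketch` and `withSchemaRegistryAuth` are hypothetical names used for illustration; the three keys are the Confluent serializer's Schema Registry URL and basic-auth settings.

```scala
import java.util.Properties

object SchemaRegistryAuthSketch {

  // Hypothetical helper: returns a copy of the producer properties with the
  // Schema Registry URL and basic-auth settings added for the Avro serializer.
  // userInfo is assumed to be in "apiKey:apiSecret" form.
  def withSchemaRegistryAuth(base: Properties,
                             registryUrl: String,
                             userInfo: String): Properties = {
    val props = new Properties()

    // Copy existing string entries without mutating the input.
    val names = base.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      props.setProperty(key, base.getProperty(key))
    }

    // Settings read by the Confluent Avro serializer's registry client.
    props.setProperty("schema.registry.url", registryUrl)
    props.setProperty("basic.auth.credentials.source", "USER_INFO")
    props.setProperty("basic.auth.user.info", userInfo)
    props
  }
}
```

In `test_send` this could be applied as `SchemaRegistryAuthSketch.withSchemaRegistryAuth(kafkaProps, schemaRegistryServer, srAuth.get)` before constructing the `KafkaProducer`, assuming `srAuth` holds the `key:secret` string that the GET request splits.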
