I am trying to run a job on Google Dataproc which pushes a record into a Kafka topic. It uses a schema registry for schema validation and serialization. Representative code below:
/**
 * Smoke test that looks up a schema in the Schema Registry over HTTP and then
 * publishes a single Avro record to Kafka.
 *
 * @param envConfig environment configuration; this class reads `Kafka.schemaRegAuthInfo`
 *                  and `Kafka.registry` from it and hands it to the producer-properties loader
 * @param isLocal   when true, `http://localhost:8081` is used as the Schema Registry
 *                  instead of `envConfig.Kafka.registry`
 */
case class KafkaProducerExample(envConfig: Config, isLocal: Boolean) {
  @transient
  lazy val logger: Logger = Logger.getLogger(classOf[KafkaProducerExample])

  /**
   * Fetches version 1 of the `<topic>-value` subject, prints the outcome, and then
   * sends one record with key `"key"` to the topic.
   *
   * @throws IllegalArgumentException if the Schema Registry auth info is not of the
   *                                  form `<api-key>:<api-secret>`
   */
  def test_send(): Unit = {
    // Replace with your Kafka topic
    val topic = "topic_name"

    val schemaRegistryServer: String =
      if (isLocal) "http://localhost:8081" else envConfig.Kafka.registry
    val schemaUrl = s"$schemaRegistryServer/subjects/${topic}-value/versions/1"
    println(schemaUrl)

    // Split on the first ':' only, so a secret that itself contains ':' is kept intact.
    // The message deliberately omits the value to avoid leaking credentials.
    val (apiKey, apiSecret) = envConfig.Kafka.schemaRegAuthInfo.get.split(":", 2) match {
      case Array(key, secret) => (key, secret)
      case _ =>
        throw new IllegalArgumentException(
          "Schema Registry auth info must be of the form '<api-key>:<api-secret>'")
    }

    // Set up the request headers with Confluent Cloud credentials
    val requestHeaders = Map(
      "Content-Type" -> "application/vnd.schemaregistry.v1+json"
    )

    // Send a GET request to retrieve the schema.
    // check = false: without it the client throws on any non-2xx status, which would
    // make the failure branch below unreachable.
    // NOTE(review): verifySslCerts = false disables TLS certificate validation while
    // credentials are being sent — confirm this is intended outside local testing.
    val response = requests.get(
      schemaUrl,
      auth = (apiKey, apiSecret),
      headers = requestHeaders,
      verifySslCerts = false,
      check = false
    )
    if (response.statusCode == 200) {
      println("Retrieved schema:")
      println(response.text())
    } else {
      println(s"Failed to retrieve schema. Status code: ${response.statusCode}")
      println(response.text())
    }

    // NOTE(review): the HTTP call above passes credentials explicitly, but the Avro
    // serializer talks to the Schema Registry on its own, using only the producer
    // properties. Presumably kafkaProps must carry
    //   basic.auth.credentials.source = USER_INFO
    //   basic.auth.user.info          = <api-key>:<api-secret>
    // (alongside schema.registry.url) — confirm loadKafkaProducerProperties sets them;
    // otherwise schema registration is rejected with 401 Unauthorized.
    val kafkaProps = config.loadKafkaProducerProperties(envConfig, logger, isLocal)

    // SomeClass is an avro generated case class; only its schema is needed here.
    val avroRecord = new org.apache.avro.generic.GenericData.Record(SomeClass.SCHEMA$)
    avroRecord.put("id", "123")
    avroRecord.put("batch_id", "123")
    val record = new ProducerRecord[String, GenericRecord](topic, "key", avroRecord)

    // Create Kafka producer
    val producer = new KafkaProducer[String, GenericRecord](kafkaProps)
    try {
      // Serialization errors are thrown synchronously from send().
      producer.send(record)
    } finally {
      // Always release the producer, even when send() fails.
      producer.close()
    }
  }
}
Now the GET request works, and I am able to get a response from the Schema Registry, but the Kafka producer client gives me the following error (truncated):
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
The following are the relevant libraries in build.sbt:
"org.apache.kafka" % "kafka-clients" % "3.4.0",
"org.apache.avro" % "avro" % "1.11.0",
"io.confluent" % "kafka-avro-serializer" % "7.1.1"
What could be the issue here?