I am experiencing a reproducible error while producing Avro messages with reactive-kafka and avro4s. Once the identityMapCapacity of the client (CachedSchemaRegistryClient) is reached, serialization fails with
java.lang.IllegalStateException: Too many schema objects created for <myTopic>-value
This is unexpected, since all messages should have the same schema: they are serializations of the same case class.
// producer settings for String keys and Avro GenericRecord values
val avroProducerSettings: ProducerSettings[String, GenericRecord] =
  ProducerSettings(system, Serdes.String().serializer(), avroSerde.serializer())
    .withBootstrapServers(settings.bootstrapServer)

// reactive-kafka producer flow: emits a Result for every Message pushed through it
val avroProdFlow: Flow[ProducerMessage.Message[String, GenericRecord, String],
                       ProducerMessage.Result[String, GenericRecord, String],
                       NotUsed] = Producer.flow(avroProducerSettings)

// materialize a queue that feeds messages into the producer flow
val avroQueue: SourceQueueWithComplete[Message[String, GenericRecord, String]] =
  Source.queue(bufferSize, overflowStrategy)
    .via(avroProdFlow)
    .map(logResult)
    .to(Sink.ignore)
    .run()
...
queue.offer(msg)
The serializer is a KafkaAvroSerializer, instantiated with a new CachedSchemaRegistryClient(settings.schemaRegistry, 1000).
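For reference, a minimal sketch of how such a serde could be wired up (this wiring is not part of my original listing; wrapping via Serdes.serdeFrom and the variable names are assumptions):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}

// assumed wiring - only avroSerde.serializer() appears in the snippets above
val schemaRegistryClient = new CachedSchemaRegistryClient(settings.schemaRegistry, 1000)

val avroSerde: Serde[GenericRecord] = Serdes.serdeFrom(
  // KafkaAvroSerializer/Deserializer are typed over Object, hence the casts
  new KafkaAvroSerializer(schemaRegistryClient).asInstanceOf[Serializer[GenericRecord]],
  new KafkaAvroDeserializer(schemaRegistryClient).asInstanceOf[Deserializer[GenericRecord]]
)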
Generating the GenericRecord:
def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
  recordFormat.to(a)

// wrap an Edge in a ProducerMessage, using the edge id as key and as pass-through value
val makeEdgeMessage: (Edge, String) => Message[String, GenericRecord, String] = { (edge, topic) =>
  val edgeAvro: GenericRecord = toAvro(edge)
  val record = new ProducerRecord[String, GenericRecord](topic, edge.id, edgeAvro)
  ProducerMessage.Message(record, edge.id)
}
The schema is created deep inside the library code (io.confluent.kafka.serializers.AbstractKafkaAvroSerDe#getSchema, invoked by io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl), where I have no influence on it, so I have no idea how to fix the leak. It looks to me like the two Confluent projects do not work well together.
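For context, my understanding of why the capacity runs out (a paraphrase of the client's behaviour, not its actual code; the names below are mine): the id cache is keyed by object identity, so every distinct Schema instance occupies a new slot even when it is structurally equal to one already cached.

import java.util.IdentityHashMap
import org.apache.avro.Schema

// illustrative only - structure and names are mine, not Confluent's
class IdentityKeyedSchemaCache(identityMapCapacity: Int) {
  private val ids = new IdentityHashMap[Schema, Integer]()

  def getOrRegister(schema: Schema, register: Schema => Int): Int = synchronized {
    Option(ids.get(schema)).map(_.intValue()).getOrElse {
      if (ids.size >= identityMapCapacity)
        throw new IllegalStateException("Too many schema objects created for <subject>!")
      val id = register(schema)
      ids.put(schema, id)
      id
    }
  }
}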
The issues I have found here, here and here do not seem to address my use case.
The two workarounds for me are currently:
- not using the schema registry at all - obviously not a long-term solution
- creating a custom SchemaRegistryClient that does not rely on object identity - doable, but I would like to avoid creating more issues than I solve by reimplementing it
Is there a way to generate or cache a consistent schema depending on message/record type and use it with my setup?
edit 2017.11.20
The issue in my case was that each instance of GenericRecord carrying my message had been serialized by a different instance of RecordFormat, containing a different instance of the Schema. The implicit resolution here generated a new instance each time:

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord = recordFormat.to(a)
The solution was to pin the RecordFormat instance to a val and reuse it explicitly. Many thanks to https://github.com/heliocentrist for explaining the details.
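A minimal sketch of that fix (the object name is mine; Edge is the case class from the snippets above):

import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.GenericRecord

object EdgeAvroFormats {
  // derived exactly once; every message reuses the same RecordFormat
  // and therefore the same underlying Schema instance
  implicit val edgeFormat: RecordFormat[Edge] = RecordFormat[Edge]
}

import EdgeAvroFormats._

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
  recordFormat.to(a)

With a single RecordFormat the CachedSchemaRegistryClient sees only one Schema object per subject, so the identity map stops growing.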
original response:
After waiting for a while (and getting no answer to the github issue either) I had to implement my own SchemaRegistryClient. Over 90% is copied from the original CachedSchemaRegistryClient, just translated into Scala. Using a Scala mutable.Map fixed the memory leak. I have not performed any comprehensive tests, so use at your own risk.
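The core of the change is roughly the following (a simplified sketch of the idea, not the full SchemaRegistryClient implementation; the class and method names are illustrative, though the real class delegates to the same Confluent RestService):

import scala.collection.mutable
import org.apache.avro.Schema
import io.confluent.kafka.schemaregistry.client.rest.RestService

// avro Schema has structural equals/hashCode, so a mutable.Map deduplicates
// structurally identical schemas where the original IdentityHashMap could not
class SchemaIdCache(restService: RestService) {
  private val idCache = mutable.Map.empty[String, mutable.Map[Schema, Int]]

  def register(subject: String, schema: Schema): Int = synchronized {
    val bySchema = idCache.getOrElseUpdate(subject, mutable.Map.empty)
    bySchema.getOrElseUpdate(schema, restService.registerSchema(schema.toString, subject))
  }
}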