Related: scala.concurrent.Future wrapper for java.util.concurrent.Future
This came from my other question:
How to integrate akka streams kafka (reactive-kafka) into akka http application?
I have a AKKA HTTP application, and I'd like to send a message/ProducerRecord to Kafka in the onComplete function in my route, like the following:
val producer : KafkaProducer = new KafkaProducer(producerSettings)
val routes : Route =
post {
entity(as[User]) { user =>
val createUser = userService.create(user)
onSuccess(createUser) {
case Invalid(y: NonEmptyList[Err]) =>
complete(BadRequest -> "invalid user")
case Valid(u: User) => {
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1","some message")
onComplete(producer.send(producerRecord)) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
}
}
}
}
However, the onComplete(producer send producerRecord) is generating the following type mismatch error:
[error] found : Future[org.apache.kafka.clients.producer.RecordMetadata] (in java.util.concurrent) [error] required: Future[org.apache.kafka.clients.producer.RecordMetadata] (in scala.concurrent) [error] onCompleteRecordMetadata { _ =>
Is there any way around this, maybe by using the Producer as as sink (http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink) instead of the java producer.send function?
You could leverage Cake's Scala based Kafka client, which will do the work of running Java futures and giving you Scala futures back. Once you make sure you create a
cakesolutions.kafka.KafkaProducer
instead of aorg.apache.kafka.clients.producer.KafkaProducer
, the rest of your code should practically stay the same.Alternatively, you can sort this out leveraging Reactive Kafka whilst keeping using the high level Akka HTTP DSL. You can do it by running your producer record to a Kafka Sink, this way: