Found java.util.concurrent.Future Required scala.concurrent.Future


Related: scala.concurrent.Future wrapper for java.util.concurrent.Future

This came from my other question:

How to integrate akka streams kafka (reactive-kafka) into akka http application?

I have an Akka HTTP application, and I'd like to send a message (a ProducerRecord) to Kafka from the onComplete directive in my route, like this:

val producer : KafkaProducer = new KafkaProducer(producerSettings)

val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1","some message")

          onComplete(producer.send(producerRecord)) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

However, the onComplete(producer.send(producerRecord)) call produces the following type mismatch error:

[error]  found   : Future[org.apache.kafka.clients.producer.RecordMetadata] (in java.util.concurrent)
[error]  required: Future[org.apache.kafka.clients.producer.RecordMetadata] (in scala.concurrent)
[error]           onComplete[RecordMetadata](producer.send(producerRecord)) { _ =>

Is there any way around this, maybe by using the Producer as a sink (http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink) instead of the Java producer.send function?


There are 2 answers

Answer by Stefano Bonetti (best answer)

You could use Cake Solutions' Scala-based Kafka client, which takes care of running the Java futures and gives you Scala futures back. As long as you create a cakesolutions.kafka.KafkaProducer instead of an org.apache.kafka.clients.producer.KafkaProducer, the rest of your code should stay practically the same.

Alternatively, you can solve this with Reactive Kafka while still using the high-level Akka HTTP DSL, by running your producer record into a Kafka sink, like this:

val producerSink = Producer.plainSink(producerSettings)

...
        // inside the route
        val producerRecord =
          new ProducerRecord[Array[Byte], String]("topic1", "some message")

        onComplete(Source.single(producerRecord).runWith(producerSink)) { _ =>
          complete(ToResponseMarshallable((StatusCodes.Created, u)))
        }
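
The snippet above leaves out the setup. Here is a minimal sketch of how producerSettings and producerSink might be defined. It assumes Akka 2.6+ (where an implicit ActorSystem provides the materializer), akka-stream-kafka and kafka-clients on the classpath, and a broker at localhost:9092. The object name KafkaSinkSketch, the publish helper and the broker address are illustrative and not part of the original code:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.Future

object KafkaSinkSketch {
  // Assumption: Akka 2.6+, so the implicit system also supplies the materializer.
  implicit val system: ActorSystem = ActorSystem("kafka-sink-sketch")

  // Assumption: a broker is reachable at localhost:9092.
  val producerSettings: ProducerSettings[Array[Byte], String] =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  // The sink materializes a Scala Future[Done], which is what onComplete expects.
  val producerSink: Sink[ProducerRecord[Array[Byte], String], Future[Done]] =
    Producer.plainSink(producerSettings)

  // Hypothetical helper: runs a one-element stream into the Kafka sink.
  def publish(message: String): Future[Done] =
    Source
      .single(new ProducerRecord[Array[Byte], String]("topic1", message))
      .runWith(producerSink)
}
```

The Future[Done] returned by publish completes once the one-element stream has finished, so it can be passed straight to onComplete in the route.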

Answer by Arnout Engelen

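To answer your specific question, the scala-java8-compat library provides converters between Java 8 and Scala futures.

Specifically, FutureConverters.toScala converts a java.util.concurrent.CompletionStage (such as a CompletableFuture) to a scala.concurrent.Future. Note that producer.send returns a plain java.util.concurrent.Future, which is not a CompletionStage, so it cannot be passed to toScala directly; it first has to be bridged, for example by completing a Promise from the callback overload producer.send(record, callback).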
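
To illustrate the difference between the two Java future types, here is a minimal sketch using only the standard library. It assumes Scala 2.13+, where scala.jdk.FutureConverters is the built-in successor to scala-java8-compat's converters. JavaFutureBridge and its method names are hypothetical:

```scala
import java.util.concurrent.{CompletionStage, Future => JFuture}
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.jdk.FutureConverters._

object JavaFutureBridge {
  // A CompletionStage exposes completion callbacks, so it converts without blocking.
  def fromCompletionStage[T](stage: CompletionStage[T]): Future[T] =
    stage.asScala

  // A plain java.util.concurrent.Future has no callbacks: the only option is to
  // wait on get(), here on a thread of the supplied ExecutionContext.
  // A failed Java future surfaces as java.util.concurrent.ExecutionException.
  def fromPlainFuture[T](jf: JFuture[T])(implicit ec: ExecutionContext): Future[T] =
    Future(blocking(jf.get()))
}
```

The second variant ties up a thread for each pending future, so it is best treated as a fallback for APIs that offer no callback at all.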

However, using a client library that has a Scala-friendly API itself (as suggested by Stefano above) will probably get you the best result.
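
A Scala-friendly send can also be written by hand on top of the Java client's callback overload, send(record, callback). The following is a minimal sketch of that idea, not the implementation of any particular client library. KafkaSendBridge and sendAsScala are hypothetical names, and kafka-clients is assumed to be on the classpath:

```scala
import org.apache.kafka.clients.producer.{Callback, Producer, ProducerRecord, RecordMetadata}
import scala.concurrent.{Future, Promise}

object KafkaSendBridge {
  // Hypothetical helper: completes a Scala Promise from Kafka's completion callback,
  // so no thread has to block on the java.util.concurrent.Future returned by send.
  def sendAsScala[K, V](
      producer: Producer[K, V],
      record: ProducerRecord[K, V]
  ): Future[RecordMetadata] = {
    val promise = Promise[RecordMetadata]()
    producer.send(record, new Callback {
      // Kafka passes a non-null exception when the send failed.
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null) promise.failure(exception)
        else promise.success(metadata)
    })
    promise.future
  }
}
```

With such a helper, the route from the question could call onComplete(KafkaSendBridge.sendAsScala(producer, producerRecord)) and the rest would stay unchanged.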