Getting Cassandra query results asynchronously using Scala + Monix

I'm building a REST API using Akka HTTP, Monix, and the DataStax Java Driver for Apache Cassandra, and I'm having trouble fetching some items from Cassandra, waiting for the query to complete, and returning the results.

I can print all the results easily, but I can't wait for the query to finish and then return all the items. My REST endpoint simply returns an empty array of items because it does not wait for the query to finish.

I have an executeQuery method that takes:

  • queryString: String representing a Cassandra query
  • page: Int useful for pagination
  • parameters: Any* representing parameters, if necessary for the query

And returns an Observable[Row].

Then, to run such a query, retrieve its results, parse them, and send them back, I use a Monix Observable and a subscription.

Let's suppose I want to retrieve some items by a common field known as pid:

import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import com.datastax.driver.core.Row
import monix.reactive.Observable

import cassandra.src.CassandraHelper
import item.src.entity.{Item, Items}

. . .
val keyspace = "my_keyspace"
val table = "items"
. . .

def getItems() : Items = {
  var itemList: Items = List()
  val observable: Observable[Row] = CassandraHelper.executeQuery(
    "SELECT * FROM " + keyspace + "." + table,
    1
  )
  observable.subscribe { row =>
    itemList ::= ItemMapper.rowToItem()(row)
    Ack.Continue
  }
  Items(itemList)
}

Here rowToItem simply parses a Row into an Item, and Items: List[Item]. I was taking a look at Task, but I'm not sure it's what I'm looking for.

EDIT

With @Alexandru Nedelcu's solution I can print all the items in itemList as soon as they are inserted into it, but I still get an empty response for that call: { "items" : [] }.

Here's the edited code:

def getItems() : Items = {
  var itemList: List[Item] = List()
  val observable: Observable[Row] = CassandraHelper.executeQuery(
    "SELECT * FROM " + keyspace + "." + table,
    1
  )
  observable.subscribe { row =>
    println(itemList)
    itemList ::= ItemMapper.rowToItem()(row)
    Ack.Continue
  }
  Items(itemList)
}

How can I wait for all the results to be parsed and inserted into items, and then send them back?

1 answer

Alexandru Nedelcu (best answer)

From what I understand, you have an Observable[Row] and you want to build an Items out of it by aggregating every Row element from the source stream. Is that correct?

If so, foldLeftL is what you want: it aggregates every element into a state and returns the final result once the source stream completes:

import monix.eval.Task

// Task.suspend defers building the observable until the Task is run,
// which keeps side effects (such as starting the query) suspended ;-)
val items: Task[Items] = Task.suspend {
  val initial: List[Item] = List.empty
  val observable: Observable[Row] = ???

  // This returns a Task that yields the result when the source completes
  observable
    .foldLeftL(initial) { (acc, row) =>
      // Prepend each parsed row to the immutable accumulator
      ItemMapper.rowToItem()(row) :: acc
    }
    // Assumes Items wraps a List[Item], as in Items(itemList) from the question
    .map(list => Items(list))
}
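To see the aggregation in isolation, here is a minimal sketch that swaps the Cassandra query for an in-memory Observable. FakeRow, Item, Items and FoldLeftLDemo are hypothetical stand-ins, not the types from the question, and the code assumes Monix 2.x (the version the linked docs cover); on Monix 3.x, runAsync would be replaced by runToFuture.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-ins for the question's Row, Item and Items types
final case class FakeRow(pid: Int, name: String)
final case class Item(pid: Int, name: String)
final case class Items(items: List[Item])

object FoldLeftLDemo {
  // Stand-in for CassandraHelper.executeQuery: emits rows from memory
  val rows: Observable[FakeRow] =
    Observable.fromIterable(List(FakeRow(1, "a"), FakeRow(2, "b"), FakeRow(3, "c")))

  // Nothing runs yet: the Task only describes the aggregation
  val itemsTask: Task[Items] =
    rows
      .foldLeftL(List.empty[Item])((acc, row) => Item(row.pid, row.name) :: acc)
      .map(acc => Items(acc.reverse)) // prepending reverses the order, so restore it

  def main(args: Array[String]): Unit = {
    // Blocking with Await is for demonstration only; avoid it in request handlers
    val result = Await.result(itemsTask.runAsync, 5.seconds)
    println(result)
  }
}
```

The point of the sketch is that the list is only read after the stream has completed, which is exactly what the subscribe-based version in the question does not guarantee.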

A Task is a lazy Future: nothing is executed until you run it. You can convert it into a Future with runAsync. More details here: https://monix.io/docs/2x/eval/task.html
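On the Akka HTTP side, the resulting Future can be handed to the onSuccess directive, so the response is only built once the Task has finished. This is a rough sketch under some assumptions: ItemRoutes, the "items" path and the Task.now placeholder are hypothetical, toString stands in for whatever JSON marshalling your project uses, and runAsync is the Monix 2.x call (runToFuture on Monix 3.x).

```scala
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Future

// Hypothetical stand-ins for the question's Item and Items types
final case class Item(name: String)
final case class Items(items: List[Item])

object ItemRoutes {
  // Placeholder: in the real service this would be the foldLeftL-based Task
  def getItems(): Task[Items] = Task.now(Items(List(Item("example"))))

  val route: Route =
    path("items") {
      get {
        // runAsync starts the Task and returns a Future of its result
        val future: Future[Items] = getItems().runAsync
        onSuccess(future) { items =>
          // toString is a stand-in for the project's JSON marshaller
          complete(items.toString)
        }
      }
    }
}
```

With this shape getItems no longer returns Items directly, so nothing has to block while the query is running.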