I'm building a REST API using AKKA Http, Monix and Datastax Java Driver for Apache Cassandra and I'm having some troubles while trying to fetch some Items from cassandra, wait for the query to be fulfilled and returning the results.
I'm able to print all the results easily, but unable to wait for the query to be done and return all the items. My rest point simply returns an empty array of items since it does not wait for the query to be done.
I have an executeQuery method that takes:
queryString: Stringrepresenting a cassandra querypage: Intuseful for paginationparameters: Any*representing parameters, if necessary for the query
And returns an Observable[Row].
Then, in order to perform such query, retrieve its result, parse them and send them back, I use Monix Observable and Subscription.
Let's suppose I want to retrieve some items by a common field known as pid:
import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import com.datastax.driver.core.Row
import monix.reactive.Observable
import cassandra.src.CassandraHelper
import item.src.entity.{Item, Items}
. . .
val keyspace = "my_keyspace"
val table = "items"
. . .
def getItems() : Items = {
var itemList: Items = List()
val observable: Observable[Row] = CassandraHelper.executeQuery(
"SELECT * FROM " + keyspace + "." + table,
1
)
observable.subscribe { row =>
itemList ::= ItemMapper.rowToItem()(row)
Ack.Continue
}
Items(itemList)
}
Where rowToItem simply parses a row into an Item and Items: List[Item].
I was taking a look at Task but I'm not quite sure its what I'm looking for.
EDIT
With @Alexandru Nedelcu solution I'm able to print all the items in itemList as soon as they get inserted into it, but still getting an empty response for that call: { "items" : [] }.
Here's the edited code:
def getItems() : Items = {
var itemList: List[Item] = List()
val observable: Observable[Row] = CassandraHelper.executeQuery(
"SELECT * FROM " + keyspace + "." + table,
1
)
observable.subscribe { row =>
println(itemList)
itemList ::= ItemMapper.rowToItem()(row)
Ack.Continue
}
Items(itemList)
}
How can I wait for the results to be all parsed and inserted into items and then send them back?
From what I understand you have an
Observable[Row]and you want to build anItemsout of it, which aggregates everyRowelement from the source stream, is that correct?If so, the
foldLeftLis what you want, which will aggregate every element into a state and return the final result once the source stream completes:A
Taskis a lazyFuture. And you can convert it into aFuturewithrunAsync. More details here: https://monix.io/docs/2x/eval/task.html