I'm building a REST API using Akka HTTP, Monix and the DataStax Java Driver for Apache Cassandra, and I'm having trouble fetching some items from Cassandra, waiting for the query to complete, and returning the results.
I'm able to print all the results easily, but I'm unable to wait for the query to finish and return all the items. My REST endpoint simply returns an empty array of items, since it does not wait for the query to finish.
I have an executeQuery method that takes:
queryString: String, representing a Cassandra query
page: Int, useful for pagination
parameters: Any*, representing parameters, if necessary for the query
It returns an Observable[Row].
Then, in order to perform such a query, retrieve its results, parse them and send them back, I use Monix Observable and Subscription.
Let's suppose I want to retrieve some items by a common field known as pid:
import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import com.datastax.driver.core.Row
import monix.reactive.Observable
import cassandra.src.CassandraHelper
import item.src.entity.{Item, Items}
. . .
val keyspace = "my_keyspace"
val table = "items"
. . .
def getItems() : Items = {
  var itemList: Items = List()
  val observable: Observable[Row] = CassandraHelper.executeQuery(
    "SELECT * FROM " + keyspace + "." + table,
    1
  )
  observable.subscribe { row =>
    itemList ::= ItemMapper.rowToItem()(row)
    Ack.Continue
  }
  Items(itemList)
}
Where rowToItem simply parses a row into an Item, and Items: List[Item].
I was taking a look at Task but I'm not quite sure it's what I'm looking for.
EDIT
With @Alexandru Nedelcu's solution I'm able to print all the items in itemList as soon as they get inserted into it, but I'm still getting an empty response for that call: { "items" : [] }.
Here's the edited code:
def getItems() : Items = {
  var itemList: List[Item] = List()
  val observable: Observable[Row] = CassandraHelper.executeQuery(
    "SELECT * FROM " + keyspace + "." + table,
    1
  )
  observable.subscribe { row =>
    println(itemList)
    itemList ::= ItemMapper.rowToItem()(row)
    Ack.Continue
  }
  Items(itemList)
}
How can I wait for the results to be all parsed and inserted into items and then send them back?
From what I understand you have an Observable[Row] and you want to build an Items out of it, which aggregates every Row element from the source stream, is that correct?
If so, foldLeftL is what you want, which will aggregate every element into a state and return the final result once the source stream completes.
A Task is a lazy Future, and you can convert it into a Future with runAsync. More details here: https://monix.io/docs/2x/eval/task.html
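
As a rough sketch of the foldLeftL approach described above, assuming Monix 2.x (where Task.runAsync returns a CancelableFuture; in Monix 3.x the equivalent call is runToFuture). The Item and Items case classes and the in-memory Observable are hypothetical stand-ins for the question's own types and for CassandraHelper.executeQuery, which are not shown in full:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-ins for the question's Item and Items types.
final case class Item(pid: String, name: String)
final case class Items(items: List[Item])

object FoldLeftSketch {
  // Folds every element of the stream into a List. The resulting Task
  // yields its value only after the source Observable has completed.
  def collect(source: Observable[Item]): Task[Items] =
    source
      .foldLeftL(List.empty[Item])((acc, item) => item :: acc)
      .map(acc => Items(acc.reverse)) // prepending reversed the order, so restore it

  def main(args: Array[String]): Unit = {
    // Stand-in for the Observable returned by the Cassandra query.
    val source = Observable.fromIterable(List(Item("p1", "a"), Item("p1", "b")))

    // Monix 2.x: runAsync starts the Task and returns a Future of the result.
    val future: Future[Items] = collect(source).runAsync

    // Blocking is for demonstration only; in a route you would hand the
    // Future to the framework rather than wait on it.
    println(Await.result(future, 5.seconds))
  }
}
```

With this shape, getItems would return a Task[Items] or Future[Items] rather than Items, so the caller receives the list only once the query has actually finished instead of reading a variable that has not been filled yet.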