I am specifically using twitter's AsyncStream and I need to take the results of concurrent processing, and make it into a Seq, but the only way I've managed to get working is horrendous. This feels like it should be a one liner, but all my attempts with Await and force have either hung or not processed the work.
Here's what I have working - what's a more idiomatic way of doing this?
def processWork(work: AsyncStream[Work]): Seq[Result] = {
// TODO: make less stupid
val resultStream = work.flatMap { processWork }
var results : Seq[Result] = Nil
resultStream.foreach {
result => {
results = results :+ result
}
}
results
}
Like @MattFowler pointed out - you can force the stream and wait until it's complete using:
toSeq
will start realizing the stream and return aFuture[Seq[A]]
that completes once all the element are resolved, as documentated here.You can then block until the future completes by using
Await.result
.Make sure your stream is finite! Calling
Await.result(resultStream.toSeq, 1.second)
would hang forever.