Asynchronous Iterable over remote data


I pull data from a remote API through a Future-style interface. The data is structured as a linked list. A representative data container is shown below.

case class Data(information: Int) {
    def hasNext: Boolean = ??? // Implemented
    def next: Future[Data] = ??? // Implemented
}

Now I'd like to add functionality such as map, foreach, and reduce to the data class. To do so I want to implement some form of IterableLike so that it inherits these methods. Below is the trait that Data could extend to get this behaviour.

trait AsyncIterable[+T]
    extends IterableLike[Future[T], AsyncIterable[T]]
{
    def hasNext : Boolean
    def next : Future[T]

    // How to implement?
    override def iterator: Iterator[Future[T]] = ???
    override protected[this] def newBuilder: mutable.Builder[Future[T], AsyncIterable[T]] = ???
    override def seq: TraversableOnce[Future[T]] = ???
}

It should be a non-blocking implementation which, when acted on, starts requesting the next data from the remote data source. It would then be possible to do cool stuff such as:

case class Data(information: Int) extends AsyncIterable[Data]
val data = Data(1) // And more, of course
// Asynchronously print all the information.
data.foreach(data => println(data.information))

The interface may also be different, as long as the result represents asynchronous iteration over the collection in some way, preferably one that is familiar to developers, since it will be part of an (open source) library.


There are 2 answers

Calavoow (BEST ANSWER)

Using Twitter's Spool, I've implemented a working example. The spool method below is adapted from the example in the Spool documentation.

import com.twitter.concurrent.Spool
import com.twitter.util.{Await, Return, Promise}

import scala.concurrent.{ExecutionContext, Future}

trait AsyncIterable[+T <: AsyncIterable[T]] { self : T =>
    def hasNext : Boolean
    def next : Future[T]

    def spool(implicit ec: ExecutionContext) : Spool[T] = {
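        // Append each fetched page to the spool, then request the page after it.
        // hasNext and next must be read from the fetched page (cPage), not from self.
        def fill(currentPage: Future[T], rest: Promise[Spool[T]]): Unit = {
            currentPage foreach { cPage =>
                if(cPage.hasNext) {
                    val nextSpool = new Promise[Spool[T]]
                    rest() = Return(cPage *:: nextSpool)
                    fill(cPage.next, nextSpool)
                } else {
                    val emptySpool = new Promise[Spool[T]]
                    emptySpool() = Return(Spool.empty[T])
                    rest() = Return(cPage *:: emptySpool)
                }
            }
        }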
        val rest = new Promise[Spool[T]]
        if(hasNext) {
            fill(next, rest)
        } else {
            rest() = Return(Spool.empty[T])
        }
        self *:: rest
    }
}

Data is the same as before, and now we can use it.

// Cool stuff
implicit val ec = scala.concurrent.ExecutionContext.global
val data = Data(1) // And others
// Print all the information asynchronously
val fut = data.spool.foreach(data => println(data.information))
Await.ready(fut)

As written, this will throw an exception on the second element, because the implementation of next was not provided.
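To see the traversal run past the second element without a remote API, the same idea can be sketched with scala.concurrent alone. This is a minimal sketch, not the Spool implementation above: InMemoryData and AsyncTraversal are hypothetical names, and next completes immediately instead of calling a remote source.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for Data: a chain of numbers up to `last`,
// with the same hasNext / next shape as in the question.
final case class InMemoryData(information: Int, last: Int) {
  def hasNext: Boolean = information < last
  def next: Future[InMemoryData] =
    if (hasNext) Future.successful(InMemoryData(information + 1, last))
    else Future.failed(new NoSuchElementException("end of chain"))
}

object AsyncTraversal {
  // Apply f to each element in order. The next element is requested
  // only after the current one has been handled; nothing blocks.
  def foreachAsync(start: InMemoryData)(f: InMemoryData => Unit)
                  (implicit ec: ExecutionContext): Future[Unit] = {
    f(start)
    if (start.hasNext) start.next.flatMap(d => foreachAsync(d)(f))
    else Future.successful(())
  }
}

object AsyncTraversalExample extends App {
  import ExecutionContext.Implicits.global
  val seen = ListBuffer.empty[Int]
  val done = AsyncTraversal.foreachAsync(InMemoryData(1, 3))(d => seen += d.information)
  Await.result(done, 1.second) // blocking only to keep the demo alive
  println(seen.toList)         // List(1, 2, 3)
}
```

Unlike a Spool, this sketch offers only foreach; map, filter and friends would each need a similar recursive definition.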

Oleg Rudenko

In production I would use one of the following:

  1. Akka Streams
  2. Reactive Extensions

For private tests I would implement something similar to the following (explanations are below).

I have modified your Data a little:

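import scala.collection.IterableLike
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global // used by Future { ... } and Future.foreach below

abstract class AsyncIterator[T] extends Iterator[Future[T]] {
  def hasNext: Boolean
  def next(): Future[T]
}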

For this iterator we can implement the following Iterable:

class AsyncIterable[T](sourceIterator: AsyncIterator[T])
  extends IterableLike[Future[T], AsyncIterable[T]]
{
  private def stream(): Stream[Future[T]] =
    if(sourceIterator.hasNext) {sourceIterator.next #:: stream()} else {Stream.empty}
  val asStream = stream()

  override def iterator = asStream.iterator
  override def seq = asStream.seq
  override protected[this] def newBuilder = throw new UnsupportedOperationException()
}

You can see it in action using the following code:

object Example extends App {
  val source = "Hello World!";

  val iterator1 = new DelayedIterator[Char](100L, source.toCharArray)
  new AsyncIterable(iterator1).foreach(_.foreach(print)) //prints 1 char per 100 ms
  pause(2000L)

  val iterator2 = new DelayedIterator[String](100L, source.toCharArray.map(_.toString))
  new AsyncIterable(iterator2).reduceLeft((fl: Future[String], fr) =>
    for(l <- fl; r <- fr) yield {println(s"$l+$r"); l + r}) //prints 1 line per 100 ms
  pause(2000L)

  def pause(duration: Long) = {println("->"); Thread.sleep(duration); println("\n<-")}
}

class DelayedIterator[T](delay: Long, data: Seq[T]) extends AsyncIterator[T] {
  private val dataIterator = data.iterator
  private var nextTime = System.currentTimeMillis() + delay
  override def hasNext = dataIterator.hasNext
  override def next = {
    val thisTime = math.max(System.currentTimeMillis(), nextTime)
    val thisValue = dataIterator.next()
    nextTime = thisTime + delay
    Future {
      val now = System.currentTimeMillis()
      if(thisTime > now) Thread.sleep(thisTime - now) //Your implementation will be better
      thisValue
    }
  }
}

Explanation

AsyncIterable uses Stream because it's calculated lazily and it's simple.

Pros:

  • simplicity
  • multiple calls to the iterator and seq methods return the same sequence with all items.

Cons:

  • could exhaust memory, because the stream keeps all previously obtained values.
  • the first value is fetched eagerly when the AsyncIterable is created.
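Both of these properties come from Stream itself and can be observed in isolation. The sketch below assumes the Scala 2.12-era Stream used in this answer (in 2.13 it is deprecated in favour of LazyList, whose head is lazy as well); CountingSource is a hypothetical helper that counts how often an element is requested.

```scala
// Counts how often the (hypothetical) source is asked for an element.
final class CountingSource {
  var calls = 0
  def stream(): Stream[Int] = {
    calls += 1          // stands in for one remote request
    calls #:: stream()  // the head is strict, the tail is evaluated on demand
  }
}

object StreamEagerness extends App {
  val source = new CountingSource
  val s = source.stream()
  println(source.calls)      // 1: the head was computed on creation
  println(s.take(3).toList)  // List(1, 2, 3)
  println(source.calls)      // 3
  println(s.take(3).toList)  // List(1, 2, 3), served from the memoized cells
  println(source.calls)      // still 3: nothing was requested again
}
```

Holding `s` in a val keeps every evaluated cell reachable, which is why a long-lived AsyncIterable retains all values it has seen.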

DelayedIterator is a very simplistic implementation of AsyncIterator; don't blame me for the quick and dirty code here.

It still seems strange to me to have a synchronous hasNext alongside an asynchronous next().
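One way to remove that mismatch, sketched here as an assumption rather than as part of either answer, is to fold hasNext into the asynchronous call: next() returns a Future[Option[...]], and None marks the end of the list. Node and AsyncFold are hypothetical names, and the in-memory nodes stand in for remote pages.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical node type: whether a successor exists is only known
// once the asynchronous request completes, so there is no hasNext.
final class Node(val information: Int, fetchNext: () => Future[Option[Node]]) {
  def next(): Future[Option[Node]] = fetchNext()
}

object AsyncFold {
  // Fold the chain left to right. Each step requests the next node
  // only after the current one has been combined into the accumulator.
  def foldLeft[B](start: Node, zero: B)(f: (B, Node) => B)
                 (implicit ec: ExecutionContext): Future[B] = {
    val acc = f(zero, start)
    start.next().flatMap {
      case Some(n) => foldLeft(n, acc)(f)
      case None    => Future.successful(acc)
    }
  }
}

object AsyncFoldExample extends App {
  import ExecutionContext.Implicits.global
  // Three in-memory nodes standing in for remote pages.
  val n3 = new Node(3, () => Future.successful(None))
  val n2 = new Node(2, () => Future.successful(Some(n3)))
  val n1 = new Node(1, () => Future.successful(Some(n2)))
  val sum = AsyncFold.foldLeft(n1, 0)((acc, n) => acc + n.information)
  println(Await.result(sum, 1.second)) // 6
}
```

Operations such as foreach or reduce can be expressed through this fold, at the cost of not plugging into the standard collection hierarchy.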