How to use Scala ARM with Futures?


I want to implement the ARM (automatic resource management) pattern for a resource that is used asynchronously.

The problem

Suppose my resource looks like:

class MyResource { 
  def foo() : Future[MyResource] = ???
  // Other methods returning various futures
  def close() : Unit = ???
}
object MyResource { 
  def open(name: String): Future[MyResource] = ???
} 

The desired usage pattern is:

val r : Future[MyResource] = MyResource.open("name")
r flatMap (r => {
  r.foo() /* map ... */ andThen {
    case _ => r.close()
  }
})

The elided mapped functions can be complex, involving branching and chaining futures that make repeated calls to methods of r that return Futures.

I want to make sure r.close() is called after all future continuations have completed (or failed). Doing this manually at every call site is error-prone. This calls for an ARM solution.

Attempted solutions

The scala-arm library is normally synchronous. This code wouldn't do the right thing, because close() would be called before the futures inside the block had completed:

for (r <- managed(MyResource.open("name"))) {
  r map (_.foo()) // map ...
}

I thought of using this wrapper:

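import scala.concurrent.{ExecutionContext, Future}

def usingAsync[T](opener: => Future[MyResource])(body: MyResource => Future[T])(implicit ec: ExecutionContext): Future[T] =
  opener flatMap { myr =>
    body(myr) andThen { case _ => myr.close() } // close once the body's future completes or fails
  }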

Then the call site would look like:

usingAsync(MyResource.open("name")) ( myr => {
  myr.foo() // map ...
})
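
To check the ordering this wrapper is meant to guarantee, here is a minimal, self-contained sketch. DemoResource, the same-thread ExecutionContext, and the Promise standing in for slow work are assumptions made for illustration and are not part of the code above. The try/catch guard around body is also an addition, so that a body that throws before returning a Future still gets its resource closed.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal

object UsingAsyncDemo {
  // Assumption: callbacks run on the calling thread so the demo is deterministic.
  private val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  // Hypothetical stand-in for MyResource that records whether close() ran.
  final class DemoResource {
    val closed = new AtomicBoolean(false)
    def close(): Unit = closed.set(true)
  }

  def usingAsync[T](opener: => Future[DemoResource])(body: DemoResource => Future[T])(
      implicit ec: ExecutionContext): Future[T] =
    opener.flatMap { res =>
      // Added guard: turn a synchronous throw from body into a failed Future,
      // so the andThen below still closes the resource.
      val result = try body(res) catch { case NonFatal(e) => Future.failed[T](e) }
      result.andThen { case _ => res.close() }
    }

  // Returns (closed while work is pending, closed after work finishes, final value).
  def run(): (Boolean, Boolean, Option[Int]) = {
    implicit val ec: ExecutionContext = sameThread
    val res  = new DemoResource
    val work = Promise[Int]() // completed by hand to simulate slow async work
    val out  = usingAsync(Future.successful(res))(_ => work.future.map(_ + 1))
    val closedWhilePending = res.closed.get
    work.success(41)
    (closedWhilePending, res.closed.get, out.value.flatMap(_.toOption))
  }
}
```

In this sketch the resource stays open while the body's future is pending and is closed only once that future completes, which is the behaviour the wrapper relies on andThen to provide.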

But then the code inside the block would be responsible for returning a Future that completes only when every other future created by that block has completed. If it accidentally didn't, the resource would again be closed before all the futures using it were complete, and there would be no static verification to catch the mistake. For example, this compiles but is a bug at run time, because the future returned by myr.foo() is discarded:

usingAsync(MyResource.open("name")) ( myr => {
  myr.foo() // Do one thing
  myr.bar() // Do another
})
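
One possible way to write such a body correctly is to keep a handle on every future it starts and return a combination of all of them. The sketch below is only an illustration under stated assumptions: DemoResource, its Promise-backed foo() and bar(), the same-thread ExecutionContext, and the allSettled helper are all hypothetical names introduced here. It does not add static verification; it only shows what a well-behaved body could look like.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

object CombineFuturesDemo {
  // Assumption: callbacks run on the calling thread so the demo is deterministic.
  private val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  // Hypothetical resource whose futures are completed by hand.
  final class DemoResource {
    val fooDone = Promise[String]()
    val barDone = Promise[String]()
    @volatile var closed = false
    def foo(): Future[String] = fooDone.future
    def bar(): Future[String] = barDone.future
    def close(): Unit = { closed = true }
  }

  def usingAsync[T](opener: => Future[DemoResource])(body: DemoResource => Future[T])(
      implicit ec: ExecutionContext): Future[T] =
    opener.flatMap(res => body(res).andThen { case _ => res.close() })

  // Completes once every given future has finished, whether it succeeded or failed.
  def allSettled(fs: Future[Any]*)(implicit ec: ExecutionContext): Future[Unit] =
    Future.sequence(fs.map(f => f.map(_ => ()).recover { case _ => () })).map(_ => ())

  // Returns (closed after only foo finished, closed after both finished).
  def run(): (Boolean, Boolean) = {
    implicit val ec: ExecutionContext = sameThread
    val res = new DemoResource
    usingAsync(Future.successful(res)) { myr =>
      val one = myr.foo() // Do one thing
      val two = myr.bar() // Do another
      // Wait for both to settle, then expose their combined result.
      allSettled(one, two).flatMap(_ => one.zip(two))
    }
    res.fooDone.success("foo")
    val closedAfterFooOnly = res.closed
    res.barDone.success("bar")
    (closedAfterFooOnly, res.closed)
  }
}
```

The allSettled barrier is used instead of zip alone because zip fails as soon as its first future fails, which could close the resource while the other call is still running.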

How to solve this?

Apparently, I could use scala-arm's delimited continuation support (CPS). It looks a bit complex and I'm afraid of getting it wrong, and it requires enabling a compiler plugin. Also, my team is very new to Scala and I don't want to require them to use CPS.

Is CPS the only way forward? Is there a library or design pattern that does this more simply with Futures, or an example of doing this with scala-arm?


There is 1 answer

Answered by reggoodwin

Reactive Extensions (Rx) could be an alternative solution. There is increasing momentum around this programming paradigm, which is now available in many languages, including Scala.

The basis of Rx is to create an Observable, which is a source of asynchronous events. Observables can be chained in sophisticated ways; this is their power. You subscribe to an Observable to listen for onNext, onError and onCompleted events. You also receive a Subscription back that allows you to cancel.

I think you would probably add a resource.close() call in an onCompleted and/or onError handler.

See RxScala docs for:

Observable.subscribe(
    onNext: (T) ⇒ Unit, 
    onError: (Throwable) ⇒ Unit, 
    onCompleted: () ⇒ Unit): Subscription
