SCAla : Futures /Promises and more

 

I have been a .NET developer for a long time now, and am very very used to dealing with the .NET framework Task library. Obviously here I mean TPL and now Async/Await.

So now that I am doing more and more Scala I wanted to see what the equivalent code would be in Scala, as I do like my Task(s) in .NET.

Lets say I had this .NET code, which is not blocking thanks to the use of callbacks

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            var task = Task.Run(() =>
            {
                return 40;
            });


            task.ContinueWith(ant =>
            {
                Console.WriteLine(ant.Result);
            }, TaskContinuationOptions.OnlyOnRanToCompletion);

            task.ContinueWith(ant =>
            {
                Console.WriteLine("BAD NEWS");
            }, TaskContinuationOptions.OnlyOnFaulted);


            Console.ReadLine();
        }
    }
}

Roughly speaking we could break this down into the following equivalents in Scala:

  • A Task in .NET is roughly equivalent to a Scala Future
  • task.ContinueWith callbacks in .NET are Future callbacks in Scala

 We could take this comparison a bit further. So lets change the .NET code to this code, which is now blocking since we no longer use any callbacks, and instead use the Task.Result property, which causes the Task to be “Observed”.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            var task = Task.Run(() =>
            {
                return 40;
            });


            var x = task.Result;


            Console.ReadLine();
        }
    }
}

In Scala this would be done by the use of the scala.concurrent.Await.ready / scala.concurrent.Await.result which we will see more of later. We will just spend a bit of time looking at some of the plumbing of how to create and work with Futures (Scalas Task equivalent).

 

Futures

A Future is an object holding a value which may become available at some point. This value is usually the result of some other computation:

  • If the computation has not yet completed, we say that the Future is not completed.
  • If the computation has completed with a value or with an exception, we say that the Future is completed.

Completion can take one of two forms:

  • When a Future is completed with a value, we say that the future was successfully completed with that value.
  • When a Future is completed with an exception thrown by the computation, we say that the Future was failed with that exception.

A Future has an important property that it may only be assigned once. Once a Future object is given a value or an exception, it becomes in effect immutable– it can never be overwritten.

The simplest way to create a future object is to invoke the future method which starts an asynchronous computation and returns a future holding the result of that computation. The result becomes available once the future completes.

Note that Future[T] is a type which denotes future objects, whereas future is a method which creates and schedules an asynchronous computation, and then returns a future object which will be completed with the result of that computation.

http://docs.scala-lang.org/overviews/core/futures.html up on date 10/11/15

Let’s see an example. This trivial example creates a Future[Int].

import scala.concurrent._
import ExecutionContext.Implicits.global

object ClassesDemo {

  def main(args: Array[String]) =
  {
    //Creating a Future
    val intFuture: Future[Int] = Future { 23 }
  }
}


You may be wondering how the Future.apply() method is able to come up with a computation that may be completed at some point in the future.

Well the answer to that lies in the use of Promises, which we will look at later.

 

Callbacks

So carrying on from the .NET example that I showed in the introduction paragraph, where I showed how to use Task.ContinueWith(..), which runs a continuation.

Well in Scala we can do the same thing, but it is simpy called a “callback”. Like the .NET continuation Scala callback are NON blocking.

Callback(s) are easy to use, here is an example:


import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object ClassesDemo {

  def main(args: Array[String]) =
  {

    val intFuture: Future[Int] = Future { 23 }

    //use a "callback" which is non blocking
    intFuture onComplete {
      case Success(t) =>
      {
        println(t)
      }
      case Failure(e) =>
      {
        println(s"An error has occured: $e.getMessage")
      }
    }
  }
}


 

Awaiting Futures

We are also able to Await futures. We can do this using 2 methods of the scala.concurrent.Await class which are discussed below. One important note is that the 2 methods shown below ARE blocking, so should be used with caution

Await.ready 

//Await the "completed" state of an Awaitable.
def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type

Await.result

//Await and return the result (of type T) of an Awaitable.
def result[T](awaitable: Awaitable[T], atMost: Duration): T

Let’s see an example of both of these:

import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object ClassesDemo {

  def main(args: Array[String]) =
  {
    //Await.ready
    lazy val intFuture: Future[Int] = Future { 23 }
    val result: Try[Int] = Await.ready(intFuture, 10 seconds).value.get
    val resultEither = result match {
      case Success(t) => Right(t)
      case Failure(e) => Left(e)
    }
    resultEither match {
      case Right(t) => println(t)
      case Left(e) => println(e)
    }

    //Await.result
    lazy val stringFuture = Future { "hello" }
    val theString :String = Await.result(stringFuture, 1 second)
    println(theString)
  }
}

Which when run will give the following output

image

Here are some other links that are good for some background reading on this 

 

Functional Composition

The callback mechanism we have shown is sufficient to chain future results with subsequent computations. However, it is sometimes inconvenient and results in bulky code. Luckily the scala Future[T] class is quite powerful, and comes with a number of combinators to help you write cleaner more succint code.

If only .Net Task has some of these methods (Oh hang on RX (reactive extensions does)) we would be laughing.

Anyway for now just be aware that Future[T] does come equipped with some nice combinators that you may use. I will go through a few of them here, but you should do some more research yourself

Map Example

In this example we use the Future[T].map to transform the result from one Future[T] into a new type of T say TR



import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object ClassesDemo {


  def main(args: Array[String]) =
  {

    val rateQuoteFuture : Future[Double] = Future {
      1.5
    }

    val formattedRateFuture = rateQuoteFuture map { quote =>
      println(quote)
      s"Rate was : $quote"
    }
    formattedRateFuture onComplete  {
      case Success(formatted) => println(formatted)
      case Failure(x) => {
        println(x)
      }
    }


    System.in.read()
  }
}


For

We can also use For with Future[T] (here is one that I shameless stole from the Scala docs)

val usdQuote = Future { connection.getCurrentValue(USD) }
val chfQuote = Future { connection.getCurrentValue(CHF) }
val purchase = for {
  usd <- usdQuote
  chf <- chfQuote
  if isProfitable(usd, chf)
} yield connection.buy(amount, chf)
purchase onSuccess {
  case _ => println("Purchased " + amount + " CHF")
}

WithFilter

Or how about providing a filter. This can be done using the WithFilter method

val purchase = usdQuote flatMap {
  usd =>
  chfQuote
    .withFilter(chf => isProfitable(usd, chf))
    .map(chf => connection.buy(amount, chf))
}

 

Promises

So far we have only considered Future objects created by asynchronous computations started using the future method. However, futures can also be created using promises.

While futures are defined as a type of read-only placeholder object created for a result which doesn’t yet exist, a promise can be thought of as a writable, single-assignment container, which completes a future. That is, a promise can be used to successfully complete a future with a value (by “completing” the promise) using the success method. Conversely, a promise can also be used to complete a future with an exception, by failing the promise, using the failure method.

http://docs.scala-lang.org/overviews/core/futures.html up on date 10/11/15

The way I like to think about Promises (coming from .NET as I have) is that they are pretty much the same as a TaskCompletionSource.

To understand the association between a Promise and a Future lets look at the signature for the Future.apply() method, which looks like this:

 def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body)

Which if we examine a bit further we can see has this implementation code, where we are actually using the Promise to complete / Fail the Future computation

private[concurrent] object Future {
  class PromiseCompletingRunnable[T](body: => T) extends Runnable {
    val promise = new Promise.DefaultPromise[T]()

    override def run() = {
      promise complete {
        try Success(body) catch { case NonFatal(e) => Failure(e) }
      }
    }
  }

  def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
    val runnable = new PromiseCompletingRunnable(body)
    executor.prepare.execute(runnable)
    runnable.promise.future
  }
}

 

Scala Async Library

Much of the stuff I talk about in this section is covered in a great post:

http://engineering.roundupapp.co/the-future-is-not-good-enough-coding-with-async-await/

Here is a small example of using several Future(s) together 

This has a few issues namely

There is a new nesting for each new Future to use
It doesn’t handle the unhappy path (failures)
Its pretty sequential



import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object ClassesDemo {


  def main(args: Array[String]) =
  {

    val future1 : Future[Double] = Future { 1 }
    val future2 : Future[Double] = Future { 2 }
    val future3 : Future[Double] = Future { 3 }


    import scala.concurrent.ExecutionContext.Implicits.global

    val (f1,f2,f3) = (future1, future2, future3)
    f1 onSuccess { case r1 =>
      f2 onSuccess { case r2 =>
        f3 onSuccess { case r3 =>
          println(s"Sum:  ${r1 + r2 + r3}")
        }
      }
    }


    System.in.read()
  }
}


This has a few issues namely

  • There is a new nesting for each new Future to use
  • It doesn’t handle the unhappy path (failures)
  • Its pretty sequential

We c an fix some of this by using a for comprehension



import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object ClassesDemo {


  def main(args: Array[String]) =
  {

    val future1 : Future[Double] = Future { 1 }
    val future2 : Future[Double] = Future { 2 }
    val future3 : Future[Double] = Future { 3 }


    import scala.concurrent.ExecutionContext.Implicits.global

    val (f1,f2,f3) = (future1, future2, future3)
    val f = for {
      r1 <- f1
      r2 <- f2
      r3 <- f3
    } yield r1 + r2 + r3
    f onComplete {
      case Success(s) => println(s"Sum: $s")
      case Failure(e) => // Handle failure
    }


    System.in.read()
  }
}


This fixes point 1, and 2, but it still executes sequentially. We could take this further and do this:



import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object ClassesDemo {


  def main(args: Array[String]) =
  {

    val future1 : Future[Double] = Future { 1 }
    val future2 : Future[Double] = Future { 2 }
    val future3 : Future[Double] = Future { 3 }


    import scala.concurrent.ExecutionContext.Implicits.global

    val f = Future.sequence(Seq(future1,future2,future3))
    f onComplete {
      case Success(r) => println(s"Sum: ${r.sum}")
      case Failure(e) => // Handle failure
    }


    System.in.read()
  }
}


 

But there is a better way, that I am happy to say borrows from .NET async/await (which in turn borrowed from F# but hey ho). We can rewrite the above code using the Scala Async library like this.

The Scala async library can be found here :

https://github.com/scala/async up on date 10/11/15



import scala.concurrent.{Future}

import scala.async.Async._ //'async/await' macros blocks and implicits

object ClassesDemo {


  def main(args: Array[String]) =
  {
    val future1 : Future[Double] = Future { 1 }
    val future2 : Future[Double] = Future { 2 }
    val future3 : Future[Double] = Future { 3 }

    //use Scala Async Library here, note the Async-Await
    async {
      val s = await {future1} + await {future2} + await {future3}
      println(s"Sum:  $s")
    } onFailure { case e => /* Handle failure */ }


    System.in.read()
  }
}


async marks a block of asynchronous code. Such a block usually contains one or more await calls, which marks a point at which the computation will be suspended until the awaited Future is complete.

By default, async blocks operate on scala.concurrent.{Future, Promise}. The system can be adapted to alternative implementations of the Future pattern.

https://github.com/scala/async up on date 10/11/15

This for me as a .NET guy making his way into the Scala world makes a lot of sense

 

 

Further Reading

The Scala docs are actually very good for Futures/Promises. You can read more about this here :

http://docs.scala-lang.org/overviews/core/futures.html

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: