F# 28 : Integrating With Task Parallel Library

Last time we looked at the Async class and examined some of its core functions. This time we will look at two Task Parallel Library (TPL) classes, namely Task<T> and Task, and at how the Async class can be used in conjunction with TPL.

TPL Primer

I do not have enough time in this post to go through all the nitty gritty details of TPL, but I will mention a few key points:

  • TPL uses a Task<T> to represent an asynchronous operation that will return a value of type T (yes, a generic, so anything your heart desires)
  • TPL uses a Task to represent an asynchronous operation that doesn’t return a value. Unit in F# lingo
  • In TPL there are several triggers that cause a Task<T> to be observed, such as Wait / WaitAll / Result. These are, however, blocking operations that suspend the calling thread
  • TPL may also use CancellationTokens to cancel async operations (albeit you need a bit more code in C# than you do in F#, because in C# you must constantly check the CancellationToken, which we saw in the previous post)
  • Both Task<T> and Task can be waited on
  • Both Task<T> and Task can run things known as continuations, which are essentially callbacks that run when the Task<T> / Task is done. You may schedule a callback for when a Task ran to completion, or is faulted, or both, or none
  • Task<T> and Task form the basis of the new async/await syntax in C#
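To make the CancellationToken point a little more concrete, here is a minimal sketch of the "constantly check the token" style that plain TPL code needs. The countUntilCancelled function, the 1000 limit and the 100 ms timeout are made up for illustration; the token is passed only to the worker function, so the task always runs to completion and simply reports how far it got.

```fsharp
open System.Threading
open System.Threading.Tasks

// Hypothetical worker: counts iterations until it reaches `limit`
// or notices that cancellation has been requested.
let countUntilCancelled (limit : int) (token : CancellationToken) =
    let mutable count = 0
    while count < limit && not token.IsCancellationRequested do
        count <- count + 1
        Thread.Sleep(10)
    count

let cts = new CancellationTokenSource()

// The token goes to the worker only, so the task itself never ends up
// in the Canceled state; it just returns early.
let countingTask =
    Task.Factory.StartNew<int>(fun () -> countUntilCancelled 1000 cts.Token)

cts.CancelAfter(100)                 // request cancellation after roughly 100 ms
let reached = countingTask.Result    // blocks until the worker notices the request
printfn "Stopped after %d of 1000 iterations" reached
```

Inside an F# async workflow you would not normally write the check yourself, since the workflow checks its token for you at each bind point.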

Starting And Waiting For Task<T>

In this simple example we will show how to create a simple Task<T> that returns a boolean. We will then use the blocking Task<T>.Result property to obtain the result of the Task<T>, which will be a boolean in this case.

open System
open System.Threading
open System.Threading.Tasks

[<EntryPoint>]
let main argv =

    let work() =
        for i in 0 .. 2 do
            printfn "Work loop is currently %O" i |> ignore
            Thread.Sleep(1000)
        false
    
    printfn "Starting task that returns a value" |> ignore
    let task = Task.Factory.StartNew<bool>((fun () -> work()),  TaskCreationOptions.LongRunning)
    let result = task.Result
    printfn "Task result is %O" result
   
    Console.ReadLine() |> ignore
    
    //return 0 for main method
    0       

When run, this prints the three "Work loop is currently" lines about a second apart, followed by "Task result is False".

We could also do this another way, which would yield the same results: use a continuation from the original Task<T> that is run when the original task runs to completion. Think of continuations as callbacks. Here is the code rewritten to use a continuation. Remember that you can have a single callback for the whole original task, or hook up specific ones for particular scenarios, which is what I have done here.

 

open System
open System.Threading
open System.Threading.Tasks

[<EntryPoint>]
let main argv =

    let work() =
        for i in 0 .. 2 do
            printfn "Work loop is currently %O" i |> ignore
            Thread.Sleep(1000)
        false
    
    printfn "Starting task that returns a value" |> ignore
    let task = Task.Factory.StartNew<bool>((fun () -> work()),  TaskCreationOptions.LongRunning)
    task.ContinueWith((fun (antecedent : Task<bool>) -> printfn "Task result is %O" antecedent.Result),
        TaskContinuationOptions.OnlyOnRanToCompletion) |> ignore
   
    Console.ReadLine() |> ignore
    
    //return 0 for main method
    0       
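The code above only hooks up the "ran to completion" case. As a rough sketch of the other scenario mentioned, here is a continuation that runs only when the original task faults. The failing task and its "boom" message are invented for illustration.

```fsharp
open System.Threading.Tasks

// A task that always fails; the "boom" message is purely illustrative.
let failing = Task.Factory.StartNew<int>(fun () -> failwith "boom")

// Runs only if `failing` faults. Reading t.Exception here also marks
// the fault as observed.
let onFault =
    failing.ContinueWith<string>(
        (fun (t : Task<int>) -> t.Exception.InnerException.Message),
        TaskContinuationOptions.OnlyOnFaulted)

let message = onFault.Result
printfn "Continuation saw: %s" message
```

Note that a continuation whose condition is not met (say an OnlyOnRanToCompletion one attached to this failing task) ends up cancelled, so be careful about blocking on its Result.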

Starting And Waiting For Task<T> In A More F# Like Way

The Async class offers a couple of helpers for dealing with tasks. You may use:

  • Async.StartAsTask
  • Async.AwaitTask

Here is some code that shows how you can use these:

open System
open System.Threading
open System.Threading.Tasks

[<EntryPoint>]
let main argv =

    let work = async {
        for i in 0 .. 2 do
            printfn "Work loop is currently %O" i |> ignore
            do! Async.Sleep(1000)
        return "task is completed " + DateTime.Now.ToLongTimeString()
        }
    
    printfn "Starting task that returns a value" |> ignore
   
    let asynWorkflow = async {
        //NOTE : Async.StartAsTask doesn't like TaskCreationOptions.LongRunning
        let task = Async.StartAsTask((work))
        let! result = Async.AwaitTask(task)
        return result
    }

    let finalResult = Async.RunSynchronously asynWorkflow
    printfn "Task result is : %O" finalResult
   
    Console.ReadLine() |> ignore
    
    //return 0 for main method
    0       

 

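Running the above code prints the three "Work loop is currently" lines about a second apart, followed by "Task result is : task is completed" and the completion time.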
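The two helpers also work nicely in the other direction, when the Task<T> comes from existing TPL code rather than from a workflow. Here is a small sketch of that, where squareAsTask is a made-up stand-in for some API that hands you a Task<int>.

```fsharp
open System.Threading.Tasks

// Hypothetical TPL-style API that returns a Task<int>.
let squareAsTask (n : int) : Task<int> =
    Task.Factory.StartNew<int>(fun () -> n * n)

// Consume the tasks inside a workflow without blocking a thread.
let sumOfSquares = async {
    let! a = squareAsTask 3 |> Async.AwaitTask
    let! b = squareAsTask 4 |> Async.AwaitTask
    return a + b
}

// Hand the workflow back to TPL-style callers as a Task<int>.
let sumTask : Task<int> = Async.StartAsTask sumOfSquares
let total = sumTask.Result
printfn "Sum of squares is %d" total
```

So Async.AwaitTask takes you from the TPL world into a workflow, and Async.StartAsTask takes you back out again.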

 

Starting And Waiting For Plain Task

Another thing you might find yourself wanting to do is use a plain TPL Task, that is, a Task that does not return a value. Basically you have Task<T>, which is a task that returns T, and Task (essentially Task void, or Task<unit> in F# lingo), which is a task that doesn’t return a value. Task may still be waited on in C# land, but there seems to be less you can do with a standard Task (one that doesn’t return a value) in F#.

There are, however, a few tricks you can use. The first one requires a bit of insight into multi threading: Task, and Task<T> for that matter, both implement IAsyncResult, which is something you can wait on inside an F# async workflow by using Async.AwaitIAsyncResult. Here is a small example of how you can wait on a plain Task. This example also demonstrates how you can extend the Async module to include your own user specified functions. That is pretty cool actually. C# allows extension methods (which F# also allows), but being able to just add arbitrary functions is very cool.

Anyway here is the code:

open System
open System.Threading
open System.Threading.Tasks

//This extends the Async module to add the
//AwaitVoidTask function, which will now appear
//in intellisense
module Async =
    let AwaitVoidTask : (Task -> Async<unit>) =
        Async.AwaitIAsyncResult >> Async.Ignore

[<EntryPoint>]
let main argv =

    let theWorkflow(delay :int) = async {
        printfn "Starting workflow at %O" (DateTime.Now.ToLongTimeString())
        do! Task.Delay(delay) |> Async.AwaitVoidTask
        printfn "Ending workflow at %O" (DateTime.Now.ToLongTimeString())
    }
    
    Async.RunSynchronously (theWorkflow(2000))

    Console.ReadLine() |> ignore

    //return 0 for main method
    0       

When run, this prints the "Starting workflow at" line and then, roughly two seconds later, the "Ending workflow at" line.
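As an aside, if you are on F# 4.0 or later (an assumption about your setup, the code above does not need it), Async.AwaitTask itself has an overload that accepts a plain Task, so the extension may not be needed at all. A minimal sketch, with the 200 ms delay and the "done" value picked arbitrarily:

```fsharp
open System.Threading.Tasks

// Assumes FSharp.Core 4.0 or later, where Async.AwaitTask accepts a plain Task.
let delayed = async {
    do! Task.Delay(200) |> Async.AwaitTask
    return "done"
}

let outcome = Async.RunSynchronously delayed
printfn "Workflow returned %s" outcome
```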

Some other clever chap has a slightly different take on this, published at https://gist.github.com/theburningmonk/3921623. Here is his version, which I also think has many merits. For example, it is really nice that it will pattern match against a faulted Task and raise an exception.

open System
open System.Threading
open System.Threading.Tasks

//This extends the Async module to add the
//awaitPlainTask and startAsPlainTask functions,
//which will now appear in intellisense
module Async =
    let inline awaitPlainTask (task: Task) =
        // rethrow exception from preceding task if it faulted
        let continuation (t : Task) : unit =
            match t.IsFaulted with
            | true -> raise t.Exception
            | arg -> ()
        task.ContinueWith continuation |> Async.AwaitTask

    let inline startAsPlainTask (work : Async<unit>) =
        Task.Factory.StartNew(fun () -> work |> Async.RunSynchronously)

[<EntryPoint>]
let main argv =

    let sleepy = async {
        do! Async.Sleep(5000) // sleep for 5 seconds
        printfn "awake"
      }

    let sleepy2 = async {
        do! sleepy |> Async.startAsPlainTask |> Async.awaitPlainTask
        printfn "feeling sleepy again…"
      }

    //call the workflows
    sleepy |> Async.startAsPlainTask |> ignore
    sleepy2 |> Async.Start |> ignore

    Console.ReadLine() |> ignore

    //return 0 for main method
    0       

When run, this prints "awake" twice after about five seconds (once for each started copy of sleepy), followed by "feeling sleepy again…".

 

Starting And Waiting For Multiple Tasks

To wait for multiple Task<T> you can use TPL's Task.WhenAll(), which gives you an aggregated task whose Result is an array containing the results of the original tasks you passed to the Task.WhenAll() call, in the same order.

There may well be a way that you can bend the Async.Parallel() to do the same job, but to my mind using Task.WhenAll() is by far the easiest way.

Here is some code that demonstrates this

open System
open System.Threading
open System.Threading.Tasks

[<EntryPoint>]
let main argv =

    let work(msg : string) =
        for i in 0 .. 2 do
            printfn "%O : Work loop is currently %O\r\n" msg i |> ignore
            Thread.Sleep(1000)
        //report which task finished, rather than always "Task 1"
        msg + " done " + (DateTime.Now.ToLongTimeString())

    let taskRunner(msg) =
        printfn "Starting %O that returns a value %O\r\n" msg (DateTime.Now.ToLongTimeString()) |> ignore
        Task.Factory.StartNew<string>((fun () -> work(msg)),  TaskCreationOptions.LongRunning)
     
    
    let task1 = taskRunner("task1")
    Thread.Sleep(2000)
    let task2 = taskRunner("task2")
    Thread.Sleep(2000)
    let task3 = taskRunner("task3")

    let resultsOfAllTask = Task.WhenAll([task1;task2;task3])

    printfn "Task1 result: %O\r\nTask2 result: %O\r\nTask3 result: %O"
        resultsOfAllTask.Result.[0]
        resultsOfAllTask.Result.[1]
        resultsOfAllTask.Result.[2]
    
    Console.ReadLine() |> ignore
    
    //return 0 for main method
    0       

 

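When run, the three tasks start two seconds apart, their work-loop output interleaves, and once all of them have finished the three results are printed.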
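For comparison, here is a rough sketch of what the Async.Parallel route mentioned earlier might look like. It uses async workflows instead of Task<T>, so it is not a drop-in replacement for the code above. The work function and the 100 ms sleep are made up for illustration.

```fsharp
// Hypothetical workflow standing in for the real work.
let work (name : string) = async {
    do! Async.Sleep 100
    return name + " done"
}

// Async.Parallel combines the workflows into one that yields an array
// of results, in the same order as the inputs.
let results =
    [ "task1"; "task2"; "task3" ]
    |> List.map work
    |> Async.Parallel
    |> Async.RunSynchronously

results |> Array.iter (printfn "%s")
```

If you already have Task<T> instances in hand, Task.WhenAll() is still the more direct choice.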

 
