CodeProject, F#

F#26 : Reactive Programming

In this post we will look at using a reactive programming paradigm within F#. There may be some of you that have used the Reactive Extensions (Rx), I am in fact a massive fan boy of Rx, and really enjoy what it can bring to a project, in particular I think a UI project benefits immensely from a more reactive approach.

There may of course be those of you that have never come across Rx at all. So lets take a very small detour and talk about the general idea of the observer pattern.

Observer Pattern

Lets say I have order system that should print invoices and also send emails to a client when a new order is received. A naive implementation of this may be to just lump all this into a single class, but this is a poor separation of concerns, we could do better. So what we could do, is to have a order system object which receives orders and then calls into 2 other classes that produce a invoice and send an email to a client.

This is ok, but we would now have to take a strong dependency on the 2 sub systems from the order system class, and then call them when a new order arrives. Wouldn’t it be better if the order system class could just accept a list of subscribers (say implementing some IAcceptOrder interface), and then any time a  new order arrives the order system class would simply loop through its list of subscribers and call their AcceptOrder method (from the hypothetical IAcceptOrder interface).

This is essence is the observer pattern, of course this pattern could also allow unsubscribing too.

Rx has built upon the general idea of the observer pattern, and has added a great many standard .NET classes that aid in the construction of observable streams of data. You can also use subscriptions over events, asynchronous operations, and also use the standard LINQ operators.

So that is what Rx gives you.

F# also comes with a Control.Observable /Control.Event modules which contains many types that can be used to create reactive code The types and functions in this module mirror some of the functionality found in the Rx classes (see http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable(v=vs.103).aspx).

This article is however about F#, so we will be keeping the discussion from here on out about the F# Observable modules.

 

Observable Module

The F# Observable module is the place to start for reactive programming in F#, and contains the following functions/types

image

We will be seeing some examples of these is just a minute, but before we do it is worth just going on another slight detour. So lets just get the slight detour over with before we start to look at the Observable module in depth.

<SlightDetour>

There were several things that made Rx so powerful :

  • The ability to treat events as a source to create an IObservable<T>. This IObservable<T> could then have any of the standard LINQ operators applied to it, along with another specific set of IObservable<T> extension methods. This is a very powerful technique, which allows you to do things like only listen to an event where some condition is true, project the original event args into a completely new type, merge 2 or more events into a single stream (this technique is particularly powerful when building UIs, I use this one a lot)
  • The decoupling of the event into a much for general purpose interface (IObservable<T>), which means that users of the original event source dino longer need a direct link to the original event source class (which they would do if they wanted to add an event handler to the source object event). Depending on a abstraction, i.e IObservable<T> is a much better / cleaner design. IObservable<T> is a base class type that is included in the .NET framework
  • The ability to subscribe, which would return a IDisposable, which you could then just wrap in a using(..) or Dispose of when you were done with it, both of which would stop the subscriber from receiving any further notifications

The F# team (Don Syme I guess here), have exposed some (though not all) of the Rx goodness in F# so it is important to understand some of the reasons why IObservable<T> is so useful.

</SlightDetour>

OK now that we have talked about why IObservable is better than a plain old event, lets have a look at some examples using the F# Observable module. Here is a small windows form (yes you can do forms in F# quite easily too) example that demonstrates the following things:

  • How to create an IObservable<’a> from a standard event using Observable.filter, where the filter is just being used to give back a IObservable<’a>, in essence no filtering at all
  • How to use Observable.add to add a handler to an IObservable<’a>
  • How to create an actual filter from an IObservable<’a> using Observable.filter, this filter ensures that only MouseMove events that happen in the bottom 1/2 of the form notify listeners
  • How to create an IDisposable subscription using Observable.subscribe
  • How to cancel a subscription by simply calling Dispose on the subscriber which is an IDisposable. In this example this is done using a Task.Delay(4000), which means the subscriber will only work for 4 seconds, and then will not receive any notifications after that

 

open System
open System.IO
open System
open System.Linq
open System.Collections.Generic
open ConsoleApplication1.CustomTypes
open System.IO
open System.Drawing
open System.Windows.Forms
open System.Threading.Tasks

[<EntryPoint>]
let main argv =

    //create a form
    let form = new Form(Text = "F# Windows Form",
                        Visible = true,
                        TopMost = true)

    let label1 = new Label()
    label1.Text <- "Label1"
    label1.Location <- new Point(10,10)

    let txt1 = new TextBox()
    txt1.Width <- 160
    txt1.Location <- new Point(120,10)

    let label2 = new Label()
    label2.Text <- "Label2"
    label2.Location <- new Point(10,40)

    let txt2 = new TextBox()
    txt2.Width <- 160
    txt2.Location <- new Point(120,40)

    form.Controls.Add(label1)
    form.Controls.Add(txt1)
    form.Controls.Add(label2)
    form.Controls.Add(txt2)

    //use Control.Observable reactive functions
    form.MouseMove
        |> Observable.filter ( fun evArgs -> true)
        |> Observable.add ( fun evArgs ->
            txt1.Text <- String.Format("x: {0} y :{1}", evArgs.X, evArgs.Y))

    //shows how to subscribe, such that we get a IDisposable back for subscription
    let sub =
        form.MouseMove
        |> Observable.filter ( fun evArgs -> evArgs.Y > form.Height / 2)
        |> Observable.subscribe  ( fun evArgs ->
            txt2.Text <- String.Format("x: {0} y :{1}", evArgs.X, evArgs.Y))

    //dispose of the subsriber after 4 seconds
    Task.Delay(4000).ContinueWith(fun x -> sub.Dispose()) |> ignore

    //run the windows form message pump
    Application.Run(form)

    //return 0 for main method
    0       

Which when runs looks like this:

image

With this one I urge you to try the code out for yourself, as you will not be able to see that the Label2 associated TextBox stops updating after 4 seconds in a screen shot

This is cool for sure, but the real power of IObservable is that you can use it against your own events/properties too, so lets wrap up this post by looking at an example where we create our own event source, and look at a few more of the Observable module functions.

Here is a small class that contains a single “NewOrderEvent”, which uses a custom EventArgs derived class called “OrderEventArgs”. The “OrderEventArgs” has the following 2 properties:

  1. Price : decimal
  2. AuthorisationLevel : Which uses a empty discriminating union type called “DiscountApprovalLevel”.

Here  is the relevant code:

namespace ConsoleApplication1
    module CustomTypes =

        open System
        open System.Collections.Generic
        open System.ComponentModel
        open System.Reflection

        type DiscountApprovalLevel = Standard | Manager | Ceo

        type Order = { Price : decimal; AuthorisationLevel : DiscountApprovalLevel }

        type OrderArgs(price : decimal, authorisationLevel : DiscountApprovalLevel) =
            inherit System.EventArgs()
    
            member this.Price = price
            member this.AuthorisationLevel = authorisationLevel

        type OrderChangeDelegate = delegate of obj * OrderArgs -> unit

        type OrderSystem() =
            let newOrderEvent = new Event<OrderChangeDelegate, OrderArgs>()
    
            member this.CreateOrder(order) =
                newOrderEvent.Trigger(
                    this,
                    new OrderArgs(order.Price, order.AuthorisationLevel)
                )
    
            [<CLIEvent>]
            member this.NewOrderEvent = newOrderEvent.Publish

So that is the code for the source of the custom event, so how about the Observable code, lets see that next:

open System
open System.IO
open System
open System.Linq
open System.Collections.Generic
open ConsoleApplication1.CustomTypes
open System.IO

[<EntryPoint>]
let main argv =

    // Use the Observable module to only subscribe to specific events
    let orderSystem = new OrderSystem()

    let stdDiscountObservable, managerApprovalObservable =
        orderSystem.NewOrderEvent
        // Filter event to just Standard Or Manager level authorisation orders
        |> Observable.filter(fun orderArgs ->
                match orderArgs.AuthorisationLevel with
                | Standard | Manager -> true
                | _ -> false)
        // Split the event into 'Standard' and Other discount approval level
        |> Observable.partition(fun orderArgs ->
                orderArgs.AuthorisationLevel  = DiscountApprovalLevel.Standard)

    // Add event handlers to the stdDiscountObservable IObservable<OrderEventArgs> stream
    stdDiscountObservable.Add(fun args -> printfn "Price : %A, Level : %A" args.Price args.AuthorisationLevel)

    // Add event handlers to the stdDiscountObservable IObservable<OrderEventArgs> stream
      managerApprovalObservable.Add(fun args ->printfn "Price : %A, Level : %A" args.Price args.AuthorisationLevel)

    orderSystem.CreateOrder( { Price = 120.0m; AuthorisationLevel = Manager } )

    //this will not fire any code, as we have filtered out to not include CEO EventArgs
    orderSystem.CreateOrder( { Price = 240.0m; AuthorisationLevel = Ceo } )

    orderSystem.CreateOrder( { Price = 10.0m; AuthorisationLevel = Standard } )
    orderSystem.CreateOrder( { Price = 20.0m; AuthorisationLevel = Standard } )
    orderSystem.CreateOrder( { Price = 50.0m; AuthorisationLevel = Standard } )

    //this will not fire any code, as we have filtered out to not include CEO EventArgs
    orderSystem.CreateOrder( { Price = 240.0m; AuthorisationLevel = Ceo } )

    Console.ReadLine() |> ignore

    //return 0 for main method
    0       

There are several things to point out in this code:

  1. We use a proper Observable.filter this time, such that only “Standard” and “Manager” AuthorisationLevel OrderEventArgs come through the applied filter. This means any event that has OrderEventArgs with a AuthorisationLevel of “Ceo” are effectively ignored thanks to the Observable.filter
  2. We use Observable.partition to partition the source IObservable<OrderEventArgs> into 2 separate IObservable<OrderEventArgs> , one stream for Standard AuthorisationLevel and one stream for Manager AuthorisationLevel. When I say stream I really mean IObservable<T>

Here is what this code looks like when it is run:

 

image

 

Using More Of The Rx Extension Methods

As I previously stated, I am a massive Rx fan boy, so I was a little disappointed to see that the F# Observable module did not have the full set of extension methods that Rx would have for IObservable<T>. However it seems I am not alone here, and some people have put out a Github project which brings the standard Rx extension methods to F#, here is a link : https://github.com/fsprojects/FSharp.Reactive

0 thoughts on “F#26 : Reactive Programming

Leave a comment