
RX Over the wire

 

Introduction

By now you know I’m an RX fan boy; I think it’s the bee’s knees, in fact. Of late I have also gotten into Akka and Akka Streams, which is one implementation of the Reactive Streams API.

I also have a colleague who recently attended ReactConf and came back raving about all the good work that Netflix have been doing with this uber-duper socket they have developed to allow RX-type operators over the wire.

Netflix call their implementation ReactiveSocket: http://reactivesocket.io/

It offers the following:

  • request/response (stream of 1)
  • request/stream (finite stream of many)
  • fire-and-forget (no response)
  • event subscription (infinite stream of many)
  • channel (bi-directional streams)

Mmmm, sounds pretty cool. Thing is, I was sure I had seen this before, and a long time ago too (well, a long time in software years).

IQbservable – The Dual of IQueryable

Back in 2010 Bart De Smet of the RX team posted a couple of intriguing resources around this VERY badly named interface.

Most informative one being this video:

https://channel9.msdn.com/shows/Going+Deep/Bart-De-Smet-Observations-on-IQbservable-The-Dual-of-IQueryable/

For those that cannot be bothered to watch the video, here are some of the highlights.

IQbservable allows the following

  • Combines LINQ Queryable and RX Observable functionality
  • Queryable – allows you to create a query client-side using LINQ, and pass that to a data source (server, database, web service etc.)
  • Observable – instead of blocking until the data comes back, it will just notify you when it gets the data

So those are the key takeaway points. But how about a nice diagram or 2 to really set the scene?

I think these are the best 2 diagrams (at least for my money)

image

image

So that is what IQbservable is all about. So what is the rest of this post about then?

Well, it just so happens that Dave Sexton, one of the Reactive Extensions Extensions guys (meant to be slightly tongue in cheek), has written an extremely useful and fairly lightweight library that does much of what is described above.

Dave calls it Qactive. He has done a great job of it, writing serializable expression trees and parsers which allow us to create client-side queries in LINQ that are sent across the wire.

In the rest of this post I will be showcasing a very simple demo based on Qactive, and I’ll point out some more links that Dave Sexton provides, which are invaluable reading.

 Dave has 2 of his own posts covering more than I do here, which you can go to here:

 

A Small Demo

So it may not come as a surprise that we need a server side and a client side. We will look at both of these for a simple example, and in the download at my GitHub there is also a forms-based server that allows you to push items to the client on demand.

What Is Not Included

There is no form of fault tolerance. If you want that, you could do worse than to read my SignalR + RX article, which shows you how to make a resilient connection using RX:

https://www.codeproject.com/Articles/851437/SignalR-plus-RX-Streaming-Data-Demo-App-of:

We will now look at a simple server and client. This is the most simplistic of examples: the server produces values on a timer, and the client provides a LINQ where clause (a filter) that will be applied to the server-side stream ON THE SERVER.

Simple Server

This is all we need for a simple server

using System;
using System.Net;
using System.Reactive.Linq;
using Qactive;

namespace Server
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1));
            var service = source.ServeQbservableTcp(
                new IPEndPoint(IPAddress.Loopback, 3205),
                new QbservableServiceOptions()
                {
                    SendServerErrorsToClients = true,
                    EnableDuplex = true,
                    AllowExpressionsUnrestricted = true
                }
            );
            using (service.Subscribe(
              client => Console.WriteLine(
                  "Client shutdown."),
              ex => Console.WriteLine(
                  "Fatal error: {0}", ex.Message),
              () => Console.WriteLine(
                  "This will never be printed because a service host never completes.")))
            {
                Console.ReadKey();
            }
        }
    }
}

Takeaway points here are:

  • We use a TCP IPEndPoint to bind the server to
  • We can subscribe to the IObservable to see what is happening with the connected client

On Demand Server Notification

If you want to see a server that allows you to send notifications to the clients on demand have a look at the FormsServer in my GitHub repo.

 

Simple Client

This is all we need for a simple client

using System;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using Qactive;

namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            var client = new TcpQbservableClient<long>(new IPEndPoint(IPAddress.Loopback, 3205));

            //this expression tree filtering will happen server side
            //THAT IS AWESOME
            IQbservable<string> query =
              from value in client.Query()
              where value <= 5 || value >= 8
              select string.Format("The incoming value has been doubled to {0}", value * 2);

            using (query.Subscribe(
              value => Console.WriteLine("Client observed: " + value),
              ex => Console.WriteLine("Error: {0}", ex.Message),
              () => Console.WriteLine("Completed")))
            {
                Console.ReadKey();
            }
        }
    }
}

 

Takeaway points here are:

  • The client also uses a TCP IPEndPoint
  • The client IS ABLE to use LINQ expressions, which WILL be serialized and sent to the server where they will be applied

Here is the output when the simple server is run, and 2 clients are started one after another.

image

 

Why Is This Cool?

This is very cool (uber cool in fact): we have just created a push-based notification system that supports server-side filtering (thanks to LINQ) in about 20 lines of code.

If you can’t see what is cool about that, well I can’t help you.

There may be some amongst you who say that any messaging framework with a server and a client would/could do that in the same amount of code. What, including server-side push-down of delegates (thanks to serializable expression trees)? Mmmm, I don’t think so.

The only thing I can think of that even comes close is OData, but that requires a fair bit of infrastructure/baggage to make it work.

 

 

Where Can I Get The Code?

As usual I have posted the code to my GitHub account:

https://github.com/sachabarber/RxOverTheWire


AKKA STREAMS

Last time we looked at Akka Http; this time we will look at Akka Streams.

Akka Streams is a vast topic, and you will definitely need to supplement this post with the official documentation.

Akka was one of the founding members of the Reactive Streams initiative, and Akka Streams is one implementation (there are many) of the Reactive Streams API.

Reactive Streams  is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

Introduction

There may be some readers who, like myself, have come from .NET and have used RX.

You may even have heard of Reactive Streams before. So what exactly makes reactive streams different from Rx?

The central thing, the big win with Reactive Streams over Rx, is the idea of back pressure. Here is what the Akka docs say about back pressure:

The back pressure protocol is defined in terms of the number of elements a downstream Subscriber is able to receive and buffer, referred to as demand. The source of data, referred to as Publisher in Reactive Streams terminology and implemented as Source in Akka Streams, guarantees that it will never emit more elements than the received total demand for any given Subscriber.

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html#back-pressure-explained

Luckily this is all built into Akka Streams; you do not have to worry about it too much as a user of the library.

You can pretty much decide how you want the built-in stream stages (which we will dive into in more detail below) to behave in terms of backpressure, using an OverflowStrategy value. Here is a very simple example:

Source(1 to 10).buffer(10, OverflowStrategy.backpressure)

Where the following are the available OverflowStrategy values

object OverflowStrategy {
  /**
   * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
   * the new element.
   */
  def dropHead: OverflowStrategy = DropHead

  /**
   * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
   * the new element.
   */
  def dropTail: OverflowStrategy = DropTail

  /**
   * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
   */
  def dropBuffer: OverflowStrategy = DropBuffer

  /**
   * If the buffer is full when a new element arrives, drops the new element.
   */
  def dropNew: OverflowStrategy = DropNew

  /**
   * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until
   * space becomes available in the buffer.
   */
  def backpressure: OverflowStrategy = Backpressure

  /**
   * If the buffer is full when a new element is available this strategy completes the stream with failure.
   */
  def fail: OverflowStrategy = Fail
}
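
To make this a little more concrete, here is a rough, self-contained sketch of my own (assuming only the standard akka-stream artifact on the classpath) that feeds a fast Source through a small buffer into a deliberately slow consumer. With OverflowStrategy.dropHead the oldest buffered elements get dropped when the buffer fills; swap in OverflowStrategy.backpressure and the upstream is slowed down instead.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}

object BufferStrategyDemo extends App {
  implicit val system = ActorSystem("BufferStrategyDemo")
  implicit val materializer = ActorMaterializer()

  Source(1 to 100)
    // hold at most 10 elements; when full, the chosen strategy decides what happens
    .buffer(10, OverflowStrategy.dropHead)
    // pretend the downstream consumer is slow
    .map { i => Thread.sleep(100); i }
    .runWith(Sink.foreach(println))
}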

So that is the basic idea. Akka Streams provides a lot of stuff, such as:

  • Built in stages/shapes
  • A graph API
  • Ability to create your own stages/shapes

For the rest of this post we will be looking at some examples of these 3 points.

Working With The Akka Streams APIs

As stated at the beginning of this post the Akka Streams implementation is vast. There is a lot of ground to cover, far more than I can reasonably cover in a small blog post. The official docs are still the place to go, but if you have not heard of Akka Streams this post may be enough to get you into it.

The official docs (at time of writing) are here:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/index.html

 

Working With Built In Stages/Shapes

Akka comes with loads of prebuilt stages which we can make use of. However, before I mention those, let’s spend a bit of time talking about how you use the Akka Streams APIs in their most basic form.

The idea is that we have 4 different parts that make up a usable pipeline:

Source
A processing stage with exactly one output, emitting data elements whenever downstream processing stages are ready to receive them.

Sink
A processing stage with exactly one input, requesting and accepting data elements possibly slowing down the upstream producer of elements

Flow
A processing stage which has exactly one input and output, which connects its up- and downstreams by transforming the data elements flowing through it.

RunnableGraph
A Flow that has both ends “attached” to a Source and Sink respectively, and is ready to be run().
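
Here is a minimal sketch of my own (the object and value names are mine, nothing official) that wires all four of these parts together, just to show how they relate:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source}
import akka.{Done, NotUsed}

import scala.concurrent.Future

object FourPartsDemo extends App {
  implicit val system = ActorSystem("FourPartsDemo")
  implicit val materializer = ActorMaterializer()

  val source: Source[Int, NotUsed]   = Source(1 to 5)             // exactly one output
  val flow: Flow[Int, Int, NotUsed]  = Flow[Int].map(_ * 2)       // one input, one output
  val sink: Sink[Int, Future[Done]]  = Sink.foreach[Int](println) // exactly one input

  // nothing actually runs until the RunnableGraph is materialized with run()
  val runnable: RunnableGraph[NotUsed] = source.via(flow).to(sink)
  runnable.run()
}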

As I say, Akka comes with loads of inbuilt stages to make our lives easier here. For example, these are the available stages at the time of writing.

Source Stages

  • fromIterator
  • apply
  • single
  • repeat
  • tick
  • fromFuture
  • fromCompletionStage
  • unfold
  • unfoldAsync
  • empty
  • maybe
  • failed
  • actorPublisher
  • actorRef
  • combine
  • queue
  • asSubscriber
  • fromPublisher
  • fromFile

Sink Stages

  • head
  • headOption
  • last
  • lastOption
  • ignore
  • cancelled
  • seq
  • foreach
  • foreachParallel
  • onComplete
  • fold
  • reduce
  • combine
  • actorRef
  • actorRefWithAck
  • actorSubscriber
  • asPublisher
  • fromSubscriber
  • toFile

We will now look at some examples of using some of these.

def simpleFlow() : Unit = {
  val source = Source(1 to 10)
  val sink = Sink.fold[Int, Int](0)(_ + _)
  // connect the Source to the Sink, obtaining a RunnableGraph
  val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
  // materialize the flow and get the value of the FoldSink
  implicit val timeout = Timeout(5 seconds)
  val sumFuture: Future[Int] = runnable.run()
  val sum = Await.result(sumFuture, timeout.duration)
  println(s"source.toMat(sink)(Keep.right) Sum = $sum")

  // Use the shorthand source.runWith(sink)
  val sumFuture2: Future[Int] = source.runWith(sink)
  val sum2 = Await.result(sumFuture2, timeout.duration)
  println(s"source.runWith(sink) Sum = $sum")
}

In this simple example we have a Source(1 to 10), which we then wire up to a Sink that adds up the numbers coming in.

This block demonstrates various different Source(s) and Sink(s)

def differentSourcesAndSinks() : Unit = {
  //various sources
  Source(List(1, 2, 3)).runWith(Sink.foreach(println))
  Source.single("only one element").runWith(Sink.foreach(println))
  //actor sink
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
  Source(List("hello", "hello"))
    .runWith(Sink.actorRef(helloActor,DoneMessage))
  //future source
  val futureString = Source.fromFuture(Future.successful("Hello Streams!"))
    .toMat(Sink.head)(Keep.right).run()
  implicit val timeout = Timeout(5 seconds)
  val theString = Await.result(futureString, timeout.duration)
  println(s"theString = $theString")
}
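
The HelloActor and DoneMessage used by the actor Sink above live in the repo that accompanies this post; a minimal stand-in (my own sketch, not the real repo version) could look something like this:

import akka.actor.Actor

case object DoneMessage

class HelloActor extends Actor {
  def receive = {
    case "hello"     => println("hello back at you")
    case DoneMessage => println("stream completed") // Sink.actorRef sends this when the stream completes
    case other       => println(s"huh? got $other")
  }
}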

And this block demos using a simple map on a Source:

def mapFlow() : Unit = {
  val source = Source(11 to 16)
  val doublerSource = source.map(x => x * 2)
  val sink = Sink.foreach(println)
  implicit val timeout = Timeout(5 seconds)

  // Use the shorthand source.runWith(sink)
  val printSinkFuture: Future[Done] = doublerSource.runWith(sink)
  Await.result(printSinkFuture, timeout.duration)
}

Working With The Graph API

Akka Streams also comes with a pretty funky graph-building DSL. You would use this when you want to create quite elaborate flows.

The other very interesting thing about the graph builder DSL is that you can use custom shapes inside it, and you can also leave it partially connected, such that you could potentially use it as a Source/Sink.

Let’s say you had an unconnected output in the graph you built using the graph DSL; you could then use that partially constructed graph as a Source in its own right.

The same goes if you had an unconnected input in the graph you created: you could use that as a Sink.
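
For example, here is a small sketch of my own (the naming is mine) of a partial graph whose only unconnected port is an output, which lets the whole thing be used as a Source of pairs:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, Sink, Source, Zip}

object PartialGraphAsSource extends App {
  implicit val system = ActorSystem("PartialGraphAsSource")
  implicit val materializer = ActorMaterializer()

  val pairs: Source[(Int, Int), NotUsed] = Source.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      // Zip has two inputs and one output; both inputs are fed inside the graph,
      // leaving only the zip's output unconnected, so the graph acts as a Source
      val zip = builder.add(Zip[Int, Int]())
      Source(1 to 3)   ~> zip.in0
      Source(10 to 12) ~> zip.in1
      SourceShape(zip.out)
    })

  pairs.runWith(Sink.foreach(println)) // prints (1,10), (2,11), (3,12)
}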

You can read more about this here:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-graphs.html#constructing-sources-sinks-and-flows-from-partial-graphs

I urge you all to have a read of that, as it’s quite cool what can be done with the graph DSL.

Ok, so time for an example. This example comes directly from the Typesafe activator code:

http://www.lightbend.com/activator/template/akka-stream-scala

package com.sas.graphs

import java.io.File

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Failure, Success }

class WritePrimesDemo {

  def run(): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    // generate random numbers
    val maxRandomNumberSize = 1000000
    val primeSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize))).
        // filter prime numbers
        filter(rnd => isPrime(rnd)).
        // and neighbor +2 is also prime
        filter(prime => isPrime(prime + 2))

    // write to file sink
    val fileSink = FileIO.toPath(new File("target/primes.txt").toPath)
    val slowSink = Flow[Int]
      // act as if processing is really slow
      .map(i => { Thread.sleep(1000); ByteString(i.toString) })
      .toMat(fileSink)((_, bytesWritten) => bytesWritten)

    // console output sink
    val consoleSink = Sink.foreach[Int](println)

    // send primes to both slow file sink and console sink using graph API
    val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
      (slow, console) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
        primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
        broadcast ~> console // connect other side of splitter to console
        ClosedShape
    }
    val materialized = RunnableGraph.fromGraph(graph).run()

    // ensure the output file is closed and the system shutdown upon completion
    materialized.onComplete {
      case Success(_) =>
        system.terminate()
      case Failure(e) =>
        println(s"Failure: ${e.getMessage}")
        system.terminate()
    }

  }

  def isPrime(n: Int): Boolean = {
    if (n <= 1) false
    else if (n == 2) true
    else !(2 to (n - 1)).exists(x => n % x == 0)
  }
}

The most important part of this code is this part

// send primes to both slow file sink and console sink using graph API
val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
  (slow, console) =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
    primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
    broadcast ~> console // connect other side of splitter to console
    ClosedShape
}
val materialized = RunnableGraph.fromGraph(graph).run()

There are 2 sinks defined before we use the Graph:

  • A file Sink
  • A console Sink

There is also a Source that generates random primes

So the Graph DSL allows you to, um, well, create graphs. It allows you to take in inputs and create other shapes/stages using the implicit builder that is provided.

The DSL then allows you to connect the inputs and other builder-created stages/shapes together, and even expose the connected stages to an output.

This is done using the ~> syntax, which simply means connect.

As previously stated, you can create partially connected graphs, but if all inputs and outputs are connected it is considered a ClosedShape, which can be run as an isolated component.

Here is an example of the output of running this graph example

image

Create Custom Shapes/Stages

It doesn’t stop there; we can also create our own shapes that can be used in flows. This is a pretty complex subject and you will definitely benefit from reading this page:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html

There is no way this little post can cover it all, but here are some highlights from the official documentation.

This is the basic pattern you would use to create a custom stage

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic}
 
class NumbersSource extends GraphStage[SourceShape[Int]] {
  // Define the (sole) output port of this stage
  val out: Outlet[Int] = Outlet("NumbersSource")
  // Define the shape of this stage, which is SourceShape with the port we defined above
  override val shape: SourceShape[Int] = SourceShape(out)
 
  // This is where the actual (possibly stateful) logic will live
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
}

Most of the actual logic will be inside the createLogic method. But in order to do anything useful in there you will need to use handlers. Handlers are what you use to handle input/output. There are InHandler and OutHandler.

Each of these has its own state machine. For example, this is the state machine for an OutHandler:

image

Whilst this is the one for InHandler

image

This is the best page to read to learn more about these handlers

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html#Port_states__InHandler_and_OutHandler

The one and ONLY place that state should be maintained is within the createLogic method.
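
To illustrate, here is a sketch of the NumbersSource pattern from above with its createLogic filled in, closely following the counter example in the official custom-stage documentation: the only state (a counter) lives inside the GraphStageLogic, and an OutHandler emits the next number each time downstream pulls.

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // all mutable state lives in the GraphStageLogic, never in the enclosing GraphStage
      private var counter = 1

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          // downstream asked for an element, so emit the next number
          push(out, counter)
          counter += 1
        }
      })
    }
}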

Let’s consider a small example. Let’s say we have some objects like this:

case class Element(id: Int, value: Int)

And we want to build a custom stage that allows us to select a property from this type, and only emits an output element when the value provided by the property selector changes.

We could call this DistinctUntilChanged. Let’s see what an example of this could look like:

package com.sas.customshapes

import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler, GraphStage}
import akka.stream.{Outlet, Attributes, Inlet, FlowShape}

import scala.collection.immutable

final class DistinctUntilChanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, E]] {

  val in = Inlet[E]("DistinctUntilChanged.in")
  val out = Outlet[E]("DistinctUntilChanged.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {

    private var savedState : Option[E] = None

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)

        if (savedState.isEmpty  || propertyExtractor(savedState.get) != nextState) {
          savedState = Some(nextElement)
          push(out, savedState.get)
        }
        else {
          pull(in)
        }
        savedState = Some(nextElement)
      }

      override def onPull(): Unit = {
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        completeStage()
      }
    })

    override def postStop(): Unit = {
      savedState = None
    }
  }
}



The highlights of this are

  • We have a single Inlet
  • We have a single Outlet
  • We expose a FlowShape (one input/one output); there are many shapes, but FlowShape is what we want for a single in/out
  • We use createLogic to do the work
  • We use an InHandler to handle input
  • We use an OutHandler to handle output

One other important thing (at least for this single in/out example) is that we DO NOT call pull/push more than once within a single handler callback.

Let’s assume we have these elements:

package com.sas.customshapes

import scala.collection.immutable

object SampleElements {

  val E11 = Element(1, 1)
  val E21 = Element(2, 1)
  val E31 = Element(3, 1)
  val E42 = Element(4, 2)
  val E52 = Element(5, 2)
  val E63 = Element(6, 3)

  val Ones = immutable.Seq(E11, E21, E31)
  val Twos = immutable.Seq(E42, E52)
  val Threes = immutable.Seq(E63)

  val All = Ones ++ Twos ++ Threes
}

And this demo code

def runDistinctUntilChanged() : Unit = {
  Source(SampleElements.All)
    .via(new DistinctUntilChanged(_.value))
    .runWith(Sink.foreach(println))
}

We would get this output at the Sink (for these sample elements that should be Element(1,1), Element(4,2) and Element(6,3)):

image

This example owes a lot to a nice blog post I found here:

https://www.softwaremill.com/implementing-a-custom-akka-streams-graph-stage/
 

That’s It

Anyway, that is the end of the series. I hope you have enjoyed it, and have learnt you some Akka along the way.

I am going to have a small break now, and then start looking into some Azure/web stuff I think.

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples