AKKA STREAMS

Last time we looked at Akka Http, this time we will look at Akka Streams.

Akka Streams is a vast topic, and you will definitely need to supplement this  post with the official documentation.

Akka Streams is one of the founding members of Reactive Streams, and Akka streams is one implementation (there are many) of the Reactive Streams APIs.

Reactive Streams  is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

Introduction

There may be some readers who have come from .NET such as myself who have used RX.

You may even have heard of Reactive Streams before. So what exactly makes reactive streams different from Rx?

The central thing that is the big win with reactive streams over Rx is the idea of back pressure. Here is what the Akka docs say about back pressure

The back pressure protocol is defined in terms of the number of elements a downstream Subscriber is able to receive and buffer, referred to as demand. The source of data, referred to as Publisher in Reactive Streams terminology and implemented as Source in Akka Streams, guarantees that it will never emit more elements than the received total demand for any given Subscriber.

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html#back-pressure-explained

Luckily this is all inbuilt to Akka streams, you do not have to worry about this too much as a user of Akka streams.

You can pretty much decide how you want the built in streams pipelines (which we will be diving into in more details below) in terms of backpressure using the OverflowStrategy enum value. Here is a very simple example

Source(1 to 10).buffer(10, OverflowStrategy.backpressure)

Where the following are the available OverflowStrategy values

object OverflowStrategy {
  /**
   * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
   * the new element.
   */
  def dropHead: OverflowStrategy = DropHead

  /**
   * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
   * the new element.
   */
  def dropTail: OverflowStrategy = DropTail

  /**
   * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
   */
  def dropBuffer: OverflowStrategy = DropBuffer

  /**
   * If the buffer is full when a new element arrives, drops the new element.
   */
  def dropNew: OverflowStrategy = DropNew

  /**
   * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until
   * space becomes available in the buffer.
   */
  def backpressure: OverflowStrategy = Backpressure

  /**
   * If the buffer is full when a new element is available this strategy completes the stream with failure.
   */
  def fail: OverflowStrategy = Fail
}

So that is the basic idea, Akka streams does provide a lot of stuff, such as

  • Built in stages/shapes
  • A graph API
  • Ability to create your own stages/shapes

For the rest of this post we will be looking at some examples of these 3 points.

Working With The Akka Streams APIs

As stated at the beginning of this post the Akka Streams implementation is vast. There is a lot of ground to cover, far more than I can reasonably cover in a small blog post. The official docs are still the place to go, but if you have not heard of Akka Streams this post may be enough to get you into it.

The official docs (at time of writing) are here:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/index.html

 

Working With Built In Stages/Shapes

Akka comes with loads of prebuilt stages which we can make use of. However before I mention those lets try and just spend a bit of time taking a bit about how you use the Akka Streams APIs in their most basic form.

The idea is that we have 4 different parts that make up a useable pipeline.

Source
A processing stage with exactly one output, emitting data elements whenever downstream processing stages are ready to receive them.

Sink
A processing stage with exactly one input, requesting and accepting data elements possibly slowing down the upstream producer of elements

Flow
A processing stage which has exactly one input and output, which connects its up- and downstreams by transforming the data elements flowing through it.

RunnableGraph
A Flow that has both ends “attached” to a Source and Sink respectively, and is ready to be run().

As I say Akka comes with loads of inbuilt stages to make our lives easier here. For example these are the available stages at time of writing

Source Stages

  • fromIterator
  • apply
  • single
  • repeat
  • tick
  • fromFuture
  • fromCompletionStage
  • unfold
  • unfoldAsync
  • empty
  • maybe
  • failed
  • actorPublisher
  • actorRef
  • combine
  • queue
  • asSubscriber
  • fromPublisher
  • fromFile

Sink Stages

  • head
  • headOption
  • last
  • lastOption
  • ignore
  • cancelled
  • seq
  • foreach
  • foreachParallel
  • onComplete
  • fold
  • reduce
  • combine
  • actorRef
  • actorRefWithAck
  • actorSubscriber
    asPublisher
  • fromSubscriber
  • toFile

We will now look at some example of using some of these

def simpleFlow() : Unit = {
  val source = Source(1 to 10)
  val sink = Sink.fold[Int, Int](0)(_ + _)
  // connect the Source to the Sink, obtaining a RunnableGraph
  val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
  // materialize the flow and get the value of the FoldSink
  implicit val timeout = Timeout(5 seconds)
  val sumFuture: Future[Int] = runnable.run()
  val sum = Await.result(sumFuture, timeout.duration)
  println(s"source.toMat(sink)(Keep.right) Sum = $sum")

  // Use the shorthand source.runWith(sink)
  val sumFuture2: Future[Int] = source.runWith(sink)
  val sum2 = Await.result(sumFuture2, timeout.duration)
  println(s"source.runWith(sink) Sum = $sum")
}

In this simple example we have s Source(1 to 10) which we then wire up to a Sink which adds the numbers coming in.

This block demonstrates various different Source(s) and Sink(s)

def differentSourcesAndSinks() : Unit = {
  //various sources
  Source(List(1, 2, 3)).runWith(Sink.foreach(println))
  Source.single("only one element").runWith(Sink.foreach(println))
  //actor sink
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
  Source(List("hello", "hello"))
    .runWith(Sink.actorRef(helloActor,DoneMessage))
  //future source
  val futureString = Source.fromFuture(Future.successful("Hello Streams!"))
    .toMat(Sink.head)(Keep.right).run()
  implicit val timeout = Timeout(5 seconds)
  val theString = Await.result(futureString, timeout.duration)
  println(s"theString = $theString")
}

And this block demos using a simple Map on a Source

def mapFlow() : Unit = {
  val source = Source(11 to 16)
  val doublerSource = source.map(x => x * 2)
  val sink = Sink.foreach(println)
  implicit val timeout = Timeout(5 seconds)

  // Use the shorthand source.runWith(sink)
  val printSinkFuture: Future[Done] = doublerSource.runWith(sink)
  Await.result(printSinkFuture, timeout.duration)
}

Working With The Graph API

Akka streams also comes with a pretty funky graph building DSL. You would use this when you want to create quite elaborate flows.

The other very interesting thing about the graph builder DSL is that you can use custom shapes inside it, and you can also leave it partially connected. Such that you could potentially use it as a Source/Sink.

Lets say you had an output from the graph you built using the graph DSL, you could then use that partially constructed graph as a Source in its own right.

The same goes if you had an unconnected input in the graph you created you could use that as a Sink.

You can read more about this here :

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-graphs.html#constructing-sources-sinks-and-flows-from-partial-graphs

I urge you all to have a read of that as its quite cool what can be done with the graph DSL

Ok so time for an example, this example comes directly from the TypeSafe activator code

http://www.lightbend.com/activator/template/akka-stream-scala

package com.sas.graphs

import java.io.File

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Failure, Success }

class WritePrimesDemo {

  def run(): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    // generate random numbers
    val maxRandomNumberSize = 1000000
    val primeSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize))).
        // filter prime numbers
        filter(rnd => isPrime(rnd)).
        // and neighbor +2 is also prime
        filter(prime => isPrime(prime + 2))

    // write to file sink
    val fileSink = FileIO.toPath(new File("target/primes.txt").toPath)
    val slowSink = Flow[Int]
      // act as if processing is really slow
      .map(i => { Thread.sleep(1000); ByteString(i.toString) })
      .toMat(fileSink)((_, bytesWritten) => bytesWritten)

    // console output sink
    val consoleSink = Sink.foreach[Int](println)

    // send primes to both slow file sink and console sink using graph API
    val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
      (slow, console) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
        primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
        broadcast ~> console // connect other side of splitter to console
        ClosedShape
    }
    val materialized = RunnableGraph.fromGraph(graph).run()

    // ensure the output file is closed and the system shutdown upon completion
    materialized.onComplete {
      case Success(_) =>
        system.terminate()
      case Failure(e) =>
        println(s"Failure: ${e.getMessage}")
        system.terminate()
    }

  }

  def isPrime(n: Int): Boolean = {
    if (n <= 1) false
    else if (n == 2) true
    else !(2 to (n - 1)).exists(x => n % x == 0)
  }
}

The most important part of this code is this part

// send primes to both slow file sink and console sink using graph API
val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
  (slow, console) =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
    primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
    broadcast ~> console // connect other side of splitter to console
    ClosedShape
}
val materialized = RunnableGraph.fromGraph(graph).run()

There is 2 sinks defined before we use the Graph

  • A file Sink
  • A console Sink

There is also a Source that generates random primes

So the Graph DSL allows you to um well create graphs. It allows you to take in inputs and create other shapes using the implicit builder that is provided.

The DSL then allows you to connect inputs/other builder creates stages/shapes to the inputs and even expose the connected stages to an output.

This is done using the ~> syntax than simply means connect

As previously stated you can create partially connected graphs, but if you have all inputs and outputs connected it is considered a ClosedShape, that can be used as an isolated component

Here is an example of the output of running this graph example

image

Create Custom Shapes/Stages

It doesn’t stop there, we can also create out own shapes that can be used in flows. This is a pretty complex subject and you will definitely benefit from reading this page

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html

There is no way this little post will cover enough, but here are some highlights of the official documentation

This is the basic pattern you would use to create a custom stage

import akka.stream.SourceShape
import akka.stream.stage.GraphStage
 
class NumbersSource extends GraphStage[SourceShape[Int]] {
  // Define the (sole) output port of this stage
  val out: Outlet[Int] = Outlet("NumbersSource")
  // Define the shape of this stage, which is SourceShape with the port we defined above
  override val shape: SourceShape[Int] = SourceShape(out)
 
  // This is where the actual (possibly stateful) logic will live
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
}

Most of the actual logic will be inside the createLogic method. But in order to do anything useful in there you will need to use handlers. Handlers are what you use to handle input/output. There are InHandler and OutHandler.

Each of which has its own state machine flow. For example this is the state machine for an OutHandler

image

Whilst this is the one for InHandler

image

This is the best page to read to learn more about these handlers

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html#Port_states__InHandler_and_OutHandler

The one and ONLY place that state should be maintained is within the createLogic method.

Lets consider a small example. Lets say we have some objects like this

case class Element(id: Int, value: Int)

And we want to build a custom stage that will allow us to select a value from this type, and should only emit an output value for unique values as provided by the property selector.

We could call this DistinctUntilChanged. Lets see what an example for this could look like

package com.sas.customshapes

import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler, GraphStage}
import akka.stream.{Outlet, Attributes, Inlet, FlowShape}

import scala.collection.immutable

final class DistinctUntilChanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, E]] {

  val in = Inlet[E]("DistinctUntilChanged.in")
  val out = Outlet[E]("DistinctUntilChanged.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {

    private var savedState : Option[E] = None

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)

        if (savedState.isEmpty  || propertyExtractor(savedState.get) != nextState) {
          savedState = Some(nextElement)
          push(out, savedState.get)
        }
        else {
          pull(in)
        }
        savedState = Some(nextElement)
      }

      override def onPull(): Unit = {
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        completeStage()
      }
    })

    override def postStop(): Unit = {
      savedState = None
    }
  }
}



The highlights of this are

  • We have a single Inlet
  • We have a single Outlet
  • We expose a FlowShape (in/out only) there are many shapes but FlowShape is what we want for one in/out out
  • We use createLogic to do the work
  • We use an InHandler to handle input
  • We use an OutHandler to handle output

One other important thing (at least for this single in/out example) is that we DO NOT call pull/push more than once in the createLogic

Lets assume we have these elements

package com.sas.customshapes

import scala.collection.immutable

object SampleElements {

  val E11 = Element(1, 1)
  val E21 = Element(2, 1)
  val E31 = Element(3, 1)
  val E42 = Element(4, 2)
  val E52 = Element(5, 2)
  val E63 = Element(6, 3)

  val Ones = immutable.Seq(E11, E21, E31)
  val Twos = immutable.Seq(E42, E52)
  val Threes = immutable.Seq(E63)

  val All = Ones ++ Twos ++ Threes
}

And this demo code

def runDistinctUntilChanged() : Unit = {
  Source(SampleElements.All)
    .via(new DistinctUntilChanged(_.value))
    .runWith(Sink.foreach(println))
}

We would get this output to the Sink

image

This example does owe a lot to a nice blog post I found here :

https://www.softwaremill.com/implementing-a-custom-akka-streams-graph-stage/
 

That’s It

Anyway that is the end of the series I hope you have enjoyed it, and have learnt you some Akka along the way

I am going to have a small break now and then start looking into some Azure/Web stuff I think

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples

Advertisements

2 thoughts on “AKKA STREAMS

  1. Arak says:

    Hi Sacha,
    Thanks for an interesting post.
    I hope you can clarify some things regarding Akka Streams.

    1) How do one handle files that are not homogenous, i.e. a riff/wave file with header and data sections. Is there a way to process the data section using FileIO? Is this the intent of FileIO and does such an approach make sense, or should one convert a file reader into an IEnumerable source?
    2) Being a .NET developer, how does Akka Streams compare to TPL Dataflows, from my understanding the two technologies look quit similar?
    3) Is it fair to say that “back pressure” is a more finely configurable async/await mechanism?

    Thanks,

    P.S. Thanks for your WPF articles on CodeProject. You pretty much taught me WPF back in the day.

    • sachabarber says:

      I think of Akka streams as more of a better rx. Thanks to backpressure you can handle very large numbers of items

      Dataflow is cool but I think they are different (though I have not used Dataflow in anger)

      For the file question, you would use Map (there is example of this in the post) where you could get bytes for entire file and create a class that had header, body etc

      Map is the linq select of the scala world

      Ps glad I helped you learn wpf

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: