Monthly Archives: November 2016

Akka http

Last time we talked about routing within Akka. This time we will be looking at Akka’s support for http.

But just before that, a bit of history. Before Akka.Http there was already a fairly successful Akka-based HTTP option available to you as a Scala developer, called Spray. There is a lot of Spray documentation available here: http://spray.io/

This framework was extremely well thought of, so much so that the good people at Akka have taken on much of the good work done by this team, and it now forms much of the codebase for Akka Http.

In fact if you are familiar with Spray, you will certainly notice quite a lot of similarities in the way routes and JSON are handled in Akka.Http, as it is pretty much the Spray code.

 

Introduction

Akka.Http comes with server side and client side libraries. It also comes with good support for standard serialization such as JSON/XML and the ability to roll your own serialization should you want to.

It also comes with a fairly nifty routing DSL which is very much inspired by the work done in Spray.

This post will concentrate on the common use cases that you may come across when working with HTTP.

 

SBT Dependencies

As usual we need to make sure we have the correct JARs referenced. So here is the SBT file that I am using for both the server side/client side and common messages that pass between them

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)


lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" % "akka-actor_2.11" % "2.4.12",
    "com.typesafe.akka" % "akka-http_2.11" % "3.0.0-RC1",
    "com.typesafe.akka" % "akka-http-core_2.11" % "3.0.0-RC1",
    "com.typesafe.akka" % "akka-http-spray-json_2.11" % "3.0.0-RC1"
  )


lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val serverside =(project in file("serverside")).
  settings(commonSettings: _*).
  settings(
    name := "serverside"
  )
  .aggregate(common, clientside)
  .dependsOn(common, clientside)

lazy val common = (project in file("common")).
  settings(commonSettings: _*).
  settings(
    name := "common"
  )

lazy val clientside = (project in file("clientside")).
  settings(commonSettings: _*).
  settings(
    name := "clientside"
  )
  .aggregate(common)
  .dependsOn(common)

It can be seen that the JSON dependency is contained in this JAR

akka-http-spray-json_2.11

Told you it was inspired by Spray a fair bit.

 

Server Side

This section will talk about the server side element of Akka.Http

 

Hosting The Service

To have a correctly formed/hostable server side we need a couple of things in place, namely the following:

  • An actor system
  • A materializer (Akka http uses flows which is the subject of the next and final post)
  • An execution context
  • Routing

Once we have these things it is really just a question of binding the route to a host name and port.

Shown below is a barebones skeleton of what this may look like

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.Flow
import common.{Item, JsonSupport}
import scala.io.StdIn
import scala.concurrent.Future
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream._
import akka.stream.scaladsl._


object Demo extends App with Directives with JsonSupport {

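  implicit val system = ActorSystem("my-system")
  implicit val materializer = ActorMaterializer()
  // needed by the Future callbacks (onFailure / flatMap / onComplete) below
  implicit val executionContext = system.dispatcher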


  val route = .....

  val (host, port) = ("localhost", 8080)
  val bindingFuture = Http().bindAndHandle(route, host, port)

  bindingFuture.onFailure {
    case ex: Exception =>
      println(s"$ex Failed to bind to $host:$port!")
  }

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine() // let it run until user presses return
  bindingFuture
    .flatMap(_.unbind()) // trigger unbinding from the port
    .onComplete(_ => system.terminate()) // and shutdown when done
}

We will be looking at the routing DSL separately

 

Routing DSL

As stated, Akka.Http owes much to Spray, and the routing DSL in particular is practically unchanged from Spray, so it is well worth reading the Spray routing documentation which is available here : http://spray.io/documentation/1.2.4/spray-routing/ and for completeness here is the Akka.Http docs link too : http://doc.akka.io/docs/akka/2.4.7/scala/http/introduction.html#routing-dsl-for-http-servers

There are far too many possible routes to go into in a single post. Let’s consider a few basic examples and deconstruct them.

Some of these examples do rely on JSON which is the next topic, so for now just understand that there is a way to accept/return JSON.

Let’s consider the following use cases

  • GET that returns a simple string
  • GET that returns a JSON representation of an Item
  • POST that accepts a new Item

In all these cases this is what an Item looks like

package common

final case class Item(name: String, id: Long)

So let’s see the routing DSL that makes the above examples work

val route =
  path("hello") {
    get {
      complete(HttpEntity(
        ContentTypes.`text/html(UTF-8)`,
        "<h1>Say hello to akka-http</h1>"))
    }
  } ~
  path("randomitem") {
    get {
      // will marshal Item to JSON
      complete(Item("thing", 42))
    }
  } ~
  path("saveitem") {
    post {
      // will unmarshal JSON to Item
      entity(as[Item]) { item =>
        println(s"Server saw Item : $item")
        complete(item)
      }
    }
  }

It can be seen that there are some common routing DSL bits and bobs in there, such as:

  • path : which satisfies the route name part of the route
  • get : which tells us that we should go further into the route matching if it’s a GET http request and it matched the path route DSL part
  • post: which tells us that we should go further into the route matching if it’s a POST http request and it matched the path route DSL part
  • complete : This is the final result from the route

These parts of the DSL are known as directives. The general anatomy of a directive is as follows:

name(arguments) { extractions =>
  ... // inner route
}

It has a name, zero or more arguments and optionally an inner route (The RouteDirectives are special in that they are always used at the leaf-level and as such cannot have inner routes). Additionally directives can “extract” a number of values and make them available to their inner routes as function arguments. When seen “from the outside” a directive with its inner route form an expression of type Route.

Taken from http://doc.akka.io/docs/akka/2.4.7/scala/http/routing-dsl/directives/index.html#directives up on date 15/11/16

What Directives Do?

A directive can do one or more of the following:

  • Transform the incoming RequestContext before passing it on to its inner route (i.e. modify the request)
  • Filter the RequestContext according to some logic, i.e. only pass on certain requests and reject others
  • Extract values from the RequestContext and make them available to its inner route as “extractions”
  • Chain some logic into the RouteResult future transformation chain (i.e. modify the response or rejection)
  • Complete the request

 

This means a Directive completely wraps the functionality of its inner route and can apply arbitrarily complex transformations, both (or either) on the request and on the response side.
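
To make the anatomy a little more concrete, here is a minimal sketch of a route that labels each part. The "item" path and the "name" query parameter are made up for this illustration and are not part of the demo project.

```scala
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object DirectiveAnatomy {

  // path(...)      : name = path, argument = "item" / LongNumber,
  //                  extraction = the number matched in the URL (id)
  // get            : no arguments, no extractions, only filters on the HTTP verb
  // parameter(...) : extracts the optional "name" query string value
  // complete(...)  : leaf level directive, so it has no inner route
  val itemRoute: Route =
    path("item" / LongNumber) { id =>
      get {
        parameter("name".?) { maybeName =>
          val name = maybeName.getOrElse("not supplied")
          complete(s"item $id, name $name")
        }
      }
    }
}
```

Under these assumptions a GET to /item/7?name=bob would be matched by path, passed through by get, and completed with a plain text response, while a POST to the same URL would be rejected by the get directive.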

Ok so now that we have taken a whistle stop tour of the routing DSL and directives, let’s have a look at the few we discussed above

 

For this work I would strongly recommend the use of the “Postman” google app, which you can grab from here

https://chrome.google.com/webstore/detail/postman/fhbjgbiflinjbdggehcddcbncdddomop?hl=en

GET

We can see this route looks like this

path("hello") {
  get {
    complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
  }
}

So we use the path, and also the get directives to establish a get route. We then use complete to complete the route with some static string representing the html we would like to return

So let’s see this one in postman

image

 

GET Item (as JSON)

We can see this route looks like this

path("randomitem") {
  get {
    // will marshal Item to JSON
    complete(Item("thing", 42))
  }
} 

So again we use the path/get directives, but this time we complete with an Item. This works because the JSON support is able to create the right serialization data for us. We will look at this in the next section

So let’s see this one in postman

image

POST Item

We can see this route looks like this

path("saveitem") {
  post {
    // will unmarshal JSON to Item
    entity(as[Item]) { item =>
      println(s"Server saw Item : $item")
      complete(item)
    }
  }
} 

So again we use the path directive, but this time we use post, where the post expects an Item to be provided as JSON. The conversion from the incoming JSON string to an Item is done using an Unmarshaller; we will look at this in the next section

So let’s see this one in postman

image

 

JSON Support

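Akka.http provides JSON support using the akka-http-spray-json library (the akka-http-spray-json_2.11 JAR referenced in the SBT file above; earlier releases named it akka-http-spray-json-experimental), which you can grab from Maven Central Repo.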

JsonProtocol

When using spray-json we may use SprayJsonSupport and DefaultJsonProtocol to create the JSON protocol for our custom objects

Let’s consider the Item class we have seen in the demos so far

package common

final case class Item(name: String, id: Long)

This is how we might write the JSON protocol code for this simple class

package common

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol

trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val itemFormat = jsonFormat2(Item)
}

It can be seen that there are jsonFormatXX helpers that can be used for very simple cases. In this case jsonFormat2 is used as our item class had 2 parameters

Most of the time these inbuilt helpers are all we need. If however you want something more elaborate you are free to create your own jsonFormat read / write methods
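
As a rough sketch of what a hand-written format might look like, the following uses spray-json's RootJsonFormat with explicit write / read methods. The JSON field names "itemName" and "itemId" are invented here purely to show why you might not want the default field names, and Item is redefined locally so the sketch stands on its own.

```scala
import spray.json._

object CustomItemJson {

  // Mirrors common.Item so that the sketch is self-contained
  final case class Item(name: String, id: Long)

  object ItemProtocol extends DefaultJsonProtocol {

    implicit object ItemFormat extends RootJsonFormat[Item] {

      // Item -> JSON, using the custom (made up) field names
      def write(item: Item): JsValue =
        JsObject(
          "itemName" -> JsString(item.name),
          "itemId"   -> JsNumber(item.id))

      // JSON -> Item, failing with a DeserializationException on anything else
      def read(json: JsValue): Item =
        json.asJsObject.getFields("itemName", "itemId") match {
          case Seq(JsString(name), JsNumber(id)) => Item(name, id.toLong)
          case _ => deserializationError("Item expected")
        }
    }
  }
}
```

With CustomItemJson.ItemProtocol._ imported, item.toJson and json.convertTo[Item] would then use this format instead of one generated by jsonFormat2.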

 

Marshalling

Marshalling is the process of taking objects and creating a representation of them (a JSON string in these examples) to send across the wire.

The Akka Spray JAR comes with a bunch of default marshallers that allow us to take custom classes and turn them into JSON

These are the most common default marshallers that you will most likely use

type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]
type ToHeadersAndEntityMarshaller[T] = Marshaller[T, (immutable.Seq[HttpHeader], MessageEntity)]
type ToResponseMarshaller[T] = Marshaller[T, HttpResponse]
type ToRequestMarshaller[T] = Marshaller[T, HttpRequest]

You can read more about this here : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/marshalling.html

Luckily you don’t really have to get that involved with these that often, as the routing DSL does most of the heavy lifting for you. When you call complete, the marshalling is taken care of for you, provided a suitable marshaller can be found implicitly.

Unmarshalling

Unmarshalling is the process of taking the on the wire format (JSON string in these examples) back into a scala class (Item class in this case)

You can read more about this at the official Akka docs page : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/unmarshalling.html

Luckily you don’t really have to get that involved with these that often either, as the routing DSL does most of the heavy lifting for you. That is what this part of the routing DSL is for: it uses an unmarshaller to create the Item from the JSON string on the wire

entity(as[Item]) { item =>
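
If you ever do need to marshal or unmarshal by hand, outside of the routing DSL, it might look something like the sketch below. It takes an Item to a JSON entity and back again using Marshal and Unmarshal. The object and method names are made up for this example, and Item / the format are redefined locally (mirroring common.Item and JsonSupport) so that the sketch is self-contained.

```scala
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model.MessageEntity
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer
import spray.json.{DefaultJsonProtocol, RootJsonFormat}
import scala.concurrent.{ExecutionContext, Future}

object MarshalRoundTrip extends SprayJsonSupport with DefaultJsonProtocol {

  // Mirrors common.Item / JsonSupport so that the sketch is self-contained
  final case class Item(name: String, id: Long)
  implicit val itemFormat: RootJsonFormat[Item] = jsonFormat2(Item)

  def roundTrip(item: Item)
               (implicit ec: ExecutionContext, materializer: Materializer): Future[Item] =
    for {
      // Item -> JSON entity, using the implicit marshaller from SprayJsonSupport
      entity <- Marshal(item).to[MessageEntity]
      // JSON entity -> Item, using the implicit unmarshaller from SprayJsonSupport
      back   <- Unmarshal(entity).to[Item]
    } yield back
}
```

Both directions rely on the same implicit itemFormat, which is why a single JsonSupport trait is enough for the server and the client.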

WebSockets

Akka Http also supports web sockets. Let’s start this investigation by looking at what is required from the routing DSL perspective, which starts like this

path("websocket") {
  get {
    handleWebSocketMessages(websocketFlow)
  }
} ~

If we look at this special directive a bit more, what exactly does the handleWebSocketMessages directive look like?

Well it looks like this:

def handleWebSocketMessages(handler: Flow[Message, Message, Any]): Route

So we need to supply a flow. A Flow is part of Akka reactive streams, which we will look at in the next part. For now just be aware that you can create a Flow from a Sink and a Source, and that a Materializer is needed to materialize (run) the flow.

For this websocket example here is what the Flow looks like

val (websocketSink, websocketSource) =
  MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()

val websocketFlow: Flow[Message, Message, NotUsed] =
  Flow[Message].mapAsync(1) {
    // transform websocket message to domain message (string)
    case TextMessage.Strict(text) =>       Future.successful(text)
    case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _)
  }.via(Flow.fromSinkAndSource(websocketSink, websocketSource))
    .map[Message](string => TextMessage(string))

The idea is that when a websocket client connects and sends an initial message they will get a reply TextMessage sent over the websocket to them

This uses some pretty new akka stream stages, namely

  • MergeHub : Creates a Source that emits elements merged from a dynamic set of producers.
  • BroadcastHub : Creates a Source that broadcasts the elements from a common producer to a dynamic set of consumers.
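
To see the two hubs working together away from websockets, here is a minimal sketch that wires a MergeHub to a BroadcastHub in the same way as above, attaches one consumer, and then publishes a single element. The object and method names are invented for this example.

```scala
import akka.stream.Materializer
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}
import scala.concurrent.Future

object HubSketch {

  def publishAndReceive(text: String)
                       (implicit materializer: Materializer): Future[String] = {
    // Materializing the hubs once gives us a Sink that any number of producers
    // can attach to, and a Source that any number of consumers can attach to
    val (hubSink, hubSource) =
      MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()

    // attach one consumer that completes with the first element it sees
    val received: Future[String] = hubSource.runWith(Sink.head)

    // attach one producer that publishes a single element into the hub
    Source.single(text).runWith(hubSink)

    received
  }
}
```

In the websocket example each connected client plays the role of both a producer (its incoming messages go into the MergeHub side) and a consumer (it receives whatever comes out of the BroadcastHub side).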

 

Let’s start by running the server, and then opening the “WebSocketTestClient.html” page which should look like this

image

image

Once the page is open, type something in the textbox and hit the “Send” button, you should see this

image

All fairly normal socket type stuff so far, we send a message from the web page client side to the server and the server responds with the text we sent.

But what if we wanted to send a message to the client on demand, say from another route which could be a command to do some work, which notifies the clients of the websocket?

With this Flow in place, we are also able to push back messages to the client end of the websocket.

Let’s see another route which will simulate some work, which results in messages being sent down the websocket back to the client (if it’s still connected)

Here is the route

path("sendmessagetowebsocket" / IntNumber) { msgCount =>
  post {
    for(i <- 0 until msgCount)
    {
      Source.single(s"sendmessagetowebsocket $i").runWith(websocketSink)
    }
    complete("done")
  }
}

It can be seen that we simply create a new source which is run with the existing Sink that was part of the Flow used by the websocket

Here is what this would look like in postman

image

And here is what the web page client side websocket example looks like after this route has been called as above

image

 

 

Client Side

Akka http comes with 3 types of client API that one can use:

  • Connection-level client-side API
  • Host-level client-side API
  • Request-level client-side API

In this article I will only be using the last of these APIs, as in my opinion it is the most sensible client side choice.

So what does the request level client API look like?

GET

If we consider that we want to conduct this request

http://localhost:8080/randomitem

which when run via postman gives the following JSON response

image

So let’s see what the code looks like to do this using the request level client API

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import concurrent.ExecutionContext.Implicits.global
import common.{Item, JsonSupport}
import concurrent.duration._
import scala.io.StdIn

class RegularRoutesDemo extends JsonSupport {

  def Run() : Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)

    //+++++++++++++++++++++++++++++++++++++++++++++++
    // GET http://localhost:8080/randomitem
    //+++++++++++++++++++++++++++++++++++++++++++++++
    val randomItemUrl = s"""/randomitem"""
    val flowGet : Future[Item] =
      Source.single(
        HttpRequest(
          method = HttpMethods.GET,
          uri = Uri(randomItemUrl))
        )
        .via(httpClient)
        .mapAsync(1)(response => Unmarshal(response.entity).to[Item])
        .runWith(Sink.head)
    val start = System.currentTimeMillis()
    val result = Await.result(flowGet, 5 seconds)
    val end = System.currentTimeMillis()
    println(s"Result in ${end-start} millis: $result")

  }
}

There are a couple of take away points in the code above

  • We use a Source which is a HttpRequest, where we can specify the HTTP verb and other request type things
  • We use Unmarshal to convert the incoming JSON string to an Item. We discussed Marshalling/Unmarshalling above.
  • This obviously relies on the Spray JSON support that we discussed above
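
One thing worth being aware of: the listing above goes through Http().outgoingConnection, which the Akka docs describe under the connection-level API. The request-level API is normally reached via Http().singleRequest, which hands back a Future of the response directly. Here is a minimal sketch of the same GET done that way. The object and method names and the baseUrl parameter are made up for this example, and Item / the format are redefined locally so the sketch is self-contained.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer
import spray.json.{DefaultJsonProtocol, RootJsonFormat}
import scala.concurrent.Future

object SingleRequestSketch extends SprayJsonSupport with DefaultJsonProtocol {

  // Mirrors common.Item / JsonSupport so that the sketch is self-contained
  final case class Item(name: String, id: Long)
  implicit val itemFormat: RootJsonFormat[Item] = jsonFormat2(Item)

  // baseUrl is assumed to look like "http://localhost:8080"
  def fetchRandomItem(baseUrl: String)
                     (implicit system: ActorSystem, materializer: Materializer): Future[Item] = {
    import system.dispatcher // execution context for flatMap / Unmarshal

    Http()
      .singleRequest(HttpRequest(uri = s"$baseUrl/randomitem")) // GET is the default method
      .flatMap(response => Unmarshal(response.entity).to[Item])
  }
}
```

There is no Source, via or Sink to wire up here, as singleRequest uses a connection pool managed by Akka behind the scenes.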

 

POST

If we consider that we want to conduct this request

http://localhost:8080/saveitem

which when run via postman gives the following JSON response

image

So let’s see what the code looks like to do this using the request level client API

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import concurrent.ExecutionContext.Implicits.global
import common.{Item, JsonSupport}
import concurrent.duration._
import scala.io.StdIn

class RegularRoutesDemo extends JsonSupport {

  def Run() : Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)

    //+++++++++++++++++++++++++++++++++++++++++++++++
    // POST http://localhost:8080/saveitem
    //+++++++++++++++++++++++++++++++++++++++++++++++
    val saveItemUrl = s"""/saveitem"""
    val itemToSave = Item("newItemHere",12)
    val flowPost = for {
      requestEntity <- Marshal(itemToSave).to[RequestEntity]
      response <-
      Source.single(
        HttpRequest(
          method = HttpMethods.POST,
          uri = Uri(saveItemUrl),
          entity = requestEntity)
        )
        .via(httpClient)
        .mapAsync(1)(response => Unmarshal(response.entity).to[Item])
        .runWith(Sink.head)
    } yield response
    val startPost = System.currentTimeMillis()
    val resultPost = Await.result(flowPost, 5 seconds)
    val endPost = System.currentTimeMillis()
    println(s"Result in ${endPost-startPost} millis: $resultPost")
  }
}

The only thing that is different this time is that we need to pass a JSON representation of an Item in the HttpRequest.

This is done using a JSON marshaller, which must be in scope implicitly.

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples


AKKA routing

 

Last time we looked at Akka Clustering, this time we will look at routing.

Routing allows messages to be routed to one or more actors known as routees, by sending the messages to a router that will know how to route the messages to the routees.

Akka comes with quite a few inbuilt routing strategies that we can make use of. We will look at these next.

Types Of Routing Strategy

Akka comes with a whole bunch of inbuilt routing strategies such as :

RoundRobin : Routes in a round-robin fashion to its routees.

Random : This router type selects one of its routees randomly for each message.

SmallestMailBox : A Router that tries to send to the non-suspended child routee with fewest messages in mailbox. The selection is done in this order:

  • pick any idle routee (not processing message) with empty mailbox
  • pick any routee with empty mailbox
  • pick routee with fewest pending messages in mailbox
  • pick any remote routee, remote actors are considered lowest priority, since their mailbox size is unknown

Broadcast : A broadcast router forwards the message it receives to all its routees.

ScatterGatherFirstCompleted : The ScatterGatherFirstCompletedRouter will send the message on to all its routees. It then waits for first reply it gets back. This result will be sent back to original sender. Other replies are discarded.

TailChopping : The TailChoppingRouter will first send the message to one, randomly picked, routee and then after a small delay to a second routee (picked randomly from the remaining routees) and so on. It waits for first reply it gets back and forwards it back to original sender. Other replies are discarded.

The goal of this router is to decrease latency by performing redundant queries to multiple routees, assuming that one of the other actors may still be faster to respond than the initial one.

Regular Actor As A Router

Akka allows you to create routers in 2 ways, the first way is to use RoutingLogic to setup your router.

There are quite a few specializations of the RoutingLogic, such as

  • RoundRobinRoutingLogic
  • RandomRoutingLogic
  • SmallestMailboxRoutingLogic
  • BroadcastRoutingLogic

You would typically use this in a regular actor. The actor in which you use the RoutingLogic would be the router. If you go down this path you would be responsible for managing the router’s children, i.e. the routees. That means you would be responsible for managing ALL aspects of the routees, including adding them to a list of available routees and watching them for Termination to remove them from the list of available routees (which sounds a lot like supervision, doesn’t it).

Here is what a skeleton for an actor that is setup manually as a router may look like

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, Props, Terminated}
import akka.routing.{RoutingLogic, ActorRefRoutee, RoundRobinRoutingLogic, Router}


class RouterActor(val routingLogic : RoutingLogic)  extends Actor  {

  val counter : AtomicInteger = new AtomicInteger()

  val routees = Vector.fill(5) {
    val workerCount = counter.getAndIncrement()
    val r = context.actorOf(Props(
      new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
    context watch r
    ActorRefRoutee(r)
  }

  //create a Router based on the incoming class field
  //RoutingLogic which will really determine what type of router
  //we end up with
  var router = Router(routingLogic, routees)

  def receive = {
    case WorkMessage =>
      router.route(WorkMessage, sender())
    case Report =>
      // use the router's current routees so that replaced workers are included
      router.routees.foreach(ref => ref.send(Report, sender()))
    case Terminated(a) =>
      router = router.removeRoutee(a)
      val workerCount = counter.getAndIncrement()
      val r = context.actorOf(Props(
        new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
      context watch r
      router = router.addRoutee(r)
  }
}

It can be seen that I pass in the RoutingLogic, which would be one of the available RoutingLogic strategies that akka comes with.

The other thing to note is that as we stated earlier we need to FULLY manage the collection of routee actors ourselves, including watching them for Termination.

Surely there is a better way?

Well yes thankfully there is, Akka also provides a Pool for this job. We will look at that next.

Pool

Akka comes with the ability to create a router using a pool where we tell it what actors we want to use as the routees, how many routees we want, and how the supervision should be handled.

Here is some code from my demo code that uses 2 utility methods to create a pool-based router that will use a simple FibonacciActor, which is sent messages via an actor that is created using the pool router value

def RunTailChoppingPoolDemo() : Unit = {

  val supervisionStrategy = OneForOneStrategy() {
    case e => SupervisorStrategy.restart
  }

  val props = TailChoppingPool(5, within = 10.seconds,
    supervisorStrategy = supervisionStrategy,interval = 20.millis).
    props(Props[FibonacciActor])

  RunPoolDemo(props)
}

def RunPoolDemo(props : Props) : Unit = {
  val system = ActorSystem("RoutingSystem")
  val actorRef = system.actorOf(Props(
    new PoolRouterContainerActor(props,"theRouter")), name = "thePoolContainer")
  actorRef ! WorkMessage
  StdIn.readLine()
  system.terminate()
}



import akka.actor._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.pattern.ask

class PoolRouterContainerActor(val props: Props, val name :String)  extends Actor  {

  val router: ActorRef = context.actorOf(props, name)

  def receive = {
    case WorkMessage =>
      implicit val timeout = Timeout(5 seconds)
      val futureResult = router ? FibonacciNumber(10)
      val (actName,result) = Await.result(futureResult, timeout.duration)

      println(s"FibonacciActor : ($actName) came back with result -> $result")
  }
}



import akka.actor.Actor
import scala.annotation.tailrec

class FibonacciActor extends Actor {

  val actName = self.path.name

  def receive = {
    case FibonacciNumber(nbr) => {
      println(s"FibonacciActor : ($actName) ->  " +
        s"has been asked to calculate FibonacciNumber")
      val result = fibonacci(nbr)
      sender ! (actName,result)
    }
  }

  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }

    fib(n, 1, 0)
  }
}
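
The demo above uses TailChoppingPool, but each of the routing strategies listed earlier has a matching Pool class in akka.routing. As a rough sketch, this is how the Props for each of them could be built for any routee Props. The object name, the map keys and the choice of 5 routees / 10 seconds / 20 milliseconds are just for this illustration.

```scala
import akka.actor.Props
import akka.routing._
import scala.concurrent.duration._

object StrategyPools {

  // Builds pool router Props for every strategy, all with 5 routees
  def poolProps(routeeProps: Props): Map[String, Props] = Map(
    "RoundRobin"      -> RoundRobinPool(5).props(routeeProps),
    "Random"          -> RandomPool(5).props(routeeProps),
    "SmallestMailBox" -> SmallestMailboxPool(5).props(routeeProps),
    "Broadcast"       -> BroadcastPool(5).props(routeeProps),
    // these two need to know how long to wait for a reply
    "ScatterGatherFirstCompleted" ->
      ScatterGatherFirstCompletedPool(5, within = 10.seconds).props(routeeProps),
    "TailChopping" ->
      TailChoppingPool(5, within = 10.seconds, interval = 20.millis).props(routeeProps)
  )
}
```

Any of the resulting Props values could be handed to RunPoolDemo in place of the TailChoppingPool ones, for example StrategyPools.poolProps(Props[FibonacciActor])("RoundRobin").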

Supervision Using Pool

Routees that are created by a pool router will be created as the router’s children. The router is therefore also the children’s supervisor.

The supervision strategy of the router actor can be configured with the supervisorStrategy property of the Pool. If no configuration is provided, routers default to a strategy of “always escalate”. This means that errors are passed up to the router’s supervisor for handling. The router’s supervisor will decide what to do about any errors.

Note the router’s supervisor will treat the error as an error with the router itself. Therefore a directive to stop or restart will cause the router itself to stop or restart. The router, in turn, will cause its children to stop and restart.

It should be mentioned that the router’s restart behavior has been overridden so that a restart, while still re-creating the children, will still preserve the same number of actors in the pool.

This means that if you have not specified supervisorStrategy of the router or its parent a failure in a routee will escalate to the parent of the router, which will by default restart the router, which will restart all routees (it uses Escalate and does not stop routees during restart). The reason is to make the default behave such that adding withRouter to a child’s definition does not change the supervision strategy applied to the child. This might be an inefficiency that you can avoid by specifying the strategy when defining the router.

http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Supervision up on 01/11/16

Group

You may also wish to create your routees separately and let the router know about them. This is achievable using Groups. This is not something I decided to cover in this post, but if this sounds of interest to you, you can read more about it at the official documentation here:

http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Group
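
Just to give a flavour of the difference, here is a minimal sketch of a group router. The routees are created up front as ordinary actors, and the router is only told their paths. EchoWorker, the actor names and the createGroupRouter method are all made up for this example.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.routing.RoundRobinGroup

object GroupSketch {

  // A trivial routee that replies with its own name and the message it saw
  class EchoWorker extends Actor {
    def receive = {
      case msg => sender() ! s"${self.path.name} saw $msg"
    }
  }

  def createGroupRouter(system: ActorSystem): ActorRef = {
    // we create (and therefore own) the routees ourselves ...
    val paths = (1 to 3).map { i =>
      system.actorOf(Props[EchoWorker], s"worker-$i").path.toStringWithoutAddress
    }
    // ... and the group router is only given their actor paths
    system.actorOf(RoundRobinGroup(paths).props(), "groupRouter")
  }
}
```

Unlike a pool, the group router here is not the parent of the routees, so it plays no part in their supervision.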

Routing Strategy Demos

For the demos I am using a mixture of RoutingLogic hosted in my own actor, and also Pool based routers.

Here is the basic setup for a RoutingLogic based actor of my own, where I have to manage all supervision concerns manually.

There are ALWAYS 5 routees involved with this demo.

import java.util.concurrent.TimeUnit

import akka.actor._
import akka.routing._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //==============================================================
  //Standard Actor that does routing using Router class
  //where we apply relevant RoutingLogic
  //Supervision is done manually within the Actor that hosts
  //the Router, where we monitor the routees and remove /recreate
  //them on 'Terminated'
  //==============================================================
  RunRoutingDemo(RoundRobinRoutingLogic())



  def RunRoutingDemo(routingLogic : RoutingLogic) : Unit = {
    val system = ActorSystem("RoutingSystem")
    val actorRef = system.actorOf(Props(
      new RouterActor(routingLogic)), name = "theRouter")

    for (i <- 0 until 10) {
      actorRef ! WorkMessage
      Thread.sleep(1000)
    }
    actorRef ! Report

    StdIn.readLine()
    system.terminate()
  }
}

Where we make use of the following generic actor code that uses the specific RoutingLogic that is passed in.

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, Props, Terminated}
import akka.routing.{RoutingLogic, ActorRefRoutee, RoundRobinRoutingLogic, Router}


class RouterActor(val routingLogic : RoutingLogic)  extends Actor  {

  val counter : AtomicInteger = new AtomicInteger()

  val routees = Vector.fill(5) {
    val workerCount = counter.getAndIncrement()
    val r = context.actorOf(Props(
      new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
    context watch r
    ActorRefRoutee(r)
  }

  //create a Router based on the incoming class field
  //RoutingLogic which will really determine what type of router
  //we end up with
  var router = Router(routingLogic, routees)

  def receive = {
    case WorkMessage =>
      router.route(WorkMessage, sender())
    case Report =>
      // use the router's current routees so that replaced workers are included
      router.routees.foreach(ref => ref.send(Report, sender()))
    case Terminated(a) =>
      router = router.removeRoutee(a)
      val workerCount = counter.getAndIncrement()
      val r = context.actorOf(Props(
        new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
      context watch r
      router = router.addRoutee(r)
  }
}

This is what the routees look like for this set of demos

import akka.actor.Actor

class WorkerActor(val id : Int) extends Actor {

  var msgCount = 0
  val actName = self.path.name

  def receive = {
    case WorkMessage => {
      msgCount += 1
      println(s"worker : {$id}, name : ($actName) ->  ($msgCount)")
    }
    case Report => {
      println(s"worker : {$id}, name : ($actName) ->  saw total messages : ($msgCount)")
    }
    case _       => println("unknown message")
  }
}

Ok so let’s have a look at some examples of using this code, shall we:

RoundRobin

We get this output, where the messages are handed to each routee in turn, in round-robin fashion

worker : {0}, name : (workerActor-0) ->  (1)
worker : {1}, name : (workerActor-1) ->  (1)
worker : {2}, name : (workerActor-2) ->  (1)
worker : {3}, name : (workerActor-3) ->  (1)
worker : {4}, name : (workerActor-4) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {1}, name : (workerActor-1) ->  (2)
worker : {2}, name : (workerActor-2) ->  (2)
worker : {3}, name : (workerActor-3) ->  (2)
worker : {4}, name : (workerActor-4) ->  (2)
worker : {0}, name : (workerActor-0) ->  saw total messages : (2)
worker : {1}, name : (workerActor-1) ->  saw total messages : (2)
worker : {2}, name : (workerActor-2) ->  saw total messages : (2)
worker : {4}, name : (workerActor-4) ->  saw total messages : (2)
worker : {3}, name : (workerActor-3) ->  saw total messages : (2)

Random

We get this output, where the messages are sent to routees randomly

worker : {1}, name : (workerActor-1) ->  (1)
worker : {1}, name : (workerActor-1) ->  (2)
worker : {4}, name : (workerActor-4) ->  (1)
worker : {0}, name : (workerActor-0) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {2}, name : (workerActor-2) ->  (1)
worker : {3}, name : (workerActor-3) ->  (1)
worker : {4}, name : (workerActor-4) ->  (2)
worker : {0}, name : (workerActor-0) ->  (3)
worker : {0}, name : (workerActor-0) ->  (4)
worker : {1}, name : (workerActor-1) ->  saw total messages : (2)
worker : {0}, name : (workerActor-0) ->  saw total messages : (4)
worker : {2}, name : (workerActor-2) ->  saw total messages : (1)
worker : {4}, name : (workerActor-4) ->  saw total messages : (2)
worker : {3}, name : (workerActor-3) ->  saw total messages : (1)

SmallestMailBox

We get this output, where the routee with the smallest mailbox gets the message sent to it. This example may look a bit weird, but if you think about it, by the time a new message is sent the 1st routee (workerActor-0) will have dealt with the previous message and is ready to receive a new one. Since it’s the 1st routee in the list, it is still considered the one with the smallest mailbox. If you introduced an artificial delay in the actor dealing with the message, it may show different, more interesting results.

worker : {0}, name : (workerActor-0) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {0}, name : (workerActor-0) ->  (3)
worker : {0}, name : (workerActor-0) ->  (4)
worker : {0}, name : (workerActor-0) ->  (5)
worker : {0}, name : (workerActor-0) ->  (6)
worker : {0}, name : (workerActor-0) ->  (7)
worker : {0}, name : (workerActor-0) ->  (8)
worker : {0}, name : (workerActor-0) ->  (9)
worker : {0}, name : (workerActor-0) ->  (10)
worker : {2}, name : (workerActor-2) ->  saw total messages : (0)
worker : {4}, name : (workerActor-4) ->  saw total messages : (0)
worker : {1}, name : (workerActor-1) ->  saw total messages : (0)
worker : {0}, name : (workerActor-0) ->  saw total messages : (10)
worker : {3}, name : (workerActor-3) ->  saw total messages : (0)
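
The tie-breaking behaviour described above can be sketched in plain Scala. This is only a model of the explanation I gave, not Akka's actual selection rules, and the Mailbox case class and pick function are hypothetical names introduced for the sketch.

```scala
// Plain Scala illustration of "smallest mailbox" selection; not Akka's implementation.
object SmallestMailboxSketch {

  // Hypothetical model of a routee: a name plus the number of queued messages.
  final case class Mailbox(name: String, queued: Int)

  // Returns the first routee with the fewest queued messages.
  // Only a strictly smaller mailbox replaces the current best,
  // so ties go to the earliest routee in the list.
  // Assumes a non-empty list (reduceLeft throws on an empty one).
  def pick(routees: Seq[Mailbox]): Mailbox =
    routees.reduceLeft { (best, candidate) =>
      if (candidate.queued < best.queued) candidate else best
    }

  def main(args: Array[String]): Unit = {
    val idle = Vector.tabulate(5)(i => Mailbox(s"workerActor-$i", queued = 0))
    // All mailboxes are empty, so the first routee wins.
    println(pick(idle).name)

    // Simulate a slow first routee that still has two messages waiting.
    val busyFirst = idle.updated(0, idle(0).copy(queued = 2))
    // Now the next routee with an empty mailbox wins.
    println(pick(busyFirst).name)
  }
}
```

When every mailbox is empty the first routee always wins, which is why workerActor-0 handled all ten messages in the run above.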

Broadcast

We get this output, where each routee should see ALL messages

worker : {0}, name : (workerActor-0) ->  (1)
worker : {2}, name : (workerActor-2) ->  (1)
worker : {4}, name : (workerActor-4) ->  (1)
worker : {3}, name : (workerActor-3) ->  (1)
worker : {1}, name : (workerActor-1) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {1}, name : (workerActor-1) ->  (2)
worker : {4}, name : (workerActor-4) ->  (2)
worker : {2}, name : (workerActor-2) ->  (2)
worker : {3}, name : (workerActor-3) ->  (2)
worker : {0}, name : (workerActor-0) ->  (3)
worker : {2}, name : (workerActor-2) ->  (3)
worker : {3}, name : (workerActor-3) ->  (3)
worker : {4}, name : (workerActor-4) ->  (3)
worker : {1}, name : (workerActor-1) ->  (3)
worker : {1}, name : (workerActor-1) ->  (4)
worker : {4}, name : (workerActor-4) ->  (4)
worker : {3}, name : (workerActor-3) ->  (4)
worker : {0}, name : (workerActor-0) ->  (4)
worker : {2}, name : (workerActor-2) ->  (4)
worker : {0}, name : (workerActor-0) ->  (5)
worker : {1}, name : (workerActor-1) ->  (5)
worker : {4}, name : (workerActor-4) ->  (5)
worker : {2}, name : (workerActor-2) ->  (5)
worker : {3}, name : (workerActor-3) ->  (5)
worker : {3}, name : (workerActor-3) ->  (6)
worker : {2}, name : (workerActor-2) ->  (6)
worker : {1}, name : (workerActor-1) ->  (6)
worker : {4}, name : (workerActor-4) ->  (6)
worker : {0}, name : (workerActor-0) ->  (6)
worker : {1}, name : (workerActor-1) ->  (7)
worker : {0}, name : (workerActor-0) ->  (7)
worker : {4}, name : (workerActor-4) ->  (7)
worker : {2}, name : (workerActor-2) ->  (7)
worker : {3}, name : (workerActor-3) ->  (7)
worker : {0}, name : (workerActor-0) ->  (8)
worker : {3}, name : (workerActor-3) ->  (8)
worker : {1}, name : (workerActor-1) ->  (8)
worker : {2}, name : (workerActor-2) ->  (8)
worker : {4}, name : (workerActor-4) ->  (8)
worker : {2}, name : (workerActor-2) ->  (9)
worker : {3}, name : (workerActor-3) ->  (9)
worker : {4}, name : (workerActor-4) ->  (9)
worker : {1}, name : (workerActor-1) ->  (9)
worker : {0}, name : (workerActor-0) ->  (9)
worker : {0}, name : (workerActor-0) ->  (10)
worker : {2}, name : (workerActor-2) ->  (10)
worker : {1}, name : (workerActor-1) ->  (10)
worker : {4}, name : (workerActor-4) ->  (10)
worker : {3}, name : (workerActor-3) ->  (10)
worker : {1}, name : (workerActor-1) ->  saw total messages : (10)
worker : {2}, name : (workerActor-2) ->  saw total messages : (10)
worker : {0}, name : (workerActor-0) ->  saw total messages : (10)
worker : {3}, name : (workerActor-3) ->  saw total messages : (10)
worker : {4}, name : (workerActor-4) ->  saw total messages : (10)

So that about covers the demos I have created for using your own actor with a RoutingLogic. Let’s now look at using pools. As I have stated already, pools take care of supervision for us, so we don’t have to manually take care of that any more.

As before, I have a helper actor that works with the pool. It creates the router from the supplied props, and the router then receives the messages to send on to its routees.

Here is the demo code

import java.util.concurrent.TimeUnit

import akka.actor._
import akka.routing._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //==============================================================
  // Use built-in Pool router(s) which will do the supervision for us
  //
  //
  //    Comment/Uncomment to try the different router logic
  //
  //==============================================================
  RunScatterGatherFirstCompletedPoolDemo()
  //RunTailChoppingPoolDemo()



  def RunScatterGatherFirstCompletedPoolDemo() : Unit = {

    // Restart any routee that fails, whatever the exception
    val supervisionStrategy = OneForOneStrategy() {
      case _ => SupervisorStrategy.Restart
    }

    val props = ScatterGatherFirstCompletedPool(
      5, supervisorStrategy = supervisionStrategy,within = 10.seconds).
      props(Props[FibonacciActor])

    RunPoolDemo(props)
  }

  def RunTailChoppingPoolDemo() : Unit = {

    // Restart any routee that fails, whatever the exception
    val supervisionStrategy = OneForOneStrategy() {
      case _ => SupervisorStrategy.Restart
    }

    val props = TailChoppingPool(5, within = 10.seconds,
      supervisorStrategy = supervisionStrategy,interval = 20.millis).
      props(Props[FibonacciActor])

    RunPoolDemo(props)
  }

  def RunPoolDemo(props : Props) : Unit = {
    val system = ActorSystem("RoutingSystem")
    val actorRef = system.actorOf(Props(
      new PoolRouterContainerActor(props,"theRouter")), name = "thePoolContainer")
    actorRef ! WorkMessage
    StdIn.readLine()
    system.terminate()
  }
}

And here is the helper actor

import akka.actor._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.pattern.ask

class PoolRouterContainerActor(val props: Props, val name :String)  extends Actor  {

  val router: ActorRef = context.actorOf(props, name)

  def receive = {
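    case WorkMessage =>
      implicit val timeout: Timeout = Timeout(5.seconds)
      // The routee replies with a (name, result) tuple; mapTo gives the reply a static type.
      // Blocking with Await inside an actor is only acceptable in a small demo like this.
      val futureResult = (router ? FibonacciNumber(10)).mapTo[(String, Int)]
      val (actName, result) = Await.result(futureResult, timeout.duration)

      println(s"FibonacciActor : ($actName) came back with result -> $result")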
  }
}

As before we will use 5 routees.

This is what the routees look like for the pool demo

import akka.actor.Actor
import scala.annotation.tailrec

class FibonacciActor extends Actor {

  val actName = self.path.name

  def receive = {
    case FibonacciNumber(nbr) => {
      println(s"FibonacciActor : ($actName) ->  " +
        s"has been asked to calculate FibonacciNumber")
      val result = fibonacci(nbr)
      // Reply with an explicit (name, result) tuple rather than relying on auto-tupling
      sender() ! ((actName, result))
    }
  }

  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }

    fib(n, 1, 0)
  }
}

ScatterGatherFirstCompletedPool

Here is the output when we run this. All five routees are asked to do the calculation, but we only get the result from the routee that completed first

FibonacciActor : ($d) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($e) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($a) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($c) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($b) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($d) came back with result -> 55
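
The scatter/gather idea can also be shown without Akka, using plain Scala futures. This is a minimal sketch under my own assumptions: ScatterGatherSketch and firstReply are hypothetical names, and the "workers" are just strings standing in for routees. Future.firstCompletedOf is a standard library call.

```scala
import scala.annotation.tailrec
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Plain Scala illustration of scatter-gather-first-completed; not Akka's implementation.
object ScatterGatherSketch {

  // Same iterative Fibonacci as the FibonacciActor above.
  def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }

  // Sends the same request to every (hypothetical) worker and keeps the first reply.
  def firstReply(workers: Seq[String], n: Int, within: FiniteDuration): (String, Int) = {
    val replies = workers.map(name => Future((name, fibonacci(n))))
    Await.result(Future.firstCompletedOf(replies), within)
  }

  def main(args: Array[String]): Unit = {
    val (name, result) = firstReply(Seq("a", "b", "c", "d", "e"), 10, 10.seconds)
    println(s"worker ($name) came back with result -> $result")
  }
}
```

Which worker wins is not deterministic, just as a different routee may come back first on each run of the pool demo.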

TailChoppingPool

Here is the output when we run this. It can be seen that we simply get the results from the routee that completed first, out of the few routees that the message was sent to

FibonacciActor : ($b) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($b) came back with result -> 55
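
Only one routee shows up in this output, presumably because it replied before the 20 millisecond interval elapsed, so no second routee needed to be asked. Here is a rough plain Scala sketch of that idea, with no Akka involved. TailChoppingSketch, firstReply and the work function are hypothetical names of mine, and the sketch makes no attempt to match how Akka implements the pool.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.{Random, Try}

// Plain Scala illustration of tail chopping; not Akka's implementation.
object TailChoppingSketch {

  // Asks one randomly chosen worker, then one more every `interval`,
  // and returns whichever reply arrives first.
  def firstReply[A](workers: Seq[String], interval: FiniteDuration, within: FiniteDuration)
                   (work: String => A): (String, A) = {
    val scheduler = Executors.newScheduledThreadPool(2)
    val reply = Promise[(String, A)]()
    try {
      Random.shuffle(workers).zipWithIndex.foreach { case (name, i) =>
        val task = new Runnable {
          // Skip the work if an earlier worker has already replied.
          def run(): Unit =
            if (!reply.isCompleted) reply.tryComplete(Try((name, work(name))))
        }
        scheduler.schedule(task, interval.toMillis * i, TimeUnit.MILLISECONDS)
      }
      Await.result(reply.future, within)
    } finally {
      // Cancel any workers that were never needed.
      scheduler.shutdownNow()
    }
  }

  def main(args: Array[String]): Unit = {
    // The work function is a stand-in for the Fibonacci calculation.
    val (name, result) =
      firstReply(Seq("a", "b", "c", "d", "e"), 20.millis, 10.seconds)(_ => 55)
    println(s"worker ($name) came back with result -> $result")
  }
}
```

With work this cheap the first worker nearly always replies before the second one is scheduled, which matches the single routee seen in the output.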

 

What About Custom Routing Strategy?

Akka allows you to create your own routing strategy by writing a class that extends the built-in Akka RoutingLogic. You can read more about this in the official Akka documentation:

http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Custom_Router

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples