
AKKA routing

 

Last time we looked at Akka Clustering; this time we will look at routing.

Routing allows messages to be routed to one or more actors known as routees, by sending the messages to a router that will know how to route the messages to the routees.

Akka comes with quite a few inbuilt routing strategies that we can make use of. We will look at these next.

Types Of Routing Strategy

The inbuilt routing strategies include:

RoundRobin : Routes in a round-robin fashion to its routees.

Random : This router type selects one of its routees randomly for each message.

SmallestMailBox : A router that tries to send to the non-suspended child routee with the fewest messages in its mailbox. The selection is done in this order:

  • pick any idle routee (not processing a message) with an empty mailbox
  • pick any routee with an empty mailbox
  • pick the routee with the fewest pending messages in its mailbox
  • pick any remote routee; remote actors are considered lowest priority, since their mailbox size is unknown

Broadcast : A broadcast router forwards the message it receives to all its routees.

ScatterGatherFirstCompleted : The ScatterGatherFirstCompletedRouter will send the message on to all its routees. It then waits for the first reply it gets back. This result will be sent back to the original sender. Other replies are discarded.

TailChopping : The TailChoppingRouter will first send the message to one randomly picked routee, then after a small delay to a second routee (picked randomly from the remaining routees), and so on. It waits for the first reply it gets back and forwards it to the original sender. Other replies are discarded.

The goal of this router is to decrease latency by performing redundant queries to multiple routees, assuming that one of the other actors may still be faster to respond than the initial one.
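The SmallestMailBox selection order described above can be modelled in plain Scala. This is only a sketch of the ordering, not Akka's implementation: RouteeState and SmallestMailboxSketch are hypothetical names invented for illustration, and suspended routees are left out to keep it short.

```scala
// Hypothetical snapshot of a routee's state; not an Akka type.
final case class RouteeState(
  name: String,
  isRemote: Boolean,
  isProcessing: Boolean,
  mailboxSize: Int)

object SmallestMailboxSketch {
  // Applies the four steps in order and returns the first match, if any.
  def select(routees: Seq[RouteeState]): Option[RouteeState] = {
    val local = routees.filterNot(_.isRemote)
    local.find(r => !r.isProcessing && r.mailboxSize == 0) // 1. idle with empty mailbox
      .orElse(local.find(_.mailboxSize == 0))              // 2. any empty mailbox
      .orElse(local.sortBy(_.mailboxSize).headOption)      // 3. fewest pending messages
      .orElse(routees.find(_.isRemote))                    // 4. remote routee, lowest priority
  }
}
```

Calling SmallestMailboxSketch.select with a mix of busy, idle and remote entries returns the idle local one first, and only falls back to a remote entry when no local routee is available.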

Regular Actor As A Router

Akka allows you to create routers in 2 ways. The first way is to use RoutingLogic to set up your router.

There are quite a few specializations of RoutingLogic, such as

  • RoundRobinRoutingLogic
  • RandomRoutingLogic
  • SmallestMailboxRoutingLogic
  • BroadcastRoutingLogic

You would typically use this in a regular actor. The actor in which you use the RoutingLogic would be the router. If you go down this path you are responsible for managing the router’s children, i.e. the routees. That means managing ALL aspects of the routees, including adding them to a list of available routees and watching them for termination so they can be removed from that list (which sounds a lot like supervision, doesn’t it?).

Here is what a skeleton for an actor that is set up manually as a router may look like:

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, Props, Terminated}
import akka.routing.{RoutingLogic, ActorRefRoutee, RoundRobinRoutingLogic, Router}


class RouterActor(val routingLogic : RoutingLogic)  extends Actor  {

  val counter : AtomicInteger = new AtomicInteger()

  val routees = Vector.fill(5) {
    val workerCount = counter.getAndIncrement()
    val r = context.actorOf(Props(
      new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
    context watch r
    ActorRefRoutee(r)
  }

  //create a Router based on the incoming class field
  //RoutingLogic which will really determine what type of router
  //we end up with
  var router = Router(routingLogic, routees)

  def receive = {
    case WorkMessage =>
      router.route(WorkMessage, sender())
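    //use router.routees (not the initial Vector) so that replacement
    //routees added after a Terminated are included
    case Report => router.routees.foreach(ref => ref.send(Report, sender()))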
    case Terminated(a) =>
      router = router.removeRoutee(a)
      val workerCount = counter.getAndIncrement()
      val r = context.actorOf(Props(
        new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
      context watch r
      router = router.addRoutee(r)
  }
}

It can be seen that I pass in the RoutingLogic, which would be one of the available RoutingLogic strategies that Akka comes with.

The other thing to note is that as we stated earlier we need to FULLY manage the collection of routee actors ourselves, including watching them for Termination.
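The skeleton above matches on WorkMessage and Report, and the pool demo later in this post sends FibonacciNumber, but none of their definitions appear in the post. A minimal guess at what they might look like is below; the exact shapes are an assumption inferred from how the messages are used (for example, FibonacciNumber carrying a single Int).

```scala
// Assumed definitions, inferred from usage in the post; the real ones may differ.

// Sent to the router actor to trigger one unit of work on a routee.
case object WorkMessage

// Asks every routee to print how many messages it has seen.
case object Report

// Asks a FibonacciActor for the nbr-th Fibonacci number.
final case class FibonacciNumber(nbr: Int)
```

Case objects suit WorkMessage and Report because they carry no data, which is also why the actors can match on them with a plain `case WorkMessage =>`.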

Surely there is a better way?

Well yes, thankfully there is: Akka also provides a Pool for this job. We will look at that next.

Pool

Akka comes with the ability to create a router using a pool where we tell it what actors we want to use as the routees, how many routees we want, and how the supervision should be handled.

Here is some code from my demo that uses 2 utility methods to create a pool-based router. The routees are simple FibonacciActor instances, and messages reach them via a container actor that creates the router from the pool’s Props value

def RunTailChoppingPoolDemo() : Unit = {

  val supervisionStrategy = OneForOneStrategy() {
    case e => SupervisorStrategy.restart
  }

  val props = TailChoppingPool(5, within = 10.seconds,
    supervisorStrategy = supervisionStrategy,interval = 20.millis).
    props(Props[FibonacciActor])

  RunPoolDemo(props)
}

def RunPoolDemo(props : Props) : Unit = {
  val system = ActorSystem("RoutingSystem")
  val actorRef = system.actorOf(Props(
    new PoolRouterContainerActor(props,"theRouter")), name = "thePoolContainer")
  actorRef ! WorkMessage
  StdIn.readLine()
  system.terminate()
}



import akka.actor._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.pattern.ask

class PoolRouterContainerActor(val props: Props, val name :String)  extends Actor  {

  val router: ActorRef = context.actorOf(props, name)

  def receive = {
    case WorkMessage =>
      implicit val timeout: Timeout = Timeout(5.seconds)
      val futureResult = router ? FibonacciNumber(10)
      val (actName,result) = Await.result(futureResult, timeout.duration)

      println(s"FibonacciActor : ($actName) came back with result -> $result")
  }
}



import akka.actor.Actor
import scala.annotation.tailrec

class FibonacciActor extends Actor {

  val actName = self.path.name

  def receive = {
    case FibonacciNumber(nbr) => {
      println(s"FibonacciActor : ($actName) ->  " +
        s"has been asked to calculate FibonacciNumber")
      val result = fibonacci(nbr)
      sender() ! ((actName, result))
    }
  }

  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }

    fib(n, 1, 0)
  }
}

Supervision Using Pool

Routees that are created by a pool router will be created as the router’s children. The router is therefore also the children’s supervisor.

The supervision strategy of the router actor can be configured with the supervisorStrategy property of the Pool. If no configuration is provided, routers default to a strategy of “always escalate”. This means that errors are passed up to the router’s supervisor for handling. The router’s supervisor will decide what to do about any errors.

Note the router’s supervisor will treat the error as an error with the router itself. Therefore a directive to stop or restart will cause the router itself to stop or restart. The router, in turn, will cause its children to stop and restart.

It should be mentioned that the router’s restart behavior has been overridden so that a restart, while still re-creating the children, will still preserve the same number of actors in the pool.

This means that if you have not specified supervisorStrategy of the router or its parent a failure in a routee will escalate to the parent of the router, which will by default restart the router, which will restart all routees (it uses Escalate and does not stop routees during restart). The reason is to make the default behave such that adding withRouter to a child’s definition does not change the supervision strategy applied to the child. This might be an inefficiency that you can avoid by specifying the strategy when defining the router.

http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Supervision up on 01/11/16
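To avoid the default escalation described above, a strategy can be handed to the pool when it is defined. The sketch below is one possible shape, assuming classic Akka actors on the classpath; EchoWorker and PoolSupervisionSketch are hypothetical names, and the retry limits are arbitrary illustration values.

```scala
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.routing.RoundRobinPool
import scala.concurrent.duration._

// Hypothetical routee used only for this sketch.
class EchoWorker extends Actor {
  def receive: Receive = { case msg => sender() ! msg }
}

object PoolSupervisionSketch {
  // Restart only the failing routee on ordinary exceptions;
  // anything else is still escalated to the router's parent.
  val routeeStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: Exception => SupervisorStrategy.Restart
      case _            => SupervisorStrategy.Escalate
    }

  // Props for a 5-routee round-robin pool that supervises its own children.
  val poolProps: Props =
    RoundRobinPool(nrOfInstances = 5, supervisorStrategy = routeeStrategy)
      .props(Props(classOf[EchoWorker]))
}
```

With a strategy like this, a failure in one routee should be handled by the router itself rather than causing the parent to restart the whole router and all of its routees.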

Group

You may also wish to create your routees separately and let the router know about them. This is achievable using Groups. This is not something I decided to cover in this post, but if this sounds of interest to you, you can read more about it at the official documentation here:

http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Group
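For a rough idea of what a group looks like, here is a small sketch, assuming classic Akka actors on the classpath. The routees are created by our own code and the router is only told their paths. GroupWorker, GroupSketch, the actor names and the system name are all made up for illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.RoundRobinGroup

// Hypothetical routee for this sketch.
class GroupWorker extends Actor {
  def receive: Receive = {
    case msg => println(s"${self.path.name} got $msg")
  }
}

object GroupSketch {
  // Builds the router Props from paths of routees that already exist.
  def groupProps(paths: List[String]): Props = RoundRobinGroup(paths).props()

  def run(): Unit = {
    val system = ActorSystem("GroupSystem")
    // The routees are created (and would be supervised) outside the router.
    val paths = (1 to 3).map { i =>
      system.actorOf(Props(classOf[GroupWorker]), s"worker-$i")
        .path.toStringWithoutAddress // e.g. /user/worker-1
    }.toList
    val router = system.actorOf(groupProps(paths), "groupRouter")
    (1 to 6).foreach(i => router ! s"job-$i")
    Thread.sleep(500) // crude wait so the workers can print before shutdown
    system.terminate()
  }
}
```

Unlike a pool, the group router does not create or supervise these routees, so restarting or replacing them remains the job of whichever actor created them.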

Routing Strategy Demos

For the demos I am using a mixture of RoutingLogic hosted in my own actor, and also Pool based routers.

Here is the basic setup for a RoutingLogic based actor of my own, where I have to manage all supervision concerns manually.

There are ALWAYS 5 routees involved with this demo.

import java.util.concurrent.TimeUnit

import akka.actor._
import akka.routing._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //==============================================================
  //Standard Actor that does routing using Router class
  //where we apply relevant RoutingLogic
  //Supervision is done manually within the Actor that hosts
  //the Router, where we monitor the routees and remove /recreate
  //them on 'Terminated'
  //==============================================================
  RunRoutingDemo(RoundRobinRoutingLogic())



  def RunRoutingDemo(routingLogic : RoutingLogic) : Unit = {
    val system = ActorSystem("RoutingSystem")
    val actorRef = system.actorOf(Props(
      new RouterActor(routingLogic)), name = "theRouter")

    for (i <- 0 until 10) {
      actorRef ! WorkMessage
      Thread.sleep(1000)
    }
    actorRef ! Report

    StdIn.readLine()
    system.terminate()
  }
}

This makes use of the following generic actor code, which uses the specific RoutingLogic that is passed in.

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, Props, Terminated}
import akka.routing.{RoutingLogic, ActorRefRoutee, RoundRobinRoutingLogic, Router}


class RouterActor(val routingLogic : RoutingLogic)  extends Actor  {

  val counter : AtomicInteger = new AtomicInteger()

  val routees = Vector.fill(5) {
    val workerCount = counter.getAndIncrement()
    val r = context.actorOf(Props(
      new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
    context watch r
    ActorRefRoutee(r)
  }

  //create a Router based on the incoming class field
  //RoutingLogic which will really determine what type of router
  //we end up with
  var router = Router(routingLogic, routees)

  def receive = {
    case WorkMessage =>
      router.route(WorkMessage, sender())
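    //use router.routees (not the initial Vector) so that replacement
    //routees added after a Terminated are included
    case Report => router.routees.foreach(ref => ref.send(Report, sender()))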
    case Terminated(a) =>
      router = router.removeRoutee(a)
      val workerCount = counter.getAndIncrement()
      val r = context.actorOf(Props(
        new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
      context watch r
      router = router.addRoutee(r)
  }
}

This is what the routees look like for this set of demos

import akka.actor.Actor

class WorkerActor(val id : Int) extends Actor {

  var msgCount = 0
  val actName = self.path.name

  def receive = {
    case WorkMessage => {
      msgCount += 1
      println(s"worker : {$id}, name : ($actName) ->  ($msgCount)")
    }
    case Report => {
      println(s"worker : {$id}, name : ($actName) ->  saw total messages : ($msgCount)")
    }
    case _       => println("unknown message")
  }
}

OK, so let’s have a look at some examples of using this code, shall we:

RoundRobin

We get this output, where the messages are handed to each routee in turn, round-robin style

worker : {0}, name : (workerActor-0) ->  (1)
worker : {1}, name : (workerActor-1) ->  (1)
worker : {2}, name : (workerActor-2) ->  (1)
worker : {3}, name : (workerActor-3) ->  (1)
worker : {4}, name : (workerActor-4) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {1}, name : (workerActor-1) ->  (2)
worker : {2}, name : (workerActor-2) ->  (2)
worker : {3}, name : (workerActor-3) ->  (2)
worker : {4}, name : (workerActor-4) ->  (2)
worker : {0}, name : (workerActor-0) ->  saw total messages : (2)
worker : {1}, name : (workerActor-1) ->  saw total messages : (2)
worker : {2}, name : (workerActor-2) ->  saw total messages : (2)
worker : {4}, name : (workerActor-4) ->  saw total messages : (2)
worker : {3}, name : (workerActor-3) ->  saw total messages : (2)

Random

We get this output, where the messages are sent to routees randomly

worker : {1}, name : (workerActor-1) ->  (1)
worker : {1}, name : (workerActor-1) ->  (2)
worker : {4}, name : (workerActor-4) ->  (1)
worker : {0}, name : (workerActor-0) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {2}, name : (workerActor-2) ->  (1)
worker : {3}, name : (workerActor-3) ->  (1)
worker : {4}, name : (workerActor-4) ->  (2)
worker : {0}, name : (workerActor-0) ->  (3)
worker : {0}, name : (workerActor-0) ->  (4)
worker : {1}, name : (workerActor-1) ->  saw total messages : (2)
worker : {0}, name : (workerActor-0) ->  saw total messages : (4)
worker : {2}, name : (workerActor-2) ->  saw total messages : (1)
worker : {4}, name : (workerActor-4) ->  saw total messages : (2)
worker : {3}, name : (workerActor-3) ->  saw total messages : (1)

SmallestMailBox

We get this output, where the routee with the smallest mailbox gets the message. This example may look a bit weird, but if you think about it, by the time each new message is sent the 1st routee (workerActor-0) will have dealt with the previous message and is ready to receive a new one, and since it’s the 1st routee in the list it is still considered the one with the smallest mailbox. If you introduced an artificial delay in the actor dealing with the message, it may show different, more interesting results.

worker : {0}, name : (workerActor-0) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {0}, name : (workerActor-0) ->  (3)
worker : {0}, name : (workerActor-0) ->  (4)
worker : {0}, name : (workerActor-0) ->  (5)
worker : {0}, name : (workerActor-0) ->  (6)
worker : {0}, name : (workerActor-0) ->  (7)
worker : {0}, name : (workerActor-0) ->  (8)
worker : {0}, name : (workerActor-0) ->  (9)
worker : {0}, name : (workerActor-0) ->  (10)
worker : {2}, name : (workerActor-2) ->  saw total messages : (0)
worker : {4}, name : (workerActor-4) ->  saw total messages : (0)
worker : {1}, name : (workerActor-1) ->  saw total messages : (0)
worker : {0}, name : (workerActor-0) ->  saw total messages : (10)
worker : {3}, name : (workerActor-3) ->  saw total messages : (0)

Broadcast

We get this output, where each routee should see ALL messages

worker : {0}, name : (workerActor-0) ->  (1)
worker : {2}, name : (workerActor-2) ->  (1)
worker : {4}, name : (workerActor-4) ->  (1)
worker : {3}, name : (workerActor-3) ->  (1)
worker : {1}, name : (workerActor-1) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {1}, name : (workerActor-1) ->  (2)
worker : {4}, name : (workerActor-4) ->  (2)
worker : {2}, name : (workerActor-2) ->  (2)
worker : {3}, name : (workerActor-3) ->  (2)
worker : {0}, name : (workerActor-0) ->  (3)
worker : {2}, name : (workerActor-2) ->  (3)
worker : {3}, name : (workerActor-3) ->  (3)
worker : {4}, name : (workerActor-4) ->  (3)
worker : {1}, name : (workerActor-1) ->  (3)
worker : {1}, name : (workerActor-1) ->  (4)
worker : {4}, name : (workerActor-4) ->  (4)
worker : {3}, name : (workerActor-3) ->  (4)
worker : {0}, name : (workerActor-0) ->  (4)
worker : {2}, name : (workerActor-2) ->  (4)
worker : {0}, name : (workerActor-0) ->  (5)
worker : {1}, name : (workerActor-1) ->  (5)
worker : {4}, name : (workerActor-4) ->  (5)
worker : {2}, name : (workerActor-2) ->  (5)
worker : {3}, name : (workerActor-3) ->  (5)
worker : {3}, name : (workerActor-3) ->  (6)
worker : {2}, name : (workerActor-2) ->  (6)
worker : {1}, name : (workerActor-1) ->  (6)
worker : {4}, name : (workerActor-4) ->  (6)
worker : {0}, name : (workerActor-0) ->  (6)
worker : {1}, name : (workerActor-1) ->  (7)
worker : {0}, name : (workerActor-0) ->  (7)
worker : {4}, name : (workerActor-4) ->  (7)
worker : {2}, name : (workerActor-2) ->  (7)
worker : {3}, name : (workerActor-3) ->  (7)
worker : {0}, name : (workerActor-0) ->  (8)
worker : {3}, name : (workerActor-3) ->  (8)
worker : {1}, name : (workerActor-1) ->  (8)
worker : {2}, name : (workerActor-2) ->  (8)
worker : {4}, name : (workerActor-4) ->  (8)
worker : {2}, name : (workerActor-2) ->  (9)
worker : {3}, name : (workerActor-3) ->  (9)
worker : {4}, name : (workerActor-4) ->  (9)
worker : {1}, name : (workerActor-1) ->  (9)
worker : {0}, name : (workerActor-0) ->  (9)
worker : {0}, name : (workerActor-0) ->  (10)
worker : {2}, name : (workerActor-2) ->  (10)
worker : {1}, name : (workerActor-1) ->  (10)
worker : {4}, name : (workerActor-4) ->  (10)
worker : {3}, name : (workerActor-3) ->  (10)
worker : {1}, name : (workerActor-1) ->  saw total messages : (10)
worker : {2}, name : (workerActor-2) ->  saw total messages : (10)
worker : {0}, name : (workerActor-0) ->  saw total messages : (10)
worker : {3}, name : (workerActor-3) ->  saw total messages : (10)
worker : {4}, name : (workerActor-4) ->  saw total messages : (10)

So that about covers the demos I have created for using your own actor with RoutingLogic. Let’s now look at using pools. As I have stated already, pools take care of supervision for us, so we don’t have to manually take care of that any more.

As before I have a helper actor to work with the pool, that accepts the router, where the router will receive the messages to send to its routees.

Here is the demo code

import java.util.concurrent.TimeUnit

import akka.actor._
import akka.routing._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //==============================================================
  // Use built-in Pool router(s) which will do the supervision for us
  //
  //
  //    Comment/Uncomment to try the different router logic
  //
  //==============================================================
  RunScatterGatherFirstCompletedPoolDemo()
  //RunTailChoppingPoolDemo()



  def RunScatterGatherFirstCompletedPoolDemo() : Unit = {

    val supervisionStrategy = OneForOneStrategy() {
      case e => SupervisorStrategy.restart
    }

    val props = ScatterGatherFirstCompletedPool(
      5, supervisorStrategy = supervisionStrategy,within = 10.seconds).
      props(Props[FibonacciActor])

    RunPoolDemo(props)
  }

  def RunTailChoppingPoolDemo() : Unit = {

    val supervisionStrategy = OneForOneStrategy() {
      case e => SupervisorStrategy.restart
    }

    val props = TailChoppingPool(5, within = 10.seconds,
      supervisorStrategy = supervisionStrategy,interval = 20.millis).
      props(Props[FibonacciActor])

    RunPoolDemo(props)
  }

  def RunPoolDemo(props : Props) : Unit = {
    val system = ActorSystem("RoutingSystem")
    val actorRef = system.actorOf(Props(
      new PoolRouterContainerActor(props,"theRouter")), name = "thePoolContainer")
    actorRef ! WorkMessage
    StdIn.readLine()
    system.terminate()
  }
}

And here is the helper actor

import akka.actor._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.pattern.ask

class PoolRouterContainerActor(val props: Props, val name :String)  extends Actor  {

  val router: ActorRef = context.actorOf(props, name)

  def receive = {
    case WorkMessage =>
      implicit val timeout: Timeout = Timeout(5.seconds)
      val futureResult = router ? FibonacciNumber(10)
      val (actName,result) = Await.result(futureResult, timeout.duration)

      println(s"FibonacciActor : ($actName) came back with result -> $result")
  }
}
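The helper above blocks its thread with Await.result, which is fine for a demo but generally discouraged inside an actor. A non-blocking variant might look like the sketch below, assuming classic Akka actors on the classpath. It pipes the reply back to the actor as a message instead of waiting for it. NonBlockingPoolContainerActor and RouteeReply are hypothetical names, and the two message definitions are assumed shapes, since the post does not show them.

```scala
import akka.actor.{Actor, ActorRef, Props, Status}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.duration._

// Assumed message definitions, repeated so the sketch compiles on its own.
case object WorkMessage
final case class FibonacciNumber(nbr: Int)

// Hypothetical message that carries the routee's reply back into the actor.
final case class RouteeReply(actName: String, result: Int)

class NonBlockingPoolContainerActor(props: Props, name: String) extends Actor {
  import context.dispatcher // ExecutionContext for the Future callbacks

  implicit val timeout: Timeout = Timeout(5.seconds)
  val router: ActorRef = context.actorOf(props, name)

  def receive: Receive = {
    case WorkMessage =>
      // Ask the router, convert the (name, result) tuple, and send it to ourselves.
      (router ? FibonacciNumber(10))
        .mapTo[(String, Int)]
        .map { case (actName, result) => RouteeReply(actName, result) }
        .pipeTo(self)

    case RouteeReply(actName, result) =>
      println(s"FibonacciActor : ($actName) came back with result -> $result")

    case Status.Failure(cause) =>
      // pipeTo delivers a failed Future (e.g. an ask timeout) as Status.Failure.
      println(s"Ask failed or timed out: ${cause.getMessage}")
  }
}
```

The actor stays free to process other messages while the routees are working, and a timeout arrives as an ordinary message rather than an exception thrown inside receive.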

As before we will use 5 routees.

This is what the routees look like for the pool demo

import akka.actor.Actor
import scala.annotation.tailrec

class FibonacciActor extends Actor {

  val actName = self.path.name

  def receive = {
    case FibonacciNumber(nbr) => {
      println(s"FibonacciActor : ($actName) ->  " +
        s"has been asked to calculate FibonacciNumber")
      val result = fibonacci(nbr)
      sender() ! ((actName, result))
    }
  }

  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }

    fib(n, 1, 0)
  }
}

ScatterGatherFirstCompletedPool

Here is the output when we run this. It can be seen that we simply get the results from the routee that completed first

FibonacciActor : ($d) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($e) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($a) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($c) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($b) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($d) came back with result -> 55

TailChoppingPool

Here is the output when we run this. It can be seen that we simply get the results from the routee that completed first, out of the few routees that the message was sent to

FibonacciActor : ($b) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($b) came back with result -> 55

 

What About A Custom Routing Strategy?

Akka allows you to create your own routing strategy where you would create a class that extends the inbuilt Akka RoutingLogic. You can read more about this in the official Akka documentation:

http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Custom_Router
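As a rough illustration of the idea, a custom strategy only needs to implement RoutingLogic's select method. The sketch below assumes classic Akka actors on the classpath; ModuloRoutingLogic is a made-up example, not something that ships with Akka. It picks a routee from the value of an Int message and falls back to the first routee for anything else.

```scala
import akka.routing.{NoRoutee, Routee, RoutingLogic}
import scala.collection.immutable

// Hypothetical strategy: an Int message n goes to routee (n mod size),
// any other message goes to the first routee.
class ModuloRoutingLogic extends RoutingLogic {
  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.isEmpty) NoRoutee // nothing to route to
    else message match {
      case n: Int => routees(Math.floorMod(n, routees.size)) // floorMod keeps negatives in range
      case _      => routees.head
    }
}
```

An instance could be passed to the RouterActor from earlier in this post, e.g. `new RouterActor(new ModuloRoutingLogic)`, though that demo only sends WorkMessage, so every message would go to the first routee.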

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples
