
AKKA : hierarchies / lifecycles

Ok so last time we covered the basics of actors and the actor system (actor fabric) and covered how to send simple messages to actors.

This time we will talk about actor hierarchies (supervision) and also actor lifecycles.

But before we get into supervision and how it is used, we should just take a trip back to the introduction article in this series of posts.

The introductory post had the following bullet point

  • Supervisor hierarchies with “let-it-crash” semantics.

What does that actually mean? Quite simply, Akka embraces failure and treats it as an expected part of what could happen.

Doesn’t this sound quite dangerous? Well, yes and no. The yes part is that an actor could have been part way through processing some message when it failed. So logic would dictate that you should ensure that your messages are idempotent and are safe to send again.
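
One way to make a message safe to send again is to give it a unique id and have the receiver remember which ids it has already applied. Here is a minimal plain Scala sketch of that idea (no Akka involved; Deposit, Account and the id scheme are hypothetical names made up for this illustration):

```scala
// Hypothetical message type: every deposit carries a unique id.
final case class Deposit(id: String, amount: Long)

// Immutable account state that remembers which message ids were already applied.
final case class Account(balance: Long = 0L, seen: Set[String] = Set.empty) {
  // Applying the same Deposit a second time leaves the state unchanged.
  def handle(msg: Deposit): Account =
    if (seen.contains(msg.id)) this
    else copy(balance = balance + msg.amount, seen = seen + msg.id)
}

object IdempotencyDemo {
  def main(args: Array[String]): Unit = {
    val deposit = Deposit("msg-1", 50L)
    val once    = Account().handle(deposit)
    val twice   = once.handle(deposit) // simulated redelivery after a crash
    println(once.balance)  // prints 50
    println(twice.balance) // prints 50
  }
}
```

Because the second delivery is ignored, a supervisor (or the original sender) can resend the message after a failure without the deposit being counted twice.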

The no part is the interesting case: you have an actor that dies part way through its job. Boo.

Luckily this is Akka’s bread and butter. It has a strong concept of ownership/supervision, where the supervisor knows about an underling’s failure and knows how to remedy it. We will look at this next.

Hierarchies (supervision)

So far we have seen how to create our own little Akka systems. But what is really going on when we do that?

It turns out that the Akka fabric creates some top level actors called “Guardians”, which “supervise” all user created actors.

This can be seen on this diagram (borrowed from the official Akka documentation)

image

Within Akka EVERY actor is “supervised” by another actor, all the way up to the “Root Guardian”.

There are 2 sub guardians under the root guardian: User and System, which play different roles.

To stand on the shoulders of giants, here is what the Akka docs have to say about these 3 top level guardians

/: The Root Guardian
The root guardian is the grand-parent of all so-called “top-level” actors and supervises all the special actors mentioned in Top-Level Scopes for Actor Paths using the SupervisorStrategy.stoppingStrategy, whose purpose is to terminate the child upon any type of Exception. All other throwables will be escalated … but to whom? Since every real actor has a supervisor, the supervisor of the root guardian cannot be a real actor. And because this means that it is “outside of the bubble”, it is called the “bubble-walker”. This is a synthetic ActorRef which in effect stops its child upon the first sign of trouble and sets the actor system’s isTerminated status to true as soon as the root guardian is fully terminated (all children recursively stopped).

/system: The System Guardian
This special guardian has been introduced in order to achieve an orderly shut-down sequence where logging remains active while all normal actors terminate, even though logging itself is implemented using actors. This is realized by having the system guardian watch the user guardian and initiate its own shut-down upon reception of the Terminated message. The top-level system actors are supervised using a strategy which will restart indefinitely upon all types of Exception except for ActorInitializationException and ActorKilledException, which will terminate the child in question. All other throwables are escalated, which will shut down the whole actor system.

/user: The Guardian Actor
The actor which is probably most interacted with is the parent of all user-created actors, the guardian named “/user”. Actors created using system.actorOf() are children of this actor. This means that when this guardian terminates, all normal actors in the system will be shutdown, too. It also means that this guardian’s supervisor strategy determines how the top-level normal actors are supervised. Since Akka 2.1 it is possible to configure this using the setting akka.actor.guardian-supervisor-strategy, which takes the fully-qualified class-name of a SupervisorStrategyConfigurator. When the guardian escalates a failure, the root guardian’s response will be to terminate the guardian, which in effect will shut down the whole actor system.

See http://doc.akka.io/docs/akka/snapshot/general/supervision.html

Ok, so now we have established that there are top level guardians that take care of “supervising” any actors that are created programmatically, and that there is a guardian to monitor existing actors and log/manage their correct shutdown.

But can we create our own supervisors?

Well yes you can, in fact this is completely encouraged and a very very natural part of writing Akka code.

So how do we do that exactly?

It is actually quite simple, we have to follow a few steps

  1. Decide on what supervision strategy we wish to use (Akka has 2 of these, we will cover these in this article)
  2. Implement a Decider (again we will cover this in this article)

Let’s start with talking about these 2 points before we dive into any code

Supervision Strategy

As stated above it is completely possible to create our own supervisors.  There are only a couple of things that we need to look at before we dive into looking at some example code.

Firstly we need to understand what is meant by a supervision strategy. Quite simply a supervision strategy is a strategy that dictates what a supervisor will do when one of his underlings (the actors that he creates) throws an Exception.

Akka comes with 2 inbuilt supervisor strategies

OneForOneStrategy which means that ONLY the child actor that raised the exception qualifies for some special treatment. The special treatment is called a directive, but more on this later

AllForOneStrategy which means that ALL of the children under the direct supervisor will have the supervision strategy directive applied to them
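
The only difference between the two strategies is which children receive the directive. A toy model in plain Scala makes that concrete (the types below are stand-ins written for this illustration, they are not Akka’s classes):

```scala
object StrategyScope {
  // Stand-ins for the two built-in strategies.
  sealed trait Strategy
  case object OneForOne extends Strategy
  case object AllForOne extends Strategy

  // Returns the children the supervisor's directive is applied to
  // when the child named `failed` throws.
  def affected(strategy: Strategy, children: List[String], failed: String): List[String] =
    strategy match {
      case OneForOne => children.filter(_ == failed) // only the failing child
      case AllForOne => children                     // every sibling as well
    }
}
```

With three children and a failure in the last one, OneForOne touches just that child, whereas AllForOne touches all three.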

Decider

Within both of these strategies listed above, Akka has the concept of a decider. A decider is a PartialFunction that has the following type signature

type Decider = PartialFunction[Throwable, Directive]

Where PartialFunction is a trait that looks like this under the covers

trait PartialFunction[-A, +B] extends (A => B)

The idea here being that given a Throwable (exception) the decider returns a B for every case it is defined for. In Akka’s case the B in question is an akka.actor.SupervisorStrategy.Directive

A Directive will take one of 4 possible values

  • Resume
  • Restart
  • Stop
  • Escalate

So what we are really saying when we talk about a decider is that it is a way for a supervisor to know what action to carry out when it sees a given Throwable (exception). This action will be applied to one (or more) of the supervisor’s child(ren) depending on the type of supervision strategy used.
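
Because a decider is just a PartialFunction, two deciders can be chained with orElse: the first one is tried, and the second handles whatever the first leaves undefined. The sketch below uses only the Scala standard library. Directive and its four cases are local stand-ins that mirror the names above, and defaultDecider is a made-up fallback for illustration, not Akka’s real default decider.

```scala
object DeciderSketch {
  // Local stand-ins for illustration; Akka defines its own Directive type.
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  type Decider = PartialFunction[Throwable, Directive]

  // A custom decider that only knows about two exception types.
  val custom: Decider = {
    case _: IllegalStateException    => Stop
    case _: IllegalArgumentException => Resume
  }

  // A hypothetical fallback: restart on any other Exception, escalate the rest.
  val defaultDecider: Decider = {
    case _: Exception => Restart
    case _            => Escalate
  }

  // `custom` is tried first; `defaultDecider` covers whatever it leaves undefined.
  val combined: Decider = custom.orElse(defaultDecider)
}
```

Calling DeciderSketch.custom.isDefinedAt(new RuntimeException("x")) returns false, which is exactly the gap that the fallback fills in the combined decider.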

Akka actually comes with a default decider, which you can use directly should you want to, or you can create your own which you can augment with the default Akka one. In this post I will create my own and augment it with the default Akka one.

Let’s see an example of how a supervisor might use a decider and how it may create a supervision strategy.

import akka.actor.SupervisorStrategy.Directive
import akka.actor._

class AllForOneSupervisorActor extends Actor {
  

  val decider: PartialFunction[Throwable, Directive] = {
    case _: AkkaStopDirectiveException =>
      akka.actor.SupervisorStrategy.stop
    case _: AkkaRestartDirectiveException =>
      akka.actor.SupervisorStrategy.restart
  }

  override def supervisorStrategy: SupervisorStrategy =
    AllForOneStrategy()(decider.orElse(
      SupervisorStrategy.defaultStrategy.decider))
}

Now that we have seen how a supervisor creates a supervision strategy and how it makes use of a decider, let’s continue to look at an example of the 2 built in Akka supervision strategies

The Common Parts Of The Demo Code

Within this demo the following actor will be used as the child of the supervisors, we will just have multiple children ALL of this type of actor

import akka.actor.Actor

class LifeCycleActor extends Actor {
  println("LifeCycleActor: constructor")

  override def preStart(): Unit = { println("LifeCycleActor: preStart") }

  override def postStop(): Unit = { println("LifeCycleActor: postStop") }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println("LifeCycleActor: preRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    println(s"LifeCycleActor message: ${message.getOrElse("")}")
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    println("LifeCycleActor: postRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    super.postRestart(reason)
  }

  def receive = {

    case "SoundOff" =>
      println("LifeCycleActor: SoundOff seen")
      println(s"LifeCycleActor alive ${self.path.name}" )

    case RaiseStopThrowableMessage =>
      println("LifeCycleActor: RaiseStopThrowableMessage seen")
      throw new AkkaStopDirectiveException(
        "LifeCycleActor raised AkkaStopDirectiveException")

    case RaiseRestartThrowableMessage =>
      println("LifeCycleActor: RaiseRestartThrowableMessage seen")
      throw new AkkaRestartDirectiveException(
        "LifeCycleActor raised AkkaRestartDirectiveException")
  }
}
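
The listing above refers to two message objects and two exception types that are not shown in this post. One plausible shape for them is sketched below. These definitions are an assumption for illustration only; the real ones live in the GitHub repo linked at the end of the post.

```scala
// Assumed definitions (not shown in the post): messages as case objects,
// so they can be matched directly in `receive`.
case object RaiseStopThrowableMessage
case object RaiseRestartThrowableMessage

// Assumed definitions: simple exceptions that carry a message. The
// supervisors in this post map them to the Stop and Restart directives.
class AkkaStopDirectiveException(message: String) extends Exception(message)
class AkkaRestartDirectiveException(message: String) extends Exception(message)
```

Using distinct exception types is what lets a decider pick a different directive for each kind of failure.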

OneForOneStrategy Example

Here is the code for the “OneForOne” supervisor actor.

There are a couple of things to note here.

  • We create child actors using context.actorOf, the built in Akka factory. This creates children that are “supervised” by the actor whose context was used to create them
  • We use a custom decider which uses the default Akka decider as well
  • We use the “OneForOneStrategy” supervision strategy

import akka.actor.SupervisorStrategy.Directive
import akka.actor._

class OneForOneSupervisorActor extends Actor {
  println("OneForOneSupervisorActor: constructor")
  val lifeCycleChildrenActors = new Array[ActorRef](3)

  def receive = {
    case "StartChildren" =>
      println(s"OneForOneSupervisorActor : got a message StartChildren")

      for(i <- 0 to 2) {
        val child = context.actorOf(Props[LifeCycleActor], name = s"lifecycleactor_$i")
        lifeCycleChildrenActors(i) = child
      }

    case "MakeRandomChildCommitSuicide" =>
      println(s"OneForOneSupervisorActor : got a message MakeRandomChildCommitSuicide")
      lifeCycleChildrenActors(2) ! RaiseStopThrowableMessage

    case "MakeRandomChildRestart" =>
      println(s"OneForOneSupervisorActor : got a message MakeRandomChildRestart")
      lifeCycleChildrenActors(2) ! RaiseRestartThrowableMessage

    case "TellChildrenToSoundOff" =>
      println(s"OneForOneSupervisorActor : got a message TellChildrenToSoundOff")

      lifeCycleChildrenActors.foreach(x => x ! "SoundOff")
  }

  val decider: PartialFunction[Throwable, Directive] = {
    case _: AkkaStopDirectiveException => akka.actor.SupervisorStrategy.stop
    case _: AkkaRestartDirectiveException => akka.actor.SupervisorStrategy.restart
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy()(decider.orElse(SupervisorStrategy.defaultStrategy.decider))
}

When we run this one, we will fail a particular child, and ONLY that child will be affected by the supervision strategy thanks to the use of the “OneForOneStrategy”.

Let’s assume we have this demo code

val oneForOneSupervisorActor = system.actorOf(Props[OneForOneSupervisorActor], name = "OneForOneSupervisorActor")
println("sending oneForOneSupervisorActor a 'StartChildren' message")

oneForOneSupervisorActor ! "StartChildren"
Thread.sleep(1000)

oneForOneSupervisorActor ! "MakeRandomChildRestart"
Thread.sleep(1000)

oneForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

oneForOneSupervisorActor ! "MakeRandomChildCommitSuicide"
Thread.sleep(1000)

oneForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

system.stop(oneForOneSupervisorActor)

Here is the output of running the full demo code

sending oneForOneSupervisorActor a 'StartChildren' message
OneForOneSupervisorActor: constructor
OneForOneSupervisorActor : got a message StartChildren
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: preStart
LifeCycleActor: preStart
LifeCycleActor: preStart
OneForOneSupervisorActor : got a message MakeRandomChildRestart
LifeCycleActor: RaiseRestartThrowableMessage seen
LifeCycleActor: preRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor message: RaiseRestartThrowableMessage
LifeCycleActor: postStop
LifeCycleActor: constructor
LifeCycleActor: postRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
[ERROR] [07/29/2016 07:06:28.569] [SupervisionSystem-akka.actor.default-dispatcher-2]
[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2]
LifeCycleActor raised AkkaRestartDirectiveException
AkkaRestartDirectiveException: LifeCycleActor raised AkkaRestartDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:36)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

OneForOneSupervisorActor : got a message TellChildrenToSoundOff
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_0
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_2
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_1
OneForOneSupervisorActor : got a message MakeRandomChildCommitSuicide
LifeCycleActor: RaiseStopThrowableMessage seen
[ERROR] [07/29/2016 07:06:30.548] [SupervisionSystem-akka.actor.default-dispatcher-2]
[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2]
LifeCycleActor raised AkkaStopDirectiveException
AkkaStopDirectiveException: LifeCycleActor raised AkkaStopDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:31)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: postStop
OneForOneSupervisorActor : got a message TellChildrenToSoundOff
LifeCycleActor: SoundOff seen
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_1
LifeCycleActor alive lifecycleactor_0
[INFO] [07/29/2016 07:06:31.553] [SupervisionSystem-akka.actor.default-dispatcher-4]
[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/OneForOneSupervisorActor#-1082037253] to
Actor[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2#2123326382] was not delivered.
[1] dead letters encountered. This logging can be turned off or adjusted with configuration
settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
LifeCycleActor: postStop
LifeCycleActor: postStop

 

There are a couple of things to note here

  • ONLY one of the child actors is restarted
  • ONLY one of the child actors is stopped, which is why only 2 of the 3 actors respond to the final “SoundOff” message sent to all the supervisor’s children

AllForOneStrategy Example

Here is the code for the “AllForOne” supervisor actor.

There are a couple of things to note here.

  • We create child actors using context.actorOf, the built in Akka factory. This creates children that are “supervised” by the actor whose context was used to create them
  • We use a custom decider which uses the default Akka decider as well
  • We use the “AllForOneStrategy” supervision strategy

import akka.actor.SupervisorStrategy.Directive
import akka.actor._

class AllForOneSupervisorActor extends Actor {
  println("AllForOneSupervisorActor: constructor")
  val lifeCycleChildrenActors = new Array[ActorRef](3)

  def receive = {
    case "StartChildren" =>
      println(s"AllForOneSupervisorActor : got a message StartChildren")

      for(i <- 0 to 2) {
        val child = context.actorOf(Props[LifeCycleActor], name = s"lifecycleactor_$i")
        lifeCycleChildrenActors(i) = child
      }

    case "MakeRandomChildCommitSuicide" =>
      println(s"AllForOneSupervisorActor : got a message MakeRandomChildCommitSuicide")
      lifeCycleChildrenActors(2) ! RaiseStopThrowableMessage

    case "MakeRandomChildRestart" =>
      println(s"AllForOneSupervisorActor : got a message MakeRandomChildRestart")
      lifeCycleChildrenActors(2) ! RaiseRestartThrowableMessage

    case "TellChildrenToSoundOff" =>
      println(s"AllForOneSupervisorActor : got a message TellChildrenToSoundOff")

      lifeCycleChildrenActors.foreach(x => x ! "SoundOff")
  }

  val decider: PartialFunction[Throwable, Directive] = {
    case _: AkkaStopDirectiveException =>
      akka.actor.SupervisorStrategy.stop
    case _: AkkaRestartDirectiveException =>
      akka.actor.SupervisorStrategy.restart
  }

  override def supervisorStrategy: SupervisorStrategy =
    AllForOneStrategy()(decider.orElse(
      SupervisorStrategy.defaultStrategy.decider))
}

When we run this one, we will fail a particular child, and ALL children will be affected by the supervision strategy thanks to the use of the “AllForOneStrategy”.

Let’s assume we have this demo code

val allForOneSupervisorActor = system.actorOf(Props[AllForOneSupervisorActor], name = "AllForOneSupervisorActor")
println("sending allForOneSupervisorActor a 'StartChildren' message")

allForOneSupervisorActor ! "StartChildren"
Thread.sleep(1000)

allForOneSupervisorActor ! "MakeRandomChildRestart"
Thread.sleep(1000)

allForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

allForOneSupervisorActor ! "MakeRandomChildCommitSuicide"
Thread.sleep(1000)

allForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

system.stop(allForOneSupervisorActor)

Here is the output of running the full demo code

sending allForOneSupervisorActor a 'StartChildren' message
AllForOneSupervisorActor: constructor
AllForOneSupervisorActor : got a message StartChildren
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: preStart
LifeCycleActor: preStart
LifeCycleActor: preStart
AllForOneSupervisorActor : got a message MakeRandomChildRestart
LifeCycleActor: RaiseRestartThrowableMessage seen
[ERROR] [07/29/2016 07:10:12.887] [SupervisionSystem-akka.actor.default-dispatcher-3]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2] LifeCycleActor raised AkkaRestartDirectiveException
AkkaRestartDirectiveException: LifeCycleActor raised AkkaRestartDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:36)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: preRestart
LifeCycleActor: preRestart
LifeCycleActor: preRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor message:
LifeCycleActor message: RaiseRestartThrowableMessage
LifeCycleActor message:
LifeCycleActor: postStop
LifeCycleActor: postStop
LifeCycleActor: postStop
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: postRestart
LifeCycleActor: postRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
LifeCycleActor: postRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
AllForOneSupervisorActor : got a message TellChildrenToSoundOff
LifeCycleActor: SoundOff seen
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_2
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_0
LifeCycleActor alive lifecycleactor_1
AllForOneSupervisorActor : got a message MakeRandomChildCommitSuicide
LifeCycleActor: RaiseStopThrowableMessage seen
[ERROR] [07/29/2016 07:10:14.866] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2] LifeCycleActor raised AkkaStopDirectiveException
AkkaStopDirectiveException: LifeCycleActor raised AkkaStopDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:31)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: postStop
LifeCycleActor: postStop
LifeCycleActor: postStop
AllForOneSupervisorActor : got a message TellChildrenToSoundOff
[INFO] [07/29/2016 07:10:15.873] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_0] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor#1766273084]
to Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_0#1933756970]
was not delivered. [1] dead letters encountered. This logging can be turned off or
adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [07/29/2016 07:10:15.874] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_1] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor#1766273084]
to Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_1#922057055]
was not delivered. [2] dead letters encountered. This logging can be turned off or
adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [07/29/2016 07:10:15.877] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor#1766273084]
to Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2#-2073614826]
was not delivered. [3] dead letters encountered. This logging can be turned off or
adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

There are a couple of things to note here

  • ALL the child actors are restarted
  • ALL the child actors are stopped, which is why none of the 3 actors respond to the final “SoundOff” message sent to all the supervisor’s children (all 3 messages end up as dead letters)

Actor LifeCycle

There is no better way of understanding the Actor lifecycle in Akka than to examine this image taken from the Akka documentation.

The Akka documentation is actually very very good, it’s just that there is a lot of it, and I am hoping this series of posts will be a bit lighter to digest and will concentrate on the most common parts of Akka usage. The official docs are obviously the place to go should you have some deep Akka related question.

Anyway the Actor lifecycle is as follows:

 

image

This diagram is ace if you ask me, from here you can see exactly what happens when, and the different states, and what is available within each state.

There is not much more I can add to that diagram. We can however take some of the concepts in it for a little spin.

Let’s assume we have the following Actor code

import akka.actor.Actor

class LifeCycleActor extends Actor {
  println("LifeCycleActor: constructor")

  override def preStart(): Unit = { println("LifeCycleActor: preStart") }

  override def postStop(): Unit = { println("LifeCycleActor: postStop") }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println("LifeCycleActor: preRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    println(s"LifeCycleActor message: ${message.getOrElse("")}")
    super.preRestart(reason, message)
  }
  override def postRestart(reason: Throwable): Unit = {
    println("LifeCycleActor: postRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    super.postRestart(reason)
  }
  def receive = {
    case RestartMessage => throw new Exception("RestartMessage seen")
    case _ => println("LifeCycleActor : got a message")
  }
}

It can be seen that we have overridden several methods of the underlying Akka Actor. Let’s discuss each of them in turn

preStart(): Unit

This is called by the Akka system for us, and simply allows us to carry out any pre-start activity that we may wish to perform

postStop(): Unit

This is called by the Akka system for us once the actor has stopped, and simply allows us to carry out any clean-up activity that we may wish to perform

preRestart(reason: Throwable, message: Option[Any]): Unit

This is called by the Akka system for us, and allows your code to examine both the reason and the message for the restart

postRestart(reason: Throwable): Unit

This is called by the Akka system for us, and allows your code to examine the reason for the restart
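
One detail worth spelling out: the default preRestart calls postStop on the old instance, and the default postRestart calls preStart on the new one. That is why the overrides above, which call super, also trigger the postStop and preStart printouts during a restart. The toy model below imitates that call order in plain Scala. ToyActor and run are hypothetical helpers written for this illustration and are not part of Akka.

```scala
import scala.collection.mutable.ListBuffer

object LifecycleOrder {
  // Toy stand-in for an actor instance: it only records which hooks ran.
  class ToyActor(log: ListBuffer[String]) {
    log += "constructor"
    def preStart(): Unit = { log += "preStart" }
    def postStop(): Unit = { log += "postStop" }
    // Mirrors the default behaviour: preRestart ends by calling postStop.
    def preRestart(): Unit = { log += "preRestart"; postStop() }
    // Mirrors the default behaviour: postRestart ends by calling preStart.
    def postRestart(): Unit = { log += "postRestart"; preStart() }
  }

  // Start an instance, fail it once, then stop the replacement.
  def run(): List[String] = {
    val log = ListBuffer.empty[String]
    val first = new ToyActor(log)
    first.preStart()
    first.preRestart()             // the old instance is told about the restart
    val second = new ToyActor(log) // a fresh instance replaces it
    second.postRestart()
    second.postStop()              // final stop
    log.toList
  }
}
```

The list returned by LifecycleOrder.run() is constructor, preStart, preRestart, postStop, constructor, postRestart, preStart, postStop, which is the same order in which the LifeCycleActor lines appear in the console output of the demo.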

 

Now let us assume that we also have the following code to exercise the LifeCycleActor code above

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //create the actor system
  val system = ActorSystem("LifeCycleSystem")

  // default Actor constructor
  val lifeCycleActor = system.actorOf(Props[LifeCycleActor], name = "lifecycleactor")


  println("sending lifeCycleActor a number")
  lifeCycleActor ! 100
  Thread.sleep(1000)

  println("force restart")
  lifeCycleActor ! RestartMessage
  Thread.sleep(1000)

  println("stop lifeCycleActor")
  system.stop(lifeCycleActor)

  //shutdown the actor system
  system.terminate()

  StdIn.readLine()
}

Here is what you might expect to get printed out to the console output

(IntelliJ launch command trimmed; its classpath shows the run used JDK 1.8.0_51, Scala 2.11.8 and akka-actor 2.4.8)
sending lifeCycleActor a number
LifeCycleActor: constructor
LifeCycleActor: preStart
LifeCycleActor : got a message
force restart
LifeCycleActor: preRestart
LifeCycleActor reason: RestartMessage seen
LifeCycleActor message: RestartMessage
LifeCycleActor: postStop
[ERROR] [07/27/2016 07:21:08.232] [LifeCycleSystem-akka.actor.default-dispatcher-4] [akka://LifeCycleSystem/user/lifecycleactor] RestartMessage seen
java.lang.Exception: RestartMessage seen
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:22)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: constructor
LifeCycleActor: postRestart
LifeCycleActor reason: RestartMessage seen
LifeCycleActor: preStart
stop lifeCycleActor
LifeCycleActor: postStop

 

As you can see you have all the pertinent information that you would need to make logic decisions on within the Actor.

 

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples

akka : ‘hello world’

So last time I outlined the road map for this series of articles on Akka, and posted up some information from the Akka creators on what they had to say about Akka.

This time we will look at a simple example of Akka.

But just before we do that, let’s talk a bit more about WHY I think Akka is great stuff.

I have been a .NET programmer for many years and have seen asynchronous programming come in many different flavors:

  • Asynchronous delegates
  • BackgroundWorker
  • Task (TPL)
  • Async Await
  • RX
  • Concurrent collections
  • Critical sections (synchronized sections of code)

All of these pretty much have their JVM equivalent, and what you will likely have seen if you have used the .NET or JVM versions is the use of locks from time to time. For example, under the covers the concurrent collections still use some locking (monitor enter/exit) to achieve their critical sections.

This is all good stuff, and it has got better to work with over the years, but there could be a better, more elegant, lock free way of working with concurrent/parallel programming.

For me this is what Akka brings to the table. Instead of working with shared state that MUST be protected when writing multithreaded code, we simply don’t use any shared state and create dedicated micro sized bits of code that deal with one thing and one thing only. These are called “Actors”.

Actors do NOT share state. Instead they work independently of each other and rely on message passing, where the message payload should give the actor either everything it needs to do its job, or at the very least enough information to look things up (say an Id, such that the actor can look up the required entity by its Id field).

At no point will we be using locks within actors.
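To make the "message carries everything the actor needs" idea a bit more concrete, here is a minimal sketch using plain Scala only. The message names (LoadCustomer, RenameCustomer) and the describe function are hypothetical and are not part of the code for this series; describe simply stands in for what an actor's message handling might do with the payload.

```scala
// Hypothetical message protocol: immutable case classes, no shared state.
sealed trait CustomerMessage
final case class LoadCustomer(customerId: Long) extends CustomerMessage
final case class RenameCustomer(customerId: Long, newName: String) extends CustomerMessage

object MessageSketch {
  // Stand-in for an actor's message handling: it only ever sees the payload,
  // which carries either the data itself or an Id to look things up by.
  def describe(msg: CustomerMessage): String = msg match {
    case LoadCustomer(id)         => s"look up customer $id"
    case RenameCustomer(id, name) => s"rename customer $id to $name"
  }
}
```

Because the messages are immutable, they can be handed from one actor to another without any locking.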

Ok so that is my mini-rant/intro over. Let's now carry on and have a look at what it takes to write some Akka code in Scala.

What Libs Do We Need?

As I mentioned in the introduction post I will be using Scala exclusively to do this series of posts. As such I shall also be using SBT to do the build side of things.

For this post we are only showing how to use simple actors, so we don’t have to pull in many dependencies. We can keep things simple and use the following “build.sbt” file, which pulls in these 2 dependencies:

  • Basic Akka stuff
  • Joda time

name := "HelloWorld"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" % "akka-actor_2.11" % "2.4.8",
  "joda-time" % "joda-time" % "2.9.4")

NOTE : This SBT set of dependencies may grow in subsequent articles, but where it does require pulling in more Akka JARs I will show that when the time comes.

 

How Do We Create An Actor System?

To be able to use actors, we must first create an Akka actor system, which is the fabric that all actors run under. This is easily achieved as follows:

import akka.actor.ActorSystem
import scala.io.StdIn

object Demo extends App {

  //create the actor system
  val system = ActorSystem("HelloSystem")

  //---------------------------
  //   EXTRA STUFF WILL GO HERE
  //---------------------------

  //wait for Enter before shutting down
  StdIn.readLine()

  //shutdown the actor system
  system.terminate()
}

Note that you should ensure that the Akka system is also shut down correctly.

The example I show here is a simple console type application, so my startup/shutdown logic is all in the same file, but in a real production app, things may be more complex (well they should be I would hope).
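If you want to be sure the system really has stopped before carrying on, one option is to wait on the Future that terminate() returns. Here is a minimal sketch of that idea, assuming the same Akka 2.4.x dependency as the build file above. The object name, the system name and the 10 second limit are all illustrative choices of mine.

```scala
import akka.actor.ActorSystem
import scala.concurrent.Await
import scala.concurrent.duration._

object ShutdownSketch {
  // Starts a throwaway system, shuts it down, and reports whether it fully terminated
  def runAndShutdown(): Boolean = {
    val system = ActorSystem("ShutdownSketchSystem") // hypothetical name

    // terminate() returns a Future that completes once the system has stopped
    Await.result(system.terminate(), 10.seconds)

    // whenTerminated exposes the same completion signal for later checks
    system.whenTerminated.isCompleted
  }
}
```

Blocking like this is fine at the very end of a console app, but inside actors you would normally avoid Await and react to the Future instead.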

 

How Do We Create An Actor?

Now that we have an actor system, the next thing we need to do is create some actors that will live within the actor system. I use the word “live” deliberately, as this is an ownership type of arrangement: the actor system owns the actors created within it.

So how do we create an actor exactly?

Well luckily this too is dead simple. We just need to inherit from Actor and provide an implementation for the receive method to handle the different messages that may be sent to the actor.

Recall that an actor works by receiving messages and acting upon them.

Here is what the skeleton code may look like:

import akka.actor.Actor

class HelloActor extends Actor {
  def receive: Receive = {
    //DO THE MESSAGE HANDLING HERE
    case _ => //placeholder case so that the skeleton compiles
  }
}

We will talk more about the receive method in a while. For now just know that you must implement this method for an Akka actor to work correctly.
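It may help to know that the receive method is just a partial function: Akka's Receive type is an alias for PartialFunction[Any, Unit]. The sketch below uses plain Scala only (no Akka) to show what that means. The names greetings, fallback, combined and log are hypothetical, and the Receive alias is redefined locally purely so the example stands on its own.

```scala
import scala.collection.mutable.ListBuffer

object ReceiveSketch {
  // Same shape as Akka's Receive type alias
  type Receive = PartialFunction[Any, Unit]

  // Records what each handler did, so the behaviour can be inspected
  val log: ListBuffer[String] = ListBuffer.empty[String]

  // Only defined for the "hello" message
  val greetings: Receive = {
    case "hello" => log += "world"
  }

  // Defined for every message, so it works as a catch-all
  val fallback: Receive = {
    case other => log += s"unknown message: $other"
  }

  // orElse tries greetings first and falls through to fallback
  val combined: Receive = greetings orElse fallback
}
```

A partial function can be asked whether it handles a given message (isDefinedAt), and several of them can be chained with orElse, which is handy when an actor's message handling grows.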

Difference Between Tell And Ask

Just before we get on to seeing the examples, let's take a brief diversion where we talk about the difference between ask and tell.

When we ask (the ? method in Scala) an actor, we expect a response by way of a Future. The ask itself returns a Future[Any], which can be narrowed to a Future[T] using mapTo[T], and it requires an implicit Timeout in scope. This is an asynchronous operation.

When we tell (the ! method in Scala) an actor something, this is analogous to fire and forget (of course the receiving actor could send a response back to the initiating actor via a different message, but that’s a different story). This is an asynchronous operation that returns immediately.

Sender

We have not covered this ground yet, but we will in one of the subsequent posts. For now all you need to know is that when you send messages to an actor, you do so via a construct called an ActorRef, which is a kind of handle to an actor.

There are some special ActorRefs, one of which is “sender”. From the point of view of the receiving actor, “sender” is the ActorRef of whoever initiated the message currently being processed. You will see “sender” used in the examples below.

How Do We Send A Message To An Actor?

Here is how we would send a message to an actor. This is a tell (!) so is fire and forget.

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn
import scala.util.{Success, Failure}
import ExecutionContext.Implicits.global

object Demo extends App {

  //create the actor system
  val system = ActorSystem("HelloSystem")

  // default Actor constructor
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")

  //send some messages to the HelloActor (fire and forget)
  helloActor ! "hello"
  helloActor ! "tis a fine day for Akka"

  //wait for Enter so the actor has time to process its messages
  StdIn.readLine()

  //shutdown the actor system
  system.terminate()
}

Where we have the following HelloActor implementation

import akka.actor.Actor

class HelloActor extends Actor {
  def receive = {
    case "hello" => println("world")
    case _       => println("unknown message")
  }
}

See how for this simple actor we only ever deal with 2 things in the receive method:

  • “hello”
  • Anything else

It is considered good practice to ensure that you handle the messages you expect and deal with unknown messages too.

How Do We Wait For A Response From An Actor?

So we just saw a tell (fire and forget), but how about an ask? This is slightly harder but not much, we simply have to deal with the fact that a Future will be the result of an ask. There are numerous ways of dealing with that. Assume we have the following actor:

import akka.actor.Actor

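//message definition assumed here, the original listing does not show it
case object GetDateMessage

class AskActor extends Actor {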
  def receive = {
    case GetDateMessage => sender ! new org.joda.time.DateTime().toDate().toString()
    case _       => println("unknown message")
  }
}

Here are some examples of how to deal with the result of the ask

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn
import scala.util.{Success, Failure}
import ExecutionContext.Implicits.global

object Demo extends App {

  //create the actor system
  val system = ActorSystem("HelloSystem")

  // default Actor constructor
  val askActor = system.actorOf(Props[AskActor], name = "askactor")

  //send some messages to the AskActor, we want a response from it

  // (1) this is one way to "ask" another actor for information
  implicit val timeout = Timeout(5 seconds)
  val future1 = askActor ? GetDateMessage
  val result1 = Await.result(future1, timeout.duration).asInstanceOf[String]
  println(s"result1=$result1")

  // (2) a slightly different way to ask another actor for information
  val future2: Future[String] = ask(askActor, GetDateMessage).mapTo[String]
  val result2 = Await.result(future2, 5 second)
  println(s"result2=$result2")

  // (3) don't use blocking call at all, just use future callbacks
  val future3: Future[String] = ask(askActor, GetDateMessage).mapTo[String]
  future3 onComplete {
    case Success(result3) =>  println(s"result3=$result3")
    case Failure(t) => println("An error has occurred: " + t.getMessage)
  }

  //wait for Enter so the outstanding ask can complete before shutdown
  StdIn.readLine()

  //shutdown the actor system
  system.terminate()
}
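The three approaches above all end in either a blocking call or a callback. Since the result of an ask is an ordinary Scala Future, you can also transform it and recover from failures without blocking. Here is a minimal sketch of that using plain Scala only. The fakeAsk function is a hypothetical stand-in for askActor ? GetDateMessage (it just produces a Future[Any]), and the "date=unavailable" fallback is purely illustrative.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureSketch {
  // Hypothetical stand-in for `askActor ? GetDateMessage`: an untyped Future[Any]
  def fakeAsk(reply: Any): Future[Any] = Future(reply)

  // Narrow the reply to String, transform it, and fall back to a default
  // if the reply turns out not to be a String after all
  def describe(reply: Any): Future[String] =
    fakeAsk(reply)
      .mapTo[String]
      .map(date => s"date=$date")
      .recover { case _: ClassCastException => "date=unavailable" }
}
```

The same map/recover chain could be applied to the Future you get back from a real ask. A timed out ask fails its Future as well, so a recover block is a natural place to deal with that case too.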

Piping Futures

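Another use case you may have is that you want to use a Future[T] internally within the actor code and get its result to another actor. Akka supports this through the pipe pattern: pipeTo does not send the Future itself, it sends the Future's eventual result to the target actor as an ordinary message (or a Status.Failure if the Future fails). You can use it like this, where we are piping the result of a Future[List[Int]] back to the sender: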

import akka.actor._
import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}
import ExecutionContext.Implicits.global

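//message definition assumed here, the original listing does not show it
case object GetIdsFromDatabase

class FutureResultActor extends Actor {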
  def receive = {
    case GetIdsFromDatabase => {
      Future(List(1,2,3)).pipeTo(sender)
    }
    case _       => println("unknown message")
  }
}

Where the code that initiated the sending of the GetIdsFromDatabase message looks like this (the sender in the code above)

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn
import scala.util.{Success, Failure}
import ExecutionContext.Implicits.global

object Demo extends App {

  //create the actor system
  val system = ActorSystem("HelloSystem")

  // default Actor constructor
  val futureResultActor = system.actorOf(Props[FutureResultActor], name = "futureresultactor")

  //send some messages to the FutureResultActor, we expect a Future back from it
  val future4: Future[List[Int]] = ask(futureResultActor, GetIdsFromDatabase).mapTo[List[Int]]
  future4 onComplete {
    case Success(result4) =>  println(s"result4=$result4")
    case Failure(t) => println("An error has occurred: " + t.getMessage)
  }

  //wait for Enter so the outstanding ask can complete before shutdown
  StdIn.readLine()

  //shutdown the actor system
  system.terminate()
}
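It is also worth seeing what happens when the piped Future fails. In that case pipeTo sends a Status.Failure to the target, and when the target is the other end of an ask, the asker's Future fails with the original exception. The sketch below shows this under the same Akka 2.4.x dependency as the rest of this post. The LoadIds message, the FailingLookupActor and the "db down" error are all hypothetical names made up for the illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical message used only by this sketch
case object LoadIds

class FailingLookupActor extends Actor {
  import context.dispatcher // ExecutionContext for the Future and for pipeTo

  def receive = {
    case LoadIds =>
      // The lookup fails, so pipeTo delivers a Status.Failure to the sender
      Future[List[Int]](throw new IllegalStateException("db down")).pipeTo(sender())
  }
}

object PipeFailureSketch {
  // Returns the error message seen by the asker
  def run(): String = {
    val system = ActorSystem("PipeFailureSketchSystem")
    implicit val timeout: Timeout = Timeout(5.seconds)
    val actor = system.actorOf(Props[FailingLookupActor], "failinglookup")

    // Blocking here only to keep the sketch short
    val outcome = Try(Await.result(actor ? LoadIds, timeout.duration))
    Await.result(system.terminate(), 10.seconds)

    outcome match {
      case Failure(e) => e.getMessage
      case Success(v) => s"unexpected reply: $v"
    }
  }
}
```

So the Failure case in the onComplete callbacks shown earlier is where a failed piped Future would end up.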

 

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples

Akka series

I have been idle for a while, which kind of irks me. Thing is I have not really been that idle, it’s just that I have not been doing that much in the .NET space of late. That is not to say I do not like .NET anymore, it’s just that I am spreading my time between .NET and the JVM, which I am actually enjoying.

I thought the time has come for me to come out with a few posts of what I have been up to. So to that end I thought I would start out with a series of posts on Akka.

There will be quite a few parts to this mini series. And I will update this page as I bring new posts on line.

This is what I am planning on covering

What Is Akka

I can think of no better way to describe Akka than to screen scrape what the creators of Akka have to say about it. So here is what they say about it:

We believe that writing correct distributed, concurrent, fault-tolerant and scalable applications is too hard. Most of the time it’s because we are using the wrong tools and the wrong level of abstraction. Akka is here to change that. Using the Actor Model we raise the abstraction level and provide a better platform to build scalable, resilient and responsive applications—see the Reactive Manifesto for more details. For fault-tolerance we adopt the “let it crash” model which the telecom industry has used with great success to build applications that self-heal and systems that never stop. Actors also provide the abstraction for transparent distribution and the basis for truly scalable and fault-tolerant applications.

Actors

Actors give you:

  • Simple and high-level abstractions for distribution, concurrency and parallelism.
  • Asynchronous, non-blocking and highly performant message-driven programming model.
  • Very lightweight event-driven processes (several million actors per GB of heap memory).

Fault Tolerance

  • Supervisor hierarchies with “let-it-crash” semantics.
  • Actor systems can span over multiple JVMs to provide truly fault-tolerant systems.
  • Excellent for writing highly fault-tolerant systems that self-heal and never stop.

Location Transparency

Everything in Akka is designed to work in a distributed environment: all interactions of actors use pure message passing and everything is asynchronous.

Persistence

State changes experienced by an actor can optionally be persisted and replayed when the actor is started or restarted. This allows actors to recover their state, even after JVM crashes or when being migrated to another node.

 

So that is the 1000 mile view of what Akka is. You will get more familiar with it as we move through the series I hope.

What Form Will The Examples Take

As I am trying to improve my Scala to get it in line with what I can do in .NET, all examples will be based on Scala.

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples

I hope you all enjoy the series

That’s all for now. Like I say, I will update this page when more posts become available, which may be a while since I have a day job, 2 kids, one wife and a cat, and I like a drink too, and also enjoy my weekends. So it happens when it happens, folks.