Akka : remoting

It has been a while since I wrote a post; the reason for that is actually this post.

I would consider remoting/clustering to be some of the more advanced stuff you can do with Akka. That said, this post and the next will outline all of this good stuff for you, and by the end of this post I hope to have demonstrated enough for you to go off and write Akka remoting/clustered apps.

I have decided to split the remoting and clustering stuff into 2 posts, to make it more focused and digestible. I think this is the right thing to do.

 

A Note About All The Demos In This Topic

I wanted the demos in this section to be as close to real life as possible. The official Akka examples tend to use a single process, which I personally think is quite confusing when you are trying to deal with quite hard concepts. As such I decided to go with multi-process projects to demonstrate things. I do however only have 1 laptop, so they are hosted on the same node, but they are separate processes/JVMs.

I am hoping that doing this will make the learning process easier, as it is closer to what you would do in real life, rather than having 1 main method that spawns an entire cluster. You just would not have that in real life.

 

What Is Akka Remoting

If you have ever used RMI in Java or Remoting/WCF in C#, you can think of Akka remoting as something similar to that, where you have the ability to call a remote object's method as if it were local. It is essentially peer-to-peer.

Obviously in Akka's case the remote object is actually an actor, and you will not be calling a method at all, but will instead treat the remote actor just like any other actor: you simply pass messages to it, and the remote actor will receive the message and act on it accordingly.

This is actually quite unique. I have worked with Java Remoting and also C# Remoting, and done a lot with .NET WCF. What all of these had in common was some code voodoo you had to do, where the difference between working with a local object and working with a remote object required a fair bit of code, be it remoting channels, proxies etc.

In Akka there is literally no change in coding style to work with remoting, it is completely configuration driven. This is quite nice.

Akka's Remoting Interaction Models

 Akka supports 2 ways of using remoting

  • Lookup : Where we use actorSelection to lookup an already running remote actor
  • Creation : Where an actor will be created on the remote node

We will be looking at both these approaches

Requirements

As I have stated on numerous occasions, I have chosen to use SBT. As such, this is the dependencies section of my SBT file for both of these remoting examples.

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )

It can be seen that the crucial dependency is the akka-remote library.

 

Remote Selection

As stated above “Remote Selection” will try and use actorSelection to look up a remote actor. The remote actor IS expected to be available and running.

In this example there will be 2 projects configured in the SBT file

  • Remote : The remote actor, that is expected to be running before the local actor tries to communicate with it
  • Local : The local actor that will call the remote

Here is the complete SBT file for this section

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )


lazy val commonSettings = Seq(
  name := "AkkaRemoting",
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val remote = (project in file("remote")).
  settings(commonSettings: _*).
  settings(
    // other settings
  )

lazy val local = (project in file("local")).
  settings(commonSettings: _*).
  settings(
    // other settings
  )



This simply creates 2 projects for us, Remote and Local.

Remote Project

Now that we have the relevant projects in place, lets talk about how we expose a remote actor for selection.

We must ensure that the remote actor is available for selection, which requires the use of an IP address and a port.

Here is how we do this

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 4444
    }
    log-sent-messages = on
    log-received-messages = on
  }
}

Also note the akka.actor.provider is set to

akka.remote.RemoteActorRefProvider

OK, so now we have the ability to expose the remote actor on an IP address and port, let's have a look at the remote actor code.

Here is the remote actor itself

import akka.actor.Actor

class RemoteActor extends Actor {
  def receive = {
    case msg: String =>
      println(s"RemoteActor received message '$msg'")
      sender ! "Hello from the RemoteActor"
  }
}

Nothing too special, we simply receive a string message and send a response string message back to the sender (the local actor would be the sender in this case)

And here is the main method that drives the remote project

import akka.actor.{Props, ActorSystem}

import scala.io.StdIn

object RemoteDemo extends App  {
  val system = ActorSystem("RemoteDemoSystem")
  val remoteActor = system.actorOf(Props[RemoteActor], name = "RemoteActor")
  remoteActor ! "The RemoteActor is alive"
  StdIn.readLine()
  system.terminate()
  StdIn.readLine()
}

Again pretty standard stuff, no voodoo here

And that is all there is to the remote side of the “remote selection” remoting version. Lets now turn our attention to the local side.

Local Project

So far we have a remote actor which is configured to be up and running at 127.0.0.1:4444.

We now need to open up the local side of things. This is done using the following configuration. Notice the port is different from the already-in-use 4444 (port 0 means a random available port will be assigned). Obviously if you host these actors on physically different boxes there would be nothing to stop you using port 4444 again, but from a sanity point of view, I find it is better not to do that.

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
}

Plain Selection

We can make use of plain old actor selection to select any actor by a path of our choosing, where we may use resolveOne (if we expect to match only one actor; remember we can use wildcards, so there are times we may match more than one) to give us an ActorRef.

context.actorSelection(path).resolveOne()

When we use resolveOne() we get a Future[ActorRef] that we can use in any of the normal ways we would handle and work with a Future[T]. I have chosen to use a for comprehension to capture the resolved ActorRef of the remote actor. I also monitor the remote actor using context.watch, such that if it terminates we will see a Terminated message and can shut down the local actor system.

We also make use of become (see the state machines post for more info on that) to swap out the message loop for the local actor, so it works differently once we have a remote ActorRef.

Once we have an ActorRef representing the remote actor it is pretty standard stuff where we just send messages to the remote actor ref using the ActorRef that represents it.

Here is the entire code for the plain actor selection approach of dealing with a remote actor.

import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Terminated, ActorRef, ReceiveTimeout, Actor}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

case object Init

class LocalActorUsingPlainSelection extends Actor {

  val path = "akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor"
  val atomicInteger = new AtomicInteger();
  context.setReceiveTimeout(3 seconds)

  def receive = identifying

  def identifying: Receive = {
    case Init => {
      implicit val resolveTimeout = Timeout(5 seconds)
      for (ref : ActorRef <- context.actorSelection(path).resolveOne()) {
        println("Resolved remote actor ref using Selection")
        context.watch(ref)
        context.become(active(ref))
        context.setReceiveTimeout(Duration.Undefined)
        self ! Start
      }
    }
    case ReceiveTimeout => println("timeout")
  }

  def active(actor: ActorRef): Receive = {
    case Start =>
      actor ! "Hello from the LocalActorUsingPlainSelection"
    case msg: String =>
      println(s"LocalActorUsingPlainSelection received message: '$msg'")
      if (atomicInteger.get() < 5) {
        sender ! "Hello back to you"
        atomicInteger.getAndAdd(1)
      }
    case Terminated(`actor`) =>
      println("Receiver terminated")
      context.system.terminate()
  }
}

 

Using Identity Messages

Another approach that can be taken rather than relying on straight actor selection is by using some special Akka messages, namely Identify and ActorIdentity.

The idea is that we still use actorSelection for a given path, but rather than using resolveOne we send the ActorSelection a special Identify message. The actor that was chosen by the ActorSelection should see this Identify message and should respond with an ActorIdentity message.

At this point the local actor can simply listen for ActorIdentity messages, and when it sees one it can test the message's correlationId to see if it matches the requested path. If it does, you know that is the correct actor and you can then use the ActorRef of the ActorIdentity message.

As in the previous example, we also make use of become (see the state machines post for more info on that) to swap out the message loop for the local actor, so it works differently once we have a remote ActorRef.

Once we have an ActorRef representing the remote actor it is pretty standard stuff where we just send messages to the remote actor ref using the ActorRef that represents it.

Here is the entire code for the Identity actor approach

import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import scala.concurrent.duration._


case object Start


class LocalActorUsingIdentity extends Actor {

  val path = "akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor"
  val atomicInteger = new AtomicInteger();
  context.setReceiveTimeout(3 seconds)
  sendIdentifyRequest()

  def receive = identifying

  def sendIdentifyRequest(): Unit =
    context.actorSelection(path) ! Identify(path)

  def identifying: Receive = {
    case identity : ActorIdentity =>
      if(identity.correlationId.equals(path)) {
        identity.ref match {
          case Some(remoteRef) => {
            context.watch(remoteRef)
            context.become(active(remoteRef))
            context.setReceiveTimeout(Duration.Undefined)
            self ! Start
          }
          case None => println(s"Remote actor not available: $path")
        }
      }
    case ReceiveTimeout => sendIdentifyRequest()
  }

  def active(actor: ActorRef): Receive = {
    case Start =>
      actor ! "Hello from the LocalActorUsingIdentity"
    case msg: String =>
      println(s"LocalActorUsingIdentity received message: '$msg'")
      if (atomicInteger.get() < 5) {
        sender ! "Hello back to you"
        atomicInteger.getAndAdd(1)
      }
    case Terminated(`actor`) =>
      println("Receiver terminated")
      context.system.terminate()
  }
}
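
For completeness, the local project also needs a small driver to create its actor system (which picks up the local configuration shown earlier) and kick things off. The sketch below is an assumption of roughly what that driver might look like; the LocalDemo object name is mine, though the LocalDemoSystem actor system name matches the output shown further down.

import akka.actor.{ActorSystem, Props}

import scala.io.StdIn

object LocalDemo extends App {
  // picks up the local configuration (RemoteActorRefProvider + port 0)
  val system = ActorSystem("LocalDemoSystem")

  // start the plain selection version and kick it off with an Init message
  val localActor = system.actorOf(Props[LocalActorUsingPlainSelection],
    name = "LocalActorUsingPlainSelection")
  localActor ! Init

  // the Identify based version starts talking to the remote actor from its
  // constructor, so it would simply need creating in the same way
  //val identityActor = system.actorOf(Props[LocalActorUsingIdentity],
  //  name = "LocalActorUsingIdentity")

  StdIn.readLine()
  system.terminate()
  StdIn.readLine()
}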

How do I Run The Demo

You will need to ensure that you run the following 2 projects in this order:

  • Remote
  • Local

Once you run the 2 projects you should see some output like this

The Remote project output:

[INFO] [10/03/2016 07:02:54.282] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:02:54.842] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteDemoSystem@127.0.0.1:4444]
[INFO] [10/03/2016 07:02:54.844] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://RemoteDemoSystem@127.0.0.1:4444]
RemoteActor received message ‘The RemoteActor is alive’
[INFO] [10/03/2016 07:02:54.867] [RemoteDemoSystem-akka.actor.default-dispatcher-15] [akka://RemoteDemoSystem/deadLetters]
Message [java.lang.String] from Actor[akka://RemoteDemoSystem/user/RemoteActor#-109465353] to Actor[akka://RemoteDemoSystem/deadLetters]
was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings
‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
RemoteActor received message ‘Hello from the LocalActorUsingPlainSelection’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’

 

The Local project output:

[INFO] [10/03/2016 07:03:09.489] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:03:09.961] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://LocalDemoSystem@127.0.0.1:64945]
[INFO] [10/03/2016 07:03:09.963] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://LocalDemoSystem@127.0.0.1:64945]
Resolved remote actor ref using Selection
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’

 

 

 

Remote Creation

Just as we did with remote selection, remote creation shall be split into a remote project and a local project. However, since this time the local project must know about the type of the remote actor in order to create it in the first place, we introduce a common project which both the remote and local projects depend on.

In this example there will be 3 projects configured in the SBT file

  • Remote : The remote actor, that is expected to be created by the local actor
  • Common : The common files that both Local/Remote projects depend on
  • Local : The local actor that will create and call the remote actor

Here is the complete SBT file for this section

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )


lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val root =(project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "Base"
  )
  .aggregate(common, remote)
  .dependsOn(common, remote)

lazy val common = (project in file("common")).
  settings(commonSettings: _*).
  settings(
    name := "common"
  )

lazy val remote = (project in file("remote")).
  settings(commonSettings: _*).
  settings(
    name := "remote"
  )
  .aggregate(common)
  .dependsOn(common)

It can be seen that both the local/remote will aggregate/depend on the common project. This is standard SBT stuff so I will not go into that.

So now that we understand a bit more about the SBT side of things lets focus on the remote side of things.

This may seem odd, since we are expecting the local actor to create the remote one, aren't we?

Well yes we are, but the remote actor system must still be available before we can start/deploy an actor in it via the local system.

So it still makes sense to examine the remote side of things first.

Remote Project

Now that we have the relevant projects in place, let's talk about how we expose the remote actor system for creation.

We must ensure that the remote system is available for creation requests, which requires the use of an IP address and a port.

Here is how we do this

akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    netty.tcp {
      hostname = "127.0.0.1",
      # LISTEN on tcp port 2552
      port=2552
    }
  }

}

I also mentioned that the remote actor system MUST be started to allow remote creation to work. As such, the entire codebase for the remote end of things (excluding the actual remote actor, which gets created by the local side of things) is shown below.

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem

object CalculatorApplication {

  def main(args: Array[String]): Unit = {
    startRemoteWorkerSystem()
  }

  def startRemoteWorkerSystem(): Unit = {
    ActorSystem("CalculatorWorkerSystem", ConfigFactory.load("calculator"))
    println("Started CalculatorWorkerSystem")
  }

}

All that is happening here is that the remote actor system gets created.

Local Project

Most of the hard work is done in this project, as it is the local side of things that is responsible for creating and deploying the remote actor in the remote actor system, before it can then make use of it.

Let's start with the deployment of the remote actor from the local side.

Firstly we need this configuration to allow this to happen

akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider",
    deployment {
      "/creationActor/*" {
        remote = "akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552"
      }
    }
  }

  remote {
    netty.tcp {
      hostname = "127.0.0.1",
      port=2554
    }
  }

}

If you look carefully at this configuration file and the one in the remote end, you will see that the IP address/port/actor system name used within the deployment section all match. This is how the local actor system is able to create and deploy an actor to the remote actor system (which must be running prior to the local actor system trying to deploy a remote actor to it).

So now that we have seen this config, let's see how it is used by the local side.

import sample.remote.calculator.{Divide, Multiply, CreationActor}
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import scala.util.Random
import akka.actor.ActorSystem
import akka.actor.Props

object CreationApplication {

  def main(args: Array[String]): Unit = {
    startRemoteCreationSystem()
  }

  def startRemoteCreationSystem(): Unit = {
    val system =
      ActorSystem("CreationSystem", ConfigFactory.load("remotecreation"))
    val actor = system.actorOf(Props[CreationActor],
      name = "creationActor")

    println("Started CreationSystem")
    import system.dispatcher
    system.scheduler.schedule(1.second, 1.second) {
      if (Random.nextInt(100) % 2 == 0)
        actor ! Multiply(Random.nextInt(20), Random.nextInt(20))
      else
        actor ! Divide(Random.nextInt(10000), (Random.nextInt(99) + 1))
    }
  }

}

It can be seen that the first thing we do is create the CreationActor using the config above. The CreationActor itself lives in the local CreationSystem, but thanks to the "/creationActor/*" deployment entry any actors it creates as children will be deployed into the already running remote actor system. The CreationActor can then be used just like any other actor.

For completeness here is the code of the CreationActor

package sample.remote.calculator

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props

class CreationActor extends Actor {

  def receive = {
    case op: MathOp =>
      val calculator = context.actorOf(Props[CalculatorActor])
      calculator ! op
    case result: MathResult => result match {
      case MultiplicationResult(n1, n2, r) =>
        printf("Mul result: %d * %d = %d\n", n1, n2, r)
        context.stop(sender())
      case DivisionResult(n1, n2, r) =>
        printf("Div result: %.0f / %d = %.2f\n", n1, n2, r)
        context.stop(sender())
    }
  }
}

It can be seen that the CreationActor above also creates another actor called CalculatorActor, which does the real work (and, given the deployment configuration, is the actor that actually ends up on the remote system). Let's see the code for that one.

package sample.remote.calculator

import akka.actor.Props
import akka.actor.Actor

class CalculatorActor extends Actor {
  def receive = {
    case Add(n1, n2) =>
      println("Calculating %d + %d".format(n1, n2))
      sender() ! AddResult(n1, n2, n1 + n2)
    case Subtract(n1, n2) =>
      println("Calculating %d - %d".format(n1, n2))
      sender() ! SubtractResult(n1, n2, n1 - n2)
    case Multiply(n1, n2) =>
      println("Calculating %d * %d".format(n1, n2))
      sender() ! MultiplicationResult(n1, n2, n1 * n2)
    case Divide(n1, n2) =>
      println("Calculating %.0f / %d".format(n1, n2))
      sender() ! DivisionResult(n1, n2, n1 / n2)
  }
}

Nothing special there really, it's just a standard actor.
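
One thing this post does not show is the shared message protocol (MathOp, MathResult and friends) that lives in the common project, which is what allows both sides to understand the messages flying across the wire. The definitions below are a sketch of what they might look like, inferred from the usage and printf formats above (Divide carrying a Double numerator is an assumption based on the %.0f format).

package sample.remote.calculator

// A sketch of the shared message protocol from the 'common' project,
// inferred from how the messages are used above. Being case classes they
// are serializable, which is needed for them to cross the wire.
trait MathOp
final case class Add(n1: Int, n2: Int) extends MathOp
final case class Subtract(n1: Int, n2: Int) extends MathOp
final case class Multiply(n1: Int, n2: Int) extends MathOp
final case class Divide(n1: Double, n2: Int) extends MathOp

trait MathResult
final case class AddResult(n1: Int, n2: Int, result: Int) extends MathResult
final case class SubtractResult(n1: Int, n2: Int, result: Int) extends MathResult
final case class MultiplicationResult(n1: Int, n2: Int, result: Int) extends MathResult
final case class DivisionResult(n1: Double, n2: Int, result: Double) extends MathResult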

So we now have a complete pipeline.

How do I Run The Demo

You will need to ensure that you run the following 2 projects in this order:

  • Remote
  • Root

Once you run the 2 projects you should see some output like this

The Remote project output:

[INFO] [10/03/2016 07:30:58.763] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:30:59.235] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552]
[INFO] [10/03/2016 07:30:59.237] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552]
Started CalculatorWorkerSystem
Calculating 6 * 15
[WARN] [10/03/2016 07:31:10.988] [CalculatorWorkerSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CalculatorWorkerSystem)] Using the default Java serializer for class [sample.remote.calculator.MultiplicationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Calculating 1346 / 82
[WARN] [10/03/2016 07:31:11.586] [CalculatorWorkerSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CalculatorWorkerSystem)] Using the default Java serializer for class [sample.remote.calculator.DivisionResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Calculating 2417 / 31
Calculating 229 / 66
Calculating 9966 / 43
Calculating 4 * 12
Calculating 9 * 5
Calculating 1505 / 91

The Root project output:

[INFO] [10/03/2016 07:31:08.849] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:31:09.470] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://CreationSystem@127.0.0.1:2554]
[INFO] [10/03/2016 07:31:09.472] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://CreationSystem@127.0.0.1:2554]
Started CreationSystem
[WARN] [10/03/2016 07:31:10.808] [CreationSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [com.typesafe.config.impl.SimpleConfig] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
[WARN] [10/03/2016 07:31:10.848] [CreationSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [sample.remote.calculator.Multiply] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Mul result: 6 * 15 = 90
[WARN] [10/03/2016 07:31:11.559] [CreationSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [sample.remote.calculator.Divide] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Div result: 1346 / 82 = 16.41
Div result: 2417 / 31 = 77.97
Div result: 229 / 66 = 3.47
Div result: 9966 / 43 = 231.77
Mul result: 4 * 12 = 48
Mul result: 9 * 5 = 45
Div result: 1505 / 91 = 16.54
Mul result: 7 * 4 = 28
Div result: 1797 / 95 = 18.92
Mul result: 12 * 17 = 204
Div result: 2998 / 72 = 41.64
Div result: 1157 / 98 = 11.81
Div result: 1735 / 22 = 78.86
Mul result: 4 * 19 = 76
Div result: 6257 / 51 = 122.69
Mul result: 14 * 4 = 56
Div result: 587 / 27 = 21.74
Div result: 2528 / 98 = 25.80
Mul result: 9 * 16 = 144
Mul result: 13 * 10 = 130

 

 

 

 

NAT or Docker Considerations

Akka Remoting does not work transparently with Network Address Translation, Load Balancers, or in Docker containers. If this applies to you, you may need to further configure Akka as described here:

http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#remote-configuration-nat
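
As a flavour of what that looks like, the linked page describes separating the logical (public) address other systems use from the address the socket actually binds to. A hedged sketch of such a configuration (the host names here are placeholders) might be:

akka {
  remote {
    netty.tcp {
      # the logical address other systems use to reach this one (e.g. the NAT'd / public address)
      hostname = my.public.host.example.com
      port = 2552

      # the address/port the socket actually binds to inside the container / behind the NAT
      bind-hostname = 0.0.0.0
      bind-port = 2552
    }
  }
}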

 

 

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples


AKKA : TESTKIT

So the journey continues, we have covered a fair bit of ground, but there is still a long way to go yet, with many exciting features of Akka yet to cover.

But before we get on to some of the more advanced stuff, I thought it would be a good idea to take a small detour and look at how you can test Akka actor systems.

TestKit

Akka comes with a completely separate module for testing which you MUST include; this can be obtained using the following SBT settings

name := "Testing"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka"   %%    "akka-actor"    %   "2.4.8",
  "com.typesafe.akka"   %%    "akka-testkit"  %   "2.4.8"   %   "test",
  "org.scalatest"       %%    "scalatest"     %   "2.2.5"   %   "test"
)

I have chosen to use scalatest, but you could use other popular testing frameworks such as specs2.

 

 

Testing Actors

The following sub sections will outline how Akka allows the testing of actors

 

The built in ‘testActor’

The official Akka testkit docs do a great job of explaining the testActor and why there is a need for a test actor.

Testing the business logic inside Actor classes can be divided into two parts: first, each atomic operation must work in isolation, then sequences of incoming events must be processed correctly, even in the presence of some possible variability in the ordering of events. The former is the primary use case for single-threaded unit testing, while the latter can only be verified in integration tests.

Normally, the ActorRef shields the underlying Actor instance from the outside, the only communications channel is the actor’s mailbox. This restriction is an impediment to unit testing, which led to the inception of the TestActorRef.

http://doc.akka.io/docs/akka/snapshot/scala/testing.html#Using_Multiple_Probe_Actors

It is by using this testActor that we are able to test the individual operations of an actor under test.

You essentially instantiate a TestActorRef, passing it the real actor you would like to test. The TestActorRef then allows you to send messages, which are forwarded to the contained real actor that you are attempting to test.

CallingThreadDispatcher/TestActorRef

Akka is an asynchronous beast by nature, and uses the concept of Dispatchers to conduct the dispatching of messages. We have also seen that the message loop (receive) can be replaced with become/unbecome, all of which contributes to the overall behaviour of the actor being quite hard to test.

Akka comes with a special actor called TestActorRef, which is part of the Akka TestKit.

It should come as no surprise that this TestActorRef also makes use of a Dispatcher. But what makes this actor more suited to testing is that it uses a specialized testing Dispatcher, which makes the asynchronous code easier to test.

The specialized dispatcher is called CallingThreadDispatcher. As the name suggests it uses the current thread to deal with the message dispatching.

This makes things easier, of that there is no doubt. As stated, you don't really need to do anything other than use the Akka TestKit TestActorRef.
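
TestActorRef wires this up for you, but the CallingThreadDispatcher can also be assigned to an ordinary actor if you ever want fully synchronous message processing in a test. Here is a small sketch (the EchoActor and object names are mine, purely for illustration):

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.CallingThreadDispatcher

class EchoActor extends Actor {
  def receive = {
    case msg => println(s"processed on ${Thread.currentThread().getName}: $msg")
  }
}

object CallingThreadDispatcherSketch extends App {
  val system = ActorSystem("TestDispatcherDemo")

  // pin the actor to the CallingThreadDispatcher so message processing
  // happens synchronously on the calling (sending) thread
  val echo = system.actorOf(
    Props[EchoActor].withDispatcher(CallingThreadDispatcher.Id))

  echo ! "hello" // processed before this line returns
  system.terminate()
}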

 

Anatomy Of An Actor Testkit Test

This is what a basic skeleton looks like when using the Akka TestKit (please note this is for ScalaTest)

import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.pattern.ask

import scala.util.Success


class HelloActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }



}


There are a couple of things to note there, so let's go through them

  • Extending the Akka TestKit class allows us to get all the good pre-canned assertions that let us assert facts about our actors
  • Extending the Akka ImplicitSender trait allows us to have an actual sender, which is set to the test suite's own testActor

 

The Built In Assertions

The Akka TestKit comes with many useful assertions which you can see here

expectMsg[T](d: Duration, msg: T): T
expectMsgPF[T](d: Duration)(pf: PartialFunction[Any, T]): T
expectMsgClass[T](d: Duration, c: Class[T]): T
expectMsgType[T: Manifest](d: Duration)
expectMsgAnyOf[T](d: Duration, obj: T*): T
expectMsgAnyClassOf[T](d: Duration, obj: Class[_ <: T]*): T
expectMsgAllOf[T](d: Duration, obj: T*): Seq[T]
expectMsgAllClassOf[T](d: Duration, c: Class[_ <: T]*): Seq[T]
expectMsgAllConformingOf[T](d: Duration, c: Class[_ <: T]*): Seq[T]
expectNoMsg(d: Duration)
receiveN(n: Int, d: Duration): Seq[AnyRef]
fishForMessage(max: Duration, hint: String)
	(pf: PartialFunction[Any, Boolean]): Any
receiveOne(d: Duration): AnyRef
receiveWhile[T](max: Duration, idle: Duration, messages: Int)
	(pf: PartialFunction[Any, T]): Seq[T]
awaitCond(p: => Boolean, max: Duration, interval: Duration)
awaitAssert(a: => Any, max: Duration, interval: Duration)
ignoreMsg(pf: PartialFunction[AnyRef, Boolean])
within[T](min: FiniteDuration, max: FiniteDuration)
	(f: ⇒ T): T

You can read more about these at the official documentation: http://doc.akka.io/docs/akka/snapshot/scala/testing.html#Built-In_Assertions
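
To give a flavour of how a couple of these read in practice (within and expectNoMsg are not shown in the examples that follow), here is a small hedged sketch; the PingActor is mine, purely for illustration.

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest._

import scala.concurrent.duration._

// a hypothetical actor used purely to show a few of the assertions composing
class PingActor extends Actor {
  def receive = {
    case "ping" => sender ! "pong"
  }
}

class AssertionFlavourTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "A PingActor" must {
    "reply with 'pong' inside the time window and then go quiet" in {
      val pingActor = system.actorOf(Props[PingActor])
      within(500 millis) {
        pingActor ! "ping"
        expectMsg("pong")       // must arrive inside the within block
        expectNoMsg(100 millis) // and nothing else should follow it
      }
    }
  }
}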

 

Demo : HelloActor That We Will Test

Let's assume we have this actor, which we would like to test over the next 3 sections.

import akka.actor.Actor


class HelloActor extends Actor {
  def receive = {
    case "hello" => sender ! "hello world"
    case _       => throw new IllegalArgumentException("bad juju")
  }
}

Sending Messages

So we have the HelloActor above, and we would like to send a message to it, and assert that the sender (the test suite testActor) gets a message back.

Here is how we might do that.

import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.pattern.ask

import scala.util.Success


class HelloActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "An HelloActor using implicit sender " must {
    "send back 'hello world'" in {
      val helloActor = system.actorOf(Props[HelloActor], name = "helloActor")
      helloActor ! "hello"
      expectMsg("hello world")
    }
  }
}


 

This is thanks to the fact that we used the ImplicitSender trait, and we used the available akka TestKit assertions

 

Expecting A Response

Another thing that is quite common is to expect a response from an actor that we have asked a result from using the Akka ask pattern (which returns a Future[T] to represent the eventual result)

Here is how we might write a test for this

import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.pattern.ask

import scala.util.Success


class HelloActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "An HelloActor using TestActorRef " must {
    "send back 'hello world' when asked" in {
      implicit val timeout = Timeout(5 seconds)
      val helloActorRef = TestActorRef(new HelloActor)
      val future = helloActorRef ? "hello"
      val Success(result: String) = future.value.get
      result should be("hello world")
    }
  }
}

It can be seen that we make use of the TestActorRef (which we discussed above, the one that uses the CallingThreadDispatcher) as the actor that we use to wrap (for want of a better word) the actual actor we wish to test.

 

Expecting Exceptions

Another completely plausible thing to want to do is test for exceptions that may be thrown. It can be seen in the HelloActor that we are trying to test that it will throw an IllegalArgumentException should it see a message it doesn’t handle.

So how do we test that?

We use the inbuilt intercept function to allow us to catch the exception

import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.pattern.ask

import scala.util.Success


class HelloActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "An HelloActor using TestActorRef " must {
    "should throw IllegalArgumentException when sent unhandled message" in {
      val actorRef = TestActorRef(new HelloActor)
      intercept[IllegalArgumentException] { actorRef.receive("should blow up") }
    }
  }
}


 

 

Testing Finite State Machines

Last time we looked at implementing finite state machines in Akka. One of the methods we used for that was Akka FSM. Let's examine how we can test those using the TestKit.

Let's assume we have this simple Akka FSM example (based on the LightSwitch demo from the last article).

import akka.actor.{Actor, ActorRef, FSM}
import scala.concurrent.duration._
import scala.collection._

// received events
final case class PowerOn()
final case class PowerOff()

// states
sealed trait LightSwitchState
case object On extends LightSwitchState
case object Off extends LightSwitchState

//data
sealed trait LightSwitchData
case object NoData extends LightSwitchData

class LightSwitchActor extends FSM[LightSwitchState, LightSwitchData] {

  startWith(Off, NoData)

  when(Off) {
    case Event(PowerOn, _) =>
      goto(On) using NoData
  }

  when(On, stateTimeout = 1 second) {
    case Event(PowerOff, _) =>
      goto(Off) using NoData
    case Event(StateTimeout, _) =>
      println("'On' state timed out, moving to 'Off'")
      goto(Off) using NoData
  }

  whenUnhandled {
    case Event(e, s) =>
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      goto(Off) using NoData
  }

  onTransition {
    case Off -> On => println("Moved from Off to On")
    case On -> Off => println("Moved from On to Off")
  }

  initialize()
}

This example is fairly good as it only has 2 states, On/Off. So it makes for quite a good simple example to showcase the testing.

Another Special Test Actor

To test FSMs there is yet another specialized actor which may only be used for testing FSMs. This actor is called TestFSMRef. Just like the TestActorRef, you pass the TestFSMRef the actual FSM actor you are trying to test.

Here is an example of that

 val fsm = TestFSMRef(new LightSwitchActor())

The TestFSMRef comes with a whole host of useful methods and properties that can be used when testing FSMs. We will see some of them used below.

Testing Initial State

As we saw last time, Akka FSM has the idea of an initialize() method that may be used to place the FSM in its initial state. So we should be able to test that.

Here is how we can do that

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.TestFSMRef
import akka.actor.FSM

import scala.concurrent.duration._
import scala.util.Success


class LightSwitchFSMActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "An LightSwitchActor " must {
    "start in the 'Off' state" in {
      val fsm = TestFSMRef(new LightSwitchActor())
      assert(fsm.stateName == Off)
      assert(fsm.stateData == NoData)
    }
  }
}


Testing State Move

The TestFSMRef comes with the ability to move the underlying FSM actor into a new state using the setState method. Here is an example of how to use that:

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.TestFSMRef
import akka.actor.FSM

import scala.concurrent.duration._
import scala.util.Success


class LightSwitchFSMActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "An LightSwitchActor that starts with 'Off' " must {
    "should transition to 'On' when told to by the test" in {
      val fsm = TestFSMRef(new LightSwitchActor())
      fsm.setState(stateName = On)
      assert(fsm.stateName == On)
      assert(fsm.stateData == NoData)
    }
  }
}


You can of course still send messages to the TestFSMRef which would instruct the underlying FSM actor to move to a new state.

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.TestFSMRef
import akka.actor.FSM

import scala.concurrent.duration._
import scala.util.Success


class LightSwitchFSMActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "An LightSwitchActor that starts with 'Off' " must {
    "should transition to 'On' when sent a 'PowerOn' message" in {
      val fsm = TestFSMRef(new LightSwitchActor())
      fsm ! PowerOn
      assert(fsm.stateName == On)
      assert(fsm.stateData == NoData)
    }
  }
}


 
Testing StateTimeout

Another thing that AkkaFSM supports is the notion of a StateTimeout. In the example FSM we are trying to test, if the FSM stays in the On state for more than 1 second it should automatically move to the Off state.

So how do we test that?

Here is how:

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.TestFSMRef
import akka.actor.FSM

import scala.concurrent.duration._
import scala.util.Success


class LightSwitchFSMActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "An LightSwitchActor that stays 'On' for more than 1 second " must {
    "should transition to 'Off' thanks to the StateTimeout" in {
      val fsm = TestFSMRef(new LightSwitchActor())
      fsm ! PowerOn
      awaitCond(fsm.stateName == Off, 1200 milliseconds, 100 milliseconds)
    }
  }
}


 

Testing Using Probes

So far we have been looking at testing a single actor that might reply to a single sender. Sometimes though we may need to test an entire suite of actors all working together, and, even with the single threaded nature of the TestActorRef (thanks to the very useful CallingThreadDispatcher), we may find it difficult to distinguish the incoming message streams when they all arrive at the one testActor.

Akka TestKit provides yet another abstraction to deal with this, which is the idea of a concrete actor that you inject into the message flow. This concept is called a TestProbe.

Let's assume we have this actor that sends messages on to 2 ActorRefs.

import akka.actor.{ActorRef, Actor}

class DoubleSenderActor extends Actor {
  var dest1: ActorRef = _
  var dest2: ActorRef = _
  def receive = {
    case (d1: ActorRef, d2: ActorRef) =>
      dest1 = d1
      dest2 = d2
    case x =>
      dest1 ! x
      dest2 ! x
  }
}

We could test this code as follows using the TestProbe.

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.testkit.{TestProbe, ImplicitSender, TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest._

import scala.concurrent.duration._
import scala.util.Success


class DoubleSenderActorTestsUsingProbe
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "An DoubleSenderActor that has 2 target ActorRef for sending messages to " must {
    "should send messages to both supplied 'TestProbe(s)'" in {
      val doubleSenderActor = system.actorOf(Props[DoubleSenderActor],
        name = "multiSenderActor")
      val probe1 = TestProbe()
      val probe2 = TestProbe()
      doubleSenderActor ! ((probe1.ref, probe2.ref))
      doubleSenderActor ! "hello"
      probe1.expectMsg(500 millis, "hello")
      probe2.expectMsg(500 millis, "hello")
    }
  }
}


It can be seen that the TestProbe comes with its own set of useful assertion methods. This is due to the fact that TestProbe inherits from TestKit, and as such you can expect to find ALL the TestKit assertions available for use when using TestProbe objects.
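
Because a TestProbe is a real actor with its own ActorRef, you can also drive the conversation from the probe's side (using things like probe.send and probe.reply) rather than just asserting on what it received. A small hedged sketch follows; the EchoBackActor is mine, purely for illustration.

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestKit, TestProbe}
import org.scalatest._

import scala.concurrent.duration._

// a hypothetical actor that simply echoes whatever it receives back to the sender
class EchoBackActor extends Actor {
  def receive = {
    case msg => sender ! msg
  }
}

class ProbeConversationTests
  extends TestKit(ActorSystem("MySpec"))
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "A TestProbe" must {
    "be able to send, expect and reply just like a normal actor" in {
      val echo = system.actorOf(Props[EchoBackActor])
      val probe = TestProbe()

      // the probe (not the test suite's testActor) is the sender here
      probe.send(echo, "hello")
      probe.expectMsg(500 millis, "hello")

      // the probe can also reply to the sender of the last message it received
      probe.reply("thanks")
      probe.expectMsg(500 millis, "thanks")
    }
  }
}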

 

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples

akka : state machines

In this post we will look at 2 ways you can write state machines with Akka. We will firstly examine the more primitive (but easily understandable) approach, and then look into the more sophisticated approach offered by AkkaFSM.

What Is A State Machine?

For those of you out there that do not know what a state machine is, this is what Wikipedia says about them:

A finite-state machine (FSM) or finite-state automaton (FSA, plural: automata), or simply a state machine, is a mathematical model of computation used to design both computer programs and sequential logic circuits. It is conceived as an abstract machine that can be in one of a finite number of states. The machine is in only one state at a time; the state it is in at any given time is called the current state. It can change from one state to another when initiated by a triggering event or condition; this is called a transition. A particular FSM is defined by a list of its states, and the triggering condition for each transition.

https://en.wikipedia.org/wiki/Finite-state_machine

This could be an example state machine for a coin operated barrier

Akka supports swapping out the standard message loop using become, which is available via the context. This is the standard signature of receive (the message loop):

PartialFunction[Any, Unit]

The newly applied message loops are maintained in a stack and may be pushed and popped.

Become

There are different ways of swapping out the message loop. Here is one such example

import akka.actor.Actor

class HotColdStateActor extends Actor {

  //need this for become/unbecome
  import context._

  def cold: Receive = {
    case "snow" => println("I am already cold!")
    case "sun" => becomeHot
  }

  def hot: Receive = {
    case "sun" => println("I am already hot!")
    case "snow" => becomeCold
  }

  def receive = {
    case "snow" => becomeCold
    case "sun" => becomeHot
  }


  private def becomeCold: Unit = {
    println("becoming cold")
    become(cold)
  }

  private def becomeHot: Unit = {
    println("becoming hot")
    become(hot)
  }
}

With this example we simply use become to swap in a new message loop; whichever message loop was installed by the most recent become is the current one.

If we use the following demo code against this actor code

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunHotColdStateDemo

  def RunHotColdStateDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val hotColdStateActor =
      system.actorOf(Props(classOf[HotColdStateActor]),
        "demo-HotColdStateActor")


    println("sending sun")
    hotColdStateActor ! "sun"
    //actors are async, so give it chance to get message
    //obviously we would not do this in prod code, its just
    //for the demo, to get the correct ordering for the print
    //statements
    Thread.sleep(1000)

    println("sending sun")
    hotColdStateActor ! "sun"
    Thread.sleep(1000)

    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)

    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)

    println("sending sun")
    hotColdStateActor ! "sun"
    Thread.sleep(1000)

    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)

    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }

}

We would see output like this

(screenshot: console output from running the HotColdStateActor demo)

 

UnBecome

The other way to swap out the message loop relies on having matching pairs of become/unbecome, where the standard message loop is not replaced as such; instead the last value pushed onto a stack of message loops is used as the current one.

Care must be taken to ensure the number of push (become) and pop (unbecome) operations match, otherwise memory leaks may occur, which is why this is not the default behaviour.

Here is an example actor that uses the become/unbecome matched operations.

import akka.actor.Actor

class BecomeUnbecomeStateActor extends Actor {

  //need this for become/unbecome
  import context._

  def receive = {
    case "snow" =>
      println("saw snow, becoming")
      become({
        case "sun" =>
          println("saw sun, unbecoming")
          unbecome() // resets the latest 'become' (just for fun)
        case _ => println("Unknown message, state only likes sun")
      }, discardOld = false) // push on top instead of replace

    case _ => println("Unknown message, state only likes snow")
  }



}

I personally think this is harder to read, and manage.

Here is some demo code to exercise this actor:

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunBecomeUnbecomeStateDemo


  def RunBecomeUnbecomeStateDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val becomeUnbecomeStateActor =
      system.actorOf(Props(classOf[BecomeUnbecomeStateActor]),
        "demo-BecomeUnbecomeStateActor")

    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    //actors are async, so give it chance to get message
    Thread.sleep(1000)

    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    Thread.sleep(1000)

    println("Sending sun")
    becomeUnbecomeStateActor ! "sun"
    Thread.sleep(1000)

    println("Sending sun")
    becomeUnbecomeStateActor ! "sun"
    Thread.sleep(1000)

    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    Thread.sleep(1000)

    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }

}

Which when run should yield the following results

(screenshot: console output from running the BecomeUnbecomeStateActor demo)

 

AkkaFSM

While become/unbecome will get the job done, Akka comes with a much better alternative called Akka FSM.

Using Akka FSM you can not only handle states but also have state data, and add code against the movements from one state to the next, which, as most people will know, are known as “transitions”.

When using Akka FSM you may see a state machine expressed using this sort of notation

State(S) x Event(E) -> Actions (A), State(S’)

If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S’.

To use Akka FSM there are a number of things you can do. Some of them are mandatory and some you can opt into depending on your requirements. Lets have a look at some of the moving parts that make Akka FSM shine.

FSM[S,D]

In order to use Akka FSM you need to mixin the FSM trait. The trait itself looks like this

trait FSM[S, D]

 

Where S is the state type, and D is the data type.

startWith

You can choose what state the FSM starts in by using the startWith method which has the following signature.

startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit

initialize

Calling initialize() performs the transition into the initial state and sets up timers (if required).

when

when is used to match the state, and is also used to control the movement to a new state using the inbuilt goto method, or possibly stay in the current state.

when uses pattern matching to match the events that a particular state can handle. As stated, it is completely valid to stay in the current state or to move to a new state.

The examples that follow below will show you both stay and goto in action.

whenUnhandled

You should obviously try and make sure you cover all the correct state movements in response to all the events your FSM knows about. But Akka FSM also comes with the whenUnhandled method for catching events that were not handled by YOUR state handling (when) logic.

onTransition

You may also monitor the movement from one state to the next and run some code when this occurs. This is accomplished using the onTransition method.

onTransition has the following signature

onTransition(transitionHandler: TransitionHandler): Unit

Where TransitionHandler is really just a PartialFunction that has the following generic parameters

PartialFunction[(S, S), Unit]

Where the tuple is a tuple of “from state” to “to state”.

 

Time for an example

 

Lightswitch Example

This first example is a very simple FSM that has 2 states, On and Off. It doesn't really need any state data, however the Akka FSM trait always needs a Data object. So in this case we simply use a base trait for the Data, which we don't really care about.

The idea behind this example is that the lightswitch can move from Off –> On, and On –> Off.

This example also shows a stateTimeout in action, whereby the FSM will move from On to Off if left in the On state for more than 1 second.

Here is the full code for this example

import akka.actor.{Actor, ActorRef, FSM}
import scala.concurrent.duration._
import scala.collection._

// received events
final case class PowerOn()
final case class PowerOff()

// states
sealed trait LightSwitchState
case object On extends LightSwitchState
case object Off extends LightSwitchState

//data
sealed trait LightSwitchData
case object NoData extends LightSwitchData

class LightSwitchActor extends FSM[LightSwitchState, LightSwitchData] {

  startWith(Off, NoData)

  when(Off) {
    case Event(PowerOn, _) =>
      goto(On) using NoData
  }

  when(On, stateTimeout = 1 second) {
    case Event(PowerOff, _) =>
      goto(Off) using NoData
    case Event(StateTimeout, _) =>
      println("'On' state timed out, moving to 'Off'")
      goto(Off) using NoData
  }

  whenUnhandled {
    case Event(e, s) =>
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      goto(Off) using NoData
  }

  onTransition {
    case Off -> On => println("Moved from Off to On")
    case On -> Off => println("Moved from On to Off")
  }

  initialize()
}

Which we can run using this demo code.

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunLightSwitchDemo

  def RunLightSwitchDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val lightSwitchActor =
      system.actorOf(Props(classOf[LightSwitchActor]),
        "demo-LightSwitch")


    println("sending PowerOff, should be off already")
    lightSwitchActor ! PowerOff
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)


    println("sending PowerOn")
    lightSwitchActor ! PowerOn
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)

    println("sending PowerOff")
    lightSwitchActor ! PowerOff
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)


    println("sending PowerOn")
    lightSwitchActor ! PowerOn
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)

    println("sleep for a while to allow 'On' state to timeout")
    Thread.sleep(2000)

    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }



}

When run we get the following results

sending PowerOff, should be off already
[WARN] [09/06/2016 07:21:28.864] [StateMachineSystem-akka.actor.default-dispatcher-4]
  [akka://StateMachineSystem/user/demo-LightSwitch] received unhandled request PowerOff in state Off/NoData
sending PowerOn
Moved from Off to On
sending PowerOff
Moved from On to Off
sending PowerOn
Moved from Off to On
sleep for a while to allow ‘On’ state to timeout
‘On’ state timed out, moving to ‘Off’
Moved from On to Off

There is something interesting here, which is that we see an Unhandled event. Why is this?

Well this is due to the fact that this demo FSM starts in the Off state, and we send a PowerOff event. This is not handled in the when for the Off state.

We could fix this, by amending this as follows:

when(Off) {
  case Event(PowerOn, _) =>
    goto(On) using NoData
  case Event(PowerOff, _) =>
    println("already off")
    stay
}

So if we apply this amended code and run the demo again, we would now see this output instead

sending PowerOff, should be off already
already off
sending PowerOn
Moved from Off to On
sending PowerOff
Moved from On to Off
sending PowerOn
Moved from Off to On
sleep for a while to allow ‘On’ state to timeout
‘On’ state timed out, moving to ‘Off’
Moved from On to Off

 

Buncher Example

I have basically taken this one straight from the official Akka FSM docs.

This example shall receive and queue messages while they arrive in a burst and send them on to another target actor after the burst ended or a flush request is received.

Here is the full code for this example. It is a slightly fuller example, so this time we use a proper set of data objects for the states.

import akka.actor.{Actor, ActorRef, FSM}
import scala.concurrent.duration._
import scala.collection._

// received events
final case class SetTarget(ref: ActorRef)
final case class Queue(obj: Any)
case object Flush

// sent events
final case class Batch(obj: immutable.Seq[Any])

// states
sealed trait State
case object Idle extends State
case object Active extends State

//data
sealed trait BuncherData
case object Uninitialized extends BuncherData
final case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends BuncherData

class BuncherActor extends FSM[State, BuncherData] {

  startWith(Idle, Uninitialized)

  when(Idle) {
    case Event(SetTarget(ref), Uninitialized) =>
      stay using Todo(ref, Vector.empty)
  }

  when(Active, stateTimeout = 1 second) {
    case Event(Flush | StateTimeout, t: Todo) =>
      goto(Idle) using t.copy(queue = Vector.empty)
  }

  whenUnhandled {
    // common code for both states
    case Event(Queue(obj), t @ Todo(_, v)) =>
      goto(Active) using t.copy(queue = v :+ obj)

    case Event(e, s) =>
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      stay
  }

  onTransition {
    case Active -> Idle =>
      stateData match {
        case Todo(ref, queue) => ref ! Batch(queue)
        case _                => // nothing to do
      }
  }

  initialize()
}


class BunchReceivingActor extends Actor {
  def receive = {
    case Batch(theBatchData) => {
      println(s"receiving the batch data $theBatchData")
    }
    case _ => println("unknown message")
  }
}

Which we can run using this demo code

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunBuncherDemo

  def RunBuncherDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val buncherActor =
      system.actorOf(Props(classOf[BuncherActor]),
        "demo-Buncher")

    val bunchReceivingActor =
      system.actorOf(Props(classOf[BunchReceivingActor]),
        "demo-BunchReceiving")

    buncherActor ! SetTarget(bunchReceivingActor)

    println("sending Queue(42)")
    buncherActor ! Queue(42)
    println("sending Queue(43)")
    buncherActor ! Queue(43)
    println("sending Queue(44)")
    buncherActor ! Queue(44)
    println("sending Flush")
    buncherActor ! Flush
    println("sending Queue(45)")
    buncherActor ! Queue(45)


    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }
}

When this demo runs you should see something like this

sending Queue(42)
sending Queue(43)
sending Queue(44)
sending Flush
sending Queue(45)
receiving the batch data Vector(42, 43, 44)
receiving the batch data Vector(45)

 

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples

akka : persistent actors

In this post we will look at some of the core concepts around persistent actors, which may seem strange since so far I have been saying that Akka is great because it is lock free and each actor is stateless.

Truth is there are times where some sort of durable state just can’t be avoided, and for that Akka provides us with the PersistentActor

 

Event Sourcing

Akka persistence does borrow a couple of ideas from other concepts out there in the wild, such as

  • Event sourcing
  • Possibly CQRS
  • Snapshots

If you have not heard of event sourcing before, it is fairly simple to explain. The idea is that you have a data structure that is able to receive events to build up its internal state. At the same time you also have an event store that stores every event that has occurred, in the order they occurred.

These events are then played onto the object that accepts them, whereby the object will build its own internal state from the events played on to it.

The eagle eyed amongst you may be thinking won’t that lead to loads of events? Well yes it might.

There is however an optimization around this called “snapshots”. Snapshots are a special type of event that captures the object’s state as it is right now.

So in reality you would apply the latest snapshot, and then apply only the events that occurred after it, which drastically reduces the amount of event playback needed to rebuild an object from the event store.
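To make that a little more concrete, here is a tiny framework-free sketch of the idea; none of these types are Akka APIs, they are made up purely for illustration:

object EventSourcingSketch {

  // an event and the state it is applied to (illustrative names)
  final case class AccountCredited(amount: Int)
  final case class AccountState(balance: Int = 0) {
    def applyEvent(e: AccountCredited): AccountState = copy(balance = balance + e.amount)
  }

  // recovery = start from the latest snapshot (if there is one),
  // then fold only the events that are newer than that snapshot over it
  def recover(latestSnapshot: Option[AccountState],
              newerEvents: Seq[AccountCredited]): AccountState =
    newerEvents.foldLeft(latestSnapshot.getOrElse(AccountState())) { (state, evt) =>
      state.applyEvent(evt)
    }
}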

I have written a fairly well received article about this before for the .NET space, which includes working code and also covers the CQRS (Command Query Responsibility Segregation) side of things. If you would like to know more you can read about it here:

http://www.codeproject.com/Articles/991648/CQRS-A-Cross-Examination-Of-How-It-Works

Anyway I am drifting off topic a bit here; the point is that Akka persistence borrows some of these ideas from other concepts/frameworks, so it may be of some use to read around the subject, in particular event sourcing and CQRS.

Dependencies

In order to work with Akka persistence you will need the following 2 SBT entries in your build.sbt file

"com.typesafe.akka" %% "akka-actor" % "2.4.8",
"com.typesafe.akka" %% "akka-persistence" % "2.4.8

Storage

Akka persistence comes with a couple of pre-canned persistence storage mechanisms, and there are many more community based storage frameworks that you can use.

Within this article I have chosen to use the Akka provided storage mechanism which is LevelDB.

In order to do this, you will need the following SBT entries in your build.sbt file

"org.iq80.leveldb"            % "leveldb"          % "0.7",
"org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8"

You can read more about storage in general at the official Akka docs page

http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Storage_plugins

Config

As with most things in Akka you are able to configure things either in code (by way of overriding things) or by configuration. I personally prefer configuration. So in order to get the persistence to work, one must provide the following configuration

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false

For me this lives in a resources/application.conf file

PersistentActor Trait

Ok so getting back on track, how do we create a persistent actor in Akka?

Well out of the box Akka provides the PersistentActor trait, which you can mix in. This trait looks like this

trait PersistentActor extends Eventsourced with PersistenceIdentity {
  def receive = receiveCommand
}

There are several things that are of interest here

  • There is an expectation that there will be a receiveCommand implementation (this comes from the Eventsourced trait)
  • That the base trait is called Eventsourced is also interesting

I personally find the fact that there is an expectation of a command and that we end up mixing in a trait that uses the name “EventSourced” to be quite a strong indicator of just how similar working with Akka persistence is to CQRS + traditional event sourcing ideas.

If we were to go further down the rabbit hole and keep looking into the base trait Eventsourced we would see a couple more abstract methods of interest that are expected to be supplied by the end user's code:


  /**
   * Recovery handler that receives persisted events during recovery. If a state snapshot
   * has been captured and saved, this handler will receive a [[SnapshotOffer]] message
   * followed by events that are younger than the offered snapshot.
   *
   * This handler must not have side-effects other than changing persistent actor state i.e. it
   * should not perform actions that may fail, such as interacting with external services,
   * for example.
   *
   * If there is a problem with recovering the state of the actor from the journal, the error
   * will be logged and the actor will be stopped.
   *
   * @see [[Recovery]]
   */
  def receiveRecover: Receive

  /**
   * Command handler. Typically validates commands against current state (and/or by
   * communication with other actors). On successful validation, one or more events are
   * derived from a command and these events are then persisted by calling `persist`.
   */
  def receiveCommand: Receive

As stated these are abstract methods that YOUR code needs to supply to make the persistent actor machinery work properly. We will see more of this in a bit.
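To give a feel for the shape before we get to the full demo later in the post, here is a bare-bones skeleton; the persistenceId value and the way state is held here are illustrative only:

import akka.actor._
import akka.persistence._

class SkeletonPersistentActor extends PersistentActor {

  override def persistenceId = "skeleton-persistent-actor"

  // internal state, rebuilt from events
  var handled: List[Any] = Nil

  // replayed events arrive here when the actor starts
  // (a real implementation would also match SnapshotOffer, as the demo below does)
  override def receiveRecover: Receive = {
    case evt => handled = evt :: handled
  }

  // live commands arrive here; valid commands are turned into events and persisted
  override def receiveCommand: Receive = {
    case cmd =>
      persist(cmd) { evt =>
        handled = evt :: handled
      }
  }
}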

Persistent Id

One of the rules you must follow when using persistent actors is that the persistent actor MUST have the same ID across incarnations (restarts). This is set by overriding the persistenceId method as shown below

override def persistenceId = "demo-persistent-actor-1"

Snapshots

As I stated earlier snapshots can reduce the amount of extra events that need to be replayed against the event source target (the persistent actor).

In order to save a snapshot the actor may call the saveSnapshot method.

  • If the snapshot succeeds the actor will receive a SaveSnapshotSuccess message
  • If the snapshot fails the actor will receive a SaveSnapshotFailure message
var state: Any = _
 
override def receiveCommand: Receive = {
  case "snap"                                => saveSnapshot(state)
  case SaveSnapshotSuccess(metadata)         => // ...
  case SaveSnapshotFailure(metadata, reason) => // ...
}

Where the metadata looks like this

final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)

During the recovery process the persistent actor is offered a SnapshotOffer message from which it may restore its internal state.

After the SnapshotOffer come the newer (younger in Akka speak) events, which when replayed onto the persistent actor bring it to its final internal state.

 

Failure/Recovery

Special care must be taken when shutting down persistent actors from the outside. For a non persistent actor a PoisonPill may be used, but this is not recommended for persistent actors: incoming commands are stashed while the journaling mechanism is persisting events, and are only unstashed once the journal signals that the events have been stored, so a PoisonPill can cause the actor to stop while commands are still waiting to be processed. A better way is to use explicit shutdown messages.

Read more about this here :

http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Safely_shutting_down_persistent_actors
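A minimal sketch of that pattern, assuming a hand-rolled Shutdown message (the message, actor name and state handling are made up for illustration). Because Shutdown is handled in receiveCommand like any other command, it queues up behind any commands that were stashed while journaling was in flight, rather than jumping the queue the way a PoisonPill would:

import akka.persistence._

// illustrative shutdown message, not an Akka built-in
case object Shutdown

class SafeShutdownPersistentActor extends PersistentActor {

  override def persistenceId = "safe-shutdown-example"

  var received: List[String] = Nil

  override def receiveRecover: Receive = {
    case evt: String => received = evt :: received
  }

  override def receiveCommand: Receive = {
    case Shutdown =>
      // stop ourselves only once this message is actually processed
      context.stop(self)
    case data: String =>
      persist(data) { evt =>
        received = evt :: received
      }
  }
}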

 

Persisting state

So we have talked about event/commands/CQRS/event sourcing/snapshots, but so far we have not talked about how to actually save state. How is this done?

Well as luck would have it, it's very easy: we simply call the persist method, which looks like this

def persist[A](event: A)(handler: A ⇒ Unit): Unit

The handler in the second parameter list is invoked by Akka once the event has been successfully written to the journal, and that handler is typically where you update the actor's internal state (as the demo below does).

Demo Time

Ok so we have now gone through most of the bits you would need to work with persistent actors. Time for a demo.

Let's assume we have the following command that we will send to a single persistent actor

case class Cmd(data: String)

And this type of event that we would like to store

case class Evt(data: String)

Where we would hold this type of state within the persistent actor

case class ExampleState(events: List[String] = Nil) {
  def updated(evt: Evt): ExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}

And that the actual persistent actor looks like this

import akka.actor._
import akka.persistence._



class DemoPersistentActor extends PersistentActor {

  //note : This is  mutable
  var state = ExampleState()

  def updateState(event: Evt): Unit =
    state = state.updated(event)

  def numEvents =
    state.size

  val receiveRecover: Receive = {
    case evt: Evt => updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) => {
        println(s"offered state = $snapshot")
        state = snapshot
    }
  }

  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(Evt(s"${data}-${numEvents}"))(updateState)
      persist(Evt(s"${data}-${numEvents + 1}")) { event =>
        updateState(event)
        context.system.eventStream.publish(event)
      }
    case "snap"  => saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>
      println(s"SaveSnapshotSuccess(metadata) :  metadata=$metadata")
    case SaveSnapshotFailure(metadata, reason) =>
      println("""SaveSnapshotFailure(metadata, reason) :
        metadata=$metadata, reason=$reason""")
    case "print" => println(state)
    case "boom"  => throw new Exception("boom")
  }

  override def persistenceId = "demo-persistent-actor-1"
}

Which we could first run using this demo code:

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //create the actor system
  val system = ActorSystem("PersitenceSystem")

  val persistentActor =
    system.actorOf(Props(classOf[DemoPersistentActor]),
      "demo-persistent-actor-1")

  persistentActor ! "print"
  persistentActor ! Cmd("foo")
  persistentActor ! Cmd("baz")
  persistentActor ! "boom"
  persistentActor ! Cmd("bar")
  persistentActor ! "snap"
  persistentActor ! Cmd("buzz")
  persistentActor ! "print"


  StdIn.readLine()

  //shutdown the actor system
  system.terminate()

  StdIn.readLine()
}

Which gives the following output (your output may look a little different to mine, as I have run this code a number of times so have state from previous runs on disk)

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8,
foo-9, baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23)
[ERROR] [08/16/2016 18:43:21.475] [PersitenceSystem-akka.actor.default-dispatcher-5]
[akka://PersitenceSystem/user/demo-persistent-actor-1] boom
java.lang.Exception: boom
    at DemoPersistentActor$$anonfun$2.applyOrElse(DemoPersistentActor.scala:39)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at DemoPersistentActor.akka$persistence$Eventsourced$$super$aroundReceive(DemoPersistentActor.scala:6)
    at akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:657)
    at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:182)
    at DemoPersistentActor.aroundReceive(DemoPersistentActor.scala:6)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9,
baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
[WARN] [08/16/2016 18:43:21.494] [PersitenceSystem-akka.persistence.dispatchers.default-stream-dispatcher-8]
[akka.serialization.Serialization(akka://PersitenceSystem)] Using the default Java serializer for class
[ExampleState] which is not recommended because of performance implications. Use another serializer or
disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23, foo-24, foo-25,
baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)
SaveSnapshotSuccess(metadata) :  metadata=SnapshotMetadata(demo-persistent-actor-1,33,1471369401491)

And then if we were to run this demo code:

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //create the actor system
  val system = ActorSystem("PersitenceSystem")

  val persistentActor =
    system.actorOf(Props(classOf[DemoPersistentActor]),
      "demo-persistent-actor-1")

  persistentActor ! "print"


  StdIn.readLine()

  //shutdown the actor system
  system.terminate()

  StdIn.readLine()
}

 

Let's see what we can see:

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7,
foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23,
foo-24, foo-25, baz-26, baz-27, bar-28, bar-29)

List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11,
bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22,
buzz-23, foo-24, foo-25, baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)

It can be seen that although we did not persist any new events, the state of the demo persistent actor was restored correctly using a combination of a snapshot and the events that are newer than that snapshot.

 

 

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples

akka dead letters and how to monitor for them

So we are a little way into this mini series on Akka now.

We have covered a few of the fundamental topics of Akka such as

  • Supervision
  • Mailboxes
  • Logging

When we talked about mailboxes we discussed the fact that there are queues involved that are used as mailboxes.

Generally speaking, where queues are concerned there is also the possibility of dead letters.

In this post we will talk about what ‘dead letters’ are within Akka, and also look at how you can monitor for dead letters

What Are Dead Letters?

In Akka, messages that can’t be delivered are routed to a synthetic actor which has the path “/deadLetters”. This applies to messages lost locally, NOT to messages lost at the transport layer.

Akka makes no guarantees for lost messages at the transport layer.

How Can You Monitor For Them?

Within Akka there is the concept of an event bus. This is a bus that Akka uses internally to send messages, and you can also use it for other purposes, such as publishing/subscribing to certain types of messages.

It is kind of like a topic based subscription system.

We can use this event bus to monitor for dead letters.

It is also worth noting that the event bus can be used as a general pub/sub mechanism; you can read more about it here : Akka Event Bus
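As a quick illustration of that general pub/sub usage (the MetricEvent type and the actor/system names here are made up for this sketch), you can subscribe any actor to a message class on the system event stream and then publish instances of that class to it:

import akka.actor._

// an illustrative message type, not part of Akka
final case class MetricEvent(name: String, value: Long)

class MetricListenerActor extends Actor {
  def receive = {
    case MetricEvent(name, value) => println(s"metric $name = $value")
  }
}

object EventBusSketch extends App {
  val system = ActorSystem("EventBusSketch")
  val listener = system.actorOf(Props[MetricListenerActor], "metric-listener")

  // subscribe the listener to every MetricEvent published on the event stream
  system.eventStream.subscribe(listener, classOf[MetricEvent])

  // anything with access to the system can now publish
  system.eventStream.publish(MetricEvent("requests", 42))

  Thread.sleep(500)
  system.terminate()
}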

But for now let's get back to the problem at hand: how do we monitor for dead letters?

So let’s start out with this actor code

import akka.actor.Actor
import akka.event.Logging

class LifeCycleActor extends Actor {
  val log = Logging(context.system, this)

  log.info("LifeCycleActor: constructor")

  override def preStart { log.info("LifeCycleActor: preStart") }

  override def postStop { log.info("LifeCycleActor: postStop") }

  override def preRestart(reason: Throwable, message: Option[Any]) {
    log.info("LifeCycleActor: preRestart")
    log.info(s"LifeCycleActor reason: ${reason.getMessage}")
    log.info(s"LifeCycleActor message: ${message.getOrElse("")}")
    super.preRestart(reason, message)
  }
  override def postRestart(reason: Throwable) {
    log.info("LifeCycleActor: postRestart")
    log.info(s"LifeCycleActor reason: ${reason.getMessage}")
    super.postRestart(reason)
  }
  def receive = {
    case RestartMessage => throw new Exception("RestartMessage seen")
    case _ => log.info("LifeCycleActor : got a message")
  }
}

We will then use this actor, which is set up to receive dead letters

import akka.actor.{DeadLetter, Actor}

class DeadLetterMonitorActor 
  extends Actor 
  with akka.actor.ActorLogging {
  log.info("DeadLetterMonitorActor: constructor")

  def receive = {
    case d: DeadLetter => {
      log.error(s"DeadLetterMonitorActor : saw dead letter $d")
    }
    case _ => log.info("DeadLetterMonitorActor : got a message")
  }
}

We then use this demo code

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
import akka.event.Logging

class Runner {

  def Run(): Unit = {

    //create the actor system
    val system = ActorSystem("LifeCycleSystem")

    // default Actor constructor
    val lifeCycleActor =
      system.actorOf(Props[LifeCycleActor],
        name = "lifecycleactor")

    val deadLetterMonitorActor =
      system.actorOf(Props[DeadLetterMonitorActor],
        name = "deadlettermonitoractor")

    //subscribe to system wide event bus 'DeadLetter'
    system.eventStream.subscribe(
      deadLetterMonitorActor, classOf[DeadLetter])

    val log = Logging(system, classOf[Runner])
    log.debug("Runner IS UP BABY")

    log.debug("sending lifeCycleActor a few numbers")
    lifeCycleActor ! 100
    lifeCycleActor ! 200
    Thread.sleep(1000)

    log.debug("sending lifeCycleActor a poison pill (kill it)")
    lifeCycleActor ! PoisonPill
    Thread.sleep(1000)
    log.debug("sending lifeCycleActor a few numbers")
    lifeCycleActor ! 100
    lifeCycleActor ! 200


    log.debug("stop lifeCycleActor/deadLetterMonitorActor")
    system.stop(lifeCycleActor)
    system.stop(deadLetterMonitorActor)

    //shutdown the actor system
    log.debug("stop actor system")
    system.terminate()

    StdIn.readLine()
  }
}

This does the following:

  • Create the 2 actors above
  • Subscribe the DeadLetterMonitorActor (above) to the “DeadLetter” topic of the inbuilt akka EventStream (internal pub/sub bus)
  • We then send a few messages to the LifeCycleActor
  • Then send the LifeCycleActor a PoisonPill (which terminates it)
  • We then send a few more messages to the LifeCycleActor (which it doesn’t get because it is dead)
  • We expect the DeadLetterMonitorActor to receive the DeadLetter messages and log these DeadLetter messages

If we run these bits of code this is the result, where it can plainly be seen that the DeadLetterMonitorActor does receive the DeadLetter messages

INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] Slf4jLogger – Slf4jLogger started
16:55:49.546UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] EventStream – logger log1-Slf4jLogger started
16:55:49.547UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] EventStream – Default Loggers started
16:55:49.559UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor: constructor
16:55:49.560UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor: preStart
16:55:49.560UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – Runner IS UP BABY
16:55:49.561UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – sending lifeCycleActor a few numbers
16:55:49.561UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] DeadLetterMonitorActor – DeadLetterMonitorActor: constructor
16:55:49.563UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor : got a message
16:55:49.563UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor : got a message
16:55:50.561UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – sending lifeCycleActor a poison pill (kill it)
16:55:50.576UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-9] LifeCycleActor – LifeCycleActor: postStop
16:55:51.565UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-9] Runner – sending lifeCycleActor a few numbers
16:55:51.570UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – stop lifeCycleActor/deadLetterMonitorActor
16:55:51.573UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-7] Runner – stop actor system
16:55:51.581UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-9] RepointableActorRef – Message [java.lang.Integer]
    from Actor[akka://LifeCycleSystem/deadLetters] to Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257]
    was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration
    settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
16:55:51.581UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-9] RepointableActorRef – Message [java.lang.Integer]
    from Actor[akka://LifeCycleSystem/deadLetters] to Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257]
    was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration
    settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
16:55:51.588UTC ERROR[LifeCycleSystem-akka.actor.default-dispatcher-8] DeadLetterMonitorActor – DeadLetterMonitorActor :
    saw dead letter DeadLetter(100,Actor[akka://LifeCycleSystem/deadLetters],Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257])
16:55:51.590UTC ERROR[LifeCycleSystem-akka.actor.default-dispatcher-8] DeadLetterMonitorActor – DeadLetterMonitorActor :
    saw dead letter DeadLetter(200,Actor[akka://LifeCycleSystem/deadLetters],Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257])
16:55:51.609UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-6] LocalActorRef – Message [akka.actor.StopChild]
    from Actor[akka://LifeCycleSystem/deadLetters] to Actor[akka://LifeCycleSystem/user] was not delivered. [3] dead letters encountered.
    This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
16:55:51.616UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-6] EventStream – shutting down: StandardOutLogger started

 

 

NOTE : According to the official Akka docs “Dead letters are not propagated over the network, if you want to collect them in one place you will have to subscribe one actor per network node and forward them manually.”

Are All Messages In DeadLetters A Problem?

No, not all of the messages that end up in the deadLetters synthetic actor are problems. For example, suppose an actor is sent several terminate requests: only one of these will take effect, and the others will likely end up in deadLetters, but this is not a concern.

The deadLetters actor, and the monitoring of it, is more of a debugging aid than anything else. You will have to use a modicum of common sense when examining the results of the deadLetters logging discussed above.

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples

akka logging

This is likely to be one of the smaller posts in the series, but just because it is small in size doesn’t mean that it is not mighty.

Every app needs logging, and when working with a distributed set of actors this is crucial.

Akka provides 2 types of logging adaptor out of the box

  • Console
  • SLF4J (where you need the appropriate back end for this which is usually Logback)

It also has various configuration sections that allow you to adjust the logging of the following parts of Akka

  • Akka setup messages
  • Receive of messages
  • All actor lifecycle messages
  • Finite state machine messages
  • EventStream subscription messages
  • Remoting messages

Before we look at how you can customize the logging to capture all this good stuff, let's first see the steps you need to follow to set up basic logging in Akka.

Step1 : Grab the JARs

There are a couple of JARs you will need to perform logging in Akka. These are shown below

See build.sbt

name := "HelloWorld"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" % "akka-actor_2.11" % "2.4.8",
  "ch.qos.logback" % "logback-classic" % "1.1.7",
  "com.typesafe.akka" % "akka-slf4j_2.11" % "2.4.8")

Step2 : application.conf

You must then configure how Akka will log its entries. This is done in a configuration file; I have decided to call mine “application.conf”.

See resources/application.conf

akka {
  # event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
}

Step3 : logback.xml

For the SLF4J logging to work we need to configure Logback. This is typically done with a configuration file called “logback.xml”.

An example of this may look like this

See “resources/logback.xml”

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <target>System.out</target>
        <encoder>
            <pattern>%X{akkaTimestamp} %-5level[%thread] %logger{0} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <file>c:/logs/akka.log</file>
        <append>true</append>
        <encoder>
            <pattern>%date{yyyy-MM-dd} %X{akkaTimestamp} %-5level[%thread] %logger{1} - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="akka" level="DEBUG" />

    <root level="DEBUG">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="FILE"/>
    </root>

</configuration>

Step4 : Log Some Stuff

Once you have completed the above steps, you have 2 choices about how to consume the logging

Using The LoggingAdapter

You are able to use Logging.apply (which returns a LoggingAdapter) to create a new log that you may use. Here is an example

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
import akka.event.Logging

class Runner {

  def Run(): Unit = {

    //create the actor system
    val system = ActorSystem("LifeCycleSystem")

    val log = Logging(system, classOf[Runner])
    log.debug("Runner IS UP BABY")
    ...

    StdIn.readLine()
  }
}

Using Logging Trait

To make this process easier Akka also provides a trait called “akka.actor.ActorLogging”, which can be mixed in wherever you require logging

import akka.actor.Actor
import akka.event.Logging

class LifeCycleActorWithLoggingTrait extends Actor with akka.actor.ActorLogging {

  log.info("LifeCycleActor: constructor")

  .....
}
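For completeness, here is a fuller self-contained sketch of an actor using the trait (the actor and its messages are made up for illustration):

import akka.actor._

class GreetingActor extends Actor with ActorLogging {

  log.info("GreetingActor: constructor")

  def receive = {
    case name: String => log.info(s"GreetingActor: hello $name")
    case _            => log.warning("GreetingActor: unknown message")
  }
}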
 

Async Or NOT

Some of the more experienced JVM/Scala devs amongst you may think “heck, I can just use my own logging, I don’t need the Akka trait or LoggingAdapter”.

The thing is, if you use the Akka trait or LoggingAdapter they are set up to log asynchronously and not introduce any time delays into the messaging pipeline when logging.

So just be warned that you should probably use the inbuilt Akka stuff rather than roll your own. Logging to things like an ELK stack may be the exception.

Common Akka Configuration Around Logging

These configuration sections are useful for controlling what is logged

General Log Level

akka {
  # general logging level
  loglevel = DEBUG
}

Log Akka Config At Start

akka {
  # Log the complete configuration at INFO level when the actor system is started.
  # This is useful when you are uncertain of what configuration is used.
  log-config-on-start = on
}

Actor Receive Messages

akka {
  debug {
    # enable function of LoggingReceive, which is to log any received message at
    # DEBUG level
    receive = on
  }
}

You can also monitor lifecycle events like this

akka {
  debug {
    # enable DEBUG logging of actor lifecycle changes
    lifecycle = on
  }
}

Remoting Logging

Firstly you can log the message the remote actor is sent by the transport layer

akka {
  remote {
    # If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged
    log-sent-messages = on
  }
}

You may also see what messages are received by the transport layer like this

akka {
  remote {
    # If this is "on", Akka will log all inbound messages at DEBUG level, if off then they are not logged
    log-received-messages = on
  }
}

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples

AKKA : hierarchies / lifecycles

Ok so last time we covered the basics of actors and the actor system (actor fabric) and covered how to send simple messages to actors.

This time we will talk about actor hierarchies (supervision) and also actor lifecycles.

But before we get into supervision and how it is used, we should just take a trip back to the introduction article in this series of posts.

The introductory post had the following bullet point

  • Supervisor hierarchies with “let-it-crash” semantics.

What does that actually mean? Well quite simply, Akka embraces failure and treats it as an expected part of what could happen.

Doesn’t this sound quite dangerous? Well, yes and no. The yes part is that an actor may have been part way through processing some message, so logic would dictate that you should ensure your messages are idempotent and safe to send again.

The no part is the interesting case: you have an actor that dies part way through its job. Boo.

Luckily this is Akka’s bread and butter; it has a strong concept of ownership/supervision where the supervisor knows about an underling’s failure and knows how to remedy it. We will look at this next.

Hierarchies (supervision)

So far we have seen how to create our own little Akka systems. But what is really going on when we do that.

It turns out that the Akka fabric creates some top level actors called “guardians”, which all user created actors are “supervised” by.

This can be seen on this diagram (borrowed from the official Akka documentation)

image

Within Akka EVERY actor is “supervised” by another actor, up until the “Root Guardian”

There are 2 sub guardians under the root guardian.

User and System, both of which play different roles.

To stand on the shoulders of giants, here is what the Akka docs have to say about these 3 top level guardians:

/: The Root Guardian
The root guardian is the grand-parent of all so-called “top-level” actors and supervises all the special actors mentioned in Top-Level Scopes for Actor Paths using the SupervisorStrategy.stoppingStrategy, whose purpose is to terminate the child upon any type of Exception. All other throwables will be escalated … but to whom? Since every real actor has a supervisor, the supervisor of the root guardian cannot be a real actor. And because this means that it is “outside of the bubble”, it is called the “bubble-walker”. This is a synthetic ActorRef which in effect stops its child upon the first sign of trouble and sets the actor system’s isTerminated status to true as soon as the root guardian is fully terminated (all children recursively stopped).

/system: The System Guardian
This special guardian has been introduced in order to achieve an orderly shut-down sequence where logging remains active while all normal actors terminate, even though logging itself is implemented using actors. This is realized by having the system guardian watch the user guardian and initiate its own shut-down upon reception of the Terminated message. The top-level system actors are supervised using a strategy which will restart indefinitely upon all types of Exception except for ActorInitializationException and ActorKilledException, which will terminate the child in question. All other throwables are escalated, which will shut down the whole actor system.

/user: The Guardian Actor
The actor which is probably most interacted with is the parent of all user-created actors, the guardian named “/user”. Actors created using system.actorOf() are children of this actor. This means that when this guardian terminates, all normal actors in the system will be shutdown, too. It also means that this guardian’s supervisor strategy determines how the top-level normal actors are supervised. Since Akka 2.1 it is possible to configure this using the setting akka.actor.guardian-supervisor-strategy, which takes the fully-qualified class-name of a SupervisorStrategyConfigurator. When the guardian escalates a failure, the root guardian’s response will be to terminate the guardian, which in effect will shut down the whole actor system.

See http://doc.akka.io/docs/akka/snapshot/general/supervision.html
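As the quoted docs mention, the /user guardian's supervision strategy is configurable via akka.actor.guardian-supervisor-strategy. A hedged example, using the stopping configurator that ships with Akka (shown purely as an illustration; by default a restarting strategy is used):

akka {
  actor {
    # fully qualified class name of a SupervisorStrategyConfigurator
    guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy"
  }
}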

Ok, so now we have established that there are top level guardians that take care of “supervising” any actors that are created programmatically, and that there is a guardian to monitor existing actors and log/manage their correct shutdown.

But can we create our own supervisors?

Well yes you can, in fact this is completely encouraged and a very very natural part of writing Akka code.

So how do we do that exactly?

It is actually quite simple, we have to follow a few steps

  1. Decide on what supervision strategy we wish to use (Akka has 2 of these, we will cover these in this article)
  2. Implement a Decider (again we will cover this in this article)

Let’s start with talking about these 2 points before we dive into any code

Supervision Strategy

As stated above it is completely possible to create our own supervisors.  There are only a couple of things that we need to look at before we dive into looking at some example code.

Firstly we need to understand what is meant by a supervision strategy. Quite simply, a supervision strategy dictates what a supervisor will do when one of its underlings (the actors that it creates) throws an Exception.

Akka comes with 2 inbuilt supervisor strategies:

OneForOneStrategy which means that ONLY the child actor that raised the exception qualifies for some special treatment. The special treatment is called a directive, but more on this later

AllForOneStrategy which means that ALL of the children under the direct supervisor will have the supervision strategy directive applied to them

Decider

Within both of these strategies listed above, Akka has the concept of a decider. A decider is a PartialFunction that has the following type signature

type Decider = PartialFunction[Throwable, Directive]

Where PartialFunction is a trait that looks like this under the covers

trait PartialFunction[-A, +B] extends (A => B)

The idea here being that given a Throwable (exception) you must return a B. In Akka’s case the B in question is an akka.actor.SupervisorStrategy.Directive.

A Directive will take one of 4 possible values

  • Resume
  • Restart
  • Stop
  • Escalate

So what we are really saying when we talk about a decider is that it is a way for a supervisor to know what action to carry out when it sees a given Throwable (exception). This action will be applied to one (or more) of the supervisor’s children depending on the type of supervision strategy used.

Akka actually comes with a default decider, which you can use directly should you want to, or you can create your own which you can augment with the default Akka one. In this post I will create my own and augment it with the default Akka one.

Let’s see an example of how a supervisor might use a decider and how it may create a supervision strategy.

import akka.actor.SupervisorStrategy.Directive
import akka.actor._

class AllForOneSupervisorActor extends Actor {
  

  val decider: PartialFunction[Throwable, Directive] = {
    case _: AkkaStopDirectiveException => 
	akka.actor.SupervisorStrategy.stop
    case _: AkkaRestartDirectiveException => 
	akka.actor.SupervisorStrategy.restart
  }

  override def supervisorStrategy: SupervisorStrategy =
    AllForOneStrategy()(decider.orElse(
	SupervisorStrategy.defaultStrategy.decider))
}
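It is also worth knowing that both OneForOneStrategy and AllForOneStrategy accept optional maxNrOfRetries and withinTimeRange arguments, which bound how many times a child may be restarted within a time window before it is stopped instead. A sketch of how that might look inside the supervisor above (the numbers are arbitrary):

import scala.concurrent.duration._

// in place of the supervisorStrategy shown above:
// allow at most 3 restarts per child within a 1 minute window
override def supervisorStrategy: SupervisorStrategy =
  AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute)(
    decider.orElse(SupervisorStrategy.defaultStrategy.decider))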

Now that we have seen how a supervisor creates a supervision strategy and how it makes use of a decider lets continue to look at an example of the 2 built in Akka supervision strategies

The Common Parts Of The Demo Code

Within this demo the following actor will be used as the child of the supervisors; we will just have multiple children, ALL of this type.

import akka.actor.Actor

class LifeCycleActor extends Actor {
  println("LifeCycleActor: constructor")

  override def preStart { println("LifeCycleActor: preStart") }

  override def postStop { println("LifeCycleActor: postStop") }

  override def preRestart(reason: Throwable, message: Option[Any]) {
    println("LifeCycleActor: preRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    println(s"LifeCycleActor message: ${message.getOrElse("")}")
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable) {
    println("LifeCycleActor: postRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    super.postRestart(reason)
  }

  def receive = {

    case "SoundOff" =>
      println("LifeCycleActor: SoundOff seen")
      println(s"LifeCycleActor alive ${self.path.name}" )

    case RaiseStopThrowableMessage =>
      println("LifeCycleActor: RaiseStopThrowableMessage seen")
      throw new AkkaStopDirectiveException(
        "LifeCycleActor raised AkkaStopDirectiveException")

    case RaiseRestartThrowableMessage =>
      println("LifeCycleActor: RaiseRestartThrowableMessage seen")
      throw new AkkaRestartDirectiveException(
        "LifeCycleActor raised AkkaRestartDirectiveException")
  }
}

OneForOneStrategy Example

Here is the code for the “OneForOne” supervisor actor.

There are a couple of things to note here.

  • We create child actors using the built in context.actorOf factory. This creates children that are “supervised” by the actor whose context was used to create them
  • We use a custom decider which uses the default akka decider as well
  • We use the “OneForOneStrategy” supervision strategy
import akka.actor.SupervisorStrategy.Directive
import akka.actor._

class OneForOneSupervisorActor extends Actor {
  println("OneForOneSupervisorActor: constructor")
  val lifeCycleChildrenActors = new Array[ActorRef](3)

  def receive = {
    case "StartChildren" =>
      println(s"OneForOneSupervisorActor : got a message StartChildren")

      for(i <- 0 to 2) {
        val child = context.actorOf(Props[LifeCycleActor], name = s"lifecycleactor_$i")
        lifeCycleChildrenActors(i) = child
      }

    case "MakeRandomChildCommitSuicide" =>
      println(s"OneForOneSupervisorActor : got a message MakeRandomChildCommitSuicide")
      lifeCycleChildrenActors(2) ! RaiseStopThrowableMessage

    case "MakeRandomChildRestart" =>
      println(s"OneForOneSupervisorActor : got a message MakeRandomChildRestart")
      lifeCycleChildrenActors(2) ! RaiseRestartThrowableMessage

    case "TellChildrenToSoundOff" =>
      println(s"OneForOneSupervisorActor : got a message TellChildrenToSoundOff")

      lifeCycleChildrenActors.foreach(x => x ! "SoundOff")
  }

  val decider: PartialFunction[Throwable, Directive] = {
    case _: AkkaStopDirectiveException => akka.actor.SupervisorStrategy.stop
    case _: AkkaRestartDirectiveException => akka.actor.SupervisorStrategy.restart
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy()(decider.orElse(SupervisorStrategy.defaultStrategy.decider))
}

When we run this one, we will fail a particular child, and ONLY that child will be affected by the supervision strategy thanks to the use of the “OneForOneStrategy”.

Let’s assume we have this demo code

val oneForOneSupervisorActor = system.actorOf(Props[OneForOneSupervisorActor], name = "OneForOneSupervisorActor")
println("sending oneForOneSupervisorActor a 'StartChildren' message")

oneForOneSupervisorActor ! "StartChildren"
Thread.sleep(1000)

oneForOneSupervisorActor ! "MakeRandomChildRestart"
Thread.sleep(1000)

oneForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

oneForOneSupervisorActor ! "MakeRandomChildCommitSuicide"
Thread.sleep(1000)

oneForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

system.stop(oneForOneSupervisorActor)

Here is the output of running the full demo code

sending oneForOneSupervisorActor a ‘StartChildren’ message
OneForOneSupervisorActor: constructor
OneForOneSupervisorActor : got a message StartChildren
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: preStart
LifeCycleActor: preStart
LifeCycleActor: preStart
OneForOneSupervisorActor : got a message MakeRandomChildRestart
LifeCycleActor: RaiseRestartThrowableMessage seen
LifeCycleActor: preRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor message: RaiseRestartThrowableMessage
LifeCycleActor: postStop
LifeCycleActor: constructor
LifeCycleActor: postRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
[ERROR] [07/29/2016 07:06:28.569] [SupervisionSystem-akka.actor.default-dispatcher-2]
[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2]
LifeCycleActor raised AkkaRestartDirectiveException
AkkaRestartDirectiveException: LifeCycleActor raised AkkaRestartDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:36)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

OneForOneSupervisorActor : got a message TellChildrenToSoundOff
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_0
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_2
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_1
OneForOneSupervisorActor : got a message MakeRandomChildCommitSuicide
LifeCycleActor: RaiseStopThrowableMessage seen
[ERROR] [07/29/2016 07:06:30.548] [SupervisionSystem-akka.actor.default-dispatcher-2]
[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2]
LifeCycleActor raised AkkaStopDirectiveException
AkkaStopDirectiveException: LifeCycleActor raised AkkaStopDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:31)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: postStop
OneForOneSupervisorActor : got a message TellChildrenToSoundOff
LifeCycleActor: SoundOff seen
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_1
LifeCycleActor alive lifecycleactor_0
[INFO] [07/29/2016 07:06:31.553] [SupervisionSystem-akka.actor.default-dispatcher-4]
[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/OneForOneSupervisorActor#-1082037253] to
Actor[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2#2123326382] was not delivered.
[1] dead letters encountered. This logging can be turned off or adjusted with configuration
settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
LifeCycleActor: postStop
LifeCycleActor: postStop

 

There are a couple of things to note here

  • ONLY one of the child actors is restarted
  • ONLY one of the child actors is stopped, which is why we only see 2 actors out of 3 respond to the final “SoundOff” message sent to all the supervisor’s children

 

AllForOneStrategy Example

Here is the code for the “AllForOne” supervisor actor.

There are a couple of things to note here.

  • We create child actors using the built in context.actorOf factory. This creates children that are “supervised” by the actor whose context was used to create them
  • We use a custom decider which uses the default akka decider as well
  • We use the “AllForOneStrategy” supervision strategy
import akka.actor.SupervisorStrategy.Directive
import akka.actor._

class AllForOneSupervisorActor extends Actor {
  println("AllForOneSupervisorActor: constructor")
  val lifeCycleChildrenActors = new Array[ActorRef](3)

  def receive = {
    case "StartChildren" =>
      println(s"AllForOneSupervisorActor : got a message StartChildren")

      for(i <- 0 to 2) {
        val child = context.actorOf(Props[LifeCycleActor], name = s"lifecycleactor_$i")
        lifeCycleChildrenActors(i) = child
      }

    case "MakeRandomChildCommitSuicide" =>
      println(s"AllForOneSupervisorActor : got a message MakeRandomChildCommitSuicide")
      lifeCycleChildrenActors(2) ! RaiseStopThrowableMessage

    case "MakeRandomChildRestart" =>
      println(s"AllForOneSupervisorActor : got a message MakeRandomChildRestart")
      lifeCycleChildrenActors(2) ! RaiseRestartThrowableMessage

    case "TellChildrenToSoundOff" =>
      println(s"AllForOneSupervisorActor : got a message TellChildrenToSoundOff")

      lifeCycleChildrenActors.foreach(x => x ! "SoundOff")
  }

  val decider: PartialFunction[Throwable, Directive] = {
    case _: AkkaStopDirectiveException => 
	akka.actor.SupervisorStrategy.stop
    case _: AkkaRestartDirectiveException => 
	akka.actor.SupervisorStrategy.restart
  }

  override def supervisorStrategy: SupervisorStrategy =
    AllForOneStrategy()(decider.orElse(
	SupervisorStrategy.defaultStrategy.decider))
}

When we run this one, we will fail a particular child, and ALL children will be affected by the supervision strategy thanks to the use of the “AllForOneStrategy”.

Let’s assume we have this demo code

val allForOneSupervisorActor = system.actorOf(Props[AllForOneSupervisorActor], name = "AllForOneSupervisorActor")
println("sending allForOneSupervisorActor a 'StartChildren' message")

allForOneSupervisorActor ! "StartChildren"
Thread.sleep(1000)

allForOneSupervisorActor ! "MakeRandomChildRestart"
Thread.sleep(1000)

allForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

allForOneSupervisorActor ! "MakeRandomChildCommitSuicide"
Thread.sleep(1000)

allForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

system.stop(allForOneSupervisorActor)

Here is the output of running the full demo code

sending allForOneSupervisorActor a ‘StartChildren’ message
AllForOneSupervisorActor: constructor
AllForOneSupervisorActor : got a message StartChildren
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: preStart
LifeCycleActor: preStart
LifeCycleActor: preStart
AllForOneSupervisorActor : got a message MakeRandomChildRestart
LifeCycleActor: RaiseRestartThrowableMessage seen
[ERROR] [07/29/2016 07:10:12.887] [SupervisionSystem-akka.actor.default-dispatcher-3]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2] LifeCycleActor raised AkkaRestartDirectiveException
AkkaRestartDirectiveException: LifeCycleActor raised AkkaRestartDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:36)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: preRestart
LifeCycleActor: preRestart
LifeCycleActor: preRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor message:
LifeCycleActor message: RaiseRestartThrowableMessage
LifeCycleActor message:
LifeCycleActor: postStop
LifeCycleActor: postStop
LifeCycleActor: postStop
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: postRestart
LifeCycleActor: postRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
LifeCycleActor: postRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
AllForOneSupervisorActor : got a message TellChildrenToSoundOff
LifeCycleActor: SoundOff seen
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_2
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_0
LifeCycleActor alive lifecycleactor_1
AllForOneSupervisorActor : got a message MakeRandomChildCommitSuicide
LifeCycleActor: RaiseStopThrowableMessage seen
[ERROR] [07/29/2016 07:10:14.866] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2] LifeCycleActor raised AkkaStopDirectiveException
AkkaStopDirectiveException: LifeCycleActor raised AkkaStopDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:31)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: postStop
LifeCycleActor: postStop
LifeCycleActor: postStop
AllForOneSupervisorActor : got a message TellChildrenToSoundOff
[INFO] [07/29/2016 07:10:15.873] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_0] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor#1766273084]
to Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_0#1933756970]
was not delivered. [1] dead letters encountered. This logging can be turned off or
adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [07/29/2016 07:10:15.874] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_1] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor#1766273084]
to Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_1#922057055]
was not delivered. [2] dead letters encountered. This logging can be turned off or
adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [07/29/2016 07:10:15.877] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor#1766273084]
to Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2#-2073614826]
was not delivered. [3] dead letters encountered. This logging can be turned off or
adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.

There are a couple of things to note here

  • ALL the child actors are restarted
  • ALL the child actors are stopped, which is why we see 0 actors out of 3 respond to the final “SoundOff” message sent to all the supervisor’s children

 

 

 

Actor LifeCycle

There is no better way of understanding the actor lifecycle that Akka has than to examine this image taken from the Akka documentation.

The Akka documentation is actually very very good, it’s just that there is a lot of it. I am hoping this series of posts will be a bit lighter to digest, and shall concentrate on the most common parts of Akka usage. The official docs are obviously the place to go should you have a deep Akka related question.

Anyway the Actor lifecycle is as follows:

 

image

This diagram is ace if you ask me; from here you can see exactly what happens when, the different states, and what is available within each state.

There is not much more I can add to that diagram, but we can take some of the concepts in it for a little spin.

Let's assume we have the following Actor code

import akka.actor.Actor

// the message we will use to force the actor to throw, and so be restarted
case object RestartMessage

class LifeCycleActor extends Actor {
  println("LifeCycleActor: constructor")

  override def preStart { println("LifeCycleActor: preStart") }

  override def postStop { println("LifeCycleActor: postStop") }

  override def preRestart(reason: Throwable, message: Option[Any]) {
    println("LifeCycleActor: preRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    println(s"LifeCycleActor message: ${message.getOrElse("")}")
    super.preRestart(reason, message)
  }
  override def postRestart(reason: Throwable) {
    println("LifeCycleActor: postRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    super.postRestart(reason)
  }
  def receive = {
    case RestartMessage => throw new Exception("RestartMessage seen")
    case _ => println("LifeCycleActor : got a message")
  }
}

It can be seen that we override several of the lifecycle hooks of the underlying Akka Actor. Let's discuss each of them in turn

preStart(): Unit

This is called by the Akka system for us, and simply allows us to carry out any preStart activity that we may wish to perform

postStop(): Unit

This is called by the Akka system for us, and simply allows us to carry out any postStop (cleanup) activity that we may wish to perform

preRestart(reason: Throwable, message: Option[Any]): Unit

This is called by the Akka system for us, and allows us to examine both the reason for the restart and the message that was being processed when the failure occurred

postRestart(reason: Throwable): Unit

This is called by the Akka system for us, and allows us to examine the reason for the restart
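
A common use for these hooks is to acquire and release resources. Here is a minimal sketch of that idea; ExpensiveResource is a made-up placeholder, not something from the demo code.

import akka.actor.Actor

// made-up placeholder for something costly to create (a connection, a file handle, etc.)
class ExpensiveResource {
  def open(): Unit  = println("resource opened")
  def close(): Unit = println("resource closed")
}

class ResourceOwningActor extends Actor {

  private val resource = new ExpensiveResource

  // acquire the resource when the actor starts
  // (preStart also runs after a restart, via the default postRestart implementation)
  override def preStart(): Unit = resource.open()

  // release the resource when the actor stops
  // (postStop also runs during a restart, via the default preRestart implementation)
  override def postStop(): Unit = resource.close()

  def receive = {
    case msg => println(s"ResourceOwningActor : got a message $msg")
  }
}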

 

Now let us assume that we also have the following code to exercise the LifeCycleActor code above

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //create the actor system
  val system = ActorSystem("LifeCycleSystem")

  // default Actor constructor
  val lifeCycleActor = system.actorOf(Props[LifeCycleActor], name = "lifecycleactor")


  println("sending lifeCycleActor a number")
  lifeCycleActor ! 100
  Thread.sleep(1000)

  println("force restart")
  lifeCycleActor ! RestartMessage
  Thread.sleep(1000)

  println("stop lifeCycleActor")
  system.stop(lifeCycleActor)

  //shutdown the actor system
  system.terminate()

  StdIn.readLine()
}

Here is what you might expect to see printed to the console

sending lifeCycleActor a number
LifeCycleActor: constructor
LifeCycleActor: preStart
LifeCycleActor : got a message
force restart
LifeCycleActor: preRestart
LifeCycleActor reason: RestartMessage seen
LifeCycleActor message: RestartMessage
LifeCycleActor: postStop
[ERROR] [07/27/2016 07:21:08.232] [LifeCycleSystem-akka.actor.default-dispatcher-4] [akka://LifeCycleSystem/user/lifecycleactor] RestartMessage seen
java.lang.Exception: RestartMessage seen
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:22)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: constructor
LifeCycleActor: postRestart
LifeCycleActor reason: RestartMessage seen
LifeCycleActor: preStart
stop lifeCycleActor
LifeCycleActor: postStop

 

As you can see, you have all the pertinent information you would need to make logic decisions within the Actor. One thing worth calling out from the output: a restart calls postStop on the old actor instance (via the default preRestart implementation), and then constructs a new instance whose preStart is invoked via the default postRestart implementation, which is why those lines appear above even though we never stopped the actor ourselves.
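
As an example of the kind of decision you could make, the message parameter handed to preRestart could be used to re-send the failing message to the freshly restarted instance. This is just a sketch of a possible pattern, not something the demo above does (and in this particular demo it would loop forever, since RestartMessage would simply throw again).

import akka.actor.Actor

class RetryingActor extends Actor {

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    // re-queue the message that was being processed when the failure occurred,
    // so the new actor instance gets another go at it
    message.foreach(m => self ! m)
    super.preRestart(reason, message)
  }

  def receive = {
    case msg => println(s"RetryingActor : got a message $msg")
  }
}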

 

Where Is The Code?

As previously stated, all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples