akka : persistent actors

In this post we will look at some of the core concepts around persistent actors. That may seem strange, since so far I have been saying that Akka is great because it is lock free and each actor is stateless.

The truth is that there are times where some sort of mutable state just can’t be avoided, and for that Akka provides us with the PersistentActor.

 

Event Sourcing

Akka persistence does borrow a couple of ideas from other concepts out there in the wild, such as

  • Event sourcing
  • Possibly CQRS
  • Snapshots

If you have not heard of event sourcing before, it is fairly simple to explain. The idea is that you have a data structure that is able to receive events to set its internal state. At the same time you also have an event store that stores events in chronological order (it is an append-only log).

These events are then played onto the object that accepts them, whereby the object will build its own internal state from the events played on to it.

The eagle eyed amongst you may be thinking won’t that lead to loads of events? Well yes it might.

There is however an optimization around this called “snapshots”. Snapshots are a special type of event that capture the object state as it is right now.

So in reality you would apply the latest snapshot, and then take any events that occurred after it and apply them. This drastically reduces the amount of event playback needed to rebuild an object from the event store.
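To make the replay idea a bit more concrete, here is a minimal sketch in plain Scala with no Akka involved. The names Deposited, Account, Snapshot and recover are all made up for this illustration, and the event store is just an in-memory list, so treat it as a picture of the idea rather than how Akka does it internally.

```scala
// Illustration only: plain Scala, no Akka. All names here are hypothetical.
object ReplaySketch {
  // An event records something that has already happened
  final case class Deposited(seqNr: Long, amount: Int)

  // The state that gets rebuilt by applying events one at a time
  final case class Account(balance: Int = 0) {
    def applyEvent(e: Deposited): Account = copy(balance = balance + e.amount)
  }

  // A snapshot is the state as it was right after event number seqNr
  final case class Snapshot(seqNr: Long, state: Account)

  // Start from the snapshot (if there is one), then replay only the newer events, oldest first
  def recover(snapshot: Option[Snapshot], journal: List[Deposited]): Account = {
    val start  = snapshot.map(_.state).getOrElse(Account())
    val fromNr = snapshot.map(_.seqNr).getOrElse(0L)
    journal.filter(_.seqNr > fromNr).sortBy(_.seqNr).foldLeft(start)(_ applyEvent _)
  }

  val journal = List(Deposited(1, 10), Deposited(2, 5), Deposited(3, 20))

  // Replays all 3 events
  val fullReplay = recover(None, journal)

  // Replays only event 3, because the snapshot already covers events 1 and 2
  val fromSnapshot = recover(Some(Snapshot(2, Account(15))), journal)
}
```

Both routes end up with the same state. The snapshot route just has less work to do.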

I have written a fairly well received article about this before for the .NET space which also included working code and also includes the CQRS (Command Query Responsibility Segregation) part of it too. If you would like to know more about that you can read about it here:

http://www.codeproject.com/Articles/991648/CQRS-A-Cross-Examination-Of-How-It-Works

Anyway I am kind of drifting off topic a bit here. The point is that Akka persistence borrows some of these ideas from other concepts/frameworks, so it may be of some use to have a read around some of it, in particular event sourcing and CQRS.

Dependencies

In order to work with Akka persistence you will need the following 2 SBT entries in your build.sbt file

"com.typesafe.akka" %% "akka-actor" % "2.4.8",
"com.typesafe.akka" %% "akka-persistence" % "2.4.8"

Storage

Akka persistence comes with a couple of pre-canned persistence storage mechanisms, and there are many more community based storage frameworks that you can use.

Within this article I have chosen to use the Akka provided storage mechanism which is LevelDB.

In order to do this, you will need the following SBT entries in your build.sbt file

"org.iq80.leveldb"            % "leveldb"          % "0.7",
"org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8"

You can read more about storage in general at the official Akka docs page

http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Storage_plugins

Config

As with most things in Akka you are able to configure things either in code (by way of overriding things) or by configuration. I personally prefer configuration. So in order to get the persistence to work, one must provide the following configuration

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false

For me this is in a resources/application.conf file

PersistentActor Trait

Ok so getting back on track. How do we create a persistent actor in Akka?

Well out of the box Akka provides the PersistentActor trait, which you can mix in. This trait looks like this

trait PersistentActor extends Eventsourced with PersistenceIdentity {
  def receive = receiveCommand
}

There are several things that are of interest here

  • There is an expectation there will be a receiveCommand implementation (this is within the Eventsourced trait)
  • That the base trait is called Eventsourced is also interesting

I personally find the fact that there is an expectation of a command, and that we end up mixing in a trait named “Eventsourced”, to be quite a strong indicator of just how similar working with Akka persistence is to CQRS + traditional event sourcing ideas.

If we were to go further down the rabbit hole and keep looking into the base trait Eventsourced we would see a couple more abstract methods of interest that are expected to be supplied by the end user’s code:


  /**
   * Recovery handler that receives persisted events during recovery. If a state snapshot
   * has been captured and saved, this handler will receive a [[SnapshotOffer]] message
   * followed by events that are younger than the offered snapshot.
   *
   * This handler must not have side-effects other than changing persistent actor state i.e. it
   * should not perform actions that may fail, such as interacting with external services,
   * for example.
   *
   * If there is a problem with recovering the state of the actor from the journal, the error
   * will be logged and the actor will be stopped.
   *
   * @see [[Recovery]]
   */
  def receiveRecover: Receive

  /**
   * Command handler. Typically validates commands against current state (and/or by
   * communication with other actors). On successful validation, one or more events are
   * derived from a command and these events are then persisted by calling `persist`.
   */
  def receiveCommand: Receive

As stated these methods are abstract methods that YOUR code would need to supply to make the persistent actor work properly. We will see more of this in a bit.

Persistent Id

One of the rules you must follow when using persistent actors is that a persistent actor MUST keep the same ID across incarnations (restarts of the actor and of the application), since that ID is what ties the actor to its stored events. This can be set using the persistenceId method as shown below

override def persistenceId = "demo-persistent-actor-1"
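A hard coded ID is fine for a demo with a single actor. Where you have many persistent actors of the same type, one common approach is to derive the ID from something stable that is passed in when the actor is created. The sketch below assumes a made up CounterActor and entityId, and just persists Int deltas to keep things short.

```scala
import akka.persistence.PersistentActor

// Hypothetical example: the entity id is passed in when the actor is created,
// so a new incarnation for the same entity ends up with the same persistenceId.
class CounterActor(entityId: String) extends PersistentActor {

  // Stable across restarts, and different for each entity
  override def persistenceId: String = s"counter-$entityId"

  private var count = 0

  // Replayed events rebuild the count
  override def receiveRecover: Receive = {
    case delta: Int => count += delta
  }

  // Each incoming Int is stored as an event before the count is changed
  override def receiveCommand: Receive = {
    case delta: Int => persist(delta) { d => count += d }
  }
}
```

The thing to avoid is anything that changes between runs (random values, timestamps and so on), as the actor would then never find its previous events.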

Snapshots

As I stated earlier snapshots can reduce the number of events that need to be replayed against the event source target (the persistent actor).

In order to save a snapshot the actor may call the saveSnapshot method.

  • If the snapshot succeeds the actor will receive a SaveSnapshotSuccess message
  • If the snapshot fails the actor will receive a SaveSnapshotFailure message

var state: Any = _
 
override def receiveCommand: Receive = {
  case "snap"                                => saveSnapshot(state)
  case SaveSnapshotSuccess(metadata)         => // ...
  case SaveSnapshotFailure(metadata, reason) => // ...
}

Where the metadata looks like this

final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)

During the recovery process the persistent actor is offered a SnapshotOffer message, from which it may restore its internal state.

After the SnapshotOffer come the newer (younger in Akka speak) events, which when replayed onto the persistent actor will get it to its final internal state.

 

Failure/Recovery

Special care must be taken when shutting down persistent actors from outside. For a non-persistent actor a PoisonPill may be used. This is not recommended for persistent actors, because incoming commands are stashed while the actor waits for the journal to confirm that events have been stored. Those commands are moved out of the mailbox and into the actor’s internal stash, so the PoisonPill may be handled before the stashed commands, which shuts the actor down prematurely. A better way is to use explicit shutdown messages.

Read more about this here :

http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Safely_shutting_down_persistent_actors
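As a rough sketch of what an explicit shutdown message might look like, closely following the approach in the Akka docs linked above: the Shutdown object and the SafePersistentActor class are names picked for this example, not something Akka provides.

```scala
import akka.persistence.PersistentActor

// Hypothetical message that asks the actor to stop itself
case object Shutdown

class SafePersistentActor extends PersistentActor {
  override def persistenceId = "safe-actor"

  override def receiveCommand: Receive = {
    case c: String =>
      println(c)
      persist(s"handle-$c") { println(_) }
    case Shutdown =>
      // Shutdown is an ordinary message, so it is stashed and processed in order
      // like any other command, after the earlier persist handlers have run
      context.stop(self)
  }

  override def receiveRecover: Receive = {
    case _ => // handle recovery here
  }
}
```

You would then send persistentActor ! Shutdown rather than persistentActor ! PoisonPill.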

 

Persisting state

So we have talked about event/commands/CQRS/event sourcing/snapshots, but so far we have not talked about how to actually save state. How is this done?

Well as luck would have it, it’s very easy. We simply call the persist method, which looks like this

def persist[A](event: A)(handler: A ⇒ Unit): Unit 

Demo Time

Ok so we have now gone through most of the bits you would need to work with persistent actors. Time for a demo.

Let’s assume we have the following commands that we will send to a single persistent actor

case class Cmd(data: String)

And this type of event that we would like to store

case class Evt(data: String)

Where we would hold this type of state within the persistent actor

case class ExampleState(events: List[String] = Nil) {
  def updated(evt: Evt): ExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}
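If it is not obvious what ExampleState does with the events, this small standalone sketch may help. It repeats the Evt and ExampleState definitions from above inside a wrapper object (ExampleStateDemo is just a name for this example) so that it can be run on its own.

```scala
object ExampleStateDemo {
  case class Evt(data: String)

  case class ExampleState(events: List[String] = Nil) {
    // New events are prepended, so the list is held newest first
    def updated(evt: Evt): ExampleState = copy(evt.data :: events)
    def size: Int = events.length
    // Reversed for printing, so the output reads oldest first
    override def toString: String = events.reverse.toString
  }

  val empty = ExampleState()

  // Each call to updated returns a new ExampleState, the old one is untouched
  val two = empty.updated(Evt("foo-0")).updated(Evt("foo-1"))
}
```

Here two.toString gives List(foo-0, foo-1), while empty still has a size of 0. The state class itself is immutable, it is only the var inside the actor that gets reassigned.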

And that the actual persistent actor looks like this

import akka.actor._
import akka.persistence._



class DemoPersistentActor extends PersistentActor {

  //note : This is  mutable
  var state = ExampleState()

  def updateState(event: Evt): Unit =
    state = state.updated(event)

  def numEvents =
    state.size

  val receiveRecover: Receive = {
    case evt: Evt => updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) => {
        println(s"offered state = $snapshot")
        state = snapshot
    }
  }

  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(Evt(s"${data}-${numEvents}"))(updateState)
      persist(Evt(s"${data}-${numEvents + 1}")) { event =>
        updateState(event)
        context.system.eventStream.publish(event)
      }
    case "snap"  => saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>
      println(s"SaveSnapshotSuccess(metadata) :  metadata=$metadata")
    case SaveSnapshotFailure(metadata, reason) =>
      println(s"""SaveSnapshotFailure(metadata, reason) :
        metadata=$metadata, reason=$reason""")
    case "print" => println(state)
    case "boom"  => throw new Exception("boom")
  }

  override def persistenceId = "demo-persistent-actor-1"
}

Which we could first run using this demo code:

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //create the actor system
  val system = ActorSystem("PersitenceSystem")

  val persistentActor =
    system.actorOf(Props(classOf[DemoPersistentActor]),
      "demo-persistent-actor-1")

  persistentActor ! "print"
  persistentActor ! Cmd("foo")
  persistentActor ! Cmd("baz")
  persistentActor ! "boom"
  persistentActor ! Cmd("bar")
  persistentActor ! "snap"
  persistentActor ! Cmd("buzz")
  persistentActor ! "print"


  StdIn.readLine()

  //shutdown the actor system
  system.terminate()

  StdIn.readLine()
}

Which gives the following output (your output may look a little different to mine, as I have run this code a number of times and so have state from previous runs on disk)

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8,
foo-9, baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23)
[ERROR] [08/16/2016 18:43:21.475] [PersitenceSystem-akka.actor.default-dispatcher-5]
[akka://PersitenceSystem/user/demo-persistent-actor-1] boom
java.lang.Exception: boom
    at DemoPersistentActor$$anonfun$2.applyOrElse(DemoPersistentActor.scala:39)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at DemoPersistentActor.akka$persistence$Eventsourced$$super$aroundReceive(DemoPersistentActor.scala:6)
    at akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:657)
    at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:182)
    at DemoPersistentActor.aroundReceive(DemoPersistentActor.scala:6)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9,
baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
[WARN] [08/16/2016 18:43:21.494] [PersitenceSystem-akka.persistence.dispatchers.default-stream-dispatcher-8]
[akka.serialization.Serialization(akka://PersitenceSystem)] Using the default Java serializer for class
[ExampleState] which is not recommended because of performance implications. Use another serializer or
disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23, foo-24, foo-25,
baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)
SaveSnapshotSuccess(metadata) :  metadata=SnapshotMetadata(demo-persistent-actor-1,33,1471369401491)

And then if we were to run this demo code:

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //create the actor system
  val system = ActorSystem("PersitenceSystem")

  val persistentActor =
    system.actorOf(Props(classOf[DemoPersistentActor]),
      "demo-persistent-actor-1")

  persistentActor ! "print"


  StdIn.readLine()

  //shutdown the actor system
  system.terminate()

  StdIn.readLine()
}

 

Let’s see what we can see:

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7,
foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23,
foo-24, foo-25, baz-26, baz-27, bar-28, bar-29)

List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11,
bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22,
buzz-23, foo-24, foo-25, baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)

It can be seen that although we did not save any new events the state of the demo persistent actor was restored correctly using a combination of a snapshot and events that are newer than the snapshot

 

 

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples

akka dead letters and how to monitor for them

So we are a little way into this mini series on Akka now.

We have covered a few of the fundamental topics of Akka such as

  • Supervision
  • Mailboxes
  • Logging

When we talked about mailboxes we discussed the fact that there were queues involved that are used as mailboxes.

Generally speaking where there are queues concerned there is also the possibility of dead letters.

In this post we will talk about what ‘dead letters’ are within Akka, and also look at how you can monitor for dead letters

What Are Dead Letters?

In Akka, messages that can’t be delivered are routed to a synthetic actor which has the path “/deadLetters”. This only covers messages that Akka itself knows it could not deliver, not messages that were lost at the transport layer.

Akka makes no guarantees for lost messages at the transport layer.

How Can You Monitor For Them?

Within Akka there is a concept of an event bus. That is a bus that Akka uses internally to send messages. You can also use this event bus for other purposes, such as publishing/subscribing to certain types of messages.

It is kind of like a topic based subscription system.

We can use this event bus to monitor for dead letters.

It is also worth noting that the event bus can be used as a general pub/sub mechanism, you can read more about it here : Akka Event Bus
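As a quick illustration of that general pub/sub usage, here is a small sketch using the actor system’s eventStream. The subscribe and publish calls are the real Akka ones, but PriceChanged, PriceListener and EventStreamSketch are names invented for this example.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}

// Hypothetical event type, for illustration only
final case class PriceChanged(symbol: String, price: Double)

class PriceListener extends Actor with ActorLogging {
  def receive = {
    case PriceChanged(symbol, price) => log.info(s"$symbol is now $price")
  }
}

object EventStreamSketch extends App {
  val system   = ActorSystem("EventStreamSketch")
  val listener = system.actorOf(Props[PriceListener], "pricelistener")

  // The class acts as the "topic": the listener gets every published PriceChanged
  system.eventStream.subscribe(listener, classOf[PriceChanged])

  // Anything that can get at the ActorSystem can publish
  system.eventStream.publish(PriceChanged("ABC", 10.5))

  // Remember to call system.terminate() once you are done
}
```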

But for now let’s get back to the problem in hand which is how do we monitor for dead letters?

So let’s start out with this actor code

import akka.actor.Actor
import akka.event.Logging

class LifeCycleActor extends Actor {
  val log = Logging(context.system, this)

  log.info("LifeCycleActor: constructor")

  override def preStart(): Unit = { log.info("LifeCycleActor: preStart") }

  override def postStop(): Unit = { log.info("LifeCycleActor: postStop") }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info("LifeCycleActor: preRestart")
    log.info(s"LifeCycleActor reason: ${reason.getMessage}")
    log.info(s"LifeCycleActor message: ${message.getOrElse("")}")
    super.preRestart(reason, message)
  }
  override def postRestart(reason: Throwable): Unit = {
    log.info("LifeCycleActor: postRestart")
    log.info(s"LifeCycleActor reason: ${reason.getMessage}")
    super.postRestart(reason)
  }
  def receive = {
    case RestartMessage => throw new Exception("RestartMessage seen")
    case _ => log.info("LifeCycleActor : got a message")
  }
}

We will then use this actor, which is set up to receive dead letters

import akka.actor.{DeadLetter, Actor}

class DeadLetterMonitorActor 
  extends Actor 
  with akka.actor.ActorLogging {
  log.info("DeadLetterMonitorActor: constructor")

  def receive = {
    case d: DeadLetter => {
      log.error(s"DeadLetterMonitorActor : saw dead letter $d")
    }
    case _ => log.info("DeadLetterMonitorActor : got a message")
  }
}

We then use this demo code

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
import akka.event.Logging

class Runner {

  def Run(): Unit = {

    //create the actor system
    val system = ActorSystem("LifeCycleSystem")

    // default Actor constructor
    val lifeCycleActor =
      system.actorOf(Props[LifeCycleActor],
        name = "lifecycleactor")

    val deadLetterMonitorActor =
      system.actorOf(Props[DeadLetterMonitorActor],
        name = "deadlettermonitoractor")

    //subscribe to system wide event bus 'DeadLetter'
    system.eventStream.subscribe(
      deadLetterMonitorActor, classOf[DeadLetter])

    val log = Logging(system, classOf[Runner])
    log.debug("Runner IS UP BABY")

    log.debug("sending lifeCycleActor a few numbers")
    lifeCycleActor ! 100
    lifeCycleActor ! 200
    Thread.sleep(1000)

    log.debug("sending lifeCycleActor a poison pill (kill it)")
    lifeCycleActor ! PoisonPill
    Thread.sleep(1000)
    log.debug("sending lifeCycleActor a few numbers")
    lifeCycleActor ! 100
    lifeCycleActor ! 200


    log.debug("stop lifeCycleActor/deadLetterMonitorActor")
    system.stop(lifeCycleActor)
    system.stop(deadLetterMonitorActor)

    //shutdown the actor system
    log.debug("stop actor system")
    system.terminate()

    StdIn.readLine()
  }
}

This does the following

  • Creates the 2 actors above
  • Subscribes the DeadLetterMonitorActor (above) to the “DeadLetter” topic of the inbuilt Akka EventStream (internal pub/sub bus)
  • Sends a few messages to the LifeCycleActor
  • Sends the LifeCycleActor a PoisonPill (which terminates it)
  • Sends a few more messages to the LifeCycleActor (which it doesn’t get because it is dead)
  • We expect the DeadLetterMonitorActor to receive the DeadLetter messages and log them

If we run these bits of code this is the result, where it can plainly be seen that the DeadLetterMonitorActor does receive the DeadLetter messages

INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] Slf4jLogger – Slf4jLogger started
16:55:49.546UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] EventStream – logger log1-Slf4jLogger started
16:55:49.547UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] EventStream – Default Loggers started
16:55:49.559UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor: constructor
16:55:49.560UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor: preStart
16:55:49.560UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – Runner IS UP BABY
16:55:49.561UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – sending lifeCycleActor a few numbers
16:55:49.561UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] DeadLetterMonitorActor – DeadLetterMonitorActor: constructor
16:55:49.563UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor : got a message
16:55:49.563UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor : got a message
16:55:50.561UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – sending lifeCycleActor a poison pill (kill it)
16:55:50.576UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-9] LifeCycleActor – LifeCycleActor: postStop
16:55:51.565UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-9] Runner – sending lifeCycleActor a few numbers
16:55:51.570UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – stop lifeCycleActor/deadLetterMonitorActor
16:55:51.573UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-7] Runner – stop actor system
16:55:51.581UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-9] RepointableActorRef – Message [java.lang.Integer]
    from Actor[akka://LifeCycleSystem/deadLetters] to Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257]
    was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration
    settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
16:55:51.581UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-9] RepointableActorRef – Message [java.lang.Integer]
    from Actor[akka://LifeCycleSystem/deadLetters] to Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257]
    was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration
    settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
16:55:51.588UTC ERROR[LifeCycleSystem-akka.actor.default-dispatcher-8] DeadLetterMonitorActor – DeadLetterMonitorActor :
    saw dead letter DeadLetter(100,Actor[akka://LifeCycleSystem/deadLetters],Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257])
16:55:51.590UTC ERROR[LifeCycleSystem-akka.actor.default-dispatcher-8] DeadLetterMonitorActor – DeadLetterMonitorActor :
    saw dead letter DeadLetter(200,Actor[akka://LifeCycleSystem/deadLetters],Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257])
16:55:51.609UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-6] LocalActorRef – Message [akka.actor.StopChild]
    from Actor[akka://LifeCycleSystem/deadLetters] to Actor[akka://LifeCycleSystem/user] was not delivered. [3] dead letters encountered.
    This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
16:55:51.616UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-6] EventStream – shutting down: StandardOutLogger started

 

 

NOTE : According to the official Akka docs “Dead letters are not propagated over the network, if you want to collect them in one place you will have to subscribe one actor per network node and forward them manually.”

Are All Messages In DeadLetters A Problem?

No, not all of the messages that end up in the deadLetters synthetic actor are problems. For example suppose an actor is sent several Terminate messages. Only one of these will take effect, the others will likely end up in deadLetters, but this is not a concern.

The deadLetters actor and the monitoring of it is more of a debugging aid than anything else. You will have to use some modicum of common sense when examining the results of the deadLetters logging, which we discussed above
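If the built in dead letter logging gets too noisy, the two settings named in the log output above can be adjusted in application.conf. A small sketch is shown below, where the values are only examples and not recommendations.

```hocon
akka {
  # Log at most this many dead letters (can also be set to on or off)
  log-dead-letters = 10

  # Whether dead letters are logged while the actor system is shutting down
  log-dead-letters-during-shutdown = off
}
```

This only changes what Akka itself logs. An actor subscribed to DeadLetter on the event stream, like the DeadLetterMonitorActor above, still receives the messages.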


akka logging

This is likely to be a smaller one of the series, but just because it is small in size doesn’t mean that it is not mighty.

Every app needs logging, and when working with a distributed set of actors this is crucial.

Akka provides 2 types of logging adaptor out of the box

  • Console
  • SLF4J (where you need the appropriate back end for this which is usually Logback)

It also has various configuration sections that allow you to adjust the following elements of Akka

  • Akka setup messages
  • Receive of messages
  • All actor lifecycle messages
  • Finite state machine messages
  • EventStream subscription messages
  • Remoting messages

Before we look at how you can customize the logging to capture all this good stuff, let’s first see what steps you need to set up basic logging in Akka

Step1 : Grab the JARs

There are a couple of JARs you will need to perform logging in Akka. These are shown below

See build.sbt

name := "HelloWorld"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" % "akka-actor_2.11" % "2.4.8",
  "ch.qos.logback" % "logback-classic" % "1.1.7",
  "com.typesafe.akka" % "akka-slf4j_2.11" % "2.4.8")

Step2 : application.conf

You must then configure how Akka will log its entries. This is done in a configuration file, I have decided to call mine “application.conf”

See resources/application.conf

akka {
  # event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
}

Step3 : logback.xml

For the SLF4J logging to work we need to configure logback. This is typically done with a configuration file called “logback.xml”.

An example of this may look like this

See “resources/logback.xml”

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <target>System.out</target>
        <encoder>
            <pattern>%X{akkaTimestamp} %-5level[%thread] %logger{0} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <file>c:/logs/akka.log</file>
        <append>true</append>
        <encoder>
            <pattern>%date{yyyy-MM-dd} %X{akkaTimestamp} %-5level[%thread] %logger{1} - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="akka" level="DEBUG" />

    <root level="DEBUG">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="FILE"/>
    </root>

</configuration>

Step4 : Log Some Stuff

Once you have completed the above steps, you have 2 choices about how to consume the logging

Using Logging.apply

You can call Logging.apply (written as Logging(...) in Scala) to create a new LoggingAdapter that you may use. Here is an example

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
import akka.event.Logging

class Runner {

  def Run(): Unit = {

    //create the actor system
    val system = ActorSystem("LifeCycleSystem")

    val log = Logging(system, classOf[Runner])
    log.debug("Runner IS UP BABY")
    ...

    StdIn.readLine()
  }
}

Using Logging Trait

To make this process easier Akka also provides a trait that can be mixed in called “akka.actor.ActorLogging”. This can be mixed in wherever you require logging

import akka.actor.Actor
import akka.event.Logging

class LifeCycleActorWithLoggingTrait extends Actor with akka.actor.ActorLogging {

  log.info("LifeCycleActor: constructor")

  .....
}
 

Async Or NOT

Some of the more experienced JVM/Scala devs amongst you may think heck I can just use my own logging, I don’t need the Akka trait or LoggingAdapter.

The thing is, if you use the Akka trait or LoggingAdapter they are set up to log asynchronously and not introduce any time delays into the messaging pipeline when logging.

So just be warned that you should probably use the inbuilt Akka stuff rather than roll your own. Logging to things like an ELK stack may be the exception.

Common Akka Configuration Around Logging

These configuration sections are useful for controlling what is logged

General Log Level

akka {
  # general logging level
  loglevel = DEBUG
}

Log Akka Config At Start

akka {
  # Log the complete configuration at INFO level when the actor system is started.
  # This is useful when you are uncertain of what configuration is used.
  log-config-on-start = on
}

Actor Receive Messages

akka {
  actor {
    debug {
      # enable function of LoggingReceive, which is to log any received message at
      # DEBUG level
      receive = on
    }
  }
}
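The receive setting only has an effect on actors whose receive block is wrapped in LoggingReceive. A minimal sketch of that is shown below, where ChattyActor is a made up actor for this example and the log level is assumed to be DEBUG.

```scala
import akka.actor.Actor
import akka.event.LoggingReceive

// Hypothetical actor, for illustration only
class ChattyActor extends Actor {
  // With the receive debug setting switched on, every message passed to
  // this handler is logged at DEBUG level
  def receive = LoggingReceive {
    case "ping" => sender() ! "pong"
    case _      => // ignore anything else
  }
}
```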

You can also monitor lifecycle events like this

akka {
  actor {
    debug {
      # enable DEBUG logging of actor lifecycle changes
      lifecycle = on
    }
  }
}

Remoting Logging

Firstly you can log the messages that are sent to remote actors by the transport layer

akka {
  remote {
    # If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged
    log-sent-messages = on
  }
}

You may also see what messages are received by the transport layer like this

akka {
  remote {
    # If this is "on", Akka will log all inbound messages at DEBUG level, if off then they are not logged
    log-received-messages = on
  }
}


akka mailboxes

This post will be a bit smaller than the ones we have just done, but none the less still just as important.

Let’s start by talking about mailboxes and dispatchers, and what exactly they are and how they relate to each other.

What’s A Mailbox?

In Akka the mailbox is some type of queue that holds the messages for an actor. There is usually a mailbox per actor, though in some cases, where routing gets involved, there may only be one mailbox between a number of actors. For now let’s assume a one to one relationship between mailboxes and actors.

What’s A Dispatcher

In Akka a Dispatcher is the heart of the actor system, and it is the thing that dispatches the messages.

There is a way that Akka actors may be configured to use a certain Dispatcher and the Dispatcher may in turn be configured to use a certain mailbox type.

Here is an example of how you might configure an actor to use a custom Dispatcher

You may have this code for an actor

import akka.actor.Props
val myActor =
  context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")

Where you may have this custom Dispatcher in your configuration of the system

my-dispatcher {

  # Type of mailbox to use for the Dispatcher
  mailbox-requirement = org.example.MyInterface

  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher

  # What kind of ExecutionService to use
  executor = "thread-pool-executor"

  # Configuration for the thread pool
  thread-pool-executor {

    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 2

    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 2.0

    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 10
  }

  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}

It can be seen above that we are able to configure the mailbox type for a Dispatcher in the configuration using the line

# Type of mailbox to use for the Dispatcher
mailbox-requirement = org.example.MyInterface

There are actually several inbuilt Dispatcher types that you may use when creating a custom Dispatcher.

Talking about Dispatcher types and how they all work is kind of out of scope for what I wanted to talk about in this post though, so if you want to know more about Akka Dispatchers you should consult the official Akka documentation

http://doc.akka.io/docs/akka/snapshot/scala/dispatchers.html

Ok, so now that we have taken that slight detour and talked about how you can associate a mailbox type with a custom Dispatcher (should you want to), let’s get back to the main thrust of this post, which is to talk about mailboxes.

As previously stated, mailboxes represent a storage mechanism for an actor’s messages.

Built In Mailbox Types

Akka comes shipped with a number of mailbox implementations:

UnboundedMailbox – The default mailbox

  • Backed by a java.util.concurrent.ConcurrentLinkedQueue
  • Blocking: No
  • Bounded: No
  • Configuration name: “unbounded” or “akka.dispatch.UnboundedMailbox”

SingleConsumerOnlyUnboundedMailbox

  • Backed by a very efficient Multiple Producer Single Consumer queue, cannot be used with BalancingDispatcher
  • Blocking: No
  • Bounded: No
  • Configuration name: “akka.dispatch.SingleConsumerOnlyUnboundedMailbox”

BoundedMailbox

  • Backed by a java.util.concurrent.LinkedBlockingQueue
  • Blocking: Yes
  • Bounded: Yes
  • Configuration name: “bounded” or “akka.dispatch.BoundedMailbox”

UnboundedPriorityMailbox

  • Backed by a java.util.concurrent.PriorityBlockingQueue
  • Blocking: Yes
  • Bounded: No
  • Configuration name: “akka.dispatch.UnboundedPriorityMailbox”

BoundedPriorityMailbox

  • Backed by a java.util.PriorityQueue wrapped in an akka.util.BoundedBlockingQueue
  • Blocking: Yes
  • Bounded: Yes
  • Configuration name: “akka.dispatch.BoundedPriorityMailbox”
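To give a flavour of how one of the priority mailboxes might be put to use, here is a minimal sketch. It assumes two made-up message types (Urgent and Routine) and made-up names (SketchPriorities, SketchPriorityMailbox), none of which come from the demo code for this series. PriorityGenerator and UnboundedPriorityMailbox are the Akka classes; everything else is purely illustrative.

```scala
import akka.actor.{ActorSystem, PoisonPill}
import akka.dispatch.{PriorityGenerator, UnboundedPriorityMailbox}
import com.typesafe.config.Config

// Hypothetical message types, used only for this sketch
case object Urgent
case object Routine

object SketchPriorities {
  // Maps a message to a priority; lower numbers are dequeued first
  val generator: PriorityGenerator = PriorityGenerator {
    case Urgent     => 0
    case Routine    => 2
    case PoisonPill => 3
    case _          => 1
  }
}

// Akka creates mailbox types reflectively, so the (Settings, Config)
// constructor shape is required even though the values are unused here
class SketchPriorityMailbox(settings: ActorSystem.Settings, config: Config)
  extends UnboundedPriorityMailbox(SketchPriorities.generator)
```

You would then point the mailbox-type of a mailbox configuration section at the fully qualified name of SketchPriorityMailbox, in the same way as the built in names listed above.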

 

Default Mailbox

As shown above the unbounded mailbox is the default. You can however swap out the default using the following configuration, though you will need to ensure that the chosen default mailbox is the correct one for the type of Dispatcher used. For example, a SingleConsumerOnlyUnboundedMailbox cannot be used with a BalancingDispatcher.

Anyway, this is how you would change the default mailbox in config:

akka.actor.default-mailbox {
  mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}

 

Mailbox Type For An Actor

It is possible to associate a particular type of mailbox with a particular type of actor, which can be done by mixing in the RequiresMessageQueue trait:

import akka.dispatch.RequiresMessageQueue
import akka.dispatch.BoundedMessageQueueSemantics
 
class SomeActor extends Actor
  with RequiresMessageQueue[BoundedMessageQueueSemantics]

Where you would use the following configuration to configure the mailbox

bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 1000
  mailbox-push-timeout-time = 10s
}
 
akka.actor.mailbox.requirements {
  "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}

It is worth noting that this setting could be overridden in code or by a dispatcher mailbox configuration section.
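As a rough sketch of the "in code" route, Props has a withMailbox method that takes the name of a mailbox configuration section. The AuditActor and MailboxOverrideSketch names below are made up for illustration, and the example assumes that the bounded-mailbox section shown above exists in your configuration.

```scala
import akka.actor.{Actor, Props}

// Hypothetical actor, used only for this sketch
class AuditActor extends Actor {
  def receive: Receive = { case _ => () }
}

object MailboxOverrideSketch {
  // Selects the mailbox by the id of its configuration section
  val props: Props = Props[AuditActor].withMailbox("bounded-mailbox")
}
```

An actor created from these Props (for example with context.actorOf(MailboxOverrideSketch.props)) should then get the mailbox described by that configuration section.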

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples

AKKA : hierarchies / lifecycles

Ok so last time we covered the basics of actors and the actor system (actor fabric) and covered how to send simple messages to actors.

This time we will talk about actor hierarchies (supervision) and also actor lifecycles.

But before we get into supervision and how it is used, we should just take a trip back to the introduction article in this series of posts.

The introductory post had the following bullet point

  • Supervisor hierarchies with “let-it-crash” semantics.

What does that actually mean? Quite simply, Akka embraces failure and treats it as an expected part of what could happen.

Doesn’t this sound quite dangerous? Well, yes and no. The yes part is that an actor could be part way through processing a message when it fails, so logic dictates that you should ensure that your messages are idempotent and safe to send again.

The no part is the interesting case: you have an actor that dies part way through its job. Boo.

Luckily this is Akka’s bread and butter. It has a strong concept of ownership/supervision, where the supervisor knows about an underling’s failure and knows how to remedy it. We will look at this next.

Hierarchies (supervision)

So far we have seen how to create our own little Akka systems. But what is really going on when we do that.

It turns out that the Akka fabric creates some top level actors called “Guardians”, which all user created actors are “supervised” by.

This can be seen on this diagram (borrowed from the official Akka documentation)

image

Within Akka EVERY actor is “supervised” by another actor, up until the “Root Guardian”

There are 2 sub guardians under the root guardian.

User and System both of which play different roles

To stand on the shoulders of giants, here is what the Akka docs have to say about these 3 top level guardians

/: The Root Guardian
The root guardian is the grand-parent of all so-called “top-level” actors and supervises all the special actors mentioned in Top-Level Scopes for Actor Paths using the SupervisorStrategy.stoppingStrategy, whose purpose is to terminate the child upon any type of Exception. All other throwables will be escalated … but to whom? Since every real actor has a supervisor, the supervisor of the root guardian cannot be a real actor. And because this means that it is “outside of the bubble”, it is called the “bubble-walker”. This is a synthetic ActorRef which in effect stops its child upon the first sign of trouble and sets the actor system’s isTerminated status to true as soon as the root guardian is fully terminated (all children recursively stopped).

/system: The System Guardian
This special guardian has been introduced in order to achieve an orderly shut-down sequence where logging remains active while all normal actors terminate, even though logging itself is implemented using actors. This is realized by having the system guardian watch the user guardian and initiate its own shut-down upon reception of the Terminated message. The top-level system actors are supervised using a strategy which will restart indefinitely upon all types of Exception except for ActorInitializationException and ActorKilledException, which will terminate the child in question. All other throwables are escalated, which will shut down the whole actor system.

/user: The Guardian Actor
The actor which is probably most interacted with is the parent of all user-created actors, the guardian named “/user”. Actors created using system.actorOf() are children of this actor. This means that when this guardian terminates, all normal actors in the system will be shutdown, too. It also means that this guardian’s supervisor strategy determines how the top-level normal actors are supervised. Since Akka 2.1 it is possible to configure this using the setting akka.actor.guardian-supervisor-strategy, which takes the fully-qualified class-name of a SupervisorStrategyConfigurator. When the guardian escalates a failure, the root guardian’s response will be to terminate the guardian, which in effect will shut down the whole actor system.

See http://doc.akka.io/docs/akka/snapshot/general/supervision.html

Ok, so now we have established that there are top level guardians that take care of “supervising” any actors that are created programmatically, and that there is a guardian to monitor existing actors and log/manage their correct shutdown.

But can we create our own supervisors?

Well yes you can, in fact this is completely encouraged and a very very natural part of writing Akka code.

So how do we do that exactly?

It is actually quite simple, we have to follow a few steps

  1. Decide on what supervision strategy we wish to use (Akka has 2 of these, we will cover these in this article)
  2. Implement a Decider (again we will cover this in this article)

Let’s start with talking about these 2 points before we dive into any code

Supervision Strategy

As stated above it is completely possible to create our own supervisors.  There are only a couple of things that we need to look at before we dive into looking at some example code.

Firstly we need to understand what is meant by a supervision strategy. Quite simply, a supervision strategy dictates what a supervisor will do when one of its underlings (the actors that it creates) throws an Exception.

Akka comes with 2 inbuilt supervisor strategies:

OneForOneStrategy, which means that ONLY the child actor that raised the exception qualifies for some special treatment. The special treatment is called a directive, but more on this later.

AllForOneStrategy, which means that ALL of the children under the direct supervisor will have the supervision strategy directive applied to them.
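The difference between the two can be pictured with a tiny toy model in plain Scala. This is not Akka code: StrategyKind, affectedChildren and the child names are all invented here, purely to show which children would be on the receiving end of a directive.

```scala
object SupervisionScopeSketch {
  // Toy stand-ins for the two strategy flavours (not the Akka classes)
  sealed trait StrategyKind
  case object OneForOne extends StrategyKind
  case object AllForOne extends StrategyKind

  // Returns the children that the directive would be applied to
  // when the child named `failed` throws
  def affectedChildren(kind: StrategyKind,
                       children: Seq[String],
                       failed: String): Seq[String] =
    kind match {
      case OneForOne => children.filter(_ == failed) // only the failing child
      case AllForOne => children                     // every sibling too
    }

  def main(args: Array[String]): Unit = {
    val children = Seq("child_0", "child_1", "child_2")
    println(affectedChildren(OneForOne, children, "child_2"))
    println(affectedChildren(AllForOne, children, "child_2"))
  }
}
```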

Decider

Within both of these strategies listed above, Akka has the concept of a decider. A decider is a PartialFunction that has the following type signature

type Decider = PartialFunction[Throwable, Directive]

Where PartialFunction is a trait that looks like this under the covers

PartialFunction[-A, +B] extends (A => B)

The idea here being that given a Throwable (exception) you must return a B. In Akka’s case the B in question would be an akka.actor.SupervisorStrategy.Directive

A Directive will take one of 4 possible values

  • Resume
  • Restart
  • Stop
  • Escalate

So what we are really saying when we talk about a decider is that it is a way for a supervisor to know what action to carry out when it sees a given Throwable (exception). This action will be applied to one (or more) of the supervisor’s child(ren) depending on the type of supervision strategy used.

Akka actually comes with a default decider, which you can use directly should you want to, or you can create your own which you can augment with the default Akka one. In this post I will create my own and augment it with the default Akka one.
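Since a decider is just a PartialFunction, the "augmenting" part is nothing more than orElse. Here is a small plain Scala sketch of that idea which runs without Akka. The Directive values below are local stand-ins (an assumption for illustration, not the Akka types), and the fallback decider is only loosely modelled on the idea of a framework default.

```scala
object DeciderSketch {
  // Local stand-in for akka.actor.SupervisorStrategy.Directive
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  type Decider = PartialFunction[Throwable, Directive]

  // Custom decider: only defined for two exception types
  val custom: Decider = {
    case _: IllegalStateException    => Stop
    case _: IllegalArgumentException => Resume
  }

  // Fallback decider, playing the role of a default
  val fallback: Decider = {
    case _: Exception => Restart
    case _            => Escalate
  }

  // The custom cases are tried first, anything else falls through
  val combined: Decider = custom.orElse(fallback)
}
```

The custom decider on its own is not defined for, say, a RuntimeException, which is exactly why it is chained with a fallback.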

Let’s see an example of how a supervisor might use a decider and how it may create a supervision strategy.

import akka.actor.SupervisorStrategy.Directive
import akka.actor._

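class AllForOneSupervisorActor extends Actor {

  // Message handling is omitted in this fragment; the full version appears later in this post
  def receive: Receive = Actor.emptyBehavior

  // Custom decider: maps our own exception types to directives
  val decider: PartialFunction[Throwable, Directive] = {
    case _: AkkaStopDirectiveException =>
      akka.actor.SupervisorStrategy.stop
    case _: AkkaRestartDirectiveException =>
      akka.actor.SupervisorStrategy.restart
  }

  // Anything the custom decider does not handle falls through to the default Akka decider
  override def supervisorStrategy: SupervisorStrategy =
    AllForOneStrategy()(decider.orElse(
      SupervisorStrategy.defaultStrategy.decider))
}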

Now that we have seen how a supervisor creates a supervision strategy and how it makes use of a decider, let’s continue to look at an example of each of the 2 built in Akka supervision strategies.

The Common Parts Of The Demo Code

Within this demo the following actor will be used as the child of the supervisors, we will just have multiple children ALL of this type of actor

import akka.actor.Actor

class LifeCycleActor extends Actor {
  println("LifeCycleActor: constructor")

  override def preStart(): Unit = { println("LifeCycleActor: preStart") }

  override def postStop(): Unit = { println("LifeCycleActor: postStop") }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println("LifeCycleActor: preRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    println(s"LifeCycleActor message: ${message.getOrElse("")}")
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    println("LifeCycleActor: postRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    super.postRestart(reason)
  }

  def receive = {

    case "SoundOff" =>
      println("LifeCycleActor: SoundOff seen")
      println(s"LifeCycleActor alive ${self.path.name}" )

    case RaiseStopThrowableMessage =>
      println("LifeCycleActor: RaiseStopThrowableMessage seen")
      throw new AkkaStopDirectiveException(
        "LifeCycleActor raised AkkaStopDirectiveException")

    case RaiseRestartThrowableMessage =>
      println("LifeCycleActor: RaiseRestartThrowableMessage seen")
      throw new AkkaRestartDirectiveException(
        "LifeCycleActor raised AkkaRestartDirectiveException")
  }
}
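The messages and exceptions used by this actor are not shown in this post. A minimal sketch of what they might look like is given below. This is an assumption on my part: the shapes are simply inferred from how the names are used above and how they show up in the console output.

```scala
// Assumed definitions, inferred from usage; they are not shown in the post

// Messages that tell a child which exception to raise
case object RaiseStopThrowableMessage
case object RaiseRestartThrowableMessage

// Exceptions that the supervisors' deciders map to directives
class AkkaStopDirectiveException(message: String) extends Exception(message)
class AkkaRestartDirectiveException(message: String) extends Exception(message)
```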

OneForOneStrategy Example

Here is the code for the “OneForOne” supervisor actor.

There are a couple of things to note here.

  • We create child actors using the built-in context.actorOf factory. This creates children that are “supervised” by the actor whose context was used to create them
  • We use a custom decider which uses the default akka decider as well
  • We use the “OneForOneStrategy” supervision strategy

import akka.actor.SupervisorStrategy.Directive
import akka.actor._

class OneForOneSupervisorActor extends Actor {
  println("OneForOneSupervisorActor: constructor")
  val lifeCycleChildrenActors = new Array[ActorRef](3)

  def receive = {
    case "StartChildren" =>
      println(s"OneForOneSupervisorActor : got a message StartChildren")

      for(i <- 0 to 2) {
        val child = context.actorOf(Props[LifeCycleActor], name = s"lifecycleactor_$i")
        lifeCycleChildrenActors(i) = child
      }

    case "MakeRandomChildCommitSuicide" =>
      println(s"OneForOneSupervisorActor : got a message MakeRandomChildCommitSuicide")
      lifeCycleChildrenActors(2) ! RaiseStopThrowableMessage

    case "MakeRandomChildRestart" =>
      println(s"OneForOneSupervisorActor : got a message MakeRandomChildRestart")
      lifeCycleChildrenActors(2) ! RaiseRestartThrowableMessage

    case "TellChildrenToSoundOff" =>
      println(s"OneForOneSupervisorActor : got a message TellChildrenToSoundOff")

      lifeCycleChildrenActors.foreach(x => x ! "SoundOff")
  }

  val decider: PartialFunction[Throwable, Directive] = {
    case _: AkkaStopDirectiveException => akka.actor.SupervisorStrategy.stop
    case _: AkkaRestartDirectiveException => akka.actor.SupervisorStrategy.restart
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy()(decider.orElse(SupervisorStrategy.defaultStrategy.decider))
}

When we run this one, we will fail a particular child, and ONLY that child will be affected by the supervision strategy thanks to the use of the “OneForOneStrategy”.

Let’s assume we have this demo code

val oneForOneSupervisorActor = system.actorOf(Props[OneForOneSupervisorActor], name = "OneForOneSupervisorActor")
println("sending oneForOneSupervisorActor a 'StartChildren' message")

oneForOneSupervisorActor ! "StartChildren"
Thread.sleep(1000)

oneForOneSupervisorActor ! "MakeRandomChildRestart"
Thread.sleep(1000)

oneForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

oneForOneSupervisorActor ! "MakeRandomChildCommitSuicide"
Thread.sleep(1000)

oneForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

system.stop(oneForOneSupervisorActor)

Here is the output of running the full demo code

sending oneForOneSupervisorActor a 'StartChildren' message
OneForOneSupervisorActor: constructor
OneForOneSupervisorActor : got a message StartChildren
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: preStart
LifeCycleActor: preStart
LifeCycleActor: preStart
OneForOneSupervisorActor : got a message MakeRandomChildRestart
LifeCycleActor: RaiseRestartThrowableMessage seen
LifeCycleActor: preRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor message: RaiseRestartThrowableMessage
LifeCycleActor: postStop
LifeCycleActor: constructor
LifeCycleActor: postRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
[ERROR] [07/29/2016 07:06:28.569] [SupervisionSystem-akka.actor.default-dispatcher-2]
[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2]
LifeCycleActor raised AkkaRestartDirectiveException
AkkaRestartDirectiveException: LifeCycleActor raised AkkaRestartDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:36)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

OneForOneSupervisorActor : got a message TellChildrenToSoundOff
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_0
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_2
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_1
OneForOneSupervisorActor : got a message MakeRandomChildCommitSuicide
LifeCycleActor: RaiseStopThrowableMessage seen
[ERROR] [07/29/2016 07:06:30.548] [SupervisionSystem-akka.actor.default-dispatcher-2]
[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2]
LifeCycleActor raised AkkaStopDirectiveException
AkkaStopDirectiveException: LifeCycleActor raised AkkaStopDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:31)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: postStop
OneForOneSupervisorActor : got a message TellChildrenToSoundOff
LifeCycleActor: SoundOff seen
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_1
LifeCycleActor alive lifecycleactor_0
[INFO] [07/29/2016 07:06:31.553] [SupervisionSystem-akka.actor.default-dispatcher-4]
[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/OneForOneSupervisorActor#-1082037253] to
Actor[akka://SupervisionSystem/user/OneForOneSupervisorActor/lifecycleactor_2#2123326382] was not delivered.
[1] dead letters encountered. This logging can be turned off or adjusted with configuration
settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
LifeCycleActor: postStop
LifeCycleActor: postStop

 

There are a couple of things to note here

  • ONLY one of the child actors is restarted
  • ONLY one of the child actors is stopped, which is why we only see 2 actors out of 3 respond to the final “SoundOff” message sent to all the supervisor’s children

 

AllForOneStrategy Example

Here is the code for the “AllForOne” supervisor actor.

There are a couple of things to note here.

  • We create child actors using the built-in context.actorOf factory. This creates children that are “supervised” by the actor whose context was used to create them
  • We use a custom decider which uses the default akka decider as well
  • We use the “AllForOneStrategy” supervision strategy

import akka.actor.SupervisorStrategy.Directive
import akka.actor._

class AllForOneSupervisorActor extends Actor {
  println("AllForOneSupervisorActor: constructor")
  val lifeCycleChildrenActors = new Array[ActorRef](3)

  def receive = {
    case "StartChildren" =>
      println(s"AllForOneSupervisorActor : got a message StartChildren")

      for(i <- 0 to 2) {
        val child = context.actorOf(Props[LifeCycleActor], name = s"lifecycleactor_$i")
        lifeCycleChildrenActors(i) = child
      }

    case "MakeRandomChildCommitSuicide" =>
      println(s"AllForOneSupervisorActor : got a message MakeRandomChildCommitSuicide")
      lifeCycleChildrenActors(2) ! RaiseStopThrowableMessage

    case "MakeRandomChildRestart" =>
      println(s"AllForOneSupervisorActor : got a message MakeRandomChildRestart")
      lifeCycleChildrenActors(2) ! RaiseRestartThrowableMessage

    case "TellChildrenToSoundOff" =>
      println(s"AllForOneSupervisorActor : got a message TellChildrenToSoundOff")

      lifeCycleChildrenActors.foreach(x => x ! "SoundOff")
  }

  val decider: PartialFunction[Throwable, Directive] = {
    case _: AkkaStopDirectiveException =>
      akka.actor.SupervisorStrategy.stop
    case _: AkkaRestartDirectiveException =>
      akka.actor.SupervisorStrategy.restart
  }

  override def supervisorStrategy: SupervisorStrategy =
    AllForOneStrategy()(decider.orElse(
      SupervisorStrategy.defaultStrategy.decider))
}

When we run this one, we will fail a particular child, and ALL children will be affected by the supervision strategy thanks to the use of the “AllForOneStrategy”.

Let’s assume we have this demo code

val allForOneSupervisorActor = system.actorOf(Props[AllForOneSupervisorActor], name = "AllForOneSupervisorActor")
println("sending allForOneSupervisorActor a 'StartChildren' message")

allForOneSupervisorActor ! "StartChildren"
Thread.sleep(1000)

allForOneSupervisorActor ! "MakeRandomChildRestart"
Thread.sleep(1000)

allForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

allForOneSupervisorActor ! "MakeRandomChildCommitSuicide"
Thread.sleep(1000)

allForOneSupervisorActor ! "TellChildrenToSoundOff"
Thread.sleep(1000)

system.stop(allForOneSupervisorActor)

Here is the output of running the full demo code

sending allForOneSupervisorActor a 'StartChildren' message
AllForOneSupervisorActor: constructor
AllForOneSupervisorActor : got a message StartChildren
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: preStart
LifeCycleActor: preStart
LifeCycleActor: preStart
AllForOneSupervisorActor : got a message MakeRandomChildRestart
LifeCycleActor: RaiseRestartThrowableMessage seen
[ERROR] [07/29/2016 07:10:12.887] [SupervisionSystem-akka.actor.default-dispatcher-3]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2] LifeCycleActor raised AkkaRestartDirectiveException
AkkaRestartDirectiveException: LifeCycleActor raised AkkaRestartDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:36)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: preRestart
LifeCycleActor: preRestart
LifeCycleActor: preRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor message:
LifeCycleActor message: RaiseRestartThrowableMessage
LifeCycleActor message:
LifeCycleActor: postStop
LifeCycleActor: postStop
LifeCycleActor: postStop
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: constructor
LifeCycleActor: postRestart
LifeCycleActor: postRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
LifeCycleActor: postRestart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
LifeCycleActor reason: LifeCycleActor raised AkkaRestartDirectiveException
LifeCycleActor: preStart
AllForOneSupervisorActor : got a message TellChildrenToSoundOff
LifeCycleActor: SoundOff seen
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_2
LifeCycleActor: SoundOff seen
LifeCycleActor alive lifecycleactor_0
LifeCycleActor alive lifecycleactor_1
AllForOneSupervisorActor : got a message MakeRandomChildCommitSuicide
LifeCycleActor: RaiseStopThrowableMessage seen
[ERROR] [07/29/2016 07:10:14.866] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2] LifeCycleActor raised AkkaStopDirectiveException
AkkaStopDirectiveException: LifeCycleActor raised AkkaStopDirectiveException
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:31)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: postStop
LifeCycleActor: postStop
LifeCycleActor: postStop
AllForOneSupervisorActor : got a message TellChildrenToSoundOff
[INFO] [07/29/2016 07:10:15.873] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_0] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor#1766273084]
to Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_0#1933756970]
was not delivered. [1] dead letters encountered. This logging can be turned off or
adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [07/29/2016 07:10:15.874] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_1] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor#1766273084]
to Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_1#922057055]
was not delivered. [2] dead letters encountered. This logging can be turned off or
adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [07/29/2016 07:10:15.877] [SupervisionSystem-akka.actor.default-dispatcher-7]
[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2] Message [java.lang.String]
from Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor#1766273084]
to Actor[akka://SupervisionSystem/user/AllForOneSupervisorActor/lifecycleactor_2#-2073614826]
was not delivered. [3] dead letters encountered. This logging can be turned off or
adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.

There are a couple of things to note here

  • ALL the child actors are restarted
  • ALL the child actors are stopped, which is why we see 0 actors out of 3 respond to the final “SoundOff” message sent to all the supervisor’s children (all 3 messages end up as dead letters)

 

 

 

Actor LifeCycle

There is no better way of understanding the Actor life cycle that Akka has than to examine this image taken from the Akka documentation.

The Akka documentation is actually very very good, it’s just that there is a lot of it, and I am hoping this series of posts will be a bit lighter to digest, and will concentrate on the most common parts of Akka usage. The official docs are obviously the place to go should you have some deep Akka related question.

Anyway the Actor lifecycle is as follows:

 

image

This diagram is ace if you ask me, from here you can see exactly what happens when, and the different states, and what is available within each state.

There is not much more I can add to that diagram, we can however take some of the concepts in this diagram for a little spin.

Let’s assume we have the following Actor code

import akka.actor.Actor

class LifeCycleActor extends Actor {
  println("LifeCycleActor: constructor")

  override def preStart(): Unit = { println("LifeCycleActor: preStart") }

  override def postStop(): Unit = { println("LifeCycleActor: postStop") }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println("LifeCycleActor: preRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    println(s"LifeCycleActor message: ${message.getOrElse("")}")
    super.preRestart(reason, message)
  }
  override def postRestart(reason: Throwable): Unit = {
    println("LifeCycleActor: postRestart")
    println(s"LifeCycleActor reason: ${reason.getMessage}")
    super.postRestart(reason)
  }
  def receive = {
    case RestartMessage => throw new Exception("RestartMessage seen")
    case _ => println("LifeCycleActor : got a message")
  }
}

It can be seen that we have overridden several methods of the underlying Akka Actor. Let’s discuss each of them in turn.

preStart(): Unit

This is called by the Akka system for us, and simply allows us to carry out any pre-start activity that we may wish to perform.

postStop(): Unit

This is called by the Akka system for us once the actor has stopped, and simply allows us to carry out any clean-up activity that we may wish to perform.

preRestart(reason: Throwable, message: Option[Any]): Unit

This is called by the Akka system for us, and allows your code to examine both the reason and the message for the restart.

postRestart(reason: Throwable): Unit

This is called by the Akka system for us, and allows your code to examine the reason for the restart.
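One thing that is easy to miss is the order in which these hooks run during a restart. As far as I understand it, the default preRestart ends by calling postStop, and the default postRestart calls preStart, which is why calling super in the overrides matters. The plain Scala toy below (ToyActor and restartSequence are made-up names, and this is not Akka code) just records that ordering under those assumptions.

```scala
import scala.collection.mutable.ListBuffer

object LifecycleOrderSketch {

  // Toy stand-in for an actor: it only records which hook ran, in order
  class ToyActor(log: ListBuffer[String]) {
    private def record(hook: String): Unit = { log += hook; () }

    record("constructor")

    def preStart(): Unit = record("preStart")
    def postStop(): Unit = record("postStop")

    // Like the default Akka hook, finish by calling postStop()
    def preRestart(): Unit = { record("preRestart"); postStop() }

    // Like the default Akka hook, finish by calling preStart()
    def postRestart(): Unit = { record("postRestart"); preStart() }
  }

  // Plays out one start followed by one restart and returns the recorded hooks
  def restartSequence(): List[String] = {
    val log = ListBuffer.empty[String]
    val original = new ToyActor(log) // first incarnation
    original.preStart()
    original.preRestart()            // called on the failing instance
    val fresh = new ToyActor(log)    // a new instance is constructed
    fresh.postRestart()              // called on the new instance
    log.toList
  }
}
```

The recorded sequence is constructor, preStart, preRestart, postStop, constructor, postRestart, preStart, which lines up with the LifeCycleActor lines in the console output shown in this post.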

 

Now let us assume that we also have the following code to exercise the LifeCycleActor code above

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //create the actor system
  val system = ActorSystem("LifeCycleSystem")

  // default Actor constructor
  val lifeCycleActor = system.actorOf(Props[LifeCycleActor], name = "lifecycleactor")


  println("sending lifeCycleActor a number")
  lifeCycleActor ! 100
  Thread.sleep(1000)

  println("force restart")
  lifeCycleActor ! RestartMessage
  Thread.sleep(1000)

  println("stop lifeCycleActor")
  system.stop(lifeCycleActor)

  //shutdown the actor system
  system.terminate()

  StdIn.readLine()
}

Here is what you might expect to get printed out to the console output

sending lifeCycleActor a number
LifeCycleActor: constructor
LifeCycleActor: preStart
LifeCycleActor : got a message
force restart
LifeCycleActor: preRestart
LifeCycleActor reason: RestartMessage seen
LifeCycleActor message: RestartMessage
LifeCycleActor: postStop
[ERROR] [07/27/2016 07:21:08.232] [LifeCycleSystem-akka.actor.default-dispatcher-4] [akka://LifeCycleSystem/user/lifecycleactor] RestartMessage seen
java.lang.Exception: RestartMessage seen
    at LifeCycleActor$$anonfun$receive$1.applyOrElse(LifeCycleActor.scala:22)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at LifeCycleActor.aroundReceive(LifeCycleActor.scala:3)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

LifeCycleActor: constructor
LifeCycleActor: postRestart
LifeCycleActor reason: RestartMessage seen
LifeCycleActor: preStart
stop lifeCycleActor
LifeCycleActor: postStop

 

As you can see, the lifecycle hooks give you all the pertinent information (the reason for a restart and the message that triggered it) that you would need to make decisions within the actor.

 

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples

akka : ‘hello world’

So last time I outlined the road map for this series of articles on Akka, and posted up some information from the Akka creators on what they had to say about Akka.

This time we will look at a simple example of Akka.

But just before we do that, let’s talk a bit more about WHY I think Akka is great stuff.

I have been a .NET programmer for many years and have seen asynchronous programming come in many different flavors:

  • Asynchronous delegates
  • BackgroundWorker
  • Task (TPL)
  • Async Await
  • RX
  • Concurrent collections
  • Critical sections (synchronized sections of code)

All of these pretty much have their JVM equivalents, and what you will likely have seen if you have used the .NET or JVM versions is the use of locks from time to time. For example, under the covers the concurrent collections still use some locking (monitor enter/exit) to protect their critical sections.

This is all good stuff, and it has got better to work with over the years, but there could be a better, more elegant, lock-free way of working with concurrent/parallel programming.

For me this is what Akka brings to the table. Instead of working with shared state that MUST be protected when writing multithreaded code, we simply don’t use any shared state and create dedicated micro sized bits of code that deal with one thing and one thing only. These are called “Actors”.

Actors do NOT share state; instead they work independently of each other and rely on message passing. The message payload should give the actor either everything it needs to do its job, or at the very least enough information to look things up (say an Id, so that the actor can look up the required entity by its Id field).

At no point will we be using locks within actors.

OK, so that is my mini-rant/intro over. Let’s now carry on and have a look at what it takes to write some Akka code in Scala.

What Libs Do We Need?

As I mentioned in the introduction post I will be using Scala exclusively to do this series of posts. As such I shall also be using SBT to do the build side of things.

For this post we are only showing how to use simple actors, so we don’t have to pull in many dependencies. We can keep things simple and use the following “build.sbt” file, which pulls in these 2 dependencies:

  • Basic Akka stuff
  • Joda time
name := "HelloWorld"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.8",
  "joda-time" % "joda-time" % "2.9.4")

NOTE: This set of SBT dependencies may grow in subsequent articles; where a post needs more Akka JARs I will show them when the time comes.

 

How Do We Create An Actor System?

To be able to use Akka, we must first create an actor system, which is the fabric that all actors run under. This is easily achieved as follows:

object Demo extends App {

  //create the actor system
  val system = ActorSystem("HelloSystem")

  //---------------------------
  //   EXTRA STUFF WILL GO HERE
  //---------------------------


  //shutdown the actor system
  system.terminate()

  StdIn.readLine()
}

Note that you should ensure that the Akka system is also shutdown correctly.

The example I show here is a simple console type application, so my startup/shutdown logic is all in the same file, but in a real production app, things may be more complex (well they should be I would hope).

 

How Do We Create An Actor?

Now that we have an actor system, the next thing we need to do is create some actors that will live within it. I use the word “live” deliberately, as the actor system owns the actors it creates.

So how do we create an actor exactly?

Well, luckily this too is dead simple: we just need to inherit from Actor and provide an implementation of the receive method to handle the different messages that may be sent to the actor.

Recall that an actor works by receiving messages and acting upon them.

Here is what the skeleton code may look like:

import akka.actor.Actor

class HelloActor extends Actor {
  def receive: Receive = {
    //DO THE MESSAGE HANDLING HERE
    case _ => // placeholder so the skeleton compiles
  }
}

We will talk more about the receive method in a while; for now just know that you must implement this method for an Akka actor to work correctly.
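
To get a feel for what receive actually is: in Akka the Receive type is an alias for PartialFunction[Any, Unit], so the block of case clauses is an ordinary Scala partial function. The sketch below uses only the Scala standard library (there is no actor system involved), and the names ReceiveSketch and handled are made up purely for illustration.

```scala
object ReceiveSketch {
  // Same shape as Akka's Receive alias: a partial function from Any to Unit
  type Receive = PartialFunction[Any, Unit]

  // Only "hello" and Int messages are matched; anything else is undefined
  val receive: Receive = {
    case "hello" => println("world")
    case n: Int  => println(s"got the number $n")
  }

  // Hypothetical helper: run the handler if it matches, and report whether it did
  def handled(message: Any): Boolean =
    if (receive.isDefinedAt(message)) { receive(message); true } else false
}
```

In a real actor, a message that receive is not defined for is handed to the actor's unhandled method rather than being matched by one of your cases.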

Difference Between Tell And Ask

Just before we get on to the examples, let’s take a brief diversion and talk about the difference between ask and tell.

When we ask (the ? method in Scala) an actor, we expect a response by way of a Future (a Future[Any], which can be narrowed to a Future[T] with mapTo). This is an asynchronous operation.

When we tell (the ! method in Scala) an actor something, this is analogous to fire and forget (of course the receiving actor could send a response back to the initiating actor via a different message, but that’s a different story). This is an asynchronous operation that returns immediately.
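
The difference can be sketched as a toy model using nothing but the Scala standard library. This is NOT how Akka implements tell and ask; the Handler type, echoHandler, and the tell/ask functions here are all hypothetical stand-ins, and the toy handler runs synchronously on the calling thread, which a real actor does not.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object TellVsAskSketch {
  // Toy stand-in for an actor: a handler given a message and a reply callback
  type Handler = (Any, Any => Unit) => Unit

  val echoHandler: Handler = (message, reply) => reply(s"echo: $message")

  // tell: hand the message over and return nothing; any reply is ignored
  def tell(handler: Handler, message: Any): Unit =
    handler(message, _ => ())

  // ask: hand the message over and expose the first reply as a Future
  def ask(handler: Handler, message: Any): Future[Any] = {
    val promise = Promise[Any]()
    handler(message, reply => { promise.trySuccess(reply); () })
    promise.future
  }

  def main(args: Array[String]): Unit = {
    tell(echoHandler, "hello") // nothing comes back
    val answer = Await.result(ask(echoHandler, "hello"), 1.second)
    println(answer) // echo: hello
  }
}
```

The point to take away is the return type: tell gives you Unit, whereas ask gives you a Future that you then have to deal with.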

Sender

We have not covered this ground yet (we will in one of the subsequent posts), but for now all you need to know is that you send messages to an actor through a construct called an ActorRef, which is kind of like a handle to an actor.

There are some special ActorRefs, one being “sender”, which is the ActorRef of the actor that initiated the message being received (from the point of view of the receiving actor). You will see “sender” used in the examples below.

How Do We Send A Message To An Actor?

Here is how we would send a message to an actor. This is a tell (!) so is fire and forget.

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn
import scala.util.{Success, Failure}
import ExecutionContext.Implicits.global

object Demo extends App {

  //create the actor system
  val system = ActorSystem("HelloSystem")

  // default Actor constructor
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")

  //send some messages to the HelloActor (fire and forget)
  helloActor ! "hello"
  helloActor ! "tis a fine day for Akka"

  //wait for Enter so the messages above get processed before shutting down
  StdIn.readLine()

  //shutdown the actor system
  system.terminate()
}

Where we have the following HelloActor implementation

import akka.actor.Actor

class HelloActor extends Actor {
  def receive = {
    case "hello" => println("world")
    case _       => println("unknown message")
  }
}

See how for this simple actor we only ever deal with 2 things in the receive method:

  • “hello”
  • Anything else

It is considered good practice to ensure that you handle the messages you expect and deal with unknown messages too.
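
One way to keep the “known” and “unknown” handling separate is to write them as two partial functions and chain them with orElse. The sketch below is standard-library Scala only; the message types (Hello, Named) are hypothetical, and the handlers return a String instead of Unit purely so the result is easy to inspect.

```scala
object FallbackSketch {
  // Hypothetical message protocol, for illustration only
  sealed trait Greeting
  case object Hello extends Greeting
  final case class Named(name: String) extends Greeting

  // Handles the messages this "actor" understands
  val known: PartialFunction[Any, String] = {
    case Hello       => "world"
    case Named(name) => s"hello $name"
  }

  // Catch-all for everything else, kept separate so it can be reused
  val unknown: PartialFunction[Any, String] = {
    case other => s"unknown message: $other"
  }

  // orElse tries `known` first and falls back to `unknown`
  val receive: PartialFunction[Any, String] = known orElse unknown
}
```

Using case objects and case classes for messages, rather than raw strings, also means a typo in a message becomes a compile error rather than an “unknown message” at runtime.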

How Do We Wait For A Response From An Actor?

So we just saw a tell (fire and forget), but how about an ask? This is slightly harder, but not by much: we simply have to deal with the fact that the result of an ask is a Future. There are numerous ways of dealing with that. Assume we have the following actor:

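import akka.actor.Actor

// Message type used below; its definition is not shown in this listing,
// so a plain case object is assumed here
case object GetDateMessage

class AskActor extends Actor {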
  def receive = {
    case GetDateMessage => sender ! new org.joda.time.DateTime().toDate().toString()
    case _       => println("unknown message")
  }
}

Here are some examples of how to deal with the result of the ask

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn
import scala.util.{Success, Failure}
import ExecutionContext.Implicits.global

object Demo extends App {

  //create the actor system
  val system = ActorSystem("HelloSystem")

  // default Actor constructor
  val askActor = system.actorOf(Props[AskActor], name = "askactor")

  //send some messages to the AskActor, we want a response from it

  // (1) this is one way to "ask" another actor for information
  implicit val timeout = Timeout(5 seconds)
  val future1 = askActor ? GetDateMessage
  val result1 = Await.result(future1, timeout.duration).asInstanceOf[String]
  println(s"result1=$result1")

  // (2) a slightly different way to ask another actor for information
  val future2: Future[String] = ask(askActor, GetDateMessage).mapTo[String]
  val result2 = Await.result(future2, 5 second)
  println(s"result2=$result2")

  // (3) don't use blocking call at all, just use future callbacks
  val future3: Future[String] = ask(askActor, GetDateMessage).mapTo[String]
  future3 onComplete {
    case Success(result3) =>  println(s"result3=$result3")
    case Failure(t) => println("An error has occurred: " + t.getMessage)
  }


  //wait for Enter so the callback above can fire before shutting down
  StdIn.readLine()

  //shutdown the actor system
  system.terminate()
}
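
A note on mapTo, since it is doing quiet but important work in the examples above: it narrows a Future[Any] to the type you expect, and fails the future if the reply turns out to be something else. Here is a small standard-library sketch of that behaviour, using plain already-completed futures as stand-ins for actor replies; the names narrow and narrowOrFallback are made up for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object MapToSketch {
  // Narrow an untyped reply to String; the future fails if the cast is not possible
  def narrow(reply: Future[Any]): Future[String] = reply.mapTo[String]

  // Same, but turn a failed cast into a fallback value without blocking
  def narrowOrFallback(reply: Future[Any]): Future[String] =
    narrow(reply).recover { case _: ClassCastException => "no date available" }

  def main(args: Array[String]): Unit = {
    println(Await.result(narrow(Future.successful("2016-07-27")), 1.second))       // 2016-07-27
    println(Try(Await.result(narrow(Future.successful(42)), 1.second)).isFailure)  // true
    println(Await.result(narrowOrFallback(Future.successful(42)), 1.second))       // no date available
  }
}
```

This is also why option (3) above has a Failure case: a wrong reply type (or, with a real ask, a timeout) arrives there rather than as an exception on the calling thread.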

Piping Futures

Another use case is that you may want to use a Future[T] internally within the actor code and send its result on to another actor once it completes. Akka supports this by way of the pipe pattern, which you can use as shown here, where we pipe the result of a Future[List[Int]] back to the sender:

import akka.actor._
import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}
import ExecutionContext.Implicits.global

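// Message type used below; its definition is not shown in this listing,
// so a plain case object is assumed here
case object GetIdsFromDatabase

class FutureResultActor extends Actor {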
  def receive = {
    case GetIdsFromDatabase => {
      Future(List(1,2,3)).pipeTo(sender)
    }
    case _       => println("unknown message")
  }
}

Where the code that initiated the sending of the GetIdsFromDatabase message looks like this (the sender in the code above)

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn
import scala.util.{Success, Failure}
import ExecutionContext.Implicits.global

object Demo extends App {

  //create the actor system
  val system = ActorSystem("HelloSystem")

  // default Actor constructor
  val futureResultActor = system.actorOf(Props[FutureResultActor], name = "futureresultactor")

  //send some messages to the FutureResultActor, we expect a Future back from it
  val future4: Future[List[Int]] = ask(futureResultActor, GetIdsFromDatabase).mapTo[List[Int]]
  future4 onComplete {
    case Success(result4) =>  println(s"result4=$result4")
    case Failure(t) => println("An error has occurred: " + t.getMessage)
  }


  //wait for Enter so the callback above can fire before shutting down
  StdIn.readLine()

  //shutdown the actor system
  system.terminate()
}
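
If you are wondering what piping amounts to, it is roughly this: when the future completes, its value is sent to the recipient as a message, and if it fails, a failure wrapper is sent instead. The sketch below shows that idea with the Scala standard library only. It is my approximation, not Akka's source: FailureReply is a hypothetical stand-in for Akka's Status.Failure, and the send function stands in for “recipient ! message”.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object PipeSketch {
  // Hypothetical stand-in for akka.actor.Status.Failure
  final case class FailureReply(cause: Throwable)

  // When the future completes, pass its value (or a failure wrapper) to `send`
  def pipe[T](future: Future[T])(send: Any => Unit): Future[T] =
    future.andThen {
      case Success(value) => send(value)
      case Failure(cause) => send(FailureReply(cause))
    }

  def main(args: Array[String]): Unit = {
    // A promise plays the part of the recipient's mailbox
    val inbox = Promise[Any]()
    pipe(Future(List(1, 2, 3)))(message => { inbox.trySuccess(message); () })
    println(Await.result(inbox.future, 1.second)) // List(1, 2, 3)
  }
}
```

One thing to be aware of on the asking side: mapTo[List[Int]] can only check that the reply is a List, since the element type is erased at runtime.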

 

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples

AkkA series

I have been idle for a while, which kind of irks me. Thing is, I have not really been that idle; it’s just that I have not been doing that much in the .NET space of late. That is not to say I do not like .NET anymore, it’s just that I am spreading my time between .NET and the JVM (which I am actually enjoying).

I thought the time has come for me to come out with a few posts of what I have been up to. So to that end I thought I would start out with a series of posts on Akka.

There will be quite a few parts to this mini series. And I will update this page as I bring new posts on line.

This is what I am planning on covering

What Is Akka

I can think of no better way to describe Akka than to screen scrape what the creators of Akka have to say about it. So here is what they say about it:

We believe that writing correct distributed, concurrent, fault-tolerant and scalable applications is too hard. Most of the time it’s because we are using the wrong tools and the wrong level of abstraction. Akka is here to change that. Using the Actor Model we raise the abstraction level and provide a better platform to build scalable, resilient and responsive applications—see the Reactive Manifesto for more details. For fault-tolerance we adopt the “let it crash” model which the telecom industry has used with great success to build applications that self-heal and systems that never stop. Actors also provide the abstraction for transparent distribution and the basis for truly scalable and fault-tolerant applications.

Actors

Actors give you:

  • Simple and high-level abstractions for distribution, concurrency and parallelism.
  • Asynchronous, non-blocking and highly performant message-driven programming model.
  • Very lightweight event-driven processes (several million actors per GB of heap memory).

Fault Tolerance

  • Supervisor hierarchies with “let-it-crash” semantics.
  • Actor systems can span over multiple JVMs to provide truly fault-tolerant systems.
  • Excellent for writing highly fault-tolerant systems that self-heal and never stop.

Location Transparency

Everything in Akka is designed to work in a distributed environment: all interactions of actors use pure message passing and everything is asynchronous.

Persistence

State changes experienced by an actor can optionally be persisted and replayed when the actor is started or restarted. This allows actors to recover their state, even after JVM crashes or when being migrated to another node.

 

So that is the 1000 mile view of what Akka is. You will get more familiar with it as we move through the series I hope.

What Form Will The Examples Take

As I am trying to improve my Scala to get it in line with what I can do in .NET, all examples will be based on Scala.

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples

I hope you all enjoy the series

That’s all for now. Like I say, I will update this page when more posts become available, which may be a while since I have a day job, 2 kids, one wife and a cat, and I like a drink too, and also enjoy my weekends. So it happens when it happens, folks.

OuT WITH RAVEN embedded In with litedb

I recently started working on (and have now finished) a small internal web site using the following things:

  • WebApi2
  • OAuth
  • JWT support
  • OWIN
  • AutoFac
  • Raven DB Embedded
  • Aurelia.io for front end

I have to say it worked out great. It was a pleasure to work on, all the way through.

I quite like Raven Embedded for this type of app. It’s completely standalone, and does just what I need from it.

So I got to the end of the project, and I was pretty sure I had checked that we had licenses for everything I was using. Turns out we didn’t have one for RavenDB.

Mmm. This app was a tool really to help us internally so we did not want to spend that much on it.

Shame as I like Raven. I started to look around for another tool that could fit the bill.

This was my shopping list

  • Had to be .NET
  • Had to support document storage
  • Had to have LINQ support
  • Had to support same set of features that I was using as Raven Embedded (CRUD + indexes essentially)
  • Had to be free
  • Had to be embedded as single Dll

It did not take me long to stumble upon LiteDB.

This ticked all my boxes and more. I decided to try it out in a little Console app to test it, and was extremely happy. I did not do any performance testing, as that is not such a concern for the app that I was building, but from an API point of view, it would prove to be very easy to replace the Raven Embedded code I had written so far.

I was happy.

Just thought I would show you all a little bit of its usage right here

 

Installation

This is done via NuGet. The package is called “LiteDB”

CRUD

Assuming we have this entity

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace LiteDBDemo
{
    public class Customer
    {
        public int Id { get; set; }
        public string Name { get; set; }
        public string[] Phones { get; set; }
        public bool IsActive { get; set; }

        public override string ToString()
        {
            return string.Format("Id : {0}, Name : {1}, Phones : {2}, IsActive : {3}",
                Id,
                Name,
                Phones.Aggregate((x, y) => string.Format("{0}{1}", x, y)),
                IsActive);
        }
    }
}

Here is how you might use LiteDB to perform CRUD operations. See how it has the concept of collections. This is kind of like MongoDB, if you have used that.

using LiteDB;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace LiteDBDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            // Open database (or create it if it does not exist)
            using (var db = new LiteDatabase(@"MyData.db"))
            {
                //clean out entire collection, will drop documents, indexes everything
                db.GetCollection<Customer>("customers").Drop();


                Create(db);
                Read(db);
                Update(db);
                Delete(db);
            }
            Console.ReadLine();
        }

        private static void Create(LiteDatabase db)
        {
            Console.WriteLine("\r\nCREATE\r\n");

            // Get customer collection
            var customers = db.GetCollection<Customer>("customers");

            // Create your new customer instance
            var customer = new Customer
            {
                Name = "John Doe",
                Phones = new string[] { "8000-0000", "9000-0000" },
                IsActive = true
            };

            // Insert new customer document (Id will be auto-incremented)
            customers.Insert(customer);
            Console.WriteLine("Inserted customer");
        }

        private static void Read(LiteDatabase db)
        {
            Console.WriteLine("\r\nREAD\r\n");

            // Get customer collection
            var customers = db.GetCollection<Customer>("customers");

            // Index document using a document property
            customers.EnsureIndex(x => x.Name);

            // Use Linq to query documents
            var firstCustomer = customers.Find(x => x.Name.StartsWith("Jo")).FirstOrDefault();
            Console.WriteLine(firstCustomer);

        }

        private static void Update(LiteDatabase db)
        {
            Console.WriteLine("\r\nUPDATE\r\n");

            // Get customer collection
            var customers = db.GetCollection<Customer>("customers");
            // Use Linq to query documents
            var johnDoe = customers.Find(x => x.Name == "John Doe").First();
            Console.WriteLine("Before update");
            Console.WriteLine(johnDoe);

            johnDoe.Name = "John Doe MODIFIED";
            customers.Update(johnDoe);

            var johnDoe2 = customers.Find(x => x.Name == "John Doe MODIFIED").First();
            Console.WriteLine("Read updated");
            customers.Update(johnDoe2);
            Console.WriteLine(johnDoe2);

        }

        private static void Delete(LiteDatabase db)
        {
            Console.WriteLine("\r\nDELETE\r\n");

            // Get customer collection
            var customers = db.GetCollection<Customer>("customers");
            // Use Linq to query documents
            customers.Delete(x => x.Name == "John Doe MODIFIED");
            Console.WriteLine("Deleting Name = 'John Doe MODIFIED'");

            var johnDoe = customers.Find(x => x.Name == "John Doe MODIFIED").FirstOrDefault();
            Console.WriteLine("Looking for Name = 'John Doe MODIFIED'");
            Console.WriteLine(johnDoe == null ? "It's GONE" : johnDoe.ToString());


        }

    }
}

You can learn more about this over at the LiteDB website

http://www.litedb.org/

 

Overall I was very, very happy with LiteDB. I particularly like the fact that it was free, and that it did pretty much exactly the same as RavenDB Embedded (sometimes it was easier to do as well).

I would use this library again for sure, I found it spot on to be honest.

Like a nice Gin and Tonic on a summer’s day.

 

The Nuances of Loading and Unloading Assemblies with AppDomain

I don’t normally like just pointing out other people’s work, but this time I have no hesitation at all in doing just that. If you have ever worked with AppDomain(s) in .NET you will certainly have had some fun.

CodeProject’s Marc Clifton has written a truly great article on AppDomain(s) which you should all read. You can find it here : http://www.codeproject.com/Articles/1091726/The-Nuances-of-Loading-and-Unloading-Assemblies-wi

Nice one Marc

WebApi POST + [ISerializable] + JSON .NET

At work I have taken on the task of building a small utility web site for admin needs. Thing is I wanted it to be very self contained so I have opted for this

  • Self hosted web API
  • JSON data exchanges
  • Aurelia.IO front end
  • Raven DB database

So I set out to create a nice web api endpoint like this

private IDocumentStore _store;

public LoginController(IDocumentStore store)
{
    _store = store;
}

[HttpPost]
public IHttpActionResult Post(LoginUser loginUser)
{
    //
}

Where I then had this data model that I was trying to post via the awesome REST plugin for Chrome:

using System;
 
namespace Model
{
    [Serializable]
    public class LoginUser
    {
        public LoginUser()
        {
 
        }
 
        public LoginUser(string userName, string password)
        {
            UserName = userName;
            Password = password;
        }
 
        public string UserName { get; set; }
        public string Password { get; set; }
 
        public override string ToString()
        {
            return string.Format("UserName: {0}, Password: {1}", UserName, Password);
        }
    }
}

This just would not work. I could see the endpoint being called OK, but no matter what I did, the LoginUser model on the post would always have NULL properties. After a little fiddling I removed the [Serializable] attribute and it all just started to work.

Turns out this is to do with the way JSON.NET works when it sees the [Serializable] attribute.

For example if you had this model

[Serializable]
public class ResortModel
{
    public int ResortKey { get; set; }
    public string ResortName { get; set; }
}

Without the [Serializable] attribute the JSON output is:

{
    "ResortKey": 1,
    "ResortName": "Resort A"
}

With the [Serializable] attribute the JSON output is:

{
    "<ResortKey>k__BackingField": 1,
    "<ResortName>k__BackingField": "Resort A"
}

I told one of my colleagues about this, and he found this article : http://stackoverflow.com/questions/29962044/using-serializable-attribute-on-model-in-webapi which explains it all nicely, including how to fix it.

Hope that helps, sure bit me in the Ass
