Akka

akka : persistent actors

In this post we will look at some of the core concepts around persistent actors. This may seem strange, since so far I have been saying that Akka is great because it is lock free and each actor is effectively stateless.

Truth is, there are times when some sort of durable state just can't be avoided, and for that Akka provides us with the PersistentActor

 

Event Sourcing

Akka persistence does borrow a couple of ideas from other concepts out there in the wild, such as

  • Event sourcing
  • Possibly CQRS
  • Snapshots

If you have not heard of event sourcing before, it is fairly simple to explain. The idea is that you have a data structure that is able to receive events to set its internal state. At the same time you also have an event store that stores events in chronological order.

These events are then played onto the object that accepts them, whereby the object will build its own internal state from the events played on to it.

The eagle-eyed amongst you may be thinking: won’t that lead to loads of events? Well yes, it might.

There is however an optimization around this called “snapshots”. A snapshot is a special type of event that captures the object’s state as it is right now.

So in reality you would apply the latest snapshot, and then apply any events that occurred after it, which drastically reduces the amount of event playback that needs to occur against an object rebuilding its state from the event store.
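To make the snapshot optimization concrete, here is a minimal in-memory sketch in plain Scala (not Akka's actual machinery; the account/balance domain and all names are invented for illustration). It shows that replaying from the latest snapshot plus the newer events yields the same state as replaying the full journal:

```scala
// Hypothetical in-memory sketch (not Akka's implementation): an account
// balance rebuilt from a journal of events, with and without a snapshot.
final case class AccountEvent(delta: Int)
final case class Snapshot(balance: Int, upToSeqNr: Int)

object EventStoreSketch {
  // The journal: events stored in chronological order
  val journal: Vector[AccountEvent] =
    Vector(AccountEvent(10), AccountEvent(20), AccountEvent(-5), AccountEvent(7))

  // A snapshot captured after the first two events were applied
  val snapshot: Snapshot = Snapshot(balance = 30, upToSeqNr = 2)

  // Full recovery: fold every event onto the initial state
  def replayAll: Int =
    journal.foldLeft(0)((bal, e) => bal + e.delta)

  // Optimised recovery: start from the snapshot, replay only newer events
  def replayFromSnapshot: Int =
    journal.drop(snapshot.upToSeqNr)
      .foldLeft(snapshot.balance)((bal, e) => bal + e.delta)
}
```

Both recovery paths end at the same state; the snapshot path just touches fewer events, which is the whole point of the optimization.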

I have written a fairly well received article about this before for the .NET space which also included working code and also includes the CQRS (Command Query Responsibility Segregation) part of it too. If you would like to know more about that you can read about it here:

http://www.codeproject.com/Articles/991648/CQRS-A-Cross-Examination-Of-How-It-Works

Anyway, I am kind of drifting off topic a bit here. The point is that Akka persistence borrows some of these ideas from other concepts/frameworks, so it may be of some use to have a read around some of them, in particular event sourcing and CQRS.

Dependencies

In order to work with Akka persistence you will need the following 2 SBT entries in your build.sbt file

"com.typesafe.akka" %% "akka-actor" % "2.4.8",
"com.typesafe.akka" %% "akka-persistence" % "2.4.8"

Storage

Akka persistence comes with a couple of pre-canned persistence storage mechanisms, and there are many more community based storage frameworks that you can use.

Within this article I have chosen to use the Akka provided storage mechanism which is LevelDB.

In order to do this, you will need the following SBT entries in your build.sbt file

"org.iq80.leveldb"            % "leveldb"          % "0.7",
"org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8"

You can read more about storage in general at the official Akka docs page

http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Storage_plugins

Config

As with most things in Akka you are able to configure things either in code (by way of overriding things) or by configuration. I personally prefer configuration, so in order to get persistence to work, one must provide the following configuration

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false

For me this is in a resources/application.conf file

PersistentActor Trait

Ok so getting back on track: how do we create a persistent actor in Akka?

Well out of the box Akka provides the PersistentActor trait, which you can mix in. This trait looks like this

trait PersistentActor extends Eventsourced with PersistenceIdentity {
  def receive = receiveCommand
}

There are several things that are of interest here

  • There is an expectation that there will be a receiveCommand implementation (this comes from the Eventsourced trait)
  • The fact that the base trait is called Eventsourced is also interesting

I personally find the fact that there is an expectation of a command, and that we end up mixing in a trait named “Eventsourced”, to be quite a strong indicator of just how similar working with Akka persistence is to CQRS + traditional event sourcing ideas.

If we were to go further down the rabbit hole and keep looking into the base trait Eventsourced, we would see a couple more abstract methods of interest that are expected to be supplied by the end user's code:


  /**
   * Recovery handler that receives persisted events during recovery. If a state snapshot
   * has been captured and saved, this handler will receive a [[SnapshotOffer]] message
   * followed by events that are younger than the offered snapshot.
   *
   * This handler must not have side-effects other than changing persistent actor state i.e. it
   * should not perform actions that may fail, such as interacting with external services,
   * for example.
   *
   * If there is a problem with recovering the state of the actor from the journal, the error
   * will be logged and the actor will be stopped.
   *
   * @see [[Recovery]]
   */
  def receiveRecover: Receive

  /**
   * Command handler. Typically validates commands against current state (and/or by
   * communication with other actors). On successful validation, one or more events are
   * derived from a command and these events are then persisted by calling `persist`.
   */
  def receiveCommand: Receive

As stated, these are abstract methods that YOUR code would need to supply to make the persistent actor work properly. We will see more of this in a bit.

Persistent Id

One of the rules you must follow when using persistent actors is that a given persistent actor MUST have the same ID across incarnations (restarts). This can be set using the persistenceId method as shown below

override def persistenceId = "demo-persistent-actor-1"

Snapshots

As I stated earlier, snapshots can reduce the number of events that need to be replayed against the event source target (the persistent actor).

In order to save a snapshot the actor may call the saveSnapshot method.

  • If the snapshot succeeds the actor will receive a SaveSnapshotSuccess message
  • If the snapshot fails the actor will receive a SaveSnapshotFailure message

var state: Any = _
 
override def receiveCommand: Receive = {
  case "snap"                                => saveSnapshot(state)
  case SaveSnapshotSuccess(metadata)         => // ...
  case SaveSnapshotFailure(metadata, reason) => // ...
}

Where the metadata looks like this

final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)

During the recovery process the persistent actor is offered a SnapshotOffer message from which it may restore its internal state.

After the SnapshotOffer will come the newer (younger in Akka speak) events, which when replayed onto the persistent actor will bring it to its final internal state.

 

Failure/Recovery

Special care must be taken when shutting down persistent actors from the outside. For normal (non-persistent) actors a PoisonPill may be used. This is not recommended for persistent actors, because incoming commands are stashed until the journaling mechanism signals that the events are stored, at which point the mailbox is drained. A PoisonPill delivered through the normal mailbox can therefore terminate the actor while commands are still stashed. A better way is to use explicit shutdown messages.

Read more about this here :

http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Safely_shutting_down_persistent_actors

 

Persisting state

So we have talked about event/commands/CQRS/event sourcing/snapshots, but so far we have not talked about how to actually save state. How is this done?

Well, as luck would have it, it's very easy: we simply call the persist method, which looks like this

def persist[A](event: A)(handler: A ⇒ Unit): Unit 
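The key semantic of persist is that the event is written to the journal first, and the supplied handler runs only after the write has succeeded. A toy sketch of that contract (illustration only, invented names, not Akka's implementation):

```scala
import scala.collection.mutable.ArrayBuffer

// Toy sketch of the persist contract (not Akka's code): the event is
// appended to the "journal" first, and only then is the state-updating
// handler invoked.
class JournalSketch {
  val journaled = ArrayBuffer.empty[Any]
  var state: List[String] = Nil

  def persist[A](event: A)(handler: A => Unit): Unit = {
    journaled += event // 1. "write" the event to the journal
    handler(event)     // 2. then apply it to in-memory state
  }
}

val j = new JournalSketch
j.persist("evt-1") { e => j.state = e :: j.state }
```

After the call, the event is both in the journal and folded into the in-memory state, which mirrors how a persistent actor's receiveCommand typically uses persist.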

Demo Time

Ok so we have now gone through most of the bits you would need to work with persistent actors. Time for a demo.

Let's assume we have the following commands that we will send to a single persistent actor

case class Cmd(data: String)

And this type of event that we would like to store

case class Evt(data: String)

Where we would hold this type of state within the persistent actor

case class ExampleState(events: List[String] = Nil) {
  def updated(evt: Evt): ExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}
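Since ExampleState prepends new events and reverses them for display, a quick standalone check (repeating the two definitions above so the snippet is self-contained) shows how the state accumulates:

```scala
// Definitions repeated from above so this snippet is self-contained
final case class Evt(data: String)

final case class ExampleState(events: List[String] = Nil) {
  def updated(evt: Evt): ExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}

// New events are prepended, so the internal list is newest-first,
// while toString reverses it to print oldest-first
val state = ExampleState().updated(Evt("foo-0")).updated(Evt("foo-1"))
```

Printing `state` gives the events oldest-first, which is the order you see in the demo output later on.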

And that the actual persistent actor looks like this

import akka.actor._
import akka.persistence._



class DemoPersistentActor extends PersistentActor {

  //note : this is mutable (a var holding an immutable ExampleState)
  var state = ExampleState()

  def updateState(event: Evt): Unit =
    state = state.updated(event)

  def numEvents =
    state.size

  val receiveRecover: Receive = {
    case evt: Evt => updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) => {
        println(s"offered state = $snapshot")
        state = snapshot
    }
  }

  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(Evt(s"${data}-${numEvents}"))(updateState)
      persist(Evt(s"${data}-${numEvents + 1}")) { event =>
        updateState(event)
        context.system.eventStream.publish(event)
      }
    case "snap"  => saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>
      println(s"SaveSnapshotSuccess(metadata) :  metadata=$metadata")
    case SaveSnapshotFailure(metadata, reason) =>
      println(s"""SaveSnapshotFailure(metadata, reason) :
        metadata=$metadata, reason=$reason""")
    case "print" => println(state)
    case "boom"  => throw new Exception("boom")
  }

  override def persistenceId = "demo-persistent-actor-1"
}

Which we could first run using this demo code:

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //create the actor system
  val system = ActorSystem("PersitenceSystem")

  val persistentActor =
    system.actorOf(Props(classOf[DemoPersistentActor]),
      "demo-persistent-actor-1")

  persistentActor ! "print"
  persistentActor ! Cmd("foo")
  persistentActor ! Cmd("baz")
  persistentActor ! "boom"
  persistentActor ! Cmd("bar")
  persistentActor ! "snap"
  persistentActor ! Cmd("buzz")
  persistentActor ! "print"


  StdIn.readLine()

  //shutdown the actor system
  system.terminate()

  StdIn.readLine()
}

Which gives the following output (your output may look a little different from mine, as I have run this code a number of times and so have state from previous runs on disk)

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8,
foo-9, baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23)
[ERROR] [08/16/2016 18:43:21.475] [PersitenceSystem-akka.actor.default-dispatcher-5]
[akka://PersitenceSystem/user/demo-persistent-actor-1] boom
java.lang.Exception: boom
    at DemoPersistentActor$$anonfun$2.applyOrElse(DemoPersistentActor.scala:39)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at DemoPersistentActor.akka$persistence$Eventsourced$$super$aroundReceive(DemoPersistentActor.scala:6)
    at akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:657)
    at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:182)
    at DemoPersistentActor.aroundReceive(DemoPersistentActor.scala:6)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9,
baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
[WARN] [08/16/2016 18:43:21.494] [PersitenceSystem-akka.persistence.dispatchers.default-stream-dispatcher-8]
[akka.serialization.Serialization(akka://PersitenceSystem)] Using the default Java serializer for class
[ExampleState] which is not recommended because of performance implications. Use another serializer or
disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23, foo-24, foo-25,
baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)
SaveSnapshotSuccess(metadata) :  metadata=SnapshotMetadata(demo-persistent-actor-1,33,1471369401491)

And then if we were to run this demo code:

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //create the actor system
  val system = ActorSystem("PersitenceSystem")

  val persistentActor =
    system.actorOf(Props(classOf[DemoPersistentActor]),
      "demo-persistent-actor-1")

  persistentActor ! "print"


  StdIn.readLine()

  //shutdown the actor system
  system.terminate()

  StdIn.readLine()
}

 

Let's see what we can see:

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7,
foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23,
foo-24, foo-25, baz-26, baz-27, bar-28, bar-29)

List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11,
bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22,
buzz-23, foo-24, foo-25, baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)

It can be seen that although we did not save any new events, the state of the demo persistent actor was restored correctly using a combination of a snapshot and the events that are newer than the snapshot.

 

 

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples


akka dead letters and how to monitor for them

So we are a little way into this mini series on Akka now.

We have covered a few of the fundamental topics of Akka such as

  • Supervision
  • Mailboxes
  • Logging

When we talked about mailboxes we discussed the fact that there are queues involved that are used as mailboxes.

Generally speaking where there are queues concerned there is also the possibility of dead letters.

In this post we will talk about what ‘dead letters’ are within Akka, and also look at how you can monitor for dead letters

What Are Dead Letters?

In Akka, messages that can’t be delivered are routed to a synthetic actor which has the path “/deadLetters”. This covers messages that are undeliverable for reasons other than transport loss.

Akka makes no guarantees for messages lost at the transport layer.

How Can You Monitor For Them?

Within Akka there is the concept of an event bus: a bus that Akka uses internally to send messages. You can also use this event bus for other purposes, such as publishing/subscribing to certain types of messages.

It is kind of like a topic based subscription system.

We can use this event bus to monitor for dead letters.

It is also worth noting that the event bus can be used as a general pub/sub mechanism; you can read more about it here: Akka Event Bus
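To give a feel for class-based subscription (the mechanism we use below when subscribing with classOf[DeadLetter]), here is a tiny hypothetical bus sketch in plain Scala. All names are invented; this is loosely modelled on the idea of Akka's EventStream, not its actual code:

```scala
import scala.collection.mutable

// Hypothetical class-based subscription bus: subscribers register for a
// message class and receive any published message of that type.
final case class DeadLetterSketch(message: Any)

class EventBusSketch {
  private val subscribers =
    mutable.Map.empty[Class[_], mutable.Buffer[Any => Unit]]

  def subscribe(channel: Class[_])(handler: Any => Unit): Unit =
    subscribers.getOrElseUpdate(channel, mutable.Buffer.empty[Any => Unit]) += handler

  def publish(msg: Any): Unit =
    subscribers.foreach { case (channel, handlers) =>
      if (channel.isInstance(msg)) handlers.foreach(_(msg))
    }
}

val bus = new EventBusSketch
var seen: List[Any] = Nil
bus.subscribe(classOf[DeadLetterSketch])(m => seen = m :: seen)
bus.publish(DeadLetterSketch(100)) // delivered to the subscriber
bus.publish("unrelated")           // no subscriber for String, dropped
```

The subscriber only sees messages of the class it registered for, which is exactly the shape of the DeadLetter monitoring we set up next.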

But for now let's get back to the problem at hand, which is: how do we monitor for dead letters?

So let’s start out with this actor code

import akka.actor.Actor
import akka.event.Logging

class LifeCycleActor extends Actor {
  val log = Logging(context.system, this)

  log.info("LifeCycleActor: constructor")

  override def preStart { log.info("LifeCycleActor: preStart") }

  override def postStop { log.info("LifeCycleActor: postStop") }

  override def preRestart(reason: Throwable, message: Option[Any]) {
    log.info("LifeCycleActor: preRestart")
    log.info(s"LifeCycleActor reason: ${reason.getMessage}")
    log.info(s"LifeCycleActor message: ${message.getOrElse("")}")
    super.preRestart(reason, message)
  }
  override def postRestart(reason: Throwable) {
    log.info("LifeCycleActor: postRestart")
    log.info(s"LifeCycleActor reason: ${reason.getMessage}")
    super.postRestart(reason)
  }
  def receive = {
    case RestartMessage => throw new Exception("RestartMessage seen") // RestartMessage is defined elsewhere in this series
    case _ => log.info("LifeCycleActor : got a message")
  }
}

We will then use this actor, which is set up to receive dead letters

import akka.actor.{DeadLetter, Actor}

class DeadLetterMonitorActor 
  extends Actor 
  with akka.actor.ActorLogging {
  log.info("DeadLetterMonitorActor: constructor")

  def receive = {
    case d: DeadLetter => {
      log.error(s"DeadLetterMonitorActor : saw dead letter $d")
    }
    case _ => log.info("DeadLetterMonitorActor : got a message")
  }
}

We then use this demo code

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
import akka.event.Logging

class Runner {

  def Run(): Unit = {

    //create the actor system
    val system = ActorSystem("LifeCycleSystem")

    // default Actor constructor
    val lifeCycleActor =
      system.actorOf(Props[LifeCycleActor],
        name = "lifecycleactor")

    val deadLetterMonitorActor =
      system.actorOf(Props[DeadLetterMonitorActor],
        name = "deadlettermonitoractor")

    //subscribe to system wide event bus 'DeadLetter'
    system.eventStream.subscribe(
      deadLetterMonitorActor, classOf[DeadLetter])

    val log = Logging(system, classOf[Runner])
    log.debug("Runner IS UP BABY")

    log.debug("sending lifeCycleActor a few numbers")
    lifeCycleActor ! 100
    lifeCycleActor ! 200
    Thread.sleep(1000)

    log.debug("sending lifeCycleActor a poison pill (kill it)")
    lifeCycleActor ! PoisonPill
    Thread.sleep(1000)
    log.debug("sending lifeCycleActor a few numbers")
    lifeCycleActor ! 100
    lifeCycleActor ! 200


    log.debug("stop lifeCycleActor/deadLetterMonitorActor")
    system.stop(lifeCycleActor)
    system.stop(deadLetterMonitorActor)

    //shutdown the actor system
    log.debug("stop actor system")
    system.terminate()

    StdIn.readLine()
  }
}

This demo code does the following

  • Create the 2 actors above
  • Subscribe the DeadLetterMonitorActor (above) to the “DeadLetter” topic of the inbuilt akka EventStream (internal pub/sub bus)
  • We then send a few messages to the LifeCycleActor
  • Then send the LifeCycleActor a PoisonPill (which terminates it)
  • We then send a few more messages to the LifeCycleActor (which it doesn't get, because it is dead)
  • We expect the DeadLetterMonitorActor to receive the DeadLetter messages and log these DeadLetter messages

If we run these bits of code this is the result, where it can plainly be seen that the DeadLetterMonitorActor does receive the DeadLetter messages:

INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] Slf4jLogger – Slf4jLogger started
16:55:49.546UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] EventStream – logger log1-Slf4jLogger started
16:55:49.547UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] EventStream – Default Loggers started
16:55:49.559UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor: constructor
16:55:49.560UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor: preStart
16:55:49.560UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – Runner IS UP BABY
16:55:49.561UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – sending lifeCycleActor a few numbers
16:55:49.561UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] DeadLetterMonitorActor – DeadLetterMonitorActor: constructor
16:55:49.563UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor : got a message
16:55:49.563UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-2] LifeCycleActor – LifeCycleActor : got a message
16:55:50.561UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – sending lifeCycleActor a poison pill (kill it)
16:55:50.576UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-9] LifeCycleActor – LifeCycleActor: postStop
16:55:51.565UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-9] Runner – sending lifeCycleActor a few numbers
16:55:51.570UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-2] Runner – stop lifeCycleActor/deadLetterMonitorActor
16:55:51.573UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-7] Runner – stop actor system
16:55:51.581UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-9] RepointableActorRef – Message [java.lang.Integer]
    from Actor[akka://LifeCycleSystem/deadLetters] to Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257]
    was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration
    settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
16:55:51.581UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-9] RepointableActorRef – Message [java.lang.Integer]
    from Actor[akka://LifeCycleSystem/deadLetters] to Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257]
    was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration
    settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
16:55:51.588UTC ERROR[LifeCycleSystem-akka.actor.default-dispatcher-8] DeadLetterMonitorActor – DeadLetterMonitorActor :
    saw dead letter DeadLetter(100,Actor[akka://LifeCycleSystem/deadLetters],Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257])
16:55:51.590UTC ERROR[LifeCycleSystem-akka.actor.default-dispatcher-8] DeadLetterMonitorActor – DeadLetterMonitorActor :
    saw dead letter DeadLetter(200,Actor[akka://LifeCycleSystem/deadLetters],Actor[akka://LifeCycleSystem/user/lifecycleactor#-912643257])
16:55:51.609UTC INFO [LifeCycleSystem-akka.actor.default-dispatcher-6] LocalActorRef – Message [akka.actor.StopChild]
    from Actor[akka://LifeCycleSystem/deadLetters] to Actor[akka://LifeCycleSystem/user] was not delivered. [3] dead letters encountered.
    This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
16:55:51.616UTC DEBUG[LifeCycleSystem-akka.actor.default-dispatcher-6] EventStream – shutting down: StandardOutLogger started

 

 

NOTE : According to the official Akka docs “Dead letters are not propagated over the network, if you want to collect them in one place you will have to subscribe one actor per network node and forward them manually.”

Are All Messages In DeadLetters A Problem?

No, not all of the messages that end up in the deadLetters synthetic actor are problems. For example, suppose an actor is sent several Terminate messages: only one of these will take effect, and the others will likely end up in deadLetters, but this is not a concern.

The deadLetters actor, and the monitoring of it, is more of a debugging aid than anything else. You will have to use some modicum of common sense when examining the results of the deadLetters logging, which we discussed above.

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples


akka logging

This is likely to be a smaller one of the series, but just because it is small in size doesn’t mean that it is not mighty.

Every app needs logging, and when working with a distributed set of actors this is crucial.

Akka provides 2 types of logging adaptor out of the box

  • Console
  • SLF4J (where you need the appropriate back end for this which is usually Logback)

It also has various configuration sections that allow you to adjust the following elements of Akka

  • Akka setup messages
  • Receive of messages
  • All actor lifecycle messages
  • Finite state machine messages
  • EventStream subscription messages
  • Remoting messages

Before we look at how you can customize the logging to capture all this good stuff, let's first see what steps you need to set up basic logging in Akka

Step1 : Grab the JARs

There are a couple of JARs you will need to perform logging in Akka. These are shown below

See build.sbt

name := "HelloWorld"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" % "akka-actor_2.11" % "2.4.8",
  "ch.qos.logback" % "logback-classic" % "1.1.7",
  "com.typesafe.akka" % "akka-slf4j_2.11" % "2.4.8")

Step2 : application.conf

You must then configure how Akka will log its entries. This is done in a configuration file; I have decided to call mine “application.conf”

See resources/application.conf

akka {
  # event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
}

Step3 : logback.xml

For the SLF4J logging to work we need to configure Logback. This is typically done with a configuration file called “logback.xml”.

An example of this may look like this

See “resources/logback.xml”

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <target>System.out</target>
        <encoder>
            <pattern>%X{akkaTimestamp} %-5level[%thread] %logger{0} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <file>c:/logs/akka.log</file>
        <append>true</append>
        <encoder>
            <pattern>%date{yyyy-MM-dd} %X{akkaTimestamp} %-5level[%thread] %logger{1} - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="akka" level="DEBUG" />

    <root level="DEBUG">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="FILE"/>
    </root>

</configuration>

Step4 : Log Some Stuff

Once you have completed the above steps, you have 2 choices about how to consume the logging

Using The LoggingAdapter

You are able to use the Logging apply method, which returns a LoggingAdapter that you may use for logging. Here is an example

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
import akka.event.Logging

class Runner {

  def Run(): Unit = {

    //create the actor system
    val system = ActorSystem("LifeCycleSystem")

    val log = Logging(system, classOf[Runner])
    log.debug("Runner IS UP BABY")
    ...

    StdIn.readLine()
  }
}

Using Logging Trait

To make this process easier Akka also provides the “akka.actor.ActorLogging” trait, which can be mixed in wherever you require logging

import akka.actor.Actor
import akka.event.Logging

class LifeCycleActorWithLoggingTrait extends Actor with akka.actor.ActorLogging {

  log.info("LifeCycleActor: constructor")

  .....
}
 

Async Or NOT

Some of the more experienced JVM/Scala devs amongst you may think: heck, I can just use my own logging, I don't need the Akka trait or LoggingAdapter.

The thing is, if you use the Akka trait or LoggingAdapter, they are set up to log asynchronously and not introduce any time delays into the messaging pipeline when logging.

So just be warned that you should probably use the inbuilt Akka stuff rather than roll your own. Logging to things like an ELK stack may be the exception.

Common Akka Configuration Around Logging

These configuration sections are useful for controlling what is logged

General Log Level

akka {
  # general logging level
  loglevel = DEBUG
}

Log Akka Config At Start

akka {
  # Log the complete configuration at INFO level when the actor system is started.
  # This is useful when you are uncertain of what configuration is used.
  log-config-on-start = on
}

Actor Receive Messages

akka {
  debug {
    # enable function of LoggingReceive, which is to log any received message at
    # DEBUG level
    receive = on
  }
}

You can also monitor lifecycle events like this

akka {
  debug {
    # enable DEBUG logging of actor lifecycle changes
    lifecycle = on
  }
}

Remoting Logging

Firstly you can log the messages that are sent out by the transport layer

akka {
  remote {
    # If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged
    log-sent-messages = on
  }
}

You may also see what messages are received by the transport layer like this

akka {
  remote {
    # If this is "on", Akka will log all inbound messages at DEBUG level, if off then they are not logged
    log-received-messages = on
  }
}

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples


akka mailboxes

This post will be a bit smaller than the ones we have just done, but nonetheless still just as important.

Let's start by talking about mailboxes and dispatchers: what exactly they are, and how they relate to each other.

What’s A Mailbox?

In Akka the mailbox is some type of queue that holds the messages for an actor. There is usually one mailbox per actor. In some cases, where routing gets involved, there may be one mailbox shared between a number of actors, but for now let's assume a one-to-one relationship between mailboxes and actors.

What’s A Dispatcher?

In Akka a Dispatcher is the heart of the actor system, and it is the thing that dispatches the messages.

Akka actors may be configured to use a certain Dispatcher, and the Dispatcher may in turn be configured to use a certain mailbox type.

Here is an example of how you might configure an actor to use a custom Dispatcher

You may have this code for an actor

import akka.actor.Props
val myActor =
  context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")

Where you may have this custom Dispatcher in your configuration of the system

my-dispatcher {

  # Type of mailbox to use for the Dispatcher
  mailbox-requirement = org.example.MyInterface

  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher

  # What kind of ExecutionService to use
  executor = "thread-pool-executor"

  # Configuration for the thread pool
  thread-pool-executor {

    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 2

    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 2.0

    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 10
  }

  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}

It can be seen above that we are able to configure the mailbox type for a Dispatcher in the configuration using the line

# Type of mailbox to use for the Dispatcher
mailbox-requirement = org.example.MyInterface

There are actually several inbuilt Dispatcher types that you may use when creating a custom Dispatcher.

Talking about Dispatcher types and how they all work is out of scope for this post, so if you want to know more about Akka Dispatchers you should consult the official Akka documentation

http://doc.akka.io/docs/akka/snapshot/scala/dispatchers.html

Ok, now that we have taken that slight detour and talked about how you can associate a mailbox type with a custom Dispatcher, let's get back to the main thrust of this post, which is to talk about mailboxes.

As previously stated, mailboxes represent a storage mechanism for an actor's messages.
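As a rough mental model of a per-actor mailbox and of the dispatcher throughput setting we saw in the configuration above, here is a plain-Scala sketch. The names are invented and this is not Akka's real scheduler; it just illustrates "a FIFO queue per actor, drained at most `throughput` messages at a time":

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Rough mental model only (invented names, not Akka's real scheduler):
// a mailbox is a FIFO queue of messages, and a dispatcher drains at most
// `throughput` messages from one mailbox before moving on to the next.
class MailboxSketch {
  private val queue = new ConcurrentLinkedQueue[String]()
  def enqueue(msg: String): Unit = queue.offer(msg)
  def dequeue(): Option[String] = Option(queue.poll())
}

object DispatcherSketch {
  // Process up to `throughput` messages from one mailbox; return what ran
  def drain(mailbox: MailboxSketch, throughput: Int): List[String] =
    (1 to throughput).toList.flatMap(_ => mailbox.dequeue())
}

val mb = new MailboxSketch
List("a", "b", "c").foreach(mb.enqueue)
```

With three messages queued and a throughput of 2, the first drain processes "a" and "b", and "c" waits for the dispatcher's next pass, which is why a low throughput is fairer across actors.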

Built In Mailbox Types

Akka comes shipped with a number of mailbox implementations:

UnboundedMailbox – The default mailbox

  • Backed by a java.util.concurrent.ConcurrentLinkedQueue
  • Blocking: No
  • Bounded: No
  • Configuration name: “unbounded” or “akka.dispatch.UnboundedMailbox”

SingleConsumerOnlyUnboundedMailbox

  • Backed by a very efficient Multiple Producer Single Consumer queue; cannot be used with a BalancingDispatcher
  • Blocking: No
  • Bounded: No
  • Configuration name: “akka.dispatch.SingleConsumerOnlyUnboundedMailbox”

BoundedMailbox

  • Backed by a java.util.concurrent.LinkedBlockingQueue
  • Blocking: Yes
  • Bounded: Yes
  • Configuration name: “bounded” or “akka.dispatch.BoundedMailbox”

UnboundedPriorityMailbox

  • Backed by a java.util.concurrent.PriorityBlockingQueue
  • Blocking: Yes
  • Bounded: No
  • Configuration name: “akka.dispatch.UnboundedPriorityMailbox”

BoundedPriorityMailbox

  • Backed by a java.util.PriorityBlockingQueue wrapped in an akka.util.BoundedBlockingQueue
  • Blocking: Yes
  • Bounded: Yes
  • Configuration name: “akka.dispatch.BoundedPriorityMailbox”
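To see the ordering behaviour the priority mailboxes above provide, here is a standalone sketch using java.util.concurrent.PriorityBlockingQueue directly (the same queue type listed as backing UnboundedPriorityMailbox). The message type and priority scheme are invented for illustration; a lower priority value is dequeued first:

```scala
import java.util.concurrent.PriorityBlockingQueue

// Illustration only: messages drained from a priority queue come out in
// priority order, not insertion order. All names here are invented.
final case class PrioritisedMsg(priority: Int, body: String)

object PriorityMailboxSketch {
  def drainInPriorityOrder(msgs: Seq[PrioritisedMsg]): List[String] = {
    // scala Ordering extends java.util.Comparator, so it can seed the queue
    val queue = new PriorityBlockingQueue[PrioritisedMsg](
      11, Ordering.by[PrioritisedMsg, Int](_.priority))
    msgs.foreach(queue.put)
    Iterator.continually(queue.poll()).takeWhile(_ != null).map(_.body).toList
  }
}
```

Enqueuing messages with priorities 2, 0, 1 drains them as 0, 1, 2: the queue reorders them regardless of arrival order, which is exactly what a priority mailbox does to an actor's pending messages.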

 

Default Mailbox

As shown above, the unbounded mailbox is the default. You can however swap out the default using the following configuration, though you will need to ensure that the chosen default mailbox is correct for the type of Dispatcher used. For example a SingleConsumerOnlyUnboundedMailbox cannot be used with a BalancingDispatcher.

Anyway this is how you would change the default mailbox in config

akka.actor.default-mailbox {
  mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}

 

Mailbox Type For An Actor

It is possible to associate a particular type of mailbox with a particular type of actor, which can be done by mixing in the RequiresMessageQueue trait

import akka.dispatch.RequiresMessageQueue
import akka.dispatch.BoundedMessageQueueSemantics
 
class SomeActor extends Actor
  with RequiresMessageQueue[BoundedMessageQueueSemantics]

Where you would use the following configuration to configure the mailbox

bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 1000
  mailbox-push-timeout-time = 10s
}
 
akka.actor.mailbox.requirements {
  "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}

It is worth noting that this setting could be overridden in code or by a dispatcher's mailbox configuration section

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples