Monthly Archives: September 2016

AKKA : TESTKIT

So the journey continues. We have covered a fair bit of ground, but there is still a long way to go, with many exciting features of Akka yet to cover.

But before we get on to some of the more advanced stuff, I thought it would be a good idea to take a small detour and look at how you can test Akka actor systems.

TestKit

Akka comes with a completely separate module for testing, which you MUST include. It can be obtained using the following SBT settings:

name := "Testing"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka"   %%    "akka-actor"    %   "2.4.8",
  "com.typesafe.akka"   %%    "akka-testkit"  %   "2.4.8"   %   "test",
  "org.scalatest"       %%    "scalatest"     %   "2.2.5"   %   "test"
)

I have chosen to use ScalaTest, but you could use other popular testing frameworks such as specs2.

 

 

Testing Actors

The following sub sections will outline how Akka allows the testing of actors

 

The built in ‘testActor’

The official Akka TestKit docs do a great job of explaining why dedicated test actors are needed, and where the TestActorRef comes from:

Testing the business logic inside Actor classes can be divided into two parts: first, each atomic operation must work in isolation, then sequences of incoming events must be processed correctly, even in the presence of some possible variability in the ordering of events. The former is the primary use case for single-threaded unit testing, while the latter can only be verified in integration tests.

Normally, the ActorRef shields the underlying Actor instance from the outside, the only communications channel is the actor’s mailbox. This restriction is an impediment to unit testing, which led to the inception of the TestActorRef.

http://doc.akka.io/docs/akka/snapshot/scala/testing.html#Using_Multiple_Probe_Actors

TestKit's built-in testActor is the actor that receives the replies from the actor under test; it is what assertions such as expectMsg inspect.

TestActorRef is a separate tool for testing the individual operations of an actor. You instantiate the TestActorRef, passing it the real actor you would like to test. It then allows you to send messages to the contained real actor, and also gives you direct access to that actor instance.

CallingThreadDispatcher/TestActorRef

Akka is an asynchronous beast by nature, and uses the concept of Dispatchers to conduct the dispatching of messages. We have also seen that the message loop (receive) can be replaced with become/unbecome. All of this contributes to the overall behaviour of an actor being quite hard to test.

To help with this, the Akka TestKit comes with a special actor reference called TestActorRef.

It should come as no surprise that this TestActorRef also makes use of a Dispatcher. What makes it better suited to testing is that it uses a specialized testing Dispatcher, which makes asynchronous code easier to test.

The specialized dispatcher is called CallingThreadDispatcher. As the name suggests, it uses the current thread to deal with the message dispatching, so in a typical test a message is processed before the send returns.

This makes things easier, of that there is no doubt. As stated, you don't really need to do anything other than use the Akka TestKit TestActorRef.
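To make the synchronous behaviour a bit more concrete, here is a minimal sketch. The CounterActor and SyncDispatchDemo names are made up purely for illustration (they are not part of the series code), and the file would need to live under src/test since akka-testkit is a test-scoped dependency in the build above. The only TestKit pieces used are TestActorRef and its underlyingActor accessor.

```scala
import akka.actor.{Actor, ActorSystem}
import akka.testkit.{TestActorRef, TestKit}

// Hypothetical actor used only for this illustration
class CounterActor extends Actor {
  var count = 0
  def receive = {
    case "inc" => count += 1
  }
}

object SyncDispatchDemo {
  // Sends n "inc" messages and returns the count read straight afterwards
  def run(n: Int): Int = {
    implicit val system = ActorSystem("SyncDispatchDemo")
    try {
      // TestActorRef runs the actor on the CallingThreadDispatcher
      val ref = TestActorRef(new CounterActor)
      (1 to n).foreach(_ => ref ! "inc")
      // no sleeping or awaiting: the messages were handled on this thread
      ref.underlyingActor.count
    } finally {
      TestKit.shutdownActorSystem(system)
    }
  }

  def main(args: Array[String]): Unit =
    println(run(3))
}
```

Had the actor been created with a plain system.actorOf, reading its state like this would not even be possible, and any reply would have to be awaited.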

 

Anatomy Of An Actor Testkit Test

This is what a basic skeleton looks like when using the Akka TestKit (please note this is for ScalaTest)

import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.pattern.ask

import scala.util.Success


class HelloActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }



}


There are a couple of things to note there, so let's go through them:

  • Extending the Akka TestKit class gives us all the good pre-canned assertions that allow us to assert facts about our actors.
  • Mixing in the Akka ImplicitSender trait gives us an actual sender: the test suite's testActor is implicitly used as the sender of the messages we send.

 

The Built In Assertions

The Akka TestKit comes with many useful assertions which you can see here

expectMsg[T](d: Duration, msg: T): T
expectMsgPF[T](d: Duration)(pf: PartialFunction[Any, T]): T
expectMsgClass[T](d: Duration, c: Class[T]): T
expectMsgType[T: Manifest](d: Duration)
expectMsgAnyOf[T](d: Duration, obj: T*): T
expectMsgAnyClassOf[T](d: Duration, obj: Class[_ <: T]*): T
expectMsgAllOf[T](d: Duration, obj: T*): Seq[T]
expectMsgAllClassOf[T](d: Duration, c: Class[_ <: T]*): Seq[T]
expectMsgAllConformingOf[T](d: Duration, c: Class[_ <: T]*): Seq[T]
expectNoMsg(d: Duration)
receiveN(n: Int, d: Duration): Seq[AnyRef]
fishForMessage(max: Duration, hint: String)
	(pf: PartialFunction[Any, Boolean]): Any
receiveOne(d: Duration): AnyRef
receiveWhile[T](max: Duration, idle: Duration, messages: Int)
	(pf: PartialFunction[Any, T]): Seq[T]
awaitCond(p: => Boolean, max: Duration, interval: Duration)
awaitAssert(a: => Any, max: Duration, interval: Duration)
ignoreMsg(pf: PartialFunction[AnyRef, Boolean])
within[T](min: FiniteDuration, max: FiniteDuration)
	(f: ⇒ T): T

You can read more about these at the official documentation: http://doc.akka.io/docs/akka/snapshot/scala/testing.html#Built-In_Assertions
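To give a feel for how a few of these read in practice, here is a small sketch that exercises expectMsg, expectMsgType, expectMsgPF, expectMsgAllOf, within and expectNoMsg against a trivial echo actor. The EchoActor and the spec name are my own inventions for this illustration, and the timings are arbitrary.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scala.concurrent.duration._

// Hypothetical actor for this illustration: replies with whatever it receives
class EchoActor extends Actor {
  def receive = {
    case msg => sender() ! msg
  }
}

class BuiltInAssertionExamples
  extends TestKit(ActorSystem("BuiltInAssertionExamples"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "An EchoActor" must {
    "satisfy a handful of the built-in assertions" in {
      val echo = system.actorOf(Props[EchoActor], name = "echo")

      // everything inside this block must complete within 1 second
      within(1.second) {
        echo ! "ping"
        expectMsg("ping")                       // exact match

        echo ! "pong"
        expectMsgType[String] should be("pong") // match on type, then inspect

        echo ! 21
        // match with a partial function and transform the result
        expectMsgPF() { case n: Int => n * 2 } should be(42)

        echo ! "a"
        echo ! "b"
        expectMsgAllOf("a", "b")                // both must arrive, in any order
      }

      // nothing else should turn up afterwards
      expectNoMsg(200.millis)
    }
  }
}
```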

 

Demo : HelloActor That We Will Test

Let's assume we have this actor, which we would like to test in the next 3 sections

import akka.actor.Actor


class HelloActor extends Actor {
  def receive = {
    case "hello" => sender ! "hello world"
    case _       => throw new IllegalArgumentException("bad juju")
  }
}

Sending Messages

So we have the HelloActor above, and we would like to send a message to it, and assert that the sender (the test suite testActor) gets a message back.

Here is how we might do that.

import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.pattern.ask

import scala.util.Success


class HelloActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "A HelloActor using implicit sender " must {
    "send back 'hello world'" in {
      val helloActor = system.actorOf(Props[HelloActor], name = "helloActor")
      helloActor ! "hello"
      expectMsg("hello world")
    }
  }
}


 

This works thanks to the fact that we mixed in the ImplicitSender trait (so the reply comes back to the testActor), and used the available Akka TestKit assertions.

 

Expecting A Response

Another quite common requirement is to check the response from an actor that we have queried using the Akka ask pattern (which returns a Future[Any] to represent the eventual result).

Here is how we might write a test for this

import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.pattern.ask

import scala.util.Success


class HelloActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "A HelloActor using TestActorRef " must {
    "send back 'hello world' when asked" in {
      implicit val timeout = Timeout(5 seconds)
      val helloActorRef = TestActorRef(new HelloActor)
      val future = helloActorRef ? "hello"
      val Success(result: String) = future.value.get
      result should be("hello world")
    }
  }
}

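It can be seen that we make use of the TestActorRef (the one we discussed above, which uses the CallingThreadDispatcher) to wrap (for want of a better word) the actual actor we wish to test. Because the message is processed synchronously, the Future is already completed by the time the ask returns, which is why calling future.value.get is safe here.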
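The future.value.get trick relies on the TestActorRef completing the Future synchronously. If the actor is created with a plain system.actorOf instead, the Future will usually not be complete yet, so one option is to block on it with Await.result. Here is a rough sketch of that variation; the AskWithAwaitDemo object is hypothetical, and the HelloActor is repeated only so that the sketch compiles on its own.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Same HelloActor as above, repeated so the sketch is self-contained
class HelloActor extends Actor {
  def receive = {
    case "hello" => sender() ! "hello world"
    case _       => throw new IllegalArgumentException("bad juju")
  }
}

object AskWithAwaitDemo {
  def run(): String = {
    val system = ActorSystem("AskWithAwaitDemo")
    try {
      implicit val timeout: Timeout = Timeout(5.seconds)
      // a regular (asynchronous) actor this time, not a TestActorRef
      val helloActor = system.actorOf(Props[HelloActor], name = "helloActor")
      val future = (helloActor ? "hello").mapTo[String]
      // block the calling thread until the reply arrives, or the timeout expires
      Await.result(future, timeout.duration)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Blocking like this is fine in a test, but it is generally something to avoid inside actors themselves.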

 

Expecting Exceptions

Another completely plausible thing to want to do is test for exceptions that may be thrown. Looking at the HelloActor we are trying to test, we can see that it will throw an IllegalArgumentException should it see a message it doesn't handle.

So how do we test that?

We use ScalaTest's built-in intercept function to catch the exception, calling receive directly on the TestActorRef so that the exception is thrown on the test's own thread

import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.pattern.ask

import scala.util.Success


class HelloActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "A HelloActor using TestActorRef " must {
    "throw IllegalArgumentException when sent an unhandled message" in {
      val actorRef = TestActorRef(new HelloActor)
      intercept[IllegalArgumentException] { actorRef.receive("should blow up") }
    }
  }
}
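Calling receive directly bypasses the mailbox and supervision altogether. If you would rather send the message the normal way, the exception ends up being logged by the actor's supervisor instead of being thrown at the test, and the TestKit's EventFilter can be used to assert on that log event. The sketch below assumes the TestEventListener logger is configured (done inline here via ConfigFactory); the EventFilterDemo object is a made-up name, and the HelloActor is repeated so the example stands alone.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{EventFilter, TestKit}
import com.typesafe.config.ConfigFactory

// Same HelloActor as above, repeated so the sketch is self-contained
class HelloActor extends Actor {
  def receive = {
    case "hello" => sender() ! "hello world"
    case _       => throw new IllegalArgumentException("bad juju")
  }
}

object EventFilterDemo {
  def run(): Unit = {
    // EventFilter only works when the TestEventListener is installed as a logger
    val config = ConfigFactory.parseString(
      """akka.loggers = ["akka.testkit.TestEventListener"]""")
    implicit val system = ActorSystem("EventFilterDemo", config)
    try {
      val helloActor = system.actorOf(Props[HelloActor], name = "helloActor")
      // fails with an AssertionError unless exactly one matching error is logged
      EventFilter[IllegalArgumentException](occurrences = 1) intercept {
        helloActor ! "should blow up"
      }
    } finally {
      TestKit.shutdownActorSystem(system)
    }
  }

  def main(args: Array[String]): Unit = run()
}
```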


 

 

Testing Finite State Machines

Last time we looked at implementing finite state machines in Akka. One of the methods we used for that was Akka FSM. Let's examine how we can test those using the TestKit.

Let's assume we have this simple Akka FSM example (based on the LightSwitch demo from the last article)

import akka.actor.{Actor, ActorRef, FSM}
import scala.concurrent.duration._
import scala.collection._

// received events (case objects, since they carry no data and are
// sent and matched as singletons, e.g. fsm ! PowerOn)
case object PowerOn
case object PowerOff

// states
sealed trait LightSwitchState
case object On extends LightSwitchState
case object Off extends LightSwitchState

//data
sealed trait LightSwitchData
case object NoData extends LightSwitchData

class LightSwitchActor extends FSM[LightSwitchState, LightSwitchData] {

  startWith(Off, NoData)

  when(Off) {
    case Event(PowerOn, _) =>
      goto(On) using NoData
  }

  when(On, stateTimeout = 1 second) {
    case Event(PowerOff, _) =>
      goto(Off) using NoData
    case Event(StateTimeout, _) =>
      println("'On' state timed out, moving to 'Off'")
      goto(Off) using NoData
  }

  whenUnhandled {
    case Event(e, s) =>
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      goto(Off) using NoData
  }

  onTransition {
    case Off -> On => println("Moved from Off to On")
    case On -> Off => println("Moved from On to Off")
  }

  initialize()
}

This example is fairly good as it only has 2 states, On and Off, so it makes for a good, simple example to showcase the testing.

Another Special Test Actor

To test FSMs there is yet another specialized actor reference, which may only be used for testing FSMs. It is called TestFSMRef. Just like the TestActorRef, you pass the TestFSMRef the actual FSM actor you are trying to test.

Here is an example of that

 val fsm = TestFSMRef(new LightSwitchActor())

The TestFSMRef comes with a whole host of useful methods and properties that can be used when testing FSMs. We will see some of them used below.
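As a quick taster, here is a sketch that touches stateName, stateData, setState and the named timer helpers (setTimer, isTimerActive, cancelTimer) in one go. The DoorActor FSM and all of its states and messages are hypothetical and exist only for this illustration.

```scala
import akka.actor.{ActorSystem, FSM}
import akka.testkit.{TestFSMRef, TestKit}
import scala.concurrent.duration._

// Hypothetical FSM used only for this illustration
sealed trait DoorState
case object Open extends DoorState
case object Closed extends DoorState

case object Toggle

// The state data is simply a count of how many times the door has moved
class DoorActor extends FSM[DoorState, Int] {
  startWith(Closed, 0)

  when(Closed) {
    case Event(Toggle, moves) => goto(Open) using (moves + 1)
  }

  when(Open) {
    case Event(Toggle, moves) => goto(Closed) using (moves + 1)
  }

  initialize()
}

object TestFSMRefDemo {
  def run(): (DoorState, Int) = {
    implicit val system = ActorSystem("TestFSMRefDemo")
    try {
      val fsm = TestFSMRef(new DoorActor)

      // stateName / stateData expose the current state directly
      assert(fsm.stateName == Closed && fsm.stateData == 0)

      // messages are processed synchronously, so the new state is visible at once
      fsm ! Toggle
      assert(fsm.stateName == Open && fsm.stateData == 1)

      // setState can force both the state and its data from the test
      fsm.setState(stateName = Closed, stateData = 10)

      // named timers can be set, queried and cancelled from the test
      fsm.setTimer("reminder", Toggle, 1.hour)
      assert(fsm.isTimerActive("reminder"))
      fsm.cancelTimer("reminder")
      assert(!fsm.isTimerActive("reminder"))

      (fsm.stateName, fsm.stateData)
    } finally {
      TestKit.shutdownActorSystem(system)
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```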

Testing Initial State

As we saw last time, Akka FSM has an initialize() method that performs the transition into the initial state. So we should be able to test that.

Here is how we can do that

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.TestFSMRef
import akka.actor.FSM

import scala.concurrent.duration._
import scala.util.Success


class LightSwitchFSMActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "A LightSwitchActor " must {
    "start in the 'Off' state" in {
      val fsm = TestFSMRef(new LightSwitchActor())
      assert(fsm.stateName == Off)
      assert(fsm.stateData == NoData)
    }
  }
}


Testing State Move

The TestFSMRef comes with the ability to move the underlying FSM actor into a new state using the setState method. Here is an example of how to use that:

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.TestFSMRef
import akka.actor.FSM

import scala.concurrent.duration._
import scala.util.Success


class LightSwitchFSMActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "A LightSwitchActor that starts with 'Off' " must {
    "transition to 'On' when told to by the test" in {
      val fsm = TestFSMRef(new LightSwitchActor())
      fsm.setState(stateName = On)
      assert(fsm.stateName == On)
      assert(fsm.stateData == NoData)
    }
  }
}


You can of course still send messages to the TestFSMRef, which instruct the underlying FSM actor to move to a new state.

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.TestFSMRef
import akka.actor.FSM

import scala.concurrent.duration._
import scala.util.Success


class LightSwitchFSMActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "A LightSwitchActor that starts with 'Off' " must {
    "transition to 'On' when sent a 'PowerOn' message" in {
      val fsm = TestFSMRef(new LightSwitchActor())
      fsm ! PowerOn
      assert(fsm.stateName == On)
      assert(fsm.stateData == NoData)
    }
  }
}


 
Testing StateTimeout

Another thing that AkkaFSM supports is the notion of a StateTimeout. In the example FSM we are trying to test, if the FSM stays in the On state for more than 1 second it should automatically move to the Off state.

So how do we test that?

Here is how:

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest._
import akka.testkit.TestFSMRef
import akka.actor.FSM

import scala.concurrent.duration._
import scala.util.Success


class LightSwitchFSMActorTests
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "A LightSwitchActor that stays 'On' for more than 1 second " must {
    "transition to 'Off' thanks to the StateTimeout" in {
      val fsm = TestFSMRef(new LightSwitchActor())
      fsm ! PowerOn
      awaitCond(fsm.stateName == Off, 1200 milliseconds, 100 milliseconds)
    }
  }
}


 

Testing Using Probes

So far we have been looking at testing a single actor that might reply to a single sender. Sometimes though we may need to test an entire suite of actors all working together. Because every reply ends up at the single testActor (and the TestActorRef runs on the single-threaded CallingThreadDispatcher), we may find it difficult to tell the different incoming message streams apart.

Akka TestKit provides yet another abstraction to deal with this, which is the idea of a concrete actor that you inject into the message flow. This concept is called a TestProbe.

Let's assume we have this actor, which sends each message it receives on to 2 ActorRefs.

import akka.actor.{ActorRef, Actor}

class DoubleSenderActor extends Actor {
  var dest1: ActorRef = _
  var dest2: ActorRef = _
  def receive = {
    case (d1: ActorRef, d2: ActorRef) =>
      dest1 = d1
      dest2 = d2
    case x =>
      dest1 ! x
      dest2 ! x
  }
}

We could test this code as follows using the TestProbe.

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.testkit.{TestProbe, ImplicitSender, TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest._

import scala.concurrent.duration._
import scala.util.Success


class DoubleSenderActorTestsUsingProbe
  extends TestKit(ActorSystem("MySpec"))
  with ImplicitSender
  with WordSpecLike
  with BeforeAndAfterAll
  with Matchers {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "A DoubleSenderActor that has 2 target ActorRefs for sending messages to " must {
    "send messages to both supplied 'TestProbe(s)'" in {
      val doubleSenderActor = system.actorOf(Props[DoubleSenderActor],
        name = "multiSenderActor")
      val probe1 = TestProbe()
      val probe2 = TestProbe()
      doubleSenderActor ! ((probe1.ref, probe2.ref))
      doubleSenderActor ! "hello"
      probe1.expectMsg(500 millis, "hello")
      probe2.expectMsg(500 millis, "hello")
    }
  }
}


It can be seen that the TestProbe comes with its own set of useful assertion methods. This is because TestProbe extends TestKit, and as such you can expect to find ALL of the TestKit assertions available when using TestProbe objects.
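Probes can also take an active part in the conversation: send lets a probe act as the sender of a message, lastSender tells you who sent the last message the probe received, and reply answers that sender. Here is a small sketch of this, where one probe plays the client and another stands in for a backend. The RelayActor and the message strings are hypothetical and exist only for this illustration.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.testkit.{TestKit, TestProbe}
import scala.concurrent.duration._

// Hypothetical actor: forwards a lookup request to a backend,
// keeping the original sender so the backend can reply to it directly
class RelayActor(backend: ActorRef) extends Actor {
  def receive = {
    case name: String => backend forward s"lookup:$name"
  }
}

object ProbeConversationDemo {
  def run(): String = {
    implicit val system = ActorSystem("ProbeConversationDemo")
    try {
      val backend = TestProbe() // stands in for the real backend actor
      val client = TestProbe()  // stands in for whoever is asking
      val relay = system.actorOf(Props(new RelayActor(backend.ref)), name = "relay")

      // the client probe is the sender of this message
      client.send(relay, "sacha")

      // the backend probe sees the forwarded request, still from the client
      backend.expectMsg(1.second, "lookup:sacha")
      assert(backend.lastSender == client.ref)

      // reply goes to the sender of the last message the probe received
      backend.reply("hello sacha")
      client.expectMsg(1.second, "hello sacha")
    } finally {
      TestKit.shutdownActorSystem(system)
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```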

 

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples


akka : state machines

In this post we will look at 2 ways you can write state machines with Akka. We will firstly examine the more primitive (but easily understandable) approach, and then look into the more sophisticated approach offered by AkkaFSM.

What Is A State Machine?

For those of you out there who do not know what a state machine is, this is what Wikipedia says about them:

A finite-state machine (FSM) or finite-state automaton (FSA, plural: automata), or simply a state machine, is a mathematical model of computation used to design both computer programs and sequential logic circuits. It is conceived as an abstract machine that can be in one of a finite number of states. The machine is in only one state at a time; the state it is in at any given time is called the current state. It can change from one state to another when initiated by a triggering event or condition; this is called a transition. A particular FSM is defined by a list of its states, and the triggering condition for each transition.

https://en.wikipedia.org/wiki/Finite-state_machine

This could be an example state machine for a coin operated barrier

Akka supports swapping the standard message loop using become, which is available via the context. This is the standard signature of receive (the message loop):

PartialFunction[Any, Unit]

The newly applied message loops are maintained in a stack, and may be pushed and popped.

Become

There are different ways of swapping out the message loop. Here is one such example

import akka.actor.Actor

class HotColdStateActor extends Actor {

  //need this for become/unbecome
  import context._

  def cold: Receive = {
    case "snow" => println("I am already cold!")
    case "sun" => becomeHot
  }

  def hot: Receive = {
    case "sun" => println("I am already hot!")
    case "snow" => becomeCold
  }

  def receive = {
    case "snow" => becomeCold
    case "sun" => becomeHot
  }


  private def becomeCold: Unit = {
    println("becoming cold")
    become(cold)
  }

  private def becomeHot: Unit = {
    println("becoming hot")
    become(hot)
  }
}

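With this example we simply use become to swap in a new message loop. By default become replaces the current message loop rather than stacking on top of it, so whichever behaviour was passed to the most recent become is the current message loop.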

If we use the following demo code against this actor code

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunHotColdStateDemo

  def RunHotColdStateDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val hotColdStateActor =
      system.actorOf(Props(classOf[HotColdStateActor]),
        "demo-HotColdStateActor")


    println("sending sun")
    hotColdStateActor ! "sun"
    //actors are async, so give it chance to get message
    //obviously we would not do this in prod code, its just
    //for the demo, to get the correct ordering for the print
    //statements
    Thread.sleep(1000)

    println("sending sun")
    hotColdStateActor ! "sun"
    Thread.sleep(1000)

    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)

    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)

    println("sending sun")
    hotColdStateActor ! "sun"
    Thread.sleep(1000)

    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)

    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }

}

We would see output like this

[Image: console output of the hot/cold demo]

 

UnBecome

The other way to swap out the message loop relies on having matching pairs of become/unbecome. Here the standard message loop is not replaced as such; instead, the actor uses the top value on a stack of behaviours as the message loop.

Care must be taken to ensure that the number of push (become) and pop (unbecome) operations match, otherwise memory leaks may occur. This is why it is not the default behavior.

Here is an example actor that uses the become/unbecome matched operations.

import akka.actor.Actor

class BecomeUnbecomeStateActor extends Actor {

  //need this for become/unbecome
  import context._

  def receive = {
    case "snow" =>
      println("saw snow, becoming")
      become({
        case "sun" =>
          println("saw sun, unbecoming")
          unbecome() // resets the latest 'become' (just for fun)
        case _ => println("Unknown message, state only likes sun")
      }, discardOld = false) // push on top instead of replace

    case _ => println("Unknown message, state only likes snow")
  }



}

I personally think this is harder to read, and manage.

Here is some demo code to exercise this actor:

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunBecomeUnbecomeStateDemo


  def RunBecomeUnbecomeStateDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val becomeUnbecomeStateActor =
      system.actorOf(Props(classOf[BecomeUnbecomeStateActor]),
        "demo-BecomeUnbecomeStateActor")

    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    //actors are async, so give it chance to get message
    Thread.sleep(1000)

    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    Thread.sleep(1000)

    println("Sending sun")
    becomeUnbecomeStateActor ! "sun"
    Thread.sleep(1000)

    println("Sending sun")
    becomeUnbecomeStateActor ! "sun"
    Thread.sleep(1000)

    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    Thread.sleep(1000)

    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }

}

Which when run should yield the following results

[Image: console output of the become/unbecome demo]

 

AkkaFSM

While become/unbecome will get the job done, Akka comes with a much better alternative called Akka FSM.

Using Akka FSM you can not only handle states but also have state data, and run code on the movement from one state to the next. As most people will know, these movements are known as “transitions”.

When using Akka FSM you may see a state machine expressed using this sort of notation

State(S) x Event(E) -> Actions (A), State(S’)

If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S’.

To use Akka FSM there are a number of things you can do. Some of them are mandatory and some you can opt into depending on your requirements. Let's have a look at some of the moving parts that make Akka FSM shine.

FSM[S,D]

In order to use Akka FSM you need to mix in the FSM trait. The trait itself looks like this

trait FSM[S, D]

 

Where S is the state type, and D is the data type.

startWith

You can choose what state the FSM starts in by using the startWith method which has the following signature.

startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit

initialize

Calling initialize() performs the transition into the initial state and sets up timers (if required).

when

when is used to match the state, and is also used to control the movement to a new state using the inbuilt goto method, or possibly to stay in the current state.

when uses pattern matching to match the events that a particular state can handle. As stated, it is completely valid to stay in the current state or move to a new state.

The examples that follow below will show you both stay and goto in action.

whenUnhandled

You should obviously try and make sure you cover all the correct state movements in response to all the events your FSM knows about. But Akka FSM also comes with the whenUnhandled method for catching events that were not handled by YOUR state handling (when) logic.

onTransition

You may also monitor the movement from one state to the next and run some code when this occurs. This is accomplished using the onTransition method.

onTransition has the following signature

onTransition(transitionHandler: TransitionHandler): Unit

Where TransitionHandler is really just a PartialFunction that has the following generic parameters

PartialFunction[(S, S), Unit]

Where the tuple is a tuple of “from state” to “to state”.

 

Time for an example

 

Lightswitch Example

This first example is a very simple FSM that has 2 states, On and Off. It doesn’t really need any state data; however, the Akka FSM trait always needs a data type. So in this case we simply use a base trait for the data, which we don’t really care about.

The idea behind this example is that the lightswitch can move from Off -> On, and On -> Off.

This example also shows a stateTimeout in action, whereby the FSM will move from On back to Off if left in the On state for more than 1 second.

Here is the full code for this example

import akka.actor.{Actor, ActorRef, FSM}
import scala.concurrent.duration._
import scala.collection._

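// received events (case objects, since they carry no data and are
// sent and matched as singletons, e.g. lightSwitchActor ! PowerOn)
case object PowerOn
case object PowerOff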

// states
sealed trait LightSwitchState
case object On extends LightSwitchState
case object Off extends LightSwitchState

//data
sealed trait LightSwitchData
case object NoData extends LightSwitchData

class LightSwitchActor extends FSM[LightSwitchState, LightSwitchData] {

  startWith(Off, NoData)

  when(Off) {
    case Event(PowerOn, _) =>
      goto(On) using NoData
  }

  when(On, stateTimeout = 1 second) {
    case Event(PowerOff, _) =>
      goto(Off) using NoData
    case Event(StateTimeout, _) =>
      println("'On' state timed out, moving to 'Off'")
      goto(Off) using NoData
  }

  whenUnhandled {
    case Event(e, s) =>
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      goto(Off) using NoData
  }

  onTransition {
    case Off -> On => println("Moved from Off to On")
    case On -> Off => println("Moved from On to Off")
  }

  initialize()
}

Which we can run using this demo code.

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunLightSwitchDemo

  def RunLightSwitchDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val lightSwitchActor =
      system.actorOf(Props(classOf[LightSwitchActor]),
        "demo-LightSwitch")


    println("sending PowerOff, should be off already")
    lightSwitchActor ! PowerOff
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)


    println("sending PowerOn")
    lightSwitchActor ! PowerOn
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)

    println("sending PowerOff")
    lightSwitchActor ! PowerOff
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)


    println("sending PowerOn")
    lightSwitchActor ! PowerOn
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)

    println("sleep for a while to allow 'On' state to timeout")
    Thread.sleep(2000)

    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }



}

When run we get the following results

sending PowerOff, should be off already
[WARN] [09/06/2016 07:21:28.864] [StateMachineSystem-akka.actor.default-dispatcher-4]
  [akka://StateMachineSystem/user/demo-LightSwitch] received unhandled request PowerOff in state Off/NoData
sending PowerOn
Moved from Off to On
sending PowerOff
Moved from On to Off
sending PowerOn
Moved from Off to On
sleep for a while to allow 'On' state to timeout
'On' state timed out, moving to 'Off'
Moved from On to Off

There is something interesting here, which is that we see an Unhandled event. Why is this?

Well this is due to the fact that this demo FSM starts in the Off state, and we send a PowerOff event. This is not handled in the when for the Off state.

We could fix this, by amending this as follows:

when(Off) {
  case Event(PowerOn, _) =>
    goto(On) using NoData
  case Event(PowerOff, _) =>
    println("already off")
    stay
}

So if we apply this amended code and run the demo again, we would now see this output instead

sending PowerOff, should be off already
already off
sending PowerOn
Moved from Off to On
sending PowerOff
Moved from On to Off
sending PowerOn
Moved from Off to On
sleep for a while to allow 'On' state to timeout
'On' state timed out, moving to 'Off'
Moved from On to Off
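Rather than eyeballing console output, the amended behaviour could also be checked with the TestFSMRef from the Akka TestKit module (akka-testkit, which would need adding as a test dependency). The sketch below is only an illustration: it uses a trimmed-down copy of the LightSwitchActor (no printlns, no whenUnhandled, events modelled as case objects) so that it compiles on its own, and the AmendedLightSwitchCheck object is a made-up name.

```scala
import akka.actor.{ActorSystem, FSM}
import akka.testkit.{TestFSMRef, TestKit}
import scala.concurrent.duration._

// received events
case object PowerOn
case object PowerOff

// states
sealed trait LightSwitchState
case object On extends LightSwitchState
case object Off extends LightSwitchState

// data
sealed trait LightSwitchData
case object NoData extends LightSwitchData

// Trimmed-down copy of the amended FSM, for illustration only
class LightSwitchActor extends FSM[LightSwitchState, LightSwitchData] {
  startWith(Off, NoData)

  when(Off) {
    case Event(PowerOn, _)  => goto(On) using NoData
    case Event(PowerOff, _) => stay() // the amendment: already off, so stay put
  }

  when(On, stateTimeout = 1.second) {
    case Event(PowerOff, _)     => goto(Off) using NoData
    case Event(StateTimeout, _) => goto(Off) using NoData
  }

  initialize()
}

object AmendedLightSwitchCheck {
  // Sends PowerOff, PowerOn, PowerOff and records the state after each one
  def run(): Seq[LightSwitchState] = {
    implicit val system = ActorSystem("AmendedLightSwitchCheck")
    try {
      val fsm = TestFSMRef(new LightSwitchActor)
      Seq(PowerOff, PowerOn, PowerOff).map { event =>
        fsm ! event   // processed synchronously by the TestFSMRef
        fsm.stateName // so the resulting state can be read immediately
      }
    } finally {
      TestKit.shutdownActorSystem(system)
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```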

 

Buncher Example

I have basically taken this one straight from the official Akka FSM docs.

This example shall receive and queue messages while they arrive in a burst and send them on to another target actor after the burst ended or a flush request is received.

Here is the full code for this example. It is a slightly fuller example, so this time we use a proper set of data objects for the states.

import akka.actor.{Actor, ActorRef, FSM}
import scala.concurrent.duration._
import scala.collection._

// received events
final case class SetTarget(ref: ActorRef)
final case class Queue(obj: Any)
case object Flush

// sent events
final case class Batch(obj: immutable.Seq[Any])

// states
sealed trait State
case object Idle extends State
case object Active extends State

//data
sealed trait BuncherData
case object Uninitialized extends BuncherData
final case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends BuncherData

class BuncherActor extends FSM[State, BuncherData] {

  startWith(Idle, Uninitialized)

  when(Idle) {
    case Event(SetTarget(ref), Uninitialized) =>
      stay using Todo(ref, Vector.empty)
  }

  when(Active, stateTimeout = 1 second) {
    case Event(Flush | StateTimeout, t: Todo) =>
      goto(Idle) using t.copy(queue = Vector.empty)
  }

  whenUnhandled {
    // common code for both states
    case Event(Queue(obj), t @ Todo(_, v)) =>
      goto(Active) using t.copy(queue = v :+ obj)

    case Event(e, s) =>
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      stay
  }

  onTransition {
    case Active -> Idle =>
      stateData match {
        case Todo(ref, queue) => ref ! Batch(queue)
        case _                => // nothing to do
      }
  }

  initialize()
}


class BunchReceivingActor extends Actor {
  def receive = {
    case Batch(theBatchData) => {
      println(s"receiving the batch data $theBatchData")
    }
    case _ => println("unknown message")
  }
}

Which we can run using this demo code

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunBuncherDemo

  def RunBuncherDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val buncherActor =
      system.actorOf(Props(classOf[BuncherActor]),
        "demo-Buncher")

    val bunchReceivingActor =
      system.actorOf(Props(classOf[BunchReceivingActor]),
        "demo-BunchReceiving")

    buncherActor ! SetTarget(bunchReceivingActor)

    println("sending Queue(42)")
    buncherActor ! Queue(42)
    println("sending Queue(43)")
    buncherActor ! Queue(43)
    println("sending Queue(44)")
    buncherActor ! Queue(44)
    println("sending Flush")
    buncherActor ! Flush
    println("sending Queue(45)")
    buncherActor ! Queue(45)


    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }
}

When this demo runs you should see something like this

sending Queue(42)
sending Queue(43)
sending Queue(44)
sending Flush
sending Queue(45)
receiving the batch data Vector(42, 43, 44)
receiving the batch data Vector(45)

 

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples