Akka: State Machines

In this post we will look at 2 ways you can write state machines with Akka. We will first examine the more primitive (but easily understandable) approach, and then look into the more sophisticated approach offered by Akka FSM.

What Is A State Machine?

For those of you who do not know what a state machine is, this is what Wikipedia says about them:

A finite-state machine (FSM) or finite-state automaton (FSA, plural: automata), or simply a state machine, is a mathematical model of computation used to design both computer programs and sequential logic circuits. It is conceived as an abstract machine that can be in one of a finite number of states. The machine is in only one state at a time; the state it is in at any given time is called the current state. It can change from one state to another when initiated by a triggering event or condition; this is called a transition. A particular FSM is defined by a list of its states, and the triggering condition for each transition.

https://en.wikipedia.org/wiki/Finite-state_machine

This could be an example state machine for a coin operated barrier
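A barrier like this can also be sketched in a few lines of plain Scala, with no Akka involved. This is only an illustration: the state names (Locked, Unlocked), the event names (Coin, Push), and the helper names are assumptions chosen for the sketch.

```scala
// A minimal, Akka-free sketch of a coin-operated barrier.
// All state, event, and method names here are illustrative assumptions.
object BarrierSketch {
  sealed trait BarrierState
  case object Locked extends BarrierState
  case object Unlocked extends BarrierState

  sealed trait BarrierEvent
  case object Coin extends BarrierEvent
  case object Push extends BarrierEvent

  // The transition table: (current state, event) => next state.
  def next(state: BarrierState, event: BarrierEvent): BarrierState =
    (state, event) match {
      case (Locked, Coin)   => Unlocked // paying unlocks the barrier
      case (Locked, Push)   => Locked   // pushing a locked barrier does nothing
      case (Unlocked, Coin) => Unlocked // an extra coin changes nothing
      case (Unlocked, Push) => Locked   // passing through locks it again
    }

  // Feed a sequence of events through the machine, starting from Locked.
  def run(events: Seq[BarrierEvent]): BarrierState =
    events.foldLeft(Locked: BarrierState)(next)
}
```

The machine is always in exactly one state, and the pattern match is the complete list of transitions, which is all the Wikipedia definition asks for.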

Akka supports swapping the standard message loop using become, which is available via the actor's context. This is the standard signature of receive (the message loop):

PartialFunction[Any, Unit]

Newly applied message loops are maintained in a stack, and may be pushed and popped.
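Because a message loop is just a partial function, it only handles the messages its case clauses cover. Here is a small Akka-free sketch of that idea. The Receive alias is redefined locally to mirror the signature above, and the cold behavior and the log buffer are hypothetical names used only for illustration.

```scala
import scala.collection.mutable.ListBuffer

object ReceiveSketch {
  // Local stand-in with the same shape as the receive signature.
  type Receive = PartialFunction[Any, Unit]

  // Collects output so the behavior can be inspected afterwards.
  val log = ListBuffer.empty[String]

  // Hypothetical behavior that only understands two messages.
  val cold: Receive = {
    case "snow" => log += "I am already cold!"
    case "sun"  => log += "becoming hot"
  }
}
```

Calling ReceiveSketch.cold.isDefinedAt("sun") returns true, while ReceiveSketch.cold.isDefinedAt(42) returns false. In a real actor, a message the current loop is not defined for is passed to the actor's unhandled method rather than throwing.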

Become

There are different ways of swapping out the message loop. Here is one such example

import akka.actor.Actor

class HotColdStateActor extends Actor {

  //need this for become/unbecome
  import context._

  def cold: Receive = {
    case "snow" => println("I am already cold!")
    case "sun" => becomeHot
  }

  def hot: Receive = {
    case "sun" => println("I am already hot!")
    case "snow" => becomeCold
  }

  def receive = {
    case "snow" => becomeCold
    case "sun" => becomeHot
  }


  private def becomeCold: Unit = {
    println("becoming cold")
    become(cold)
  }

  private def becomeHot: Unit = {
    println("becoming hot")
    become(hot)
  }
}

In this example we simply use become to install a new message loop. By default become replaces the current one, so whichever behavior was most recently passed to become is the current message loop.

If we use the following demo code against this actor code

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunHotColdStateDemo

  def RunHotColdStateDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val hotColdStateActor =
      system.actorOf(Props(classOf[HotColdStateActor]),
        "demo-HotColdStateActor")


    println("sending sun")
    hotColdStateActor ! "sun"
    //actors are async, so give it chance to get message
    //obviously we would not do this in prod code, its just
    //for the demo, to get the correct ordering for the print
    //statements
    Thread.sleep(1000)

    println("sending sun")
    hotColdStateActor ! "sun"
    Thread.sleep(1000)

    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)

    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)

    println("sending sun")
    hotColdStateActor ! "sun"
    Thread.sleep(1000)

    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)

    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }

}

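Tracing the code by hand, we would expect to see output like this:

sending sun
becoming hot
sending sun
I am already hot!
sending snow
becoming cold
sending snow
I am already cold!
sending sun
becoming hot
sending snow
becoming cold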

 

UnBecome

The other way to swap out the message loop relies on having matching pairs of become/unbecome. Here the standard message loop is not replaced as such; instead, the actor uses the most recent value on a stack of behaviors as its message loop.

Care must be taken to ensure the number of push (become) and pop (unbecome) operations match, otherwise memory leaks may occur, which is why this is not the default behavior.
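To see why unmatched pushes matter, here is a toy model of a behavior stack in plain Scala. It is only a sketch of the idea and not Akka's actual implementation; BehaviorStack, depth, and ignore are hypothetical names.

```scala
object BehaviorStackSketch {
  type Receive = PartialFunction[Any, Unit]

  // Toy model of an actor's behavior stack (hypothetical, not Akka's code).
  final class BehaviorStack(initial: Receive) {
    private var stack: List[Receive] = List(initial)

    // discardOld = true replaces the top entry; false pushes on top of it.
    def become(behavior: Receive, discardOld: Boolean = true): Unit =
      stack = if (discardOld) behavior :: stack.tail else behavior :: stack

    // Pops the top entry; with only one entry left, falls back to the initial behavior.
    def unbecome(): Unit =
      stack = if (stack.tail.isEmpty) List(initial) else stack.tail

    // Number of behaviors currently held in memory.
    def depth: Int = stack.size
  }

  // A behavior that accepts everything and does nothing.
  val ignore: Receive = { case _ => () }
}
```

In this model, three calls to become with discardOld = false leave four behaviors on the stack, whereas three calls with the default leave just one. An actor that keeps pushing without ever calling unbecome grows its stack without bound.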

Here is an example actor that uses the become/unbecome matched operations.

import akka.actor.Actor

class BecomeUnbecomeStateActor extends Actor {

  //need this for become/unbecome
  import context._

  def receive = {
    case "snow" =>
      println("saw snow, becoming")
      become({
        case "sun" =>
          println("saw sun, unbecoming")
          unbecome() // resets the latest 'become' (just for fun)
        case _ => println("Unknown message, state only likes sun")
      }, discardOld = false) // push on top instead of replace

    case _ => println("Unknown message, state only likes snow")
  }



}

I personally think this is harder to read and manage.

Here is some demo code to exercise this actor:

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunBecomeUnbecomeStateDemo


  def RunBecomeUnbecomeStateDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val becomeUnbecomeStateActor =
      system.actorOf(Props(classOf[BecomeUnbecomeStateActor]),
        "demo-BecomeUnbecomeStateActor")

    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    //actors are async, so give it chance to get message
    Thread.sleep(1000)

    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    Thread.sleep(1000)

    println("Sending sun")
    becomeUnbecomeStateActor ! "sun"
    Thread.sleep(1000)

    println("Sending sun")
    becomeUnbecomeStateActor ! "sun"
    Thread.sleep(1000)

    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    Thread.sleep(1000)

    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }

}

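Tracing the code by hand, a run should yield the following results:

Sending snow
saw snow, becoming
Sending snow
Unknown message, state only likes sun
Sending sun
saw sun, unbecoming
Sending sun
Unknown message, state only likes snow
Sending snow
saw snow, becoming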

 

Akka FSM

While become/unbecome will get the job done, Akka comes with a much better alternative called Akka FSM.

Using Akka FSM you can not only handle states but also carry state data, and attach code to the movement from one state to the next. As most people will know, these movements are known as “transitions”.

When using Akka FSM you may see a state machine expressed using this sort of notation

State(S) x Event(E) -> Actions (A), State(S’)

If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S’.
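Read as a function signature, that notation might be sketched in plain Scala like this. The hot/cold states are borrowed from the earlier actor, while transition, run, and the action strings are hypothetical names for illustration only; none of this is Akka FSM API.

```scala
object TransitionSketch {
  sealed trait Weather // the states S
  case object Hot extends Weather
  case object Cold extends Weather

  // State(S) x Event(E) -> Actions(A), State(S')
  def transition(state: Weather, event: String): (List[String], Weather) =
    (state, event) match {
      case (Cold, "sun") => (List("becoming hot"), Hot)
      case (Hot, "snow") => (List("becoming cold"), Cold)
      case (s, _)        => (Nil, s) // anything else: no action, same state
    }

  // Runs the events in order, collecting every action along the way.
  def run(start: Weather, events: Seq[String]): (List[String], Weather) =
    events.foldLeft((List.empty[String], start)) {
      case ((actions, state), event) =>
        val (newActions, nextState) = transition(state, event)
        (actions ++ newActions, nextState)
    }
}
```

Starting from Cold and feeding in "sun", "sun", "snow" yields the actions "becoming hot" and "becoming cold" and ends back in Cold; the second "sun" produces no action because the machine is already Hot.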

To use Akka FSM there are a number of things you can do. Some of them are mandatory and some you can opt into depending on your requirements. Let's have a look at some of the moving parts that make Akka FSM shine.

FSM[S,D]

In order to use Akka FSM you need to mix in the FSM trait. The trait itself looks like this:

trait FSM[S, D]

 

Where S is the state type, and D is the data type.

startWith

You can choose what state the FSM starts in by using the startWith method which has the following signature.

startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit

initialize

Calling initialize() performs the transition into the initial state and sets up timers (if required).

when

when is used to match the state, and is also used to control the movement to a new state using the inbuilt goto method, or possibly stay in the current state.

When uses pattern matching to match the events that a particular state can handle. As stated it is completely valid to stay in the current state or move to a new state

The examples that follow below will show you both stay and goto in action.

whenUnhandled

You should obviously try and make sure you cover all the correct state movements in response to all the events your FSM knows about. But Akka FSM also comes with the whenUnhandled method for catching events that were not handled by YOUR state handling (when) logic.
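Conceptually this behaves like a fallback that is consulted only when the handler for the current state does not match. A rough, Akka-free sketch of that idea using orElse on partial functions follows; whenOff, unhandled, handle, and the string results are hypothetical stand-ins rather than Akka FSM API.

```scala
object UnhandledSketch {
  // Handler for one state: it only knows about "PowerOn".
  val whenOff: PartialFunction[String, String] = {
    case "PowerOn" => "goto On"
  }

  // Catch-all, playing the role of whenUnhandled.
  val unhandled: PartialFunction[String, String] = {
    case other => s"unhandled: $other"
  }

  // The state handler is tried first; the fallback sees only what it misses.
  val handle: PartialFunction[String, String] = whenOff orElse unhandled
}
```

Here UnhandledSketch.handle("PowerOn") is answered by the state handler, while UnhandledSketch.handle("PowerOff") falls through to the catch-all.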

onTransition

You may also monitor the movement from one state to the next and run some code when this occurs. This is accomplished using the onTransition method.

onTransition has the following signature

onTransition(transitionHandler: TransitionHandler): Unit

Where TransitionHandler is really just a PartialFunction that has the following generic parameters

PartialFunction[(S, S), Unit]

Where the tuple is a tuple of “from state” to “to state”.
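The Off -> On style patterns that the examples in this post pass to onTransition are just a way of destructuring that tuple. Here is a small Akka-free sketch: the local -> extractor is a stand-in written for this illustration, and describe is a hypothetical helper that returns a String so the result can be inspected.

```scala
object TransitionHandlerSketch {
  sealed trait Light
  case object On extends Light
  case object Off extends Light

  // Stand-in extractor so that `a -> b` can be used as a pattern on a pair.
  object -> {
    def unapply[S](pair: (S, S)): Option[(S, S)] = Some(pair)
  }

  // Same input shape as a transition handler: a (from, to) tuple.
  val describe: PartialFunction[(Light, Light), String] = {
    case Off -> On => "Moved from Off to On"
    case On -> Off => "Moved from On to Off"
  }
}
```

Since the handler is partial, a pair it does not list, such as (On, On), is simply not matched and no code runs for it.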

 

Time for an example

 

Lightswitch Example

This first example is a very simple FSM that has 2 states, On and Off. It doesn’t really need any state data, however the Akka FSM trait always needs a Data type. So in this case we simply use a base trait for the Data, which we don’t really care about.

The idea behind this example is that the lightswitch can move from Off –> On, and On –> Off.

This example also shows a stateTimeout in action, whereby the FSM will move from On back to Off if it is left in the On state for more than 1 second.

Here is the full code for this example

import akka.actor.{Actor, ActorRef, FSM}
import scala.concurrent.duration._
import scala.collection._

// received events
// (case objects, so that sending `PowerOn` and matching on `PowerOn`
// both refer to the same singleton value)
case object PowerOn
case object PowerOff

// states
sealed trait LightSwitchState
case object On extends LightSwitchState
case object Off extends LightSwitchState

//data
sealed trait LightSwitchData
case object NoData extends LightSwitchData

class LightSwitchActor extends FSM[LightSwitchState, LightSwitchData] {

  startWith(Off, NoData)

  when(Off) {
    case Event(PowerOn, _) =>
      goto(On) using NoData
  }

  when(On, stateTimeout = 1.second) {
    case Event(PowerOff, _) =>
      goto(Off) using NoData
    case Event(StateTimeout, _) =>
      println("'On' state timed out, moving to 'Off'")
      goto(Off) using NoData
  }

  whenUnhandled {
    case Event(e, s) =>
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      goto(Off) using NoData
  }

  onTransition {
    case Off -> On => println("Moved from Off to On")
    case On -> Off => println("Moved from On to Off")
  }

  initialize()
}

Which we can run using this demo code.

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunLightSwitchDemo

  def RunLightSwitchDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val lightSwitchActor =
      system.actorOf(Props(classOf[LightSwitchActor]),
        "demo-LightSwitch")


    println("sending PowerOff, should be off already")
    lightSwitchActor ! PowerOff
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)


    println("sending PowerOn")
    lightSwitchActor ! PowerOn
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)

    println("sending PowerOff")
    lightSwitchActor ! PowerOff
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)


    println("sending PowerOn")
    lightSwitchActor ! PowerOn
    //akka is async allow it some time to pick up message
    //from its mailbox
    Thread.sleep(500)

    println("sleep for a while to allow 'On' state to timeout")
    Thread.sleep(2000)

    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }



}

When run we get the following results

sending PowerOff, should be off already
[WARN] [09/06/2016 07:21:28.864] [StateMachineSystem-akka.actor.default-dispatcher-4]
  [akka://StateMachineSystem/user/demo-LightSwitch] received unhandled request PowerOff in state Off/NoData
sending PowerOn
Moved from Off to On
sending PowerOff
Moved from On to Off
sending PowerOn
Moved from Off to On
sleep for a while to allow 'On' state to timeout
'On' state timed out, moving to 'Off'
Moved from On to Off

There is something interesting here, which is that we see an Unhandled event. Why is this?

Well this is due to the fact that this demo FSM starts in the Off state, and we send a PowerOff event. This is not handled in the when for the Off state.

We could fix this, by amending this as follows:

when(Off) {
  case Event(PowerOn, _) =>
    goto(On) using NoData
  case Event(PowerOff, _) =>
    println("already off")
    stay
}

If we apply this amended code and run the demo again, we would now see this output instead:

sending PowerOff, should be off already
already off
sending PowerOn
Moved from Off to On
sending PowerOff
Moved from On to Off
sending PowerOn
Moved from Off to On
sleep for a while to allow 'On' state to timeout
'On' state timed out, moving to 'Off'
Moved from On to Off

 

Buncher Example

I have basically taken this one straight from the official Akka FSM docs.

This example receives and queues messages while they arrive in a burst, and sends them on to another target actor after the burst has ended or a flush request is received.

Here is the full code for this example. It is a slightly fuller example, so this time we use a proper set of data objects for the states.

import akka.actor.{Actor, ActorRef, FSM}
import scala.concurrent.duration._
import scala.collection._

// received events
final case class SetTarget(ref: ActorRef)
final case class Queue(obj: Any)
case object Flush

// sent events
final case class Batch(obj: immutable.Seq[Any])

// states
sealed trait State
case object Idle extends State
case object Active extends State

//data
sealed trait BuncherData
case object Uninitialized extends BuncherData
final case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends BuncherData

class BuncherActor extends FSM[State, BuncherData] {

  startWith(Idle, Uninitialized)

  when(Idle) {
    case Event(SetTarget(ref), Uninitialized) =>
      stay using Todo(ref, Vector.empty)
  }

  when(Active, stateTimeout = 1.second) {
    case Event(Flush | StateTimeout, t: Todo) =>
      goto(Idle) using t.copy(queue = Vector.empty)
  }

  whenUnhandled {
    // common code for both states
    case Event(Queue(obj), t @ Todo(_, v)) =>
      goto(Active) using t.copy(queue = v :+ obj)

    case Event(e, s) =>
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      stay
  }

  onTransition {
    case Active -> Idle =>
      stateData match {
        case Todo(ref, queue) => ref ! Batch(queue)
        case _                => // nothing to do
      }
  }

  initialize()
}


class BunchReceivingActor extends Actor {
  def receive = {
    case Batch(theBatchData) => {
      println(s"receiving the batch data $theBatchData")
    }
    case _ => println("unknown message")
  }
}

Which we can run using this demo code

import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  RunBuncherDemo

  def RunBuncherDemo : Unit = {
    //create the actor system
    val system = ActorSystem("StateMachineSystem")

    val buncherActor =
      system.actorOf(Props(classOf[BuncherActor]),
        "demo-Buncher")

    val bunchReceivingActor =
      system.actorOf(Props(classOf[BunchReceivingActor]),
        "demo-BunchReceiving")

    buncherActor ! SetTarget(bunchReceivingActor)

    println("sending Queue(42)")
    buncherActor ! Queue(42)
    println("sending Queue(43)")
    buncherActor ! Queue(43)
    println("sending Queue(44)")
    buncherActor ! Queue(44)
    println("sending Flush")
    buncherActor ! Flush
    println("sending Queue(45)")
    buncherActor ! Queue(45)


    StdIn.readLine()

    //shutdown the actor system
    system.terminate()

    StdIn.readLine()
  }
}

When this demo runs you should see something like this

sending Queue(42)
sending Queue(43)
sending Queue(44)
sending Flush
sending Queue(45)
receiving the batch data Vector(42, 43, 44)
receiving the batch data Vector(45)

 

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples
