
MADCAP IDEA 10 : PLAY FRAMEWORK REACTIVE KAFKA PRODUCER

Last Time

So last time we walked through the Rating Kafka Streams architecture and also showed how we can query the local state stores. I also stated that the standard KafkaProducer used in the last post was more for demonstration purposes, and that longer term we would like to swap it out for a Play Framework REST endpoint that allows us to publish a message straight from our app to the Kafka rating stream processing topology.

 

PreAmble

Just as a reminder this is part of my ongoing set of posts which I talk about here :

https://sachabarbs.wordpress.com/2017/05/01/madcap-idea/, where we will be building up to a point where we have a full app using lots of different stuff, such as these

 

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

Ok so now that we have the introductions out of the way, let's crack on with what we want to cover in this post.

 

Where is the code?

As usual the code is on GitHub here : https://github.com/sachabarber/MadCapIdea

 

What Is This Post All About?

As stated in the last post “Kafka interactive queries”, we used a standard KafkaProducer to simulate what would have come from the end user via the Play Framework API. This time we will build out the Play Framework side of things, to include the ability to produce “rating” objects that are consumed by the rating Kafka Streams processing topology introduced in the last post.

 

SBT

So we already had an SBT file inside of the PlayBackEndApi project, but we need to expand that to include support for a couple of things

  • Reactive Kafka
  • Jackson JSON (Play already comes with its own JSON support, but for the Kafka Serialization-Deserialization (Serdes) I wanted to make sure it was the same as the Kafka Streams project)

This means these additions to the build.sbt file

 

val configVersion = "1.0.1"

libraryDependencies ++= Seq(
  "org.reactivemongo" %% "play2-reactivemongo" % "0.11.12",
  "com.typesafe.akka" % "akka-stream-kafka_2.11" % "0.17",
  "org.skinny-framework.com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.4",
  "com.typesafe"        % "config" % configVersion
)

 

Topics

We also want to ensure that we are using the same topics as the stream processing topology so I have just replicated that class (in reality I should have made this stuff a common JAR, but meh)

 

package kafka.topics

object RatingsTopics {
  val RATING_SUBMIT_TOPIC = "rating-submit-topic"
  val RATING_OUTPUT_TOPIC = "rating-output-topic"
}

 

Routing

As this is essentially a new route that would be called via the front end React app when a new Rating is given, we obviously need a new route/controller. The route is fairly simple, and is as follows:

 

# Rating page
POST  /rating/submit/new                       controllers.RatingController.submitNewRating()

 

JSON

The new Rating route expects a Rating object to be provided as a POST in JSON. Here is the actual Rating object and the Play JSON handling for it:

 

package Entities

import play.api.libs.json._
import play.api.libs.functional.syntax._

case class Rating(fromEmail: String, toEmail: String, score: Float)

object Rating {
  implicit val formatter = Json.format[Rating]
}

object RatingJsonFormatters {

  implicit val ratingWrites = new Writes[Rating] {
    def writes(rating: Rating) = Json.obj(
      "fromEmail" -> rating.fromEmail,
      "toEmail" -> rating.toEmail,
      "score" -> rating.score
    )
  }

  implicit val ratingReads: Reads[Rating] = (
      (JsPath \ "fromEmail").read[String] and
      (JsPath \ "toEmail").read[String] and
      ((JsPath \ "score").read[Float])
    )(Rating.apply _)
}
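
As a quick, hedged sanity check (the email addresses and score below are made up for illustration), this is how the Reads/Writes above round-trip a Rating with Play JSON:

import Entities._
import Entities.RatingJsonFormatters._
import play.api.libs.json._

// A sample JSON payload, the shape the new route expects to be POSTed
val payload = Json.parse(
  """{ "fromEmail": "sacha@here.com", "toEmail": "henry@there.com", "score": 3.5 }""")

// Reads: JSON -> Rating
val parsed: JsResult[Rating] = Json.fromJson[Rating](payload)   // JsSuccess(Rating(...))

// Writes: Rating -> JSON
val backToJson: JsValue = Json.toJson(Rating("sacha@here.com", "henry@there.com", 3.5f))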

 

Controller

So now that we have a route, let's turn our attention to the new RatingController, which right now just accepts a new Rating and looks like this:

package controllers

import javax.inject.Inject

import Actors.Rating.RatingProducerActor
import Entities.RatingJsonFormatters._
import Entities._
import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.libs.json._
import play.api.mvc.{Action, Controller}
import utils.Errors

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._

class RatingController @Inject()
(
  implicit actorSystem: ActorSystem,
  ec: ExecutionContext
) extends Controller
{

  //Error handling for streams
  //http://doc.akka.io/docs/akka/2.5.2/scala/stream/stream-error.html
  val decider: Supervision.Decider = {
    case _                      => Supervision.Restart
  }

  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
  val childRatingActorProps = Props(classOf[RatingProducerActor],mat,ec)
  val rand = new Random()
  val ratingSupervisorProps = BackoffSupervisor.props(
    Backoff.onStop(
      childRatingActorProps,
      childName = s"RatingProducerActor_${rand.nextInt()}",
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ).withSupervisorStrategy(
      OneForOneStrategy() {
        case _ => SupervisorStrategy.Restart
      })
    )

  val ratingSupervisorActorRef = 
    actorSystem.actorOf(
      ratingSupervisorProps, 
      name = "ratingSupervisor"
    )

  def submitNewRating = Action.async(parse.json) { request =>
    Json.fromJson[Rating](request.body) match {
      case JsSuccess(newRating, _) => {
        ratingSupervisorActorRef ! newRating
        Future.successful(Ok(Json.toJson(
          newRating.copy(toEmail = newRating.toEmail.toUpperCase))))
      }
      case JsError(errors) =>
        Future.successful(BadRequest(
          "Could not build a Rating from the json provided. " +
          Errors.show(errors)))
    }
  }
}

 

The main points from the code above are:

  • We use the standard Play framework JSON handling for the unmarshalling/marshalling to JSON
  • That controller route is async (see how it returns a Future[T])
  • That we will not process anything if the JSON is invalid
  • That the RatingController creates a supervisor actor that will supervise the creation of another actor, namely a RatingProducerActor. It may look like this happens each time the RatingController is instantiated, which is true. However this only happens once, as there is only one router in Play, and the controllers are created by the router. You can read more about this here : https://github.com/playframework/playframework/issues/4508#issuecomment-127820190. The short story is that the supervisor is created once, and the actor it supervises will be created using a BackoffSupervisor, where the creation of the actor will be retried using an incremental back off strategy. We also use the OneForOneStrategy to ensure only the single failed child actor is affected by the supervisor.
  • That this controller is also responsible for creating an ActorMaterializer with a supervision strategy (more on this in the next section, and see the sketch just below this list). The ActorMaterializer is used to create the actors that run Akka Streams workflows.
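
To make the supervision strategy a little more concrete, here is a minimal, hedged sketch of a Supervision.Decider that restarts a failing stream stage on transient errors but stops the stream on anything else (the exception types chosen here are just illustrative assumptions, not what this project actually uses):

import akka.stream.Supervision

// Illustrative only: restart the failing stage on transient IO problems,
// stop the stream for anything we do not recognise
val selectiveDecider: Supervision.Decider = {
  case _: java.io.IOException => Supervision.Restart
  case _                      => Supervision.Stop
}

// This would then be wired in exactly like the Restart-everything decider above:
// ActorMaterializerSettings(actorSystem).withSupervisionStrategy(selectiveDecider)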

 

 

RatingProducerActor

The final part of the pipeline for this post is obviously to be able to write a Rating to a Kafka topic, via a Kafka producer. As already stated, I chose to use Reactive Kafka (the Akka Streams Kafka producer), which builds upon Akka Streams ideas, where we have Sinks/Sources/Flows/RunnableGraphs and all that good stuff. So here is the full code for the actor:

package Actors.Rating

import Entities.Rating
import Serialization.JSONSerde
import akka.Done
import akka.actor.{Actor, PoisonPill}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, MergeHub, Source}
import kafka.topics.RatingsTopics
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import utils.Settings

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}


class RatingProducerActor(
    implicit materializer: ActorMaterializer,
    ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Rating]
  val ratingProducerSettings = ProducerSettings(
      context.system,
      new StringSerializer,
      new ByteArraySerializer)
    .withBootstrapServers(Settings.bootStrapServers)
    .withProperty(Settings.ACKS_CONFIG, "all")

  val ((mergeHubSink, killswitch), kafkaSourceFuture) =
    MergeHub.source[Rating](perProducerBufferSize = 16)
      .map(rating => {
        val ratingBytes = jSONSerde.serializer().serialize("", rating)
        (rating, ratingBytes)
      })
      .map { ratingWithBytes =>
        val (rating, ratingBytes) = ratingWithBytes
        new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, rating.toEmail, ratingBytes)
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Producer.plainSink(ratingProducerSettings))(Keep.both)
      .run()

  kafkaSourceFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => self ! PoisonPill
  }

  override def postStop(): Unit = {
    println(s"RatingProducerActor seen 'Done'")
    killswitch.shutdown()    
    super.postStop()
  }

  override def receive: Receive = {
    case (rating: Rating) => {
      println(s"RatingProducerActor seen ${rating}")
      Source.single(rating).runWith(mergeHubSink)
    }
    case Done => {
      println(s"RatingProducerActor seen 'Done'")
      killswitch.shutdown()
      self ! PoisonPill
    }
  }
}

 

I’ll be honest, there is a fair bit going on in that small chunk of code above, so let's dissect it. What is happening exactly?

  • The most important point is that we simply use the actor as a vessel to host a Reactive Kafka Akka Streams RunnableGraph, representing a graph of MergeHub -> Reactive Kafka producer sink. This is completely fine and a normal thing to do. Discussing Akka Streams is out of scope for this post, but if you want to know more you can read a previous post I did here : https://sachabarbs.wordpress.com/2016/12/13/akka-streams/ 
  • So we now know this actor hosts a stream, but the stream could fail, or the actor could fail. So what we want is that if the actor fails the stream is stopped, and if the stream fails the actor is stopped. To do that we need to do a couple of things
    • STREAM FAILING : Since the RunnableGraph can return a Future[T] we can hook a callback Success/Failure on that, and send a PoisonPill to the hosting actor. Then the supervisor actor we saw above would kick in and try and create a new instance of this actor. Another thing to note is that the stream hosted in this actor uses the ActorMaterializer that was supplied by the RatingController, where we provided a restart supervision strategy for the stream.
    • ACTOR FAILING : If the actor itself fails, the Akka framework will call the postStop() method, at which point we want to shut down the stream within this actor. So how can we shut down the hosted stream? Well, in the middle of the stream setup there is this line: .viaMat(KillSwitches.single)(Keep.both), which allows us to get a KillSwitch from the materialized values of the stream. Once we have a KillSwitch we can simply call its shutdown() method (there is a small isolated sketch of this pattern just after this list).
    • BELTS AND BRACES : I have also provided a way for the outside world to shut down this actor and its hosted stream, by sending this actor a Done message. Nothing sends it yet, but the hook is there to demonstrate how you could do this.
  • We can see that there is a MergeHub source, which allows external code to push elements into the MergeHub via the Sink that it materializes inside the actor
  • We can also see that the Rating object that the actor sees is indeed pushed into the MergeHub materialized Sink via this actor, and then some transformation is done on it, to grab its raw bytes
  • We can see the final stage in the RunnableGraph is the Reactive Kafka Producer.plainSink, which results in every Rating object that this actor pushes into the hosted stream being written out to a Kafka topic

And I think that is the main set of points about how this actor works
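
To see the MergeHub/KillSwitch pattern in isolation, here is a minimal, hedged sketch (the names are mine, not from the project) that materializes a hub Sink plus a KillSwitch, lets a couple of producers push into it, and could be shut down later:

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}

object MergeHubKillSwitchSketch extends App {
  implicit val system = ActorSystem("sketch")
  implicit val mat = ActorMaterializer()

  // Materialize the hub once: keep the Sink end (to push into) and a KillSwitch (to stop it later)
  val (toHub, killSwitch) =
    MergeHub.source[String](perProducerBufferSize = 16)
      .viaMat(KillSwitches.single)(Keep.both)
      .to(Sink.foreach[String](println))
      .run()

  // Any number of producers can attach to the hub via the materialized Sink
  Source.single("hello").runWith(toHub)
  Source(List("a", "b", "c")).runWith(toHub)

  // Later, shutting the switch completes the hosted stream
  // killSwitch.shutdown()
}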

 

 

The End Result

Just to prove that this is all working, here is a screenshot of the new RatingController http://localhost:9000/rating/submit/new endpoint being called with a JSON payload representing the Rating.

 

image

 

And here is the Akka Http endpoint that queries the Kafka Stream state store(s)

 

http://localhost:8080/ratingByEmail?email=sacha@here.com this gives an empty list as we have NOT sent any Rating through for email “sacha@here.com” yet

 

image

 

http://localhost:8080/ratingByEmail?email=henry@there.com this gives 1 result which is consistent with the amount of Rating(s) I created

 

image

 

http://localhost:8080/ratingByEmail?email=henry@there.com this gives 3 results, which is consistent with the number of Rating(s) I created

 

image

 

So that’s cool, this means that we have successfully integrated publishing of a JSON payload Rating object through Kafka to the Kafka Streams Rating processor. Happy days.
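
If you would rather poke the interactive query endpoint from code than from a browser, a throwaway sketch like this (just scala.io.Source, nothing project-specific) does the job:

// Quick-and-dirty GET against the Akka Http interactive query endpoint
val json = scala.io.Source
  .fromURL("http://localhost:8080/ratingByEmail?email=henry@there.com")
  .mkString
println(json)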

 

 

Conclusion

Straight after the last article I decided to Dockerize everything (a decision I have now reversed, due to the flaky nature of Docker's depends_on and it not truly waiting for the item depended on, even when using “condition: service_healthy” and “healthcheck/test” etc.), and some code must have become corrupted, as stuff from the last post stopped working.

 

An example from the Docker-Compose docs being

 

version: '2.1'
services:
  web:
    build: .
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
  redis:
    image: redis
  db:
    image: redis
    healthcheck:
      test: "exit 0"

 

I love Docker, but for highly complex setups I think you are better off using a Docker-Compose file and just not trying to bring it all up in one go. I may bring the Docker bits back into the fold for anyone reading this that wants to play around, but I will have to think about that carefully.

Once I realized that my Docker campaign was doing more harm than good and reverted back to my extremely hand coded, but deterministic, PowerShell startup script, I found that getting the Play Framework and a Reactive Kafka (Akka Streams Kafka) producer up and running was quite simple, and it kind of worked like a charm first time. Yay.

 

 

Next Time

Next time we should be able to make the entire rating view page work, as we now have all the backend pieces in place.

So we should quite easily be able to turn that data into a simple bootstrap table in the React portion of this application.


MADCAP IDEA 9 : KAFKA STREAMS INTERACTIVE QUERIES

Last Time

 

So last time we came up with a sort of 1/2 way house type post that would pave the way for this one, where we examined several different types of REST frameworks for use with Scala. The requirements were that we would be able to use JSON and that there would be both a client and server side API for the chosen library. Both http4s and Akka Http fitted the bill. I decided to go with Akka Http due to being more familiar with it.

So that examination of REST APIs has allowed this post to happen. In this post what we will be doing is looking at

  • How to install Kafka/Zookeeper and get them running on Windows
  • Walking through a KafkaProducer
  • Walking through a Kafka Streams processing node, and the duality of streams
  • Walking through Kafka Streams interactive queries

 

PreAmble

Just as a reminder this is part of my ongoing set of posts which I talk about here :

https://sachabarbs.wordpress.com/2017/05/01/madcap-idea/, where we will be building up to a point where we have a full app using lots of different stuff, such as these

 

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

 

Ok so now that we have the introductions out of the way, let's crack on with what we want to cover in this post.

 

Where is the code?

As usual the code is on GitHub here : https://github.com/sachabarber/MadCapIdea

 

Before we start, what is this post all about?

 

Well I don’t know if you recall, but we are attempting to create an uber simple Uber type application where there are drivers/clients. A client can put out a new job, and drivers bid for it. At the end of a job they can rate each other; see the job completion/rating/view rating sections of this previous post : https://sachabarbs.wordpress.com/2017/06/27/madcap-idea-part-6-static-screen-design/

 

The ratings will be placed into streams and aggregated into permanent storage, and will be available for querying later to display in the React front end. The ratings should be grouped by email, and also searchable by email.

 

So that is what we are attempting to cover in this post. However, since this is the 1st time we have had to use Kafka, this post will also talk through what you have to go through to get Kafka set up on Windows. In a subsequent post I will attempt to get EVERYTHING up and working (including the Play front end) in Docker containers, but for now we will assume a local install of Kafka; if nothing else it's good to know how to set this up.

 

How to install Kafka/Zookeeper and get them running on Windows

This section will talk you through how to get Kafka installed and working on Windows.

 

Step 1 : Download Kafka

Grab Confluent Platform 3.3.0 Open Source : http://packages.confluent.io/archive/3.3/confluent-oss-3.3.0-2.11.zip

 

Step 2 : Update Dodgy BAT Files

The official Kafka Windows BAT files don’t seem to work in the Confluent Platform 3.3.0 Open Source download. So replace the official [YOUR INSTALL PATH]\confluent-3.3.0\bin\windows BAT files with the ones found here : https://github.com/renukaradhya/confluentplatform/tree/master/bin/windows

 

Step 3 : Make Some Minor Changes To Log Locations etc etc

Kafka/Zookeeper as installed are set up for Linux, and as such those paths won’t work on Windows. So we need to adjust that a bit; let's do that now:

 

  • Modify the [YOUR INSTALL PATH]\confluent-3.3.0\etc\kafka\zookeeper.properties file to change the dataDir to something like dataDir=c:/temp/zookeeper
  • Modify the [YOUR INSTALL PATH]\confluent-3.3.0\etc\kafka\server.properties file to uncomment the line delete.topic.enable=true

 

Step 4 : Running Zookeeper + Kafka + Creating Topics

Now that we have installed everything, it’s just a matter of running stuff up. Sadly before we can run Kafka we need to run Zookeeper, and before Kafka can send messages we need to ensure that the Kafka topics are created. Topics must exist before messages can be sent to them.

 

Mmm that sounds like a fair bit of work. Well it is, so I decided to script this into a little PowerShell script. This script is available within the source code at https://github.com/sachabarber/MadCapIdea/tree/master/PowerShellProject/PowerShellProject where the script is called RunPipeline.ps1

 

So all we need to do is change to the directory that the RunPipeline.ps1 is in, and run it.

Obviously it is set up for my installation folders; you WILL have to change the variables at the top of the script if you want to run this on your own machine.

 

Here is the contents of the RunPipeline.ps1 file

 

$global:mongoDbInstallationFolder = "C:\Program Files\MongoDB\Server\3.5\bin\"
$global:kafkaWindowsBatFolder = "C:\Apache\confluent-3.3.0\bin\windows\"
$global:kafkaTopics = 
	"rating-submit-topic",
	"rating-output-topic"
$global:ProcessesToKill = @()



function RunPipeLine() 
{
	WriteHeader "STOPPING PREVIOUS SERVICES"
	StopZookeeper
	StopKafka

	Start-Sleep -s 20
	
	WriteHeader "STARTING NEW SERVICE INSTANCES"
	StartZookeeper
	StartKafka
	
	Start-Sleep -s 20

	CreateKafkaTopics
    RunMongo

	WaitForKeyPress

	WriteHeader "KILLING PROCESSES CREATED BY SCRIPT"
	KillProcesses
}

function WriteHeader($text) 
{
	Write-Host "========================================`r`n"
	Write-Host "$text`r`n"
	Write-Host "========================================`r`n"
}


function StopZookeeper() {
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-stop.bat"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine`r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine -WindowStyle Normal -PassThru
}

function StopKafka() {
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-stop.bat" 
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine`r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine  -WindowStyle Normal -PassThru
}

function StartZookeeper() {
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-start.bat"
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\zookeeper.properties"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine $arguments -WindowStyle Normal -PassThru
}

function StartKafka() {
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-start.bat" 
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\server.properties"
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine $arguments -WindowStyle Normal -PassThru
}

function CreateKafkaTopics() 
{
	Foreach ($topic in $global:kafkaTopics )
	{
		$kafkaCommandLine = $global:kafkaWindowsBatFolder + "kafka-topics.bat"
		$arguments = "--zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic $topic"
		Write-Host "> Create Kafka Topic Command Line : $kafkaCommandLine args: $arguments `r`n"
		$global:ProcessesToKill += start-process $kafkaCommandLine $arguments -WindowStyle Normal -PassThru
	}
}

function RunMongo() {
	$mongoexe = $global:mongoDbInstallationFolder + "mongod.exe"
	Write-Host "> Mongo Command Line : $mongoexe `r`n" 
	$global:ProcessesToKill += Start-Process -FilePath $mongoexe  -WindowStyle Normal -PassThru
}

function WaitForKeyPress
{
	Write-Host -NoNewLine "Press any key to continue....`r`n"
	[Console]::ReadKey()
}


function KillProcesses() 
{
	Foreach ($processToKill in $global:ProcessesToKill )
	{
		$name = $processToKill | Get-ChildItem -Name
		Write-Host "Killing Process : $name `r`n" 
		$processToKill | Stop-Process -Force
	}
}


# Kick off the entire pipeline
RunPipeLine

 

 

As you can see this script does a bit more than just run up Zookeeper and Kafka, it also creates the topics and runs MongoDB, which is also required by the main Play application (remember we are using Reactive Mongo for the login/registration side of things).

 

So far I have not had many issues with the script, though occasionally when trying out new code I do tend to clear out all the Zookeeper/Kafka state, which for me is stored here:

 

image

 

It just allows me to start with a clean slate as it were; you shouldn't need to do this that often.

 

Walking through a KafkaProducer

So the Kafka producer I present here will send a String key and a JSON Ranking object as a payload. Let's have a quick look at the Ranking object and how it gets turned to and from JSON before we look at the producer code.

 

The Domain Entities

Here is what the domain entities look like; it can be seen that these use the Spray (part of Akka Http) marshaller/unmarshaller JSON support. This is not required by the producer, but is required by the REST API, which we will look at later. The producer and Kafka Streams code work with a different serialization abstraction, something known as Serdes, which as far as I know is only used as a term in Kafka. It's quite simple: it stands for Serializer-Deserializer (Serde).

 

Anyway here are the domain entities and Spray formatters that allow Akka (but not Kafka Streams, more on this later) to work

 

package Entities

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._

case class Ranking(fromEmail: String, toEmail: String, score: Float)
case class HostStoreInfo(host: String, port: Int, storeNames: List[String])

object AkkaHttpEntitiesJsonFormats {
  implicit val RankingFormat = jsonFormat3(Ranking)
  implicit val HostStoreInfoFormat = jsonFormat3(HostStoreInfo)
}

 

Serdes

So as we just described, Kafka Streams cares not for the standard Akka Http/Spray JSON formatters; they are not part of Kafka Streams after all. However Kafka Streams still has some of the same concerns, in that it needs to serialize and de-serialize data (like when it re-partitions, i.e. shuffles data), so how does it achieve that? Well, it uses a weirdly named thing called a “serde”. There are MANY inbuilt “serde” types, but you can of course create your own to represent your “on the wire” format. I am using JSON, so this is what my generic “Serde” implementation looks like. I should point out that I owe a great many things to a chap called Jendrik, whom I made contact with, and who has been doing some great stuff with Kafka; his blog has really helped me out : https://www.madewithtea.com/category/kafka-streams.html

 

Anyway here is my/his/yours/ours “Serde” code for JSON

 

import java.lang.reflect.{ParameterizedType, Type}
import java.util
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.exc.{UnrecognizedPropertyException => UPE}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}


package Serialization {

  object Json {

    type ParseException = JsonParseException
    type UnrecognizedPropertyException = UPE

    private val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)

    private def typeReference[T: Manifest] = new TypeReference[T] {
      override def getType = typeFromManifest(manifest[T])
    }

    private def typeFromManifest(m: Manifest[_]): Type = {
      if (m.typeArguments.isEmpty) {
        m.runtimeClass
      }
      else new ParameterizedType {
        def getRawType = m.runtimeClass

        def getActualTypeArguments = m.typeArguments.map(typeFromManifest).toArray

        def getOwnerType = null
      }
    }

    object ByteArray {
      def encode(value: Any): Array[Byte] = mapper.writeValueAsBytes(value)

      def decode[T: Manifest](value: Array[Byte]): T =
        mapper.readValue(value, typeReference[T])
    }

  }

  /**
    * JSON serializer for JSON serde
    *
    * @tparam T
    */
  class JSONSerializer[T] extends Serializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def serialize(topic: String, data: T): Array[Byte] =
      Json.ByteArray.encode(data)

    override def close(): Unit = ()
  }

  /**
    * JSON deserializer for JSON serde
    *
    * @tparam T
    */
  class JSONDeserializer[T >: Null <: Any : Manifest] extends Deserializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def deserialize(topic: String, data: Array[Byte]): T = {
      if (data == null) {
        return null
      } else {
        Json.ByteArray.decode[T](data)
      }
    }
  }

  /**
    * JSON serde for local state serialization
    *
    * @tparam T
    */
  class JSONSerde[T >: Null <: Any : Manifest] extends Serde[T] {
    override def deserializer(): Deserializer[T] = new JSONDeserializer[T]

    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def serializer(): Serializer[T] = new JSONSerializer[T]
  }

}
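
As a quick, hedged illustration of how this Serde gets used in practice (the Ranking values and topic name below are made up), a round trip looks like this:

import Entities.Ranking
import Serialization.JSONSerde

object SerdeRoundTripSketch extends App {
  val serde = new JSONSerde[Ranking]

  // Serialize a Ranking to the raw bytes that go onto the Kafka topic
  val ranking = Ranking("jarden@here.com", "sacha@here.com", 1.5f)
  val bytes: Array[Byte] = serde.serializer().serialize("some-topic", ranking)

  // Deserialize the bytes back into a Ranking (the topic name is not actually used here)
  val roundTripped: Ranking = serde.deserializer().deserialize("some-topic", bytes)
  println(roundTripped == ranking) // true
}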

 

And finally here is what the producer code looks like

 

As you can see it is actually fairly straightforward: it simply produces 10 messages (though you can uncomment the line in the code to make it endless), with a String as a key and a Ranking as the message. The key is who the ranking is destined for. These will then be aggregated by the streams processing node, and stored as a list against a key (i.e. email).

 


package Processing.Ratings {

  import java.util.concurrent.TimeUnit

  import Entities.Ranking
  import Serialization.JSONSerde
  import Topics.RatingsTopics

  import scala.util.Random
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.common.serialization.Serdes
  import Utils.Settings
  import org.apache.kafka.clients.producer.ProducerConfig

  object RatingsProducerApp extends App {

   run()

    private def run(): Unit = {

      val jSONSerde = new JSONSerde[Ranking]
      val random = new Random
      val producerProps = Settings.createBasicProducerProperties
      val rankingList = List(
        Ranking("jarden@here.com","sacha@here.com", 1.5f),
        Ranking("miro@here.com","mary@here.com", 1.5f),
        Ranking("anne@here.com","margeret@here.com", 3.5f),
        Ranking("frank@here.com","bert@here.com", 2.5f),
        Ranking("morgan@here.com","ruth@here.com", 1.5f))

      producerProps.put(ProducerConfig.ACKS_CONFIG, "all")

      System.out.println("Connecting to Kafka cluster via bootstrap servers " +
        s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")

      // send a random string from List event every 100 milliseconds
      val rankingProducer = new KafkaProducer[String, Array[Byte]](
        producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)

      //while (true) {
      for (i <- 0 to 10) {
        val ranking = rankingList(random.nextInt(rankingList.size))
        val rankingBytes = jSONSerde.serializer().serialize("", ranking)
        System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
        rankingProducer.send(new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
        Thread.sleep(100)
      }

      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        rankingProducer.close(10, TimeUnit.SECONDS)
      }))
    }
  }
}

 

 

 

Walking through a Kafka Streams processing node, and the duality of streams

Before we get started I just wanted to include several excerpts taken from the official Kafka docs : http://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables which talk about KStream and KTable objects (which are the stream and table abstractions inside Kafka Streams).

 

When implementing stream processing use cases in practice, you typically need both streams and also databases. An example use case that is very common in practice is an e-commerce application that enriches an incoming stream of customer transactions with the latest customer information from a database table. In other words, streams are everywhere, but databases are everywhere, too.

Any stream processing technology must therefore provide first-class support for streams and tables. Kafka’s Streams API provides such functionality through its core abstractions for streams and tables, which we will talk about in a minute. Now, an interesting observation is that there is actually a close relationship between streams and tables, the so-called stream-table duality. And Kafka exploits this duality in many ways: for example, to make your applications elastic, to support fault-tolerant stateful processing, or to run interactive queries against your application’s latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications.

 

A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:

 

    image (streams-table-duality-01)

     

    The stream-table duality describes the close relationship between streams and tables.

     

    • Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a “real” table by replaying the changelog from beginning to end to reconstruct the table. Similarly, aggregating data records in a stream will return a table. For example, we could compute the total number of pageviews by user from an input stream of pageview events, and the result would be a table, with the table key being the user and the value being the corresponding pageview count.
    • Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream’s data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a “real” stream by iterating over each key-value entry in the table.

     

    Let’s illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time – and different revisions of the table – can be represented as a changelog stream (second column).

     

    image

     

    Because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):

     

    image (streams-table-duality-03)

     

    The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault tolerance. The stream-table duality is such an important concept for stream processing applications in practice that Kafka Streams models it explicitly via the KStream and KTable abstractions, which we describe in the next sections.

     

     

    I would STRONGLY urge you all to read the section of the official docs above, as it will really help you should you want to get into Kafka Streams.

     

    Anyway, with all that in mind, how does that relate to the use case we are trying to solve? So far we have a publisher that pushes out Rating objects, and as stated, ideally we would like to query these across all processor nodes. As such we should now know that this will involve a KStream and some sort of aggregation to an eventual KTable (where a state store will be used).
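
    Before the real code, here is a tiny, purely illustrative sketch (my own example, not from the docs) of the stream-table duality: replaying a changelog of per-key updates from beginning to end gives you back the table, i.e. the latest value per key.

    // A changelog stream of (user -> pageview count) updates, oldest first
    val changelog = List("alice" -> 1, "bob" -> 1, "alice" -> 2)

    // Replaying it from beginning to end reconstructs the table: the latest value per key wins
    val table = changelog.toMap
    println(table) // Map(alice -> 2, bob -> 1)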

     

    Probably the easiest thing to do is to start with the code, which looks like this for the main stream processing code for the Rating section of the final app.

      import java.util.concurrent.TimeUnit
      import org.apache.kafka.common.serialization._
      import org.apache.kafka.streams._
      import org.apache.kafka.streams.kstream._
      import Entities.Ranking
      import Serialization.JSONSerde
      import Topics.RatingsTopics
      import Utils.Settings
      import Stores.StateStores
      import org.apache.kafka.streams.state.HostInfo
      import scala.concurrent.ExecutionContext
    
    
      package Processing.Ratings {
    
        class RankingByEmailInitializer extends Initializer[List[Ranking]] {
          override def apply(): List[Ranking] = List[Ranking]()
        }
    
        class RankingByEmailAggregator extends Aggregator[String, Ranking,List[Ranking]] {
          override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
            value :: aggregate
          }
        }
    
    
        object RatingStreamProcessingApp extends App {
    
          implicit val ec = ExecutionContext.global
    
          run()
    
          private def run() : Unit = {
            val stringSerde = Serdes.String
            val rankingSerde = new JSONSerde[Ranking]
            val listRankingSerde = new JSONSerde[List[Ranking]]
            val builder: KStreamBuilder = new KStreamBuilder
            val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)
    
            //aggregate by (user email -> their rankings)
            val rankingTable = rankings.groupByKey(stringSerde,rankingSerde)
              .aggregate(
                new RankingByEmailInitializer(),
                new RankingByEmailAggregator(),
                listRankingSerde,
                StateStores.RANKINGS_BY_EMAIL_STORE
              )
    
            //useful debugging aid, print KTable contents
            rankingTable.toStream.print()
    
            val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
            val restEndpoint:HostInfo  = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
            System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
            System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")
    
            // Always (and unconditionally) clean local state prior to starting the processing topology.
            // We opt for this unconditional call here because this will make it easier for you to 
            // play around with the example when resetting the application for doing a re-run 
            // (via the Application Reset Tool,
            // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
            //
            // The drawback of cleaning up local state prior is that your app must rebuild its local 
            // state from scratch, which will take time and will require reading all the state-relevant 
            // data from the Kafka cluster over the network.
            // Thus in a production scenario you typically do not want to clean up always as we do 
            // here but rather only when it is truly needed, i.e., only under certain conditions 
            // (e.g., the presence of a command line flag for your app).
            // See `ApplicationResetExample.java` for a production-like example.
            streams.cleanUp();
            streams.start()
            val restService = new RatingRestService(streams, restEndpoint)
            restService.start()
    
            Runtime.getRuntime.addShutdownHook(new Thread(() => {
              streams.close(10, TimeUnit.SECONDS)
              restService.stop
            }))
    
            //return unit
            ()
          }
        }
      }
    

     

     

    Remember, the idea is to get a Rating for a user (based on their email address), and store all the Ratings associated with them in some sequence/list such that they can be retrieved in one go based on a key, where the key would be the user's email, and the value would be this list of Rating objects. I think with the formal discussion from the official Kafka docs and my actual Rating requirement, the above should hopefully be pretty clear.
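
    If it helps, here is a hedged, manual walk-through of what the initializer and aggregator above do for a single key (it assumes access to the Processing.Ratings classes above, and the emails and scores are made up):

    import Entities.Ranking

    // What the streams runtime does conceptually for one key as records arrive
    val init = new RankingByEmailInitializer().apply()          // List()
    val agg  = new RankingByEmailAggregator()

    val afterFirst  = agg.apply("sacha@here.com", Ranking("a@b.com", "sacha@here.com", 1.5f), init)
    val afterSecond = agg.apply("sacha@here.com", Ranking("c@d.com", "sacha@here.com", 3.0f), afterFirst)
    // afterSecond == List(Ranking("c@d.com","sacha@here.com",3.0f), Ranking("a@b.com","sacha@here.com",1.5f))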

     

     

    Walking through Kafka Streams interactive queries

    So now that we have gone through how data is produced, and transformed (well actually I did not do too much transformation other than a simple map, but trust me you can), and how we aggregate results from a KStream to a KTable (and its state store), we will move on to see how we can use Kafka interactive queries to query the state stores.

     

    One important concept is that if you used multiple partitions for your original topic, the state may be spread across n-many processing nodes. For this project I have only chosen to use 1 partition, but have written the code to support n-many.

     

    So let's assume that each node could read a different segment of data, or that each node must read from n-many partitions (there is not actually a 1:1 mapping between nodes and partitions; the elastic-scaling-of-your-application and parallelism-model chapters of the docs are must-reads here). We would need each node to expose a REST API to allow its OWN state store to be read. By reading ALL the state stores we are able to get a total view of ALL the persisted data across ALL the partitions. I urge all of you to read this section of the official docs : http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application

     

    This diagram has also been shamelessly stolen from the official docs:

     

    image (streams-interactive-queries-api-02)

    I think this diagram does an excellent job of showing you 3 separate processor nodes, and each of them may have a bit of state. ONLY by assembling ALL the data from these nodes are we able to see the ENTIRE dataset.

     

    Kafka allows this via metadata about the streams, where we can use the exposed metadata to help us gather the state store data. To do this we first need a MetadataService, which for me is as follows:

     

    package Processing.Ratings
    
    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.StreamsMetadata
    import java.util.stream.Collectors
    import Entities.HostStoreInfo
    import org.apache.kafka.common.serialization.Serializer
    import org.apache.kafka.connect.errors.NotFoundException
    import scala.collection.JavaConverters._
    
    
    /**
      * Looks up StreamsMetadata from KafkaStreams
      */
    class MetadataService(val streams: KafkaStreams) {
    
    
       /**
        * Get the metadata for all of the instances of this Kafka Streams application
        *
        * @return List of { @link HostStoreInfo}
        */
      def streamsMetadata() : List[HostStoreInfo] = {
    
        // Get metadata for all of the instances of this Kafka Streams application
        val metadata = streams.allMetadata
        return mapInstancesToHostStoreInfo(metadata)
      }
    
    
      /**
        * Get the metadata for all instances of this Kafka Streams application that currently
        * has the provided store.
        *
        * @param store The store to locate
        * @return List of { @link HostStoreInfo}
        */
      def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {
    
        // Get metadata for all of the instances of this Kafka Streams application hosting the store
        val metadata = streams.allMetadataForStore(store)
        return mapInstancesToHostStoreInfo(metadata)
      }
    
    
      /**
        * Find the metadata for the instance of this Kafka Streams Application that has the given
        * store and would have the given key if it exists.
        *
        * @param store Store to find
        * @param key   The key to find
        * @return { @link HostStoreInfo}
        */
      def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
        // Get metadata for the instances of this Kafka Streams application hosting the store and
        // potentially the value for key
        val metadata = streams.metadataForKey(store, key, serializer)
        if (metadata == null)
          throw new NotFoundException(
            s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")
    
        HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
      }
    
    
      def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
    
        metadatas.stream.map[HostStoreInfo](metadata =>
          HostStoreInfo(
            metadata.host(),
            metadata.port,
            metadata.stateStoreNames.asScala.toList))
          .collect(Collectors.toList())
          .asScala.toList
      }
    
    }
    

     

    This metadata service is used to obtain the state store information, which we can then use to extract the state data we want (it’s a key value store really).
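
    Usage of the service is straightforward; a hedged sketch (assuming a running KafkaStreams instance and the StateStores object from this project) might look like this:

    // Given the running streams instance from RatingStreamProcessingApp
    val metadataService = new MetadataService(streams)

    // All instances of the application, and which stores each one hosts
    val everyInstance = metadataService.streamsMetadata()

    // Only the instances that host the rankings-by-email store
    val storeHosts = metadataService.streamsMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE)

    // The single instance that would own the key "sacha@here.com"
    val owner = metadataService.streamsMetadataForStoreAndKey[String](
      StateStores.RANKINGS_BY_EMAIL_STORE,
      "sacha@here.com",
      org.apache.kafka.common.serialization.Serdes.String().serializer())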

     

    The next thing we need to do is expose a REST API to allow us to get the state. Let's see that now.

     

    package Processing.Ratings
    
    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.HostInfo
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.server.Directives._
    import akka.stream.ActorMaterializer
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import spray.json.DefaultJsonProtocol._
    import Entities.AkkaHttpEntitiesJsonFormats._
    import Entities._
    import Stores.StateStores
    import akka.http.scaladsl.marshalling.ToResponseMarshallable
    import org.apache.kafka.common.serialization.Serdes
    import scala.concurrent.{Await, ExecutionContext, Future}
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import spray.json._
    import scala.util.{Failure, Success}
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import scala.concurrent.duration._
    
    
    
    object RestService {
      val DEFAULT_REST_ENDPOINT_HOSTNAME  = "localhost"
    }
    
    
    class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {
    
      val metadataService = new MetadataService(streams)
      var bindingFuture: Future[Http.ServerBinding] = null
    
      implicit val system = ActorSystem("rating-system")
      implicit val materializer = ActorMaterializer()
      implicit val executionContext = system.dispatcher
    
    
      def start() : Unit = {
        val emailRegexPattern =  """\w+""".r
        val storeNameRegexPattern =  """\w+""".r
    
        val route =
    
    
          path("test") {
            get {
              parameters('email.as[String]) { (email) =>
                complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
                            s"<h1>${email}</h1>"))
              }
            }
          } ~
          path("ratingByEmail") {
            get {
              parameters('email.as[String]) { (email) =>
                try {
    
                  val host = metadataService.streamsMetadataForStoreAndKey[String](
                    StateStores.RANKINGS_BY_EMAIL_STORE,
                    email,
                    Serdes.String().serializer()
                  )
    
                  var future:Future[List[Ranking]] = null
    
                  //store is hosted on another process, REST Call
                  if(!thisHost(host))
                    future = fetchRemoteRatingByEmail(host, email)
                  else
                    future = fetchLocalRatingByEmail(email)
    
                  val rankings = Await.result(future, 20 seconds)
                  complete(rankings)
                }
                catch {
                  case (ex: Exception) => {
                    val finalList:List[Ranking] = scala.collection.immutable.List[Ranking]()
                    complete(finalList)
                  }
                }
              }
            }
          } ~
          path("instances") {
            get {
              complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
            }
          }~
          path("instances" / storeNameRegexPattern) { storeName =>
            get {
              complete(ToResponseMarshallable.apply(metadataService.streamsMetadataForStore(storeName)))
            }
          }
    
        bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
        println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")
    
        Runtime.getRuntime.addShutdownHook(new Thread(() => {
          bindingFuture
            .flatMap(_.unbind()) // trigger unbinding from the port
            .onComplete(_ => system.terminate()) // and shutdown when done
        }))
      }
    
    
      def fetchRemoteRatingByEmail(host:HostStoreInfo, email: String) : Future[List[Ranking]] = {
    
        val requestPath = s"http://${host.host}:${host.port}/ratingByEmail?email=${email}"
        println(s"Client attempting to fetch from online at ${requestPath}")
    
        val responseFuture: Future[List[Ranking]] = {
          Http().singleRequest(HttpRequest(uri = requestPath))
            .flatMap(response => Unmarshal(response.entity).to[List[Ranking]])
        }
    
        responseFuture
      }
    
      def fetchLocalRatingByEmail(email: String) : Future[List[Ranking]] = {
    
        val ec = ExecutionContext.global
    
        val host = metadataService.streamsMetadataForStoreAndKey[String](
          StateStores.RANKINGS_BY_EMAIL_STORE,
          email,
          Serdes.String().serializer()
        )
    
        val f = StateStores.waitUntilStoreIsQueryable(
          StateStores.RANKINGS_BY_EMAIL_STORE,
          QueryableStoreTypes.keyValueStore[String,List[Ranking]](),
          streams
        ).map(_.get(email))(ec)
    
        val mapped = f.map(ranking => {
          if (ranking == null)
            List[Ranking]()
          else
            ranking
        })
    
        mapped
      }
    
      def stop() : Unit = {
        bindingFuture
          .flatMap(_.unbind()) // trigger unbinding from the port
          .onComplete(_ => system.terminate()) // and shutdown when done
      }
    
      def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
        hostStoreInfo.host.equals(hostInfo.host()) &&
          hostStoreInfo.port == hostInfo.port
      }
    }
    

     

    With that final class we are able to run the application and query it using the url http://localhost:8080/ratingByEmail?email=sacha@here.com (the key to the Kafka store here is “sacha@here.com” and the value is either an empty list or a List[Ranking] as JSON). The results are shown below after we have run the producer and used Chrome (or any other REST tool of your choosing) to get the results.

     

    image

     

     

    Conclusion

    I have found the journey to get here an interesting one. The main issue is that the Kafka docs and examples are all written in Java, and some are not even using Java lambdas (Java 1.8), so the translation from that to Scala code (where there are lambdas everywhere) is sometimes trickier than you might think.

     

    The other thing that has caught me out a few times is that the Scala type system is pretty good at inferring the correct types, so you kind of let it get on with its job. But occasionally it doesn't or can't infer the type correctly; this may happen at compile time if you are lucky, or at run time. In the case of a runtime issue, I found it fairly hard to see exactly which part of the Kafka Streams API would need to be told a bit more type information.

     

    As a general rule of thumb, if there is an overloaded method that takes a Serde and one that doesn't, ALWAYS use the one that takes a Serde and specify the generic type parameters explicitly. The methods that take Serdes are usually the ones that involve some sort of shuffling of data between partitions, so they need Serdes to serialize and deserialize correctly.
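
    For example, taking the rankings KStream from earlier, this is the shape of call I mean (a hedged sketch against the same 0.x streams API used in this post, with imports and vals as in RatingStreamProcessingApp):

    // Prefer the overload that takes Serdes, and spell the generic types out explicitly
    val rankingTable: KTable[String, List[Ranking]] =
      rankings.groupByKey(stringSerde, rankingSerde)
        .aggregate(
          new RankingByEmailInitializer(),
          new RankingByEmailAggregator(),
          listRankingSerde,
          StateStores.RANKINGS_BY_EMAIL_STORE)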

     

    Other than that I am VERY happy with working with Kafka Streams, and once you get into it, it's not that different from working with Apache Spark and RDDs.

     

     

    Next time

    Next time we will be turning our attention back to the web site, where we will expose an endpoint that can be called from the ratings dialog that is launched at the end of a job. This endpoint will take the place of the RatingsProducerApp demonstrated in this post. For that we will be using https://github.com/akka/reactive-kafka. We will also expose a new endpoint to fetch the ratings (via email address) from the Kafka stream processor node.

     

    The idea being that when a job is completed, a Rating from driver to passenger is given; this is sent to the Kafka stream processor node, and the combined ratings are accumulated for users (by email address) and are exposed to be queried. As you can see, this post covers the latter part of this requirement already. The only thing we would need to do (as stated above) is replace the RatingsProducerApp demonstrated in this post with a new Reactive Kafka producer in the main Play application.

    AKKA STREAMS

    Last time we looked at Akka Http, this time we will look at Akka Streams.

    Akka Streams is a vast topic, and you will definitely need to supplement this post with the official documentation.

    Akka is one of the founding members of Reactive Streams, and Akka Streams is one implementation (there are many) of the Reactive Streams APIs.

    Reactive Streams  is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

    Introduction

    There may be some readers who, like myself, have come from .NET and have used Rx.

    You may even have heard of Reactive Streams before. So what exactly makes reactive streams different from Rx?

    The central thing that is the big win with reactive streams over Rx is the idea of back pressure. Here is what the Akka docs say about back pressure

    The back pressure protocol is defined in terms of the number of elements a downstream Subscriber is able to receive and buffer, referred to as demand. The source of data, referred to as Publisher in Reactive Streams terminology and implemented as Source in Akka Streams, guarantees that it will never emit more elements than the received total demand for any given Subscriber.

    http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html#back-pressure-explained

    Luckily this is all inbuilt to Akka streams, you do not have to worry about this too much as a user of Akka streams.

    You can pretty much decide how you want the built-in stream pipelines (which we will be diving into in more detail below) to behave in terms of backpressure using the OverflowStrategy value. Here is a very simple example:

    Source(1 to 10).buffer(10, OverflowStrategy.backpressure)
    

    Where the following are the available OverflowStrategy values

    object OverflowStrategy {
      /**
       * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
       * the new element.
       */
      def dropHead: OverflowStrategy = DropHead
    
      /**
       * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
       * the new element.
       */
      def dropTail: OverflowStrategy = DropTail
    
      /**
       * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
       */
      def dropBuffer: OverflowStrategy = DropBuffer
    
      /**
       * If the buffer is full when a new element arrives, drops the new element.
       */
      def dropNew: OverflowStrategy = DropNew
    
      /**
       * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until
       * space becomes available in the buffer.
       */
      def backpressure: OverflowStrategy = Backpressure
    
      /**
       * If the buffer is full when a new element is available this strategy completes the stream with failure.
       */
      def fail: OverflowStrategy = Fail
    }
    

    So that is the basic idea, Akka streams does provide a lot of stuff, such as

    • Built in stages/shapes
    • A graph API
    • Ability to create your own stages/shapes

    For the rest of this post we will be looking at some examples of these 3 points.

    Working With The Akka Streams APIs

    As stated at the beginning of this post the Akka Streams implementation is vast. There is a lot of ground to cover, far more than I can reasonably cover in a small blog post. The official docs are still the place to go, but if you have not heard of Akka Streams this post may be enough to get you into it.

    The official docs (at time of writing) are here:

    http://doc.akka.io/docs/akka/2.4.2/scala/stream/index.html

     

    Working With Built In Stages/Shapes

    Akka comes with loads of prebuilt stages which we can make use of. However, before I mention those, let's spend a bit of time talking about how you use the Akka Streams APIs in their most basic form.

    The idea is that we have 4 different parts that make up a useable pipeline.

    Source
    A processing stage with exactly one output, emitting data elements whenever downstream processing stages are ready to receive them.

    Sink
    A processing stage with exactly one input, requesting and accepting data elements possibly slowing down the upstream producer of elements

    Flow
    A processing stage which has exactly one input and output, which connects its up- and downstreams by transforming the data elements flowing through it.

    RunnableGraph
    A Flow that has both ends “attached” to a Source and Sink respectively, and is ready to be run().
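
    To make those four parts concrete, here is a minimal sketch of my own (not from the demo project) that wires a Source through a Flow into a Sink and then runs the resulting RunnableGraph:

    import akka.{Done, NotUsed}
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
    import scala.concurrent.Future
    
    object FourPartsSketch extends App {
      implicit val system = ActorSystem("FourPartsSketch")
      implicit val materializer = ActorMaterializer()
    
      val source: Source[Int, NotUsed] = Source(1 to 5)                   // exactly one output
      val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)         // one input, one output
      val printSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println) // exactly one input
    
      // nothing actually runs until the RunnableGraph is materialized via run()
      val graph: RunnableGraph[Future[Done]] =
        source.via(doubler).toMat(printSink)(Keep.right)
    
      graph.run()
    }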

    As I say Akka comes with loads of inbuilt stages to make our lives easier here. For example these are the available stages at time of writing

    Source Stages

    • fromIterator
    • apply
    • single
    • repeat
    • tick
    • fromFuture
    • fromCompletionStage
    • unfold
    • unfoldAsync
    • empty
    • maybe
    • failed
    • actorPublisher
    • actorRef
    • combine
    • queue
    • asSubscriber
    • fromPublisher
    • fromFile

    Sink Stages

    • head
    • headOption
    • last
    • lastOption
    • ignore
    • cancelled
    • seq
    • foreach
    • foreachParallel
    • onComplete
    • fold
    • reduce
    • combine
    • actorRef
    • actorRefWithAck
    • actorSubscriber
    • asPublisher
    • fromSubscriber
    • toFile

    We will now look at some examples of using some of these

    def simpleFlow() : Unit = {
      val source = Source(1 to 10)
      val sink = Sink.fold[Int, Int](0)(_ + _)
      // connect the Source to the Sink, obtaining a RunnableGraph
      val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
      // materialize the flow and get the value of the FoldSink
      implicit val timeout = Timeout(5 seconds)
      val sumFuture: Future[Int] = runnable.run()
      val sum = Await.result(sumFuture, timeout.duration)
      println(s"source.toMat(sink)(Keep.right) Sum = $sum")
    
      // Use the shorthand source.runWith(sink)
      val sumFuture2: Future[Int] = source.runWith(sink)
      val sum2 = Await.result(sumFuture2, timeout.duration)
      println(s"source.runWith(sink) Sum = $sum")
    }
    

    In this simple example we have a Source(1 to 10) which we then wire up to a Sink that adds up the numbers coming in.

    This block demonstrates various different Source(s) and Sink(s)

    def differentSourcesAndSinks() : Unit = {
      //various sources
      Source(List(1, 2, 3)).runWith(Sink.foreach(println))
      Source.single("only one element").runWith(Sink.foreach(println))
      //actor sink
      val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
      Source(List("hello", "hello"))
        .runWith(Sink.actorRef(helloActor,DoneMessage))
      //future source
      val futureString = Source.fromFuture(Future.successful("Hello Streams!"))
        .toMat(Sink.head)(Keep.right).run()
      implicit val timeout = Timeout(5 seconds)
      val theString = Await.result(futureString, timeout.duration)
      println(s"theString = $theString")
    }
    

    And this block demos using a simple Map on a Source

    def mapFlow() : Unit = {
      val source = Source(11 to 16)
      val doublerSource = source.map(x => x * 2)
      val sink = Sink.foreach(println)
      implicit val timeout = Timeout(5 seconds)
    
      // Use the shorthand source.runWith(sink)
      val printSinkFuture: Future[Done] = doublerSource.runWith(sink)
      Await.result(printSinkFuture, timeout.duration)
    }
    

    Working With The Graph API

    Akka streams also comes with a pretty funky graph building DSL. You would use this when you want to create quite elaborate flows.

    The other very interesting thing about the graph builder DSL is that you can use custom shapes inside it, and you can also leave it partially connected, such that you could potentially use it as a Source/Sink.

    Say you had an unconnected output in the graph you built using the graph DSL; you could then use that partially constructed graph as a Source in its own right.

    The same goes if you had an unconnected input in the graph you created you could use that as a Sink.

    You can read more about this here :

    http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-graphs.html#constructing-sources-sinks-and-flows-from-partial-graphs

    I urge you all to have a read of that as it's quite cool what can be done with the graph DSL
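
    As a small taster of that idea, here is a sketch of my own (assuming an implicit ActorSystem and ActorMaterializer are already in scope): a partial graph that merges two Sources, packaged up as a single Source via Source.fromGraph

    import akka.NotUsed
    import akka.stream.SourceShape
    import akka.stream.scaladsl.{GraphDSL, Merge, Sink, Source}
    
    // a graph with one unconnected outlet, exposed as a SourceShape
    val combinedSource: Source[Int, NotUsed] = Source.fromGraph(
      GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        val merge = builder.add(Merge[Int](2))
        Source(1 to 3)   ~> merge
        Source(10 to 12) ~> merge
        SourceShape(merge.out)
      })
    
    // it can now be used just like any other Source
    combinedSource.runWith(Sink.foreach(println))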

    Ok so time for an example, this example comes directly from the TypeSafe activator code

    http://www.lightbend.com/activator/template/akka-stream-scala

    package com.sas.graphs
    
    import java.io.File
    
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.ClosedShape
    import akka.stream.scaladsl._
    import akka.util.ByteString
    
    import scala.concurrent.forkjoin.ThreadLocalRandom
    import scala.util.{ Failure, Success }
    
    class WritePrimesDemo {
    
      def run(): Unit = {
        implicit val system = ActorSystem("Sys")
        import system.dispatcher
        implicit val materializer = ActorMaterializer()
    
        // generate random numbers
        val maxRandomNumberSize = 1000000
        val primeSource: Source[Int, NotUsed] =
          Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize))).
            // filter prime numbers
            filter(rnd => isPrime(rnd)).
            // and neighbor +2 is also prime
            filter(prime => isPrime(prime + 2))
    
        // write to file sink
        val fileSink = FileIO.toPath(new File("target/primes.txt").toPath)
        val slowSink = Flow[Int]
          // act as if processing is really slow
          .map(i => { Thread.sleep(1000); ByteString(i.toString) })
          .toMat(fileSink)((_, bytesWritten) => bytesWritten)
    
        // console output sink
        val consoleSink = Sink.foreach[Int](println)
    
        // send primes to both slow file sink and console sink using graph API
        val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
          (slow, console) =>
            import GraphDSL.Implicits._
            val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
            primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
            broadcast ~> console // connect other side of splitter to console
            ClosedShape
        }
        val materialized = RunnableGraph.fromGraph(graph).run()
    
        // ensure the output file is closed and the system shutdown upon completion
        materialized.onComplete {
          case Success(_) =>
            system.terminate()
          case Failure(e) =>
            println(s"Failure: ${e.getMessage}")
            system.terminate()
        }
    
      }
    
      def isPrime(n: Int): Boolean = {
        if (n <= 1) false
        else if (n == 2) true
        else !(2 to (n - 1)).exists(x => n % x == 0)
      }
    }
    

    The most important part of this code is this part

    // send primes to both slow file sink and console sink using graph API
    val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
      (slow, console) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
        primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
        broadcast ~> console // connect other side of splitter to console
        ClosedShape
    }
    val materialized = RunnableGraph.fromGraph(graph).run()
    

    There are 2 Sinks defined before we use the Graph

    • A file Sink
    • A console Sink

    There is also a Source that generates random primes

    So the Graph DSL allows you to, well, create graphs. It allows you to take in inputs and create other stages/shapes using the implicit builder that is provided.

    The DSL then allows you to connect inputs and other builder-created stages/shapes together, and even expose the connected stages via an output.

    This is done using the ~> syntax, which simply means connect

    As previously stated you can create partially connected graphs, but if you have all inputs and outputs connected it is considered a ClosedShape, which can be used as an isolated component

    Here is an example of the output of running this graph example

    image

    Create Custom Shapes/Stages

    It doesn't stop there; we can also create our own stages/shapes that can be used in flows. This is a pretty complex subject and you will definitely benefit from reading this page

    http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html

    There is no way this little post will cover enough, but here are some highlights of the official documentation

    This is the basic pattern you would use to create a custom stage

    import akka.stream.{Attributes, Outlet, SourceShape}
    import akka.stream.stage.{GraphStage, GraphStageLogic}
     
    class NumbersSource extends GraphStage[SourceShape[Int]] {
      // Define the (sole) output port of this stage
      val out: Outlet[Int] = Outlet("NumbersSource")
      // Define the shape of this stage, which is SourceShape with the port we defined above
      override val shape: SourceShape[Int] = SourceShape(out)
     
      // This is where the actual (possibly stateful) logic will live
      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
    }
    

    Most of the actual logic will be inside the createLogic method. But in order to do anything useful in there you will need to use handlers. Handlers are what you use to handle input/output. There are InHandler and OutHandler.

    Each of which has its own state machine flow. For example this is the state machine for an OutHandler

    image

    Whilst this is the one for InHandler

    image

    This is the best page to read to learn more about these handlers

    http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html#Port_states__InHandler_and_OutHandler

    The one and ONLY place that state should be maintained is within the createLogic method.
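
    For example, based on the pattern described in the official docs, a completed createLogic for the NumbersSource skeleton shown earlier (state kept inside the logic, an OutHandler responding to downstream demand) might look something like this:

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          // all mutable state MUST live inside the GraphStageLogic, never in the enclosing GraphStage
          private var counter = 1
    
          setHandler(out, new OutHandler {
            override def onPull(): Unit = {
              // downstream has asked for an element, so emit the next number
              push(out, counter)
              counter += 1
            }
          })
        }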

    Lets consider a small example. Lets say we have some objects like this

    case class Element(id: Int, value: Int)
    

    And we want to build a custom stage that allows us to select a value from this type, and that should only emit an output element when the value provided by the property selector changes.

    We could call this DistinctUntilChanged. Lets see what an example for this could look like

    package com.sas.customshapes
    
    import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler, GraphStage}
    import akka.stream.{Outlet, Attributes, Inlet, FlowShape}
    
    import scala.collection.immutable
    
    final class DistinctUntilChanged[E, P](propertyExtractor: E => P)
      extends GraphStage[FlowShape[E, E]] {
    
      val in = Inlet[E]("DistinctUntilChanged.in")
      val out = Outlet[E]("DistinctUntilChanged.out")
    
      override def shape = FlowShape.of(in, out)
    
      override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {
    
        private var savedState : Option[E] = None
    
        setHandlers(in, out, new InHandler with OutHandler {
    
          override def onPush(): Unit = {
            val nextElement = grab(in)
            val nextState = propertyExtractor(nextElement)

            // only emit downstream when the selected property value has changed
            if (savedState.isEmpty || propertyExtractor(savedState.get) != nextState) {
              push(out, nextElement)
            }
            else {
              // same property value as the last element, ask upstream for the next one instead
              pull(in)
            }
            savedState = Some(nextElement)
          }
    
          override def onPull(): Unit = {
            pull(in)
          }
    
          override def onUpstreamFinish(): Unit = {
            completeStage()
          }
        })
    
        override def postStop(): Unit = {
          savedState = None
        }
      }
    }
    
    
    
    

    The highlights of this are

    • We have a single Inlet
    • We have a single Outlet
    • We expose a FlowShape (in/out only); there are many shapes, but FlowShape is what we want for a single in / single out stage
    • We use createLogic to do the work
    • We use an InHandler to handle input
    • We use an OutHandler to handle output

    One other important thing (at least for this single in/out example) is that we DO NOT call pull/push more than once per handler callback within the createLogic

    Lets assume we have these elements

    package com.sas.customshapes
    
    import scala.collection.immutable
    
    object SampleElements {
    
      val E11 = Element(1, 1)
      val E21 = Element(2, 1)
      val E31 = Element(3, 1)
      val E42 = Element(4, 2)
      val E52 = Element(5, 2)
      val E63 = Element(6, 3)
    
      val Ones = immutable.Seq(E11, E21, E31)
      val Twos = immutable.Seq(E42, E52)
      val Threes = immutable.Seq(E63)
    
      val All = Ones ++ Twos ++ Threes
    }
    

    And this demo code

    def runDistinctUntilChanged() : Unit = {
      Source(SampleElements.All)
        .via(new DistinctUntilChanged(_.value))
        .runWith(Sink.foreach(println))
    }
    

    We would get this output to the Sink

    image

    This example does owe a lot to a nice blog post I found here :

    https://www.softwaremill.com/implementing-a-custom-akka-streams-graph-stage/
     

    That’s It

    Anyway that is the end of the series. I hope you have enjoyed it, and have learnt you some Akka along the way

    I am going to have a small break now and then start looking into some Azure/Web stuff I think

     

    Where Can I Find The Code Examples?

    I will be augmenting this GitHub repo with the example projects as I move through this series

    https://github.com/sachabarber/SachaBarber.AkkaExamples

    Akka http

    Last time we talked about routing within Akka. This time we will be looking at Akka’s support for http.

    But just before that, a bit of history. Before Akka.Http there was already a fairly successful Akka based http option available to you as a Scala developer, called Spray. There is a lot of Spray documentation available here http://spray.io/

    This framework was extremely well thought of, so much so that the good people at Akka have taken on much of the good work done by this team, and it now forms much of the codebase for Akka Http.

    In fact if you are familiar with Spray, you will certainly notice quite a lot of similarities in the way routes and JSON are handled in Akka.Http, as it is pretty much the Spray code.

     

    Introduction

    Akka.Http comes with server side and client side libraries. It also comes with good support for standard serialization such as JSON/XML and the ability to roll your own serialization should you want to.

    It also comes with a fairly nifty routing DSL which is very much inspired by the work done in Spray.

    This post will concentrate on the common use cases that you may come across when working with HTTP.

     

    SBT Dependencies

    As usual we need to make sure we have the correct JARs referenced. So here is the SBT file that I am using for both the server side/client side and common messages that pass between them

    import sbt._
    import sbt.Keys._
    
    
    lazy val allResolvers = Seq(
      "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
      "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
    )
    
    
    lazy val AllLibraryDependencies =
      Seq(
        "com.typesafe.akka" % "akka-actor_2.11" % "2.4.12",
        "com.typesafe.akka" % "akka-http_2.11" % "3.0.0-RC1",
        "com.typesafe.akka" % "akka-http-core_2.11" % "3.0.0-RC1",
        "com.typesafe.akka" % "akka-http-spray-json_2.11" % "3.0.0-RC1"
      )
    
    
    lazy val commonSettings = Seq(
      version := "1.0",
      scalaVersion := "2.11.8",
      resolvers := allResolvers,
      libraryDependencies := AllLibraryDependencies
    )
    
    
    lazy val serverside =(project in file("serverside")).
      settings(commonSettings: _*).
      settings(
        name := "serverside"
      )
      .aggregate(common, clientside)
      .dependsOn(common, clientside)
    
    lazy val common = (project in file("common")).
      settings(commonSettings: _*).
      settings(
        name := "common"
      )
    
    lazy val clientside = (project in file("clientside")).
      settings(commonSettings: _*).
      settings(
        name := "clientside"
      )
      .aggregate(common)
      .dependsOn(common)
    

    It can be seen that the JSON dependency is contained in this JAR

    akka-http-spray-json_2.11
    

    Told you it was inspired by Spray a fair bit

     

    Server Side

    This section will talk about the server side element of Akka.Http

     

    Hosting The Service

    To have  a correctly formed/hostable server side we need a couple of things in place, namely the following

    • An actor system
    • A materializer (Akka http uses flows which is the subject of the next and final post)
    • An execution context
    • Routing

    Once we have these things it is really just a question of binding the route to a host name and port.

    Shown below is a barebones skeleton of what this may look like

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
    import akka.http.scaladsl.server.Directives
    import akka.stream.scaladsl.Flow
    import common.{Item, JsonSupport}
    import scala.io.StdIn
    import scala.concurrent.Future
    import akka.http.scaladsl.model.ws.{Message, TextMessage}
    import akka.stream._
    import akka.stream.scaladsl._
    
    
    object Demo extends App with Directives with JsonSupport {
    
      implicit val system = ActorSystem("my-system")
      implicit val materializer = ActorMaterializer()
      // needed for the onFailure / flatMap / onComplete calls on the binding future below
      implicit val executionContext = system.dispatcher
    
    
      val route = .....
    
      val (host, port) = ("localhost", 8080)
      val bindingFuture = Http().bindAndHandle(route, host, port)
    
      bindingFuture.onFailure {
        case ex: Exception =>
          println(s"$ex Failed to bind to $host:$port!")
      }
    
      println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
      StdIn.readLine() // let it run until user presses return
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }
    

    We will be looking at the routing DSL separately

     

    Routing DSL

    As stated, Akka.Http owes much to Spray, and the routing DSL in particular is practically unchanged from Spray, so it is well worth reading the Spray routing documentation which is available here : http://spray.io/documentation/1.2.4/spray-routing/ and for completeness here is the Akka.Http docs link too : http://doc.akka.io/docs/akka/2.4.7/scala/http/introduction.html#routing-dsl-for-http-servers

    There are way too many possible routes to go into for a single post. Lets consider a few basic examples and deconstruct them

    Some of these examples do rely on JSON which is the next topic, so for now just understand that there is a way to accept/return JSON.

    Lets consider the following use cases

    • GET that returns a simple string
    • GET that returns a JSON representation of an Item
    • POST that accept a new Item

    In all these cases this is what an Item looks like

    package common
    
    final case class Item(name: String, id: Long)
    

    So lets see the routing DSL that makes the above examples work

    val route =
      path("hello") {
        get {
          complete(HttpEntity(
    	ContentTypes.`text/html(UTF-8)`, 
    	"<h1>Say hello to akka-http</h1>"))
        }
      } ~
      path("randomitem") {
        get {
          // will marshal Item to JSON
          complete(Item("thing", 42))
        }
      } ~
      path("saveitem") {
        post {
          // will unmarshal JSON to Item
          entity(as[Item]) { item =>
            println(s"Server saw Item : $item")
            complete(item)
          }
        }
      }
    

    It can be seen that there are some common routing DSL bits and bobs in there, such as:

    • path : which satisfies the route name part of the route
    • get : which tells us that we should go further into the route matching if it’s a GET http request and it matched the path route DSL part
    • post: which tells us that we should go further into the route matching if it’s a POST http request and it matched the path route DSL part
    • complete : This is the final result from the route

    These parts of the DSL are known as directives. The general anatomy of a directive is as follows:

    name(arguments) { extractions =>
      ... // inner route
    }
    

    It has a name, zero or more arguments and optionally an inner route (The RouteDirectives are special in that they are always used at the leaf-level and as such cannot have inner routes). Additionally directives can “extract” a number of values and make them available to their inner routes as function arguments. When seen “from the outside” a directive with its inner route form an expression of type Route.

    Taken from http://doc.akka.io/docs/akka/2.4.7/scala/http/routing-dsl/directives/index.html#directives up on date 15/11/16
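
    To make that anatomy concrete, here is a small hypothetical route (not part of the demo project) where path is the name, "item" / LongNumber is the argument, and the matched Long is the extraction handed to the inner route:

    path("item" / LongNumber) { id =>
      get {
        // id is the extracted value made available to the inner route
        complete(s"You asked for item $id")
      }
    }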

    What Directives Do?

    A directive can do one or more of the following:

    • Transform the incoming RequestContext before passing it on to its inner route (i.e. modify the request)
    • Filter the RequestContext according to some logic, i.e. only pass on certain requests and reject others
    • Extract values from the RequestContext and make them available to its inner route as “extractions”
    • Chain some logic into the RouteResult future transformation chain (i.e. modify the response or rejection)
    • Complete the request

     

    This means a Directive completely wraps the functionality of its inner route and can apply arbitrarily complex transformations, both (or either) on the request and on the response side.

    Ok so now that we have taken a whistle stop tour of the routing DSL and directives, lets have a look at the few we discussed above

     

    For this work I would strongly recommend the use of the “Postman” google app, which you can grab from here

    https://chrome.google.com/webstore/detail/postman/fhbjgbiflinjbdggehcddcbncdddomop?hl=en

    GET

    We can see this route looks like this

    path("hello") {
      get {
        complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
      }
    }
    

    So we use the path, and also the get directives to establish a get route. We then use complete to complete the route with some static string representing the html we would like to return

    So let’s see this one in postman

    image

     

    GET Item (as JSON)

    We can see this route looks like this

    path("randomitem") {
      get {
        // will marshal Item to JSON
        complete(Item("thing", 42))
      }
    } 
    

    So again we use the path/get directives, but this time we complete with an Item. This is done due to the JSON support that is able to create the right serialization data for us. We will look at this in the next section

    So let’s see this one in postman

    image

    POST Item

    We can see this route looks like this

    path("saveitem") {
      post {
        // will unmarshal JSON to Item
        entity(as[Item]) { item =>
          println(s"Server saw Item : $item")
          complete(item)
        }
      }
    } 
    

    So again we use the path directive, but this time we use a post, where the post expects an Item as JSON to be provided. The conversion from the incoming JSON string to an Item is done using an Unmarshaller, which we will look at in the next section

    So let’s see this one in postman

    image

     

    JSON Support

    Akka.http provides JSON support using this library akka-http-spray-json-experimental which you can grab from Maven Central Repo.

    JsonProtocol

    When using Spray JSON we mix in the SprayJsonSupport and DefaultJsonProtocol traits to create the JSON protocol for our custom objects

    Lets consider the Item class we have seen in the demos so far

    package common
    
    final case class Item(name: String, id: Long)
    

    This is how we might write the JSON protocol code for this simple class

    package common
    
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    import spray.json.DefaultJsonProtocol
    
    trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
      implicit val itemFormat = jsonFormat2(Item)
    }
    

    It can be seen that there are jsonFormatXX helpers that can be used for very simple cases. In this case jsonFormat2 is used as our item class had 2 parameters

    Most of the time these inbuilt helpers are all we need. If however you want something more elaborate you are free to create your own JSON format with custom read / write methods
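
    As a rough sketch of my own (not code from the demo project) of what a hand-rolled alternative to jsonFormat2 could look like for the Item class:

    import common.Item
    import spray.json._
    
    implicit object ItemJsonFormat extends RootJsonFormat[Item] {
      // object -> JSON
      def write(i: Item): JsValue =
        JsObject("name" -> JsString(i.name), "id" -> JsNumber(i.id))
    
      // JSON -> object
      def read(value: JsValue): Item =
        value.asJsObject.getFields("name", "id") match {
          case Seq(JsString(name), JsNumber(id)) => Item(name, id.toLong)
          case _                                 => deserializationError("Item expected")
        }
    }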

     

    Marshalling

    Marshalling is Spray's process of taking objects and creating a JSON string representation of them to send across the wire.

    The Akka Spray JAR comes with a bunch of default marshallers that allow us to take custom classes and turn them into JSON

    These are the most common default marshallers that you will most likely use

    type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]
    type ToHeadersAndEntityMarshaller[T] = Marshaller[T, (immutable.Seq[HttpHeader], MessageEntity)]
    type ToResponseMarshaller[T] = Marshaller[T, HttpResponse]
    type ToRequestMarshaller[T] = Marshaller[T, HttpRequest]
    

    You can read more about this here : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/marshalling.html

    Luckily you don't really have to get that involved with these that often, as the routing DSL does most of the heavy lifting for you: when you call complete this is taken care of for you, providing a marshaller can be found implicitly

    Unmarshalling

    Unmarshalling is the process of taking the on the wire format (JSON string in these examples) back into a scala class (Item class in this case)

    You can read more about this at the official Akka docs page : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/unmarshalling.html

    Luckily you don't really have to get that involved with these that often, as the routing DSL does most of the heavy lifting for you. This is what happens when we use the following part of the routing DSL, which will use an unmarshaller to create the Item from the JSON string on the wire

    entity(as[Item]) { item =>
    

    WebSockets

    Akka Http also supports websockets. Lets start this investigation by looking at what is required from the routing DSL perspective, which starts like this

    path("websocket") {
      get {
        handleWebSocketMessages(websocketFlow)
      }
    } ~
    

    If we look at this special directive a bit more, what exactly does the handleWebSocketMessages directive look like?

    Well it looks like this:

    def handleWebSocketMessages(handler: Flow[Message, Message, Any]): Route
    

    So we need to supply a Flow. A Flow is part of Akka (reactive) Streams, which we will look at in the next part. But for now just be aware that you can create a Flow from a Sink/Source, and a Materializer is used to materialize the flow.

    For this websocket example here is what the Flow looks like

    val (websocketSink, websocketSource) =
      MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()
    
    val websocketFlow: Flow[Message, Message, NotUsed] =
      Flow[Message].mapAsync(1) {
        // transform websocket message to domain message (string)
        case TextMessage.Strict(text) =>       Future.successful(text)
        case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _)
      }.via(Flow.fromSinkAndSource(websocketSink, websocketSource))
        .map[Message](string => TextMessage(string))
    

    The idea is that when a websocket client connects and sends an initial message they will get a reply TextMessage sent over the websocket to them

    This uses some pretty new akka stream stages namely

    • MergeHub : Creates a Source that emits elements merged from a dynamic set of producers.
    • BroadcastHub : Creates a Sink whose materialized value is a Source that a dynamic set of consumers can attach to, each incoming element being broadcast to all of them

     

    Lets start by running the server, and then opening the “WebSocketTestClient.html” page which should look like this

    image

    image

    Once the page is open, type something in the textbox and hit the “Send” button, you should see this

    image

    All fairly normal socket type stuff so far, we send a message from the web page client side to the server and the server responds with the text we sent.

    But what if we wanted to send messages to the client on demand, say from another route, which could be a command to do some work that notifies the clients of the websocket?

    With this Flow in place, we are also able to push back messages to the client end of the websocket.

    Lets see another route which will simulate some work, which results in messages being sent down the websocket back to the client (if it's still connected)

    Here is the route

    path("sendmessagetowebsocket" / IntNumber) { msgCount =>
      post {
        for(i <- 0 until msgCount)
        {
          Source.single(s"sendmessagetowebsocket $i").runWith(websocketSink)
        }
        complete("done")
      }
    }
    

    It can be seen that we simply create a new source which is run with the existing Sink that was part of the Flow used by the websocket

    Here is what this would look like in postman

    image

    And here is what the web page client side websocket example looks like after this route has been called as above

    image

     

     

    Client Side

    Akka Http comes with 3 types of client API that one can use: the connection-level, the host-level and the request-level APIs

    In this article I will only be using the last of these APIs, as in my opinion it is the most sensible client side choice.

    So what does the request level client API look like?

    GET

    If we consider that we want to conduct this request

    http://localhost:8080/randomitem

    which when run via postman gives the following JSON response

    image

    So lets see what the code looks like to do this using the request level client API

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.marshalling.Marshal
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl._
    import scala.concurrent.{Await, Future}
    import concurrent.ExecutionContext.Implicits.global
    import common.{Item, JsonSupport}
    import concurrent.duration._
    import scala.io.StdIn
    
    class RegularRoutesDemo extends JsonSupport {
    
      def Run() : Unit = {
        implicit val system = ActorSystem()
        implicit val materializer = ActorMaterializer()
    
        val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)
    
        //+++++++++++++++++++++++++++++++++++++++++++++++
        // GET http://localhost:8080/randomitem
        //+++++++++++++++++++++++++++++++++++++++++++++++
        val randomItemUrl = s"""/randomitem"""
        val flowGet : Future[Item] =
          Source.single(
            HttpRequest(
              method = HttpMethods.GET,
              uri = Uri(randomItemUrl))
            )
            .via(httpClient)
            .mapAsync(1)(response => Unmarshal(response.entity).to[Item])
            .runWith(Sink.head)
        val start = System.currentTimeMillis()
        val result = Await.result(flowGet, 5 seconds)
        val end = System.currentTimeMillis()
        println(s"Result in ${end-start} millis: $result")
    
      }
    }
    

    There are a couple of take away points in the code above

    • We use a Source of a single HttpRequest, where we can specify the HTTP verb and other request type things
    • We use Unmarshal to convert the incoming JSON string to an Item. We discussed Marshalling/Unmarshalling above.
    • This obviously relies on the Spray JSON support that we discussed above

     

    POST

    If we consider that we want to conduct this request

    http://localhost:8080/saveitem

    which when run via postman gives the following JSON response

    image

    So lets see what the code looks like to do this using the request level client API

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.marshalling.Marshal
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl._
    import scala.concurrent.{Await, Future}
    import concurrent.ExecutionContext.Implicits.global
    import common.{Item, JsonSupport}
    import concurrent.duration._
    import scala.io.StdIn
    
    class RegularRoutesDemo extends JsonSupport {
    
      def Run() : Unit = {
        implicit val system = ActorSystem()
        implicit val materializer = ActorMaterializer()
    
        val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)
    
        //+++++++++++++++++++++++++++++++++++++++++++++++
        // POST http://localhost:8080/saveitem
        //+++++++++++++++++++++++++++++++++++++++++++++++
        val saveItemUrl = s"""/saveitem"""
        val itemToSave = Item("newItemHere",12)
        val flowPost = for {
          requestEntity <- Marshal(itemToSave).to[RequestEntity]
          response <-
          Source.single(
            HttpRequest(
              method = HttpMethods.POST,
              uri = Uri(saveItemUrl),
              entity = requestEntity)
            )
            .via(httpClient)
            .mapAsync(1)(response => Unmarshal(response.entity).to[Item])
            .runWith(Sink.head)
        } yield response
        val startPost = System.currentTimeMillis()
        val resultPost = Await.result(flowPost, 5 seconds)
        val endPost = System.currentTimeMillis()
        println(s"Result in ${endPost-startPost} millis: $resultPost")
      }
    }
    

    The only thing that is different this time is that we need to create a JSON representation of the Item, which we pass to the HttpRequest as the request entity.

    This is done using a JSON marshaller which must be in scope implicitly.

     

    Where Can I Find The Code Examples?

    I will be augmenting this GitHub repo with the example projects as I move through this series

    https://github.com/sachabarber/SachaBarber.AkkaExamples

    AKKA routing

     

    Last time we looked at Akka Clustering, this time we will look at routing.

    Routing allows messages to be routed to one or more actors known as routees, by sending the messages to a router that will know how to route the messages to the routees.

    Akka comes with quite a few inbuilt routing strategies that we can make use of. We will look at these next.

    Types Of Routing Strategy

    Akka comes with a whole bunch of inbuilt routing strategies such as :

    RoundRobin : Routes in a round-robin fashion to its routees.

    Random : This router type selects one of its routees randomly for each message.

    SmallestMailBox : A Router that tries to send to the non-suspended child routee with the fewest messages in its mailbox. The selection is done in this order:

    • pick any idle routee (not processing a message) with an empty mailbox
    • pick any routee with an empty mailbox
    • pick the routee with the fewest pending messages in its mailbox
    • pick any remote routee; remote actors are considered lowest priority, since their mailbox size is unknown

    Broadcast : A broadcast router forwards the message it receives to all its routees.

    ScatterGatherFirstCompleted : The ScatterGatherFirstCompletedRouter will send the message on to all its routees. It then waits for the first reply it gets back. This result will be sent back to the original sender. Other replies are discarded.

    TailChopping : The TailChoppingRouter will first send the message to one, randomly picked, routee and then after a small delay to a second routee (picked randomly from the remaining routees) and so on. It waits for the first reply it gets back and forwards it back to the original sender. Other replies are discarded.

    The goal of this router is to decrease latency by performing redundant queries to multiple routees, assuming that one of the other actors may still be faster to respond than the initial one.
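
    As a quick taster of what using one of these strategies looks like in code, here is a minimal sketch of my own with a hypothetical EchoWorker actor (the real demo actors appear later in this post):

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.routing.RoundRobinPool
    
    // hypothetical routee, purely for illustration
    class EchoWorker extends Actor {
      def receive = {
        case msg => println(s"${self.path.name} saw: $msg")
      }
    }
    
    object QuickRouterTaster extends App {
      val system = ActorSystem("QuickRouterTaster")
    
      // 5 EchoWorker routees behind a round-robin pool router
      val router = system.actorOf(RoundRobinPool(5).props(Props[EchoWorker]), "roundRobinRouter")
    
      (1 to 10).foreach(i => router ! s"message-$i")
    }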

    Regular Actor As A Router

    Akka allows you to create routers in 2 ways; the first way is to use RoutingLogic to set up your router.

    There are quite a few specializations of RoutingLogic, such as

    • RoundRobinRoutingLogic
    • RandomRoutingLogic
    • SmallestMailboxRoutingLogic
    • BroadcastRoutingLogic

    You would typically use this in a regular actor. The actor in which you use the RoutingLogic would be the router. If you go down this path you would be responsible for managing the router's children, i.e. the routees. That means you would be responsible for managing ALL aspects of the routees, including adding them to the list of available routees, and watching them for Termination in order to remove them from the list of available routees (which sounds a lot like supervision, doesn't it).

    Here is what a skeleton for an actor that is setup manually as a router may look like

    import java.util.concurrent.atomic.AtomicInteger
    
    import akka.actor.{Actor, Props, Terminated}
    import akka.routing.{RoutingLogic, ActorRefRoutee, RoundRobinRoutingLogic, Router}
    
    
    class RouterActor(val routingLogic : RoutingLogic)  extends Actor  {
    
      val counter : AtomicInteger = new AtomicInteger()
    
      val routees = Vector.fill(5) {
        val workerCount = counter.getAndIncrement()
        val r = context.actorOf(Props(
          new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
        context watch r
        ActorRefRoutee(r)
      }
    
      //create a Router based on the incoming class field
      //RoutingLogic which will really determine what type of router
      //we end up with
      var router = Router(routingLogic, routees)
    
      def receive = {
        case WorkMessage =>
          router.route(WorkMessage, sender())
        case Report => routees.foreach(ref => ref.send(Report, sender()))
        case Terminated(a) =>
          router = router.removeRoutee(a)
          val workerCount = counter.getAndIncrement()
          val r = context.actorOf(Props(
            new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
          context watch r
          router = router.addRoutee(r)
      }
    }
    

    It can be seen that I pass in the RoutingLogic, which would be one of the available RoutingLogic strategies that akka comes with.

    The other thing to note is that as we stated earlier we need to FULLY manage the collection of routee actors ourselves, including watching them for Termination.

    Surely there is a better way?

    Well yes thankfully there is, Akka also provides a Pool for this job. We will look at that next.

    Pool

    Akka comes with the ability to create a router using a pool where we tell it what actors we want to use as the routees, how many routees we want, and how the supervision should be handled.

    Here is some code from my demo code that uses 2 utility methods to create a pool based router that will use a simple FibonacciActor, which is sent messages via an actor that is created using the pool router Props value

    def RunTailChoppingPoolDemo() : Unit = {
    
      val supervisionStrategy = OneForOneStrategy() {
        case e => SupervisorStrategy.restart
      }
    
      val props = TailChoppingPool(5, within = 10.seconds,
        supervisorStrategy = supervisionStrategy,interval = 20.millis).
        props(Props[FibonacciActor])
    
      RunPoolDemo(props)
    }
    
    def RunPoolDemo(props : Props) : Unit = {
      val system = ActorSystem("RoutingSystem")
      val actorRef = system.actorOf(Props(
        new PoolRouterContainerActor(props,"theRouter")), name = "thePoolContainer")
      actorRef ! WorkMessage
      StdIn.readLine()
      system.terminate()
    }
    
    
    
    import akka.actor._
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.pattern.ask
    
    class PoolRouterContainerActor(val props: Props, val name :String)  extends Actor  {
    
      val router: ActorRef = context.actorOf(props, name)
    
      def receive = {
        case WorkMessage =>
          implicit val timeout = Timeout(5 seconds)
          val futureResult = router ? FibonacciNumber(10)
          val (actName,result) = Await.result(futureResult, timeout.duration)
    
          println(s"FibonacciActor : ($actName) came back with result -> $result")
      }
    }
    
    
    
    import akka.actor.Actor
    import scala.annotation.tailrec
    
    class FibonacciActor extends Actor {
    
      val actName = self.path.name
    
      def receive = {
        case FibonacciNumber(nbr) => {
          println(s"FibonacciActor : ($actName) ->  " +
            s"has been asked to calculate FibonacciNumber")
          val result = fibonacci(nbr)
          sender ! (actName,result)
        }
      }
    
      private def fibonacci(n: Int): Int = {
        @tailrec
        def fib(n: Int, b: Int, a: Int): Int = n match {
          case 0 => a
          case _ => fib(n - 1, a + b, b)
        }
    
        fib(n, 1, 0)
      }
    }
    

    Supervision Using Pool

    Routees that are created by a pool router will be created as the router’s children. The router is therefore also the children’s supervisor.

    The supervision strategy of the router actor can be configured with the supervisorStrategy property of the Pool. If no configuration is provided, routers default to a strategy of “always escalate”. This means that errors are passed up to the router’s supervisor for handling. The router’s supervisor will decide what to do about any errors.

    Note the router’s supervisor will treat the error as an error with the router itself. Therefore a directive to stop or restart will cause the router itself to stop or restart. The router, in turn, will cause its children to stop and restart.

    It should be mentioned that the router’s restart behavior has been overridden so that a restart, while still re-creating the children, will still preserve the same number of actors in the pool.

    This means that if you have not specified supervisorStrategy of the router or its parent a failure in a routee will escalate to the parent of the router, which will by default restart the router, which will restart all routees (it uses Escalate and does not stop routees during restart). The reason is to make the default behave such that adding withRouter to a child’s definition does not change the supervision strategy applied to the child. This might be an inefficiency that you can avoid by specifying the strategy when defining the router.

    http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Supervision up on 01/11/16

    Group

    You may also wish to create your routees separately and let the router know about them. This is achievable using Groups. This is not something I decided to cover in this post, but if this sounds of interest to you, you can read more about it at the official documentation here:

    http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Group
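
    Purely as a taster (a sketch of my own, reusing the WorkerActor/WorkMessage classes from the demos below), a group based router looks something along these lines, where the routees are created up front and the router is only told about their paths:

    import akka.actor.{ActorSystem, Props}
    import akka.routing.RoundRobinGroup
    
    val system = ActorSystem("RoutingSystem")
    
    // create the routees ourselves and collect their paths
    val routeePaths = (0 until 3).map { i =>
      system.actorOf(Props(new WorkerActor(i)), s"groupWorker-$i").path.toStringWithoutAddress
    }
    
    // the group router only knows the routees by path; it does not create or supervise them
    val groupRouter = system.actorOf(RoundRobinGroup(routeePaths).props(), "theGroupRouter")
    groupRouter ! WorkMessage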

    Routing Strategy Demos

    For the demos I am using a mixture of RoutingLogic hosted in my own actor, and also Pool based routers.

    Here is the basic setup for a RoutingLogic based actor of my own, where I have to manage all supervision concerns manually.

    There are ALWAYS 5 routees involved with this demo.

    import java.util.concurrent.TimeUnit
    
    import akka.actor._
    import akka.routing._
    import scala.concurrent.duration.FiniteDuration
    import scala.concurrent.duration._
    import scala.language.postfixOps
    import scala.io.StdIn
    
    object Demo extends App {
    
      //==============================================================
      //Standard Actor that does routing using Router class
      //where we apply relevant RoutingLogic
      //Supervision is done manually within the Actor that hosts
      //the Router, where we monitor the routees and remove /recreate
      //them on 'Terminated'
      //==============================================================
      RunRoutingDemo(RoundRobinRoutingLogic())
    
    
    
      def RunRoutingDemo(routingLogic : RoutingLogic) : Unit = {
        val system = ActorSystem("RoutingSystem")
        val actorRef = system.actorOf(Props(
          new RouterActor(routingLogic)), name = "theRouter")
    
        for (i <- 0 until 10) {
          actorRef ! WorkMessage
          Thread.sleep(1000)
        }
        actorRef ! Report
    
        StdIn.readLine()
        system.terminate()
      }
    }
    

    Where we make use of the following generic actor code that uses the specific RoutingLogic that is passed in.

    import java.util.concurrent.atomic.AtomicInteger
    
    import akka.actor.{Actor, Props, Terminated}
    import akka.routing.{RoutingLogic, ActorRefRoutee, RoundRobinRoutingLogic, Router}
    
    
    class RouterActor(val routingLogic : RoutingLogic)  extends Actor  {
    
      val counter : AtomicInteger = new AtomicInteger()
    
      val routees = Vector.fill(5) {
        val workerCount = counter.getAndIncrement()
        val r = context.actorOf(Props(
          new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
        context watch r
        ActorRefRoutee(r)
      }
    
      //create a Router based on the incoming class field
      //RoutingLogic which will really determine what type of router
      //we end up with
      var router = Router(routingLogic, routees)
    
      def receive = {
        case WorkMessage =>
          router.route(WorkMessage, sender())
        case Report => routees.foreach(ref => ref.send(Report, sender()))
        case Terminated(a) =>
          router = router.removeRoutee(a)
          val workerCount = counter.getAndIncrement()
          val r = context.actorOf(Props(
            new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
          context watch r
          router = router.addRoutee(r)
      }
    }
    

    This is what the routees look like for this set of demos

    import akka.actor.Actor
    
    class WorkerActor(val id : Int) extends Actor {
    
      var msgCount = 0
      val actName = self.path.name
    
      def receive = {
        case WorkMessage => {
          msgCount += 1
          println(s"worker : {$id}, name : ($actName) ->  ($msgCount)")
        }
        case Report => {
          println(s"worker : {$id}, name : ($actName) ->  saw total messages : ($msgCount)")
        }
        case _       => println("unknown message")
      }
    }
    

    Ok so lets have a look at some examples of using this code shall we:

    RoundRobin

    We get this output, where the messages are distributed to the routees with the round robin strategy applied

    worker : {0}, name : (workerActor-0) ->  (1)
    worker : {1}, name : (workerActor-1) ->  (1)
    worker : {2}, name : (workerActor-2) ->  (1)
    worker : {3}, name : (workerActor-3) ->  (1)
    worker : {4}, name : (workerActor-4) ->  (1)
    worker : {0}, name : (workerActor-0) ->  (2)
    worker : {1}, name : (workerActor-1) ->  (2)
    worker : {2}, name : (workerActor-2) ->  (2)
    worker : {3}, name : (workerActor-3) ->  (2)
    worker : {4}, name : (workerActor-4) ->  (2)
    worker : {0}, name : (workerActor-0) ->  saw total messages : (2)
    worker : {1}, name : (workerActor-1) ->  saw total messages : (2)
    worker : {2}, name : (workerActor-2) ->  saw total messages : (2)
    worker : {4}, name : (workerActor-4) ->  saw total messages : (2)
    worker : {3}, name : (workerActor-3) ->  saw total messages : (2)

    Random

    We get this output, where the messages are sent to routees randomly

    worker : {1}, name : (workerActor-1) ->  (1)
    worker : {1}, name : (workerActor-1) ->  (2)
    worker : {4}, name : (workerActor-4) ->  (1)
    worker : {0}, name : (workerActor-0) ->  (1)
    worker : {0}, name : (workerActor-0) ->  (2)
    worker : {2}, name : (workerActor-2) ->  (1)
    worker : {3}, name : (workerActor-3) ->  (1)
    worker : {4}, name : (workerActor-4) ->  (2)
    worker : {0}, name : (workerActor-0) ->  (3)
    worker : {0}, name : (workerActor-0) ->  (4)
    worker : {1}, name : (workerActor-1) ->  saw total messages : (2)
    worker : {0}, name : (workerActor-0) ->  saw total messages : (4)
    worker : {2}, name : (workerActor-2) ->  saw total messages : (1)
    worker : {4}, name : (workerActor-4) ->  saw total messages : (2)
    worker : {3}, name : (workerActor-3) ->  saw total messages : (1)

    SmallestMailBox

    We get this output, where the routee with the smallest mailbox will get the message sent to it. This example may look a bit weird, but if you think about it, by the time the new message is sent the 1st routee (workerActor-0) will have dealt with the 1st message, and is ready to receive a new one, and since it's the 1st routee in the list it is still considered the one with the smallest mailbox. If you introduced an artificial delay in the actor dealing with the message it may show different, more interesting results.

    worker : {0}, name : (workerActor-0) ->  (1)
    worker : {0}, name : (workerActor-0) ->  (2)
    worker : {0}, name : (workerActor-0) ->  (3)
    worker : {0}, name : (workerActor-0) ->  (4)
    worker : {0}, name : (workerActor-0) ->  (5)
    worker : {0}, name : (workerActor-0) ->  (6)
    worker : {0}, name : (workerActor-0) ->  (7)
    worker : {0}, name : (workerActor-0) ->  (8)
    worker : {0}, name : (workerActor-0) ->  (9)
    worker : {0}, name : (workerActor-0) ->  (10)
    worker : {2}, name : (workerActor-2) ->  saw total messages : (0)
    worker : {4}, name : (workerActor-4) ->  saw total messages : (0)
    worker : {1}, name : (workerActor-1) ->  saw total messages : (0)
    worker : {0}, name : (workerActor-0) ->  saw total messages : (10)
    worker : {3}, name : (workerActor-3) ->  saw total messages : (0)

    Broadcast

    We get this output, where each routee should see ALL messages

    worker : {0}, name : (workerActor-0) ->  (1)
    worker : {2}, name : (workerActor-2) ->  (1)
    worker : {4}, name : (workerActor-4) ->  (1)
    worker : {3}, name : (workerActor-3) ->  (1)
    worker : {1}, name : (workerActor-1) ->  (1)
    worker : {0}, name : (workerActor-0) ->  (2)
    worker : {1}, name : (workerActor-1) ->  (2)
    worker : {4}, name : (workerActor-4) ->  (2)
    worker : {2}, name : (workerActor-2) ->  (2)
    worker : {3}, name : (workerActor-3) ->  (2)
    worker : {0}, name : (workerActor-0) ->  (3)
    worker : {2}, name : (workerActor-2) ->  (3)
    worker : {3}, name : (workerActor-3) ->  (3)
    worker : {4}, name : (workerActor-4) ->  (3)
    worker : {1}, name : (workerActor-1) ->  (3)
    worker : {1}, name : (workerActor-1) ->  (4)
    worker : {4}, name : (workerActor-4) ->  (4)
    worker : {3}, name : (workerActor-3) ->  (4)
    worker : {0}, name : (workerActor-0) ->  (4)
    worker : {2}, name : (workerActor-2) ->  (4)
    worker : {0}, name : (workerActor-0) ->  (5)
    worker : {1}, name : (workerActor-1) ->  (5)
    worker : {4}, name : (workerActor-4) ->  (5)
    worker : {2}, name : (workerActor-2) ->  (5)
    worker : {3}, name : (workerActor-3) ->  (5)
    worker : {3}, name : (workerActor-3) ->  (6)
    worker : {2}, name : (workerActor-2) ->  (6)
    worker : {1}, name : (workerActor-1) ->  (6)
    worker : {4}, name : (workerActor-4) ->  (6)
    worker : {0}, name : (workerActor-0) ->  (6)
    worker : {1}, name : (workerActor-1) ->  (7)
    worker : {0}, name : (workerActor-0) ->  (7)
    worker : {4}, name : (workerActor-4) ->  (7)
    worker : {2}, name : (workerActor-2) ->  (7)
    worker : {3}, name : (workerActor-3) ->  (7)
    worker : {0}, name : (workerActor-0) ->  (8)
    worker : {3}, name : (workerActor-3) ->  (8)
    worker : {1}, name : (workerActor-1) ->  (8)
    worker : {2}, name : (workerActor-2) ->  (8)
    worker : {4}, name : (workerActor-4) ->  (8)
    worker : {2}, name : (workerActor-2) ->  (9)
    worker : {3}, name : (workerActor-3) ->  (9)
    worker : {4}, name : (workerActor-4) ->  (9)
    worker : {1}, name : (workerActor-1) ->  (9)
    worker : {0}, name : (workerActor-0) ->  (9)
    worker : {0}, name : (workerActor-0) ->  (10)
    worker : {2}, name : (workerActor-2) ->  (10)
    worker : {1}, name : (workerActor-1) ->  (10)
    worker : {4}, name : (workerActor-4) ->  (10)
    worker : {3}, name : (workerActor-3) ->  (10)
    worker : {1}, name : (workerActor-1) ->  saw total messages : (10)
    worker : {2}, name : (workerActor-2) ->  saw total messages : (10)
    worker : {0}, name : (workerActor-0) ->  saw total messages : (10)
    worker : {3}, name : (workerActor-3) ->  saw total messages : (10)
    worker : {4}, name : (workerActor-4) ->  saw total messages : (10)

    So that about covers the demos I have created for using your own actor and using the RoutingLogic. Lets now look at using pools; as I have stated already, pools take care of supervision for us, so we don't have to take care of that manually any more.

    As before I have a helper actor to work with the pool, that accepts the router, where the router will receive the messages to send to its routees.

    Here is the demo code

    import java.util.concurrent.TimeUnit
    
    import akka.actor._
    import akka.routing._
    import scala.concurrent.duration.FiniteDuration
    import scala.concurrent.duration._
    import scala.language.postfixOps
    import scala.io.StdIn
    
    object Demo extends App {
    
      //==============================================================
      // Use built Pool router(s) which will do the supervision for us
      //
      //
      //    Comment/Uncomment to try the different router logic
      //
      //==============================================================
      RunScatterGatherFirstCompletedPoolDemo()
      //RunTailChoppingPoolDemo()
    
    
    
      def RunScatterGatherFirstCompletedPoolDemo() : Unit = {
    
        val supervisionStrategy = OneForOneStrategy() {
          case e => SupervisorStrategy.restart
        }
    
        val props = ScatterGatherFirstCompletedPool(
          5, supervisorStrategy = supervisionStrategy,within = 10.seconds).
          props(Props[FibonacciActor])
    
        RunPoolDemo(props)
      }
    
      def RunTailChoppingPoolDemo() : Unit = {
    
        val supervisionStrategy = OneForOneStrategy() {
          case e => SupervisorStrategy.restart
        }
    
        val props = TailChoppingPool(5, within = 10.seconds,
          supervisorStrategy = supervisionStrategy,interval = 20.millis).
          props(Props[FibonacciActor])
    
        RunPoolDemo(props)
      }
    
      def RunPoolDemo(props : Props) : Unit = {
        val system = ActorSystem("RoutingSystem")
        val actorRef = system.actorOf(Props(
          new PoolRouterContainerActor(props,"theRouter")), name = "thePoolContainer")
        actorRef ! WorkMessage
        StdIn.readLine()
        system.terminate()
      }
    }
    

    And here is the helper actor

    import akka.actor._
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.pattern.ask
    
    class PoolRouterContainerActor(val props: Props, val name :String)  extends Actor  {
    
      val router: ActorRef = context.actorOf(props, name)
    
      def receive = {
        case WorkMessage =>
          implicit val timeout = Timeout(5 seconds)
          val futureResult = router ? FibonacciNumber(10)
          val (actName,result) = Await.result(futureResult, timeout.duration)
    
          println(s"FibonacciActor : ($actName) came back with result -> $result")
      }
    }
    

    As before we will use 5 routees.

    This is what the routees look like for the pool demo

    import akka.actor.Actor
    import scala.annotation.tailrec
    
    class FibonacciActor extends Actor {
    
      val actName = self.path.name
    
      def receive = {
        case FibonacciNumber(nbr) => {
          println(s"FibonacciActor : ($actName) ->  " +
            s"has been asked to calculate FibonacciNumber")
          val result = fibonacci(nbr)
          sender ! (actName,result)
        }
      }
    
      private def fibonacci(n: Int): Int = {
        @tailrec
        def fib(n: Int, b: Int, a: Int): Int = n match {
          case 0 => a
          case _ => fib(n - 1, a + b, b)
        }
    
        fib(n, 1, 0)
      }
    }
    

    ScatterGatherFirstCompletedPool

Here is the output when we run this. It can be seen that we simply get the result from the routee that completed first

    FibonacciActor : ($d) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($e) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($a) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($c) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($b) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($d) came back with result -> 55

    TailChoppingPool

Here is the output when we run this. It can be seen that we get the result from whichever routee completed first, out of the subset of routees that the message was sent to

    FibonacciActor : ($b) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($b) came back with result -> 55

     

What About A Custom Routing Strategy?

Akka allows you to create your own routing strategy, where you create a class that extends the built-in Akka RoutingLogic. You can read more about this in the official Akka documentation:

    http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Custom_Router
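
To give a flavour of what a custom RoutingLogic involves, here is a minimal sketch of my own (not taken from the docs) that picks a routee based on the message's hashCode, so that equal messages always land on the same routee:

import akka.routing.{Routee, RoutingLogic}
import scala.collection.immutable

// Hypothetical example of a custom RoutingLogic
class HashBucketRoutingLogic extends RoutingLogic {
  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    routees(math.abs(message.hashCode % routees.size))
}

You would then use it just like the built-in logic, for example Router(new HashBucketRoutingLogic(), routees).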

     

    Where Can I Find The Code Examples?

    I will be augmenting this GitHub repo with the example projects as I move through this series

    https://github.com/sachabarber/SachaBarber.AkkaExamples

    AKKA : clustering

Last time we looked at remoting. You can kind of think of clustering as an extension to remoting, as some of the same underlying parts are used. But as we will see, clustering is way more powerful (and more fault tolerant too).

    My hope is by the end of this post that you will know enough about Akka clustering that you would be able to create your own clustered Akka apps.

    A Note About All The Demos In This Topic

I wanted the demos in this section to be as close to real life as possible. The official Akka examples tend to use a single process, which I personally think is quite confusing when you are trying to deal with quite hard concepts. As such I decided to go with multi-process projects to demonstrate things. I do however only have 1 laptop, so they are hosted on the same node, but they are separate processes/JVMs.

    I am hoping by doing this it will make the learning process easier, as it is closer to what you would do in real life rather than have 1 main method that spawns an entire cluster. You just would not have that in real life.

     

    What Is Akka Clustering?

Unlike remoting, which is peer to peer, a cluster may consist of many members, which can grow and shrink depending on demand/failure. There is also the concept of roles for actors within a cluster, which this post will talk about.

    You can see how this could be very useful, in fact you could see how this may be used to create a general purpose grid calculation engine such as Apache Spark.

     

    Seed Nodes

    Akka has the concept of some initial contact points within the cluster to allow the cluster to bootstrap itself as it were.

    Here is what the official Akka docs say on this:

    You may decide if joining to the cluster should be done manually or automatically to configured initial contact points, so-called seed nodes. When a new node is started it sends a message to all seed nodes and then sends join command to the one that answers first. If no one of the seed nodes replied (might not be started yet) it retries this procedure until successful or shutdown.

You may choose to configure these "seed nodes" in code, but the easiest way is via configuration. The relevant part of the demo app's configuration is here

    akka {
      .....
      .....
      cluster {
        seed-nodes = [
          "akka.tcp://ClusterSystem@127.0.0.1:2551",
          "akka.tcp://ClusterSystem@127.0.0.1:2552"]
      }
      .....
      .....
}
    
    
    
    
    

    The seed nodes can be started in any order and it is not necessary to have all seed nodes running, but the node configured as the first element in the seed-nodes configuration list must be started when initially starting a cluster, otherwise the other seed-nodes will not become initialized and no other node can join the cluster. The reason for the special first seed node is to avoid forming separated islands when starting from an empty cluster. It is quickest to start all configured seed nodes at the same time (order doesn’t matter), otherwise it can take up to the configured seed-node-timeout until the nodes can join.

    Once more than two seed nodes have been started it is no problem to shut down the first seed node. If the first seed node is restarted, it will first try to join the other seed nodes in the existing cluster.

    We will see the entire configuration for the demo app later on this post. For now just be aware that there is a concept of seed nodes and the best way to configure those for the cluster is via configuration.

That said, there may be some amongst you who would prefer to use JVM system properties, which you may do as follows:

    -Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@127.0.0.1:2551
    -Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@127.0.0.1:2552
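
And should you want to do the joining in code rather than configuration, a minimal sketch (my own, assuming the ActorSystem has already been configured with the cluster provider) would look something like this:

import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster

object ProgrammaticJoin extends App {
  // assumes application.conf already sets akka.actor.provider to the ClusterActorRefProvider
  val system = ActorSystem("ClusterSystem")

  // join the same seed nodes that the configuration above points at
  Cluster(system).joinSeedNodes(List(
    Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551),
    Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2552)))
}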
    

    Roles

Akka clustering comes with the concept of roles. You may be asking why we would need them.

Well, it's quite simple really: say we have a higher than normal volume of data coming through your Akka cluster system, you may want to increase the total processing power of the cluster to deal with it. How do we do that? We spin up more actors within a particular role. The role here may be "backend", whose actors do work designated to them by actors in some other role, say "frontend".

    By using roles we can manage which bits of the cluster get dynamically allocated more/less actors.

You can configure the minimum number of members per role in configuration, which you can read more about here:

    http://doc.akka.io/docs/akka/snapshot/java/cluster-usage.html#How_To_Startup_when_Cluster_Size_Reached
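
As a purely hypothetical example (this is not part of the demo configuration), you could insist that the cluster contains at least one "frontend" and two "backend" members before the leader moves members to 'Up':

akka.cluster.min-nr-of-members = 3

akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}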

    Member Events

    Akka provides the ability to listen to member events. There are a number of reasons this could be useful, for example

• Determining whether a member has left the cluster
• Determining whether a new member has joined the cluster

Here is a full list of the member-related Cluster events that you may choose to listen to

    The events to track the life-cycle of members are:

    • ClusterEvent.MemberJoined – A new member has joined the cluster and its status has been changed to Joining.
    • ClusterEvent.MemberUp – A new member has joined the cluster and its status has been changed to Up.
    • ClusterEvent.MemberExited – A member is leaving the cluster and its status has been changed to Exiting Note that the node might already have been shutdown when this event is published on another node.
    • ClusterEvent.MemberRemoved – Member completely removed from the cluster.
    • ClusterEvent.UnreachableMember – A member is considered as unreachable, detected by the failure detector of at least one other node.
    • ClusterEvent.ReachableMember – A member is considered as reachable again, after having been unreachable. All nodes that previously detected it as unreachable has detected it as reachable again.

    And this is how you might subscribe to these events

    cluster.subscribe(self, classOf[MemberUp])
    

    Which you may use in an actor like this:

import akka.actor.Actor
import akka.cluster.{Cluster, Member}
import akka.cluster.ClusterEvent.MemberUp

class SomeActor extends Actor {
    
      val cluster = Cluster(context.system)
    
      // subscribe to cluster changes, MemberUp
      override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
      
      def receive = {
        case MemberUp(m) => register(m)
      }
    
      def register(member: Member): Unit =
        if (member.hasRole("frontend"))
         ...
    }
    

    We will see more on this within the demo code which we will walk through later

    ClusterClient

What use is a cluster which can't receive commands from the outside world?

Well, luckily we don't have to worry about that, as Akka comes with 2 things that make this otherwise awkward situation OK.

Akka comes with a ClusterClient, which allows actors that are not part of the cluster to talk to the cluster. Here is what the official Akka docs have to say about this

    An actor system that is not part of the cluster can communicate with actors somewhere in the cluster via this ClusterClient. The client can of course be part of another cluster. It only needs to know the location of one (or more) nodes to use as initial contact points. It will establish a connection to a ClusterReceptionist somewhere in the cluster. It will monitor the connection to the receptionist and establish a new connection if the link goes down. When looking for a new receptionist it uses fresh contact points retrieved from previous establishment, or periodically refreshed contacts, i.e. not necessarily the initial contact points.

     

    Receptionist

    As mentioned above the ClusterClient makes use of a ClusterReceptionist, but what is that, and how do we make a cluster actor available to the client using that?

The ClusterReceptionist is an Akka extension (these days provided by the akka-cluster-tools module rather than akka-contrib), and it must be configured on ALL the nodes that the ClusterClient will need to talk to.

There are 2 parts to this. Firstly, we must ensure that the ClusterReceptionist is started on the nodes that the ClusterClient will need to communicate with. This is easily done using the following config:

    akka {
      ....
      ....
      ....
      # enable receptionist at start
      extensions = ["akka.cluster.client.ClusterClientReceptionist"]
    
    }
    
    

The other thing that needs doing is that any actor within the cluster that you want to be able to talk to via the ClusterClient needs to register itself as a service with the ClusterClientReceptionist. Here is an example of how to do that

    val system = ActorSystem("ClusterSystem", config)
    val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
    ClusterClientReceptionist(system).registerService(frontend)
    

    Now that you have done that you should be able to communicate with this actor within the cluster using the ClusterClient

     

    The Demo Dissection

I have based the demo for this post largely on the "Transformation" demo that Lightbend provides, which you can grab from here:

    http://www.lightbend.com/activator/template/akka-sample-cluster-scala

The "official" example, as it stands, provides a cluster which contains "frontend" and "backend" roles. The "frontend" actors will take a text message and pass it to the registered workers (the "backend"s), who will UPPERCASE the message and return it to the "frontend".

I have taken this sample and added the ability to use the ClusterClient with it, which works using Future[T] and the ask pattern, such that the ClusterClient gets a response back from the cluster for each request.

    We will dive into all of this in just a moment

    For the demo this is what we are trying to build

[Image: demo architecture diagram]

    SBT / Dependencies

    Before we dive into the demo code (which as I say is based largely on the official lightbend clustering example anyway) I would just like to dive into the SBT file that drives the demo projects

    This is the complete SBT file for the entire demo

    import sbt._
    import sbt.Keys._
    
    
    lazy val allResolvers = Seq(
      "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
      "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
    )
    
    lazy val AllLibraryDependencies =
      Seq(
        "com.typesafe.akka" %% "akka-actor"         % "2.4.8",
        "com.typesafe.akka" %% "akka-remote"        % "2.4.8",
        "com.typesafe.akka" %% "akka-cluster"       % "2.4.8",
        "com.typesafe.akka" %% "akka-cluster-tools" % "2.4.8",
        "com.typesafe.akka" %% "akka-contrib"       % "2.4.8"
      )
    
    
    lazy val commonSettings = Seq(
      version := "1.0",
      scalaVersion := "2.11.8",
      resolvers := allResolvers,
      libraryDependencies := AllLibraryDependencies
    )
    
    
    lazy val root =(project in file(".")).
      settings(commonSettings: _*).
      settings(
        name := "Base"
      )
      .aggregate(common, frontend, backend)
      .dependsOn(common, frontend, backend)
    
    lazy val common = (project in file("common")).
      settings(commonSettings: _*).
      settings(
        name := "common"
      )
    
    lazy val frontend = (project in file("frontend")).
      settings(commonSettings: _*).
      settings(
        name := "frontend"
      )
      .aggregate(common)
      .dependsOn(common)
    
    lazy val backend = (project in file("backend")).
      settings(commonSettings: _*).
      settings(
        name := "backend"
      )
      .aggregate(common)
      .dependsOn(common)
    

    There are a few things to note in this

    • We need a few dependencies to get clustering to work. Namely
      • akka-remote
      • akka-cluster
      • akka-cluster-tools
      • akka-contrib
    • There are a few projects
      • root : The cluster client portion
      • common : common files
      • frontend : frontend cluster based actors (the client will talk to these)
      • backend : backend cluster based actors

     

    The Projects

Now that we have seen the projects involved from an SBT point of view, let's continue and look at how the actual projects perform their duties

    Remember the workflow we are trying to achieve is something like this

    • We should ensure that a frontend (seed node) is started first
    • We should ensure a backend (seed node) is started. This will have the effect of the backend actor registering itself as a worker with the already running frontend actor
    • At this point we could start more frontend/backend non seed nodes actors, if we chose to
• We start the client app (root), which will periodically send messages to the frontend actor that is looked up via its known seed node information. We would expect the frontend actor to delegate the work off to one of its known backend actors and then send the response back to the client (ClusterClient), where we can use the response to send to a local actor, or consume it directly

    Common

    The common project simply contains the common objects across the other projects. Which for this demo app are just the messages as shown below

    package sample.cluster.transformation
    
    final case class TransformationJob(text: String)
    final case class TransformationResult(text: String)
    final case class JobFailed(reason: String, job: TransformationJob)
    case object BackendRegistration
    

     

    Root

This is the client app that will talk to the cluster (in particular the "frontend" seed node, which is expected to be running on 127.0.0.1:2551).

    This client app uses the following configuration file

    akka {
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }
    
      remote {
        transport = "akka.remote.netty.NettyRemoteTransport"
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 5000
        }
      }
    }
    

We then use the following main method to kick off the client app

    import java.util.concurrent.atomic.AtomicInteger
    
    import akka.actor.{Props, ActorSystem}
    import akka.util.Timeout
    import scala.io.StdIn
    import scala.concurrent.duration._
    
    
    object DemoClient {
      def main(args : Array[String]) {
    
        val system = ActorSystem("OTHERSYSTEM")
        val clientJobTransformationSendingActor =
          system.actorOf(Props[ClientJobTransformationSendingActor],
            name = "clientJobTransformationSendingActor")
    
        val counter = new AtomicInteger
        import system.dispatcher
        system.scheduler.schedule(2.seconds, 2.seconds) {
          clientJobTransformationSendingActor ! Send(counter.incrementAndGet())
          Thread.sleep(1000)
        }
    
        StdIn.readLine()
        system.terminate()
      }
    }
    
    
    
    
    

    There is not too much to talk about here, we simply create a standard actor, and send it messages on a recurring schedule.

    The message looks like this

    case class Send(count:Int)
    

    The real work of talking to the cluster is inside the ClientJobTransformationSendingActor which we will look at now

    import akka.actor.Actor
    import akka.actor.ActorPath
    import akka.cluster.client.{ClusterClientSettings, ClusterClient}
    import akka.pattern.Patterns
    import sample.cluster.transformation.{TransformationResult, TransformationJob}
    import akka.util.Timeout
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    
    
    class ClientJobTransformationSendingActor extends Actor {
    
      val initialContacts = Set(
        ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"))
      val settings = ClusterClientSettings(context.system)
        .withInitialContacts(initialContacts)
    
      val c = context.system.actorOf(ClusterClient.props(settings), "demo-client")
    
    
      def receive = {
        case TransformationResult(result) => {
          println("Client response")
          println(result)
        }
        case Send(counter) => {
            val job = TransformationJob("hello-" + counter)
            implicit val timeout = Timeout(5 seconds)
            val result = Patterns.ask(c,ClusterClient.Send("/user/frontend", job, localAffinity = true), timeout)
    
            result.onComplete {
              case Success(transformationResult) => {
                println(s"Client saw result: $transformationResult")
                self ! transformationResult
              }
              case Failure(t) => println("An error has occured: " + t.getMessage)
            }
          }
      }
    }
    

    As you can see this is a regular actor, but there are several important things to note here:

• We set up the ClusterClient with a known set of initial contact points that we expect to be reachable within the cluster (remember these nodes MUST have registered themselves as available services with the ClusterClientReceptionist)
• We use a new type of actor, a ClusterClient
• We use the ClusterClient to send a message to a seed node within the cluster (the frontend in our case). We use the ask pattern, which will give us a Future[T] that represents the response.
• We use the response to send a local message to ourselves

     

    FrontEnd

    As previously stated the “frontend” role actors serve as the seed nodes for the ClusterClient. There is only one seed node for the frontend which we just saw the client app uses via the ClusterClient.

So what happens when the client app uses the frontend actors via the ClusterClient? Well, it's quite simple: the client app (once a connection is made to the frontend seed node) sends a TransformationJob, which is a simple message containing a bit of text that the frontend actor will pass on to one of its registered backend workers for processing.

The backend actor (also in the cluster) will simply convert the text contained in the TransformationJob to UPPERCASE and return it to the frontend actor. The frontend actor will then send this TransformationResult back to the sender, which happens to be the ClusterClient. The client app (which used the ask pattern) hooks up a callback on the Future[T] and will then send the TransformationResult on to the client's own actor.

    Happy days.

So that is what we are trying to achieve, so let's see what bits and bobs we need for the frontend side of things

    Here is the configuration the frontend needs

    #//#snippet
    akka {
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    
      cluster {
        seed-nodes = [
          "akka.tcp://ClusterSystem@127.0.0.1:2551",
          "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    
        #//#snippet
        # excluded from snippet
        auto-down-unreachable-after = 10s
        #//#snippet
        # auto downing is NOT safe for production deployments.
        # you may want to use it during development, read more about it in the docs.
        #
        # auto-down-unreachable-after = 10s
      }
    
      # enable receptionist at start
      extensions = ["akka.cluster.client.ClusterClientReceptionist"]
    
    }
    
    

    There are a couple of important things to note in this, namely:

    • That we configure the seed nodes
• That we also add the ClusterClientReceptionist extension
    • That we use the ClusterActorRefProvider

    And here is the frontend application

    package sample.cluster.transformation.frontend
    
    import language.postfixOps
    import akka.actor.ActorSystem
    import akka.actor.Props
    import com.typesafe.config.ConfigFactory
    import akka.cluster.client.ClusterClientReceptionist
    
    
    
    object TransformationFrontendApp {
    
      def main(args: Array[String]): Unit = {
    
        // Override the configuration of the port when specified as program argument
        val port = if (args.isEmpty) "0" else args(0)
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
          withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
          withFallback(ConfigFactory.load())
    
        val system = ActorSystem("ClusterSystem", config)
        val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
        ClusterClientReceptionist(system).registerService(frontend)
      }
    
    }
    

The important parts here are that we embellish the loaded config with the role of "frontend", and that we also register the frontend actor with the ClusterClientReceptionist, so that the actor is available for the ClusterClient to communicate with

    Other than that it is all pretty vanilla akka to be honest

So let's now focus our attention on the actual frontend actor, which is shown below

    package sample.cluster.transformation.frontend
    
    import sample.cluster.transformation.{TransformationResult, BackendRegistration, JobFailed, TransformationJob}
    import language.postfixOps
    import scala.concurrent.Future
    import akka.actor.Actor
    import akka.actor.ActorRef
    import akka.actor.Terminated
    import akka.util.Timeout
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import akka.pattern.pipe
    import akka.pattern.ask
    
    
    class TransformationFrontend extends Actor {
    
      var backends = IndexedSeq.empty[ActorRef]
      var jobCounter = 0
    
      def receive = {
        case job: TransformationJob if backends.isEmpty =>
          sender() ! JobFailed("Service unavailable, try again later", job)
    
        case job: TransformationJob =>
          println(s"Frontend saw TransformationJob : '$job'")
          jobCounter += 1
          implicit val timeout = Timeout(5 seconds)
          val result  = (backends(jobCounter % backends.size) ? job)
            .map(x => x.asInstanceOf[TransformationResult])
          result pipeTo sender
          //pipe(result) to sender
    
        case BackendRegistration if !backends.contains(sender()) =>
          context watch sender()
          backends = backends :+ sender()
    
        case Terminated(a) =>
          backends = backends.filterNot(_ == a)
      }
    }
    

    The crucial parts here are:

• That when a backend registers it sends a BackendRegistration, at which point we watch it, so that if that backend terminates it is removed from this frontend actor's list of known backend actors
• That we hand the incoming TransformationJob off to one of the registered backends (picked round-robin via the job counter), and then use the pipe pattern to pipe the response back to the original sender

And with that, all that is left to do is examine the backend code, so let's look at that now

     

    BackEnd

    As always lets start with the configuration, which for the backend is as follows:

    #//#snippet
    akka {
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    
      cluster {
        seed-nodes = [
          "akka.tcp://ClusterSystem@127.0.0.1:2551",
          "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    
        #//#snippet
        # excluded from snippet
        auto-down-unreachable-after = 10s
        #//#snippet
        # auto downing is NOT safe for production deployments.
        # you may want to use it during development, read more about it in the docs.
        #
        # auto-down-unreachable-after = 10s
      }
    
      # enable receptionist at start
      extensions = ["akka.cluster.client.ClusterClientReceptionist"]
    }
    
    
    
    
    

    You can see this is pretty much the same as the frontend, so I won’t speak to this anymore.

Ok, so following what we did with the frontend side of things, let's now look at the backend app

    package sample.cluster.transformation.backend
    
    import language.postfixOps
    import scala.concurrent.duration._
    import akka.actor.ActorSystem
    import akka.actor.Props
    import com.typesafe.config.ConfigFactory
    
    object TransformationBackendApp {
      def main(args: Array[String]): Unit = {
        // Override the configuration of the port when specified as program argument
        val port = if (args.isEmpty) "0" else args(0)
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
          withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
          withFallback(ConfigFactory.load())
    
        val system = ActorSystem("ClusterSystem", config)
        system.actorOf(Props[TransformationBackend], name = "backend")
      }
    }
    

    Again this is VERY similar to the front end app, the only notable exception being that we now use a “backend” role instead of a “frontend” one

So now let's look at the backend actor code, which is the final piece of the puzzle

    package sample.cluster.transformation.backend
    
    import sample.cluster.transformation.{BackendRegistration, TransformationResult, TransformationJob}
    import language.postfixOps
    import scala.concurrent.duration._
    import akka.actor.Actor
    import akka.actor.RootActorPath
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent.CurrentClusterState
    import akka.cluster.ClusterEvent.MemberUp
    import akka.cluster.Member
    import akka.cluster.MemberStatus
    
    
    class TransformationBackend extends Actor {
    
      val cluster = Cluster(context.system)
    
      // subscribe to cluster changes, MemberUp
      // re-subscribe when restart
      override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
      override def postStop(): Unit = cluster.unsubscribe(self)
    
      def receive = {
        case TransformationJob(text) => {
          val result = text.toUpperCase
          println(s"Backend has transformed the incoming job text of '$text' into '$result'")
          sender() ! TransformationResult(text.toUpperCase)
        }
        case state: CurrentClusterState =>
          state.members.filter(_.status == MemberStatus.Up) foreach register
        case MemberUp(m) => register(m)
      }
    
      def register(member: Member): Unit =
        if (member.hasRole("frontend"))
          context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
            BackendRegistration
    }
    

    The key points here are:

• That we use the cluster events to subscribe to MemberUp, so that when a "frontend" role actor comes up, we register this backend with it by sending it a BackendRegistration message
• That for any TransformationJob received (from the frontend, and ultimately from the client app) we do the work and send a TransformationResult back, which will make its way all the way back to the client

     

    And in a nutshell that is how the entire demo hangs together. I hope I have not lost anyone along the way.

Anyway, let's now see how we can run the demo

    How do I Run The Demo

You will need to ensure that you run the following 3 projects in this order (as a minimum; you can run more NON seed node frontend/backend instances before you start the root (client) if you like)

    • Frontend (seed node) : frontend with command line args : 2551
• Backend (seed node) : backend with command line args : 2552
• Optionally run more frontend/backend projects but DON'T supply any command line args. This is how you get them to not be treated as seed nodes
• Root : This is the client app

     

    Once you run the projects you should see some output like

    The “root” (client) project output:

    [INFO] [10/05/2016 07:22:02.831] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [10/05/2016 07:22:03.302] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://OTHERSYSTEM@127.0.0.1:5000]
    [INFO] [10/05/2016 07:22:03.322] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Starting up…
    [INFO] [10/05/2016 07:22:03.450] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Registered cluster JMX MBean [akka:type=Cluster]
    [INFO] [10/05/2016 07:22:03.450] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Started up successfully
    [INFO] [10/05/2016 07:22:03.463] [OTHERSYSTEM-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
    [INFO] [10/05/2016 07:22:03.493] [OTHERSYSTEM-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Metrics collection has started successfully
    [WARN] [10/05/2016 07:22:03.772] [OTHERSYSTEM-akka.actor.default-dispatcher-19] [akka.tcp://OTHERSYSTEM@127.0.0.1:5000/system/cluster/core/daemon] Trying to join member with wrong ActorSystem name, but was ignored, expected [OTHERSYSTEM] but was [ClusterSystem]
    [INFO] [10/05/2016 07:22:03.811] [OTHERSYSTEM-akka.actor.default-dispatcher-19] [akka.tcp://OTHERSYSTEM@127.0.0.1:5000/user/demo-client] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2552/system/receptionist]
    [WARN] [10/05/2016 07:22:05.581] [OTHERSYSTEM-akka.remote.default-remote-dispatcher-14] [akka.serialization.Serialization(akka://OTHERSYSTEM)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Client saw result: TransformationResult(HELLO-1)
    Client response
    HELLO-1
    Client saw result: TransformationResult(HELLO-2)
    Client response
    HELLO-2
    Client saw result: TransformationResult(HELLO-3)
    Client response
    HELLO-3
    Client saw result: TransformationResult(HELLO-4)
    Client response
    HELLO-4

    The “frontend” project output:

    [INFO] [10/05/2016 07:21:35.592] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [10/05/2016 07:21:35.883] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2551]
    [INFO] [10/05/2016 07:21:35.901] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Starting up…
    [INFO] [10/05/2016 07:21:36.028] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Registered cluster JMX MBean [akka:type=Cluster]
    [INFO] [10/05/2016 07:21:36.028] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Started up successfully
    [INFO] [10/05/2016 07:21:36.037] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
    [INFO] [10/05/2016 07:21:36.040] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Metrics collection has started successfully
    [WARN] [10/05/2016 07:21:37.202] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2552]] Caused by: [Connection refused: no further information: /127.0.0.1:2552]
    [INFO] [10/05/2016 07:21:37.229] [ClusterSystem-akka.actor.default-dispatcher-17] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:37.229] [ClusterSystem-akka.actor.default-dispatcher-17] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:37.232] [ClusterSystem-akka.actor.default-dispatcher-21] [akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter] Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter#-1346529294] to Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter#-1346529294] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:38.085] [ClusterSystem-akka.actor.default-dispatcher-22] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:39.088] [ClusterSystem-akka.actor.default-dispatcher-14] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:40.065] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:41.095] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles [frontend]
    [INFO] [10/05/2016 07:21:41.123] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
    [INFO] [10/05/2016 07:21:50.837] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles [backend]
    [INFO] [10/05/2016 07:21:51.096] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
    Frontend saw TransformationJob : ‘TransformationJob(hello-1)’
    [WARN] [10/05/2016 07:22:05.669] [ClusterSystem-akka.remote.default-remote-dispatcher-24] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    [WARN] [10/05/2016 07:22:05.689] [ClusterSystem-akka.remote.default-remote-dispatcher-23] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Frontend saw TransformationJob : ‘TransformationJob(hello-2)’
    Frontend saw TransformationJob : ‘TransformationJob(hello-3)’
    Frontend saw TransformationJob : ‘TransformationJob(hello-4)’

     

    The “backend”project output:

    [INFO] [10/05/2016 07:21:50.023] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [10/05/2016 07:21:50.338] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
    [INFO] [10/05/2016 07:21:50.353] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Starting up…
    [INFO] [10/05/2016 07:21:50.430] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Registered cluster JMX MBean [akka:type=Cluster]
    [INFO] [10/05/2016 07:21:50.430] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Started up successfully
    [INFO] [10/05/2016 07:21:50.437] [ClusterSystem-akka.actor.default-dispatcher-6] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
    [INFO] [10/05/2016 07:21:50.441] [ClusterSystem-akka.actor.default-dispatcher-6] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Metrics collection has started successfully
    [INFO] [10/05/2016 07:21:50.977] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
    [WARN] [10/05/2016 07:21:51.289] [ClusterSystem-akka.remote.default-remote-dispatcher-15] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.BackendRegistration$] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    [WARN] [10/05/2016 07:22:05.651] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Backend has transformed the incoming job text of ‘hello-1’ into ‘HELLO-1’
    [WARN] [10/05/2016 07:22:05.677] [ClusterSystem-akka.remote.default-remote-dispatcher-15] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Backend has transformed the incoming job text of ‘hello-2’ into ‘HELLO-2’
    Backend has transformed the incoming job text of ‘hello-3’ into ‘HELLO-3’
    Backend has transformed the incoming job text of ‘hello-4’ into ‘HELLO-4’
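
As an aside, you will have noticed the repeated warnings about the default Java serializer in all three outputs. The warning itself names the setting that silences it, so a quick (if slightly lazy) fix is the snippet below; configuring a proper serializer for your messages is the better long-term option.

akka.actor.warn-about-java-serializer-usage = off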

     

NAT or Docker Considerations

Akka clustering does not work transparently with Network Address Translation, Load Balancers, or in Docker containers. If this applies to you, you may need to further configure Akka as described here:

    http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#remote-configuration-nat
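
As a rough, hypothetical illustration only (made-up addresses, and do check the linked docs for the exact settings supported by your Akka version), the general idea is to advertise one address to the rest of the cluster while binding to a different address inside the container or NAT boundary:

akka {
  remote {
    netty.tcp {
      # address advertised to other cluster members (e.g. the public/NAT address)
      hostname = "203.0.113.10"
      port = 2552

      # address this node actually binds to inside the container / private network
      bind-hostname = "0.0.0.0"
      bind-port = 2552
    }
  }
}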

     

     

    Where Can I Find The Code Examples?

    I will be augmenting this GitHub repo with the example projects as I move through this series

    https://github.com/sachabarber/SachaBarber.AkkaExamples

    Akka : remoting

It has been a while since I wrote a post, and the reason for that is actually this post.

    I would consider remoting/clustering to be some of the more advanced stuff you could do with Akka. That said this and the next post will outline all of this good stuff for you, and by the end of this post I would hope to have demonstrated enough for you guys to go off and write Akka remoting/clustered apps.

    I have decided to split the remoting and clustering stuff into 2 posts, to make it more focused and digestible. I think this is the right thing to do.

     

    A Note About All The Demos In This Topic

I wanted the demos in this section to be as close to real life as possible. The official Akka examples tend to use a single process, which I personally think is quite confusing when you are trying to deal with quite hard concepts. As such I decided to go with multi-process projects to demonstrate things. I do however only have 1 laptop, so they are hosted on the same node, but they are separate processes/JVMs.

    I am hoping by doing this it will make the learning process easier, as it is closer to what you would do in real life rather than have 1 main method that spawns an entire cluster. You just would not have that in real life.

     

    What Is Akka Remoting

If you have ever used RMI in Java, or Remoting/WCF in C#, you can kind of think of Akka remoting as something similar to that, where there is the ability to call a remote object's method as if it were local. It is essentially peer-to-peer.

Obviously in Akka's case the remote object is actually an actor, and you will not actually be calling a method at all, but will instead treat the remote actor just like any other actor, where you simply pass messages to it, and the remote actor will work just like any other actor: it will receive the message and act on it accordingly.

This is actually quite unique. I have worked with Java Remoting and also C# Remoting, and done a lot with .NET WCF. What all of these had in common was that there was some code voodoo that you had to do, where the difference between working with a local object and working with a remote object required a fair bit of code, be it remoting channels, proxies etc.

In Akka there is literally no change in coding style to work with remoting; it is completely configuration driven, which is quite nice.

Akka's Remoting Interaction Models

     Akka supports 2 ways of using remoting

    • Lookup : Where we use actorSelection to lookup an already running remote actor
    • Creation : Where an actor will be created on the remote node

    We will be looking at both these approaches

    Requirements

    As I have stated on numerous occasions I have chosen to use SBT. As such this is my SBT file dependencies section for both these Remoting examples.

    lazy val AllLibraryDependencies =
      Seq(
        "com.typesafe.akka" %% "akka-actor" % "2.4.8",
        "com.typesafe.akka" %% "akka-remote" % "2.4.8"
      )
    

It can be seen that the crucial dependency is the akka-remote library

     

    Remote Selection

    As stated above “Remote Selection” will try and use actorSelection to look up a remote actor. The remote actor IS expected to be available and running.

    In this example there will be 2 projects configured in the SBT file

    • Remote : The remote actor, that is expected to be running before the local actor tries to communicate with it
    • Local : The local actor that will call the remote

    Here is the complete SBT file for this section

    import sbt._
    import sbt.Keys._
    
    
    lazy val allResolvers = Seq(
      "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
      "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
    )
    
    lazy val AllLibraryDependencies =
      Seq(
        "com.typesafe.akka" %% "akka-actor" % "2.4.8",
        "com.typesafe.akka" %% "akka-remote" % "2.4.8"
      )
    
    
    lazy val commonSettings = Seq(
      name := "AkkaRemoting",
      version := "1.0",
      scalaVersion := "2.11.8",
      resolvers := allResolvers,
      libraryDependencies := AllLibraryDependencies
    )
    
    
    lazy val remote = (project in file("remote")).
      settings(commonSettings: _*).
      settings(
        // other settings
      )
    
    lazy val local = (project in file("local")).
      settings(commonSettings: _*).
      settings(
        // other settings
      )
    
    
    
    

    This simply creates 2 projects for us, Remote and Local.

    Remote Project

Now that we have the relevant projects in place, let's talk about how we expose a remote actor for selection.

    We must ensure that the remote actor is available for selection, which requires the use of an IP address and a port.

    Here is how we do this

    akka {
      actor {
        provider = "akka.remote.RemoteActorRefProvider"
      }
      remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 4444
        }
        log-sent-messages = on
        log-received-messages = on
      }
    }
    

    Also note the akka.actor.provider is set to

    akka.remote.RemoteActorRefProvider
    

Ok, so now we have the ability to expose the remote actor on an IP address and port, let's have a look at the remote actor code.

    Here is the remote actor itself

    import akka.actor.Actor
    
    class RemoteActor extends Actor {
      def receive = {
        case msg: String =>
          println(s"RemoteActor received message '$msg'")
          sender ! "Hello from the RemoteActor"
      }
    }
    

    Nothing too special, we simply receive a string message and send a response string message back to the sender (the local actor would be the sender in this case)

    And here is the main method that drives the remote project

    import akka.actor.{Props, ActorSystem}
    
    import scala.io.StdIn
    
    object RemoteDemo extends App  {
      val system = ActorSystem("RemoteDemoSystem")
      val remoteActor = system.actorOf(Props[RemoteActor], name = "RemoteActor")
      remoteActor ! "The RemoteActor is alive"
      StdIn.readLine()
      system.terminate()
      StdIn.readLine()
    }
    

    Again pretty standard stuff, no voodoo here

And that is all there is to the remote side of the "remote selection" version of remoting. Let's now turn our attention to the local side.

    Local Project

So far we have a remote actor which is configured to be up and running at 127.0.0.1:4444.

We now need to set up the local side of things. This is done using the following configuration. Notice the port is different from the 4444 that is already in use (port 0 means a random free port is chosen). Obviously if you host these actors on physically different boxes there would be nothing to stop you using port 4444 again, but from a sanity point of view I find it is better not to do that.

    akka {
      actor {
        provider = "akka.remote.RemoteActorRefProvider"
      }
      remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    }
    

    Plain Selection

We can make use of plain old actor selection to select any actor by a path of our choosing, where we may use resolveOne (if we expect to match only one actor; remember we can use wildcards, so there are times we may match more than one) to give us an ActorRef.

    context.actorSelection(path).resolveOne()
    

When we use resolveOne() we get a Future[ActorRef] that we can use in any of the normal ways we would handle and work with a Future[T]. I have chosen to use a for comprehension to capture the resolved ActorRef of the remote actor. I also monitor the remote actor using context.watch, such that if it terminates we will see a Terminated message and can shut down the local actor system.

We also make use of become (see the state machines post for more info on that) to swap out the message loop for the local actor, so it works differently once we have a remote ActorRef.

    Once we have an ActorRef representing the remote actor it is pretty standard stuff where we just send messages to the remote actor ref using the ActorRef that represents it.

    Here is the entire code for the plain actor selection approach of dealing with a remote actor.

    import java.util.concurrent.atomic.AtomicInteger
    import akka.actor.{Terminated, ActorRef, ReceiveTimeout, Actor}
    import akka.util.Timeout
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    
    case object Init
    
    class LocalActorUsingPlainSelection extends Actor {
    
      val path = "akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor"
      val atomicInteger = new AtomicInteger();
      context.setReceiveTimeout(3 seconds)
    
      def receive = identifying
    
      def identifying: Receive = {
        case Init => {
          implicit val resolveTimeout = Timeout(5 seconds)
          for (ref : ActorRef <- context.actorSelection(path).resolveOne()) {
            println("Resolved remote actor ref using Selection")
            context.watch(ref)
            context.become(active(ref))
            context.setReceiveTimeout(Duration.Undefined)
            self ! Start
          }
        }
        case ReceiveTimeout => println("timeout")
      }
    
      def active(actor: ActorRef): Receive = {
        case Start =>
          actor ! "Hello from the LocalActorUsingPlainSelection"
        case msg: String =>
          println(s"LocalActorUsingPlainSelection received message: '$msg'")
          if (atomicInteger.get() < 5) {
            sender ! "Hello back to you"
            atomicInteger.getAndAdd(1)
          }
        case Terminated(`actor`) =>
          println("Receiver terminated")
          context.system.terminate()
      }
    }
    

     

    Using Identity Messages

    Another approach that can be taken rather than relying on straight actor selection is by using some special Akka messages, namely Identify and ActorIdentity.

The idea is that we still use actorSelection for a given path, but rather than using resolveOne we send the ActorSelection a special Identify message. The actor that was chosen by the ActorSelection should see this Identify message and should respond with an ActorIdentity message.

At this point the local actor can simply listen for ActorIdentity messages, and when it sees one it can test the message's correlationId to see if it matches the requested path. If it does, you know that is the correct actor and you can then use the ActorRef of the ActorIdentity message.

As in the previous example we also make use of become (see the state machines post for more info on that) to swap out the message loop for the local actor, so it works differently once we have a remote ActorRef.

    Once we have an ActorRef representing the remote actor it is pretty standard stuff where we just send messages to the remote actor ref using the ActorRef that represents it.

    Here is the entire code for the Identity actor approach

    import java.util.concurrent.atomic.AtomicInteger
    import akka.actor._
    import scala.concurrent.duration._
    
    
    case object Start
    
    
    class LocalActorUsingIdentity extends Actor {
    
      val path = "akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor"
      val atomicInteger = new AtomicInteger();
      context.setReceiveTimeout(3 seconds)
      sendIdentifyRequest()
    
      def receive = identifying
    
      def sendIdentifyRequest(): Unit =
        context.actorSelection(path) ! Identify(path)
    
      def identifying: Receive = {
        case identity : ActorIdentity =>
          if(identity.correlationId.equals(path)) {
            identity.ref match {
              case Some(remoteRef) => {
                context.watch(remoteRef)
                context.become(active(remoteRef))
                context.setReceiveTimeout(Duration.Undefined)
                self ! Start
              }
              case None => println(s"Remote actor not available: $path")
            }
          }
        case ReceiveTimeout => sendIdentifyRequest()
      }
    
      def active(actor: ActorRef): Receive = {
        case Start =>
          actor ! "Hello from the LocalActorUsingIdentity"
        case msg: String =>
          println(s"LocalActorUsingIdentity received message: '$msg'")
          if (atomicInteger.get() < 5) {
            sender ! "Hello back to you"
            atomicInteger.getAndAdd(1)
          }
        case Terminated(`actor`) =>
          println("Receiver terminated")
          context.system.terminate()
      }
    }
    

    How do I Run The Demo

    You will need to ensure that you run the following 2 projects in this order:

    • Remote
    • Local

    Once you run the 2 projects you should see some output like this

    The Remote project output:

    [INFO] [10/03/2016 07:02:54.282] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [10/03/2016 07:02:54.842] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteDemoSystem@127.0.0.1:4444]
    [INFO] [10/03/2016 07:02:54.844] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://RemoteDemoSystem@127.0.0.1:4444]
    RemoteActor received message ‘The RemoteActor is alive’
    [INFO] [10/03/2016 07:02:54.867] [RemoteDemoSystem-akka.actor.default-dispatcher-15] [akka://RemoteDemoSystem/deadLetters]
    Message [java.lang.String] from Actor[akka://RemoteDemoSystem/user/RemoteActor#-109465353] to Actor[akka://RemoteDemoSystem/deadLetters]
    was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings
    ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    RemoteActor received message ‘Hello from the LocalActorUsingPlainSelection’
    RemoteActor received message ‘Hello back to you’
    RemoteActor received message ‘Hello back to you’
    RemoteActor received message ‘Hello back to you’
    RemoteActor received message ‘Hello back to you’
    RemoteActor received message ‘Hello back to you’

     

    The Local project output:

    [INFO] [10/03/2016 07:03:09.489] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [10/03/2016 07:03:09.961] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://LocalDemoSystem@127.0.0.1:64945]
    [INFO] [10/03/2016 07:03:09.963] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://LocalDemoSystem@127.0.0.1:64945]
    Resolved remote actor ref using Selection
    LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
    LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
    LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
    LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
    LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
    LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’

     

     

     

    Remote Creation

    Just as we did with remote selection, remote creation shall be split into a remote project and a local project. However, since this time the local project must know about the type of the remote actor in order to create it in the first place, we introduce a common project which both the remote and local projects depend on.

    In this example there will be 3 projects configured in the SBT file

    • Remote : The remote actor, that is expected to be created by the local actor
    • Common : The common files that both Local/Remote projects depend on
    • Local : The local actor that will create and call the remote actor

    Here is the complete SBT file for this section

    import sbt._
    import sbt.Keys._
    
    
    lazy val allResolvers = Seq(
      "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
      "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
    )
    
    lazy val AllLibraryDependencies =
      Seq(
        "com.typesafe.akka" %% "akka-actor" % "2.4.8",
        "com.typesafe.akka" %% "akka-remote" % "2.4.8"
      )
    
    
    lazy val commonSettings = Seq(
      version := "1.0",
      scalaVersion := "2.11.8",
      resolvers := allResolvers,
      libraryDependencies := AllLibraryDependencies
    )
    
    
    lazy val root =(project in file(".")).
      settings(commonSettings: _*).
      settings(
        name := "Base"
      )
      .aggregate(common, remote)
      .dependsOn(common, remote)
    
    lazy val common = (project in file("common")).
      settings(commonSettings: _*).
      settings(
        name := "common"
      )
    
    lazy val remote = (project in file("remote")).
      settings(commonSettings: _*).
      settings(
        name := "remote"
      )
      .aggregate(common)
      .dependsOn(common)
    

    It can be seen that both the local/remote will aggregate/depend on the common project. This is standard SBT stuff so I will not go into that.

    So now that we understand a bit more about the SBT side of things, let's focus on the remote side of things.

    This may seem odd, since we are expecting the local actor to create the remote actor, aren't we?

    Well yes we are, but the remote actor system must still be up and running before the local system can deploy an actor into it.

    So it still makes sense to examine the remote side of things first.

    Remote Project

    Now that we have the relevant projects in place, lets talk about how we expose a remote actor for creation.

    We must ensure that the remote system is available for creation requests, which requires the use of an IP address and a port.

    Here is how we do this

    akka {
    
      actor {
        provider = "akka.remote.RemoteActorRefProvider"
      }
    
      remote {
        netty.tcp {
          hostname = "127.0.0.1",
          # LISTEN on tcp port 2552
          port=2552
        }
      }
    
    }
    

    I also mentioned that the remote actor system MUST be started to allow remote creation to work. As such, the entire codebase for the remote end of things (excluding the remote actor itself, which gets created by the local side) is shown below

    import com.typesafe.config.ConfigFactory
    import akka.actor.ActorSystem
    
    object CalculatorApplication {
    
      def main(args: Array[String]): Unit = {
        startRemoteWorkerSystem()
      }
    
      def startRemoteWorkerSystem(): Unit = {
        ActorSystem("CalculatorWorkerSystem", ConfigFactory.load("calculator"))
        println("Started CalculatorWorkerSystem")
      }
    
    }
    

    All that is happening here is that the remote actor system gets created.

    Local Project

    Most of the hard work is done in this project, as it is the local side of things that is responsible for creating and deploying the remote actor into the remote actor system before it can then make use of it.

    Lets start with the deployment of the remote actor from the local side.

    Firstly we need this configuration to allow this to happen

    akka {
    
      actor {
        provider = "akka.remote.RemoteActorRefProvider",
        deployment {
          "/creationActor/*" {
            remote = "akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552"
          }
        }
      }
    
      remote {
        netty.tcp {
          hostname = "127.0.0.1",
          port=2554
        }
      }
    
    }
    

    If you look carefully at this configuration file and the one at the remote end you will see that the IP address/port/actor system name used within the deployment section all match. This is how the local actor system is able to create and deploy an actor to the remote actor system (which must be running prior to the local actor system trying to deploy a remote actor to it).

    So now that we have seen this config, let's see how it is used by the local project

    import sample.remote.calculator.{Divide, Multiply, CreationActor}
    import scala.concurrent.duration._
    import com.typesafe.config.ConfigFactory
    import scala.util.Random
    import akka.actor.ActorSystem
    import akka.actor.Props
    
    object CreationApplication {
    
      def main(args: Array[String]): Unit = {
        startRemoteCreationSystem()
      }
    
      def startRemoteCreationSystem(): Unit = {
        val system =
          ActorSystem("CreationSystem", ConfigFactory.load("remotecreation"))
        val actor = system.actorOf(Props[CreationActor],
          name = "creationActor")
    
        println("Started CreationSystem")
        import system.dispatcher
        system.scheduler.schedule(1.second, 1.second) {
          if (Random.nextInt(100) % 2 == 0)
            actor ! Multiply(Random.nextInt(20), Random.nextInt(20))
          else
            actor ! Divide(Random.nextInt(10000), (Random.nextInt(99) + 1))
        }
      }
    
    }
    

    It can be seen that the first thing we do is create the CreationActor in the local system using the config above. Thanks to the "/creationActor/*" deployment configuration, any actors the CreationActor creates as children end up being deployed into the already running remote actor system. The CreationActor can then be used just like any other actor.

    For completeness here is the code of the CreationActor

    package sample.remote.calculator
    
    import akka.actor.Actor
    import akka.actor.ActorRef
    import akka.actor.Props
    
    class CreationActor extends Actor {
    
      def receive = {
        case op: MathOp =>
          val calculator = context.actorOf(Props[CalculatorActor])
          calculator ! op
        case result: MathResult => result match {
          case MultiplicationResult(n1, n2, r) =>
            printf("Mul result: %d * %d = %d\n", n1, n2, r)
            context.stop(sender())
          case DivisionResult(n1, n2, r) =>
            printf("Div result: %.0f / %d = %.2f\n", n1, n2, r)
            context.stop(sender())
        }
      }
    }
    
    It can be seen that the CreationActor above also creates another actor called CalculatorActor, which does the real work; thanks to the deployment configuration this child is the actor that actually gets deployed remotely. Let's see the code for that one.
    package sample.remote.calculator
    
    import akka.actor.Props
    import akka.actor.Actor
    
    class CalculatorActor extends Actor {
      def receive = {
        case Add(n1, n2) =>
          println("Calculating %d + %d".format(n1, n2))
          sender() ! AddResult(n1, n2, n1 + n2)
        case Subtract(n1, n2) =>
          println("Calculating %d - %d".format(n1, n2))
          sender() ! SubtractResult(n1, n2, n1 - n2)
        case Multiply(n1, n2) =>
          println("Calculating %d * %d".format(n1, n2))
          sender() ! MultiplicationResult(n1, n2, n1 * n2)
        case Divide(n1, n2) =>
          println("Calculating %.0f / %d".format(n1, n2))
          sender() ! DivisionResult(n1, n2, n1 / n2)
      }
    }
    
    

    Nothing special there really, it's just a standard actor.

    So we now have a complete pipeline.

    How do I Run The Demo

    You will need to ensure that you run the following 2 projects in this order:

    • Remote
    • Root

    Once you run the 2 projects you should see some output like this

    The Remote project output:

    [INFO] [10/03/2016 07:30:58.763] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [10/03/2016 07:30:59.235] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552]
    [INFO] [10/03/2016 07:30:59.237] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552]
    Started CalculatorWorkerSystem
    Calculating 6 * 15
    [WARN] [10/03/2016 07:31:10.988] [CalculatorWorkerSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CalculatorWorkerSystem)] Using the default Java serializer for class [sample.remote.calculator.MultiplicationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Calculating 1346 / 82
    [WARN] [10/03/2016 07:31:11.586] [CalculatorWorkerSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CalculatorWorkerSystem)] Using the default Java serializer for class [sample.remote.calculator.DivisionResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Calculating 2417 / 31
    Calculating 229 / 66
    Calculating 9966 / 43
    Calculating 4 * 12
    Calculating 9 * 5
    Calculating 1505 / 91

    The Root project output:

    [INFO] [10/03/2016 07:31:08.849] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [10/03/2016 07:31:09.470] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://CreationSystem@127.0.0.1:2554]
    [INFO] [10/03/2016 07:31:09.472] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://CreationSystem@127.0.0.1:2554]
    Started CreationSystem
    [WARN] [10/03/2016 07:31:10.808] [CreationSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [com.typesafe.config.impl.SimpleConfig] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    [WARN] [10/03/2016 07:31:10.848] [CreationSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [sample.remote.calculator.Multiply] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Mul result: 6 * 15 = 90
    [WARN] [10/03/2016 07:31:11.559] [CreationSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [sample.remote.calculator.Divide] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Div result: 1346 / 82 = 16.41
    Div result: 2417 / 31 = 77.97
    Div result: 229 / 66 = 3.47
    Div result: 9966 / 43 = 231.77
    Mul result: 4 * 12 = 48
    Mul result: 9 * 5 = 45
    Div result: 1505 / 91 = 16.54
    Mul result: 7 * 4 = 28
    Div result: 1797 / 95 = 18.92
    Mul result: 12 * 17 = 204
    Div result: 2998 / 72 = 41.64
    Div result: 1157 / 98 = 11.81
    Div result: 1735 / 22 = 78.86
    Mul result: 4 * 19 = 76
    Div result: 6257 / 51 = 122.69
    Mul result: 14 * 4 = 56
    Div result: 587 / 27 = 21.74
    Div result: 2528 / 98 = 25.80
    Mul result: 9 * 16 = 144
    Mul result: 13 * 10 = 130

     

     

     

     

    Nat or Docker Considerations

    Akka Remoting does not work transparently with Network Address Translation, Load Balancers, or in Docker containers. If this is your case you may need to further configure Akka as described here :

    http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#remote-configuration-nat

     

     

     

    Where Can I Find The Code Examples?

    I will be augmenting this GitHub repo with the example projects as I move through this series

    https://github.com/sachabarber/SachaBarber.AkkaExamples

    AKKA : TESTKIT

    So the journey continues; we have covered a fair bit of ground, but there is still a long way to go yet, with many exciting features of Akka still to cover.

    But before we get on to some of the more advanced stuff, I thought it would be a good idea to take a small detour and look at how you can test Akka actor systems.

    TestKit

    Akka comes with a completely separate module for testing which you MUST include; this can be obtained using the following SBT settings

    name := "Testing"
    
    version := "1.0"
    
    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "com.typesafe.akka"   %%    "akka-actor"    %   "2.4.8",
      "com.typesafe.akka"   %%    "akka-testkit"  %   "2.4.8"   %   "test",
      "org.scalatest"       %%    "scalatest"     %   "2.2.5"   %   "test"
    )
    
    

    I have chosen to use scalatest, but you could use other popular testing frameworks such as specs2.

     

     

    Testing Actors

    The following sub sections will outline how Akka allows the testing of actors

     

    The built in ‘testActor’

    The official Akka testkit docs do a great job of explaining the testActor and why there is a need for a test actor.

    Testing the business logic inside Actor classes can be divided into two parts: first, each atomic operation must work in isolation, then sequences of incoming events must be processed correctly, even in the presence of some possible variability in the ordering of events. The former is the primary use case for single-threaded unit testing, while the latter can only be verified in integration tests.

    Normally, the ActorRef shields the underlying Actor instance from the outside, the only communications channel is the actor’s mailbox. This restriction is an impediment to unit testing, which led to the inception of the TestActorRef.

    http://doc.akka.io/docs/akka/snapshot/scala/testing.html#Using_Multiple_Probe_Actors

    It is by using this testActor that we are able to test the individual operations of an actor under test.

    You essentially instantiate the TestActorRef passing it the real actor you would like to test. The test actor then allows you to send messages which are forwarded to the contained real actor that you are attempting to test.

    CallingThreadDispatcher/TestActorRef

    Akka is an asynchronous beast by nature, and uses the concept of Dispatchers to conduct the dispatching of messages. We have also seen that the message loop (receive) can be replaced with become/unbecome, all of which contributes to the overall behaviour of the actor being quite hard to test.

    Akka TestKit comes with a special actor reference called TestActorRef.

    It should come as no surprise that this TestActorRef also makes use of a Dispatcher. What makes it more suited to testing is that it uses a specialized testing Dispatcher, which makes asynchronous code much easier to test.

    The specialized dispatcher is called CallingThreadDispatcher. As the name suggests it uses the current thread to deal with the message dispatching.

    This makes things easier, of that there is no doubt. As stated, you don't really need to do anything other than use the Akka TestKit TestActorRef.
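    To make that concrete, here is a minimal sketch of the kind of synchronous test the CallingThreadDispatcher enables. The CounterActor below is a made-up actor purely for illustration; it is not part of this post's code.

    import akka.actor.{Actor, ActorSystem}
    import akka.testkit.TestActorRef

    // hypothetical actor, purely for illustration
    class CounterActor extends Actor {
      var count = 0
      def receive = {
        case "inc" => count += 1
      }
    }

    object TestActorRefSketch extends App {
      implicit val system = ActorSystem("SketchSystem")

      // because TestActorRef uses the CallingThreadDispatcher, the message
      // is processed on this thread before the next line runs
      val ref = TestActorRef(new CounterActor)
      ref ! "inc"
      assert(ref.underlyingActor.count == 1)

      system.terminate()
    }

    The underlyingActor property is the other handy thing TestActorRef gives you: it lets the test reach inside and assert on the actor's internal state, something a normal ActorRef deliberately hides.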

     

    Anatomy Of An Actor Testkit Test

    This is what a basic skeleton looks like when using the Akka TestKit (please note this is for ScalaTest)

    import akka.actor.{Props, ActorSystem}
    import akka.util.Timeout
    import org.scalatest._
    import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
    import scala.concurrent.duration._
    import scala.concurrent.Await
    import akka.pattern.ask
    
    import scala.util.Success
    
    
    class HelloActorTests
      extends TestKit(ActorSystem("MySpec"))
      with ImplicitSender
      with WordSpecLike
      with BeforeAndAfterAll
      with Matchers {
    
      override def afterAll {
        TestKit.shutdownActorSystem(system)
      }
    
    
    
    }
    
    
    

    There are a couple of things to note there, so let's go through them

    • Extending the Akka TestKit gives us all the good pre-canned assertions that let us assert facts about our actors
    • Mixing in the Akka ImplicitSender trait gives us an actual sender, which is set to the test suite's own testActor

     

    The Built In Assertions

    The Akka TestKit comes with many useful assertions which you can see here

    expectMsg[T](d: Duration, msg: T): T
    expectMsgPF[T](d: Duration)(pf: PartialFunction[Any, T]): T
    expectMsgClass[T](d: Duration, c: Class[T]): T
    expectMsgType[T: Manifest](d: Duration)
    expectMsgAnyOf[T](d: Duration, obj: T*): T
    expectMsgAnyClassOf[T](d: Duration, obj: Class[_ <: T]*): T
    expectMsgAllOf[T](d: Duration, obj: T*): Seq[T]
    expectMsgAllClassOf[T](d: Duration, c: Class[_ <: T]*): Seq[T]
    expectMsgAllConformingOf[T](d: Duration, c: Class[_ <: T]*): Seq[T]
    expectNoMsg(d: Duration)
    receiveN(n: Int, d: Duration): Seq[AnyRef]
    fishForMessage(max: Duration, hint: String)
    	(pf: PartialFunction[Any, Boolean]): Any
    receiveOne(d: Duration): AnyRef
    receiveWhile[T](max: Duration, idle: Duration, messages: Int)
    	(pf: PartialFunction[Any, T]): Seq[T]
    awaitCond(p: => Boolean, max: Duration, interval: Duration)
    awaitAssert(a: => Any, max: Duration, interval: Duration)
    ignoreMsg(pf: PartialFunction[AnyRef, Boolean])
    within[T](min: FiniteDuration, max: FiniteDuration)
    	(f: ⇒ T): T
    

    You can read more about these at the official documentation: http://doc.akka.io/docs/akka/snapshot/scala/testing.html#Built-In_Assertions

     

    Demo : HelloActor That We Will Test

    Let's assume we have this actor, which we will use for the next 3 examples

    import akka.actor.Actor
    
    
    class HelloActor extends Actor {
      def receive = {
        case "hello" => sender ! "hello world"
        case _       => throw new IllegalArgumentException("bad juju")
      }
    }
    
    
    Sending Messages

    So we have the HelloActor above, and we would like to send a message to it, and assert that the sender (the test suite testActor) gets a message back.

    Here is how we might do that.

    import akka.actor.{Props, ActorSystem}
    import akka.util.Timeout
    import org.scalatest._
    import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
    import scala.concurrent.duration._
    import scala.concurrent.Await
    import akka.pattern.ask
    
    import scala.util.Success
    
    
    class HelloActorTests
      extends TestKit(ActorSystem("MySpec"))
      with ImplicitSender
      with WordSpecLike
      with BeforeAndAfterAll
      with Matchers {
    
      override def afterAll {
        TestKit.shutdownActorSystem(system)
      }
    
      "An HelloActor using implicit sender " must {
        "send back 'hello world'" in {
          val helloActor = system.actorOf(Props[HelloActor], name = "helloActor")
          helloActor ! "hello"
          expectMsg("hello world")
        }
      }
    }
    
    
    

     

    This works thanks to the fact that we mixed in the ImplicitSender trait (so the testActor is the sender of the "hello" message and therefore receives the reply), combined with the pre-canned Akka TestKit assertions such as expectMsg.

     

    Expecting A Response

    Another thing that is quite common is to expect a response from an actor that we have asked a result from using the Akka ask pattern (which returns a Future[T] to represent the eventual result)

    Here is how we might write a test for this

    import akka.actor.{Props, ActorSystem}
    import akka.util.Timeout
    import org.scalatest._
    import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
    import scala.concurrent.duration._
    import scala.concurrent.Await
    import akka.pattern.ask
    
    import scala.util.Success
    
    
    class HelloActorTests
      extends TestKit(ActorSystem("MySpec"))
      with ImplicitSender
      with WordSpecLike
      with BeforeAndAfterAll
      with Matchers {
    
      override def afterAll {
        TestKit.shutdownActorSystem(system)
      }
    
      "An HelloActor using TestActorRef " must {
        "send back 'hello world' when asked" in {
          implicit val timeout = Timeout(5 seconds)
          val helloActorRef = TestActorRef(new HelloActor)
          val future = helloActorRef ? "hello"
          val Success(result: String) = future.value.get
          result should be("hello world")
        }
      }
    }
    

    It can be seen that we make use of the TestActorRef (the one we discussed above that uses the CallingThreadDispatcher) to wrap (for want of a better word) the actual actor we wish to test. Because the message is processed on the calling thread, the Future returned by the ask is already completed by the time we read its value.

     

    Expecting Exceptions

    Another completely plausible thing to want to do is test for exceptions that may be thrown. It can be seen in the HelloActor that we are trying to test that it will throw an IllegalArgumentException should it see a message it doesn’t handle.

    So how do we test that?

    We use the inbuilt intercept function to allow us to catch the exception

    import akka.actor.{Props, ActorSystem}
    import akka.util.Timeout
    import org.scalatest._
    import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
    import scala.concurrent.duration._
    import scala.concurrent.Await
    import akka.pattern.ask
    
    import scala.util.Success
    
    
    class HelloActorTests
      extends TestKit(ActorSystem("MySpec"))
      with ImplicitSender
      with WordSpecLike
      with BeforeAndAfterAll
      with Matchers {
    
      override def afterAll {
        TestKit.shutdownActorSystem(system)
      }
    
      "An HelloActor using TestActorRef " must {
        "should throw IllegalArgumentException when sent unhandled message" in {
          val actorRef = TestActorRef(new HelloActor)
          intercept[IllegalArgumentException] { actorRef.receive("should blow up") }
        }
      }
    }
    
    
    

     

     

    Testing Finite State Machines

    Last time we looked at implementing finite state machines in Akka. One of the approaches we used for that was Akka FSM. Let's examine how we can test those using the TestKit.

    Lets assume we have this simple AkkaFSM example (based on the LightSwitch demo from the last article)

    import akka.actor.{Actor, ActorRef, FSM}
    import scala.concurrent.duration._
    import scala.collection._
    
    // received events
    final case class PowerOn()
    final case class PowerOff()
    
    // states
    sealed trait LightSwitchState
    case object On extends LightSwitchState
    case object Off extends LightSwitchState
    
    //data
    sealed trait LightSwitchData
    case object NoData extends LightSwitchData
    
    class LightSwitchActor extends FSM[LightSwitchState, LightSwitchData] {
    
      startWith(Off, NoData)
    
      when(Off) {
        case Event(PowerOn, _) =>
          goto(On) using NoData
      }
    
      when(On, stateTimeout = 1 second) {
        case Event(PowerOff, _) =>
          goto(Off) using NoData
        case Event(StateTimeout, _) =>
          println("'On' state timed out, moving to 'Off'")
          goto(Off) using NoData
      }
    
      whenUnhandled {
        case Event(e, s) =>
          log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
          goto(Off) using NoData
      }
    
      onTransition {
        case Off -> On => println("Moved from Off to On")
        case On -> Off => println("Moved from On to Off")
      }
    
      initialize()
    }
    

    This example is fairly good as it only has 2 states, On and Off, so it makes for quite a simple example to showcase the testing.

    Another Special Test Actor

    To test FSMs there is yet another specialized actor reference, used only for testing FSMs, called TestFSMRef. Just like the TestActorRef, you give the TestFSMRef the actual FSM actor you are trying to test.

    Here is an example of that

     val fsm = TestFSMRef(new LightSwitchActor())
    

    The TestFSMRef comes with a whole host of useful methods and properties that can be used when testing FSMs. We will see some of them used below.

    Testing Initial State

    As we saw last time, Akka FSM has the idea of an initialize() method that may be used to place the FSM in its initial state (as set up by startWith). So we should be able to test that.

    Here is how we can do that

    import akka.actor.{ActorSystem, Props}
    import akka.pattern.ask
    import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
    import akka.util.Timeout
    import org.scalatest._
    import akka.testkit.TestFSMRef
    import akka.actor.FSM
    
    import scala.concurrent.duration._
    import scala.util.Success
    
    
    class LightSwitchFSMActorTests
      extends TestKit(ActorSystem("MySpec"))
      with ImplicitSender
      with WordSpecLike
      with BeforeAndAfterAll
      with Matchers {
    
      override def afterAll {
        TestKit.shutdownActorSystem(system)
      }
    
      "An LightSwitchActor " must {
        "start in the 'Off' state" in {
          val fsm = TestFSMRef(new LightSwitchActor())
          assert(fsm.stateName == Off)
          assert(fsm.stateData == NoData)
        }
      }
    }
    
    
    
    Testing State Move

    The TestFSMRef comes with the ability to move the underlying FSM actor into a new state using the setState method. Here is an example of how to use that:

    import akka.actor.{ActorSystem, Props}
    import akka.pattern.ask
    import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
    import akka.util.Timeout
    import org.scalatest._
    import akka.testkit.TestFSMRef
    import akka.actor.FSM
    
    import scala.concurrent.duration._
    import scala.util.Success
    
    
    class LightSwitchFSMActorTests
      extends TestKit(ActorSystem("MySpec"))
      with ImplicitSender
      with WordSpecLike
      with BeforeAndAfterAll
      with Matchers {
    
      override def afterAll {
        TestKit.shutdownActorSystem(system)
      }
    
      "An LightSwitchActor that starts with 'Off' " must {
        "should transition to 'On' when told to by the test" in {
          val fsm = TestFSMRef(new LightSwitchActor())
          fsm.setState(stateName = On)
          assert(fsm.stateName == On)
          assert(fsm.stateData == NoData)
        }
      }
    }
    
    
    

    You can of course still send messages to the TestFSMRef which would instruct the underlying FSM actor to move to a new state.

    import akka.actor.{ActorSystem, Props}
    import akka.pattern.ask
    import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
    import akka.util.Timeout
    import org.scalatest._
    import akka.testkit.TestFSMRef
    import akka.actor.FSM
    
    import scala.concurrent.duration._
    import scala.util.Success
    
    
    class LightSwitchFSMActorTests
      extends TestKit(ActorSystem("MySpec"))
      with ImplicitSender
      with WordSpecLike
      with BeforeAndAfterAll
      with Matchers {
    
      override def afterAll {
        TestKit.shutdownActorSystem(system)
      }
    
      "An LightSwitchActor that starts with 'Off' " must {
        "should transition to 'On' when sent a 'PowerOn' message" in {
          val fsm = TestFSMRef(new LightSwitchActor())
          fsm ! PowerOn
          assert(fsm.stateName == On)
          assert(fsm.stateData == NoData)
        }
      }
    }
    
    
    
     
    Testing StateTimeout

    Another thing that AkkaFSM supports is the notion of a StateTimeout. In the example FSM we are trying to test, if the FSM stays in the On state for more than 1 second it should automatically move to the Off state.

    So how do we test that?

    Here is how:

    import akka.actor.{ActorSystem, Props}
    import akka.pattern.ask
    import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
    import akka.util.Timeout
    import org.scalatest._
    import akka.testkit.TestFSMRef
    import akka.actor.FSM
    
    import scala.concurrent.duration._
    import scala.util.Success
    
    
    class LightSwitchFSMActorTests
      extends TestKit(ActorSystem("MySpec"))
      with ImplicitSender
      with WordSpecLike
      with BeforeAndAfterAll
      with Matchers {
    
      override def afterAll {
        TestKit.shutdownActorSystem(system)
      }
    
      "An LightSwitchActor that stays 'On' for more than 1 second " must {
        "should transition to 'Off' thanks to the StateTimeout" in {
          val fsm = TestFSMRef(new LightSwitchActor())
          fsm ! PowerOn
          awaitCond(fsm.stateName == Off, 1200 milliseconds, 100 milliseconds)
        }
      }
    }
    
    
    

     

    Testing Using Probes

    So far we have been looking at testing a single actor that might reply to a single sender. Sometimes though we may need to test an entire suite of actors all working together, and with everything so far arriving at the single testActor we may find it difficult to distinguish which incoming message relates to which part of the conversation.

    Akka TestKit provides yet another abstraction to deal with this, which is the idea of a concrete actor that you inject into the message flow. This concept is called a TestProbe.

    Let's assume we have this actor that sends messages on to 2 ActorRefs.

    import akka.actor.{ActorRef, Actor}
    
    class DoubleSenderActor extends Actor {
      var dest1: ActorRef = _
      var dest2: ActorRef = _
      def receive = {
        case (d1: ActorRef, d2: ActorRef) =>
          dest1 = d1
          dest2 = d2
        case x =>
          dest1 ! x
          dest2 ! x
      }
    }
    

    We could test this code as follows using the TestProbe.

    import akka.actor.{ActorSystem, Props}
    import akka.pattern.ask
    import akka.testkit.{TestProbe, ImplicitSender, TestActorRef, TestKit}
    import akka.util.Timeout
    import org.scalatest._
    
    import scala.concurrent.duration._
    import scala.util.Success
    
    
    class DoubleSenderActorTestsUsingProbe
      extends TestKit(ActorSystem("MySpec"))
      with ImplicitSender
      with WordSpecLike
      with BeforeAndAfterAll
      with Matchers {
    
      override def afterAll {
        TestKit.shutdownActorSystem(system)
      }
    
      "An DoubleSenderActor that has 2 target ActorRef for sending messages to " must {
        "should send messages to both supplied 'TestProbe(s)'" in {
          val doubleSenderActor = system.actorOf(Props[DoubleSenderActor],
            name = "multiSenderActor")
          val probe1 = TestProbe()
          val probe2 = TestProbe()
          doubleSenderActor ! ((probe1.ref, probe2.ref))
          doubleSenderActor ! "hello"
          probe1.expectMsg(500 millis, "hello")
          probe2.expectMsg(500 millis, "hello")
        }
      }
    }
    
    
    

    It can be seen that the TestProbe comes with its own set of useful assertion methods. This is due to the fact that TestProbe builds on the TestKit machinery, and as such you can expect to find ALL of the TestKit assertions available to use when using TestProbe objects.
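    Beyond the assertions, a probe can also act as a scripted participant in the conversation: it can reply to whatever actor sent it its last message. Here is a small illustrative sketch; the GreeterActor below is invented for this example and is not part of the post's code.

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.testkit.{ImplicitSender, TestKit, TestProbe}
    import org.scalatest._

    import scala.concurrent.duration._

    // hypothetical actor, purely for illustration: pings whatever ActorRef it is given,
    // and thanks whoever answers the ping
    class GreeterActor extends Actor {
      def receive = {
        case ref: ActorRef => ref ! "ping"
        case "pong"        => sender() ! "thanks"
      }
    }

    class GreeterActorProbeTests
      extends TestKit(ActorSystem("MySpec"))
      with ImplicitSender
      with WordSpecLike
      with BeforeAndAfterAll
      with Matchers {

      override def afterAll {
        TestKit.shutdownActorSystem(system)
      }

      "A GreeterActor given a probe's ref" must {
        "ping the probe, and thank the probe once it replies" in {
          val greeter = system.actorOf(Props[GreeterActor], name = "greeterActor")
          val probe = TestProbe()
          greeter ! probe.ref
          probe.expectMsg(500.millis, "ping")   // same assertions as on the TestKit itself
          probe.reply("pong")                   // reply to whoever sent the probe its last message
          probe.expectMsg(500.millis, "thanks") // the greeter replied back to the probe
        }
      }
    }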

     

     

    Where Can I Find The Code Examples?

    I will be augmenting this GitHub repo with the example projects as I move through this series

    https://github.com/sachabarber/SachaBarber.AkkaExamples


    akka : state machines

    In this post we will look at 2 ways you can write state machines with Akka. We will firstly examine the more primitive (but easily understandable) approach, and then look into the more sophisticated approach offered by AkkaFSM.

    What Is A State Machine?

    For those of you out there that do not know what a state machine is, this is what Wikipedia says about them:

    A finite-state machine (FSM) or finite-state automaton (FSA, plural: automata), or simply a state machine, is a mathematical model of computation used to design both computer programs and sequential logic circuits. It is conceived as an abstract machine that can be in one of a finite number of states. The machine is in only one state at a time; the state it is in at any given time is called the current state. It can change from one state to another when initiated by a triggering event or condition; this is called a transition. A particular FSM is defined by a list of its states, and the triggering condition for each transition.

    https://en.wikipedia.org/wiki/Finite-state_machine

    A classic example is a coin operated barrier (turnstile): it sits in a locked state until a coin is inserted, which moves it to an unlocked state, and it returns to the locked state once someone pushes through.

    Akka supports swapping out the standard message loop using become, which is available via the actor's context. The standard signature of receive (the message loop) is

    PartialFunction[Any, Unit]
    

    The newly applied message loops are maintained in a stack and may be pushed and popped.

    Become

    There are different ways of swapping out the message loop. Here is one such example

    import akka.actor.Actor
    
    class HotColdStateActor extends Actor {
    
      //need this for become/unbecome
      import context._
    
      def cold: Receive = {
        case "snow" => println("I am already cold!")
        case "sun" => becomeHot
      }
    
      def hot: Receive = {
        case "sun" => println("I am already hot!")
        case "snow" => becomeCold
      }
    
      def receive = {
        case "snow" => becomeCold
        case "sun" => becomeHot
      }
    
    
      private def becomeCold: Unit = {
        println("becoming cold")
        become(cold)
      }
    
      private def becomeHot: Unit = {
        println("becoming hot")
        become(hot)
      }
    }
    

    With this example we simply use become to swap in a new message loop; the most recent become call defines the current message loop.

    If we use the following demo code against this actor code

    import akka.actor._
    import scala.language.postfixOps
    import scala.io.StdIn
    
    object Demo extends App {
    
      RunHotColdStateDemo
    
      def RunHotColdStateDemo : Unit = {
        //create the actor system
        val system = ActorSystem("StateMachineSystem")
    
        val hotColdStateActor =
          system.actorOf(Props(classOf[HotColdStateActor]),
            "demo-HotColdStateActor")
    
    
        println("sending sun")
        hotColdStateActor ! "sun"
        //actors are async, so give it chance to get message
        //obviously we would not do this in prod code, its just
        //for the demo, to get the correct ordering for the print
        //statements
        Thread.sleep(1000)
    
        println("sending sun")
        hotColdStateActor ! "sun"
        Thread.sleep(1000)
    
        println("sending snow")
        hotColdStateActor ! "snow"
        Thread.sleep(1000)
    
        println("sending snow")
        hotColdStateActor ! "snow"
        Thread.sleep(1000)
    
        println("sending sun")
        hotColdStateActor ! "sun"
        Thread.sleep(1000)
    
        println("sending snow")
        hotColdStateActor ! "snow"
        Thread.sleep(1000)
    
        StdIn.readLine()
    
        //shutdown the actor system
        system.terminate()
    
        StdIn.readLine()
      }
    
    }
    

    We would see output like this

    [image: console output from running the demo]

     

    UnBecome

    The other way to swap out the message loop relies on having matching pairs of become/unbecome. Here the standard message loop is not replaced outright; instead the current message loop is whatever is on top of a stack of behaviours.

    Care must be taken to ensure that the number of push (become) and pop (unbecome) operations match, otherwise memory leaks may occur, which is why this is not the default behaviour.

    Here is an example actor that uses the become/unbecome matched operations.

    import akka.actor.Actor
    
    class BecomeUnbecomeStateActor extends Actor {
    
      //need this for become/unbecome
      import context._
    
      def receive = {
        case "snow" =>
          println("saw snow, becoming")
          become({
            case "sun" =>
              println("saw sun, unbecoming")
              unbecome() // resets the latest 'become' (just for fun)
            case _ => println("Unknown message, state only likes sun")
          }, discardOld = false) // push on top instead of replace
    
        case _ => println("Unknown message, state only likes snow")
      }
    
    
    
    }
    

    I personally think this is harder to read and manage.

    Here is some demo code to exercise this actor:

    import akka.actor._
    import scala.language.postfixOps
    import scala.io.StdIn
    
    object Demo extends App {
    
      RunBecomeUnbecomeStateDemo
    
    
      def RunBecomeUnbecomeStateDemo : Unit = {
        //create the actor system
        val system = ActorSystem("StateMachineSystem")
    
        val becomeUnbecomeStateActor =
          system.actorOf(Props(classOf[BecomeUnbecomeStateActor]),
            "demo-BecomeUnbecomeStateActor")
    
        println("Sending snow")
        becomeUnbecomeStateActor ! "snow"
        //actors are async, so give it chance to get message
        Thread.sleep(1000)
    
        println("Sending snow")
        becomeUnbecomeStateActor ! "snow"
        Thread.sleep(1000)
    
        println("Sending sun")
        becomeUnbecomeStateActor ! "sun"
        Thread.sleep(1000)
    
        println("Sending sun")
        becomeUnbecomeStateActor ! "sun"
        Thread.sleep(1000)
    
        println("Sending snow")
        becomeUnbecomeStateActor ! "snow"
        Thread.sleep(1000)
    
        StdIn.readLine()
    
        //shutdown the actor system
        system.terminate()
    
        StdIn.readLine()
      }
    
    }
    

    Which when run should yield the following results

    [image: console output from running the demo]

     

    AkkaFSM

    While become/unbecome will get the job done, Akka comes with a much better alternative called Akka FSM.

    Using Akka FSM you can not only handle states but also carry state data, and you can run code on the movement from one state to the next, which as most people will know is called a "transition".

    When using Akka FSM you may see a state machine expressed using this sort of notation

    State(S) x Event(E) -> Actions (A), State(S’)

    If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S'.

    To use Akka FSM there are a number of things you can do. Some of them are mandatory and some you can opt into depending on your requirements. Lets have a look at some of the moving parts that make Akka FSM shine.

    FSM[S,D]

    In order to use Akka FSM you need to mix in the FSM trait. The trait itself looks like this

    trait FSM[S, D]
    

     

    Where S is the state type, and D is the data type.

    startWith

    You can choose what state the FSM starts in by using the startWith method which has the following signature.

    startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit
    

    initialize

    Calling initialize() performs the transition into the initial state and sets up timers (if required).

    when

    when is used to match on a state, and controls the movement to a new state using the inbuilt goto method (or staying in the current state using stay).

    when uses pattern matching to match the events that a particular state can handle. As stated, it is completely valid to stay in the current state or to move to a new state.

    The examples that follow below will show you both stay and goto in action.

    whenUnhandled

    You should obviously try and make sure you cover all the correct state movements in response to all the events your FSM knows about. But Akka FSM also comes with the whenUnhandled method for catching events that were not handled by YOUR state handling (when) logic.

    onTransition

    You may also monitor the movement from one state to the next and run some code when this occurs. This is accomplished using the onTransition method.

    onTransition has the following signature

    onTransition(transitionHandler: TransitionHandler): Unit
    

    Where TransitionHandler is really just a PartialFunction that has the following generic parameters

    PartialFunction[(S, S), Unit]
    

    Where the tuple is a tuple of “from state” to “to state”.

     

    Time for an example

     

    Lightswitch Example

    This first example is a very simple FSM that has 2 states, On and Off. It doesn't really need any state data, however the Akka FSM trait always needs a Data object, so in this case we simply use a base trait for the Data which we don't really care about.

    The idea behind this example is that the lightswitch can move from Off –> On, and On –> Off.

    This example also shows a stateTimeout in action, whereby the FSM will move from On back to Off if it is left in the On state for more than 1 second.

    Here is the full code for this example

    import akka.actor.{Actor, ActorRef, FSM}
    import scala.concurrent.duration._
    import scala.collection._
    
    // received events
    final case class PowerOn()
    final case class PowerOff()
    
    // states
    sealed trait LightSwitchState
    case object On extends LightSwitchState
    case object Off extends LightSwitchState
    
    //data
    sealed trait LightSwitchData
    case object NoData extends LightSwitchData
    
    class LightSwitchActor extends FSM[LightSwitchState, LightSwitchData] {
    
      startWith(Off, NoData)
    
      when(Off) {
        case Event(PowerOn, _) =>
          goto(On) using NoData
      }
    
      when(On, stateTimeout = 1 second) {
        case Event(PowerOff, _) =>
          goto(Off) using NoData
        case Event(StateTimeout, _) =>
          println("'On' state timed out, moving to 'Off'")
          goto(Off) using NoData
      }
    
      whenUnhandled {
        case Event(e, s) =>
          log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
          goto(Off) using NoData
      }
    
      onTransition {
        case Off -> On => println("Moved from Off to On")
        case On -> Off => println("Moved from On to Off")
      }
    
      initialize()
    }
    

    Which we can run using this demo code.

    import akka.actor._
    import scala.language.postfixOps
    import scala.io.StdIn
    
    object Demo extends App {
    
      RunLightSwitchDemo
    
      def RunLightSwitchDemo : Unit = {
        //create the actor system
        val system = ActorSystem("StateMachineSystem")
    
        val lightSwitchActor =
          system.actorOf(Props(classOf[LightSwitchActor]),
            "demo-LightSwitch")
    
    
        println("sending PowerOff, should be off already")
        lightSwitchActor ! PowerOff
        //akka is async allow it some time to pick up message
        //from its mailbox
        Thread.sleep(500)
    
    
        println("sending PowerOn")
        lightSwitchActor ! PowerOn
        //akka is async allow it some time to pick up message
        //from its mailbox
        Thread.sleep(500)
    
        println("sending PowerOff")
        lightSwitchActor ! PowerOff
        //akka is async allow it some time to pick up message
        //from its mailbox
        Thread.sleep(500)
    
    
        println("sending PowerOn")
        lightSwitchActor ! PowerOn
        //akka is async allow it some time to pick up message
        //from its mailbox
        Thread.sleep(500)
    
        println("sleep for a while to allow 'On' state to timeout")
        Thread.sleep(2000)
    
        StdIn.readLine()
    
        //shutdown the actor system
        system.terminate()
    
        StdIn.readLine()
      }
    
    
    
    }
    

    When run we get the following results

    sending PowerOff, should be off already
    [WARN] [09/06/2016 07:21:28.864] [StateMachineSystem-akka.actor.default-dispatcher-4]
      [akka://StateMachineSystem/user/demo-LightSwitch] received unhandled request PowerOff in state Off/NoData
    sending PowerOn
    Moved from Off to On
    sending PowerOff
    Moved from On to Off
    sending PowerOn
    Moved from Off to On
    sleep for a while to allow ‘On’ state to timeout
    ‘On’ state timed out, moving to ‘Off’
    Moved from On to Off

    There is something interesting here, which is that we see an Unhandled event. Why is this?

    Well this is due to the fact that this demo FSM starts in the Off state, and we send a PowerOff event. This is not handled in the when for the Off state.

    We could fix this, by amending this as follows:

    when(Off) {
      case Event(PowerOn, _) =>
        goto(On) using NoData
      case Event(PowerOff, _) =>
        println("already off")
        stay
    }
    

    So if we apply this amended code and run the demo again, we would now see this output instead

    sending PowerOff, should be off already
    already off
    sending PowerOn
    Moved from Off to On
    sending PowerOff
    Moved from On to Off
    sending PowerOn
    Moved from Off to On
    sleep for a while to allow ‘On’ state to timeout
    ‘On’ state timed out, moving to ‘Off’
    Moved from On to Off

     

    Buncher Example

    I have basically taken this one straight from the official Akka FSM docs.

    This example shall receive and queue messages while they arrive in a burst and send them on to another target actor after the burst ended or a flush request is received.

    Here is the full code for this example. It is a slightly fuller example, so this time we use a proper set of data objects for the states.

    import akka.actor.{Actor, ActorRef, FSM}
    import scala.concurrent.duration._
    import scala.collection._
    
    // received events
    final case class SetTarget(ref: ActorRef)
    final case class Queue(obj: Any)
    case object Flush
    
    // sent events
    final case class Batch(obj: immutable.Seq[Any])
    
    // states
    sealed trait State
    case object Idle extends State
    case object Active extends State
    
    //data
    sealed trait BuncherData
    case object Uninitialized extends BuncherData
    final case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends BuncherData
    
    class BuncherActor extends FSM[State, BuncherData] {
    
      startWith(Idle, Uninitialized)
    
      when(Idle) {
        case Event(SetTarget(ref), Uninitialized) =>
          stay using Todo(ref, Vector.empty)
      }
    
      when(Active, stateTimeout = 1 second) {
        case Event(Flush | StateTimeout, t: Todo) =>
          goto(Idle) using t.copy(queue = Vector.empty)
      }
    
      whenUnhandled {
        // common code for both states
        case Event(Queue(obj), t @ Todo(_, v)) =>
          goto(Active) using t.copy(queue = v :+ obj)
    
        case Event(e, s) =>
          log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
          stay
      }
    
      onTransition {
        case Active -> Idle =>
          stateData match {
            case Todo(ref, queue) => ref ! Batch(queue)
            case _                => // nothing to do
          }
      }
    
      initialize()
    }
    
    
    class BunchReceivingActor extends Actor {
      def receive = {
        case Batch(theBatchData) => {
          println(s"receiving the batch data $theBatchData")
        }
        case _ => println("unknown message")
      }
    }
    

    Which we can run using this demo code

    import akka.actor._
    import scala.language.postfixOps
    import scala.io.StdIn
    
    object Demo extends App {
    
      RunBuncherDemo
    
      def RunBuncherDemo : Unit = {
        //create the actor system
        val system = ActorSystem("StateMachineSystem")
    
        val buncherActor =
          system.actorOf(Props(classOf[BuncherActor]),
            "demo-Buncher")
    
        val bunchReceivingActor =
          system.actorOf(Props(classOf[BunchReceivingActor]),
            "demo-BunchReceiving")
    
        buncherActor ! SetTarget(bunchReceivingActor)
    
        println("sending Queue(42)")
        buncherActor ! Queue(42)
        println("sending Queue(43)")
        buncherActor ! Queue(43)
        println("sending Queue(44)")
        buncherActor ! Queue(44)
        println("sending Flush")
        buncherActor ! Flush
        println("sending Queue(45)")
        buncherActor ! Queue(45)
    
    
        StdIn.readLine()
    
        //shutdown the actor system
        system.terminate()
    
        StdIn.readLine()
      }
    }
    

    When this demo runs you should see something like this

    sending Queue(42)
    sending Queue(43)
    sending Queue(44)
    sending Flush
    sending Queue(45)
    receiving the batch data Vector(42, 43, 44)
    receiving the batch data Vector(45)

     

     

    Where Can I Find The Code Examples?

    I will be augmenting this GitHub repo with the example projects as I move through this series

    https://github.com/sachabarber/SachaBarber.AkkaExamples

    akka : persistent actors

    In this post we will look at some of the core concepts around persistent actors, which may seem strange since so far I have been saying that Akka is great cos it is lock free and each actor is stateless.

    Truth is there are times where some sort of state just can't be avoided, and for that Akka provides us with the PersistentActor.

     

    Event Sourcing

    Akka persistence does borrow a couple of ideas from other concepts out there in the wild, such as

    • Event sourcing
    • Possibly CQRS
    • Snapshots

    If you have not heard of event sourcing before, it is fairly simple to explain. The idea is that you have a data structure that is able to receive events to build up its internal state. At the same time you also have an event store that stores the events, in the order in which they occurred.

    These events are then played onto the object that accepts them, whereby the object will build its own internal state from the events played on to it.

    The eagle eyed amongst you may be thinking won’t that lead to loads of events? Well yes it might.

    There is however an optimization around this called "snapshots". A snapshot is a special record that captures the object's state as it is right now.

    So in reality you would apply the latest snapshot, and then take any events that occurred after that and apply them, which would drastically reduce the amount of event play back that would need to occur against an object that would like to receive events from the event store.
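    To make the replay idea a little more concrete, here is a tiny illustrative sketch. The account events and state below are made up for this example; they are not Akka types.

    // hypothetical domain, purely for illustration
    case class AccountState(balance: Int)

    sealed trait AccountEvent
    case class Deposited(amount: Int) extends AccountEvent
    case class Withdrawn(amount: Int) extends AccountEvent

    object EventReplaySketch extends App {

      // how a single event is applied to the state
      def applyEvent(state: AccountState, evt: AccountEvent): AccountState = evt match {
        case Deposited(a) => state.copy(balance = state.balance + a)
        case Withdrawn(a) => state.copy(balance = state.balance - a)
      }

      // start from the latest snapshot (or an empty state if there is none)
      val snapshot = AccountState(balance = 100)

      // then replay only the events that happened after that snapshot was taken
      val newerEvents = List(Deposited(50), Withdrawn(20))
      val currentState = newerEvents.foldLeft(snapshot)(applyEvent)

      println(currentState) // AccountState(130)
    }

    The snapshot is what saves us from replaying the object's entire event history every time it is loaded.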

    I have written a fairly well received article about this before for the .NET space which also included working code and also includes the CQRS (Command Query Responsibility Segregation) part of it too. If you would like to know more about that you can read about it here:

    http://www.codeproject.com/Articles/991648/CQRS-A-Cross-Examination-Of-How-It-Works

    Anyway I am kind of drifting off topic a bit here; the point is that Akka persistence borrows some of these ideas from other concepts/frameworks, so it may be of some use to have a read around some of it, in particular event sourcing and CQRS.

    Dependencies

    In order to work with Akka persistence you will need the following 2 SBT entries in your build.sbt file

    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-persistence" % "2.4.8"
    

    Storage

    Akka persistence comes with a couple of pre-canned persistence storage mechanisms, and there are many more community based storage frameworks that you can use.

    Within this article I have chosen to use the Akka provided storage mechanism which is LevelDB.

    In order to do this, you will need the following SBT entries in your build.sbt file

    "org.iq80.leveldb"            % "leveldb"          % "0.7",
    "org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8"
    

    You can read more about storage in general at the official Akka docs page

    http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Storage_plugins

    Config

    As with most things in Akka you are able to configure things either in code (by way of overriding things) or by configuration. I personally prefer configuration. So in order to get the persistence to work, one must provide the following configuration

    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
    akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    
    akka.persistence.journal.leveldb.dir = "target/example/journal"
    akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
    
    # DO NOT USE THIS IN PRODUCTION !!!
    # See also https://github.com/typesafehub/activator/issues/287
    akka.persistence.journal.leveldb.native = false
    

    For me this is in a Resources/application.conf file

    PersistentActor Trait

    Ok so getting back on track, how do we create a persistent actor in Akka?

    Well out of the box Akka provides the PersistentActor trait, which you can mix in. This trait looks like this

    trait PersistentActor extends Eventsourced with PersistenceIdentity {
      def receive = receiveCommand
    }
    

    There are several things that are of interest here

    • There is an expectation that there will be a receiveCommand implementation (this comes from the Eventsourced trait)
    • The fact that the base trait is called Eventsourced is also interesting

    I personally find the fact that there is an expectation of a command, and that we end up mixing in a trait called "Eventsourced", to be quite a strong indicator of just how similar working with Akka persistence is to CQRS + traditional event sourcing ideas.

    If we were to go further down the rabbit hole and look into the base trait Eventsourced, we would see a couple more abstract methods of interest that are expected to be supplied by the end user's code:

    
      /**
       * Recovery handler that receives persisted events during recovery. If a state snapshot
       * has been captured and saved, this handler will receive a [[SnapshotOffer]] message
       * followed by events that are younger than the offered snapshot.
       *
       * This handler must not have side-effects other than changing persistent actor state i.e. it
       * should not perform actions that may fail, such as interacting with external services,
       * for example.
       *
       * If there is a problem with recovering the state of the actor from the journal, the error
       * will be logged and the actor will be stopped.
       *
       * @see [[Recovery]]
       */
      def receiveRecover: Receive
    
      /**
       * Command handler. Typically validates commands against current state (and/or by
       * communication with other actors). On successful validation, one or more events are
       * derived from a command and these events are then persisted by calling `persist`.
       */
      def receiveCommand: Receive
    

    As stated, these are abstract methods that YOUR code needs to supply to make the persistent actor machinery work properly. We will see more of this in a bit.

    Persistent Id

    One of the rules you must follow when using persistent actors is that a persistent actor MUST keep the same ID, even across incarnations. This can be set using the persistenceId method as shown below:

    override def persistenceId = "demo-persistent-actor-1"
    

    Snapshots

    As I stated earlier, snapshots can reduce the number of events that need to be replayed against the event sourcing target (the persistent actor).

    In order to save a snapshot the actor may call the saveSnapshot method.

    • If the snapshot succeeds the actor will receive a SaveSnapshotSuccess message
    • If the snapshot fails the actor will receive a SaveSnapshotFailure message
    var state: Any = _
     
    override def receiveCommand: Receive = {
      case "snap"                                => saveSnapshot(state)
      case SaveSnapshotSuccess(metadata)         => // ...
      case SaveSnapshotFailure(metadata, reason) => // ...
    }
    

    Where the metadata looks like this

    final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
    

    During the recovery process the persistent actor is offered a SnapshotOffer message, from which it may restore its internal state.

    After the SnapshotOffer come the newer (younger, in Akka speak) events, which, when replayed onto the persistent actor, bring it to its final internal state.
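
    In code, the recovery side of that might look like the stripped-down sketch below. It assumes the Evt and ExampleState types that are defined in the demo further down, so it is illustrative rather than complete on its own: the snapshot is offered first, then the younger events are replayed one by one.

    import akka.actor._
    import akka.persistence._

    // Stripped-down sketch, reusing the Evt and ExampleState types from the demo below
    class RecoverySketchActor extends PersistentActor {

      override def persistenceId = "recovery-sketch-actor-1"

      var state = ExampleState()

      override def receiveRecover: Receive = {
        // 1. the latest saved snapshot (if any) is offered first
        case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
        // 2. then the events persisted after that snapshot are replayed
        case evt: Evt => state = state.updated(evt)
      }

      override def receiveCommand: Receive = {
        case "print" => println(state)
      }
    }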

     

    Failure/Recovery

    Special care must be taken when shutting down persistent actors from the outside. For normal (non-persistent) actors a PoisonPill may be used, but this is not recommended for persistent actors: incoming commands are stashed until the journaling mechanism signals that the events have been stored, at which point the mailbox is drained, and a PoisonPill (which is handled automatically) can therefore stop the actor before those stashed commands are processed. A better approach is to use explicit shutdown messages; a sketch of this is shown after the link below.

    Read more about this here :

    http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Safely_shutting_down_persistent_actors
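
    As a rough sketch of the explicit shutdown approach (the Shutdown message and SafeShutdownActor below are my own illustrative names, not something Akka provides, and it assumes the same dependencies and LevelDB config shown earlier), the idea is simply to send a normal message that the actor handles by stopping itself, so it flows through the command handling like everything else:

    import akka.actor._
    import akka.persistence._

    // illustrative shutdown message - not an Akka built-in
    case object Shutdown

    class SafeShutdownActor extends PersistentActor {

      override def persistenceId = "safe-shutdown-sketch-1"

      override def receiveRecover: Receive = {
        case _ => // this sketch ignores replayed events
      }

      override def receiveCommand: Receive = {
        // handled as an ordinary command, so it is processed after the
        // commands that arrived (and were stashed) before it
        case Shutdown =>
          context.stop(self)
        case data: String =>
          persist(data) { evt =>
            println(s"persisted $evt")
          }
      }
    }

    Usage would then be something like safeActor ! "some work" followed by safeActor ! Shutdown, rather than safeActor ! PoisonPill.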

     

    Persisting state

    So we have talked about events/commands/CQRS/event sourcing/snapshots, but so far we have not talked about how to actually save state. How is this done?

    Well, as luck would have it, it's very easy: we simply call the persist method, which looks like this:

    def persist[A](event: A)(handler: A ⇒ Unit): Unit 
    

    Demo Time

    Ok, so we have now gone through most of the bits you need to work with persistent actors. Time for a demo.

    Let's assume we have the following command type that will be sent to a single persistent actor:

    case class Cmd(data: String)
    

    And this type of event that we would like to store

    case class Evt(data: String)
    

    Where we would hold this type of state within the persistent actor

    case class ExampleState(events: List[String] = Nil) {
      def updated(evt: Evt): ExampleState = copy(evt.data :: events)
      def size: Int = events.length
      override def toString: String = events.reverse.toString
    }
    

    And that the actual persistent actor looks like this

    import akka.actor._
    import akka.persistence._
    
    
    
    class DemoPersistentActor extends PersistentActor {
    
      //note : this is mutable state, rebuilt from snapshots/events during recovery
      var state = ExampleState()
    
      def updateState(event: Evt): Unit =
        state = state.updated(event)
    
      def numEvents =
        state.size
    
      val receiveRecover: Receive = {
        case evt: Evt => updateState(evt)
        case SnapshotOffer(_, snapshot: ExampleState) => {
            println(s"offered state = $snapshot")
            state = snapshot
        }
      }
    
      val receiveCommand: Receive = {
        case Cmd(data) =>
          persist(Evt(s"${data}-${numEvents}"))(updateState)
          persist(Evt(s"${data}-${numEvents + 1}")) { event =>
            updateState(event)
            context.system.eventStream.publish(event)
          }
        case "snap"  => saveSnapshot(state)
        case SaveSnapshotSuccess(metadata) =>
          println(s"SaveSnapshotSuccess(metadata) :  metadata=$metadata")
        case SaveSnapshotFailure(metadata, reason) =>
          println("""SaveSnapshotFailure(metadata, reason) :
            metadata=$metadata, reason=$reason""")
        case "print" => println(state)
        case "boom"  => throw new Exception("boom")
      }
    
      override def persistenceId = "demo-persistent-actor-1"
    }
    

    Which we could first run using this demo code:

    import akka.actor._
    import scala.language.postfixOps
    import scala.io.StdIn
    
    object Demo extends App {
    
      //create the actor system
      val system = ActorSystem("PersitenceSystem")
    
      val persistentActor =
        system.actorOf(Props(classOf[DemoPersistentActor]),
          "demo-persistent-actor-1")
    
      persistentActor ! "print"
      persistentActor ! Cmd("foo")
      persistentActor ! Cmd("baz")
      persistentActor ! "boom"
      persistentActor ! Cmd("bar")
      persistentActor ! "snap"
      persistentActor ! Cmd("buzz")
      persistentActor ! "print"
    
    
      StdIn.readLine()
    
      //shutdown the actor system
      system.terminate()
    
      StdIn.readLine()
    }
    

    Which gives the following output (your output may look a little different to mine, as I have run this code a number of times, so I have previous runs' state on disk):

    offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8,
    foo-9, baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
    List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
    bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23)
    [ERROR] [08/16/2016 18:43:21.475] [PersitenceSystem-akka.actor.default-dispatcher-5]
    [akka://PersitenceSystem/user/demo-persistent-actor-1] boom
    java.lang.Exception: boom
        at DemoPersistentActor$$anonfun$2.applyOrElse(DemoPersistentActor.scala:39)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
        at DemoPersistentActor.akka$persistence$Eventsourced$$super$aroundReceive(DemoPersistentActor.scala:6)
        at akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:657)
        at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:182)
        at DemoPersistentActor.aroundReceive(DemoPersistentActor.scala:6)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

    offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9,
    baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
    [WARN] [08/16/2016 18:43:21.494] [PersitenceSystem-akka.persistence.dispatchers.default-stream-dispatcher-8]
    [akka.serialization.Serialization(akka://PersitenceSystem)] Using the default Java serializer for class
    [ExampleState] which is not recommended because of performance implications. Use another serializer or
    disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
    bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23, foo-24, foo-25,
    baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)
    SaveSnapshotSuccess(metadata) :  metadata=SnapshotMetadata(demo-persistent-actor-1,33,1471369401491)

    And then if we were to run this demo code:

    import akka.actor._
    import scala.language.postfixOps
    import scala.io.StdIn
    
    object Demo extends App {
    
      //create the actor system
      val system = ActorSystem("PersitenceSystem")
    
      val persistentActor =
        system.actorOf(Props(classOf[DemoPersistentActor]),
          "demo-persistent-actor-1")
    
      persistentActor ! "print"
    
    
      StdIn.readLine()
    
      //shutdown the actor system
      system.terminate()
    
      StdIn.readLine()
    }
    

     

    Let's see what we can see:

    offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7,
    foo-8, foo-9, baz-10, baz-11, bar-12,
    bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23,
    foo-24, foo-25, baz-26, baz-27, bar-28, bar-29)

    List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11,
    bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22,
    buzz-23, foo-24, foo-25, baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)

    It can be seen that, although we did not persist any new events, the state of the demo persistent actor was restored correctly using a combination of a snapshot and the events that are newer than that snapshot.

     

     

    Where Is The Code?

    As previously stated all the code for this series will end up in this GitHub repo:

    https://github.com/sachabarber/SachaBarber.AkkaExamples