Akka

AKKA : clustering

Last time we look at remoting. You can kind of think of clustering as an extension to remoting, as some of the same underlying parts are used.  But as we will see clustering is way more powerful (and more fault tolerant too).

My hope is by the end of this post that you will know enough about Akka clustering that you would be able to create your own clustered Akka apps.

A Note About All The Demos In This Topic

I wanted the demos in this section to be as close to real life as possible. The official akka examples tend to have a single process. Which I personally think is quite confusing when you are trying to deal with quite hard concepts. As such I decided to go with multi process projects to demonstrate things. I do however only have 1 laptop, so they are hosted on the same node, but they are separate processes/JVMs.

I am hoping by doing this it will make the learning process easier, as it is closer to what you would do in real life rather than have 1 main method that spawns an entire cluster. You just would not have that in real life.

 

What Is Akka Clustering?

Unlike remoting which is peer to peer, a cluster may constitute many members, which can grow and contract depending on demand/failure. There is also the concept of roles for actors with a cluster, which this post will talk about.

You can see how this could be very useful, in fact you could see how this may be used to create a general purpose grid calculation engine such as Apache Spark.

 

Seed Nodes

Akka has the concept of some initial contact points within the cluster to allow the cluster to bootstrap itself as it were.

Here is what the official Akka docs say on this:

You may decide if joining to the cluster should be done manually or automatically to configured initial contact points, so-called seed nodes. When a new node is started it sends a message to all seed nodes and then sends join command to the one that answers first. If no one of the seed nodes replied (might not be started yet) it retries this procedure until successful or shutdown.

You may choose to configure these “seed nodes” in code, but the easiest way is via configuration. The relevant part of the demo apps configuration is here

akka {
  .....
  .....
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
  }
  .....
  .....
]




The seed nodes can be started in any order and it is not necessary to have all seed nodes running, but the node configured as the first element in the seed-nodes configuration list must be started when initially starting a cluster, otherwise the other seed-nodes will not become initialized and no other node can join the cluster. The reason for the special first seed node is to avoid forming separated islands when starting from an empty cluster. It is quickest to start all configured seed nodes at the same time (order doesn’t matter), otherwise it can take up to the configured seed-node-timeout until the nodes can join.

Once more than two seed nodes have been started it is no problem to shut down the first seed node. If the first seed node is restarted, it will first try to join the other seed nodes in the existing cluster.

We will see the entire configuration for the demo app later on this post. For now just be aware that there is a concept of seed nodes and the best way to configure those for the cluster is via configuration.

Saying that there may be some amongst you that would prefer to use the JVM property system which you may do as follows:

-Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@127.0.0.1:2551
-Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@127.0.0.1:2552

Roles

Akka clustering comes with the concept of roles.You may be asking why would we need that?

Well its quite simple really, say we have a higher than normal volume of data coming through you akka cluster system, you may want to increase the total processing power of the cluster to deal with this. How do we do that, we spin up more actors within a particular role. The role here may be “backend” that do work designated to them by some other actor say “frontend” role.

By using roles we can manage which bits of the cluster get dynamically allocated more/less actors.

You can configure the minimum number of role actor in configuration, which you can read more about here:

http://doc.akka.io/docs/akka/snapshot/java/cluster-usage.html#How_To_Startup_when_Cluster_Size_Reached

Member Events

Akka provides the ability to listen to member events. There are a number of reasons this could be useful, for example

  • Determining if a member has left the cluster
  • If a new member has joined the cluster

Here is a full list of the me Cluster events that you may choose to listen to

The events to track the life-cycle of members are:

  • ClusterEvent.MemberJoined – A new member has joined the cluster and its status has been changed to Joining.
  • ClusterEvent.MemberUp – A new member has joined the cluster and its status has been changed to Up.
  • ClusterEvent.MemberExited – A member is leaving the cluster and its status has been changed to Exiting Note that the node might already have been shutdown when this event is published on another node.
  • ClusterEvent.MemberRemoved – Member completely removed from the cluster.
  • ClusterEvent.UnreachableMember – A member is considered as unreachable, detected by the failure detector of at least one other node.
  • ClusterEvent.ReachableMember – A member is considered as reachable again, after having been unreachable. All nodes that previously detected it as unreachable has detected it as reachable again.

And this is how you might subscribe to these events

cluster.subscribe(self, classOf[MemberUp])

Which you may use in an actor like this:

class SomeActor extends Actor {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, MemberUp
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  
  def receive = {
    case MemberUp(m) => register(m)
  }

  def register(member: Member): Unit =
    if (member.hasRole("frontend"))
     ...
}

We will see more on this within the demo code which we will walk through later

ClusterClient

What use is a cluster which cant receive commands from the outside world?

Well luckily we don’t have to care about that as Akka comes with 2 things that make this otherwise glib situation ok.

Akka comes with a ClusterClient which allows actors which are not part of the cluster to talk to the cluster. Here is what the offical Akka docs have to say about this

An actor system that is not part of the cluster can communicate with actors somewhere in the cluster via this ClusterClient. The client can of course be part of another cluster. It only needs to know the location of one (or more) nodes to use as initial contact points. It will establish a connection to a ClusterReceptionist somewhere in the cluster. It will monitor the connection to the receptionist and establish a new connection if the link goes down. When looking for a new receptionist it uses fresh contact points retrieved from previous establishment, or periodically refreshed contacts, i.e. not necessarily the initial contact points.

 

Receptionist

As mentioned above the ClusterClient makes use of a ClusterReceptionist, but what is that, and how do we make a cluster actor available to the client using that?

The ClusterReceptionist is an Akka contrib extension, and must be configured on ALL the nodes that the ClusterClient will need to talk to.

There are 2 parts this, firstly we must ensure that the ClusterReceptionist is started on the nodes that ClusterClient will need to communicate with. This is easily done using the following config:

akka {
  ....
  ....
  ....
  # enable receptionist at start
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]

}

The other thing that needs doing, is that any actor within the cluster that you want to be able to talk to using the  ClusterClient will need to register itself as a service with the ClusterClientReceptionist. Here is an example of how to do that

val system = ActorSystem("ClusterSystem", config)
val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
ClusterClientReceptionist(system).registerService(frontend)

Now that you have done that you should be able to communicate with this actor within the cluster using the ClusterClient

 

The Demo Dissection

I have based the demo for this post largely against the “Transformation” demo that LightBend provide, which you can grab from here :

http://www.lightbend.com/activator/template/akka-sample-cluster-scala

The “Official” example as it is, provides a cluster which contains “frontend” and “backend” roles. The “frontend” actors will take a text message and pass it to the register workers (“Backend”s) who will UPPERCASE the message and return to the “frontend”.

I have taken this sample and added the ability to use the ClusterClient with it, which works using Future[T] and the ask pattern, such that the ClusterClient  will get a response from the cluster request.

We will dive into all of this in just a moment

For the demo this is what we are trying to build

image

SBT / Dependencies

Before we dive into the demo code (which as I say is based largely on the official lightbend clustering example anyway) I would just like to dive into the SBT file that drives the demo projects

This is the complete SBT file for the entire demo

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor"         % "2.4.8",
    "com.typesafe.akka" %% "akka-remote"        % "2.4.8",
    "com.typesafe.akka" %% "akka-cluster"       % "2.4.8",
    "com.typesafe.akka" %% "akka-cluster-tools" % "2.4.8",
    "com.typesafe.akka" %% "akka-contrib"       % "2.4.8"
  )


lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val root =(project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "Base"
  )
  .aggregate(common, frontend, backend)
  .dependsOn(common, frontend, backend)

lazy val common = (project in file("common")).
  settings(commonSettings: _*).
  settings(
    name := "common"
  )

lazy val frontend = (project in file("frontend")).
  settings(commonSettings: _*).
  settings(
    name := "frontend"
  )
  .aggregate(common)
  .dependsOn(common)

lazy val backend = (project in file("backend")).
  settings(commonSettings: _*).
  settings(
    name := "backend"
  )
  .aggregate(common)
  .dependsOn(common)

There are a few things to note in this

  • We need a few dependencies to get clustering to work. Namely
    • akka-remote
    • akka-cluster
    • akka-cluster-tools
    • akka-contrib
  • There are a few projects
    • root : The cluster client portion
    • common : common files
    • frontend : frontend cluster based actors (the client will talk to these)
    • backend : backend cluster based actors

 

The Projects

Now that we have seen the projects involved from an SBT point of view, lets continue to look at how the actual projects perform their duties

Remember the workflow we are trying to achieve is something like this

  • We should ensure that a frontend (seed node) is started first
  • We should ensure a backend (seed node) is started. This will have the effect of the backend actor registering itself as a worker with the already running frontend actor
  • At this point we could start more frontend/backend non seed nodes actors, if we chose to
  • We start the client app (root) which will periodically send messages to the frontend actor that is looked up by its known seed node information. We would expect the frontend actor to delegate work of to one of its known backend actors, and then send the response back to the client (ClusterClient) where we can use the response to send to a local actor, or consume the response directly

Common

The common project simply contains the common objects across the other projects. Which for this demo app are just the messages as shown below

package sample.cluster.transformation

final case class TransformationJob(text: String)
final case class TransformationResult(text: String)
final case class JobFailed(reason: String, job: TransformationJob)
case object BackendRegistration

 

Root

This is the client app that will talk to the cluster (in particular the “frontend” seed node which expected to be running on 127.0.0.1:2551.

This client app uses the following configuration file

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 5000
    }
  }
}

We then use the following main method to kick of the client app

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import scala.io.StdIn
import scala.concurrent.duration._


object DemoClient {
  def main(args : Array[String]) {

    val system = ActorSystem("OTHERSYSTEM")
    val clientJobTransformationSendingActor =
      system.actorOf(Props[ClientJobTransformationSendingActor],
        name = "clientJobTransformationSendingActor")

    val counter = new AtomicInteger
    import system.dispatcher
    system.scheduler.schedule(2.seconds, 2.seconds) {
      clientJobTransformationSendingActor ! Send(counter.incrementAndGet())
      Thread.sleep(1000)
    }

    StdIn.readLine()
    system.terminate()
  }
}




There is not too much to talk about here, we simply create a standard actor, and send it messages on a recurring schedule.

The message looks like this

case class Send(count:Int)

The real work of talking to the cluster is inside the ClientJobTransformationSendingActor which we will look at now

import akka.actor.Actor
import akka.actor.ActorPath
import akka.cluster.client.{ClusterClientSettings, ClusterClient}
import akka.pattern.Patterns
import sample.cluster.transformation.{TransformationResult, TransformationJob}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}


class ClientJobTransformationSendingActor extends Actor {

  val initialContacts = Set(
    ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"))
  val settings = ClusterClientSettings(context.system)
    .withInitialContacts(initialContacts)

  val c = context.system.actorOf(ClusterClient.props(settings), "demo-client")


  def receive = {
    case TransformationResult(result) => {
      println("Client response")
      println(result)
    }
    case Send(counter) => {
        val job = TransformationJob("hello-" + counter)
        implicit val timeout = Timeout(5 seconds)
        val result = Patterns.ask(c,ClusterClient.Send("/user/frontend", job, localAffinity = true), timeout)

        result.onComplete {
          case Success(transformationResult) => {
            println(s"Client saw result: $transformationResult")
            self ! transformationResult
          }
          case Failure(t) => println("An error has occured: " + t.getMessage)
        }
      }
  }
}

As you can see this is a regular actor, but there are several important things to note here:

  • We setup the ClusterClient with a known set of seed nodes that we can expect to be able to contact within the cluster (remember these nodes MUST have registered themselves as available services with the ClusterClientReceptionist
  • That we use a new type of actor a ClusterClient
  • That we use the ClusterClient to send a message to a seed node within the cluster (frontend) in our case. We use the ask pattern which will give use a Future[T] which represents the response.
  • We use the response to send a local message to ourself

 

FrontEnd

As previously stated the “frontend” role actors serve as the seed nodes for the ClusterClient. There is only one seed node for the frontend which we just saw the client app uses via the ClusterClient.

So what happens when the client app uses the frontend actors via the ClusterClient, well its quite simple the client app (once a connection is made to the frontend seed node) send a simple TransformationJob which is a simple message that contains a bit of text that the frontend actor will pass on to one of its registered backend workers for processing.

The backend actor (also in the cluster) will simply convert the TransformationJob contained text to  UPPERCASE and return it to the frontend actor. The frontend actor will then send this TransformationResult back to the sender which happens to be the ClusterClient. The client app will listen to this (which was done using the ask pattern) and will hook up a callback for the Future[T] and will the send the TransformationResult to the clients own actor.

Happy days.

So that is what we are trying to achieve, lets see what bits and bobs we need for the frontend side of things

Here is the configuration the frontend needs

#//#snippet
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    #//#snippet
    # excluded from snippet
    auto-down-unreachable-after = 10s
    #//#snippet
    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s
  }

  # enable receptionist at start
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]

}

There are a couple of important things to note in this, namely:

  • That we configure the seed nodes
  • That we also use add the ClusterClientReceptionist
  • That we use the ClusterActorRefProvider

And here is the frontend application

package sample.cluster.transformation.frontend

import language.postfixOps
import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory
import akka.cluster.client.ClusterClientReceptionist



object TransformationFrontendApp {

  def main(args: Array[String]): Unit = {

    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
    ClusterClientReceptionist(system).registerService(frontend)
  }

}

The important parts here are that we embellish the read config with the role of “frontend”, and that we also register the frontend actor with the ClusterClientReceptionist such that the actor is available to communicate with by the ClusterClient

Other than that it is all pretty vanilla akka to be honest

So lets now focus our attention to the actual frontend actor, which is shown below

package sample.cluster.transformation.frontend

import sample.cluster.transformation.{TransformationResult, BackendRegistration, JobFailed, TransformationJob}
import language.postfixOps
import scala.concurrent.Future
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Terminated
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.pattern.pipe
import akka.pattern.ask


class TransformationFrontend extends Actor {

  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0

  def receive = {
    case job: TransformationJob if backends.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob =>
      println(s"Frontend saw TransformationJob : '$job'")
      jobCounter += 1
      implicit val timeout = Timeout(5 seconds)
      val result  = (backends(jobCounter % backends.size) ? job)
        .map(x => x.asInstanceOf[TransformationResult])
      result pipeTo sender
      //pipe(result) to sender

    case BackendRegistration if !backends.contains(sender()) =>
      context watch sender()
      backends = backends :+ sender()

    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}

The crucial parts here are:

  • That when a backend registers it will send a BackendRegistration, which we then watch and monitor, and if that backend terminates it is removed from the list of this frontend actors known backend actors
  • That we palm off the incoming TransformationJob to a random backend, and then use the pipe pattern to pipe the response back to the client

And with that, all that is left to do is examine the backend code, lets looks at that now

 

BackEnd

As always lets start with the configuration, which for the backend is as follows:

#//#snippet
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    #//#snippet
    # excluded from snippet
    auto-down-unreachable-after = 10s
    #//#snippet
    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s
  }

  # enable receptionist at start
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
}




You can see this is pretty much the same as the frontend, so I won’t speak to this anymore.

Ok so following what we did with the frontend side of things, lets now look at the backend app

package sample.cluster.transformation.backend

import language.postfixOps
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory

object TransformationBackendApp {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[TransformationBackend], name = "backend")
  }
}

Again this is VERY similar to the front end app, the only notable exception being that we now use a “backend” role instead of a “frontend” one

So now lets look at the backend actor code, which is the final piece of the puzzle

package sample.cluster.transformation.backend

import sample.cluster.transformation.{BackendRegistration, TransformationResult, TransformationJob}
import language.postfixOps
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.Member
import akka.cluster.MemberStatus


class TransformationBackend extends Actor {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, MemberUp
  // re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case TransformationJob(text) => {
      val result = text.toUpperCase
      println(s"Backend has transformed the incoming job text of '$text' into '$result'")
      sender() ! TransformationResult(text.toUpperCase)
    }
    case state: CurrentClusterState =>
      state.members.filter(_.status == MemberStatus.Up) foreach register
    case MemberUp(m) => register(m)
  }

  def register(member: Member): Unit =
    if (member.hasRole("frontend"))
      context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
        BackendRegistration
}

The key points here are:

  • That we use the cluster events, to subscribe to MemberUp such that if its “frontend” role actor, we will register this backend with it by sending a BackendRegistration message to it
  • That for any TrasformationJob received (from the frontend which is ultimately for the client app) we do the work, and send a TransformationResult back, which will make its way all the way back to the client

 

And in a nutshell that is how the entire demo hangs together. I hope I have not lost anyone along the way.

Anyway lets now see how we can run the demo

How do I Run The Demo

You will need to ensure that you run the following 3 projects in this order (as a minimum. You can run more NON seed node frontend/backend versions before you start the root (client) if you like)

  • Frontend (seed node) : frontend with command line args : 2551
  • Backend (seed node) : backend with command line args : 2551
  • Optionally run more frontend/backend projects but DON’T supply any command line args. This is how you get them to not be treated as seed nodes
  •  Root : This is the client app

 

Once you run the projects you should see some output like

The “root” (client) project output:

[INFO] [10/05/2016 07:22:02.831] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/05/2016 07:22:03.302] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://OTHERSYSTEM@127.0.0.1:5000]
[INFO] [10/05/2016 07:22:03.322] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Starting up…
[INFO] [10/05/2016 07:22:03.450] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [10/05/2016 07:22:03.450] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Started up successfully
[INFO] [10/05/2016 07:22:03.463] [OTHERSYSTEM-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [10/05/2016 07:22:03.493] [OTHERSYSTEM-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Metrics collection has started successfully
[WARN] [10/05/2016 07:22:03.772] [OTHERSYSTEM-akka.actor.default-dispatcher-19] [akka.tcp://OTHERSYSTEM@127.0.0.1:5000/system/cluster/core/daemon] Trying to join member with wrong ActorSystem name, but was ignored, expected [OTHERSYSTEM] but was [ClusterSystem]
[INFO] [10/05/2016 07:22:03.811] [OTHERSYSTEM-akka.actor.default-dispatcher-19] [akka.tcp://OTHERSYSTEM@127.0.0.1:5000/user/demo-client] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2552/system/receptionist]
[WARN] [10/05/2016 07:22:05.581] [OTHERSYSTEM-akka.remote.default-remote-dispatcher-14] [akka.serialization.Serialization(akka://OTHERSYSTEM)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Client saw result: TransformationResult(HELLO-1)
Client response
HELLO-1
Client saw result: TransformationResult(HELLO-2)
Client response
HELLO-2
Client saw result: TransformationResult(HELLO-3)
Client response
HELLO-3
Client saw result: TransformationResult(HELLO-4)
Client response
HELLO-4

The “frontend” project output:

[INFO] [10/05/2016 07:21:35.592] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/05/2016 07:21:35.883] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [10/05/2016 07:21:35.901] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Starting up…
[INFO] [10/05/2016 07:21:36.028] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [10/05/2016 07:21:36.028] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Started up successfully
[INFO] [10/05/2016 07:21:36.037] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [10/05/2016 07:21:36.040] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Metrics collection has started successfully
[WARN] [10/05/2016 07:21:37.202] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2552]] Caused by: [Connection refused: no further information: /127.0.0.1:2552]
[INFO] [10/05/2016 07:21:37.229] [ClusterSystem-akka.actor.default-dispatcher-17] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:37.229] [ClusterSystem-akka.actor.default-dispatcher-17] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:37.232] [ClusterSystem-akka.actor.default-dispatcher-21] [akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter] Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter#-1346529294] to Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter#-1346529294] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:38.085] [ClusterSystem-akka.actor.default-dispatcher-22] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:39.088] [ClusterSystem-akka.actor.default-dispatcher-14] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:40.065] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:41.095] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles [frontend]
[INFO] [10/05/2016 07:21:41.123] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [10/05/2016 07:21:50.837] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles [backend]
[INFO] [10/05/2016 07:21:51.096] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
Frontend saw TransformationJob : ‘TransformationJob(hello-1)’
[WARN] [10/05/2016 07:22:05.669] [ClusterSystem-akka.remote.default-remote-dispatcher-24] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
[WARN] [10/05/2016 07:22:05.689] [ClusterSystem-akka.remote.default-remote-dispatcher-23] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Frontend saw TransformationJob : ‘TransformationJob(hello-2)’
Frontend saw TransformationJob : ‘TransformationJob(hello-3)’
Frontend saw TransformationJob : ‘TransformationJob(hello-4)’
.

 

The “backend”project output:

[INFO] [10/05/2016 07:21:50.023] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/05/2016 07:21:50.338] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
[INFO] [10/05/2016 07:21:50.353] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Starting up…
[INFO] [10/05/2016 07:21:50.430] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [10/05/2016 07:21:50.430] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Started up successfully
[INFO] [10/05/2016 07:21:50.437] [ClusterSystem-akka.actor.default-dispatcher-6] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [10/05/2016 07:21:50.441] [ClusterSystem-akka.actor.default-dispatcher-6] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Metrics collection has started successfully
[INFO] [10/05/2016 07:21:50.977] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[WARN] [10/05/2016 07:21:51.289] [ClusterSystem-akka.remote.default-remote-dispatcher-15] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.BackendRegistration$] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
[WARN] [10/05/2016 07:22:05.651] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Backend has transformed the incoming job text of ‘hello-1’ into ‘HELLO-1’
[WARN] [10/05/2016 07:22:05.677] [ClusterSystem-akka.remote.default-remote-dispatcher-15] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Backend has transformed the incoming job text of ‘hello-2’ into ‘HELLO-2’
Backend has transformed the incoming job text of ‘hello-3’ into ‘HELLO-3’
Backend has transformed the incoming job text of ‘hello-4’ into ‘HELLO-4’

 

Nat or Docker Considerations

Akka clustering does not work transparently with Network Address Translation, Load Balancers, or in Docker containers. If this is your case you may need to further configure Akka as described here :

http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#remote-configuration-nat

 

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples

Advertisements
Akka

Akka : remoting

It has been a while since I wrote a post, the reason for this is actually this post.

I would consider remoting/clustering to be some of the more advanced stuff you could do with Akka. That said this and the next post will outline all of this good stuff for you, and by the end of this post I would hope to have demonstrated enough for you guys to go off and write Akka remoting/clustered apps.

I have decided to split the remoting and clustering stuff into 2 posts, to make it more focused and digestible. I think this is the right thing to do.

 

A Note About All The Demos In This Topic

I wanted the demos in this section to be as close to real life as possible. The official akka examples tend to have a single process. Which I personally think is quite confusing when you are trying to deal with quite hard concepts. As such I decided to go with multi process projects to demonstrate things. I do however only have 1 laptop, so they are hosted on the same node, but they are separate processes/JVMs.

I am hoping by doing this it will make the learning process easier, as it is closer to what you would do in real life rather than have 1 main method that spawns an entire cluster. You just would not have that in real life.

 

What Is Akka Remoting

If you have ever used RMI in Java or Remoting/WCF in C# you can kind of think of Akka remoting as something similar to that. Where there is the ability to call a remote objects method as is it were local. It is essentially peer-to-peer.

Obviously in Akkas case the remote object is actually an Actor, and you will not actually be calling a method at all,but will instead by treating the remote actor just like any other actor where you simply pass messages to it, and the remote actor will work just like any other actor where it will receive the message and act on it accordingly.

This is actually quite unique actually, I have work with Java Remoting and also C# Remoting, and done a lot with .NET WCF. What all of these had in common was that there was some code voodoo that you had to do, where the difference between working with a local object and working with a remote object required a fair bit of code, be it remoting channels, proxies etc etc

In Akka there is literally no change in coding style to work with remoting, it is completely configuration driven. This is quite nice.

Akkas Remoting Interaction Models

 Akka supports 2 ways of using remoting

  • Lookup : Where we use actorSelection to lookup an already running remote actor
  • Creation : Where an actor will be created on the remote node

We will be looking at both these approaches

Requirements

As I have stated on numerous occasions I have chosen to use SBT. As such this is my SBT file dependencies section for both these Remoting examples.

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )

It can be seen that the crucial dependency is akka-remote library

 

Remote Selection

As stated above “Remote Selection” will try and use actorSelection to look up a remote actor. The remote actor IS expected to be available and running.

In this example there will be 2 projects configured in the SBT file

  • Remote : The remote actor, that is expected to be running before the local actor tries to communicate with it
  • Local : The local actor that will call the remote

Here is the complete SBT file for this section

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )


lazy val commonSettings = Seq(
  name := "AkkaRemoting",
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val remote = (project in file("remote")).
  settings(commonSettings: _*).
  settings(
    // other settings
  )

lazy val local = (project in file("local")).
  settings(commonSettings: _*).
  settings(
    // other settings
  )



This simply creates 2 projects for us, Remote and Local.

Remote Project

Now that we have the relevant projects in place, lets talk about how we expose a remote actor for selection.

We must ensure that the remote actor is available for selection, which requires the use of an IP address and a port.

Here is how we do this

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 4444
    }
    log-sent-messages = on
    log-received-messages = on
  }
}

Also note the akka.actor.provider is set to

akka.remote.RemoteActorRefProvider

Ok so now we have the ability to expose the remote actor on a IP address and port, lets have a look at the remote actor code.

Here is the remote actor itself

import akka.actor.Actor

class RemoteActor extends Actor {
  def receive = {
    case msg: String =>
      println(s"RemoteActor received message '$msg'")
      sender ! "Hello from the RemoteActor"
  }
}

Nothing too special, we simply receive a string message and send a response string message back to the sender (the local actor would be the sender in this case)

And here is the main method that drives the remote project

import akka.actor.{Props, ActorSystem}

import scala.io.StdIn

object RemoteDemo extends App  {
  val system = ActorSystem("RemoteDemoSystem")
  val remoteActor = system.actorOf(Props[RemoteActor], name = "RemoteActor")
  remoteActor ! "The RemoteActor is alive"
  StdIn.readLine()
  system.terminate()
  StdIn.readLine()
}

Again pretty standard stuff, no voodoo here

And that is all there is to the remote side of the “remote selection” remoting version. Lets now turn our attention to the local side.

Local Project

So far we have a remote actor which is configured to up and running at 127.0.0.1:4444.

We now need to open up the local side of things. This is done using the following configuration.Notice the port is a different port from the already in use 4444. Obviously if you host these actors on physically different boxes there would be nothing to stop you using port 4444 again, but from a sanity point of view, I find it is better to not do that.

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
}

Plain Selection

We can make use of plain old actor selection to select any actor by a path of our chosing. Where we may use the resolveOne (if we expect to only match one actor, remember we can use wildcards so there are times we may match more than one) to give us a ActorRef.

context.actorSelection(path).resolveOne()

When we use resolveOne() we would get a Future[ActorRef] that we can use in any of the normal ways we would handle and work with Futute[T]. I have chosen to use a for comprehension to capture the result of the ActorRef of the more actor. I also monitor the remote actor using context.watch such that if it terminates we will see a Terminated message and can shutdown the local actor system.

We also make use of the become (see the state machines post for more info on that) to swap out the message loop for the local actor, so work differently once we have a remote ActorRef.

Once we have an ActorRef representing the remote actor it is pretty standard stuff where we just send messages to the remote actor ref using the ActorRef that represents it.

Here is the entire code for the plain actor selection approach of dealing with a remote actor.

import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Terminated, ActorRef, ReceiveTimeout, Actor}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

case object Init

class LocalActorUsingPlainSelection extends Actor {

  val path = "akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor"
  val atomicInteger = new AtomicInteger();
  context.setReceiveTimeout(3 seconds)

  def receive = identifying

  def identifying: Receive = {
    case Init => {
      implicit val resolveTimeout = Timeout(5 seconds)
      for (ref : ActorRef <- context.actorSelection(path).resolveOne()) {
        println("Resolved remote actor ref using Selection")
        context.watch(ref)
        context.become(active(ref))
        context.setReceiveTimeout(Duration.Undefined)
        self ! Start
      }
    }
    case ReceiveTimeout => println("timeout")
  }

  def active(actor: ActorRef): Receive = {
    case Start =>
      actor ! "Hello from the LocalActorUsingPlainSelection"
    case msg: String =>
      println(s"LocalActorUsingPlainSelection received message: '$msg'")
      if (atomicInteger.get() < 5) {
        sender ! "Hello back to you"
        atomicInteger.getAndAdd(1)
      }
    case Terminated(`actor`) =>
      println("Receiver terminated")
      context.system.terminate()
  }
}

 

Using Identity Messages

Another approach that can be taken rather than relying on straight actor selection is by using some special Akka messages, namely Identify and ActorIdentity.

The idea is that we still use actorSelection for a given path, but rather than using resolveOne we sent the send the ActorSelection a special Identify message. The actor that was chosen by the ActorSelection should see this Identify message and should respond with a ActorIdentity message.

As this point the local actor can simply listen for ActorIdentity messages and when it sees one, it can test this messages correlationId to see if it matches the requested path, if it does you know that is the correct actor and you can then use the ActorRef of the ActorIdentity message.

As in the previous example we also make use of the become (see the state machines post for more info on that) to swap out the message loop for the local actor, so work differently once we have a remote ActorRef.

Once we have an ActorRef representing the remote actor it is pretty standard stuff where we just send messages to the remote actor ref using the ActorRef that represents it.

Here is the entire code for the Identity actor approach

import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import scala.concurrent.duration._


case object Start


class LocalActorUsingIdentity extends Actor {

  val path = "akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor"
  val atomicInteger = new AtomicInteger();
  context.setReceiveTimeout(3 seconds)
  sendIdentifyRequest()

  def receive = identifying

  def sendIdentifyRequest(): Unit =
    context.actorSelection(path) ! Identify(path)

  def identifying: Receive = {
    case identity : ActorIdentity =>
      if(identity.correlationId.equals(path)) {
        identity.ref match {
          case Some(remoteRef) => {
            context.watch(remoteRef)
            context.become(active(remoteRef))
            context.setReceiveTimeout(Duration.Undefined)
            self ! Start
          }
          case None => println(s"Remote actor not available: $path")
        }
      }
    case ReceiveTimeout => sendIdentifyRequest()
  }

  def active(actor: ActorRef): Receive = {
    case Start =>
      actor ! "Hello from the LocalActorUsingIdentity"
    case msg: String =>
      println(s"LocalActorUsingIdentity received message: '$msg'")
      if (atomicInteger.get() < 5) {
        sender ! "Hello back to you"
        atomicInteger.getAndAdd(1)
      }
    case Terminated(`actor`) =>
      println("Receiver terminated")
      context.system.terminate()
  }
}

How do I Run The Demo

You will need to ensure that you run the following 2 projects in this order:

  • Remote
  • Local

Once you run the 2 projects you should see some output like this

The Remote project output:

[INFO] [10/03/2016 07:02:54.282] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:02:54.842] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteDemoSystem@127.0.0.1:4444]
[INFO] [10/03/2016 07:02:54.844] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://RemoteDemoSystem@127.0.0.1:4444]
RemoteActor received message ‘The RemoteActor is alive’
[INFO] [10/03/2016 07:02:54.867] [RemoteDemoSystem-akka.actor.default-dispatcher-15] [akka://RemoteDemoSystem/deadLetters]
Message [java.lang.String] from Actor[akka://RemoteDemoSystem/user/RemoteActor#-109465353] to Actor[akka://RemoteDemoSystem/deadLetters]
was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings
‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
RemoteActor received message ‘Hello from the LocalActorUsingPlainSelection’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’

 

The Local project output:

[INFO] [10/03/2016 07:03:09.489] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:03:09.961] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://LocalDemoSystem@127.0.0.1:64945]
[INFO] [10/03/2016 07:03:09.963] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://LocalDemoSystem@127.0.0.1:64945]
Resolved remote actor ref using Selection
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’

 

 

 

Remote Creation

Just as we did with remote selection, remote creation shall be split into a remote project and a local project, however since this time the local project must know about the type of the more actor to create it in the first place we introduce a common project which both the remote and local depend on.

In this example there will be 3 projects configured in the SBT file

  • Remote : The remote actor, that is expected to be created by the local actor
  • Common : The common files that both Local/Remote projects depend on
  • Local : The local actor that will create and call the remote actor

Here is the complete SBT file for this section

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )


lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val root =(project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "Base"
  )
  .aggregate(common, remote)
  .dependsOn(common, remote)

lazy val common = (project in file("common")).
  settings(commonSettings: _*).
  settings(
    name := "common"
  )

lazy val remote = (project in file("remote")).
  settings(commonSettings: _*).
  settings(
    name := "remote"
  )
  .aggregate(common)
  .dependsOn(common)

It can be seen that both the local/remote will aggregate/depend on the common project. This is standard SBT stuff so I will not go into that.

So now that we understand a bit more about the SBT side of things lets focus on the remote side of things.

This may seem odd since we are expecting the local actor to create the remote aren’t we?

Well yes we are but the remote actor system must still be available prior to start/deploy and actor in it via the local system.

So it still makes sense to examine the remote side of things first.

Remote Project

Now that we have the relevant projects in place, lets talk about how we expose a remote actor for creation.

We must ensure that the remote system is available for creation requests, which requires the use of an IP address and a port.

Here is how we do this

akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    netty.tcp {
      hostname = "127.0.0.1",
      # LISTEN on tcp port 2552
      port=2552
    }
  }

}

I also mentioned that the remote actor system MUST be started to allow remote creation to work, as such the entire codebase for the remote end of thing (excluding the actor remote actor which gets created by the local side of thing) is shown below

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem

object CalculatorApplication {

  def main(args: Array[String]): Unit = {
    startRemoteWorkerSystem()
  }

  def startRemoteWorkerSystem(): Unit = {
    ActorSystem("CalculatorWorkerSystem", ConfigFactory.load("calculator"))
    println("Started CalculatorWorkerSystem")
  }

}

All that is happening here is that the remote actor system gets created.

Local Project

Most of the hard work is done in this project. As it is the local side of things that is responsible for creating and deploying the remote actor in the remote actor system, before it can then make use of it.

Lets start with the deployment of the remote actor from the local side.

Firstly we need this configuration to allow this happen

akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider",
    deployment {
      "/creationActor/*" {
        remote = "akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552"
      }
    }
  }

  remote {
    netty.tcp {
      hostname = "127.0.0.1",
      port=2554
    }
  }

}

If you look carefully at this configuration file and the one in the remote end you will see that the ip address/poprt/actor system name used within the deployment section all match. This is how the local actor system is able to create and deploy an actor to the remote actor system (which must be running prior to the local actor system trying to deploy a remote actor to it)

So now that we have seen this config, lets see how it is used by the local

import sample.remote.calculator.{Divide, Multiply, CreationActor}
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import scala.util.Random
import akka.actor.ActorSystem
import akka.actor.Props

object CreationApplication {

  def main(args: Array[String]): Unit = {
    startRemoteCreationSystem()
  }

  def startRemoteCreationSystem(): Unit = {
    val system =
      ActorSystem("CreationSystem", ConfigFactory.load("remotecreation"))
    val actor = system.actorOf(Props[CreationActor],
      name = "creationActor")

    println("Started CreationSystem")
    import system.dispatcher
    system.scheduler.schedule(1.second, 1.second) {
      if (Random.nextInt(100) % 2 == 0)
        actor ! Multiply(Random.nextInt(20), Random.nextInt(20))
      else
        actor ! Divide(Random.nextInt(10000), (Random.nextInt(99) + 1))
    }
  }

}

It can be seen that the first thing we try and do is try and create the remote actor (CreationActor) using the config above. If this all works we will end up with a CreationActor being created in the already running remote actor system. This CreationActor can then be used just like any other actor.

For completeness here is the code of the CreationActor

package sample.remote.calculator

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props

class CreationActor extends Actor {

  def receive = {
    case op: MathOp =>
      val calculator = context.actorOf(Props[CalculatorActor])
      calculator ! op
    case result: MathResult => result match {
      case MultiplicationResult(n1, n2, r) =>
        printf("Mul result: %d * %d = %d\n", n1, n2, r)
        context.stop(sender())
      case DivisionResult(n1, n2, r) =>
        printf("Div result: %.0f / %d = %.2f\n", n1, n2, r)
        context.stop(sender())
    }
  }
}
It can be seen that the CreationActor above also creates another actor called CalculatorActor which does the real work. Lets see the code for that one
package sample.remote.calculator

import akka.actor.Props
import akka.actor.Actor

class CalculatorActor extends Actor {
  def receive = {
    case Add(n1, n2) =>
      println("Calculating %d + %d".format(n1, n2))
      sender() ! AddResult(n1, n2, n1 + n2)
    case Subtract(n1, n2) =>
      println("Calculating %d - %d".format(n1, n2))
      sender() ! SubtractResult(n1, n2, n1 - n2)
    case Multiply(n1, n2) =>
      println("Calculating %d * %d".format(n1, n2))
      sender() ! MultiplicationResult(n1, n2, n1 * n2)
    case Divide(n1, n2) =>
      println("Calculating %.0f / %d".format(n1, n2))
      sender() ! DivisionResult(n1, n2, n1 / n2)
  }
}

Nothing special there really, its just a standard actor

So we now have a complete pipeline.

How do I Run The Demo

You will need to ensure that you run the following 2 projects in this order:

  • Remote
  • Root

Once you run the 2 projects you should see some output like this

The Remote project output:

[INFO] [10/03/2016 07:30:58.763] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:30:59.235] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552]
[INFO] [10/03/2016 07:30:59.237] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552]
Started CalculatorWorkerSystem
Calculating 6 * 15
[WARN] [10/03/2016 07:31:10.988] [CalculatorWorkerSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CalculatorWorkerSystem)] Using the default Java serializer for class [sample.remote.calculator.MultiplicationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Calculating 1346 / 82
[WARN] [10/03/2016 07:31:11.586] [CalculatorWorkerSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CalculatorWorkerSystem)] Using the default Java serializer for class [sample.remote.calculator.DivisionResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Calculating 2417 / 31
Calculating 229 / 66
Calculating 9966 / 43
Calculating 4 * 12
Calculating 9 * 5
Calculating 1505 / 91

The Root project output:

[INFO] [10/03/2016 07:31:08.849] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:31:09.470] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://CreationSystem@127.0.0.1:2554]
[INFO] [10/03/2016 07:31:09.472] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://CreationSystem@127.0.0.1:2554]
Started CreationSystem
[WARN] [10/03/2016 07:31:10.808] [CreationSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [com.typesafe.config.impl.SimpleConfig] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
[WARN] [10/03/2016 07:31:10.848] [CreationSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [sample.remote.calculator.Multiply] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Mul result: 6 * 15 = 90
[WARN] [10/03/2016 07:31:11.559] [CreationSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [sample.remote.calculator.Divide] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Div result: 1346 / 82 = 16.41
Div result: 2417 / 31 = 77.97
Div result: 229 / 66 = 3.47
Div result: 9966 / 43 = 231.77
Mul result: 4 * 12 = 48
Mul result: 9 * 5 = 45
Div result: 1505 / 91 = 16.54
Mul result: 7 * 4 = 28
Div result: 1797 / 95 = 18.92
Mul result: 12 * 17 = 204
Div result: 2998 / 72 = 41.64
Div result: 1157 / 98 = 11.81
Div result: 1735 / 22 = 78.86
Mul result: 4 * 19 = 76
Div result: 6257 / 51 = 122.69
Mul result: 14 * 4 = 56
Div result: 587 / 27 = 21.74
Div result: 2528 / 98 = 25.80
Mul result: 9 * 16 = 144
Mul result: 13 * 10 = 130

 

 

 

 

Nat or Docker Considerations

Akka Remoting does not work transparently with Network Address Translation, Load Balancers, or in Docker containers. If this is your case you may need to further configure Akka as described here :

http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#remote-configuration-nat

 

 

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples