Daily Archives: 03/10/2016

Akka : remoting

It has been a while since I wrote a post, and the reason for that is actually this post.

I would consider remoting/clustering to be some of the more advanced stuff you can do with Akka. That said, this post and the next will outline all of this good stuff for you, and by the end of them I would hope to have demonstrated enough for you to go off and write Akka remoting/clustered apps.

I have decided to split the remoting and clustering stuff into 2 posts, to make it more focused and digestible. I think this is the right thing to do.

 

A Note About All The Demos In This Topic

I wanted the demos in this section to be as close to real life as possible. The official Akka examples tend to use a single process, which I personally think is quite confusing when you are trying to deal with quite hard concepts. As such I decided to go with multi-process projects to demonstrate things. I do however only have 1 laptop, so they are hosted on the same node, but they are separate processes/JVMs.

I am hoping that doing this makes the learning process easier, as it is closer to what you would do in real life than having 1 main method that spawns an entire cluster. You just would not have that in real life.

 

What Is Akka Remoting

If you have ever used RMI in Java or Remoting/WCF in C# you can kind of think of Akka remoting as something similar to that, where there is the ability to call a remote object's method as if it were local. It is essentially peer-to-peer.

Obviously in Akka's case the remote object is actually an actor, and you will not actually be calling a method at all, but will instead be treating the remote actor just like any other actor: you simply pass messages to it, and the remote actor receives each message and acts on it accordingly.

This is actually quite unique. I have worked with Java Remoting and also C# Remoting, and done a lot with .NET WCF. What all of these had in common was that there was some code voodoo that you had to do, where the difference between working with a local object and working with a remote object required a fair bit of code, be it remoting channels, proxies etc.

In Akka there is literally no change in coding style to work with remoting, it is completely configuration driven. This is quite nice.

Akka's Remoting Interaction Models

Akka supports 2 ways of using remoting:

  • Lookup : Where we use actorSelection to look up an already running remote actor
  • Creation : Where an actor will be created on the remote node

We will be looking at both of these approaches.

Requirements

As I have stated on numerous occasions I have chosen to use SBT. As such this is my SBT file dependencies section for both these Remoting examples.

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )

It can be seen that the crucial dependency is the akka-remote library.

 

Remote Selection

As stated above “Remote Selection” will try and use actorSelection to look up a remote actor. The remote actor IS expected to be available and running.

In this example there will be 2 projects configured in the SBT file

  • Remote : The remote actor, that is expected to be running before the local actor tries to communicate with it
  • Local : The local actor that will call the remote

Here is the complete SBT file for this section

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )


lazy val commonSettings = Seq(
  name := "AkkaRemoting",
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val remote = (project in file("remote")).
  settings(commonSettings: _*).
  settings(
    // other settings
  )

lazy val local = (project in file("local")).
  settings(commonSettings: _*).
  settings(
    // other settings
  )



This simply creates 2 projects for us, Remote and Local.

Remote Project

Now that we have the relevant projects in place, let's talk about how we expose a remote actor for selection.

We must ensure that the remote actor is available for selection, which requires the use of an IP address and a port.

Here is how we do this

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 4444
    }
    log-sent-messages = on
    log-received-messages = on
  }
}

Also note the akka.actor.provider is set to

akka.remote.RemoteActorRefProvider

Ok, so now that we have the ability to expose the remote actor on an IP address and port, let's have a look at the remote actor code.

Here is the remote actor itself

import akka.actor.Actor

class RemoteActor extends Actor {
  def receive = {
    case msg: String =>
      println(s"RemoteActor received message '$msg'")
      sender ! "Hello from the RemoteActor"
  }
}

Nothing too special, we simply receive a string message and send a response string message back to the sender (the local actor would be the sender in this case)

And here is the main method that drives the remote project

import akka.actor.{Props, ActorSystem}

import scala.io.StdIn

object RemoteDemo extends App  {
  val system = ActorSystem("RemoteDemoSystem")
  val remoteActor = system.actorOf(Props[RemoteActor], name = "RemoteActor")
  remoteActor ! "The RemoteActor is alive"
  StdIn.readLine()
  system.terminate()
  StdIn.readLine()
}

Again pretty standard stuff, no voodoo here

And that is all there is to the remote side of the “remote selection” remoting version. Let's now turn our attention to the local side.

Local Project

So far we have a remote actor which is configured to be up and running at 127.0.0.1:4444.

We now need to open up the local side of things. This is done using the following configuration. Notice the port is different from the already in use 4444: a port of 0 asks Akka to pick a free port for us, which is why the local output further down shows a random port number. Obviously if you host these actors on physically different boxes there would be nothing to stop you using port 4444 again, but from a sanity point of view, I find it is better to not do that.

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
}

Plain Selection

We can make use of plain old actor selection to select any actor by a path of our choosing. We may then use resolveOne (if we expect to only match one actor; remember we can use wildcards, so there are times we may match more than one) to give us an ActorRef.

context.actorSelection(path).resolveOne()

When we use resolveOne() we get a Future[ActorRef] that we can use in any of the normal ways we would handle and work with Future[T]. I have chosen to use a for comprehension to capture the resulting ActorRef of the remote actor. I also monitor the remote actor using context.watch such that if it terminates we will see a Terminated message and can shut down the local actor system.

We also make use of become (see the state machines post for more info on that) to swap out the message loop for the local actor, so that it works differently once we have a remote ActorRef.

Once we have an ActorRef representing the remote actor it is pretty standard stuff, where we just send messages to the remote actor using that ActorRef.
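The path handed to actorSelection packs several things into one string. As a rough sketch of its anatomy, the snippet below pulls it apart with java.net.URI. RemotePathParts and RemotePath are hypothetical names made up purely for illustration (they are not Akka APIs, and Akka has its own ActorPath and Address types for real code), and the sketch assumes the path has the usual protocol://system@host:port/path shape.

```scala
import java.net.URI

// Hypothetical helper types for illustration only; not part of Akka.
final case class RemotePathParts(
  protocol: String,   // transport, e.g. akka.tcp
  system: String,     // name of the remote ActorSystem
  host: String,       // hostname from the remote's netty.tcp config
  port: Int,          // port from the remote's netty.tcp config
  actorPath: String)  // where the actor lives inside that system

object RemotePath {
  // Assumes a well-formed protocol://system@host:port/path string.
  def parse(path: String): RemotePathParts = {
    val uri = new URI(path)
    RemotePathParts(uri.getScheme, uri.getUserInfo, uri.getHost, uri.getPort, uri.getPath)
  }
}

object RemotePathDemo extends App {
  // The same path the local actors in this post use
  val parts = RemotePath.parse("akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor")
  println(parts)
}
```

Every one of those parts has to line up with the remote side: the system name with the name passed to ActorSystem(...), the host and port with the remote's netty.tcp config, and /user/RemoteActor with the name given to system.actorOf (top level actors live under /user).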

Here is the entire code for the plain actor selection approach of dealing with a remote actor.

import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Terminated, ActorRef, ReceiveTimeout, Actor}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

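case object Init

// Note: the Start message used below is the case object defined alongside
// LocalActorUsingIdentity (shown later in this post).
class LocalActorUsingPlainSelection extends Actor {

  val path = "akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor"
  val atomicInteger = new AtomicInteger()
  context.setReceiveTimeout(3 seconds)

  def receive = identifying

  def identifying: Receive = {
    case Init => {
      implicit val resolveTimeout = Timeout(5 seconds)
      // Caution: this callback runs on the global ExecutionContext, not inside
      // the actor's message loop, so touching context here is not thread-safe.
      // It is fine for a demo; piping the Future's result to self would be safer.
      for (ref : ActorRef <- context.actorSelection(path).resolveOne()) {
        println("Resolved remote actor ref using Selection")
        context.watch(ref)
        context.become(active(ref))
        context.setReceiveTimeout(Duration.Undefined)
        self ! Start
      }
    }
    case ReceiveTimeout => println("timeout")
  }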

  def active(actor: ActorRef): Receive = {
    case Start =>
      actor ! "Hello from the LocalActorUsingPlainSelection"
    case msg: String =>
      println(s"LocalActorUsingPlainSelection received message: '$msg'")
      if (atomicInteger.get() < 5) {
        sender ! "Hello back to you"
        atomicInteger.getAndAdd(1)
      }
    case Terminated(`actor`) =>
      println("Receiver terminated")
      context.system.terminate()
  }
}

 

Using Identity Messages

Another approach that can be taken rather than relying on straight actor selection is by using some special Akka messages, namely Identify and ActorIdentity.

The idea is that we still use actorSelection for a given path, but rather than using resolveOne we send the ActorSelection a special Identify message. The actor that was chosen by the ActorSelection should see this Identify message and should respond with an ActorIdentity message.

At this point the local actor can simply listen for ActorIdentity messages, and when it sees one it can test the message's correlationId to see if it matches the requested path. If it does, you know that is the correct actor and you can then use the ActorRef carried by the ActorIdentity message.

As in the previous example we also make use of become (see the state machines post for more info on that) to swap out the message loop for the local actor, so that it works differently once we have a remote ActorRef.

Once we have an ActorRef representing the remote actor it is pretty standard stuff where we just send messages to the remote actor ref using the ActorRef that represents it.

Here is the entire code for the Identity actor approach

import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import scala.concurrent.duration._


case object Start


class LocalActorUsingIdentity extends Actor {

  val path = "akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor"
  val atomicInteger = new AtomicInteger();
  context.setReceiveTimeout(3 seconds)
  sendIdentifyRequest()

  def receive = identifying

  def sendIdentifyRequest(): Unit =
    context.actorSelection(path) ! Identify(path)

  def identifying: Receive = {
    case identity : ActorIdentity =>
      if(identity.correlationId.equals(path)) {
        identity.ref match {
          case Some(remoteRef) => {
            context.watch(remoteRef)
            context.become(active(remoteRef))
            context.setReceiveTimeout(Duration.Undefined)
            self ! Start
          }
          case None => println(s"Remote actor not available: $path")
        }
      }
    case ReceiveTimeout => sendIdentifyRequest()
  }

  def active(actor: ActorRef): Receive = {
    case Start =>
      actor ! "Hello from the LocalActorUsingIdentity"
    case msg: String =>
      println(s"LocalActorUsingIdentity received message: '$msg'")
      if (atomicInteger.get() < 5) {
        sender ! "Hello back to you"
        atomicInteger.getAndAdd(1)
      }
    case Terminated(`actor`) =>
      println("Receiver terminated")
      context.system.terminate()
  }
}
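For completeness, a driver for the local project could look something like the sketch below, mirroring RemoteDemo. Treat it as a sketch rather than the exact code from the repo: LocalDemo and "LocalActor" are placeholder names, while the system name LocalDemoSystem is the one that shows up in the local log output further down. It assumes the local configuration shown earlier is picked up as the default config (for example from application.conf), and that Init and LocalActorUsingPlainSelection are in the same package.

```scala
import akka.actor.{ActorSystem, Props}
import scala.io.StdIn

// Sketch only: LocalDemo and "LocalActor" are placeholder names.
object LocalDemo extends App {
  val system = ActorSystem("LocalDemoSystem")
  val localActor =
    system.actorOf(Props[LocalActorUsingPlainSelection], name = "LocalActor")

  // Plain selection only starts resolving the remote actor once it is told to
  localActor ! Init

  // Keep the process alive until Enter is pressed, then shut down
  StdIn.readLine()
  system.terminate()
}
```

If you swap in LocalActorUsingIdentity instead, the Init message is not needed, since that actor sends its Identify request from its constructor.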

How do I Run The Demo

You will need to ensure that you run the following 2 projects in this order:

  • Remote
  • Local

Once you run the 2 projects you should see some output like this

The Remote project output:

[INFO] [10/03/2016 07:02:54.282] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:02:54.842] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteDemoSystem@127.0.0.1:4444]
[INFO] [10/03/2016 07:02:54.844] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://RemoteDemoSystem@127.0.0.1:4444]
RemoteActor received message 'The RemoteActor is alive'
[INFO] [10/03/2016 07:02:54.867] [RemoteDemoSystem-akka.actor.default-dispatcher-15] [akka://RemoteDemoSystem/deadLetters]
Message [java.lang.String] from Actor[akka://RemoteDemoSystem/user/RemoteActor#-109465353] to Actor[akka://RemoteDemoSystem/deadLetters]
was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
RemoteActor received message 'Hello from the LocalActorUsingPlainSelection'
RemoteActor received message 'Hello back to you'
RemoteActor received message 'Hello back to you'
RemoteActor received message 'Hello back to you'
RemoteActor received message 'Hello back to you'
RemoteActor received message 'Hello back to you'
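
The dead letter entry in the middle of that output is harmless. The very first message is sent from the main method rather than from an actor, so the RemoteActor's reply has no real sender to go back to and ends up in deadLetters. If the noise bothers you, the two settings named in the log line can be turned down in the remote project's config. A sketch, assuming you are happy to lose dead letter logging entirely:

```
akka {
  # Stop logging dead letters (accepts on, off, or a maximum count)
  log-dead-letters = off
  # Also stay quiet about dead letters seen while the system shuts down
  log-dead-letters-during-shutdown = off
}
```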

 

The Local project output:

[INFO] [10/03/2016 07:03:09.489] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:03:09.961] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://LocalDemoSystem@127.0.0.1:64945]
[INFO] [10/03/2016 07:03:09.963] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://LocalDemoSystem@127.0.0.1:64945]
Resolved remote actor ref using Selection
LocalActorUsingPlainSelection received message: 'Hello from the RemoteActor'
LocalActorUsingPlainSelection received message: 'Hello from the RemoteActor'
LocalActorUsingPlainSelection received message: 'Hello from the RemoteActor'
LocalActorUsingPlainSelection received message: 'Hello from the RemoteActor'
LocalActorUsingPlainSelection received message: 'Hello from the RemoteActor'
LocalActorUsingPlainSelection received message: 'Hello from the RemoteActor'

Remote Creation

Just as we did with remote selection, remote creation shall be split into a remote project and a local project. However, since this time the local project must know about the type of the remote actor in order to create it in the first place, we introduce a common project which both the remote and local depend on.

In this example there will be 3 projects configured in the SBT file

  • Remote : The remote actor, that is expected to be created by the local actor
  • Common : The common files that both Local/Remote projects depend on
  • Local : The local actor that will create and call the remote actor

Here is the complete SBT file for this section

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )


lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val root =(project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "Base"
  )
  .aggregate(common, remote)
  .dependsOn(common, remote)

lazy val common = (project in file("common")).
  settings(commonSettings: _*).
  settings(
    name := "common"
  )

lazy val remote = (project in file("remote")).
  settings(commonSettings: _*).
  settings(
    name := "remote"
  )
  .aggregate(common)
  .dependsOn(common)

It can be seen that both the local (which is the root project in this SBT file) and the remote projects aggregate/depend on the common project. This is standard SBT stuff so I will not go into that.

So now that we understand a bit more about the SBT side of things let's focus on the remote side of things.

This may seem odd since we are expecting the local actor to create the remote, aren't we?

Well yes we are, but the remote actor system must still be up and running before the local system can start/deploy an actor in it.

So it still makes sense to examine the remote side of things first.

Remote Project

Now that we have the relevant projects in place, let's talk about how we expose a remote actor system for creation.

We must ensure that the remote system is available for creation requests, which requires the use of an IP address and a port.

Here is how we do this

akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    netty.tcp {
      hostname = "127.0.0.1",
      # LISTEN on tcp port 2552
      port=2552
    }
  }

}

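I also mentioned that the remote actor system MUST be started to allow remote creation to work. As such the entire codebase for the remote end of things (excluding the remote actor itself, which gets created by the local side of things) is shown below. Note that ConfigFactory.load("calculator") loads the config from a resource named calculator (for example calculator.conf) rather than the default application.conf.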

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem

object CalculatorApplication {

  def main(args: Array[String]): Unit = {
    startRemoteWorkerSystem()
  }

  def startRemoteWorkerSystem(): Unit = {
    ActorSystem("CalculatorWorkerSystem", ConfigFactory.load("calculator"))
    println("Started CalculatorWorkerSystem")
  }

}

All that is happening here is that the remote actor system gets created.

Local Project

Most of the hard work is done in this project, as it is the local side of things that is responsible for creating and deploying the remote actor in the remote actor system, before it can then make use of it.

Let's start with the deployment of the remote actor from the local side.

Firstly we need this configuration to allow this to happen

akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider",
    deployment {
      "/creationActor/*" {
        remote = "akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552"
      }
    }
  }

  remote {
    netty.tcp {
      hostname = "127.0.0.1",
      port=2554
    }
  }

}

If you look carefully at this configuration file and the one at the remote end you will see that the IP address/port/actor system name used within the deployment section all match. This is how the local actor system is able to create and deploy an actor to the remote actor system (which must be running prior to the local actor system trying to deploy a remote actor to it).

So now that we have seen this config, let's see how it is used by the local side

import sample.remote.calculator.{Divide, Multiply, CreationActor}
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import scala.util.Random
import akka.actor.ActorSystem
import akka.actor.Props

object CreationApplication {

  def main(args: Array[String]): Unit = {
    startRemoteCreationSystem()
  }

  def startRemoteCreationSystem(): Unit = {
    val system =
      ActorSystem("CreationSystem", ConfigFactory.load("remotecreation"))
    val actor = system.actorOf(Props[CreationActor],
      name = "creationActor")

    println("Started CreationSystem")
    import system.dispatcher
    system.scheduler.schedule(1.second, 1.second) {
      if (Random.nextInt(100) % 2 == 0)
        actor ! Multiply(Random.nextInt(20), Random.nextInt(20))
      else
        actor ! Divide(Random.nextInt(10000), (Random.nextInt(99) + 1))
    }
  }

}

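It can be seen that the first thing we do is create the CreationActor, giving it the name creationActor. Note that the CreationActor itself is a perfectly normal local actor, living at /user/creationActor in the local system. It is the children that the CreationActor creates that match the "/creationActor/*" entry in the deployment config above, and it is those children that get created in the already running remote actor system. You can see this in the output further down, where the "Calculating ..." lines are printed by the remote process and the "Mul result"/"Div result" lines by the local one. Once created, a remotely deployed actor can be used just like any other actor.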

For completeness here is the code of the CreationActor

package sample.remote.calculator

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props

class CreationActor extends Actor {

  def receive = {
    case op: MathOp =>
      val calculator = context.actorOf(Props[CalculatorActor])
      calculator ! op
    case result: MathResult => result match {
      case MultiplicationResult(n1, n2, r) =>
        printf("Mul result: %d * %d = %d\n", n1, n2, r)
        context.stop(sender())
      case DivisionResult(n1, n2, r) =>
        printf("Div result: %.0f / %d = %.2f\n", n1, n2, r)
        context.stop(sender())
    }
  }
}
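
It can be seen that the CreationActor above creates another actor called CalculatorActor for each operation it receives, and it is this actor (the one that ends up deployed in the remote actor system) which does the real work. Let's see the code for that one
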
package sample.remote.calculator

import akka.actor.Props
import akka.actor.Actor

class CalculatorActor extends Actor {
  def receive = {
    case Add(n1, n2) =>
      println("Calculating %d + %d".format(n1, n2))
      sender() ! AddResult(n1, n2, n1 + n2)
    case Subtract(n1, n2) =>
      println("Calculating %d - %d".format(n1, n2))
      sender() ! SubtractResult(n1, n2, n1 - n2)
    case Multiply(n1, n2) =>
      println("Calculating %d * %d".format(n1, n2))
      sender() ! MultiplicationResult(n1, n2, n1 * n2)
    case Divide(n1, n2) =>
      println("Calculating %.0f / %d".format(n1, n2))
      sender() ! DivisionResult(n1, n2, n1 / n2)
  }
}

Nothing special there really, it's just a standard actor.
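
The message types used by CreationActor and CalculatorActor live in the common project and are not listed above. Going by how they are used (the %d and %.0f format strings, and the MathOp/MathResult matches), they could look roughly like the sketch below. Treat the field names and exact types as assumptions rather than the actual contents of the common project.

```scala
// Sketch of the shared messages; in the real project these would sit in
// package sample.remote.calculator inside the common project.
// Field names and types are inferred from usage, not copied from the repo.

// Requests sent to the CalculatorActor
trait MathOp
final case class Add(n1: Int, n2: Int) extends MathOp
final case class Subtract(n1: Int, n2: Int) extends MathOp
final case class Multiply(n1: Int, n2: Int) extends MathOp
// n1 is a Double so that n1 / n2 is a floating point division
final case class Divide(n1: Double, n2: Int) extends MathOp

// Replies sent back to the CreationActor
trait MathResult
final case class AddResult(n1: Int, n2: Int, result: Int) extends MathResult
final case class SubtractResult(n1: Int, n2: Int, result: Int) extends MathResult
final case class MultiplicationResult(n1: Int, n2: Int, result: Int) extends MathResult
final case class DivisionResult(n1: Double, n2: Int, result: Double) extends MathResult
```

Case classes are Serializable out of the box, which is what lets these messages cross the wire using the default Java serializer without any extra work.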

So we now have a complete pipeline.

How do I Run The Demo

You will need to ensure that you run the following 2 projects in this order:

  • Remote
  • Root

Once you run the 2 projects you should see some output like this

The Remote project output:

[INFO] [10/03/2016 07:30:58.763] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:30:59.235] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552]
[INFO] [10/03/2016 07:30:59.237] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552]
Started CalculatorWorkerSystem
Calculating 6 * 15
[WARN] [10/03/2016 07:31:10.988] [CalculatorWorkerSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CalculatorWorkerSystem)] Using the default Java serializer for class [sample.remote.calculator.MultiplicationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
Calculating 1346 / 82
[WARN] [10/03/2016 07:31:11.586] [CalculatorWorkerSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CalculatorWorkerSystem)] Using the default Java serializer for class [sample.remote.calculator.DivisionResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
Calculating 2417 / 31
Calculating 229 / 66
Calculating 9966 / 43
Calculating 4 * 12
Calculating 9 * 5
Calculating 1505 / 91

The Root project output:

[INFO] [10/03/2016 07:31:08.849] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:31:09.470] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://CreationSystem@127.0.0.1:2554]
[INFO] [10/03/2016 07:31:09.472] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://CreationSystem@127.0.0.1:2554]
Started CreationSystem
[WARN] [10/03/2016 07:31:10.808] [CreationSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [com.typesafe.config.impl.SimpleConfig] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
[WARN] [10/03/2016 07:31:10.848] [CreationSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [sample.remote.calculator.Multiply] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
Mul result: 6 * 15 = 90
[WARN] [10/03/2016 07:31:11.559] [CreationSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [sample.remote.calculator.Divide] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
Div result: 1346 / 82 = 16.41
Div result: 2417 / 31 = 77.97
Div result: 229 / 66 = 3.47
Div result: 9966 / 43 = 231.77
Mul result: 4 * 12 = 48
Mul result: 9 * 5 = 45
Div result: 1505 / 91 = 16.54
Mul result: 7 * 4 = 28
Div result: 1797 / 95 = 18.92
Mul result: 12 * 17 = 204
Div result: 2998 / 72 = 41.64
Div result: 1157 / 98 = 11.81
Div result: 1735 / 22 = 78.86
Mul result: 4 * 19 = 76
Div result: 6257 / 51 = 122.69
Mul result: 14 * 4 = 56
Div result: 587 / 27 = 21.74
Div result: 2528 / 98 = 25.80
Mul result: 9 * 16 = 144
Mul result: 13 * 10 = 130
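
The WARN lines in both outputs come from the messages being sent across the wire using plain Java serialization. For a demo that is fine. If you just want the logs to be quieter, the setting named in the warning can be switched off in each project's config. Bear in mind this sketch only silences the warning; it does not change which serializer is used:

```
akka {
  actor {
    # Hide the "Using the default Java serializer" warning
    warn-about-java-serializer-usage = off
  }
}
```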

NAT or Docker Considerations

Akka Remoting does not work transparently with Network Address Translation, load balancers, or in Docker containers. If this applies to you, you may need to configure Akka further, as described here:

http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#remote-configuration-nat
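
As a rough idea of what that extra configuration involves: in Akka 2.4 the netty.tcp section can distinguish between the address other systems should use to reach you and the address the process actually binds to. The hostname and port values below are placeholders I have made up for the sketch, so check the linked documentation against the Akka version you are using.

```
akka {
  remote {
    netty.tcp {
      # What other actor systems use to reach this one (placeholder values)
      hostname = "my.public.host"
      port = 8000

      # What the process actually listens on, e.g. inside the container
      bind-hostname = "0.0.0.0"
      bind-port = 2552
    }
  }
}
```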

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples
