
Akka.NET + DI + Testing Considerations

So at work we are using Akka.NET (the .NET port of the original Scala Akka; I prefer the Scala original, but the .NET port is fairly good at keeping up to date with changes to it, so it's OK) on a fairly big project, and among the things we need to do are:

 

  • Have different individuals/teams work on different actors (or several actors) at once
  • Have rich supervision hierarchies
  • Use service abstractions to allow mocking/testing (you know, take a dependency on ISomeService rather than SomeService directly, so we can feed in mocks… lovely)
  • Have testing at the forefront of our minds
  • Allow our testers to test a single actor or a portion of the actor graph. The important thing is that the tester should be able to choose which single actor/portion of the entire system they are testing, without needing the rest of the actor system graph

 

So we have had to come up with a way to build our actor system with all this in mind

 

So what is the riddle that we are trying to solve?

Well, as I stated above, we want to allow tests to feed in possible mocks or test doubles into our actors, but we also want to ensure that tests can create as much or as little of the overall actor system graph as the tester/test needs. So how do we go about doing that?

 

When you think about how Akka usually works, actors are created using a construct called Props, which is what summons our actors into life. The Akka documentation says this about Props:

 

Props is a configuration class to specify options for the creation of actors, think of it as an immutable and thus freely shareable recipe for creating an actor including associated deployment information (e.g. which dispatcher to use, see more below).

 

Here are some examples

 

Props props1 = Props.Create(typeof(MyActor));
Props props2 = Props.Create(() => new MyActorWithArgs("arg"));
Props props3 = Props.Create<MyActor>();
Props props4 = Props.Create(typeof(MyActorWithArgs), "arg");

 

It is a good idea to provide static factory methods on the ReceiveActor which help keep the creation of suitable Props as close to the actor definition as possible.

 

public class DemoActor : ReceiveActor
{
    private readonly int _magicNumber;

    public DemoActor(int magicNumber)
    {
        _magicNumber = magicNumber;
        Receive<int>(x =>
        {
            Sender.Tell(x + _magicNumber);
        });
    }

    public static Props Props(int magicNumber)
    {
        return Akka.Actor.Props.Create(() => new DemoActor(magicNumber));
    }
}

system.ActorOf(DemoActor.Props(42), "demo");

 

This idea of using Props also holds true for supervision, where a supervisor would use its own Context to create a child; an example might look something like this:

 

public class FirstActor : ReceiveActor
{
    IActorRef child = Context.ActorOf<MyActor>("myChild");
    // plus some behavior ...
}

 

Ok so what’s the issue with all of this, seems logical enough right?

  • We have some nice factory thing called Props that we can use to create our actor. Heck, we can even pass args to this thing, so we should be able to pass through what we need from tests too, right?
  • If we want to create a child actor we use our own Context and just use the child Props, and bingo we now have a child actor of the type we asked for under our supervision

 

All good right?

 

WRONG

 

While this all sounds cool, imagine these couple of simple scenarios with the above setup

 

  • You want to pass in real implementations of services in the real code, and you want to use test doubles/mocks in your tests. OK, Props does allow this, but it's not very elegant
  • You DON'T want to use the real child actor, but would like to use a test double actor instead, one where you can control the message flow using a well known test actor that allows you to test just the part of the overall system you want to test. You essentially want to replace part of the graph with your own logic, to allow you to focus your testing

 

This would not be that easy with the hard-coded Props and child actor creation. Is there anything we can do about this?

 

Well yes, we could introduce an IOC container and come up with some way of creating Props from the IOC container. Luckily this is already available in Akka.NET, as IOC is a fairly common requirement for testing these days. You can read more about this here : https://getakka.net/articles/actors/dependency-injection.html. There is support for the following IOC containers:

  • Autofac (my personal container of choice)
  • Castle Windsor
  • Ninject

 

The basic idea is that we can use the IOC container to create our Props for us, something like this:

// Create your DI container of preference
var someContainer = ... ;

// Create the actor system
var system = ActorSystem.Create("MySystem");

// Create the dependency resolver for the actor system
IDependencyResolver resolver = new XyzDependencyResolver(someContainer, system);

// Create the Props using the DI extension on your ActorSystem instance
var worker1Ref = system.ActorOf(system.DI().Props<TypedWorker>(), "Worker1");
var worker2Ref = system.ActorOf(system.DI().Props<TypedWorker>(), "Worker2");

 

We can even use this IOC integration to create child actors as follows

var myActorRef = Context.ActorOf(Context.DI().Props<MyActor>(), "myChildActor");

 

Does all this get us any closer to our goals? Well actually yes it does. The only niggle we need to fix now is how to allow a test to pick a different child actor to create instead of the real code's child actor. Remember we want to allow the tests to drive how much of the actor system is tested. So that (at least for me) means that the test should be responsible for saying what child actor is being created for a given test.

 

So can we have our cake and eat it too?

So can we have everything I have stated I want? Yes we can. I have written some code which, for me, ticks all the boxes, and it obviously builds upon the DI integration that Akka.NET already provides; I am simply adding a bit more pixie dust into the mix to allow things to be truly testable.

 

So where do I grab the code?

You can grab the code/tests here : https://github.com/sachabarber/AkkaNetDITesting

 

Let's walk through the various bits and pieces of the code in that repo.

 

The Real Code

This section walks through the real code (which is a very simple actor system)

 

The Actors

I suppose the best place to start is to see the simple actor hierarchy that I am using for the demo app, where there are 2 simple actors

  • MySupervisorActor : Which simply creates a MyChildActor (through some child creation service that we will see soon) and sends it a BeginChildMessage. This actor also handles the following possible messages from the child actor that it creates
    • ChildSucceededMessage
    • ChildFailedMessage
  • MyChildActor : Which responds to the Sender with a ChildSucceededMessage

 

Let's see the code for these 2 actors.

 

MySupervisorActor

using System;
using Akka.Actor;
using AkkaDITest.Messages;
using AkkaDITest.Services;

namespace AkkaDITest.Actors
{
    public  class MySupervisorActor : ReceiveActor
    {
        private readonly ISomeService _someService;
        private readonly IChildActorCreator _childActorCreator;
        private IActorRef originalSender;

        public MySupervisorActor(ISomeService someService, IChildActorCreator childActorCreator)
        {
            _someService = someService;
            _childActorCreator = childActorCreator;
            Receive<StartMessage>(message =>
            {
                originalSender = Sender;
                var x = _someService.ReturnValue("war is a big business");
                Console.WriteLine($"ISomeService.ReturnValue(\"war is a big business\") gave result {x}");

				
				//IF WE WANT TO USE BACKOFF THIS IS HOW WE WOULD DO IT

                //var supervisor = BackoffSupervisor.Props(
                //    Backoff.OnFailure(
                //        _childActorCreator.GetChild(ActorNames.MyChildActorName,Context),
                //        childName: ActorNames.MyChildActorName,
                //        minBackoff: TimeSpan.FromSeconds(3),
                //        maxBackoff: TimeSpan.FromSeconds(30),
                //        randomFactor: 0.2));
                //return ctx.ActorOf(supervisor);
				
                var childActor = Context.ActorOf(_childActorCreator.GetChild(ActorNames.MyChildActorName,Context), ActorNames.MyChildActorName);
                childActor.Tell(new BeginChildMessage());

            });
            Receive<ChildSucceededMessage>(message =>
            {

                Console.WriteLine($"{message.FromWho}_ChildSucceededMessage");
                originalSender.Tell(message);
            });
            Receive<ChildFailedMessage>(message =>
            {
                Console.WriteLine($"{message.FromWho}_ChildFailedMessage");
                originalSender.Tell(message);
            });
        }
    }
}

 

See how we don't create the child actor ourselves but rely on a service called IChildActorCreator, which is responsible for giving back Props for the chosen actor. This way we can provide a test version of this service which will provide Props for alternative test double child actors that may be used to steer the code in whichever manner the tests require.

 

MyChildActor

using System;
using Akka.Actor;
using AkkaDITest.Messages;
using AkkaDITest.Services;

namespace AkkaDITest.Actors
{
    public  class MyChildActor : ReceiveActor
    {
        private IFooService _fooService;


        public MyChildActor(IFooService fooService)
        {
            _fooService = fooService;

            Receive<BeginChildMessage>(message =>
            {
                var x = _fooService.ReturnValue(12);
                Console.WriteLine($"IFooService.ReturnValue(12) gave result {x}");

                Sender.Tell(new ChildSucceededMessage("MyChildActor"));
            });
        }
    }
}

 

Let's also have a quick look at that special Props creation service.

 

Real ChildActorCreator service

Here is the real code version; when we are in tests we will use a different version of this service, which will be injected into the IOC container to override this original one. You can see below that we use the Akka.NET DI feature to resolve the Props for the actor that we want Props for. This means all the constructor dependencies that actor needs will be satisfied for us from the IOC container using our chosen container (providing we have the relevant NuGet package for that container of choice installed).

 

using System;
using System.Collections.Generic;
using System.Text;
using Akka.Actor;
using Akka.DI.Core;
using AkkaDITest.Messages;

namespace AkkaDITest.Actors
{

    public interface IChildActorCreator
    {
        Props GetChild(string actorNameKey, IUntypedActorContext context);
    }


    public class ChildActorCreator : IChildActorCreator
    {
        private Dictionary<string, Func<IUntypedActorContext, Props>> _propLookup =
            new Dictionary<string, Func<IUntypedActorContext, Props>>();

        public ChildActorCreator()
        {
            _propLookup.Add(ActorNames.MyChildActorName, (context) => context.DI().Props<MyChildActor>());
        }

        public Props GetChild(string actorNameKey, IUntypedActorContext context)
        {
            return _propLookup[actorNameKey](context);
        }

        public string Name => "ChildActorCreator";

    }
}

 

You might be asking why we stop at Props and not have this service create a new IActorRef; well, there is a good reason for that. Akka.NET comes with powerful pre-canned backoff supervision strategies which work with Props, not IActorRef, so we want to leave that avenue open to us should we wish to use it.

 

Default IOC Registration

The final pieces of the puzzle to making the real code work are the IOC container wireup and the initial MySupervisorActor instantiation. Both of these are shown below.

 

Here is the Autofac wireup (remember if you choose to use a different container your code will look different to this)

 

using System;
using Autofac;

namespace AkkaDITest.IOC
{
    public class ContainerOperations
    {
        private static readonly Lazy<ContainerOperations> _instance = new Lazy<ContainerOperations>(() => new ContainerOperations());
        private IContainer localContainer;
        private object _syncLock = new Object();

        private ContainerOperations()
        {
            CreateContainer();
        }

        public void ReInitialise()
        {
            lock (_syncLock)
            {
                localContainer = null;
            }
        }


        public static ContainerOperations Instance => _instance.Value;


        public IContainer Container
        {
            get
            {
                lock (_syncLock)
                {
                    if (localContainer == null)
                    {
                        CreateContainer();

                    }
                    return localContainer;
                }
            }
        }

        private void CreateContainer()
        {
            var builder = new ContainerBuilder();
            builder.RegisterModule(new GlobalAutofacModule());


            AddExtraModulesCallBack?.Invoke(builder); 

            localContainer = builder.Build();

        }

        public Action<ContainerBuilder> AddExtraModulesCallBack { get; set; } 
    }
}

 

And this is the main Autofac module for my initial runtime registrations

using Akka.Actor;
using AkkaDITest.Actors;
using AkkaDITest.Services;
using Autofac;
using System;

namespace AkkaDITest.IOC
{
    public class GlobalAutofacModule : Module
    {
        protected override void Load(ContainerBuilder builder)
        {
            builder.RegisterType<SomeService>()
                .As<ISomeService>()
                .SingleInstance();

            builder.RegisterType<FooService>()
                .As<IFooService>()
                .SingleInstance();

            builder.RegisterType<ChildActorCreator>()
                .As<IChildActorCreator>()
                .SingleInstance();

            builder.RegisterType<MySupervisorActor>();
            builder.RegisterType<MyChildActor>();

            var _runModelActorSystem = new Lazy<ActorSystem>(() =>
            {
                return ActorSystem.Create("MySystem");
            });

            builder.Register<ActorSystem>(cont => _runModelActorSystem.Value);


        }
    }
}

 

There are a couple of points to note here:

 

  • That the ActorSystem itself is registered in the IOC container
  • That each actor we want to resolve Props for using the IOC container is also registered in the IOC container
  • That all injectable args for the actor Props are also registered in the IOC container
  • That within the ContainerOperations class we allow the container to be ReInitialised (we found this is very useful in testing)
  • That we allow tests to provide more modules which will serve as overrides for any already registered components. Again this is a great aid in testing as we shall see soon

 

So with all that in place, we should now be able to turn our attention to the testing aspect of our simple demo app.

 

 

The Test Code

This section walks through the test code (which is simply testing the demo app codebase)

 

Just A Reminder

So let's just remind ourselves of what we are trying to achieve in our test code:

  • We would like to be able to test any actor by itself should we wish to (think unit tests)
  • We would also like to be able to provide mock services should we wish to (think unit tests)
  • We should also be able to test the entire actor graph using the real services should we wish to (think integration tests)

 

Does what I have covered give us all of this?

YES I think so, let's see how.

 

So let's say we want to test the demo app's MySupervisorActor, which in the real code creates a child actor, MyChildActor, that we expect to do the real work and respond with its own messages. Now let's say I want to use my own version of that MyChildActor, one that I can use in my tests to drive the behavior of the MySupervisorActor. This is now relatively easy using a test version of the IChildActorCreator service that, instead of creating Props for the REAL MyChildActor, will create Props for a test double of my choosing.

 

I have chosen to do this using a mocking framework (I am using Moq here)

Mock<IChildActorCreator> mockChildActorCreator = new Mock<IChildActorCreator>();
mockChildActorCreator.Setup(x => x.GetChild(ActorNames.MyChildActorName, It.IsAny<IUntypedActorContext>()))
    .Returns((string childName, IUntypedActorContext context) => context.DI().Props<TestChildActor>());

 

This allows me to create any child Props of my choosing for the actor under test, from within the test code. In this example we will be using this test double child actor instead of the real code's MyChildActor.

 

using System;
using Akka.Actor;
using AkkaDITest.Messages;
using AkkaDITest.Services;

namespace AkkaDITest.Tests.Actors
{
    public  class TestChildActor : ReceiveActor
    {
        private IFooService _fooService;


        public TestChildActor(IFooService fooService)
        {
            _fooService = fooService;

            Receive<BeginChildMessage>(message =>
            {
                var x = _fooService.ReturnValue(12);
                Console.WriteLine($"IFooService.ReturnValue(12) gave result {x}");

                Sender.Tell(new ChildSucceededMessage("TestChildActor"));
            });
        }
    }
}

 

This test double actor will respond in the way I see fit to drive my tests. Obviously this example is very simplistic and could be expanded upon, and I could have multiple test double actors, or even have a test double actor which allowed me to set the return message type I wanted for my tests.

 

So now that we have the ability to use test double Props and have a way of plumbing that into the overall actor graph that we want to test, let's see an actual test.

 

For me a simple test for the MySupervisorActor using this test double TestChildActor looks like this

 

using System;
using Akka.Actor;
using Akka.DI.AutoFac;
using Akka.DI.Core;
using Akka.TestKit.NUnit3;
using AkkaDITest.Actors;
using AkkaDITest.IOC;
using AkkaDITest.Messages;
using AkkaDITest.Services;
using AkkaDITest.Tests.Actors;
using Autofac;
using Moq;
using NUnit.Framework;
using NUnit.Framework.Internal;

namespace AkkaDITest.Tests
{
    [TestFixture]
    public class MySupervisorActorTests : TestKit
    {
        [SetUp]
        public void SetUp()
        {
            ContainerOperations.Instance.ReInitialise();
        }

        [Test]
        public void Correct_Message_Received_When_Using_TestChildActor_Test()
        {
            Mock<IChildActorCreator> mockChildActorCreator = new Mock<IChildActorCreator>();
            mockChildActorCreator.Setup(x => x.GetChild(ActorNames.MyChildActorName, It.IsAny<IUntypedActorContext>()))
                .Returns((string childName, IUntypedActorContext context) => context.DI().Props<TestChildActor>());

            //Setup stuff for this testcase
            Mock<ISomeService> mockSomeService = new Mock<ISomeService>();
            mockSomeService.Setup(x => x.ReturnValue(It.IsAny<string>())).Returns("In a test mock");
            ContainerOperations.Instance.AddExtraModulesCallBack = builder =>
            {
                builder.Register(x=> mockSomeService.Object)
                    .As<ISomeService>()
                    .SingleInstance();

                builder.Register(x => mockChildActorCreator.Object)
                  .As<IChildActorCreator>()
                  .SingleInstance();

                builder.RegisterType<TestChildActor>();

            };

            var system = ContainerOperations.Instance.Container.Resolve<ActorSystem>();
            IDependencyResolver resolver = new AutoFacDependencyResolver(ContainerOperations.Instance.Container, system);
            var mySupervisorActor = system.ActorOf(system.DI().Props<MySupervisorActor>(), "MySupervisorActor");
            mySupervisorActor.Tell(new StartMessage(), TestActor);


            // Assert
            AwaitCondition(() => HasMessages, TimeSpan.FromSeconds(10));
            var message = ExpectMsg<ChildSucceededMessage>();
            Assert.AreEqual("TestChildActor", message.FromWho);
        }
    }
}

 

There are a couple of points to note here

  • We are using the Akka.NET TestKit base class which is very useful for testing actors
  • We provide container-level module overrides where we replace these initial services with our test versions of them
    • ISomeService
    • IChildActorCreator
  • We make use of some of the Akka.NET TestKit goodies such as
    • TestActor (An IActorRef that we can use as a Sender)
    • AwaitCondition(…)
    • ExpectMsg(…)

 

Conclusion

Anyway, there you go. I hope this small post shows you that it is still possible to have rich actor hierarchies and still be able to break them down, test them in chunks, and force the code down the desired paths using test doubles/mocks/fakes.

 

Enjoy


MADCAP IDEA 10 : PLAY FRAMEWORK REACTIVE KAFKA PRODUCER

Last Time

So last time we walked through the Rating Kafka Streams architecture and also showed how we can query the local state stores. I also stated that the standard KafkaProducer used in the last post was more for demonstration purposes and that, long term, we would like to swap that out with a Play framework REST endpoint that allows us to publish a message straight from our app to the Kafka rating stream processing topology.

 

PreAmble

Just as a reminder this is part of my ongoing set of posts which I talk about here :

https://sachabarbs.wordpress.com/2017/05/01/madcap-idea/, where we will be building up to a point where we have a full app using lots of different stuff, such as these

 

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

Ok so now that we have the introductions out of the way, let's crack on with what we want to cover in this post.

 

Where is the code?

As usual the code is on GitHub here : https://github.com/sachabarber/MadCapIdea

 

What Is This Post All About?

As stated in the last post “Kafka interactive queries” we used a standard KafkaProducer to simulate what would have come from the end user via the Play Framework API. This time we will build out the Play Framework side of things, to include the ability to produce “rating” objects that are consumed via the rating Kafka Stream processing topology introduced in the last post

 

SBT

So we already had an SBT file inside of the PlayBackEndApi project, but we need to expand that to include support for a couple of things

  • Reactive Kafka
  • Jackson JSON (Play already comes with its own JSON support, but for the Kafka serialization/deserialization (Serdes) I wanted to make sure it was the same as in the Kafka Streams project)

This means these additions to the build.sbt file:

 

val configVersion = "1.0.1"

libraryDependencies ++= Seq(
  "org.reactivemongo" %% "play2-reactivemongo" % "0.11.12",
  "com.typesafe.akka" % "akka-stream-kafka_2.11" % "0.17",
  "org.skinny-framework.com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.4",
  "com.typesafe"        % "config" % configVersion
)

 

Topics

We also want to ensure that we are using the same topics as the stream processing topology so I have just replicated that class (in reality I should have made this stuff a common JAR, but meh)

 

package kafka.topics

object RatingsTopics {
    val RATING_SUBMIT_TOPIC = "rating-submit-topic"
    val RATING_OUTPUT_TOPIC = "rating-output-topic"
  }

 

Routing

As this is essentially a new route that would be called from the front end React app when a new Rating is given, we obviously need a new route/controller. The route is fairly simple and is as follows:

 

# Rating page
POST  /rating/submit/new                       controllers.RatingController.submitNewRating()

 

JSON

The new Rating route expects a Rating object to be provided as a POST in JSON. Here is the actual Rating object and the Play JSON handling for it:

 

package Entities

import play.api.libs.json._
import play.api.libs.functional.syntax._

case class Rating(fromEmail: String, toEmail: String, score: Float)

object Rating {
  implicit val formatter = Json.format[Rating]
}

object RatingJsonFormatters {

  implicit val ratingWrites = new Writes[Rating] {
    def writes(rating: Rating) = Json.obj(
      "fromEmail" -> rating.fromEmail,
      "toEmail" -> rating.toEmail,
      "score" -> rating.score
    )
  }

  implicit val ratingReads: Reads[Rating] = (
      (JsPath \ "fromEmail").read[String] and
      (JsPath \ "toEmail").read[String] and
      ((JsPath \ "score").read[Float])
    )(Rating.apply _)
}
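
Just to make the formatters above concrete, here is a minimal hedged sketch (something you might run in a Scala worksheet or test; the email values are purely illustrative) of round-tripping a Rating through them, using the same imports and JsSuccess/JsError matching that the controller below uses:

import play.api.libs.json._
import Entities._
import Entities.RatingJsonFormatters._

// write a Rating out to the JSON shape the new route expects
val json: JsValue = Json.toJson(Rating("sacha@here.com", "henry@there.com", 4.5f))

// and read it back, matching on success/failure just like the controller does
Json.fromJson[Rating](json) match {
  case JsSuccess(rating, _) => println(s"parsed $rating")
  case JsError(errors)      => println(s"could not parse: $errors")
}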

 

Controller

So now that we have a route, let's turn our attention to the new RatingController, which right now just accepts a new Rating and looks like this:

package controllers

import javax.inject.Inject

import Actors.Rating.RatingProducerActor
import Entities.RatingJsonFormatters._
import Entities._
import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.libs.json._
import play.api.mvc.{Action, Controller}
import utils.Errors

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._

class RatingController @Inject()
(
  implicit actorSystem: ActorSystem,
  ec: ExecutionContext
) extends Controller
{

  //Error handling for streams
  //http://doc.akka.io/docs/akka/2.5.2/scala/stream/stream-error.html
  val decider: Supervision.Decider = {
    case _                      => Supervision.Restart
  }

  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
  val childRatingActorProps = Props(classOf[RatingProducerActor],mat,ec)
  val rand = new Random()
  val ratingSupervisorProps = BackoffSupervisor.props(
    Backoff.onStop(
      childRatingActorProps,
      childName = s"RatingProducerActor_${rand.nextInt()}",
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ).withSupervisorStrategy(
      OneForOneStrategy() {
        case _ => SupervisorStrategy.Restart
      })
    )

  val ratingSupervisorActorRef = 
    actorSystem.actorOf(
      ratingSupervisorProps, 
      name = "ratingSupervisor"
    )

  def submitNewRating = Action.async(parse.json) { request =>
    Json.fromJson[Rating](request.body) match {
      case JsSuccess(newRating, _) => {
        ratingSupervisorActorRef ! newRating
        Future.successful(Ok(Json.toJson(
          newRating.copy(toEmail = newRating.toEmail.toUpperCase))))
      }
      case JsError(errors) =>
        Future.successful(BadRequest(
          "Could not build a Rating from the json provided. " +
          Errors.show(errors)))
    }
  }
}

 

The main points from the code above are

  • We use the standard Play framework JSON handling for the unmarshalling/marshalling to JSON
  • That the controller route is async (see how it returns a Future[T])
  • That we will not process anything if the JSON is invalid
  • That the RatingController creates a supervisor actor that will supervise the creation of another actor, namely a RatingProducerActor. It may look like this happens each time the RatingController is instantiated, which is true. However this only happens once, as there is only one router in Play and the controllers are created by the router. You can read more about this here : https://github.com/playframework/playframework/issues/4508#issuecomment-127820190. The short story is that the supervisor is created once, and the actor it supervises will be created using a BackoffSupervisor, where the creation of the actor will be retried using an incremental back off strategy. We also use the OneForOneStrategy to ensure only the single failed child actor is affected by the supervisor.
  • That this controller is also responsible for creating an ActorMaterializer with a supervision strategy (more on this in the next section). The ActorMaterializer is used to create actors within Akka Streams workflows.

 

 

RatingProducerActor

The final part of the pipeline for this post is obviously to be able to write a Rating to a Kafka topic, via a Kafka producer. As already stated, I chose to use Reactive Kafka (the Akka Streams Kafka producer), which builds upon Akka Streams ideas, where we have Sinks/Sources/Flows/RunnableGraphs, all the good stuff. So here is the full code for the actor:

package Actors.Rating

import Entities.Rating
import Serialization.JSONSerde
import akka.Done
import akka.actor.{Actor, PoisonPill}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, MergeHub, Source}
import kafka.topics.RatingsTopics
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import utils.Settings

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}


class RatingProducerActor(
    implicit materializer: ActorMaterializer,
    ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Rating]
  val ratingProducerSettings = ProducerSettings(
      context.system,
      new StringSerializer,
      new ByteArraySerializer)
    .withBootstrapServers(Settings.bootStrapServers)
    .withProperty(Settings.ACKS_CONFIG, "all")

  val ((mergeHubSink, killswitch), kafkaSourceFuture) =
    MergeHub.source[Rating](perProducerBufferSize = 16)
      .map(rating => {
        val ratingBytes = jSONSerde.serializer().serialize("", rating)
        (rating, ratingBytes)
      })
      .map { ratingWithBytes =>
        val (rating, ratingBytes) = ratingWithBytes
        new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, rating.toEmail, ratingBytes)
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Producer.plainSink(ratingProducerSettings))(Keep.both)
      .run()

  kafkaSourceFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => self ! PoisonPill
  }

  override def postStop(): Unit = {
    println(s"RatingProducerActor seen 'Done'")
    killswitch.shutdown()    
    super.postStop()
  }

  override def receive: Receive = {
    case (rating: Rating) => {
      println(s"RatingProducerActor seen ${rating}")
      Source.single(rating).runWith(mergeHubSink)
    }
    case Done => {
      println(s"RatingProducerActor seen 'Done'")
      killswitch.shutdown()
      self ! PoisonPill
    }
  }
}

 

I'll be honest, there is a fair bit going on in that small chunk of code above, so let's dissect it. What is happening exactly?

  • The most important point is that we simply use the actor as a vessel to host a Reactive Kafka Akka Streams RunnableGraph, representing a graph of MergeHub -> Reactive Kafka producer sink. This is completely fine and a normal thing to do. Discussing Akka Streams is out of scope for this post, but if you want to know more you can read a previous post I did here : https://sachabarbs.wordpress.com/2016/12/13/akka-streams/
  • So we now know this actor hosts a stream, but the stream could fail, or the actor could fail. What we want is: if the actor fails the stream is stopped, and if the stream fails the actor is stopped. To do that we need to do a couple of things
    • STREAM FAILING : Since the RunnableGraph can return a Future[T] we can hook a callback Success/Failure on that, and send a PoisonPill to the hosting actor. Then the supervisor actor we saw above would kick in and try and create a new instance of this actor. Another thing to note is that the stream hosted in this actor uses the ActorMaterializer that was supplied by the RatingController, where we provided a restart supervision strategy for the stream.
    • ACTOR FAILING : If the actor itself fails, the Akka framework will call the postStop() method, at which point we want to shut down the stream within this actor. So how can we shut down the hosted stream? Well, see in the middle of the stream setup there is this line .viaMat(KillSwitches.single)(Keep.both); this allows us to get a KillSwitch from the materialized values for the stream. Once we have a KillSwitch we can simply call its shutdown() method.
    • BELTS AND BRACES : I have also provided a way for the outside world to shutdown this actor and its hosted stream. This is via sending this actor a Done message. I have not put this in yet, but the hook is there to demonstrate how you could do this.
  • We can see that there is a MergeHub source which allows external code to push stuff through the MergeHub via the materialized Sink value from within the actor
  • We can also see that the Rating object that the actor sees is indeed pushed into the MergeHub materialized Sink via this actor, and then some transformation is done on it, to grab its raw bytes
  • We can see the final stage in the RunnableGraph is the Reactive Kafka Producer.plainSink, which results in a message being pushed out to a Kafka topic for each Rating object this actor pushes into the hosted stream

And I think that is the main set of points about how this actor works

 

 

The End Result

Just to prove that this is all working here is a screen shot of the new RatingController http://localhost:9000/rating/submit/new endpoint being called with a JSON payload representing the Rating

 

[screenshot]

 

And here is the Akka Http endpoint that queries the Kafka Stream state store(s)

 

http://localhost:8080/ratingByEmail?email=sacha@here.com this gives an empty list as we have NOT sent any Rating through for email “sacha@here.com” yet

 

[screenshot]

 

http://localhost:8080/ratingByEmail?email=henry@there.com this gives 1 result which is consistent with the amount of Rating(s) I created

 

[screenshot]

 

http://localhost:8080/ratingByEmail?email=henry@there.com this gives 3 results, which is consistent with the number of Rating(s) I created

 

[screenshot]

 

So that's cool; this means that we have successfully integrated publishing of a JSON payload Rating object through Kafka to the Kafka Streams Rating processor… Happy days.

 

 

Conclusion

Straight after the last article I decided to Dockerize everything (a decision I have now reversed, due to the flaky nature of Docker's depends_on and it not truly waiting for the item depended on, even when using "condition: service_healthy" and "healthcheck/test" etc.), and some code must have become corrupt, as stuff from the last post stopped working.

 

An example from the Docker-Compose docs being

 

version: '2.1'
services:
  web:
    build: .
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
  redis:
    image: redis
  db:
    image: redis
    healthcheck:
      test: "exit 0"

 

I love Docker, but for highly complex setups I think you are better off using a Docker-Compose file and just not trying to bring it all up in one go. I may bring the Docker bits back into the fold for anyone reading this that wants to play around, but I will have to think about that closely.

Once I realized that my Docker campaign was doing more harm than good, I reverted back to my extremely hand-coded but deterministic PowerShell startup script, and I found that getting the Play Framework and the Reactive Kafka (Akka Streams Kafka) producer up and running was quite simple; it kind of worked like a charm first time. Yay.

 

 

Next Time

Next time we should be able to make the entire rating view page work, as we now have the necessary pieces in place.

So we should quite easily be able to turn that data into a simple bootstrap table in the React portion of this application.


MADCAP IDEA 9 : KAFKA STREAMS INTERACTIVE QUERIES

Last Time

 

So last time we came up with a sort of half-way-house post that would pave the way for this one, where we examined several different types of REST frameworks for use with Scala. The requirements were that we would be able to use JSON and that there would be both a client and server side API for the chosen library. Of those examined, both http4s and Akka Http worked; I decided to go with Akka Http due to being more familiar with it.

So that examination of REST APIs has allowed this post to happen. In this post what we will be doing is looking at

  • How to install Kafka/Zookeeper and get them running on Windows
  • Walking through a KafkaProducer
  • Walking through a Kafka Streams processing node, and the duality of streams
  • Walking through Kafka Streams interactive queries

 

PreAmble

Just as a reminder this is part of my ongoing set of posts which I talk about here :

https://sachabarbs.wordpress.com/2017/05/01/madcap-idea/, where we will be building up to a point where we have a full app using lots of different stuff, such as these

 

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

 

Ok so now that we have the introductions out of the way, let's crack on with what we want to cover in this post.

 

Where is the code?

As usual the code is on GitHub here : https://github.com/sachabarber/MadCapIdea

 

Before we start, what is this post all about?

 

Well, I don't know if you recall, but we are attempting to create an uber-simple Uber-type application where there are drivers/clients. A client can put out a new job and drivers bid for it. At the end of a job they can rate each other; see the job completion/rating/view rating sections of this previous post : https://sachabarbs.wordpress.com/2017/06/27/madcap-idea-part-6-static-screen-design/

 

The ratings will be placed into streams and aggregated into permanent storage, and will be available for querying later to display in the React front end. The ratings should be grouped by email, and also searchable by email.

 

So that is what we are attempting to cover in this post. However, since this is the first time we have had to use Kafka, this post will also talk through what you have to go through to get Kafka set up on Windows. In a subsequent post I will attempt to get EVERYTHING up and working (including the Play front end) in Docker containers, but for now we will assume a local install of Kafka; if nothing else it's good to know how to set this up.

 

How to install Kafka/Zookeeper and get them running on Windows

This section will talk you through how to get Kafka installed and working on Windows.

 

Step 1 : Download Kafka

Grab Confluent Platform 3.3.0 Open Source : http://packages.confluent.io/archive/3.3/confluent-oss-3.3.0-2.11.zip

 

Step 2 : Update Dodgy BAT Files

The official Kafka Windows BAT files don't seem to work in the Confluent Platform 3.3.0 Open Source download. So replace the official [YOUR INSTALL PATH]\confluent-3.3.0\bin\windows BAT files with the ones found here : https://github.com/renukaradhya/confluentplatform/tree/master/bin/windows

 

Step 3 : Make Some Minor Changes To Log Locations etc etc

Kafka/Zookeeper as installed are set up for Linux, and as such these paths won't work on Windows, so we need to adjust a few things. Let's do that now:

 

  • Modify the [YOUR INSTALL PATH]\confluent-3.3.0\etc\kafka\zookeeper.properties file to change the dataDir to something like dataDir=c:/temp/zookeeper
  • Modify the [YOUR INSTALL PATH]\confluent-3.3.0\etc\kafka\server.properties file to uncomment the line delete.topic.enable=true

 

Step 4 : Running Zookeeper + Kafka + Creating Topics

Now that we have installed everything, it's just a matter of running stuff up. Sadly, before we can run Kafka we need to run Zookeeper, and before Kafka can send messages we need to ensure that the Kafka topics are created. Topics must exist before messages can be sent to them.

 

Mmm that sounds like a fair bit of work. Well it is, so I decided to script this into a little PowerShell script. This script is available within the source code at https://github.com/sachabarber/MadCapIdea/tree/master/PowerShellProject/PowerShellProject where the script is called RunPipeline.ps1

 

So all we need to do is change to the directory that the RunPipeline.ps1 is in, and run it.

Obviously it is set up for my installation folders; you WILL have to change the variables at the top of the script if you want to run this on your own machine.

 

Here is the contents of the RunPipeline.ps1 file

 

$global:mongoDbInstallationFolder = "C:\Program Files\MongoDB\Server\3.5\bin\"
$global:kafkaWindowsBatFolder = "C:\Apache\confluent-3.3.0\bin\windows\"
$global:kafkaTopics = 
	"rating-submit-topic",
	"rating-output-topic"
$global:ProcessesToKill = @()



function RunPipeLine() 
{
	WriteHeader "STOPPING PREVIOUS SERVICES"
	StopZookeeper
	StopKafka

	Start-Sleep -s 20
	
	WriteHeader "STARTING NEW SERVICE INSTANCES"
	StartZookeeper
	StartKafka
	
	Start-Sleep -s 20

	CreateKafkaTopics
    RunMongo

	WaitForKeyPress

	WriteHeader "KILLING PROCESSES CREATED BY SCRIPT"
	KillProcesses
}

function WriteHeader($text) 
{
	Write-Host "========================================`r`n"
	Write-Host "$text`r`n"
	Write-Host "========================================`r`n"
}


function StopZookeeper() {
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-stop.bat"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine`r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine -WindowStyle Normal -PassThru
}

function StopKafka() {
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-stop.bat" 
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine`r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine  -WindowStyle Normal -PassThru
}

function StartZookeeper() {
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-start.bat"
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\zookeeper.properties"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine $arguments -WindowStyle Normal -PassThru
}

function StartKafka() {
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-start.bat" 
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\server.properties"
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine $arguments -WindowStyle Normal -PassThru
}

function CreateKafkaTopics() 
{
	Foreach ($topic in $global:kafkaTopics )
	{
		$kafkaCommandLine = $global:kafkaWindowsBatFolder + "kafka-topics.bat"
		$arguments = "--zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic $topic"
		Write-Host "> Create Kafka Topic Command Line : $kafkaCommandLine args: $arguments `r`n"
		$global:ProcessesToKill += start-process $kafkaCommandLine $arguments -WindowStyle Normal -PassThru
	}
}

function RunMongo() {
	$mongoexe = $global:mongoDbInstallationFolder + "mongod.exe"
	Write-Host "> Mongo Command Line : $mongoexe `r`n" 
	$global:ProcessesToKill += Start-Process -FilePath $mongoexe  -WindowStyle Normal -PassThru
}

function WaitForKeyPress
{
	Write-Host -NoNewLine "Press any key to continue....`r`n"
	[Console]::ReadKey()
}


function KillProcesses() 
{
	Foreach ($processToKill in $global:ProcessesToKill )
	{
		$name = $processToKill | Get-ChildItem -Name
		Write-Host "Killing Process : $name `r`n" 
		$processToKill | Stop-Process -Force
	}
}


# Kick of the entire pipeline
RunPipeLine

 

 

As you can see this script does a bit more than just run up Zookeeper and Kafka; it also creates the topics and runs MongoDB, which is also required by the main Play application (remember we are using Reactive Mongo for the login/registration side of things).

 

So far I have not had many issues with the script. Though occasionally, when I am trying out new code, I do tend to clear out all the Zookeeper/Kafka state, which for me is stored here:

 

[screenshot]

 

It just allows me to start with a clean slate as it were; you shouldn't need to do this that often.

 

Walking through a KafkaProducer

So the Kafka producer I present here will send a String key and a JSON Ranking object as the payload. Let's have a quick look at the Ranking object and how it gets turned to and from JSON before we look at the producer code.

 

The Domain Entities

Here is what the domain entities look like; it can be seen that these use the Spray JSON (part of Akka Http) marshaller/unmarshaller support. This is not required by the producer but is required by the REST API, which we will look at later. The producer and Kafka Streams code work with a different serialization abstraction, something known as SERDES, which as far as I know is only used as a term in Kafka. It's quite simple: it stands for Serializer-Deserializer (Serdes).

 

Anyway, here are the domain entities and the Spray formatters that allow Akka Http (but not Kafka Streams, more on this later) to work with them:

 

package Entities

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._

case class Ranking(fromEmail: String, toEmail: String, score: Float)
case class HostStoreInfo(host: String, port: Int, storeNames: List[String])

object AkkaHttpEntitiesJsonFormats {
  implicit val RankingFormat = jsonFormat3(Ranking)
  implicit val HostStoreInfoFormat = jsonFormat3(HostStoreInfo)
}
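
As a quick illustration (a hedged sketch, not code from the repo), the Spray formatters above let the Akka Http side turn these entities to and from JSON like so:

import spray.json._
import Entities._
import Entities.AkkaHttpEntitiesJsonFormats._

// spray-json's enrichment gives us toJson/convertTo once the implicit formats are in scope
val ranking: Ranking = Ranking("sacha@here.com", "henry@there.com", 2.0f)
val asJson: JsValue  = ranking.toJson
val andBack: Ranking = asJson.convertTo[Ranking]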

 

Serdes

So as we just described, Kafka Streams actually cares not for the standard Akka Http/Spray JSON formatters; they are not part of Kafka Streams after all. However Kafka Streams still has some of the same concerns, in that it needs to serialize and de-serialize data (like when it re-partitions, i.e. shuffles, data), so how does it do that? Well, it uses a weirdly named thing called a "serde". There are MANY inbuilt "serde" types, but you can of course create your own to represent your "on the wire" format. I am using JSON, so this is what my generic "Serde" implementation looks like. I should point out that I owe a great many things to a chap called Jendrik, whom I made contact with, who has been doing some great stuff with Kafka; his blog has really helped me out : https://www.madewithtea.com/category/kafka-streams.html

 

Anyway here is my/his/yours/ours “Serde” code for JSON

 

import java.lang.reflect.{ParameterizedType, Type}
import java.util
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.exc.{UnrecognizedPropertyException => UPE}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}


package Serialization {

  object Json {

    type ParseException = JsonParseException
    type UnrecognizedPropertyException = UPE

    private val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)

    private def typeReference[T: Manifest] = new TypeReference[T] {
      override def getType = typeFromManifest(manifest[T])
    }

    private def typeFromManifest(m: Manifest[_]): Type = {
      if (m.typeArguments.isEmpty) {
        m.runtimeClass
      }
      else new ParameterizedType {
        def getRawType = m.runtimeClass

        def getActualTypeArguments = m.typeArguments.map(typeFromManifest).toArray

        def getOwnerType = null
      }
    }

    object ByteArray {
      def encode(value: Any): Array[Byte] = mapper.writeValueAsBytes(value)

      def decode[T: Manifest](value: Array[Byte]): T =
        mapper.readValue(value, typeReference[T])
    }

  }

  /**
    * JSON serializer for JSON serde
    *
    * @tparam T
    */
  class JSONSerializer[T] extends Serializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def serialize(topic: String, data: T): Array[Byte] =
      Json.ByteArray.encode(data)

    override def close(): Unit = ()
  }

  /**
    * JSON deserializer for JSON serde
    *
    * @tparam T
    */
  class JSONDeserializer[T >: Null <: Any : Manifest] extends Deserializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def deserialize(topic: String, data: Array[Byte]): T = {
      if (data == null) {
        return null
      } else {
        Json.ByteArray.decode[T](data)
      }
    }
  }

  /**
    * JSON serde for local state serialization
    *
    * @tparam T
    */
  class JSONSerde[T >: Null <: Any : Manifest] extends Serde[T] {
    override def deserializer(): Deserializer[T] = new JSONDeserializer[T]

    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def serializer(): Serializer[T] = new JSONSerializer[T]
  }

}
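
Before moving on, here is a tiny hedged sanity-check sketch of how this serde gets used; note that the topic name passed to serialize/deserialize is ignored by this implementation (the producer code below simply passes an empty string):

import Entities.Ranking
import Serialization.JSONSerde

// Ranking -> JSON bytes -> Ranking, using the generic serde defined above
val rankingSerde = new JSONSerde[Ranking]
val original     = Ranking("jarden@here.com", "sacha@here.com", 1.5f)
val bytes        = rankingSerde.serializer().serialize("rating-submit-topic", original)
val roundTripped = rankingSerde.deserializer().deserialize("rating-submit-topic", bytes)
assert(roundTripped == original)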

 

And finally here is what the producer code looks like

 

As you can see it is actually fairly straightforward: it simply produces a small batch of messages (though you can uncomment the while loop in the code to make it endless) with a String as the key and a Ranking as the message. The key is who the ranking is destined for. These will then be aggregated by the streams processing node, and stored as a list against a key (i.e. email).

 


package Processing.Ratings {

  import java.util.concurrent.TimeUnit

  import Entities.Ranking
  import Serialization.JSONSerde
  import Topics.RatingsTopics

  import scala.util.Random
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.common.serialization.Serdes
  import Utils.Settings
  import org.apache.kafka.clients.producer.ProducerConfig

  object RatingsProducerApp extends App {

   run()

    private def run(): Unit = {

      val jSONSerde = new JSONSerde[Ranking]
      val random = new Random
      val producerProps = Settings.createBasicProducerProperties
      val rankingList = List(
        Ranking("jarden@here.com","sacha@here.com", 1.5f),
        Ranking("miro@here.com","mary@here.com", 1.5f),
        Ranking("anne@here.com","margeret@here.com", 3.5f),
        Ranking("frank@here.com","bert@here.com", 2.5f),
        Ranking("morgan@here.com","ruth@here.com", 1.5f))

      producerProps.put(ProducerConfig.ACKS_CONFIG, "all")

      System.out.println("Connecting to Kafka cluster via bootstrap servers " +
        s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")

      // send a random string from List event every 100 milliseconds
      val rankingProducer = new KafkaProducer[String, Array[Byte]](
        producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)

      //while (true) {
      for (i <- 0 to 10) {
        val ranking = rankingList(random.nextInt(rankingList.size))
        val rankingBytes = jSONSerde.serializer().serialize("", ranking)
        System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
        rankingProducer.send(new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
        Thread.sleep(100)
      }

      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        rankingProducer.close(10, TimeUnit.SECONDS)
      }))
    }
  }
}

 

 

 

Walking through a Kafka Streams processing node, and the duality of streams

Before we get started I just wanted to include several excerpts taken from the official Kafka docs : http://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables which talk about the KStream and KTable objects (the stream and table abstractions inside Kafka Streams).

 

When implementing stream processing use cases in practice, you typically need both streams and also databases. An example use case that is very common in practice is an e-commerce application that enriches an incoming stream of customer transactions with the latest customer information from a database table. In other words, streams are everywhere, but databases are everywhere, too.

Any stream processing technology must therefore provide first-class support for streams and tables. Kafka’s Streams API provides such functionality through its core abstractions for streams and tables, which we will talk about in a minute. Now, an interesting observation is that there is actually a close relationship between streams and tables, the so-called stream-table duality. And Kafka exploits this duality in many ways: for example, to make your applications elastic, to support fault-tolerant stateful processing, or to run interactive queries against your application’s latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications.

 

A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:

 

    [image: streams-table-duality-01.jpg]

     

    The stream-table duality describes the close relationship between streams and tables.

     

    • Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a “real” table by replaying the changelog from beginning to end to reconstruct the table. Similarly, aggregating data records in a stream will return a table. For example, we could compute the total number of pageviews by user from an input stream of pageview events, and the result would be a table, with the table key being the user and the value being the corresponding pageview count.
    • Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream’s data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a “real” stream by iterating over each key-value entry in the table.

     

    Let’s illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time – and different revisions of the table – can be represented as a changelog stream (second column).

     

    [image]

     

    Because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):

     

    [image: streams-table-duality-03.jpg]

     

    The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault tolerance. The stream-table duality is such an important concept for stream processing applications in practice that Kafka Streams models it explicitly via the KStream and KTable abstractions, which we describe in the next sections.

     

     

I would STRONGLY urge you all to read the section of the official docs above, as it will really help you should you want to get into Kafka Streams.

     

Anyway, with all that in mind, how does that relate to the use case we are trying to solve? So far we have a publisher that pushes out Rating objects, and as stated we would ideally like to query these across all processor nodes. As such we should now know that this will involve a KStream and some sort of aggregation to an eventual KTable (where a state store will be used).
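
To make that duality a little more concrete before diving into the real code, here is a minimal hedged sketch of the docs' pageview-count example, written against the same (older) KStreamBuilder-style API used by the processing code below; the topic and store names are purely illustrative, and the exact overloads may differ slightly between Kafka Streams versions:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStreamBuilder, KTable}

val builder = new KStreamBuilder
val stringSerde = Serdes.String

// a stream of (user -> page) pageview events (illustrative topic name)
val pageViews = builder.stream(stringSerde, stringSerde, "pageview-topic")

// aggregating the stream by key yields a KTable of (user -> pageview count):
// the stream is the changelog, the KTable is its latest snapshot
val viewsPerUser: KTable[String, java.lang.Long] =
  pageViews.groupByKey(stringSerde, stringSerde).count("pageviews-by-user-store")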

     

Probably the easiest thing to do is to start with the code, which looks like this for the main stream processing code for the Rating section of the final app.

      import java.util.concurrent.TimeUnit
      import org.apache.kafka.common.serialization._
      import org.apache.kafka.streams._
      import org.apache.kafka.streams.kstream._
      import Entities.Ranking
      import Serialization.JSONSerde
      import Topics.RatingsTopics
      import Utils.Settings
      import Stores.StateStores
      import org.apache.kafka.streams.state.HostInfo
      import scala.concurrent.ExecutionContext
    
    
      package Processing.Ratings {
    
        class RankingByEmailInitializer extends Initializer[List[Ranking]] {
          override def apply(): List[Ranking] = List[Ranking]()
        }
    
        class RankingByEmailAggregator extends Aggregator[String, Ranking,List[Ranking]] {
          override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
            value :: aggregate
          }
        }
    
    
        object RatingStreamProcessingApp extends App {
    
          implicit val ec = ExecutionContext.global
    
          run()
    
          private def run() : Unit = {
            val stringSerde = Serdes.String
            val rankingSerde = new JSONSerde[Ranking]
            val listRankingSerde = new JSONSerde[List[Ranking]]
            val builder: KStreamBuilder = new KStreamBuilder
            val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)
    
            //aggregate by (user email -> their rankings)
            val rankingTable = rankings.groupByKey(stringSerde,rankingSerde)
              .aggregate(
                new RankingByEmailInitializer(),
                new RankingByEmailAggregator(),
                listRankingSerde,
                StateStores.RANKINGS_BY_EMAIL_STORE
              )
    
            //useful debugging aid, print KTable contents
            rankingTable.toStream.print()
    
            val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
            val restEndpoint:HostInfo  = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
            System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
            System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")
    
            // Always (and unconditionally) clean local state prior to starting the processing topology.
            // We opt for this unconditional call here because this will make it easier for you to 
            // play around with the example when resetting the application for doing a re-run 
            // (via the Application Reset Tool,
            // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
            //
            // The drawback of cleaning up local state prior is that your app must rebuilt its local 
            // state from scratch, which will take time and will require reading all the state-relevant 
            // data from the Kafka cluster over the network.
            // Thus in a production scenario you typically do not want to clean up always as we do 
            // here but rather only when it is truly needed, i.e., only under certain conditions 
            // (e.g., the presence of a command line flag for your app).
            // See `ApplicationResetExample.java` for a production-like example.
            streams.cleanUp();
            streams.start()
            val restService = new RatingRestService(streams, restEndpoint)
            restService.start()
    
            Runtime.getRuntime.addShutdownHook(new Thread(() => {
              streams.close(10, TimeUnit.SECONDS)
              restService.stop
            }))
    
            //return unit
            ()
          }
        }
      }
    

     

     

    Remember, the idea is to get a Rating for a user (based on their email address), and store all the Ratings associated with them in some sequence/list such that they can be retrieved in one go based on a key, where the key is the user's email and the value is this list of Rating objects. I think with the formal discussion from the official Kafka docs and my actual Rating requirement, the above should hopefully be pretty clear.

     

     

    Walking through Kafka Streams interactive queries

    So now that we have gone through how data is produced, and transformed (well actually I did not do too much transformation other than a simple map, but trust me you can), and how we aggregate results from a KStream to a KTable (and its state store), we will move on to see how we can use Kafka interactive queries to query the state stores.

     

    One important concept is that if you used multiple partitions for your original topic, the state may be spread across n-many processing nodes. For this project I have only chosen to use 1 partition, but I have written the code to support n-many.

     

    So let's assume that each node could read a different segment of data, or that each node must read from n-many partitions (there is not actually a one-to-one mapping between nodes and partitions; these are 2 must-read chapters: elastic-scaling-of-your-application and parallelism-model). We would need each node to expose a REST API to allow its OWN state store to be read. By reading ALL the state stores we are able to get a total view of ALL the persisted data across ALL the partitions. I urge all of you to read this section of the official docs : http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application

     

    This diagram has also been shamelessly stolen from the official docs:

     

    [Image: streams-interactive-queries-api-02.png — querying remote state stores across application instances, from the official Kafka docs]

    I think this diagram does an excellent job of showing 3 separate processor nodes, each of which may hold a bit of state. ONLY by assembling ALL the data from these nodes are we able to see the ENTIRE dataset.

     

    Kafka allows this via metadata about the streams, where we can use the exposed metadata to help us gather the state store data. To do this we first need a MetadataService, which for me is as follows:

     

    package Processing.Ratings
    
    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.StreamsMetadata
    import java.util.stream.Collectors
    import Entities.HostStoreInfo
    import org.apache.kafka.common.serialization.Serializer
    import org.apache.kafka.connect.errors.NotFoundException
    import scala.collection.JavaConverters._
    
    
    /**
      * Looks up StreamsMetadata from KafkaStreams
      */
    class MetadataService(val streams: KafkaStreams) {
    
    
       /**
        * Get the metadata for all of the instances of this Kafka Streams application
        *
        * @return List of { @link HostStoreInfo}
        */
      def streamsMetadata() : List[HostStoreInfo] = {
    
        // Get metadata for all of the instances of this Kafka Streams application
        val metadata = streams.allMetadata
        return mapInstancesToHostStoreInfo(metadata)
      }
    
    
      /**
        * Get the metadata for all instances of this Kafka Streams application that currently
        * has the provided store.
        *
        * @param store The store to locate
        * @return List of { @link HostStoreInfo}
        */
      def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {
    
        // Get metadata for all of the instances of this Kafka Streams application hosting the store
        val metadata = streams.allMetadataForStore(store)
        return mapInstancesToHostStoreInfo(metadata)
      }
    
    
      /**
        * Find the metadata for the instance of this Kafka Streams Application that has the given
        * store and would have the given key if it exists.
        *
        * @param store Store to find
        * @param key   The key to find
        * @return { @link HostStoreInfo}
        */
      def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
        // Get metadata for the instances of this Kafka Streams application hosting the store and
        // potentially the value for key
        val metadata = streams.metadataForKey(store, key, serializer)
        if (metadata == null)
          throw new NotFoundException(
            s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")
    
        HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
      }
    
    
      def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
    
        metadatas.stream.map[HostStoreInfo](metadata =>
          HostStoreInfo(
            metadata.host(),
            metadata.port,
            metadata.stateStoreNames.asScala.toList))
          .collect(Collectors.toList())
          .asScala.toList
      }
    
    }
    

     

    This metadata service is used to obtain the state store information, which we can then use to extract the state data we want (it’s a key value store really).
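    As a hedged illustration of this fan-out idea (this is my own sketch, not part of the project code), here is how one might ask every instance that hosts the store for its own local slice of the data and merge the results into a single view. It assumes the MetadataService above, the HostStoreInfo entity, and a hypothetical fetchLocalRatingsFrom helper that performs the actual REST call:

    import scala.concurrent.{ExecutionContext, Future}

    // purely illustrative: build a total view of the data by asking every
    // instance hosting the store for its local slice, then merging the slices
    def allRatingsAcrossInstances(
        metadataService: MetadataService,
        fetchLocalRatingsFrom: HostStoreInfo => Future[List[Ranking]]
      )(implicit ec: ExecutionContext): Future[List[Ranking]] = {

      // one HostStoreInfo per processor node currently hosting the store
      val hosts = metadataService.streamsMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE)

      // ask every node for its slice, then flatten into one combined list
      Future.sequence(hosts.map(fetchLocalRatingsFrom)).map(_.flatten)
    }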

     

    The next thing we need to do is expose a REST API to allow us to get the state. Let's see that now.

     

    package Processing.Ratings
    
    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.HostInfo
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.server.Directives._
    import akka.stream.ActorMaterializer
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import spray.json.DefaultJsonProtocol._
    import Entities.AkkaHttpEntitiesJsonFormats._
    import Entities._
    import Stores.StateStores
    import akka.http.scaladsl.marshalling.ToResponseMarshallable
    import org.apache.kafka.common.serialization.Serdes
    import scala.concurrent.{Await, ExecutionContext, Future}
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import spray.json._
    import scala.util.{Failure, Success}
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import scala.concurrent.duration._
    
    
    
    object RestService {
      val DEFAULT_REST_ENDPOINT_HOSTNAME  = "localhost"
    }
    
    
    class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {
    
      val metadataService = new MetadataService(streams)
      var bindingFuture: Future[Http.ServerBinding] = null
    
      implicit val system = ActorSystem("rating-system")
      implicit val materializer = ActorMaterializer()
      implicit val executionContext = system.dispatcher
    
    
      def start() : Unit = {
        val emailRegexPattern =  """\w+""".r
        val storeNameRegexPattern =  """\w+""".r
    
        val route =
    
    
          path("test") {
            get {
              parameters('email.as[String]) { (email) =>
                complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
                            s"<h1>${email}</h1>"))
              }
            }
          } ~
          path("ratingByEmail") {
            get {
              parameters('email.as[String]) { (email) =>
                try {
    
                  val host = metadataService.streamsMetadataForStoreAndKey[String](
                    StateStores.RANKINGS_BY_EMAIL_STORE,
                    email,
                    Serdes.String().serializer()
                  )
    
                  var future:Future[List[Ranking]] = null
    
                  //store is hosted on another process, REST Call
                  if(!thisHost(host))
                    future = fetchRemoteRatingByEmail(host, email)
                  else
                    future = fetchLocalRatingByEmail(email)
    
                  val rankings = Await.result(future, 20 seconds)
                  complete(rankings)
                }
                catch {
                  case (ex: Exception) => {
                    val finalList:List[Ranking] = scala.collection.immutable.List[Ranking]()
                    complete(finalList)
                  }
                }
              }
            }
          } ~
          path("instances") {
            get {
              complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
            }
          }~
          path("instances" / storeNameRegexPattern) { storeName =>
            get {
              complete(ToResponseMarshallable.apply(metadataService.streamsMetadataForStore(storeName)))
            }
          }
    
        bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
        println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")
    
        Runtime.getRuntime.addShutdownHook(new Thread(() => {
          bindingFuture
            .flatMap(_.unbind()) // trigger unbinding from the port
            .onComplete(_ => system.terminate()) // and shutdown when done
        }))
      }
    
    
      def fetchRemoteRatingByEmail(host:HostStoreInfo, email: String) : Future[List[Ranking]] = {
    
        // build the URL against the REMOTE host that actually owns the key
        val requestPath = s"http://${host.host}:${host.port}/ratingByEmail?email=${email}"
        println(s"Client attempting to fetch from online at ${requestPath}")
    
        val responseFuture: Future[List[Ranking]] = {
          Http().singleRequest(HttpRequest(uri = requestPath))
            .flatMap(response => Unmarshal(response.entity).to[List[Ranking]])
        }
    
        responseFuture
      }
    
      def fetchLocalRatingByEmail(email: String) : Future[List[Ranking]] = {
    
        val ec = ExecutionContext.global
    
        val host = metadataService.streamsMetadataForStoreAndKey[String](
          StateStores.RANKINGS_BY_EMAIL_STORE,
          email,
          Serdes.String().serializer()
        )
    
        val f = StateStores.waitUntilStoreIsQueryable(
          StateStores.RANKINGS_BY_EMAIL_STORE,
          QueryableStoreTypes.keyValueStore[String,List[Ranking]](),
          streams
        ).map(_.get(email))(ec)
    
        val mapped = f.map(ranking => {
          if (ranking == null)
            List[Ranking]()
          else
            ranking
        })
    
        mapped
      }
    
      def stop() : Unit = {
        bindingFuture
          .flatMap(_.unbind()) // trigger unbinding from the port
          .onComplete(_ => system.terminate()) // and shutdown when done
      }
    
      def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
        hostStoreInfo.host.equals(hostInfo.host()) &&
          hostStoreInfo.port == hostInfo.port
      }
    }
    

     

    With that final class in place we are able to run the application and query it using the url http://localhost:8080/ratingByEmail?email=sacha@here.com (the key into the Kafka state store here is "sacha@here.com", and the value will either be an empty list or a List[Ranking] serialized as JSON). The results are shown below, after we have run the producer and used Chrome (or any other REST tool of your choosing) to get the results.

     

    image

     

     

    Conclusion

    I have found the journey to get here an interesting one. The main issue is that the Kafka docs and examples are all written in Java, and some do not even use Java lambdas (Java 1.8), so the translation from that to Scala code (where there are lambdas everywhere) is sometimes trickier than you might think.

     

    The other thing that has caught me out a few times is that the Scala type system is pretty good at inferring the correct types, so you kind of let it get on with its job. But occasionally it doesn't/can't infer the type correctly. This may surface at compile time if you are lucky, or at run time. In the case of a runtime issue, I found it fairly hard to see exactly which part of the Kafka Streams API needed to be told a bit more type information.

     

    As a general rule of thumb, if there is an overloaded method that takes a Serde and one that doesn't, ALWAYS use the one that takes a Serde, and specify the generic type parameters explicitly. The methods that take Serdes are usually the ones that involve some sort of shuffling of data between partitions, so they need the Serdes to serialize and deserialize correctly.
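    To make the rule of thumb concrete, this is exactly why the aggregation earlier passes the Serdes (and the state store name) explicitly rather than leaning on the configured defaults. The snippet below is repeated from the app above, with a hypothetical default-serde variant shown commented out for contrast:

    // relying on the default serdes configured in the properties: works until a
    // repartition/aggregation step needs to (de)serialize and the inferred types
    // are not what you expect
    // val rankingTable = rankings.groupByKey().aggregate(...)

    // being explicit: pass the Serdes and the store name, so Kafka Streams knows
    // exactly how to serialize keys, values and the aggregate
    val rankingTable = rankings.groupByKey(stringSerde, rankingSerde)
      .aggregate(
        new RankingByEmailInitializer(),
        new RankingByEmailAggregator(),
        listRankingSerde,
        StateStores.RANKINGS_BY_EMAIL_STORE)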

     

    Other than that I am VERY happy working with Kafka Streams, and once you get into it, it's not that different from working with Apache Spark and RDDs.

     

     

    Next time

    Next time we will be turning our attention back to the web site, where we will expose an endpoint that can be called from the ratings dialog that is launched at the end of a job. This endpoint will take the place of the RatingsProducerApp demonstrated in this app. For that we will be using https://github.com/akka/reactive-kafka. We will also expose a new endpoint to fetch the ratings (via email address) from the Kafka stream processor node.

     

    The idea being that when a job is completed a Rating from a driver to a passenger is given; this is sent to the Kafka stream processor node, and the combined ratings are accumulated per user (by email address) and exposed to be queried. As you can see, this post covers the latter part of this requirement already. The only thing we would need to do (as stated above) is replace the RatingsProducerApp demonstrated in this app with the new reactive kafka producer in the main Play application.

    Akka

    AKKA STREAMS

    Last time we looked at Akka Http, this time we will look at Akka Streams.

    Akka Streams is a vast topic, and you will definitely need to supplement this  post with the official documentation.

    The Akka team was one of the founding members of the Reactive Streams initiative, and Akka Streams is one of the (many) implementations of the Reactive Streams APIs.

    Reactive Streams  is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

    Introduction

    There may be some readers who have come from .NET such as myself who have used RX.

    You may even have heard of Reactive Streams before. So what exactly makes reactive streams different from Rx?

    The central thing that is the big win with reactive streams over Rx is the idea of back pressure. Here is what the Akka docs say about back pressure

    The back pressure protocol is defined in terms of the number of elements a downstream Subscriber is able to receive and buffer, referred to as demand. The source of data, referred to as Publisher in Reactive Streams terminology and implemented as Source in Akka Streams, guarantees that it will never emit more elements than the received total demand for any given Subscriber.

    http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html#back-pressure-explained

    Luckily this is all built into Akka Streams; you do not have to worry about it too much as a user of Akka Streams.

    You can pretty much decide how you want the built-in stream stages (which we will be diving into in more detail below) to behave in terms of backpressure, using the OverflowStrategy values. Here is a very simple example

    Source(1 to 10).buffer(10, OverflowStrategy.backpressure)
    

    Where the following are the available OverflowStrategy values

    object OverflowStrategy {
      /**
       * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
       * the new element.
       */
      def dropHead: OverflowStrategy = DropHead
    
      /**
       * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
       * the new element.
       */
      def dropTail: OverflowStrategy = DropTail
    
      /**
       * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
       */
      def dropBuffer: OverflowStrategy = DropBuffer
    
      /**
       * If the buffer is full when a new element arrives, drops the new element.
       */
      def dropNew: OverflowStrategy = DropNew
    
      /**
       * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until
       * space becomes available in the buffer.
       */
      def backpressure: OverflowStrategy = Backpressure
    
      /**
       * If the buffer is full when a new element is available this strategy completes the stream with failure.
       */
      def fail: OverflowStrategy = Fail
    }
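    To see one of these strategies in action, here is a minimal, self-contained sketch of my own (not from the official docs) in which a fast source feeds a deliberately slow sink through a small buffer that backpressures the producer:

    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, OverflowStrategy}
    import akka.stream.scaladsl.{Sink, Source}

    object BufferDemo extends App {
      implicit val system = ActorSystem("buffer-demo")
      implicit val materializer = ActorMaterializer()

      Source(1 to 20)
        // hold at most 5 elements; when the buffer is full, slow the upstream down
        .buffer(5, OverflowStrategy.backpressure)
        // simulate a slow consumer
        .runWith(Sink.foreach { i => Thread.sleep(100); println(i) })
    }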
    

    So that is the basic idea. Akka Streams provides a lot of stuff, such as

    • Built in stages/shapes
    • A graph API
    • Ability to create your own stages/shapes

    For the rest of this post we will be looking at some examples of these 3 points.

    Working With The Akka Streams APIs

    As stated at the beginning of this post the Akka Streams implementation is vast. There is a lot of ground to cover, far more than I can reasonably cover in a small blog post. The official docs are still the place to go, but if you have not heard of Akka Streams this post may be enough to get you into it.

    The official docs (at time of writing) are here:

    http://doc.akka.io/docs/akka/2.4.2/scala/stream/index.html

     

    Working With Built In Stages/Shapes

    Akka comes with loads of prebuilt stages which we can make use of. However, before I mention those, let's spend a bit of time talking about how you use the Akka Streams APIs in their most basic form.

    The idea is that we have 4 different parts that make up a useable pipeline.

    Source
    A processing stage with exactly one output, emitting data elements whenever downstream processing stages are ready to receive them.

    Sink
    A processing stage with exactly one input, requesting and accepting data elements possibly slowing down the upstream producer of elements

    Flow
    A processing stage which has exactly one input and output, which connects its up- and downstreams by transforming the data elements flowing through it.

    RunnableGraph
    A Flow that has both ends “attached” to a Source and Sink respectively, and is ready to be run().
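    Putting those four parts together in the simplest possible way might look like the following minimal sketch (my own example; it assumes an implicit ActorSystem and ActorMaterializer are in scope, as in the fuller examples below):

    import akka.NotUsed
    import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source}

    val source = Source(1 to 5)              // exactly one output
    val flow   = Flow[Int].map(_ * 2)        // one input, one output
    val sink   = Sink.foreach[Int](println)  // exactly one input

    // wiring them all together gives a RunnableGraph, which does nothing
    // until it is materialized by calling run()
    val graph: RunnableGraph[NotUsed] = source.via(flow).to(sink)
    graph.run()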

    As I say Akka comes with loads of inbuilt stages to make our lives easier here. For example these are the available stages at time of writing

    Source Stages

    • fromIterator
    • apply
    • single
    • repeat
    • tick
    • fromFuture
    • fromCompletionStage
    • unfold
    • unfoldAsync
    • empty
    • maybe
    • failed
    • actorPublisher
    • actorRef
    • combine
    • queue
    • asSubscriber
    • fromPublisher
    • fromFile

    Sink Stages

    • head
    • headOption
    • last
    • lastOption
    • ignore
    • cancelled
    • seq
    • foreach
    • foreachParallel
    • onComplete
    • fold
    • reduce
    • combine
    • actorRef
    • actorRefWithAck
    • actorSubscriber
    • asPublisher
    • fromSubscriber
    • toFile

    We will now look at some examples of using some of these

    def simpleFlow() : Unit = {
      val source = Source(1 to 10)
      val sink = Sink.fold[Int, Int](0)(_ + _)
      // connect the Source to the Sink, obtaining a RunnableGraph
      val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
      // materialize the flow and get the value of the FoldSink
      implicit val timeout = Timeout(5 seconds)
      val sumFuture: Future[Int] = runnable.run()
      val sum = Await.result(sumFuture, timeout.duration)
      println(s"source.toMat(sink)(Keep.right) Sum = $sum")
    
      // Use the shorthand source.runWith(sink)
      val sumFuture2: Future[Int] = source.runWith(sink)
      val sum2 = Await.result(sumFuture2, timeout.duration)
      println(s"source.runWith(sink) Sum = $sum2")
    }
    

    In this simple example we have a Source(1 to 10), which we then wire up to a Sink that adds up the numbers coming in.

    This block demonstrates various different Source(s) and Sink(s)

    def differentSourcesAndSinks() : Unit = {
      //various sources
      Source(List(1, 2, 3)).runWith(Sink.foreach(println))
      Source.single("only one element").runWith(Sink.foreach(println))
      //actor sink
      val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
      Source(List("hello", "hello"))
        .runWith(Sink.actorRef(helloActor,DoneMessage))
      //future source
      val futureString = Source.fromFuture(Future.successful("Hello Streams!"))
        .toMat(Sink.head)(Keep.right).run()
      implicit val timeout = Timeout(5 seconds)
      val theString = Await.result(futureString, timeout.duration)
      println(s"theString = $theString")
    }
    

    And this block demos using a simple Map on a Source

    def mapFlow() : Unit = {
      val source = Source(11 to 16)
      val doublerSource = source.map(x => x * 2)
      val sink = Sink.foreach(println)
      implicit val timeout = Timeout(5 seconds)
    
      // Use the shorthand source.runWith(sink)
      val printSinkFuture: Future[Done] = doublerSource.runWith(sink)
      Await.result(printSinkFuture, timeout.duration)
    }
    

    Working With The Graph API

    Akka streams also comes with a pretty funky graph building DSL. You would use this when you want to create quite elaborate flows.

    The other very interesting thing about the graph builder DSL is that you can use custom shapes inside it, and you can also leave it partially connected, such that you could potentially use it as a Source/Sink.

    Let's say you had an unconnected output in the graph you built using the graph DSL; you could then use that partially constructed graph as a Source in its own right.

    The same goes if you had an unconnected input in the graph you created you could use that as a Sink.
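    Here is a minimal sketch of my own (not from the activator template) of exactly that idea: a partial graph whose single unconnected outlet is exposed as a SourceShape, so the whole thing can be used as an ordinary Source. It assumes an implicit ActorSystem and ActorMaterializer are in scope:

    import akka.NotUsed
    import akka.stream.SourceShape
    import akka.stream.scaladsl.{GraphDSL, Merge, Sink, Source}

    // two sources merged inside a graph; the merge outlet is left unconnected
    // and exposed, so the partial graph behaves as a Source in its own right
    val combinedSource: Source[Int, NotUsed] = Source.fromGraph(
      GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        val merge = builder.add(Merge[Int](2))
        Source(1 to 3)   ~> merge
        Source(10 to 12) ~> merge
        SourceShape(merge.out)
      })

    combinedSource.runWith(Sink.foreach(println))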

    You can read more about this here :

    http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-graphs.html#constructing-sources-sinks-and-flows-from-partial-graphs

    I urge you all to have a read of that, as it's quite cool what can be done with the graph DSL.

    Ok, so time for an example. This example comes directly from the Typesafe activator code

    http://www.lightbend.com/activator/template/akka-stream-scala

    package com.sas.graphs
    
    import java.io.File
    
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.ClosedShape
    import akka.stream.scaladsl._
    import akka.util.ByteString
    
    import scala.concurrent.forkjoin.ThreadLocalRandom
    import scala.util.{ Failure, Success }
    
    class WritePrimesDemo {
    
      def run(): Unit = {
        implicit val system = ActorSystem("Sys")
        import system.dispatcher
        implicit val materializer = ActorMaterializer()
    
        // generate random numbers
        val maxRandomNumberSize = 1000000
        val primeSource: Source[Int, NotUsed] =
          Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize))).
            // filter prime numbers
            filter(rnd => isPrime(rnd)).
            // and neighbor +2 is also prime
            filter(prime => isPrime(prime + 2))
    
        // write to file sink
        val fileSink = FileIO.toPath(new File("target/primes.txt").toPath)
        val slowSink = Flow[Int]
          // act as if processing is really slow
          .map(i => { Thread.sleep(1000); ByteString(i.toString) })
          .toMat(fileSink)((_, bytesWritten) => bytesWritten)
    
        // console output sink
        val consoleSink = Sink.foreach[Int](println)
    
        // send primes to both slow file sink and console sink using graph API
        val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
          (slow, console) =>
            import GraphDSL.Implicits._
            val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
            primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
            broadcast ~> console // connect other side of splitter to console
            ClosedShape
        }
        val materialized = RunnableGraph.fromGraph(graph).run()
    
        // ensure the output file is closed and the system shutdown upon completion
        materialized.onComplete {
          case Success(_) =>
            system.terminate()
          case Failure(e) =>
            println(s"Failure: ${e.getMessage}")
            system.terminate()
        }
    
      }
    
      def isPrime(n: Int): Boolean = {
        if (n <= 1) false
        else if (n == 2) true
        else !(2 to (n - 1)).exists(x => n % x == 0)
      }
    }
    

    The most important part of this code is this part

    // send primes to both slow file sink and console sink using graph API
    val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
      (slow, console) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
        primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
        broadcast ~> console // connect other side of splitter to console
        ClosedShape
    }
    val materialized = RunnableGraph.fromGraph(graph).run()
    

    There are 2 sinks defined before we use the Graph

    • A file Sink
    • A console Sink

    There is also a Source that generates random primes

    So the Graph DSL allows you to, well, create graphs. It allows you to take in inputs and create other shapes using the implicit builder that is provided.

    The DSL then allows you to connect inputs and other builder-created stages/shapes together, and even expose the connected stages as an output.

    This is done using the ~> syntax, which simply means connect.

    As previously stated, you can create partially connected graphs, but if you have all inputs and outputs connected it is considered a ClosedShape, which can be used as an isolated component.

    Here is an example of the output of running this graph example

    image

    Create Custom Shapes/Stages

    It doesn't stop there; we can also create our own shapes that can be used in flows. This is a pretty complex subject and you will definitely benefit from reading this page

    http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html

    There is no way this little post will cover enough, but here are some highlights of the official documentation

    This is the basic pattern you would use to create a custom stage

    import akka.stream.{Attributes, Outlet, SourceShape}
    import akka.stream.stage.{GraphStage, GraphStageLogic}
     
    class NumbersSource extends GraphStage[SourceShape[Int]] {
      // Define the (sole) output port of this stage
      val out: Outlet[Int] = Outlet("NumbersSource")
      // Define the shape of this stage, which is SourceShape with the port we defined above
      override val shape: SourceShape[Int] = SourceShape(out)
     
      // This is where the actual (possibly stateful) logic will live
      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
    }
    

    Most of the actual logic will be inside the createLogic method. But in order to do anything useful in there you will need to use handlers. Handlers are what you use to handle input/output. There are InHandler and OutHandler.

    Each of which has its own state machine flow. For example this is the state machine for an OutHandler

    image

    Whilst this is the one for InHandler

    image

    This is the best page to read to learn more about these handlers

    http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html#Port_states__InHandler_and_OutHandler

    The one and ONLY place that state should be maintained is within the createLogic method.

    Lets consider a small example. Lets say we have some objects like this

    case class Element(id: Int, value: Int)
    

    And we want to build a custom stage that will allow us to select a property from this type, and that should only emit an element downstream when the selected property value changes from that of the previous element.

    We could call this DistinctUntilChanged. Let's see what an example of this could look like

    package com.sas.customshapes
    
    import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler, GraphStage}
    import akka.stream.{Outlet, Attributes, Inlet, FlowShape}
    
    import scala.collection.immutable
    
    final class DistinctUntilChanged[E, P](propertyExtractor: E => P)
      extends GraphStage[FlowShape[E, E]] {
    
      val in = Inlet[E]("DistinctUntilChanged.in")
      val out = Outlet[E]("DistinctUntilChanged.out")
    
      override def shape = FlowShape.of(in, out)
    
      override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {
    
        private var savedState : Option[E] = None
    
        setHandlers(in, out, new InHandler with OutHandler {
    
          override def onPush(): Unit = {
            val nextElement = grab(in)
            val nextState = propertyExtractor(nextElement)
    
            if (savedState.isEmpty  || propertyExtractor(savedState.get) != nextState) {
              savedState = Some(nextElement)
              push(out, savedState.get)
            }
            else {
              pull(in)
            }
            savedState = Some(nextElement)
          }
    
          override def onPull(): Unit = {
            pull(in)
          }
    
          override def onUpstreamFinish(): Unit = {
            completeStage()
          }
        })
    
        override def postStop(): Unit = {
          savedState = None
        }
      }
    }
    
    
    
    

    The highlights of this are

    • We have a single Inlet
    • We have a single Outlet
    • We expose a FlowShape (one input / one output); there are many shapes, but FlowShape is what we want for a single in/out stage
    • We use createLogic to do the work
    • We use an InHandler to handle input
    • We use an OutHandler to handle output

    One other important thing (at least for this single in/out example) is that we DO NOT call pull/push more than once per handler callback in the createLogic

    Lets assume we have these elements

    package com.sas.customshapes
    
    import scala.collection.immutable
    
    object SampleElements {
    
      val E11 = Element(1, 1)
      val E21 = Element(2, 1)
      val E31 = Element(3, 1)
      val E42 = Element(4, 2)
      val E52 = Element(5, 2)
      val E63 = Element(6, 3)
    
      val Ones = immutable.Seq(E11, E21, E31)
      val Twos = immutable.Seq(E42, E52)
      val Threes = immutable.Seq(E63)
    
      val All = Ones ++ Twos ++ Threes
    }
    

    And this demo code

    def runDistinctUntilChanged() : Unit = {
      Source(SampleElements.All)
        .via(new DistinctUntilChanged(_.value))
        .runWith(Sink.foreach(println))
    }
    

    We would get this output to the Sink

    image

    This example does owe a lot to a nice blog post I found here :

    https://www.softwaremill.com/implementing-a-custom-akka-streams-graph-stage/
     

    That’s It

    Anyway, that is the end of the series. I hope you have enjoyed it, and have learnt some Akka along the way.

    I am going to have a small break now and then start looking into some Azure/Web stuff I think

     

    Where Can I Find The Code Examples?

    I will be augmenting this GitHub repo with the example projects as I move through this series

    https://github.com/sachabarber/SachaBarber.AkkaExamples

    Akka

    Akka http

    Last time we talked about routing within Akka. This time we will be looking at Akka’s support for http.

    But just before that, a bit of history. Before Akka.Http there was already a fairly successful Akka-based http option available to you as a Scala developer, called Spray. There is a lot of Spray documentation available here http://spray.io/

    This framework was extremely well thought of, so much so that the good people at Akka have taken on much of the good work done by this team, and it now forms much of the codebase for Akka Http.

    In fact if you are familiar with Spray, you will certainly notice quite a lot of similarities in the way routes and JSON are handled in Akka.Http, as it is pretty much the Spray code.

     

    Introduction

    Akka.Http comes with server side and client side libraries. It also comes with good support for standard serialization such as JSON/XML and the ability to roll your own serialization should you want to.

    It also comes with a fairly nifty routing DSL which is very much inspired by the work done in Spray.

    This post will concentrate on the common use cases that you may come across when working with HTTP.

     

    SBT Dependencies

    As usual we need to make sure we have the correct JARs referenced. So here is the SBT file that I am using for both the server side/client side and common messages that pass between them

    import sbt._
    import sbt.Keys._
    
    
    lazy val allResolvers = Seq(
      "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
      "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
    )
    
    
    lazy val AllLibraryDependencies =
      Seq(
        "com.typesafe.akka" % "akka-actor_2.11" % "2.4.12",
        "com.typesafe.akka" % "akka-http_2.11" % "3.0.0-RC1",
        "com.typesafe.akka" % "akka-http-core_2.11" % "3.0.0-RC1",
        "com.typesafe.akka" % "akka-http-spray-json_2.11" % "3.0.0-RC1"
      )
    
    
    lazy val commonSettings = Seq(
      version := "1.0",
      scalaVersion := "2.11.8",
      resolvers := allResolvers,
      libraryDependencies := AllLibraryDependencies
    )
    
    
    lazy val serverside =(project in file("serverside")).
      settings(commonSettings: _*).
      settings(
        name := "serverside"
      )
      .aggregate(common, clientside)
      .dependsOn(common, clientside)
    
    lazy val common = (project in file("common")).
      settings(commonSettings: _*).
      settings(
        name := "common"
      )
    
    lazy val clientside = (project in file("clientside")).
      settings(commonSettings: _*).
      settings(
        name := "clientside"
      )
      .aggregate(common)
      .dependsOn(common)
    

    It can be seen that the JSON dependency is contained in this JAR

    akka-http-spray-json_2.11
    

    Told you it was inspired by Spray a fair bit

     

    Server Side

    This section will talk about the server side element of Akka.Http

     

    Hosting The Service

    To have a correctly formed, hostable server side we need a couple of things in place, namely the following

    • An actor system
    • A materializer (Akka http uses flows which is the subject of the next and final post)
    • An execution context
    • Routing

    Once we have these things it is really just a question of binding the route to a host name and port.

    Shown below is a barebones skeleton of what this may look like

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
    import akka.http.scaladsl.server.Directives
    import akka.stream.scaladsl.Flow
    import common.{Item, JsonSupport}
    import scala.io.StdIn
    import scala.concurrent.Future
    import akka.http.scaladsl.model.ws.{Message, TextMessage}
    import akka.stream._
    import akka.stream.scaladsl._
    
    
    object Demo extends App with Directives with JsonSupport {
    
      implicit val system = ActorSystem("my-system")
      implicit val materializer = ActorMaterializer()
      // needed for the Future callbacks below (onFailure / flatMap / onComplete)
      implicit val executionContext = system.dispatcher
    
    
      val route = .....
    
      val (host, port) = ("localhost", 8080)
      val bindingFuture = Http().bindAndHandle(route, host, port)
    
      bindingFuture.onFailure {
        case ex: Exception =>
          println(s"$ex Failed to bind to $host:$port!")
      }
    
      println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
      StdIn.readLine() // let it run until user presses return
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }
    

    We will be looking at the routing DSL separately

     

    Routing DSL

    As stated, Akka.Http owes much to Spray, and the routing DSL in particular is practically unchanged from Spray, so it is well worth reading the Spray routing documentation which is available here : http://spray.io/documentation/1.2.4/spray-routing/ and for completeness here is the Akka.Http docs link too : http://doc.akka.io/docs/akka/2.4.7/scala/http/introduction.html#routing-dsl-for-http-servers

    There are way too many possible routes to go into for a single post. Let's consider a few basic examples and deconstruct them

    Some of these examples do rely on JSON which is the next topic, so for now just understand that there is a way to accept/return JSON.

    Lets consider the following use cases

    • GET that returns a simple string
    • GET that returns a JSON representation of an Item
    • POST that accept a new Item

    In all these cases this is what an Item looks like

    package common
    
    final case class Item(name: String, id: Long)
    

    So lets see the routing DSL that makes the above examples work

    val route =
      path("hello") {
        get {
      complete(HttpEntity(
        ContentTypes.`text/html(UTF-8)`,
        "<h1>Say hello to akka-http</h1>"))
        }
      } ~
      path("randomitem") {
        get {
          // will marshal Item to JSON
          complete(Item("thing", 42))
        }
      } ~
      path("saveitem") {
        post {
          // will unmarshal JSON to Item
          entity(as[Item]) { item =>
            println(s"Server saw Item : $item")
            complete(item)
          }
        }
      }
    

    It can be seen that there are some common routing DSL bits and bobs in there, such as:

    • path : which satisfies the route name part of the route
    • get : which tells us that we should go further into the route matching if it’s a GET http request and it matched the path route DSL part
    • post: which tells us that we should go further into the route matching if it’s a POST http request and it matched the path route DSL part
    • complete : This is the final result from the route

    These parts of the DSL are known as directives. The general anatomy of a directive is as follows:

    name(arguments) { extractions =>
      ... // inner route
    }
    

    It has a name, zero or more arguments and optionally an inner route (The RouteDirectives are special in that they are always used at the leaf-level and as such cannot have inner routes). Additionally directives can “extract” a number of values and make them available to their inner routes as function arguments. When seen “from the outside” a directive with its inner route form an expression of type Route.

    Taken from http://doc.akka.io/docs/akka/2.4.7/scala/http/routing-dsl/directives/index.html#directives up on date 15/11/16

    What Directives Do?

    A directive can do one or more of the following:

    • Transform the incoming RequestContext before passing it on to its inner route (i.e. modify the request)
    • Filter the RequestContext according to some logic, i.e. only pass on certain requests and reject others
    • Extract values from the RequestContext and make them available to its inner route as “extractions”
    • Chain some logic into the RouteResult future transformation chain (i.e. modify the response or rejection)
    • Complete the request

     

    This means a Directive completely wraps the functionality of its inner route and can apply arbitrarily complex transformations, both (or either) on the request and on the response side.
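    As a small illustration of a directive extracting a value for its inner route, here is a hedged sketch of my own (the route name is illustrative) that uses the standard optionalHeaderValueByName directive to pull the User-Agent header out of the request before completing:

    // the directive extracts a value and hands it to the inner route as an argument
    val userAgentRoute =
      path("whoami") {
        get {
          optionalHeaderValueByName("User-Agent") { ua =>
            complete(s"Your user agent is: ${ua.getOrElse("unknown")}")
          }
        }
      }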

    Ok so now that we have taken a whistle stop tour of the routing DSL and directives, lets have a look at the few we discussed above

     

    For this work I would strongly recommend the use of the “Postman” google app, which you can grab from here

    https://chrome.google.com/webstore/detail/postman/fhbjgbiflinjbdggehcddcbncdddomop?hl=en

    GET

    We can see this route looks like this

    path("hello") {
      get {
        complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
      }
    }
    

    So we use the path, and also the get directives to establish a get route. We then use complete to complete the route with some static string representing the html we would like to return

    So let’s see this one in postman

    image

     

    GET Item (as JSON)

    We can see this route looks like this

    path("randomitem") {
      get {
        // will marshal Item to JSON
        complete(Item("thing", 42))
      }
    } 
    

    So again we use the path/get directives, but this time we complete with an Item. This is done due to the JSON support that is able to create the right serialization data for us. We will look at this in the next section

    So let’s see this one in postman

    image

    POST Item

    We can see this route looks like this

    path("saveitem") {
      post {
        // will unmarshal JSON to Item
        entity(as[Item]) { item =>
          println(s"Server saw Item : $item")
          complete(item)
        }
      }
    } 
    

    So again we use the path directive, but this time we use post, where the post expects an Item as JSON to be provided. The conversion from the incoming JSON string to an Item is done using an Unmarshaller; we will look at this in the next section

    So let’s see this one in postman

    image

     

    JSON Support

    Akka.http provides JSON support using this library akka-http-spray-json-experimental which you can grab from Maven Central Repo.

    JsonProtocol

    When using Spray we use SprayJsonSupport and DefaultJsonProtocol to create the JSON protocol for our custom objects

    Lets consider the Item class we have seen in the demos so far

    package common
    
    final case class Item(name: String, id: Long)
    

    This is how we might write the JSON protocol code for this simple class

    package common
    
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    import spray.json.DefaultJsonProtocol
    
    trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
      implicit val itemFormat = jsonFormat2(Item)
    }
    

    It can be seen that there are jsonFormatXX helpers that can be used for very simple cases. In this case jsonFormat2 is used as our item class had 2 parameters

    Most of the time these inbuilt helpers are all we need. If however you want something more elaborate, you are free to create your own jsonFormat read/write methods
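    For completeness, here is a hedged sketch of what a hand-rolled format for Item could look like if jsonFormat2 were not enough (illustrative only; the generated format above is what the demo actually uses):

    import spray.json._

    // a hand-written RootJsonFormat for Item, for when the jsonFormatXX helpers
    // cannot express what you need
    implicit object ItemJsonFormat extends RootJsonFormat[Item] {
      def write(item: Item): JsValue =
        JsObject("name" -> JsString(item.name), "id" -> JsNumber(item.id))

      def read(value: JsValue): Item =
        value.asJsObject.getFields("name", "id") match {
          case Seq(JsString(name), JsNumber(id)) => Item(name, id.toLong)
          case _ => throw DeserializationException("Item expected")
        }
    }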

     

    Marshalling

    Marshalling is Spray's process of taking objects and creating a JSON string representation of them to send across the wire.

    The Akka Spray JAR comes with a bunch of default marshallers that allow us to take custom classes and turn them into JSON

    These are the most common default marshallers that you will most likely use

    type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]
    type ToHeadersAndEntityMarshaller[T] = Marshaller[T, (immutable.Seq[HttpHeader], MessageEntity)]
    type ToResponseMarshaller[T] = Marshaller[T, HttpResponse]
    type ToRequestMarshaller[T] = Marshaller[T, HttpRequest]
    

    You can read more about this here : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/marshalling.html

    Luckily you don't really have to get that involved with these too often, as the routing DSL does most of the heavy lifting for you: when you call complete, this is taken care of for you, providing there is a marshaller that can be found implicitly.
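    If you ever do need to invoke a marshaller by hand, outside the routing DSL, a minimal sketch might look like this (assuming the JsonSupport trait above is mixed in and an implicit ExecutionContext is available):

    import akka.http.scaladsl.marshalling.Marshal
    import akka.http.scaladsl.model.MessageEntity
    import scala.concurrent.Future

    // explicitly turn an Item into an HTTP entity using the implicitly
    // available spray-json marshaller
    val entityFuture: Future[MessageEntity] = Marshal(Item("thing", 42)).to[MessageEntity]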

    Unmarshalling

    Unmarshalling is the process of taking the on-the-wire format (a JSON string in these examples) back into a Scala class (the Item class in this case)

    You can read more about this at the official Akka docs page : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/unmarshalling.html

    Luckily you don't really have to get that involved with these too often, as the routing DSL does most of the heavy lifting for you. This is what happens when we use this part of the routing DSL, which will use an unmarshaller to create the Item from the JSON string on the wire

    entity(as[Item]) { item =>
    

    WebSockets

    Akka Http also supports WebSockets. Let's start this investigation by looking at what is required from the routing DSL perspective, which starts like this

    path("websocket") {
      get {
        handleWebSocketMessages(websocketFlow)
      }
    } ~
    

    If we look at this special directive a bit more, what exactly does the handleWebSocketMessages directive look like

    Well it looks like this:

    def handleWebSocketMessages(handler: Flow[Message, Message, Any]): Route
    

    So we need to supply a flow. A Flow is part of Akka Streams (reactive streams), which we will look at in the next part. But for now just be aware that you can create a Flow from a Sink/Source and a Materializer to materialize the flow.

    For this websocket example here is what the Flow looks like

    val (websocketSink, websocketSource) =
      MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()
    
    val websocketFlow: Flow[Message, Message, NotUsed] =
      Flow[Message].mapAsync(1) {
        // transform websocket message to domain message (string)
        case TextMessage.Strict(text) =>       Future.successful(text)
        case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _)
      }.via(Flow.fromSinkAndSource(websocketSink, websocketSource))
        .map[Message](string => TextMessage(string))
    

    The idea is that when a websocket client connects and sends an initial message they will get a reply TextMessage sent over the websocket to them

    This uses some pretty new akka stream stages namely

    • MergeHub : Creates a Source that emits elements merged from a dynamic set of producers.
    • BroadcastHub : Creates a Sink that broadcasts each incoming element to a dynamic set of consumers (this is the BroadcastHub.sink used above); a standalone sketch of these two hubs follows below.
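    Here is that standalone sketch of the two hubs, decoupled from WebSockets (my own minimal example; it assumes the implicit system and materializer from the server above):

    import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}

    // materializing this pair gives us a Sink we can push to from anywhere,
    // and a Source that broadcasts whatever arrives to every attached consumer
    val (hubSink, hubSource) =
      MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()

    // two independent consumers; both see every element
    hubSource.runWith(Sink.foreach(msg => println(s"consumer 1 saw: $msg")))
    hubSource.runWith(Sink.foreach(msg => println(s"consumer 2 saw: $msg")))

    // any source can feed the hub
    Source.single("hello hubs").runWith(hubSink)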

     

    Let's start by running the server, and then opening the "WebSocketTestClient.html" page, which should look like this

    image

    image

    Once the page is open, type something in the textbox and hit the “Send” button, you should see this

    image

    All fairly normal socket type stuff so far, we send a message from the web page client side to the server and the server responds with the text we sent.

    But what if we wanted to send messages to the client on demand, say from another route which could be a command to do some work, which then notifies the clients of the websocket?

    With this Flow in place, we are also able to push back messages to the client end of the websocket.

    Let's see another route which will simulate some work, which results in messages being sent down the websocket back to the client (if it's still connected)

    Here is the route

    path("sendmessagetowebsocket" / IntNumber) { msgCount =>
      post {
        for(i <- 0 until msgCount)
        {
          Source.single(s"sendmessagetowebsocket $i").runWith(websocketSink)
        }
        complete("done")
      }
    }
    

    It can be seen that we simply create a new source which is run with the existing Sink that was part of the Flow used by the websocket

    Here is what this would look like in postman

    image

    And here is what the web page client side websocket example looks like after this route has been called as above

    image

     

     

    Client Side

    Akka Http comes with 3 types of client API that one can use: a connection-level API, a host-level API and a request-level API.

    In this article I will only be using the last of these APIs, as in my opinion it is the most sensible client side choice.

    So what does the request level client API look like.

    GET

    If we consider that we want to conduct this request

    http://localhost:8080/randomitem

    which when run via postman gives the following JSON response

    image

    So lets see what the code looks like to do this using the request level client API

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.marshalling.Marshal
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl._
    import scala.concurrent.{Await, Future}
    import concurrent.ExecutionContext.Implicits.global
    import common.{Item, JsonSupport}
    import concurrent.duration._
    import scala.io.StdIn
    
    class RegularRoutesDemo extends JsonSupport {
    
      def Run() : Unit = {
        implicit val system = ActorSystem()
        implicit val materializer = ActorMaterializer()
    
        val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)
    
        //+++++++++++++++++++++++++++++++++++++++++++++++
        // GET http://localhost:8080/randomitem
        //+++++++++++++++++++++++++++++++++++++++++++++++
        val randomItemUrl = s"""/randomitem"""
        val flowGet : Future[Item] =
          Source.single(
            HttpRequest(
              method = HttpMethods.GET,
              uri = Uri(randomItemUrl))
            )
            .via(httpClient)
            .mapAsync(1)(response => Unmarshal(response.entity).to[Item])
            .runWith(Sink.head)
        val start = System.currentTimeMillis()
        val result = Await.result(flowGet, 5 seconds)
        val end = System.currentTimeMillis()
        println(s"Result in ${end-start} millis: $result")
    
      }
    }
    

    There are a couple of take away points in the code above

    • We use a Source containing a single HttpRequest, where we can specify the HTTP verb and other request-type things
    • We use Unmarshal to convert the incoming JSON string to an Item. We discussed Marshalling/Unmarshalling above.
    • This obviously relies on the Spray JSON support that we discussed above

     

    POST

    If we consider that we want to conduct this request

    http://localhost:8080/saveitem

    which when run via postman gives the following JSON response

    image

    So lets see what the code looks like to do this using the request level client API

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.marshalling.Marshal
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl._
    import scala.concurrent.{Await, Future}
    import concurrent.ExecutionContext.Implicits.global
    import common.{Item, JsonSupport}
    import concurrent.duration._
    import scala.io.StdIn
    
    class RegularRoutesDemo extends JsonSupport {
    
      def Run() : Unit = {
        implicit val system = ActorSystem()
        implicit val materializer = ActorMaterializer()
    
        val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)
    
        //+++++++++++++++++++++++++++++++++++++++++++++++
        // POST http://localhost:8080/saveitem
        //+++++++++++++++++++++++++++++++++++++++++++++++
        val saveItemUrl = s"""/saveitem"""
        val itemToSave = Item("newItemHere",12)
        val flowPost = for {
          requestEntity <- Marshal(itemToSave).to[RequestEntity]
          response <-
          Source.single(
            HttpRequest(
              method = HttpMethods.POST,
              uri = Uri(saveItemUrl),
              entity = requestEntity)
            )
            .via(httpClient)
            .mapAsync(1)(response => Unmarshal(response.entity).to[Item])
            .runWith(Sink.head)
        } yield response
        val startPost = System.currentTimeMillis()
        val resultPost = Await.result(flowPost, 5 seconds)
        val endPost = System.currentTimeMillis()
        println(s"Result in ${endPost-startPost} millis: $resultPost")
      }
    }
    

    The only thing that is different this time is that we need to pass a JSON string representation of an Item to the HttpRequest as its entity.

    This is done using a JSON marshaller, which must be in scope implicitly.

     

    Where Can I Find The Code Examples?

    I will be augmenting this GitHub repo with the example projects as I move through this series

    https://github.com/sachabarber/SachaBarber.AkkaExamples

    Akka

    AKKA routing

     

    Last time we looked at Akka Clustering, this time we will look at routing.

    Routing allows messages to be routed to one or more actors known as routees, by sending the messages to a router that will know how to route the messages to the routees.

    Akka comes with quite a few inbuilt routing strategies that we can make use of. We will look at these next.

    Types Of Routing Strategy

    Akka comes with a whole bunch of inbuilt routing strategies such as :

    RoundRobin : Routes in a round-robin fashion to its routees.

    Random : This router type selects one of its routees randomly for each message.

    SmallestMailBox : A router that tries to send to the non-suspended child routee with the fewest messages in its mailbox. The selection is done in this order: pick any idle routee (not processing a message) with an empty mailbox; pick any routee with an empty mailbox; pick the routee with the fewest pending messages in its mailbox; pick any remote routee (remote actors are considered lowest priority, since their mailbox size is unknown).

    Broadcast : A broadcast router forwards the message it receives to all its routees.

    ScatterGatherFirstCompleted : The ScatterGatherFirstCompletedRouter will send the message on to all its routees. It then waits for the first reply it gets back; this result will be sent back to the original sender. Other replies are discarded.

    TailChopping : The TailChoppingRouter will first send the message to one, randomly picked, routee and then after a small delay to a second routee (picked randomly from the remaining routees) and so on. It waits for the first reply it gets back and forwards it to the original sender. Other replies are discarded.

    The goal of this router is to decrease latency by performing redundant queries to multiple routees, assuming that one of the other actors may still be faster to respond than the initial one.

    Regular Actor As A Router

    Akka allows you to create routers in 2 ways. The first way is to use a RoutingLogic to set up your router.

    There are quite a few specializations of RoutingLogic, such as

    • RoundRobinRoutingLogic
    • RandomRoutingLogic
    • SmallestMailboxRoutingLogic
    • BroadcastRoutingLogic

    You would typically use this in a regular actor; the actor in which you use the RoutingLogic becomes the router. If you go down this path you are responsible for managing the router’s children, i.e. the routees. That means you are responsible for ALL aspects of the routees: adding them to the list of available routees, and watching them for Termination so they can be removed from that list (which sounds a lot like supervision, doesn’t it).

    Here is what a skeleton for an actor that is setup manually as a router may look like

    import java.util.concurrent.atomic.AtomicInteger
    
    import akka.actor.{Actor, Props, Terminated}
    import akka.routing.{RoutingLogic, ActorRefRoutee, RoundRobinRoutingLogic, Router}
    
    
    class RouterActor(val routingLogic : RoutingLogic)  extends Actor  {
    
      val counter : AtomicInteger = new AtomicInteger()
    
      val routees = Vector.fill(5) {
        val workerCount = counter.getAndIncrement()
        val r = context.actorOf(Props(
          new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
        context watch r
        ActorRefRoutee(r)
      }
    
      //create a Router based on the incoming class field
      //RoutingLogic which will really determine what type of router
      //we end up with
      var router = Router(routingLogic, routees)
    
      def receive = {
        case WorkMessage =>
          router.route(WorkMessage, sender())
        case Report => routees.foreach(ref => ref.send(Report, sender()))
        case Terminated(a) =>
          router = router.removeRoutee(a)
          val workerCount = counter.getAndIncrement()
          val r = context.actorOf(Props(
            new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
          context watch r
          router = router.addRoutee(r)
      }
    }
    

    It can be seen that I pass in the RoutingLogic, which would be one of the available RoutingLogic strategies that Akka comes with.

    The other thing to note is that as we stated earlier we need to FULLY manage the collection of routee actors ourselves, including watching them for Termination.

    Surely there is a better way?

    Well yes, thankfully there is: Akka also provides a Pool for this job. We will look at that next.

    Pool

    Akka comes with the ability to create a router using a pool where we tell it what actors we want to use as the routees, how many routees we want, and how the supervision should be handled.

    Here is some code from my demo that uses 2 utility methods to create a pool-based router, which routes to a simple FibonacciActor. Messages are sent via a container actor that creates the router from the pool Props. (These methods are an excerpt from the full Demo object shown later, which contains the required imports.)

    def RunTailChoppingPoolDemo() : Unit = {
    
      val supervisionStrategy = OneForOneStrategy() {
        case e => SupervisorStrategy.restart
      }
    
      val props = TailChoppingPool(5, within = 10.seconds,
        supervisorStrategy = supervisionStrategy,interval = 20.millis).
        props(Props[FibonacciActor])
    
      RunPoolDemo(props)
    }
    
    def RunPoolDemo(props : Props) : Unit = {
      val system = ActorSystem("RoutingSystem")
      val actorRef = system.actorOf(Props(
        new PoolRouterContainerActor(props,"theRouter")), name = "thePoolContainer")
      actorRef ! WorkMessage
      StdIn.readLine()
      system.terminate()
    }
    
    
    
    import akka.actor._
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.pattern.ask
    
    class PoolRouterContainerActor(val props: Props, val name :String)  extends Actor  {
    
      val router: ActorRef = context.actorOf(props, name)
    
      def receive = {
        case WorkMessage =>
          implicit val timeout = Timeout(5 seconds)
          val futureResult = router ? FibonacciNumber(10)
          val (actName,result) = Await.result(futureResult, timeout.duration)
    
          println(s"FibonacciActor : ($actName) came back with result -> $result")
      }
    }
    
    
    
    import akka.actor.Actor
    import scala.annotation.tailrec
    
    class FibonacciActor extends Actor {
    
      val actName = self.path.name
    
      def receive = {
        case FibonacciNumber(nbr) => {
          println(s"FibonacciActor : ($actName) ->  " +
            s"has been asked to calculate FibonacciNumber")
          val result = fibonacci(nbr)
          sender ! (actName,result)
        }
      }
    
      private def fibonacci(n: Int): Int = {
        @tailrec
        def fib(n: Int, b: Int, a: Int): Int = n match {
          case 0 => a
          case _ => fib(n - 1, a + b, b)
        }
    
        fib(n, 1, 0)
      }
    }
    

    Supervision Using Pool

    Routees that are created by a pool router will be created as the router’s children. The router is therefore also the children’s supervisor.

    The supervision strategy of the router actor can be configured with the supervisorStrategy property of the Pool. If no configuration is provided, routers default to a strategy of “always escalate”. This means that errors are passed up to the router’s supervisor for handling. The router’s supervisor will decide what to do about any errors.

    Note the router’s supervisor will treat the error as an error with the router itself. Therefore a directive to stop or restart will cause the router itself to stop or restart. The router, in turn, will cause its children to stop and restart.

    It should be mentioned that the router’s restart behaviour has been overridden so that a restart, while still re-creating the children, preserves the same number of actors in the pool.

    This means that if you have not specified supervisorStrategy of the router or its parent a failure in a routee will escalate to the parent of the router, which will by default restart the router, which will restart all routees (it uses Escalate and does not stop routees during restart). The reason is to make the default behave such that adding withRouter to a child’s definition does not change the supervision strategy applied to the child. This might be an inefficiency that you can avoid by specifying the strategy when defining the router.

    Taken from http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Supervision as it was on 01/11/16
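    To make that concrete, here is a minimal sketch (reusing the FibonacciActor and the FibonacciNumber message from the pool demo above) of supplying a supervisorStrategy when the pool is defined, so that a routee failure restarts just that routee instead of escalating to the router’s parent. This is exactly the technique the pool demos in this post use.

    import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
    import akka.routing.RoundRobinPool

    object SupervisedPoolSketch extends App {
      val system = ActorSystem("RoutingSystem")

      //restart only the failing routee rather than escalating to the router's parent
      val restartStrategy = OneForOneStrategy() {
        case _: Exception => SupervisorStrategy.Restart
      }

      val poolRouter = system.actorOf(
        RoundRobinPool(5, supervisorStrategy = restartStrategy).props(Props[FibonacciActor]),
        name = "supervisedPoolRouter")

      //fire-and-forget: with tell there is no sender to receive the routee's reply
      poolRouter ! FibonacciNumber(10)
    }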

    Group

    You may also wish to create your routees separately and let the router know about them. This is achievable using Groups. This is not something I cover in depth in this post (though there is a small sketch after the link), but if this sounds of interest to you, you can read more about it in the official documentation here:

    http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Group
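    That said, here is a minimal sketch of the idea (reusing the WorkerActor and WorkMessage from the demos below, purely as an illustration rather than part of the demo projects): the routees are created separately, and the router is only given their paths.

    import akka.actor.{ActorSystem, Props}
    import akka.routing.RoundRobinGroup

    object GroupRouterSketch extends App {
      val system = ActorSystem("RoutingSystem")

      //create the routees ourselves, outside of any router
      val paths = (0 until 5).map { i =>
        system.actorOf(Props(new WorkerActor(i)), name = s"workerActor-$i")
        s"/user/workerActor-$i"
      }.toList

      //the group router only routes to the given paths, it does not create
      //or supervise the routees (unlike a pool)
      val groupRouter = system.actorOf(RoundRobinGroup(paths).props(), "workerGroupRouter")

      groupRouter ! WorkMessage
    }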

    Routing Strategy Demos

    For the demos I am using a mixture of RoutingLogic hosted in my own actor, and also Pool based routers.

    Here is the basic setup for a RoutingLogic based actor of my own, where I have to manage all supervision concerns manually.

    There are ALWAYS 5 routees involved with this demo.

    import java.util.concurrent.TimeUnit
    
    import akka.actor._
    import akka.routing._
    import scala.concurrent.duration.FiniteDuration
    import scala.concurrent.duration._
    import scala.language.postfixOps
    import scala.io.StdIn
    
    object Demo extends App {
    
      //==============================================================
      //Standard Actor that does routing using Router class
      //where we apply relevant RoutingLogic
      //Supervision is done manually within the Actor that hosts
      //the Router, where we monitor the routees and remove /recreate
      //them on 'Terminated'
      //==============================================================
      RunRoutingDemo(RoundRobinRoutingLogic())
    
    
    
      def RunRoutingDemo(routingLogic : RoutingLogic) : Unit = {
        val system = ActorSystem("RoutingSystem")
        val actorRef = system.actorOf(Props(
          new RouterActor(routingLogic)), name = "theRouter")
    
        for (i <- 0 until 10) {
          actorRef ! WorkMessage
          Thread.sleep(1000)
        }
        actorRef ! Report
    
        StdIn.readLine()
        system.terminate()
      }
    }
    

    Where we make use of the following generic actor code that uses the specific RoutingLogic that is passed in.

    import java.util.concurrent.atomic.AtomicInteger
    
    import akka.actor.{Actor, Props, Terminated}
    import akka.routing.{RoutingLogic, ActorRefRoutee, RoundRobinRoutingLogic, Router}
    
    
    class RouterActor(val routingLogic : RoutingLogic)  extends Actor  {
    
      val counter : AtomicInteger = new AtomicInteger()
    
      val routees = Vector.fill(5) {
        val workerCount = counter.getAndIncrement()
        val r = context.actorOf(Props(
          new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
        context watch r
        ActorRefRoutee(r)
      }
    
      //create a Router based on the incoming class field
      //RoutingLogic which will really determine what type of router
      //we end up with
      var router = Router(routingLogic, routees)
    
      def receive = {
        case WorkMessage =>
          router.route(WorkMessage, sender())
        case Report => routees.foreach(ref => ref.send(Report, sender()))
        case Terminated(a) =>
          router = router.removeRoutee(a)
          val workerCount = counter.getAndIncrement()
          val r = context.actorOf(Props(
            new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
          context watch r
          router = router.addRoutee(r)
      }
    }
    

    This is what the routees look like for this set of demos

    import akka.actor.Actor
    
    class WorkerActor(val id : Int) extends Actor {
    
      var msgCount = 0
      val actName = self.path.name
    
      def receive = {
        case WorkMessage => {
          msgCount += 1
          println(s"worker : {$id}, name : ($actName) ->  ($msgCount)")
        }
        case Report => {
          println(s"worker : {$id}, name : ($actName) ->  saw total messages : ($msgCount)")
        }
        case _       => println("unknown message")
      }
    }
    

    Ok so let’s have a look at some examples of using this code, shall we:

    RoundRobin

    We get this output, where the messages are distributed to the routees in a round-robin fashion

    worker : {0}, name : (workerActor-0) ->  (1)
    worker : {1}, name : (workerActor-1) ->  (1)
    worker : {2}, name : (workerActor-2) ->  (1)
    worker : {3}, name : (workerActor-3) ->  (1)
    worker : {4}, name : (workerActor-4) ->  (1)
    worker : {0}, name : (workerActor-0) ->  (2)
    worker : {1}, name : (workerActor-1) ->  (2)
    worker : {2}, name : (workerActor-2) ->  (2)
    worker : {3}, name : (workerActor-3) ->  (2)
    worker : {4}, name : (workerActor-4) ->  (2)
    worker : {0}, name : (workerActor-0) ->  saw total messages : (2)
    worker : {1}, name : (workerActor-1) ->  saw total messages : (2)
    worker : {2}, name : (workerActor-2) ->  saw total messages : (2)
    worker : {4}, name : (workerActor-4) ->  saw total messages : (2)
    worker : {3}, name : (workerActor-3) ->  saw total messages : (2)

    Random

    We get this output, where the messages are sent to routees randomly

    worker : {1}, name : (workerActor-1) ->  (1)
    worker : {1}, name : (workerActor-1) ->  (2)
    worker : {4}, name : (workerActor-4) ->  (1)
    worker : {0}, name : (workerActor-0) ->  (1)
    worker : {0}, name : (workerActor-0) ->  (2)
    worker : {2}, name : (workerActor-2) ->  (1)
    worker : {3}, name : (workerActor-3) ->  (1)
    worker : {4}, name : (workerActor-4) ->  (2)
    worker : {0}, name : (workerActor-0) ->  (3)
    worker : {0}, name : (workerActor-0) ->  (4)
    worker : {1}, name : (workerActor-1) ->  saw total messages : (2)
    worker : {0}, name : (workerActor-0) ->  saw total messages : (4)
    worker : {2}, name : (workerActor-2) ->  saw total messages : (1)
    worker : {4}, name : (workerActor-4) ->  saw total messages : (2)
    worker : {3}, name : (workerActor-3) ->  saw total messages : (1)

    SmallestMailBox

    We get this output, where the routee with the smallest mailbox gets the message. This example may look a bit weird, but if you think about it, by the time a new message is sent the 1st routee (workerActor-0) will already have dealt with the previous message and is ready to receive a new one, and since it is the 1st routee in the list it is still considered the one with the smallest mailbox. If you introduced an artificial delay in the actor handling the message you would see different, more interesting results.

    worker : {0}, name : (workerActor-0) ->  (1)
    worker : {0}, name : (workerActor-0) ->  (2)
    worker : {0}, name : (workerActor-0) ->  (3)
    worker : {0}, name : (workerActor-0) ->  (4)
    worker : {0}, name : (workerActor-0) ->  (5)
    worker : {0}, name : (workerActor-0) ->  (6)
    worker : {0}, name : (workerActor-0) ->  (7)
    worker : {0}, name : (workerActor-0) ->  (8)
    worker : {0}, name : (workerActor-0) ->  (9)
    worker : {0}, name : (workerActor-0) ->  (10)
    worker : {2}, name : (workerActor-2) ->  saw total messages : (0)
    worker : {4}, name : (workerActor-4) ->  saw total messages : (0)
    worker : {1}, name : (workerActor-1) ->  saw total messages : (0)
    worker : {0}, name : (workerActor-0) ->  saw total messages : (10)
    worker : {3}, name : (workerActor-3) ->  saw total messages : (0)

    Broadcast

    We get this output, where each routee should see ALL messages

    worker : {0}, name : (workerActor-0) ->  (1)
    worker : {2}, name : (workerActor-2) ->  (1)
    worker : {4}, name : (workerActor-4) ->  (1)
    worker : {3}, name : (workerActor-3) ->  (1)
    worker : {1}, name : (workerActor-1) ->  (1)
    worker : {0}, name : (workerActor-0) ->  (2)
    worker : {1}, name : (workerActor-1) ->  (2)
    worker : {4}, name : (workerActor-4) ->  (2)
    worker : {2}, name : (workerActor-2) ->  (2)
    worker : {3}, name : (workerActor-3) ->  (2)
    worker : {0}, name : (workerActor-0) ->  (3)
    worker : {2}, name : (workerActor-2) ->  (3)
    worker : {3}, name : (workerActor-3) ->  (3)
    worker : {4}, name : (workerActor-4) ->  (3)
    worker : {1}, name : (workerActor-1) ->  (3)
    worker : {1}, name : (workerActor-1) ->  (4)
    worker : {4}, name : (workerActor-4) ->  (4)
    worker : {3}, name : (workerActor-3) ->  (4)
    worker : {0}, name : (workerActor-0) ->  (4)
    worker : {2}, name : (workerActor-2) ->  (4)
    worker : {0}, name : (workerActor-0) ->  (5)
    worker : {1}, name : (workerActor-1) ->  (5)
    worker : {4}, name : (workerActor-4) ->  (5)
    worker : {2}, name : (workerActor-2) ->  (5)
    worker : {3}, name : (workerActor-3) ->  (5)
    worker : {3}, name : (workerActor-3) ->  (6)
    worker : {2}, name : (workerActor-2) ->  (6)
    worker : {1}, name : (workerActor-1) ->  (6)
    worker : {4}, name : (workerActor-4) ->  (6)
    worker : {0}, name : (workerActor-0) ->  (6)
    worker : {1}, name : (workerActor-1) ->  (7)
    worker : {0}, name : (workerActor-0) ->  (7)
    worker : {4}, name : (workerActor-4) ->  (7)
    worker : {2}, name : (workerActor-2) ->  (7)
    worker : {3}, name : (workerActor-3) ->  (7)
    worker : {0}, name : (workerActor-0) ->  (8)
    worker : {3}, name : (workerActor-3) ->  (8)
    worker : {1}, name : (workerActor-1) ->  (8)
    worker : {2}, name : (workerActor-2) ->  (8)
    worker : {4}, name : (workerActor-4) ->  (8)
    worker : {2}, name : (workerActor-2) ->  (9)
    worker : {3}, name : (workerActor-3) ->  (9)
    worker : {4}, name : (workerActor-4) ->  (9)
    worker : {1}, name : (workerActor-1) ->  (9)
    worker : {0}, name : (workerActor-0) ->  (9)
    worker : {0}, name : (workerActor-0) ->  (10)
    worker : {2}, name : (workerActor-2) ->  (10)
    worker : {1}, name : (workerActor-1) ->  (10)
    worker : {4}, name : (workerActor-4) ->  (10)
    worker : {3}, name : (workerActor-3) ->  (10)
    worker : {1}, name : (workerActor-1) ->  saw total messages : (10)
    worker : {2}, name : (workerActor-2) ->  saw total messages : (10)
    worker : {0}, name : (workerActor-0) ->  saw total messages : (10)
    worker : {3}, name : (workerActor-3) ->  saw total messages : (10)
    worker : {4}, name : (workerActor-4) ->  saw total messages : (10)

    So that about covers the demos I have created for using your own actor with a RoutingLogic. Let’s now look at using pools; as I have already stated, pools take care of supervision for us, so we don’t have to manage that manually any more.

    As before I have a helper actor to work with the pool. It is given the pool Props, creates the router from them, and the router then distributes the messages it receives to its routees.

    Here is the demo code

    import java.util.concurrent.TimeUnit
    
    import akka.actor._
    import akka.routing._
    import scala.concurrent.duration.FiniteDuration
    import scala.concurrent.duration._
    import scala.language.postfixOps
    import scala.io.StdIn
    
    object Demo extends App {
    
      //==============================================================
      // Use built Pool router(s) which will do the supervision for us
      //
      //
      //    Comment/Uncomment to try the different router logic
      //
      //==============================================================
      RunScatterGatherFirstCompletedPoolDemo()
      //RunTailChoppingPoolDemo()
    
    
    
      def RunScatterGatherFirstCompletedPoolDemo() : Unit = {
    
        val supervisionStrategy = OneForOneStrategy() {
          case e => SupervisorStrategy.restart
        }
    
        val props = ScatterGatherFirstCompletedPool(
          5, supervisorStrategy = supervisionStrategy,within = 10.seconds).
          props(Props[FibonacciActor])
    
        RunPoolDemo(props)
      }
    
      def RunTailChoppingPoolDemo() : Unit = {
    
        val supervisionStrategy = OneForOneStrategy() {
          case e => SupervisorStrategy.restart
        }
    
        val props = TailChoppingPool(5, within = 10.seconds,
          supervisorStrategy = supervisionStrategy,interval = 20.millis).
          props(Props[FibonacciActor])
    
        RunPoolDemo(props)
      }
    
      def RunPoolDemo(props : Props) : Unit = {
        val system = ActorSystem("RoutingSystem")
        val actorRef = system.actorOf(Props(
          new PoolRouterContainerActor(props,"theRouter")), name = "thePoolContainer")
        actorRef ! WorkMessage
        StdIn.readLine()
        system.terminate()
      }
    }
    

    And here is the helper actor

    import akka.actor._
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.pattern.ask
    
    class PoolRouterContainerActor(val props: Props, val name :String)  extends Actor  {
    
      val router: ActorRef = context.actorOf(props, name)
    
      def receive = {
        case WorkMessage =>
          implicit val timeout = Timeout(5 seconds)
          val futureResult = router ? FibonacciNumber(10)
          val (actName,result) = Await.result(futureResult, timeout.duration)
    
          println(s"FibonacciActor : ($actName) came back with result -> $result")
      }
    }
    

    As before we will use 5 routees.

    This is what the routees look like for the pool demo

    import akka.actor.Actor
    import scala.annotation.tailrec
    
    class FibonacciActor extends Actor {
    
      val actName = self.path.name
    
      def receive = {
        case FibonacciNumber(nbr) => {
          println(s"FibonacciActor : ($actName) ->  " +
            s"has been asked to calculate FibonacciNumber")
          val result = fibonacci(nbr)
          sender ! (actName,result)
        }
      }
    
      private def fibonacci(n: Int): Int = {
        @tailrec
        def fib(n: Int, b: Int, a: Int): Int = n match {
          case 0 => a
          case _ => fib(n - 1, a + b, b)
        }
    
        fib(n, 1, 0)
      }
    }
    

    ScatterGatherFirstCompletedPool

    Here is the output when we run this. It can be seen that we simply get the results from the routee that completed first

    FibonacciActor : ($d) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($e) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($a) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($c) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($b) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($d) came back with result -> 55

    TailChoppingPool

    Here is the output when we run this. It can be seen that we simply get the results from the routee that completed first, out of the few routees that the message was sent to

    FibonacciActor : ($b) ->  has been asked to calculate FibonacciNumber
    FibonacciActor : ($b) came back with result -> 55

     

    What About Custom Routing Strategy

    Akka allows you to create your own routing strategy, where you create a class that extends the inbuilt Akka RoutingLogic. A minimal sketch is shown after the link; you can read more about this in the official Akka documentation:

    http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Custom_Router
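    As a flavour of what that involves, here is a minimal sketch of a hypothetical custom RoutingLogic that simply always picks the first routee in the list. The select signature is the one the RoutingLogic trait requires.

    import scala.collection.immutable
    import akka.routing.{NoRoutee, Routee, RoutingLogic}

    //hypothetical example: always routes to the first routee in the list
    class PreferFirstRoutingLogic extends RoutingLogic {
      override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
        if (routees.isEmpty) NoRoutee else routees.head
    }

    You could then pass an instance of this into the RouterActor we saw earlier, in place of one of the inbuilt RoutingLogic strategies such as RoundRobinRoutingLogic.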

     

    Where Can I Find The Code Examples?

    I will be augmenting this GitHub repo with the example projects as I move through this series

    https://github.com/sachabarber/SachaBarber.AkkaExamples

    Akka

    AKKA : clustering

    Last time we looked at remoting. You can kind of think of clustering as an extension to remoting, as some of the same underlying parts are used. But as we will see, clustering is way more powerful (and more fault tolerant too).

    My hope is by the end of this post that you will know enough about Akka clustering that you would be able to create your own clustered Akka apps.

    A Note About All The Demos In This Topic

    I wanted the demos in this section to be as close to real life as possible. The official Akka examples tend to have a single process, which I personally think is quite confusing when you are trying to deal with quite hard concepts. As such I decided to go with multi-process projects to demonstrate things. I do however only have 1 laptop, so they are hosted on the same machine, but they are separate processes/JVMs.

    I am hoping that doing this will make the learning process easier, as it is closer to what you would do in real life, rather than having 1 main method that spawns an entire cluster. You just would not have that in real life.

     

    What Is Akka Clustering?

    Unlike remoting, which is peer to peer, a cluster may consist of many members, which can grow and contract depending on demand/failure. There is also the concept of roles for actors within a cluster, which this post will talk about.

    You can see how this could be very useful, in fact you could see how this may be used to create a general purpose grid calculation engine such as Apache Spark.

     

    Seed Nodes

    Akka has the concept of some initial contact points within the cluster to allow the cluster to bootstrap itself as it were.

    Here is what the official Akka docs say on this:

    You may decide if joining to the cluster should be done manually or automatically to configured initial contact points, so-called seed nodes. When a new node is started it sends a message to all seed nodes and then sends join command to the one that answers first. If no one of the seed nodes replied (might not be started yet) it retries this procedure until successful or shutdown.

    You may choose to configure these “seed nodes” in code, but the easiest way is via configuration. The relevant part of the demo app’s configuration is here

    akka {
      .....
      .....
      cluster {
        seed-nodes = [
          "akka.tcp://ClusterSystem@127.0.0.1:2551",
          "akka.tcp://ClusterSystem@127.0.0.1:2552"]
      }
      .....
      .....
    }
    
    
    
    
    

    The seed nodes can be started in any order and it is not necessary to have all seed nodes running, but the node configured as the first element in the seed-nodes configuration list must be started when initially starting a cluster, otherwise the other seed-nodes will not become initialized and no other node can join the cluster. The reason for the special first seed node is to avoid forming separated islands when starting from an empty cluster. It is quickest to start all configured seed nodes at the same time (order doesn’t matter), otherwise it can take up to the configured seed-node-timeout until the nodes can join.

    Once more than two seed nodes have been started it is no problem to shut down the first seed node. If the first seed node is restarted, it will first try to join the other seed nodes in the existing cluster.

    We will see the entire configuration for the demo app later on in this post. For now just be aware that there is a concept of seed nodes, and that the best way to configure those for the cluster is via configuration.

    That said, there may be some amongst you who would prefer to use the JVM property system, which you may do as follows:

    -Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@127.0.0.1:2551
    -Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@127.0.0.1:2552
    

    Roles

    Akka clustering comes with the concept of roles. You may be asking why we would need that?

    Well it’s quite simple really. Say we have a higher than normal volume of data coming through your Akka cluster system; you may want to increase the total processing power of the cluster to deal with this. How do we do that? We spin up more actors within a particular role. The role here may be “backend”, whose actors do work designated to them by actors in some other role, say “frontend”.

    By using roles we can manage which bits of the cluster get dynamically allocated more/fewer actors.

    You can configure the minimum number of actors per role in configuration (a sketch is shown after the link below), which you can read more about here:

    http://doc.akka.io/docs/akka/snapshot/java/cluster-usage.html#How_To_Startup_when_Cluster_Size_Reached
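    As a flavour of that, here is a sketch of the sort of configuration involved, assuming a “backend” role (the exact keys are described in the linked documentation):

    akka {
      cluster {
        # the roles carried by this node
        roles = ["backend"]

        # the leader will not move joining members to 'Up'
        # until at least 2 "backend" nodes have joined
        role {
          backend.min-nr-of-members = 2
        }
      }
    }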

    Member Events

    Akka provides the ability to listen to member events. There are a number of reasons this could be useful, for example

    • Determining if a member has left the cluster
    • Determining if a new member has joined the cluster

    Here is a full list of the cluster member events that you may choose to listen to

    The events to track the life-cycle of members are:

    • ClusterEvent.MemberJoined – A new member has joined the cluster and its status has been changed to Joining.
    • ClusterEvent.MemberUp – A new member has joined the cluster and its status has been changed to Up.
    • ClusterEvent.MemberExited – A member is leaving the cluster and its status has been changed to Exiting Note that the node might already have been shutdown when this event is published on another node.
    • ClusterEvent.MemberRemoved – Member completely removed from the cluster.
    • ClusterEvent.UnreachableMember – A member is considered as unreachable, detected by the failure detector of at least one other node.
    • ClusterEvent.ReachableMember – A member is considered as reachable again, after having been unreachable. All nodes that previously detected it as unreachable have detected it as reachable again.

    And this is how you might subscribe to these events

    cluster.subscribe(self, classOf[MemberUp])
    

    Which you may use in an actor like this:

    class SomeActor extends Actor {
    
      val cluster = Cluster(context.system)
    
      // subscribe to cluster changes, MemberUp
      override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
      
      def receive = {
        case MemberUp(m) => register(m)
      }
    
      def register(member: Member): Unit =
        if (member.hasRole("frontend"))
         ...
    }
    

    We will see more on this within the demo code which we will walk through later

    ClusterClient

    What use is a cluster which can’t receive commands from the outside world?

    Well luckily we don’t have to worry about that, as Akka comes with 2 things that make this otherwise tricky situation ok.

    Akka comes with a ClusterClient, which allows actors that are not part of the cluster to talk to the cluster. Here is what the official Akka docs have to say about this

    An actor system that is not part of the cluster can communicate with actors somewhere in the cluster via this ClusterClient. The client can of course be part of another cluster. It only needs to know the location of one (or more) nodes to use as initial contact points. It will establish a connection to a ClusterReceptionist somewhere in the cluster. It will monitor the connection to the receptionist and establish a new connection if the link goes down. When looking for a new receptionist it uses fresh contact points retrieved from previous establishment, or periodically refreshed contacts, i.e. not necessarily the initial contact points.

     

    Receptionist

    As mentioned above the ClusterClient makes use of a ClusterReceptionist, but what is that, and how do we make a cluster actor available to the client using that?

    The ClusterClientReceptionist is an Akka extension (it lives in the akka-cluster-tools module, which is listed in the SBT dependencies below), and must be configured on ALL the nodes that the ClusterClient will need to talk to.

    There are 2 parts to this. Firstly we must ensure that the ClusterClientReceptionist is started on the nodes that the ClusterClient will need to communicate with. This is easily done using the following config:

    akka {
      ....
      ....
      ....
      # enable receptionist at start
      extensions = ["akka.cluster.client.ClusterClientReceptionist"]
    
    }
    
    

    The other thing that needs doing is that any actor within the cluster that you want to be able to talk to using the ClusterClient will need to register itself as a service with the ClusterClientReceptionist. Here is an example of how to do that

    val system = ActorSystem("ClusterSystem", config)
    val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
    ClusterClientReceptionist(system).registerService(frontend)
    

    Now that you have done that you should be able to communicate with this actor within the cluster using the ClusterClient

     

    The Demo Dissection

    I have based the demo for this post largely on the “Transformation” demo that Lightbend provides, which you can grab from here:

    http://www.lightbend.com/activator/template/akka-sample-cluster-scala

    The “official” example, as it is, provides a cluster which contains “frontend” and “backend” roles. The “frontend” actors will take a text message and pass it to the registered workers (the “backend”s), who will UPPERCASE the message and return it to the “frontend”.

    I have taken this sample and added the ability to use the ClusterClient with it, which works using Future[T] and the ask pattern, such that the ClusterClient will get a response back from the cluster.

    We will dive into all of this in just a moment

    For the demo this is what we are trying to build

    image

    SBT / Dependencies

    Before we dive into the demo code (which as I say is based largely on the official Lightbend clustering example anyway) I would just like to go through the SBT file that drives the demo projects

    This is the complete SBT file for the entire demo

    import sbt._
    import sbt.Keys._
    
    
    lazy val allResolvers = Seq(
      "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
      "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
    )
    
    lazy val AllLibraryDependencies =
      Seq(
        "com.typesafe.akka" %% "akka-actor"         % "2.4.8",
        "com.typesafe.akka" %% "akka-remote"        % "2.4.8",
        "com.typesafe.akka" %% "akka-cluster"       % "2.4.8",
        "com.typesafe.akka" %% "akka-cluster-tools" % "2.4.8",
        "com.typesafe.akka" %% "akka-contrib"       % "2.4.8"
      )
    
    
    lazy val commonSettings = Seq(
      version := "1.0",
      scalaVersion := "2.11.8",
      resolvers := allResolvers,
      libraryDependencies := AllLibraryDependencies
    )
    
    
    lazy val root =(project in file(".")).
      settings(commonSettings: _*).
      settings(
        name := "Base"
      )
      .aggregate(common, frontend, backend)
      .dependsOn(common, frontend, backend)
    
    lazy val common = (project in file("common")).
      settings(commonSettings: _*).
      settings(
        name := "common"
      )
    
    lazy val frontend = (project in file("frontend")).
      settings(commonSettings: _*).
      settings(
        name := "frontend"
      )
      .aggregate(common)
      .dependsOn(common)
    
    lazy val backend = (project in file("backend")).
      settings(commonSettings: _*).
      settings(
        name := "backend"
      )
      .aggregate(common)
      .dependsOn(common)
    

    There are a few things to note in this

    • We need a few dependencies to get clustering to work. Namely
      • akka-remote
      • akka-cluster
      • akka-cluster-tools
      • akka-contrib
    • There are a few projects
      • root : The cluster client portion
      • common : common files
      • frontend : frontend cluster based actors (the client will talk to these)
      • backend : backend cluster based actors

     

    The Projects

    Now that we have seen the projects involved from an SBT point of view, let’s continue to look at how the actual projects perform their duties

    Remember the workflow we are trying to achieve is something like this

    • We should ensure that a frontend (seed node) is started first
    • We should ensure a backend (seed node) is started. This will have the effect of the backend actor registering itself as a worker with the already running frontend actor
    • At this point we could start more frontend/backend non seed node actors, if we chose to
    • We start the client app (root), which will periodically send messages to the frontend actor that is looked up by its known seed node information. We would expect the frontend actor to delegate the work off to one of its known backend actors, and then send the response back to the client (ClusterClient), where we can use the response to send to a local actor, or consume the response directly

    Common

    The common project simply contains the objects shared across the other projects, which for this demo app are just the messages shown below

    package sample.cluster.transformation
    
    final case class TransformationJob(text: String)
    final case class TransformationResult(text: String)
    final case class JobFailed(reason: String, job: TransformationJob)
    case object BackendRegistration
    

     

    Root

    This is the client app that will talk to the cluster (in particular the “frontend” seed node, which is expected to be running on 127.0.0.1:2551).

    This client app uses the following configuration file

    akka {
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }
    
      remote {
        transport = "akka.remote.netty.NettyRemoteTransport"
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 5000
        }
      }
    }
    

    We then use the following main method to kick off the client app

    import java.util.concurrent.atomic.AtomicInteger
    
    import akka.actor.{Props, ActorSystem}
    import akka.util.Timeout
    import scala.io.StdIn
    import scala.concurrent.duration._
    
    
    object DemoClient {
      def main(args : Array[String]) {
    
        val system = ActorSystem("OTHERSYSTEM")
        val clientJobTransformationSendingActor =
          system.actorOf(Props[ClientJobTransformationSendingActor],
            name = "clientJobTransformationSendingActor")
    
        val counter = new AtomicInteger
        import system.dispatcher
        system.scheduler.schedule(2.seconds, 2.seconds) {
          clientJobTransformationSendingActor ! Send(counter.incrementAndGet())
          Thread.sleep(1000)
        }
    
        StdIn.readLine()
        system.terminate()
      }
    }
    
    
    
    
    

    There is not too much to talk about here, we simply create a standard actor, and send it messages on a recurring schedule.

    The message looks like this

    case class Send(count:Int)
    

    The real work of talking to the cluster is inside the ClientJobTransformationSendingActor which we will look at now

    import akka.actor.Actor
    import akka.actor.ActorPath
    import akka.cluster.client.{ClusterClientSettings, ClusterClient}
    import akka.pattern.Patterns
    import sample.cluster.transformation.{TransformationResult, TransformationJob}
    import akka.util.Timeout
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    
    
    class ClientJobTransformationSendingActor extends Actor {
    
      val initialContacts = Set(
        ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"))
      val settings = ClusterClientSettings(context.system)
        .withInitialContacts(initialContacts)
    
      val c = context.system.actorOf(ClusterClient.props(settings), "demo-client")
    
    
      def receive = {
        case TransformationResult(result) => {
          println("Client response")
          println(result)
        }
        case Send(counter) => {
            val job = TransformationJob("hello-" + counter)
            implicit val timeout = Timeout(5 seconds)
            val result = Patterns.ask(c,ClusterClient.Send("/user/frontend", job, localAffinity = true), timeout)
    
            result.onComplete {
              case Success(transformationResult) => {
                println(s"Client saw result: $transformationResult")
                self ! transformationResult
              }
              case Failure(t) => println("An error has occured: " + t.getMessage)
            }
          }
      }
    }
    

    As you can see this is a regular actor, but there are several important things to note here:

    • We setup the ClusterClient with a known set of seed nodes (initial contact points) that we can expect to be able to contact within the cluster (remember these nodes MUST have registered themselves as available services with the ClusterClientReceptionist)
    • That we use a new type of actor, a ClusterClient
    • That we use the ClusterClient to send a message to a seed node within the cluster (the frontend in our case). We use the ask pattern, which will give us a Future[T] which represents the response.
    • We use the response to send a local message to ourself

     

    FrontEnd

    As previously stated, the “frontend” role actors serve as the seed nodes for the ClusterClient. There is only one seed node for the frontend, which we just saw the client app use via the ClusterClient.

    So what happens when the client app uses the frontend actor via the ClusterClient? Well, it’s quite simple: the client app (once a connection is made to the frontend seed node) sends a TransformationJob, a simple message containing a bit of text, which the frontend actor will pass on to one of its registered backend workers for processing.

    The backend actor (also in the cluster) will simply convert the text contained in the TransformationJob to UPPERCASE and return it to the frontend actor. The frontend actor will then send this TransformationResult back to the sender, which happens to be the ClusterClient. The client app (which used the ask pattern) hooks up a callback on the Future[T] and then sends the TransformationResult on to the client’s own actor.

    Happy days.

    So that is what we are trying to achieve. Let’s see what bits and bobs we need for the frontend side of things

    Here is the configuration the frontend needs

    akka {
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    
      cluster {
        seed-nodes = [
          "akka.tcp://ClusterSystem@127.0.0.1:2551",
          "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    
        # auto downing is NOT safe for production deployments.
        # you may want to use it during development, read more about it in the docs.
        auto-down-unreachable-after = 10s
      }
    
      # enable receptionist at start
      extensions = ["akka.cluster.client.ClusterClientReceptionist"]
    
    }
    
    

    There are a couple of important things to note in this, namely:

    • That we configure the seed nodes
    • That we also add the ClusterClientReceptionist extension
    • That we use the ClusterActorRefProvider

    And here is the frontend application

    package sample.cluster.transformation.frontend
    
    import language.postfixOps
    import akka.actor.ActorSystem
    import akka.actor.Props
    import com.typesafe.config.ConfigFactory
    import akka.cluster.client.ClusterClientReceptionist
    
    
    
    object TransformationFrontendApp {
    
      def main(args: Array[String]): Unit = {
    
        // Override the configuration of the port when specified as program argument
        val port = if (args.isEmpty) "0" else args(0)
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
          withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
          withFallback(ConfigFactory.load())
    
        val system = ActorSystem("ClusterSystem", config)
        val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
        ClusterClientReceptionist(system).registerService(frontend)
      }
    
    }
    

    The important parts here are that we embellish the read config with the role of “frontend”, and that we also register the frontend actor with the ClusterClientReceptionist such that the actor is available to communicate with by the ClusterClient

    Other than that it is all pretty vanilla Akka to be honest

    So let’s now turn our attention to the actual frontend actor, which is shown below

    package sample.cluster.transformation.frontend
    
    import sample.cluster.transformation.{TransformationResult, BackendRegistration, JobFailed, TransformationJob}
    import language.postfixOps
    import scala.concurrent.Future
    import akka.actor.Actor
    import akka.actor.ActorRef
    import akka.actor.Terminated
    import akka.util.Timeout
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import akka.pattern.pipe
    import akka.pattern.ask
    
    
    class TransformationFrontend extends Actor {
    
      var backends = IndexedSeq.empty[ActorRef]
      var jobCounter = 0
    
      def receive = {
        case job: TransformationJob if backends.isEmpty =>
          sender() ! JobFailed("Service unavailable, try again later", job)
    
        case job: TransformationJob =>
          println(s"Frontend saw TransformationJob : '$job'")
          jobCounter += 1
          implicit val timeout = Timeout(5 seconds)
          val result  = (backends(jobCounter % backends.size) ? job)
            .map(x => x.asInstanceOf[TransformationResult])
          result pipeTo sender
          //pipe(result) to sender
    
        case BackendRegistration if !backends.contains(sender()) =>
          context watch sender()
          backends = backends :+ sender()
    
        case Terminated(a) =>
          backends = backends.filterNot(_ == a)
      }
    }
    

    The crucial parts here are:

    • That when a backend registers it sends a BackendRegistration; we then watch the sender, and if that backend terminates it is removed from this frontend actor’s list of known backend actors
    • That we palm off the incoming TransformationJob to one of the registered backends (picked round-robin using the job counter), and then use the pipe pattern to pipe the response back to the sender

    And with that, all that is left to do is examine the backend code, so let’s look at that now

     

    BackEnd

    As always let’s start with the configuration, which for the backend is as follows:

    akka {
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    
      cluster {
        seed-nodes = [
          "akka.tcp://ClusterSystem@127.0.0.1:2551",
          "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    
        # auto downing is NOT safe for production deployments.
        # you may want to use it during development, read more about it in the docs.
        auto-down-unreachable-after = 10s
      }
    
      # enable receptionist at start
      extensions = ["akka.cluster.client.ClusterClientReceptionist"]
    }
    
    
    
    
    

    You can see this is pretty much the same as the frontend, so I won’t speak to this anymore.

    Ok so following what we did with the frontend side of things, let’s now look at the backend app

    package sample.cluster.transformation.backend
    
    import language.postfixOps
    import scala.concurrent.duration._
    import akka.actor.ActorSystem
    import akka.actor.Props
    import com.typesafe.config.ConfigFactory
    
    object TransformationBackendApp {
      def main(args: Array[String]): Unit = {
        // Override the configuration of the port when specified as program argument
        val port = if (args.isEmpty) "0" else args(0)
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
          withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
          withFallback(ConfigFactory.load())
    
        val system = ActorSystem("ClusterSystem", config)
        system.actorOf(Props[TransformationBackend], name = "backend")
      }
    }
    

    Again this is VERY similar to the front end app, the only notable exception being that we now use a “backend” role instead of a “frontend” one

    So now let’s look at the backend actor code, which is the final piece of the puzzle

    package sample.cluster.transformation.backend
    
    import sample.cluster.transformation.{BackendRegistration, TransformationResult, TransformationJob}
    import language.postfixOps
    import scala.concurrent.duration._
    import akka.actor.Actor
    import akka.actor.RootActorPath
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent.CurrentClusterState
    import akka.cluster.ClusterEvent.MemberUp
    import akka.cluster.Member
    import akka.cluster.MemberStatus
    
    
    class TransformationBackend extends Actor {
    
      val cluster = Cluster(context.system)
    
      // subscribe to cluster changes, MemberUp
      // re-subscribe when restart
      override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
      override def postStop(): Unit = cluster.unsubscribe(self)
    
      def receive = {
        case TransformationJob(text) => {
          val result = text.toUpperCase
          println(s"Backend has transformed the incoming job text of '$text' into '$result'")
          sender() ! TransformationResult(text.toUpperCase)
        }
        case state: CurrentClusterState =>
          state.members.filter(_.status == MemberStatus.Up) foreach register
        case MemberUp(m) => register(m)
      }
    
      def register(member: Member): Unit =
        if (member.hasRole("frontend"))
          context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
            BackendRegistration
    }
    

    The key points here are:

    • That we use the cluster events to subscribe to MemberUp, such that if the member has the “frontend” role, we register this backend with it by sending it a BackendRegistration message
    • That for any TransformationJob received (from the frontend, which is ultimately working on behalf of the client app) we do the work, and send a TransformationResult back, which will make its way all the way back to the client

     

    And in a nutshell that is how the entire demo hangs together. I hope I have not lost anyone along the way.

    Anyway let’s now see how we can run the demo

    How do I Run The Demo

    You will need to ensure that you run the following 3 projects in this order (as a minimum; you can run more NON seed node frontend/backend instances before you start the root (client) if you like)

    • Frontend (seed node) : frontend with command line args : 2551
    • Backend (seed node) : backend with command line args : 2552
    • Optionally run more frontend/backend projects but DON’T supply any command line args. This is how you get them to not be treated as seed nodes
    •  Root : This is the client app

     

    Once you run the projects you should see some output like

    The “root” (client) project output:

    [INFO] [10/05/2016 07:22:02.831] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [10/05/2016 07:22:03.302] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://OTHERSYSTEM@127.0.0.1:5000]
    [INFO] [10/05/2016 07:22:03.322] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Starting up…
    [INFO] [10/05/2016 07:22:03.450] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Registered cluster JMX MBean [akka:type=Cluster]
    [INFO] [10/05/2016 07:22:03.450] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Started up successfully
    [INFO] [10/05/2016 07:22:03.463] [OTHERSYSTEM-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
    [INFO] [10/05/2016 07:22:03.493] [OTHERSYSTEM-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Metrics collection has started successfully
    [WARN] [10/05/2016 07:22:03.772] [OTHERSYSTEM-akka.actor.default-dispatcher-19] [akka.tcp://OTHERSYSTEM@127.0.0.1:5000/system/cluster/core/daemon] Trying to join member with wrong ActorSystem name, but was ignored, expected [OTHERSYSTEM] but was [ClusterSystem]
    [INFO] [10/05/2016 07:22:03.811] [OTHERSYSTEM-akka.actor.default-dispatcher-19] [akka.tcp://OTHERSYSTEM@127.0.0.1:5000/user/demo-client] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2552/system/receptionist]
    [WARN] [10/05/2016 07:22:05.581] [OTHERSYSTEM-akka.remote.default-remote-dispatcher-14] [akka.serialization.Serialization(akka://OTHERSYSTEM)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Client saw result: TransformationResult(HELLO-1)
    Client response
    HELLO-1
    Client saw result: TransformationResult(HELLO-2)
    Client response
    HELLO-2
    Client saw result: TransformationResult(HELLO-3)
    Client response
    HELLO-3
    Client saw result: TransformationResult(HELLO-4)
    Client response
    HELLO-4

    The “frontend” project output:

    [INFO] [10/05/2016 07:21:35.592] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [10/05/2016 07:21:35.883] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2551]
    [INFO] [10/05/2016 07:21:35.901] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Starting up…
    [INFO] [10/05/2016 07:21:36.028] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Registered cluster JMX MBean [akka:type=Cluster]
    [INFO] [10/05/2016 07:21:36.028] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Started up successfully
    [INFO] [10/05/2016 07:21:36.037] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
    [INFO] [10/05/2016 07:21:36.040] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Metrics collection has started successfully
    [WARN] [10/05/2016 07:21:37.202] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2552]] Caused by: [Connection refused: no further information: /127.0.0.1:2552]
    [INFO] [10/05/2016 07:21:37.229] [ClusterSystem-akka.actor.default-dispatcher-17] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:37.229] [ClusterSystem-akka.actor.default-dispatcher-17] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:37.232] [ClusterSystem-akka.actor.default-dispatcher-21] [akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter] Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter#-1346529294] to Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter#-1346529294] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:38.085] [ClusterSystem-akka.actor.default-dispatcher-22] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:39.088] [ClusterSystem-akka.actor.default-dispatcher-14] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:40.065] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
    [INFO] [10/05/2016 07:21:41.095] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles [frontend]
    [INFO] [10/05/2016 07:21:41.123] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
    [INFO] [10/05/2016 07:21:50.837] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles [backend]
    [INFO] [10/05/2016 07:21:51.096] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
    Frontend saw TransformationJob : ‘TransformationJob(hello-1)’
    [WARN] [10/05/2016 07:22:05.669] [ClusterSystem-akka.remote.default-remote-dispatcher-24] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    [WARN] [10/05/2016 07:22:05.689] [ClusterSystem-akka.remote.default-remote-dispatcher-23] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Frontend saw TransformationJob : ‘TransformationJob(hello-2)’
    Frontend saw TransformationJob : ‘TransformationJob(hello-3)’
    Frontend saw TransformationJob : ‘TransformationJob(hello-4)’

     

    The “backend” project output:

    [INFO] [10/05/2016 07:21:50.023] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [10/05/2016 07:21:50.338] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
    [INFO] [10/05/2016 07:21:50.353] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Starting up…
    [INFO] [10/05/2016 07:21:50.430] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Registered cluster JMX MBean [akka:type=Cluster]
    [INFO] [10/05/2016 07:21:50.430] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Started up successfully
    [INFO] [10/05/2016 07:21:50.437] [ClusterSystem-akka.actor.default-dispatcher-6] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
    [INFO] [10/05/2016 07:21:50.441] [ClusterSystem-akka.actor.default-dispatcher-6] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Metrics collection has started successfully
    [INFO] [10/05/2016 07:21:50.977] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
    [WARN] [10/05/2016 07:21:51.289] [ClusterSystem-akka.remote.default-remote-dispatcher-15] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.BackendRegistration$] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    [WARN] [10/05/2016 07:22:05.651] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Backend has transformed the incoming job text of ‘hello-1’ into ‘HELLO-1’
    [WARN] [10/05/2016 07:22:05.677] [ClusterSystem-akka.remote.default-remote-dispatcher-15] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
    Backend has transformed the incoming job text of ‘hello-2’ into ‘HELLO-2’
    Backend has transformed the incoming job text of ‘hello-3’ into ‘HELLO-3’
    Backend has transformed the incoming job text of ‘hello-4’ into ‘HELLO-4’
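
    As an aside, all three outputs above are littered with the “Using the default Java serializer” warning. The warning itself tells you how to deal with it: either configure a proper serializer for your message types, or simply switch the warning off. A minimal sketch of the latter, using nothing more than the setting named in the warning placed in application.conf, might look like this

    # application.conf
    # silences the warning seen in the output above; a better long term fix
    # is to configure a non-Java serializer for your message classes
    akka.actor.warn-about-java-serializer-usage = off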

     

    NAT or Docker Considerations

    Akka clustering does not work transparently with Network Address Translation (NAT), load balancers, or inside Docker containers. If that applies to your deployment, you may need to configure Akka remoting further, as described here:

    http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#remote-configuration-nat
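
    For example, a node running inside a Docker container (or behind NAT) typically needs to bind to the container's local address/port while advertising the externally reachable address/port to the rest of the cluster. A minimal sketch of that, assuming classic remoting over the Netty TCP transport and using placeholder host/port values, might look like this

    akka {
      remote {
        netty.tcp {
          # the address/port other cluster nodes will use to reach this node
          hostname = "my.public.host"
          port = 2552

          # the address/port the socket actually binds to inside the container
          bind-hostname = "0.0.0.0"
          bind-port = 2552
        }
      }
    }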

     

     

    Where Can I Find The Code Examples?

    I will be augmenting this GitHub repo with the example projects as I move through this series:

    https://github.com/sachabarber/SachaBarber.AkkaExamples