
A Look At Docker

A while ago I worked on a project that used this tech stack

  • Akka HTTP : (actually we used Spray.IO, but for the purposes of this article it is practically the same thing). For those that don’t know what Akka HTTP is, it is an Akka based framework that is able to expose a REST interface for communicating with the actor system
  • Cassandra database : Apache Cassandra is a free and open-source distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.

We ran it as a multi node cluster.

This was a pain to test, and we were always stepping on each other’s toes; as you can imagine, running up a 5 node cluster of VMs just to satisfy each developer’s own testing needs was a bit much. So we ended up with some dedicated test environments, each running 5 Cassandra nodes. These were still a PITA to be honest.

This got me thinking: perhaps I could use Docker to help me out here, perhaps I could run Cassandra in a Docker container, hell, perhaps I could even run my own code that uses Cassandra in a Docker container, and just point my UI at the Akka HTTP REST server running in Docker. Mmmmm.

I started to dig around, and of course this is entirely possible (otherwise I would not be writing this article now would I).
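
To give a flavour of how simple this can be, running a single Cassandra node locally is a one liner (this uses the official cassandra image from Docker Hub; the container name is just an example):

docker run --name my-cassandra -d cassandra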

This is certainly not a new thing for CodeProject, there are numerous Docker articles, but I never found one that talked about Cassandra, so I thought why not write another one.

I have just published it here: https://www.codeproject.com/Articles/1175248/A-look-at-Docker

Update scheduled Quartz.Net job by monitoring App.Config

 

Introduction

So I was back in .NET land the other day at work, where I had to schedule some code to run periodically.

The business also needed this schedule to be adjustable, so that they could turn it up when things were busier and wind it down when they were not.

This adjusting of the schedule would be done via a setting in the App.Config, where the App.Config is monitored for changes. If there is a change, we would use the new schedule value from the App.Config to run the job. Ideally the app must not go down to accommodate this change of job schedule time.

There are some good job/scheduling libraries out there, but for this I just wanted something lightweight, so I went with Quartz.net.

It’s easy to set up and use, has a fairly nice API, and supports IOC and CRON schedules. In short it fits the bill.
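
For example, creating a CRON scheduled job with Quartz.net looks roughly like this (a minimal sketch of my own; MyJob is a hypothetical IJob implementation, not part of this post’s code):

using Quartz;
using Quartz.Impl;

// minimal Quartz.net CRON sketch; MyJob is a hypothetical IJob implementation
IScheduler scheduler = new StdSchedulerFactory().GetScheduler();
scheduler.Start();

IJobDetail job = JobBuilder.Create<MyJob>()
    .WithIdentity("myJob")
    .Build();

ITrigger trigger = TriggerBuilder.Create()
    .WithIdentity("myTrigger")
    .WithCronSchedule("0 0/5 * * * ?") // fire every 5 minutes
    .Build();

scheduler.ScheduleJob(job, trigger);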

In a nutshell this post will simply talk about how you can adjust the schedule of an ALREADY scheduled job. There will also be some special caveats that I personally had to deal with in my requirements, which may or may not be an issue for you.

 

Some Weird Issues That I Needed To Cope With

So let me just talk about some of the issues that I had to deal with.

The guts of the job code that runs on my schedule is actually writing to Azure Blob Storage and then to Azure SQL DW tables, and as such it performs several writes to several components, one after another.

So the current job run MUST be allowed to complete in FULL (or fail via exception handling, that’s OK too). It would not be acceptable to just stop the Quartz job while there is work in flight.

I guess some folk may be thinking of some sort of transaction here, that must either commit or rollback. Unfortunately that doesn’t work with Azure Blob Storage uploads.

So I had to think of another plan.

So here is what I came up with. I would use a threading primitive, namely an AutoResetEvent, that would control when the Quartz.net job could be changed to use a new schedule.

If a change in the App.Config was seen, then we know that we “should” be using a new schedule time; however the scheduled job MAY have work in flight. So we need to wait for that work to complete (or fail) before we can think about swapping the Quartz.net schedule time.

So that is what I went for. There are a few other things to be aware of, such as needing threading primitives that work with async/await code. Luckily Stephen Toub from the TPL team has done that for us: AsyncAutoResetEvent.

There is also the well known fact that the FileSystemWatcher class fires events twice : http://lmgtfy.com/?q=filesystemwatcher+firing+twice

So as we go through the code you will see how I dealt with those

The Code

OK, so now that we have talked about the problem, let’s go through the code.

There are several NuGet packages I am using to make my life easier (Topshelf, Autofac, Quartz.Net, Rx and SimpleConfig among them).

So lets start with the entry point, which for me is the simple Program class shown below

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Security.Principal;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using SachaBarber.QuartzJobUpdate.Services;
using Topshelf;


namespace SachaBarber.QuartzJobUpdate
{
    static class Program
    {
        private static ILogger _log = null;


        [STAThread]
        public static void Main()
        {
            try
            {
                var container = ContainerOperations.Container;
                _log = container.Resolve<ILogger>();
                _log.Log("Starting");

                AppDomain.CurrentDomain.UnhandledException += AppDomainUnhandledException;
                TaskScheduler.UnobservedTaskException += TaskSchedulerUnobservedTaskException;
                Thread.CurrentPrincipal = new WindowsPrincipal(WindowsIdentity.GetCurrent());
                
                HostFactory.Run(c =>                                 
                {
                    c.Service<SomeWindowsService>(s =>                        
                    {
                        s.ConstructUsing(() => container.Resolve<SomeWindowsService>());
                        s.WhenStarted(tc => tc.Start());             
                        s.WhenStopped(tc => tc.Stop());               
                    });
                    c.RunAsLocalSystem();                            

                    c.SetDescription("Uploads Calc Payouts/Summary data into Azure blob storage for RiskStore DW ingestion");       
                    c.SetDisplayName("SachaBarber.QuartzJobUpdate");                       
                    c.SetServiceName("SachaBarber.QuartzJobUpdate");                      
                });
            }
            catch (Exception ex)
            {
                _log.Log(ex.Message);
            }
            finally
            {
                _log.Log("Closing");
            }
        }
      

        private static void AppDomainUnhandledException(object sender, UnhandledExceptionEventArgs e)
        {
            ProcessUnhandledException((Exception)e.ExceptionObject);
        }

        private static void TaskSchedulerUnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
        {
            ProcessUnhandledException(e.Exception);
            e.SetObserved();
        }

        private static void ProcessUnhandledException(Exception ex)
        {
            if (ex is TargetInvocationException)
            {
                ProcessUnhandledException(ex.InnerException);
                return;
            }
            _log.Log("Error");
        }
    }
}

All this does is host the actual windows service class for me using Topshelf, where the actual service class looks like this

using System;
using System.Configuration;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Xml.Linq;
using Autofac;
using SachaBarber.QuartzJobUpdate.Async;
using SachaBarber.QuartzJobUpdate.Configuration;
using SachaBarber.QuartzJobUpdate.Jobs;
using SachaBarber.QuartzJobUpdate.Services;
//logging
using Quartz;

namespace SachaBarber.QuartzJobUpdate
{
    public class SomeWindowsService
    {
        private readonly ILogger _log;
        private readonly ISchedulingAssistanceService _schedulingAssistanceService;
        private readonly IRxSchedulerService _rxSchedulerService;
        private readonly IObservableFileSystemWatcher _observableFileSystemWatcher;
        private IScheduler _quartzScheduler;
        private readonly AsyncLock _lock = new AsyncLock();
        private readonly SerialDisposable _configWatcherDisposable = new SerialDisposable();
        private static readonly JobKey _someScheduledJobKey = new JobKey("SomeScheduledJobKey");
        private static readonly TriggerKey _someScheduledJobTriggerKey = new TriggerKey("SomeScheduledJobTriggerKey");

        public SomeWindowsService(
            ILogger log,
            ISchedulingAssistanceService schedulingAssistanceService, 
            IRxSchedulerService rxSchedulerService,
            IObservableFileSystemWatcher observableFileSystemWatcher)
        {
            _log = log;
            _schedulingAssistanceService = schedulingAssistanceService;
            _rxSchedulerService = rxSchedulerService;
            _observableFileSystemWatcher = observableFileSystemWatcher;
        }

        public void Start()
        {
            try
            {
                var ass = typeof (SomeWindowsService).Assembly;
                var configFile = $"{ass.Location}.config"; 
                CreateConfigWatcher(new FileInfo(configFile));


                _log.Log("Starting SomeWindowsService");

                _quartzScheduler = ContainerOperations.Container.Resolve<IScheduler>();
                _quartzScheduler.JobFactory = new AutofacJobFactory(ContainerOperations.Container);
                _quartzScheduler.Start();

                //create the Job
                CreateScheduledJob();
            }
            catch (JobExecutionException jeex)
            {
                _log.Log(jeex.Message);
            }
            catch (SchedulerConfigException scex)
            {
                _log.Log(scex.Message);
            }
            catch (SchedulerException sex)
            {
                _log.Log(sex.Message);
            }

        }

        public void Stop()
        {
            _log.Log("Stopping SomeWindowsService");
            _quartzScheduler?.Shutdown();
            _configWatcherDisposable.Dispose();
            _observableFileSystemWatcher.Dispose();
        }


        private void CreateConfigWatcher(FileInfo configFileInfo)
        {
            FileSystemWatcher watcher = new FileSystemWatcher();
            watcher.Path = configFileInfo.DirectoryName;
            watcher.NotifyFilter = 
                NotifyFilters.LastAccess | 
                NotifyFilters.LastWrite | 
                NotifyFilters.FileName | 
                NotifyFilters.DirectoryName;
            watcher.Filter = configFileInfo.Name;
            _observableFileSystemWatcher.SetFile(watcher);
            //FSW is notorious for firing twice see here : 
            //http://stackoverflow.com/questions/1764809/filesystemwatcher-changed-event-is-raised-twice
            //so lets use Rx to Throttle it a bit
            _configWatcherDisposable.Disposable = _observableFileSystemWatcher.Changed.SubscribeOn(
                _rxSchedulerService.TaskPool).Throttle(TimeSpan.FromMilliseconds(500)).Subscribe(
                    async x =>
                    {
                        //at this point the config has changed, start a critical section
                        using (var releaser = await _lock.LockAsync())
                        {
                            //tell current scheduled job that we need to read new config, and wait for it
                            //to signal us that we may continue
                            _log.Log($"Config file {configFileInfo.Name} has changed, attempting to read new config data");
                            _schedulingAssistanceService.RequiresNewSchedulerSetup = true;
                            _schedulingAssistanceService.SchedulerRestartGate.WaitAsync().GetAwaiter().GetResult();
                            //recreate the AzureBlobConfiguration, and recreate the scheduler using new settings
                            ConfigurationManager.RefreshSection("schedulingConfiguration");
                            var newSchedulingConfiguration = SimpleConfig.Configuration.Load<SchedulingConfiguration>();
                            _log.Log($"SchedulingConfiguration section is now : {newSchedulingConfiguration}");
                            ContainerOperations.ReInitialiseSchedulingConfiguration(newSchedulingConfiguration);
                            ReScheduleJob();
                        }
                    },
                    ex =>
                    {
                        _log.Log($"Error encountered attempting to read new config data from config file {configFileInfo.Name}");
                    });
        }

        private void CreateScheduledJob(IJobDetail existingJobDetail = null)
        {
            var azureBlobConfiguration = ContainerOperations.Container.Resolve<SchedulingConfiguration>();
            IJobDetail job = JobBuilder.Create<SomeQuartzJob>()
                    .WithIdentity(_someScheduledJobKey)
                    .Build();

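            //NOTE: the ScheduleTimerInMins config value is fed into WithIntervalInSeconds and
            //AddSeconds below, so despite the name the schedule here actually ticks in seconds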
            ITrigger trigger = TriggerBuilder.Create()
                .WithIdentity(_someScheduledJobTriggerKey)
                .WithSimpleSchedule(x => x
                    .RepeatForever()
                    .WithIntervalInSeconds(azureBlobConfiguration.ScheduleTimerInMins)
                )
                .StartAt(DateTimeOffset.Now.AddSeconds(azureBlobConfiguration.ScheduleTimerInMins))
                .Build();

            _quartzScheduler.ScheduleJob(job, trigger);
        }

        private void ReScheduleJob()
        {
            if (_quartzScheduler != null)
            {
                _quartzScheduler.DeleteJob(_someScheduledJobKey);
                CreateScheduledJob();
            }
        }
    }


}

There is a fair bit going on here. So let’s list some of the work this code does

  • It creates the initial Quartz.Net job and schedules it using the values from a custom config section, which are read into an object
  • It watches the config file for changes (we will go through that in a moment) and will wait on the AsyncAutoResetEvent to be signalled, at which point it will recreate the Quartz.net job

So let’s have a look at some of the small helper parts.

This is a simple Rx based file system watcher. The reason Rx is good here is that you can Throttle the events (see this post: FileSystemWatcher raises 2 events)

using System;
using System.IO;
using System.Reactive.Linq;

namespace SachaBarber.QuartzJobUpdate.Services
{
    public class ObservableFileSystemWatcher : IObservableFileSystemWatcher
    {
        private FileSystemWatcher _watcher;

        public void SetFile(FileSystemWatcher watcher)
        {
            _watcher = watcher;

            Changed = Observable
                .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
                (h => _watcher.Changed += h, h => _watcher.Changed -= h)
                .Select(x => x.EventArgs);

            Renamed = Observable
                .FromEventPattern<RenamedEventHandler, RenamedEventArgs>
                (h => _watcher.Renamed += h, h => _watcher.Renamed -= h)
                .Select(x => x.EventArgs);

            Deleted = Observable
                .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
                (h => _watcher.Deleted += h, h => _watcher.Deleted -= h)
                .Select(x => x.EventArgs);

            Errors = Observable
                .FromEventPattern<ErrorEventHandler, ErrorEventArgs>
                (h => _watcher.Error += h, h => _watcher.Error -= h)
                .Select(x => x.EventArgs);

            Created = Observable
                .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
                (h => _watcher.Created += h, h => _watcher.Created -= h)
                .Select(x => x.EventArgs);

            All = Changed.Merge(Renamed).Merge(Deleted).Merge(Created);
            _watcher.EnableRaisingEvents = true;
        }

        public void Dispose()
        {
            _watcher.EnableRaisingEvents = false;
            _watcher.Dispose();
        }

        public IObservable<FileSystemEventArgs> Changed { get; private set; }
        public IObservable<RenamedEventArgs> Renamed { get; private set; }
        public IObservable<FileSystemEventArgs> Deleted { get; private set; }
        public IObservable<ErrorEventArgs> Errors { get; private set; }
        public IObservable<FileSystemEventArgs> Created { get; private set; }
        public IObservable<FileSystemEventArgs> All { get; private set; }
    }
}

And this is a small utility class that will hold the values of the custom config section, which is read using SimpleConfig

namespace SachaBarber.QuartzJobUpdate.Configuration
{
    public class SchedulingConfiguration
    {
        public int ScheduleTimerInMins { get; set; }

        public override string ToString()
        {
            return $"ScheduleTimerInMins: {ScheduleTimerInMins}";
        }
    }
}

Which you read from the App.Config like this

 var newSchedulingConfiguration = SimpleConfig.Configuration.Load<SchedulingConfiguration>();
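
For reference, the corresponding custom section in the App.Config might look something like this (a sketch only; the section handler type and the attribute naming are assumptions about how SimpleConfig is wired up):

<configuration>
  <configSections>
    <!-- the handler type here is an assumption about how SimpleConfig maps sections -->
    <section name="schedulingConfiguration" type="SimpleConfig.Section, SimpleConfig" />
  </configSections>

  <schedulingConfiguration scheduleTimerInMins="5" />
</configuration>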

And this is the async/await compatible AutoResetEvent that I took from Stephen Toub’s blog

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SachaBarber.QuartzJobUpdate.Async
{
    public class AsyncAutoResetEvent
    {
        private static readonly Task Completed = Task.FromResult(true);
        private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
        private bool _signaled;

        public Task WaitAsync()
        {
            lock (_waits)
            {
                if (_signaled)
                {
                    _signaled = false;
                    return Completed;
                }
                else
                {
                    var tcs = new TaskCompletionSource<bool>();
                    _waits.Enqueue(tcs);
                    return tcs.Task;
                }
            }
        }

        public void Set()
        {
            TaskCompletionSource<bool> toRelease = null;
            lock (_waits)
            {
                if (_waits.Count > 0)
                    toRelease = _waits.Dequeue();
                else if (!_signaled)
                    _signaled = true;
            }
            toRelease?.SetResult(true);
        }
    }
}

So the last part of the puzzle is: how does the AsyncAutoResetEvent get signalled?

Well, as we said above, we need to wait for any in progress work to complete first. The way I tackled that is that within the job code that gets run on every Quartz.Net scheduler tick, we simply check whether we have been requested to swap out the current schedule time. If so we signal the waiting code via the (shared) AsyncAutoResetEvent, otherwise we just carry on and do the regular job work.

The way that the AsyncAutoResetEvent gets shared between the waiting code and the job code (which signals it) is via a singleton registration in an IoC container. I am using Autofac, which I set up like this, but you could use your own singleton, or the IoC container of your choice.

The trick is to make sure that both classes that need to access the AsyncAutoResetEvent use a single instance.

using System;
using System.Reflection;
using Autofac;
using SachaBarber.QuartzJobUpdate.Configuration;
using SachaBarber.QuartzJobUpdate.Services;
using Quartz;
using Quartz.Impl;

namespace SachaBarber.QuartzJobUpdate
{
    public class ContainerOperations
    {
        private static Lazy<IContainer> _containerSingleton = 
            new Lazy<IContainer>(CreateContainer);

        public static IContainer Container => _containerSingleton.Value;

        public static void ReInitialiseSchedulingConfiguration(
            SchedulingConfiguration newSchedulingConfiguration)
        {
            var currentSchedulingConfiguration = 
                Container.Resolve<SchedulingConfiguration>();
            currentSchedulingConfiguration.ScheduleTimerInMins = 
                newSchedulingConfiguration.ScheduleTimerInMins;
        }
        

        private static IContainer CreateContainer()
        {
            var builder = new ContainerBuilder();
            builder.RegisterType<ObservableFileSystemWatcher>()
                .As<IObservableFileSystemWatcher>().ExternallyOwned();
            builder.RegisterType<RxSchedulerService>()
                .As<IRxSchedulerService>().ExternallyOwned();
            builder.RegisterType<Logger>().As<ILogger>().ExternallyOwned();
            builder.RegisterType<SomeWindowsService>();
            builder.RegisterInstance(new SchedulingAssistanceService())
                .As<ISchedulingAssistanceService>();
            builder.RegisterInstance(
                SimpleConfig.Configuration.Load<SchedulingConfiguration>());

            // Quartz/jobs
            builder.Register(c => new StdSchedulerFactory().GetScheduler())
                .As<Quartz.IScheduler>();
            builder.RegisterAssemblyTypes(Assembly.GetExecutingAssembly())
                .Where(x => typeof(IJob).IsAssignableFrom(x));
            return builder.Build();
        }

        
    }
}

Where the shared instance in my case is this class

using SachaBarber.QuartzJobUpdate.Async;

namespace SachaBarber.QuartzJobUpdate.Services
{
    public class SchedulingAssistanceService : ISchedulingAssistanceService
    {
        public SchedulingAssistanceService()
        {
            SchedulerRestartGate = new AsyncAutoResetEvent();
            RequiresNewSchedulerSetup = false;
        }    

        public AsyncAutoResetEvent SchedulerRestartGate { get; }
        public bool RequiresNewSchedulerSetup { get; set; }
    }
}

Here is the actual job code, which checks whether a change in the App.Config has been detected and, if so, signals the waiting code that it may continue.

using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using Quartz;

namespace SachaBarber.QuartzJobUpdate.Services
{
    public class SomeQuartzJob : IJob
    {
        private readonly ILogger _log;
        private readonly ISchedulingAssistanceService _schedulingAssistanceService;

        public SomeQuartzJob(
            ILogger log, 
            ISchedulingAssistanceService schedulingAssistanceService)
        {
            _log = log;
            _schedulingAssistanceService = schedulingAssistanceService;
        }


        public void Execute(IJobExecutionContext context)
        {
            try
            {
                ExecuteAsync(context).GetAwaiter().GetResult();
            }
            catch (JobExecutionException jeex)
            {
                _log.Log(jeex.Message);
                throw;
            }
            catch (SchedulerConfigException scex)
            {
                _log.Log(scex.Message);
                throw;
            }
            catch (SchedulerException sex)
            {
                _log.Log(sex.Message);
                throw;
            }
            catch (ArgumentNullException anex)
            {
                _log.Log(anex.Message);
                throw;
            }
            catch (OperationCanceledException ocex)
            {
                _log.Log(ocex.Message);
                throw;
            }
            catch (IOException ioex)
            {
                _log.Log(ioex.Message);
                throw;
            }
        }


        /// <summary>
        /// This is called every time the Quartz.net scheduler CRON time ticks
        /// </summary>
        public async Task ExecuteAsync(IJobExecutionContext context)
        {
            await Task.Run(async () =>
            {
                if (_schedulingAssistanceService.RequiresNewSchedulerSetup)
                {
                    //signal the waiting scheduler restart code that it can now restart the scheduler
                    _schedulingAssistanceService.RequiresNewSchedulerSetup = false;
                    _log.Log("Job has been asked to stop, to allow job reschedule due to change in config");
                    _schedulingAssistanceService.SchedulerRestartGate.Set();
                }
                else
                {
                    await Task.Delay(1000);
                    _log.Log("Doing the uninterruptible work now");
                }
            });
        }
    }
}

So when the AsyncAutoResetEvent is signalled the waiting code (inside the subscribe code of the Rx file system watcher inside the SomeWindowsService.cs code) will proceed to swap out the Quartz.Net scheduler time.

It can do this safely as we know there is NO work in flight: the job has told the waiting code to proceed, which it only does when there is no work in flight.

This swapping over of the schedule time to use the newly read App.Config values is also protected by an AsyncLock class (again taken from Stephen Toub)

using System;
using System.Threading;
using System.Threading.Tasks;

namespace SachaBarber.QuartzJobUpdate.Async
{
    /// <summary>
    /// See http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266988.aspx
    /// from the fabulous Stephen Toub
    /// </summary>    
    public class AsyncLock
    {
        private readonly AsyncSemaphore m_semaphore;
        private readonly Task<Releaser> m_releaser;

        public AsyncLock()
        {
            m_semaphore = new AsyncSemaphore(1);
            m_releaser = Task.FromResult(new Releaser(this));
        }

        public Task<Releaser> LockAsync()
        {
            var wait = m_semaphore.WaitAsync();
            return wait.IsCompleted ?
                m_releaser :
                wait.ContinueWith((_, state) => new Releaser((AsyncLock)state),
                    this, CancellationToken.None,
                    TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        }

        public struct Releaser : IDisposable
        {
            private readonly AsyncLock m_toRelease;

            internal Releaser(AsyncLock toRelease) { m_toRelease = toRelease; }

            public void Dispose()
            {
                if (m_toRelease != null)
                    m_toRelease.m_semaphore.Release();
            }
        }
    }
}

Where this relies on AsyncSemaphore

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace SachaBarber.QuartzJobUpdate.Async
{
    /// <summary>
    /// See http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266983.aspx
    /// from the fabulous Stephen Toub
    /// </summary>
    public class AsyncSemaphore
    {
        private static readonly Task s_completed = Task.FromResult(true);
        private readonly Queue<TaskCompletionSource<bool>> _mWaiters = new Queue<TaskCompletionSource<bool>>();
        private int _mCurrentCount;

        public AsyncSemaphore(int initialCount)
        {
            if (initialCount < 0) throw new ArgumentOutOfRangeException("initialCount");
            _mCurrentCount = initialCount;
        }

        public Task WaitAsync()
        {
            lock (_mWaiters)
            {
                if (_mCurrentCount > 0)
                {
                    --_mCurrentCount;
                    return s_completed;
                }
                else
                {
                    var waiter = new TaskCompletionSource<bool>();
                    _mWaiters.Enqueue(waiter);
                    return waiter.Task;
                }
            }
        }


        public void Release()
        {
            TaskCompletionSource<bool> toRelease = null;
            lock (_mWaiters)
            {
                if (_mWaiters.Count > 0)
                    toRelease = _mWaiters.Dequeue();
                else
                    ++_mCurrentCount;
            }
            if (toRelease != null)
                toRelease.SetResult(true);
        }


    }
}

Just for completeness this is how you get an App.Config section to refresh at runtime

 ConfigurationManager.RefreshSection("schedulingConfiguration");

Anyway this works fine for me. I now have a reactive app that responds to changes in the App.Config without needing to restart, and it does so while allowing in flight work to complete.

Hope it helps someone out there

 

Where Is The Code?

The code can be found here: https://github.com/sachabarber/SachaBarber.QuartzJobUpdate

scala environment config options

This is going to be a slightly weird post in a way, as it is going to go round the houses a bit and is not going to contain much actual code, but it will talk about possible techniques for how best to manage environment specific config values for a multi project Scala setup.

Coming from .NET

So as many of you know I came from .NET, where we have a simple config model. We have App.Config or Web.Config.

We have tools at our disposal such as the XmlTransformation MSBuild tasks, which allow us to maintain a set of different App.Config values that will be transformed for your different environments.

I wrote a post about this here

https://sachabarbs.wordpress.com/2015/07/07/app-config-transforms-outside-of-web-project/

Here is a small example of what this might look like

 

[screenshot: App.Config transform files for each environment]

So when I started doing multi project Scala projects using SBT where I might have the following project requirements

[diagram: example multi project SBT setup]

In the above diagram the following is assumed

  • There is a Sacha.Common library that we want to use across multiple projects
  • That both Sacha.Rest.Endpoints and Sacha.Play.FrontEnd are apps which will need some environment specific configuration in them
  • That there is going to be more than 1 environment that we wish to use such as
    • PROD
    • QA
    • DEV

Now coming from .NET my initial instinct was to put a bunch of folders in the 2 apps, so taking the Sacha.Rest.Endpoints app as an example we may have something like this

[screenshot: per environment application.conf folders inside Sacha.Rest.Endpoints]

So the idea would be that we would have specific application.conf files for the different environments that we need to target (I am assuming there is some build process which takes care of putting the correct file in place for the correct build within the CI server).

This is very easy to understand: if we want QA, we would end up using the QA version of the application.conf file.

This is a very common way of thinking about this problem in .NET.

Why Is This Bad?

But hang on, this is only 1 app. What if we had 100 apps that made up our software in total? That means we would need to maintain all these separate config files for all the environments in ALL the separate apps.

Wow, that doesn’t sound so cool anymore.

Another Approach!

A colleague and I were talking about this in some scala code that was being written for a new project, and this is kind of what was being discussed.

I should point out that this idea was not in fact mine, but my colleague Andy Sprague’s, which is not something I credited him for in the 1st draft of this post. Which is bad, sorry Andy.

Anyway how about this for another idea: how about the Sacha.Common JAR holds just the specific bits of changing config in separate config files such as

  • “Qa.conf”
  • “Prod.conf”
  • “Dev.conf”

And then the individual apps that already reference the Sacha.Common JAR just include the environment config they need.

This is entirely possible thanks to the way that the Typesafe config library works, where it is designed to include extra config files. These extra config files in this case just live inside an external JAR: Sacha.Common.

Here is what this might look like for a consumer of the Sacha.Common jar

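The original screenshot is lost, so here is a rough sketch of what such an application.conf could contain (only the include line is the important part; the other keys are invented for illustration):

# application.conf of a consuming app such as Sacha.Rest.Endpoints (sketch)
# pull the environment specific values in from the Sacha.Common JAR on the classpath
include "Qa.conf"

app {
  name = "Sacha.Rest.Endpoints" # invented key, for illustration only
}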

Where we just include the relevant environment config from Sacha.Common in the application.conf for the current app

And this is what Sacha.Common may look like, where it provides the separate environment config files that consumers may use

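Again as a sketch (the original screenshot is lost, and every key here is invented for illustration), each environment file in Sacha.Common might hold values along these lines:

# Qa.conf inside Sacha.Common (sketch, invented keys)
environment {
  cassandra.contact-points = ["qa-cassandra-01", "qa-cassandra-02"]
  kafka.bootstrap-servers = "qa-kafka:9092"
}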


Why Is This Cool?

The good thing about this design, over separate config files per environment per application, is that we now ONLY need to maintain one set of environment specific settings: those in the common JAR Sacha.Common.

I wish we could do this within the .NET configuration system.

Hope this helps. I personally think that this is a fairly nice way to manage your configs for multiple applications and multiple environments.

No longer an mvp

So January is here, and I have been informed I am no longer an MVP.

I held the MVP award for 9 years, so I am happy with that. It would have been nice to make it a 10th, but hey ho.

To be honest I am not too surprised by this announcement, as I have become more and more interested in a wider range of things, such as

  • Scala
  • Akka
  • Play
  • Cassandra
  • Kafka

I also spent most of the last year blogging about these subjects.

I was told that MVPs these days are contributing about 120 blog posts per year in the UK, so that is what one is up against.

I have 2 kids so that’s not going to happen for me, like ever, anyone with 2 or more kids will know what I mean here

Like I say I am not surprised, it has been great to be recognized as an MVP, even though I never did go to a single MVP summit (should have doh)

And I would like to thank Microsoft for the opportunity to preach the good word.

I would also like to personally thank Chris Maunder of codeproject.com who was the chap that originally nominated me, thanks so much Chris. Chris you rock.

As for what’s next for me, I am going to continue to blog, and I hope to get back into writing articles for codeproject.com again after a break from that.

You can expect me to keep on writing about stuff that I enjoy.

So for now over and out, thanks for all the support over the years guys/girls.

RX Over the wire

 

Introduction

Now you know I’m an RX fanboy, I think it’s the bees knees in fact. Of late I have also gotten into Akka and Akka Streams, the latter being one implementation of the Reactive Streams APIs.

I also have a colleague who recently attended ReactConf and came back raving about all the good work that Netflix were doing, in particular a super duper socket that they have developed to allow RX type operators over the wire.

Netflix call their implementation ReactiveSocket: http://reactivesocket.io/

Which offers these things

  • request/response (stream of 1)
  • request/stream (finite stream of many)
  • fire-and-forget (no response)
  • event subscription (infinite stream of many)
  • channel (bi-directional streams)

Mmmm, sounds pretty cool. Thing is I was sure I had seen this before, and a long time ago too (well a long time in software years).

IQbservable – The Dual of IQueryable

Back in 2010 Bart De Smet of the RX team posted a couple of intriguing resources around this VERY badly named interface.

Most informative one being this video:

https://channel9.msdn.com/shows/Going+Deep/Bart-De-Smet-Observations-on-IQbservable-The-Dual-of-IQueryable/

For those that cannot be bothered to watch the video, here are some of the highlights.

IQbservable allows the following

  • Combines LINQ Queryable and RX Observable functionality
  • Queryable – allows you to create a query client side using LINQ, and pass that to a datasource (server, database, web service etc)
  • Observable – instead of blocking until the data comes back, will just notify you when it gets the data

So those are the key take away points. But how about a nice diagram or 2 to really set the scene

I think these are the best 2 diagrams (at least for my money)

[diagrams: the client composes a LINQ query which is serialized and shipped to the data source, and the results are pushed back to the client as they become available]

So that is what IQbservable is all about. So what is the rest of this post about then?

Well it just so happens that Dave Sexton, one of the Reactive Extension Extensions guys (meant to be slightly tongue in cheek), has written an extremely useful and fairly lightweight library that does much of what is described above.

Dave calls it Qactive. He has done a great job of it, and has written serializable expression trees and parsers, which allow us to create client side queries in LINQ which are sent across the wire.

In the rest of this post I will be showcasing a very simple demo based on Qactive, and I’ll point out some more links that Dave Sexton provides, which are invaluable reading.

 Dave has 2 of his own posts covering more than I do here, which you can go to here:

 

A Small Demo

So it may not come as a surprise to know that we need a server side and a client side. We will look at both of these for a simple example, and in the download at my GitHub there is also a forms based server that allows you to push items to the client on demand.

What Is Not Included

There is no form of fault tolerance; if you want that, you could do worse than to read my SignalR + RX code, which shows you how to make a resilient connection using RX:

https://www.codeproject.com/Articles/851437/SignalR-plus-RX-Streaming-Data-Demo-App-of

We will now proceed to look at a simple server/client. This is the most simplistic of examples: a server that runs on a timer, and a client that provides a LINQ Where (filter) which will be applied to the server side stream ON THE SERVER.

Simple Server

This is all we need for a simple server

using System;
using System.Net;
using System.Reactive.Linq;
using Qactive;

namespace Server
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1));
            var service = source.ServeQbservableTcp(
                new IPEndPoint(IPAddress.Loopback, 3205),
                new QbservableServiceOptions()
                {
                    SendServerErrorsToClients = true,
                    EnableDuplex = true,
                    AllowExpressionsUnrestricted = true
                }
            );
            using (service.Subscribe(
              client => Console.WriteLine(
                  "Client shutdown."),
              ex => Console.WriteLine(
                  "Fatal error: {0}", ex.Message),
              () => Console.WriteLine(
                  "This will never be printed because a service host never completes.")))
            {
                Console.ReadKey();
            }
        }
    }
}

Take away points here are:

  • We use a TCP IPEndPoint to bind the server to
  • We can subscribe to the IObservable to see what is happening with the connected client

On Demand Server Notification

If you want to see a server that allows you to send notifications to the clients on demand have a look at the FormsServer in my GitHub repo.

 

Simple Client

This is all we need for a simple client

using System;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using Qactive;

namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            var client = new TcpQbservableClient<long>(new IPEndPoint(IPAddress.Loopback, 3205));

            //this expression tree filtering will happen server side
            //THAT IS AWESOME
            IQbservable<string> query =
              from value in client.Query()
              where value <= 5 || value >= 8
              select string.Format("The incoming value has been doubled to {0}", value * 2);

            using (query.Subscribe(
              value => Console.WriteLine("Client observed: " + value),
              ex => Console.WriteLine("Error: {0}", ex.Message),
              () => Console.WriteLine("Completed")))
            {
                Console.ReadKey();
            }
        }
    }
}

 

Take away points here are:

  • The client also uses a TCP IPEndPoint
  • The client IS ABLE to use LINQ expressions which WILL be serialized and sent to the server where they will be applied

Here is the output when the simple server is run, and 2 clients are started one after another.

[screenshot: console output of the simple server with 2 connected clients]

 

Why Is This Cool?

This is very cool (uber cool in fact), we have just created a push based notification system that supports server side filtering (thanks to LINQ) in about 20 lines of code.

If you can’t see what is cool about that, well I can’t help you.

There may be some amongst you that go “well any messaging framework that has a server and a client would/could do that in the same amount of code”. What, including the server side push down of delegates (thanks to serializable expression trees)? Mmmmm don’t think so.

Only thing I can think of that even comes close is OData, but that requires a fair bit of infrastructure/baggage to make it work.

 

 

Where Can I Get The Code?

As usual I have posted the code to my GitHub account:

https://github.com/sachabarber/RxOverTheWire

AKKA STREAMS

Last time we looked at Akka Http, this time we will look at Akka Streams.

Akka Streams is a vast topic, and you will definitely need to supplement this post with the official documentation.

Akka was one of the founding members of the Reactive Streams initiative, and Akka Streams is one implementation (there are many) of the Reactive Streams APIs.

Reactive Streams  is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

Introduction

There may be some readers who have come from .NET, such as myself, and have used RX.

You may even have heard of Reactive Streams before. So what exactly makes Reactive Streams different from RX?

The central thing, the big win with Reactive Streams over RX, is the idea of back pressure. Here is what the Akka docs say about back pressure

The back pressure protocol is defined in terms of the number of elements a downstream Subscriber is able to receive and buffer, referred to as demand. The source of data, referred to as Publisher in Reactive Streams terminology and implemented as Source in Akka Streams, guarantees that it will never emit more elements than the received total demand for any given Subscriber.

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html#back-pressure-explained

Luckily this is all built in to Akka Streams; you do not have to worry about it too much as a user.

You can pretty much decide how the built in stream stages (which we will be diving into in more detail below) behave in terms of backpressure, using an OverflowStrategy value. Here is a very simple example

Source(1 to 10).buffer(10, OverflowStrategy.backpressure)

Where the following are the available OverflowStrategy values

object OverflowStrategy {
  /**
   * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
   * the new element.
   */
  def dropHead: OverflowStrategy = DropHead

  /**
   * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
   * the new element.
   */
  def dropTail: OverflowStrategy = DropTail

  /**
   * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
   */
  def dropBuffer: OverflowStrategy = DropBuffer

  /**
   * If the buffer is full when a new element arrives, drops the new element.
   */
  def dropNew: OverflowStrategy = DropNew

  /**
   * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until
   * space becomes available in the buffer.
   */
  def backpressure: OverflowStrategy = Backpressure

  /**
   * If the buffer is full when a new element is available this strategy completes the stream with failure.
   */
  def fail: OverflowStrategy = Fail
}

So that is the basic idea. Akka Streams does provide a lot of stuff, such as

  • Built in stages/shapes
  • A graph API
  • Ability to create your own stages/shapes

For the rest of this post we will be looking at some examples of these 3 points.

Working With The Akka Streams APIs

As stated at the beginning of this post the Akka Streams implementation is vast. There is a lot of ground to cover, far more than I can reasonably fit in a small blog post. The official docs are still the place to go, but if you have not heard of Akka Streams this post may be enough to get you into it.

The official docs (at time of writing) are here:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/index.html

 

Working With Built In Stages/Shapes

Akka comes with loads of prebuilt stages which we can make use of. However, before I mention those, let’s just spend a bit of time talking about how you use the Akka Streams APIs in their most basic form.

The idea is that we have 4 different parts that make up a useable pipeline.

Source
A processing stage with exactly one output, emitting data elements whenever downstream processing stages are ready to receive them.

Sink
A processing stage with exactly one input, requesting and accepting data elements possibly slowing down the upstream producer of elements

Flow
A processing stage which has exactly one input and output, which connects its up- and downstreams by transforming the data elements flowing through it.

RunnableGraph
A Flow that has both ends “attached” to a Source and Sink respectively, and is ready to be run().
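
Tying those 4 parts together, here is a minimal pipeline (a sketch of my own rather than code from the series repo):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

object MinimalPipeline extends App {
  implicit val system = ActorSystem("demo")
  implicit val materializer = ActorMaterializer()

  // Source ~> Flow ~> Sink assembled into a RunnableGraph, then materialized via run()
  val runnable = Source(1 to 3)
    .via(Flow[Int].map(_ * 2))
    .to(Sink.foreach(println))

  runnable.run()
}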

As I say Akka comes with loads of inbuilt stages to make our lives easier here. For example these are the available stages at time of writing

Source Stages

  • fromIterator
  • apply
  • single
  • repeat
  • tick
  • fromFuture
  • fromCompletionStage
  • unfold
  • unfoldAsync
  • empty
  • maybe
  • failed
  • actorPublisher
  • actorRef
  • combine
  • queue
  • asSubscriber
  • fromPublisher
  • fromFile

Sink Stages

  • head
  • headOption
  • last
  • lastOption
  • ignore
  • cancelled
  • seq
  • foreach
  • foreachParallel
  • onComplete
  • fold
  • reduce
  • combine
  • actorRef
  • actorRefWithAck
  • actorSubscriber
  • asPublisher
  • fromSubscriber
  • toFile

We will now look at some examples of using some of these

def simpleFlow() : Unit = {
  val source = Source(1 to 10)
  val sink = Sink.fold[Int, Int](0)(_ + _)
  // connect the Source to the Sink, obtaining a RunnableGraph
  val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
  // materialize the flow and get the value of the FoldSink
  implicit val timeout = Timeout(5 seconds)
  val sumFuture: Future[Int] = runnable.run()
  val sum = Await.result(sumFuture, timeout.duration)
  println(s"source.toMat(sink)(Keep.right) Sum = $sum")

  // Use the shorthand source.runWith(sink)
  val sumFuture2: Future[Int] = source.runWith(sink)
  val sum2 = Await.result(sumFuture2, timeout.duration)
  println(s"source.runWith(sink) Sum = $sum")
}

In this simple example we have a Source(1 to 10), which we then wire up to a Sink that adds up the numbers coming in.

This block demonstrates various different Source(s) and Sink(s)

def differentSourcesAndSinks() : Unit = {
  //various sources
  Source(List(1, 2, 3)).runWith(Sink.foreach(println))
  Source.single("only one element").runWith(Sink.foreach(println))
  //actor sink
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
  Source(List("hello", "hello"))
    .runWith(Sink.actorRef(helloActor,DoneMessage))
  //future source
  val futureString = Source.fromFuture(Future.successful("Hello Streams!"))
    .toMat(Sink.head)(Keep.right).run()
  implicit val timeout = Timeout(5 seconds)
  val theString = Await.result(futureString, timeout.duration)
  println(s"theString = $theString")
}

And this block demos using a simple Map on a Source

def mapFlow() : Unit = {
  val source = Source(11 to 16)
  val doublerSource = source.map(x => x * 2)
  val sink = Sink.foreach(println)
  implicit val timeout = Timeout(5 seconds)

  // Use the shorthand source.runWith(sink)
  val printSinkFuture: Future[Done] = doublerSource.runWith(sink)
  Await.result(printSinkFuture, timeout.duration)
}

Working With The Graph API

Akka Streams also comes with a pretty funky graph building DSL. You would use this when you want to create quite elaborate flows.

The other very interesting thing about the graph builder DSL is that you can use custom shapes inside it, and you can also leave it partially connected, such that you could potentially use it as a Source/Sink.

Let’s say you had an unconnected output in the graph you built using the graph DSL; you could then use that partially constructed graph as a Source in its own right. The same goes if you had an unconnected input in the graph you created: you could use that as a Sink, as the sketch below shows.
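
Here is a quick sketch of that idea (my own example, not from the original post), assuming an implicit ActorSystem and ActorMaterializer are in scope: a graph with a single unconnected output, wrapped up and used as a regular Source.

import akka.stream.SourceShape
import akka.stream.scaladsl.{GraphDSL, Sink, Source, Zip}

val pairSource = Source.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val zip = builder.add(Zip[Int, String]())
  Source(1 to 3)              ~> zip.in0
  Source(List("a", "b", "c")) ~> zip.in1
  // only the zip output is left unconnected, so this partial graph can act as a Source
  SourceShape(zip.out)
})

pairSource.runWith(Sink.foreach(println)) // prints (1,a), (2,b), (3,c)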

You can read more about this here :

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-graphs.html#constructing-sources-sinks-and-flows-from-partial-graphs

I urge you all to have a read of that as it’s quite cool what can be done with the graph DSL

OK, so time for an example. This example comes directly from the Typesafe activator code

http://www.lightbend.com/activator/template/akka-stream-scala

package com.sas.graphs

import java.io.File

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Failure, Success }

class WritePrimesDemo {

  def run(): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    // generate random numbers
    val maxRandomNumberSize = 1000000
    val primeSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize))).
        // filter prime numbers
        filter(rnd => isPrime(rnd)).
        // and neighbor +2 is also prime
        filter(prime => isPrime(prime + 2))

    // write to file sink
    val fileSink = FileIO.toPath(new File("target/primes.txt").toPath)
    val slowSink = Flow[Int]
      // act as if processing is really slow
      .map(i => { Thread.sleep(1000); ByteString(i.toString) })
      .toMat(fileSink)((_, bytesWritten) => bytesWritten)

    // console output sink
    val consoleSink = Sink.foreach[Int](println)

    // send primes to both slow file sink and console sink using graph API
    val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
      (slow, console) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
        primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
        broadcast ~> console // connect other side of splitter to console
        ClosedShape
    }
    val materialized = RunnableGraph.fromGraph(graph).run()

    // ensure the output file is closed and the system shutdown upon completion
    materialized.onComplete {
      case Success(_) =>
        system.terminate()
      case Failure(e) =>
        println(s"Failure: ${e.getMessage}")
        system.terminate()
    }

  }

  def isPrime(n: Int): Boolean = {
    if (n <= 1) false
    else if (n == 2) true
    else !(2 to (n - 1)).exists(x => n % x == 0)
  }
}

The most important part of this code is this part

// send primes to both slow file sink and console sink using graph API
val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
  (slow, console) =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
    primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
    broadcast ~> console // connect other side of splitter to console
    ClosedShape
}
val materialized = RunnableGraph.fromGraph(graph).run()

There are 2 sinks defined before we use the graph

  • A file Sink
  • A console Sink

There is also a Source that generates random primes

So the Graph DSL allows you to, um, well, create graphs. It allows you to take in inputs and create other shapes using the implicit builder that is provided.

The DSL then allows you to connect inputs/other builder created stages/shapes to the inputs, and even expose the connected stages to an output.

This is done using the ~> syntax, which simply means connect.

As previously stated you can create partially connected graphs, but if you have all inputs and outputs connected it is considered a ClosedShape, which can be used as an isolated component.

Here is an example of the output of running this graph example

[screenshot: console output of running the WritePrimes graph example]

Create Custom Shapes/Stages

It doesn’t stop there; we can also create our own shapes that can be used in flows. This is a pretty complex subject and you will definitely benefit from reading this page

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html

There is no way this little post will cover enough, but here are some highlights of the official documentation

This is the basic pattern you would use to create a custom stage

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic}
 
class NumbersSource extends GraphStage[SourceShape[Int]] {
  // Define the (sole) output port of this stage
  val out: Outlet[Int] = Outlet("NumbersSource")
  // Define the shape of this stage, which is SourceShape with the port we defined above
  override val shape: SourceShape[Int] = SourceShape(out)
 
  // This is where the actual (possibly stateful) logic will live
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
}

Most of the actual logic will live inside the createLogic method. But in order to do anything useful in there you will need to use handlers. Handlers are what you use to handle input/output: there are InHandler and OutHandler.

Each of these has its own state machine flow. For example this is the state machine for an OutHandler

[diagram: OutHandler port state machine]

Whilst this is the one for InHandler

[diagram: InHandler port state machine]

This is the best page to read to learn more about these handlers

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html#Port_states__InHandler_and_OutHandler

The one and ONLY place that state should be maintained is within the createLogic method.

Let’s consider a small example. Let’s say we have some objects like this

case class Element(id: Int, value: Int)

And we want to build a custom stage that allows us to select a property from this type, and that only emits an element when the selected property value changes.

We could call this DistinctUntilChanged. Let’s see what an implementation of this could look like

package com.sas.customshapes

import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler, GraphStage}
import akka.stream.{Outlet, Attributes, Inlet, FlowShape}

import scala.collection.immutable

final class DistinctUntilChanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, E]] {

  val in = Inlet[E]("DistinctUntilChanged.in")
  val out = Outlet[E]("DistinctUntilChanged.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {

    private var savedState : Option[E] = None

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)

        if (savedState.isEmpty  || propertyExtractor(savedState.get) != nextState) {
          savedState = Some(nextElement)
          push(out, savedState.get)
        }
        else {
          pull(in)
        }
        savedState = Some(nextElement)
      }

      override def onPull(): Unit = {
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        completeStage()
      }
    })

    override def postStop(): Unit = {
      savedState = None
    }
  }
}



The highlights of this are

  • We have a single Inlet
  • We have a single Outlet
  • We expose a FlowShape (in/out only) there are many shapes but FlowShape is what we want for one in/out out
  • We use createLogic to do the work
  • We use an InHandler to handle input
  • We use an OutHandler to handle output

One other important thing (at least for this single in/out example) is that we DO NOT call pull/push more than once within a single handler invocation.

Lets assume we have these elements

package com.sas.customshapes

import scala.collection.immutable

object SampleElements {

  val E11 = Element(1, 1)
  val E21 = Element(2, 1)
  val E31 = Element(3, 1)
  val E42 = Element(4, 2)
  val E52 = Element(5, 2)
  val E63 = Element(6, 3)

  val Ones = immutable.Seq(E11, E21, E31)
  val Twos = immutable.Seq(E42, E52)
  val Threes = immutable.Seq(E63)

  val All = Ones ++ Twos ++ Threes
}

And this demo code

def runDistinctUntilChanged() : Unit = {
  Source(SampleElements.All)
    .via(new DistinctUntilChanged(_.value))
    .runWith(Sink.foreach(println))
}

We would get this output to the Sink

image
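For reference, given the SampleElements.All sequence above, the selected value property changes from 1 to 2 to 3, so the elements printed at the Sink should be along these lines:

Element(1,1)
Element(4,2)
Element(6,3)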

This example does owe a lot to a nice blog post I found here :

https://www.softwaremill.com/implementing-a-custom-akka-streams-graph-stage/
 

That’s It

Anyway, that is the end of the series. I hope you have enjoyed it, and have learnt you some Akka along the way

I am going to have a small break now and then start looking into some Azure/Web stuff I think

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples

Akka http

Last time we talked about routing within Akka. This time we will be looking at Akka’s support for http.

But just before that, a bit of history. Before Akka.Http there was already a fairly successful Akka based http option available to you as a Scala developer, called Spray. There is a lot of Spray documentation available here http://spray.io/

This framework was extremely well thought of, so much so that the good people at Akka have taken on much of the good work done by this team, and it now forms much of the codebase for Akka Http.

In fact if you are familiar with Spray, you will certainly notice quite a lot of similarities in the way routes and JSON are handled in Akka.Http, as it is pretty much the Spray code.

 

Introduction

Akka.Http comes with server side and client side libraries. It also comes with good support for standard serialization such as JSON/XML and the ability to roll your own serialization should you want to.

It also comes with a fairly nifty routing DSL which is very much inspired by the work done in Spray.

This post will concentrate on the common use cases that you may come across when working with HTTP.

 

SBT Dependencies

As usual we need to make sure we have the correct JARs referenced. So here is the SBT file that I am using for both the server side/client side and common messages that pass between them

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)


lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" % "akka-actor_2.11" % "2.4.12",
    "com.typesafe.akka" % "akka-http_2.11" % "3.0.0-RC1",
    "com.typesafe.akka" % "akka-http-core_2.11" % "3.0.0-RC1",
    "com.typesafe.akka" % "akka-http-spray-json_2.11" % "3.0.0-RC1"
  )


lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val serverside =(project in file("serverside")).
  settings(commonSettings: _*).
  settings(
    name := "serverside"
  )
  .aggregate(common, clientside)
  .dependsOn(common, clientside)

lazy val common = (project in file("common")).
  settings(commonSettings: _*).
  settings(
    name := "common"
  )

lazy val clientside = (project in file("clientside")).
  settings(commonSettings: _*).
  settings(
    name := "clientside"
  )
  .aggregate(common)
  .dependsOn(common)

It can be seen that the JSON dependency is contained in this JAR

akka-http-spray-json_2.11

Told you it was inspired by Spray a fair bit

 

Server Side

This section will talk about the server side element of Akka.Http

 

Hosting The Service

To have a correctly formed/hostable server side we need a few things in place, namely the following

  • An actor system
  • A materializer (Akka Http uses flows, which are the subject of the next and final post)
  • An execution context
  • Routing

Once we have these things it is really just a question of binding the route to a host name and port.

Shown below is a barebones skeleton of what this may look like

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.Flow
import common.{Item, JsonSupport}
import scala.io.StdIn
import scala.concurrent.Future
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream._
import akka.stream.scaladsl._


object Demo extends App with Directives with JsonSupport {

  implicit val system = ActorSystem("my-system")
  implicit val materializer = ActorMaterializer()
  // needed for the Future callbacks (onFailure/flatMap/onComplete) below
  implicit val executionContext = system.dispatcher


  val route = .....

  val (host, port) = ("localhost", 8080)
  val bindingFuture = Http().bindAndHandle(route, host, port)

  bindingFuture.onFailure {
    case ex: Exception =>
      println(s"$ex Failed to bind to $host:$port!")
  }

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine() // let it run until user presses return
  bindingFuture
    .flatMap(_.unbind()) // trigger unbinding from the port
    .onComplete(_ => system.terminate()) // and shutdown when done
}

We will be looking at the routing DSL separately

 

Routing DSL

As stated, Akka.Http owes much to Spray, and the routing DSL in particular is practically unchanged from Spray, so it is well worth reading the Spray routing documentation which is available here : http://spray.io/documentation/1.2.4/spray-routing/ and for completeness here is the Akka.Http docs link too : http://doc.akka.io/docs/akka/2.4.7/scala/http/introduction.html#routing-dsl-for-http-servers

There are way too many possible routes to go into for a single post. Let's consider a few basic examples and deconstruct them

Some of these examples do rely on JSON which is the next topic, so for now just understand that there is a way to accept/return JSON.

Let's consider the following use cases

  • GET that returns a simple string
  • GET that returns a JSON representation of an Item
  • POST that accepts a new Item

In all these cases this is what an Item looks like

package common

final case class Item(name: String, id: Long)

So let's see the routing DSL that makes the above examples work

val route =
  path("hello") {
    get {
      complete(HttpEntity(
        ContentTypes.`text/html(UTF-8)`,
        "<h1>Say hello to akka-http</h1>"))
    }
  } ~
  path("randomitem") {
    get {
      // will marshal Item to JSON
      complete(Item("thing", 42))
    }
  } ~
  path("saveitem") {
    post {
      // will unmarshal JSON to Item
      entity(as[Item]) { item =>
        println(s"Server saw Item : $item")
        complete(item)
      }
    }
  }

It can be seen that there are some common routing DSL bits and bobs in there, such as:

  • path : which satisfies the route name part of the route
  • get : which tells us that we should go further into the route matching if it’s a GET http request and it matched the path route DSL part
  • post: which tells us that we should go further into the route matching if it’s a POST http request and it matched the path route DSL part
  • complete : This is the final result from the route

These parts of the DSL are known as directives. The general anatomy of a directive is as follows:

name(arguments) { extractions =>
  ... // inner route
}

It has a name, zero or more arguments and optionally an inner route (The RouteDirectives are special in that they are always used at the leaf-level and as such cannot have inner routes). Additionally directives can “extract” a number of values and make them available to their inner routes as function arguments. When seen “from the outside” a directive with its inner route form an expression of type Route.

Taken from http://doc.akka.io/docs/akka/2.4.7/scala/http/routing-dsl/directives/index.html#directives as of 15/11/16
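To make that anatomy concrete, here is a small hypothetical route (not part of the demo project) that uses the parameter directive, which extracts a query string value and hands it to its inner route as a function argument:

// hypothetical route : GET /greet?name=sacha would complete with "Hello sacha"
val greetRoute =
  path("greet") {
    get {
      parameter('name) { name =>
        complete(s"Hello $name")
      }
    }
  }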

What Do Directives Do?

A directive can do one or more of the following:

  • Transform the incoming RequestContext before passing it on to its inner route (i.e. modify the request)
  • Filter the RequestContext according to some logic, i.e. only pass on certain requests and reject others
  • Extract values from the RequestContext and make them available to its inner route as “extractions”
  • Chain some logic into the RouteResult future transformation chain (i.e. modify the response or rejection)
  • Complete the request

 

This means a Directive completely wraps the functionality of its inner route and can apply arbitrarily complex transformations, both (or either) on the request and on the response side.

Ok so now that we have taken a whistle stop tour of the routing DSL and directives, let's have a look at the few we discussed above

 

For this work I would strongly recommend the use of the "Postman" Chrome app, which you can grab from here

https://chrome.google.com/webstore/detail/postman/fhbjgbiflinjbdggehcddcbncdddomop?hl=en

GET

We can see this route looks like this

path("hello") {
  get {
    complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
  }
}

So we use the path, and also the get directives to establish a get route. We then use complete to complete the route with some static string representing the html we would like to return

So let’s see this one in postman

image

 

GET Item (as JSON)

We can see this route looks like this

path("randomitem") {
  get {
    // will marshal Item to JSON
    complete(Item("thing", 42))
  }
} 

So again we use the path/get directives, but this time we complete with an Item. This is done due to the JSON support that is able to create the right serialization data for us. We will look at this in the next section

So let’s see this one in postman

image

POST Item

We can see this route looks like this

path("saveitem") {
  post {
    // will unmarshal JSON to Item
    entity(as[Item]) { item =>
      println(s"Server saw Item : $item")
      complete(item)
    }
  }
} 

So again we use the path directive, but this time we use a post, where the post expects an Item as JSON to be provided. The conversion from the incoming JSON string to an Item is done using an Unmarshaller, which we will look at in the next section

So let’s see this one in postman

image

 

JSON Support

Akka.Http provides JSON support via the akka-http-spray-json module (the JAR we referenced in the SBT file earlier), which you can grab from the Maven Central repo.

JsonProtocol

When using the Spray JSON support we mix in SprayJsonSupport and DefaultJsonProtocol to create the JSON protocol for our custom objects

Let's consider the Item class we have seen in the demos so far

package common

final case class Item(name: String, id: Long)

This is how we might write the JSON protocol code for this simple class

package common

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol

trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val itemFormat = jsonFormat2(Item)
}

It can be seen that there are jsonFormatXX helpers that can be used for very simple cases. In this case jsonFormat2 is used as our Item class has 2 parameters.

Most of the time these inbuilt helpers are all we need. If however you want something more elaborate you are free to create your own format with custom read/write methods
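For example, a hand-rolled format for the Item class might look something like this (a sketch, not code from the demo project):

import spray.json._

// hypothetical : a custom format for Item, used in place of jsonFormat2(Item)
implicit object ItemJsonFormat extends RootJsonFormat[Item] {
  def write(item: Item) =
    JsObject("name" -> JsString(item.name), "id" -> JsNumber(item.id))

  def read(value: JsValue) = value.asJsObject.getFields("name", "id") match {
    case Seq(JsString(name), JsNumber(id)) => Item(name, id.toLong)
    case _ => throw DeserializationException("Item expected")
  }
}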

 

Marshalling

Marshalling is Spray's process of taking objects and creating a JSON string representation of them to send across the wire.

The Akka Spray JAR comes with a bunch of default marshallers that allow us to take custom classes and turn them into JSON

These are the most common default marshallers that you will most likely use

type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]
type ToHeadersAndEntityMarshaller[T] = Marshaller[T, (immutable.Seq[HttpHeader], MessageEntity)]
type ToResponseMarshaller[T] = Marshaller[T, HttpResponse]
type ToRequestMarshaller[T] = Marshaller[T, HttpRequest]

You can read more about this here : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/marshalling.html

Luckily you don’t really have to get that involved with these that often as the routing DSL does most of the heavy lifting for you when you do the complete this is taken care of for you providing there is a marshaller that can be found implicitly

Unmarshalling

Unmarshalling is the process of taking the on-the-wire format (a JSON string in these examples) back into a Scala class (the Item class in this case)

You can read more about this at the official Akka docs page : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/unmarshalling.html

Luckily you don’t really have to get that involved with these that often as the routing DSL does most of the heavy lifting for you, which is what we use this part of the routing DSL, where this will use an unmarshaller to create the Item from the JSON string on the wire

entity(as[Item]) { item =>

WebSockets

Akka Http also supports websockets. Let's start this investigation by looking at what is required from the routing DSL perspective, which starts like this

path("websocket") {
  get {
    handleWebSocketMessages(websocketFlow)
  }
} ~

If we look at this special directive a bit more, what exactly does the handleWebSocketMessages directive look like?

Well it looks like this:

def handleWebSocketMessages(handler: Flow[Message, Message, Any]): Route

So we need to supply a Flow. A Flow is part of Akka Streams, which we will look at in the next (and final) part. But for now just be aware that you can create a Flow from a Sink/Source pair, and that you need a Materializer to materialize the flow.
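To make the Flow requirement concrete, about the simplest handler you could pass to handleWebSocketMessages is a plain echo, something like this (a sketch, not the demo code, using the same imports as the server skeleton above):

val echoFlow: Flow[Message, Message, NotUsed] =
  Flow[Message].collect {
    // only handles already-materialized text frames; streamed frames are ignored
    case TextMessage.Strict(text) => TextMessage(s"ECHO: $text")
  }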

For this websocket example here is what the Flow looks like

val (websocketSink, websocketSource) =
  MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()

val websocketFlow: Flow[Message, Message, NotUsed] =
  Flow[Message].mapAsync(1) {
    // transform websocket message to domain message (string)
    case TextMessage.Strict(text) => Future.successful(text)
    case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _)
  }.via(Flow.fromSinkAndSource(websocketSink, websocketSource))
    .map[Message](string => TextMessage(string))

The idea is that when a websocket client connects and sends an initial message they will get a reply TextMessage sent over the websocket to them

This uses some pretty new akka stream stages namely

  • MergeHub : Creates a Source that emits elements merged from a dynamic set of producers.
  • BroadcastHub : Creates a Sink that broadcasts incoming elements to a dynamic set of consumers

 

Let's start by running the server, and then opening the "WebSocketTestClient.html" page which should look like this

image

image

Once the page is open, type something in the textbox and hit the “Send” button, you should see this

image

All fairly normal socket type stuff so far, we send a message from the web page client side to the server and the server responds with the text we sent.

But what if we wanted to send messages to the client on demand, say from another route, which could be a command to do some work that notifies the clients of the websocket?

With this Flow in place, we are also able to push back messages to the client end of the websocket.

Let's see another route which will simulate some work, resulting in messages being sent down the websocket back to the client (if it's still connected)

Here is the route

path("sendmessagetowebsocket" / IntNumber) { msgCount =>
  post {
    for(i <- 0 until msgCount)
    {
      Source.single(s"sendmessagetowebsocket $i").runWith(websocketSink)
    }
    complete("done")
  }
}

It can be seen that we simply create a new source which is run with the existing Sink that was part of the Flow used by the websocket

Here is what this would look like in postman

image

And here is what the web page client side websocket example looks like after this route has been called as above

image

 

 

Client Side

Akka Http comes with 3 types of client API that one can use : the connection level API, the host level API, and the request level API

In this article I will only be using the last of these APIs, as in my opinion it is the most sensible client side choice.

So what does the request level client API look like?

GET

If we consider that we want to conduct this request

http://localhost:8080/randomitem

which when run via postman gives the following JSON response

image

So let's see what the code looks like to do this using the request level client API

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import concurrent.ExecutionContext.Implicits.global
import common.{Item, JsonSupport}
import concurrent.duration._
import scala.io.StdIn

class RegularRoutesDemo extends JsonSupport {

  def Run() : Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)

    //+++++++++++++++++++++++++++++++++++++++++++++++
    // GET http://localhost:8080/randomitem
    //+++++++++++++++++++++++++++++++++++++++++++++++
    val randomItemUrl = s"""/randomitem"""
    val flowGet : Future[Item] =
      Source.single(
        HttpRequest(
          method = HttpMethods.GET,
          uri = Uri(randomItemUrl))
        )
        .via(httpClient)
        .mapAsync(1)(response => Unmarshal(response.entity).to[Item])
        .runWith(Sink.head)
    val start = System.currentTimeMillis()
    val result = Await.result(flowGet, 5 seconds)
    val end = System.currentTimeMillis()
    println(s"Result in ${end-start} millis: $result")

  }
}

There are a couple of take away points in the code above

  • We use a Source containing a single HttpRequest, where we can specify the HTTP verb and other request type things
  • We use Unmarshal to convert the incoming JSON string to an Item. We discussed Marshalling/Unmarshalling above.
  • This obviously relies on the Spray JSON support that we discussed above
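As an aside, there is also the simpler Http().singleRequest entry point, which could perform the same GET without building the Source/Flow pipeline by hand, something like this (a sketch, assuming the same implicits as in the class above):

val responseFuture: Future[Item] =
  Http()
    .singleRequest(HttpRequest(uri = "http://localhost:8080/randomitem"))
    .flatMap(response => Unmarshal(response.entity).to[Item])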

 

POST

If we consider that we want to conduct this request

http://localhost:8080/saveitem

which when run via postman gives the following JSON response

image

So let's see what the code looks like to do this using the request level client API

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import concurrent.ExecutionContext.Implicits.global
import common.{Item, JsonSupport}
import concurrent.duration._
import scala.io.StdIn

class RegularRoutesDemo extends JsonSupport {

  def Run() : Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)

    //+++++++++++++++++++++++++++++++++++++++++++++++
    // POST http://localhost:8080/saveitem
    //+++++++++++++++++++++++++++++++++++++++++++++++
    val saveItemUrl = s"""/saveitem"""
    val itemToSave = Item("newItemHere",12)
    val flowPost = for {
      requestEntity <- Marshal(itemToSave).to[RequestEntity]
      response <-
      Source.single(
        HttpRequest(
          method = HttpMethods.POST,
          uri = Uri(saveItemUrl),
          entity = requestEntity)
        )
        .via(httpClient)
        .mapAsync(1)(response => Unmarshal(response.entity).to[Item])
        .runWith(Sink.head)
    } yield response
    val startPost = System.currentTimeMillis()
    val resultPost = Await.result(flowPost, 5 seconds)
    val endPost = System.currentTimeMillis()
    println(s"Result in ${endPost-startPost} millis: $resultPost")
  }
}

The only thing that is different this time is that we need a JSON representation of the Item, which we pass to the HttpRequest as its entity.

This is done using a JSON marshaller which must be in scope implicitly.

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples

AKKA routing

 

Last time we looked at Akka Clustering, this time we will look at routing.

Routing allows messages to be routed to one or more actors known as routees, by sending the messages to a router that will know how to route the messages to the routees.

Akka comes with quite a few inbuilt routing strategies that we can make use of. We will look at these next.

Types Of Routing Strategy

Akka comes with a whole bunch of inbuilt routing strategies such as :

RoundRobin : Routes in a round-robin fashion to its routees.

Random : This router type selects one of its routees randomly for each message.

SmallestMailBox : A router that tries to send to the non-suspended child routee with the fewest messages in its mailbox. The selection is done in this order:

  • pick any idle routee (not processing a message) with an empty mailbox
  • pick any routee with an empty mailbox
  • pick the routee with the fewest pending messages in its mailbox
  • pick any remote routee; remote actors are considered lowest priority, since their mailbox size is unknown

Broadcast : A broadcast router forwards the message it receives to all its routees.

ScatterGatherFirstCompleted : The ScatterGatherFirstCompletedRouter will send the message on to all its routees. It then waits for the first reply it gets back. This result will be sent back to the original sender. Other replies are discarded.

TailChopping : The TailChoppingRouter will first send the message to one, randomly picked, routee and then after a small delay to a second routee (picked randomly from the remaining routees) and so on. It waits for the first reply it gets back and forwards it to the original sender. Other replies are discarded.

The goal of this router is to decrease latency by performing redundant queries to multiple routees, assuming that one of the other actors may still be faster to respond than the initial one.

Regular Actor As A Router

Akka allows you to create routers in 2 ways; the first way is to use a RoutingLogic to set up your router.

There are quite a few specializations of RoutingLogic, such as

  • RoundRobinRoutingLogic
  • RandomRoutingLogic
  • SmallestMailboxRoutingLogic
  • BroadcastRoutingLogic

You would typically use this in a regular actor. The actor in which you use the RoutingLogic would be the router. If you go down this path you would be responsible for managing the router's children, i.e. the routees. That means you would be responsible for managing ALL aspects of the routees, including adding them to a list of available routees, and watching them for Termination to remove them from the list of available routees (which sounds a lot like supervision, doesn't it?).

Here is what a skeleton for an actor that is setup manually as a router may look like

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, Props, Terminated}
import akka.routing.{RoutingLogic, ActorRefRoutee, RoundRobinRoutingLogic, Router}


class RouterActor(val routingLogic : RoutingLogic)  extends Actor  {

  val counter : AtomicInteger = new AtomicInteger()

  val routees = Vector.fill(5) {
    val workerCount = counter.getAndIncrement()
    val r = context.actorOf(Props(
      new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
    context watch r
    ActorRefRoutee(r)
  }

  //create a Router based on the incoming class field
  //RoutingLogic which will really determine what type of router
  //we end up with
  var router = Router(routingLogic, routees)

  def receive = {
    case WorkMessage =>
      router.route(WorkMessage, sender())
    case Report => routees.foreach(ref => ref.send(Report, sender()))
    case Terminated(a) =>
      router = router.removeRoutee(a)
      val workerCount = counter.getAndIncrement()
      val r = context.actorOf(Props(
        new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
      context watch r
      router = router.addRoutee(r)
  }
}

It can be seen that I pass in the RoutingLogic, which would be one of the available RoutingLogic strategies that akka comes with.

The other thing to note is that as we stated earlier we need to FULLY manage the collection of routee actors ourselves, including watching them for Termination.

Surely there is a better way?

Well yes thankfully there is, Akka also provides a Pool for this job. We will look at that next.

Pool

Akka comes with the ability to create a router using a pool where we tell it what actors we want to use as the routees, how many routees we want, and how the supervision should be handled.
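As an aside, a pool does not have to be configured in code; the same thing can be driven from configuration using FromConfig, along these lines (a sketch with hypothetical paths, based on the official routing docs):

akka.actor.deployment {
  # hypothetical path : an actor named router1 created by /user/parent
  /parent/router1 {
    router = round-robin-pool
    nr-of-instances = 5
  }
}

The router would then be created like this:

import akka.routing.FromConfig

val router = context.actorOf(FromConfig.props(Props[WorkerActor]), "router1")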

Here is some code from my demo that uses 2 utility methods to create a pool based router that will use a simple FibonacciActor, which is sent messages via an actor that is created using the pool router Props

def RunTailChoppingPoolDemo() : Unit = {

  val supervisionStrategy = OneForOneStrategy() {
    case e => SupervisorStrategy.restart
  }

  val props = TailChoppingPool(5, within = 10.seconds,
    supervisorStrategy = supervisionStrategy,interval = 20.millis).
    props(Props[FibonacciActor])

  RunPoolDemo(props)
}

def RunPoolDemo(props : Props) : Unit = {
  val system = ActorSystem("RoutingSystem")
  val actorRef = system.actorOf(Props(
    new PoolRouterContainerActor(props,"theRouter")), name = "thePoolContainer")
  actorRef ! WorkMessage
  StdIn.readLine()
  system.terminate()
}



import akka.actor._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.pattern.ask

class PoolRouterContainerActor(val props: Props, val name :String)  extends Actor  {

  val router: ActorRef = context.actorOf(props, name)

  def receive = {
    case WorkMessage =>
      implicit val timeout = Timeout(5 seconds)
      val futureResult = router ? FibonacciNumber(10)
      val (actName,result) = Await.result(futureResult, timeout.duration)

      println(s"FibonacciActor : ($actName) came back with result -> $result")
  }
}



import akka.actor.Actor
import scala.annotation.tailrec

class FibonacciActor extends Actor {

  val actName = self.path.name

  def receive = {
    case FibonacciNumber(nbr) => {
      println(s"FibonacciActor : ($actName) ->  " +
        s"has been asked to calculate FibonacciNumber")
      val result = fibonacci(nbr)
      sender ! (actName,result)
    }
  }

  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }

    fib(n, 1, 0)
  }
}

Supervision Using Pool

Routees that are created by a pool router will be created as the router’s children. The router is therefore also the children’s supervisor.

The supervision strategy of the router actor can be configured with the supervisorStrategy property of the Pool. If no configuration is provided, routers default to a strategy of “always escalate”. This means that errors are passed up to the router’s supervisor for handling. The router’s supervisor will decide what to do about any errors.

Note the router’s supervisor will treat the error as an error with the router itself. Therefore a directive to stop or restart will cause the router itself to stop or restart. The router, in turn, will cause its children to stop and restart.

It should be mentioned that the router’s restart behavior has been overridden so that a restart, while still re-creating the children, will still preserve the same number of actors in the pool.

This means that if you have not specified supervisorStrategy of the router or its parent a failure in a routee will escalate to the parent of the router, which will by default restart the router, which will restart all routees (it uses Escalate and does not stop routees during restart). The reason is to make the default behave such that adding withRouter to a child’s definition does not change the supervision strategy applied to the child. This might be an inefficiency that you can avoid by specifying the strategy when defining the router.

http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Supervision as of 01/11/16

Group

You may also wish to create your routees separately and let the router know about them. This is achievable using Groups. This is not something I decided to cover in this post, but if this sounds of interest to you, you can read more about it at the official documentation here:

http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Group
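That said, here is roughly what a Group based router looks like (a sketch with hypothetical routee paths, not part of my demo code):

import akka.actor.ActorSystem
import akka.routing.RoundRobinGroup

val system = ActorSystem("RoutingSystem")
// hypothetical : assumes workerActor-0/1/2 were already created directly under /user
val paths = List(
  "/user/workerActor-0",
  "/user/workerActor-1",
  "/user/workerActor-2")
val groupRouter = system.actorOf(RoundRobinGroup(paths).props(), "groupRouter")
groupRouter ! WorkMessage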

Routing Strategy Demos

For the demos I am using a mixture of RoutingLogic hosted in my own actor, and also Pool based routers.

Here is the basic setup for a RoutingLogic based actor of my own, where I have to manage all supervision concerns manually.

There are ALWAYS 5 routees involved with this demo.

import java.util.concurrent.TimeUnit

import akka.actor._
import akka.routing._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //==============================================================
  //Standard Actor that does routing using Router class
  //where we apply relevant RoutingLogic
  //Supervision is done manually within the Actor that hosts
  //the Router, where we monitor the routees and remove /recreate
  //them on 'Terminated'
  //==============================================================
  RunRoutingDemo(RoundRobinRoutingLogic())



  def RunRoutingDemo(routingLogic : RoutingLogic) : Unit = {
    val system = ActorSystem("RoutingSystem")
    val actorRef = system.actorOf(Props(
      new RouterActor(routingLogic)), name = "theRouter")

    for (i <- 0 until 10) {
      actorRef ! WorkMessage
      Thread.sleep(1000)
    }
    actorRef ! Report

    StdIn.readLine()
    system.terminate()
  }
}

Where we make use of the following generic actor code that uses the specific RoutingLogic that is passed in.

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, Props, Terminated}
import akka.routing.{RoutingLogic, ActorRefRoutee, RoundRobinRoutingLogic, Router}


class RouterActor(val routingLogic : RoutingLogic)  extends Actor  {

  val counter : AtomicInteger = new AtomicInteger()

  val routees = Vector.fill(5) {
    val workerCount = counter.getAndIncrement()
    val r = context.actorOf(Props(
      new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
    context watch r
    ActorRefRoutee(r)
  }

  //create a Router based on the incoming class field
  //RoutingLogic which will really determine what type of router
  //we end up with
  var router = Router(routingLogic, routees)

  def receive = {
    case WorkMessage =>
      router.route(WorkMessage, sender())
    case Report => routees.foreach(ref => ref.send(Report, sender()))
    case Terminated(a) =>
      router = router.removeRoutee(a)
      val workerCount = counter.getAndIncrement()
      val r = context.actorOf(Props(
        new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
      context watch r
      router = router.addRoutee(r)
  }
}

This is what the routees look like for this set of demos

import akka.actor.Actor

class WorkerActor(val id : Int) extends Actor {

  var msgCount = 0
  val actName = self.path.name

  def receive = {
    case WorkMessage => {
      msgCount += 1
      println(s"worker : {$id}, name : ($actName) ->  ($msgCount)")
    }
    case Report => {
      println(s"worker : {$id}, name : ($actName) ->  saw total messages : ($msgCount)")
    }
    case _       => println("unknown message")
  }
}

Ok so let's have a look at some examples of using this code, shall we?

RoundRobin

We get this output, where the round robin strategy distributes the messages evenly across the routees

worker : {0}, name : (workerActor-0) ->  (1)
worker : {1}, name : (workerActor-1) ->  (1)
worker : {2}, name : (workerActor-2) ->  (1)
worker : {3}, name : (workerActor-3) ->  (1)
worker : {4}, name : (workerActor-4) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {1}, name : (workerActor-1) ->  (2)
worker : {2}, name : (workerActor-2) ->  (2)
worker : {3}, name : (workerActor-3) ->  (2)
worker : {4}, name : (workerActor-4) ->  (2)
worker : {0}, name : (workerActor-0) ->  saw total messages : (2)
worker : {1}, name : (workerActor-1) ->  saw total messages : (2)
worker : {2}, name : (workerActor-2) ->  saw total messages : (2)
worker : {4}, name : (workerActor-4) ->  saw total messages : (2)
worker : {3}, name : (workerActor-3) ->  saw total messages : (2)

Random

We get this output, where the messages are sent to routees randomly

worker : {1}, name : (workerActor-1) ->  (1)
worker : {1}, name : (workerActor-1) ->  (2)
worker : {4}, name : (workerActor-4) ->  (1)
worker : {0}, name : (workerActor-0) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {2}, name : (workerActor-2) ->  (1)
worker : {3}, name : (workerActor-3) ->  (1)
worker : {4}, name : (workerActor-4) ->  (2)
worker : {0}, name : (workerActor-0) ->  (3)
worker : {0}, name : (workerActor-0) ->  (4)
worker : {1}, name : (workerActor-1) ->  saw total messages : (2)
worker : {0}, name : (workerActor-0) ->  saw total messages : (4)
worker : {2}, name : (workerActor-2) ->  saw total messages : (1)
worker : {4}, name : (workerActor-4) ->  saw total messages : (2)
worker : {3}, name : (workerActor-3) ->  saw total messages : (1)

SmallestMailBox

We get this output, where the routee with the smallest mailbox will get the message sent to it. This example may look a bit weird, but if you think about it, by the time the new message is sent the 1st routee (workerActor-0) will have dealt with the 1st message, and is ready to receive a new one, and since it's the 1st routee in the list it is still considered the one with the smallest mailbox. If you introduced an artificial delay in the actor dealing with the message it may show different, more interesting results.

worker : {0}, name : (workerActor-0) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {0}, name : (workerActor-0) ->  (3)
worker : {0}, name : (workerActor-0) ->  (4)
worker : {0}, name : (workerActor-0) ->  (5)
worker : {0}, name : (workerActor-0) ->  (6)
worker : {0}, name : (workerActor-0) ->  (7)
worker : {0}, name : (workerActor-0) ->  (8)
worker : {0}, name : (workerActor-0) ->  (9)
worker : {0}, name : (workerActor-0) ->  (10)
worker : {2}, name : (workerActor-2) ->  saw total messages : (0)
worker : {4}, name : (workerActor-4) ->  saw total messages : (0)
worker : {1}, name : (workerActor-1) ->  saw total messages : (0)
worker : {0}, name : (workerActor-0) ->  saw total messages : (10)
worker : {3}, name : (workerActor-3) ->  saw total messages : (0)

Broadcast

We get this output, where each routee should see ALL messages

worker : {0}, name : (workerActor-0) ->  (1)
worker : {2}, name : (workerActor-2) ->  (1)
worker : {4}, name : (workerActor-4) ->  (1)
worker : {3}, name : (workerActor-3) ->  (1)
worker : {1}, name : (workerActor-1) ->  (1)
worker : {0}, name : (workerActor-0) ->  (2)
worker : {1}, name : (workerActor-1) ->  (2)
worker : {4}, name : (workerActor-4) ->  (2)
worker : {2}, name : (workerActor-2) ->  (2)
worker : {3}, name : (workerActor-3) ->  (2)
worker : {0}, name : (workerActor-0) ->  (3)
worker : {2}, name : (workerActor-2) ->  (3)
worker : {3}, name : (workerActor-3) ->  (3)
worker : {4}, name : (workerActor-4) ->  (3)
worker : {1}, name : (workerActor-1) ->  (3)
worker : {1}, name : (workerActor-1) ->  (4)
worker : {4}, name : (workerActor-4) ->  (4)
worker : {3}, name : (workerActor-3) ->  (4)
worker : {0}, name : (workerActor-0) ->  (4)
worker : {2}, name : (workerActor-2) ->  (4)
worker : {0}, name : (workerActor-0) ->  (5)
worker : {1}, name : (workerActor-1) ->  (5)
worker : {4}, name : (workerActor-4) ->  (5)
worker : {2}, name : (workerActor-2) ->  (5)
worker : {3}, name : (workerActor-3) ->  (5)
worker : {3}, name : (workerActor-3) ->  (6)
worker : {2}, name : (workerActor-2) ->  (6)
worker : {1}, name : (workerActor-1) ->  (6)
worker : {4}, name : (workerActor-4) ->  (6)
worker : {0}, name : (workerActor-0) ->  (6)
worker : {1}, name : (workerActor-1) ->  (7)
worker : {0}, name : (workerActor-0) ->  (7)
worker : {4}, name : (workerActor-4) ->  (7)
worker : {2}, name : (workerActor-2) ->  (7)
worker : {3}, name : (workerActor-3) ->  (7)
worker : {0}, name : (workerActor-0) ->  (8)
worker : {3}, name : (workerActor-3) ->  (8)
worker : {1}, name : (workerActor-1) ->  (8)
worker : {2}, name : (workerActor-2) ->  (8)
worker : {4}, name : (workerActor-4) ->  (8)
worker : {2}, name : (workerActor-2) ->  (9)
worker : {3}, name : (workerActor-3) ->  (9)
worker : {4}, name : (workerActor-4) ->  (9)
worker : {1}, name : (workerActor-1) ->  (9)
worker : {0}, name : (workerActor-0) ->  (9)
worker : {0}, name : (workerActor-0) ->  (10)
worker : {2}, name : (workerActor-2) ->  (10)
worker : {1}, name : (workerActor-1) ->  (10)
worker : {4}, name : (workerActor-4) ->  (10)
worker : {3}, name : (workerActor-3) ->  (10)
worker : {1}, name : (workerActor-1) ->  saw total messages : (10)
worker : {2}, name : (workerActor-2) ->  saw total messages : (10)
worker : {0}, name : (workerActor-0) ->  saw total messages : (10)
worker : {3}, name : (workerActor-3) ->  saw total messages : (10)
worker : {4}, name : (workerActor-4) ->  saw total messages : (10)

So that about covers the demos I have created for using your own actor and using the RoutingLogic. Let's now look at using pools; as I have stated already, pools take care of supervision for us, so we don't have to manually take care of that any more.

As before I have a helper actor to work with the pool, which accepts the router Props and creates the router that will receive the messages to send to its routees.

Here is the demo code

import java.util.concurrent.TimeUnit

import akka.actor._
import akka.routing._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  //==============================================================
  // Use built Pool router(s) which will do the supervision for us
  //
  //
  //    Comment/Uncomment to try the different router logic
  //
  //==============================================================
  RunScatterGatherFirstCompletedPoolDemo()
  //RunTailChoppingPoolDemo()



  def RunScatterGatherFirstCompletedPoolDemo() : Unit = {

    val supervisionStrategy = OneForOneStrategy() {
      case e => SupervisorStrategy.restart
    }

    val props = ScatterGatherFirstCompletedPool(
      5, supervisorStrategy = supervisionStrategy,within = 10.seconds).
      props(Props[FibonacciActor])

    RunPoolDemo(props)
  }

  def RunTailChoppingPoolDemo() : Unit = {

    val supervisionStrategy = OneForOneStrategy() {
      case e => SupervisorStrategy.restart
    }

    val props = TailChoppingPool(5, within = 10.seconds,
      supervisorStrategy = supervisionStrategy,interval = 20.millis).
      props(Props[FibonacciActor])

    RunPoolDemo(props)
  }

  def RunPoolDemo(props : Props) : Unit = {
    val system = ActorSystem("RoutingSystem")
    val actorRef = system.actorOf(Props(
      new PoolRouterContainerActor(props,"theRouter")), name = "thePoolContainer")
    actorRef ! WorkMessage
    StdIn.readLine()
    system.terminate()
  }
}

And here is the helper actor

import akka.actor._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.pattern.ask

class PoolRouterContainerActor(val props: Props, val name :String)  extends Actor  {

  val router: ActorRef = context.actorOf(props, name)

  def receive = {
    case WorkMessage =>
      implicit val timeout = Timeout(5 seconds)
      val futureResult = router ? FibonacciNumber(10)
      val (actName,result) = Await.result(futureResult, timeout.duration)

      println(s"FibonacciActor : ($actName) came back with result -> $result")
  }
}

As before we will use 5 routees.

This is what the routees look like for the pool demo

import akka.actor.Actor
import scala.annotation.tailrec

class FibonacciActor extends Actor {

  val actName = self.path.name

  def receive = {
    case FibonacciNumber(nbr) => {
      println(s"FibonacciActor : ($actName) ->  " +
        s"has been asked to calculate FibonacciNumber")
      val result = fibonacci(nbr)
      sender ! (actName,result)
    }
  }

  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }

    fib(n, 1, 0)
  }
}

ScatterGatherFirstCompletedPool

Here is the output when we run this. It can be seen that we simply get the results from the routee that completed first

FibonacciActor : ($d) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($e) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($a) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($c) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($b) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($d) came back with result -> 55

TailChoppingPool

Here is the output when we run this. It can be seen that we simply get the results from the routee that completed first, out of the few routees that the message was sent to

FibonacciActor : ($b) ->  has been asked to calculate FibonacciNumber
FibonacciActor : ($b) came back with result -> 55

 

What About A Custom Routing Strategy?

Akka allows you to create your own routing strategy where you would create a class that extends the inbuilt Akka RoutingLogic. You can read more about this in the official Akka documentation:

http://doc.akka.io/docs/akka/snapshot/scala/routing.html#Custom_Router
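Just to give a feel for the shape of the API, a custom RoutingLogic is essentially a class with a select method, something like this (a contrived sketch, not production code):

import scala.collection.immutable
import scala.util.Random
import akka.routing.{Routee, RoutingLogic}

// contrived example : always pick a random routee from the first half of the
// routee list (assumes there is at least one routee)
class FirstHalfRandomRoutingLogic extends RoutingLogic {
  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = {
    val firstHalf = routees.take(math.max(1, routees.size / 2))
    firstHalf(Random.nextInt(firstHalf.size))
  }
}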

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples

AKKA : clustering

Last time we looked at remoting. You can kind of think of clustering as an extension to remoting, as some of the same underlying parts are used. But as we will see, clustering is way more powerful (and more fault tolerant too).

My hope is by the end of this post that you will know enough about Akka clustering that you would be able to create your own clustered Akka apps.

A Note About All The Demos In This Topic

I wanted the demos in this section to be as close to real life as possible. The official akka examples tend to have a single process, which I personally think is quite confusing when you are trying to deal with quite hard concepts. As such I decided to go with multi process projects to demonstrate things. I do however only have 1 laptop, so they are all hosted on the same machine, but they are separate processes/JVMs.

I am hoping by doing this it will make the learning process easier, as it is closer to what you would do in real life rather than have 1 main method that spawns an entire cluster. You just would not have that in real life.

 

What Is Akka Clustering?

Unlike remoting, which is peer to peer, a cluster may consist of many members, which can grow and contract depending on demand/failure. There is also the concept of roles for actors within a cluster, which this post will talk about.

You can see how this could be very useful, in fact you could see how this may be used to create a general purpose grid calculation engine such as Apache Spark.

 

Seed Nodes

Akka has the concept of some initial contact points within the cluster to allow the cluster to bootstrap itself as it were.

Here is what the official Akka docs say on this:

You may decide if joining to the cluster should be done manually or automatically to configured initial contact points, so-called seed nodes. When a new node is started it sends a message to all seed nodes and then sends join command to the one that answers first. If no one of the seed nodes replied (might not be started yet) it retries this procedure until successful or shutdown.

You may choose to configure these "seed nodes" in code, but the easiest way is via configuration. The relevant part of the demo app's configuration is here

akka {
  .....
  .....
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
  }
  .....
  .....
}




The seed nodes can be started in any order and it is not necessary to have all seed nodes running, but the node configured as the first element in the seed-nodes configuration list must be started when initially starting a cluster, otherwise the other seed-nodes will not become initialized and no other node can join the cluster. The reason for the special first seed node is to avoid forming separated islands when starting from an empty cluster. It is quickest to start all configured seed nodes at the same time (order doesn’t matter), otherwise it can take up to the configured seed-node-timeout until the nodes can join.

Once more than two seed nodes have been started it is no problem to shut down the first seed node. If the first seed node is restarted, it will first try to join the other seed nodes in the existing cluster.

We will see the entire configuration for the demo app later on in this post. For now just be aware that there is a concept of seed nodes and the best way to configure those for the cluster is via configuration.

That said, there may be some amongst you who would prefer to use the JVM property system, which you may do as follows:

-Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@127.0.0.1:2551
-Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@127.0.0.1:2552

Roles

Akka clustering comes with the concept of roles. You may be asking why we would need that?

Well it's quite simple really: say we have a higher than normal volume of data coming through your akka cluster system, you may want to increase the total processing power of the cluster to deal with this. How do we do that? We spin up more actors within a particular role. The role here may be "backend", whose actors do work designated to them by actors in some other role, say the "frontend" role.

By using roles we can manage which bits of the cluster get dynamically allocated more/less actors.

You can configure the minimum number of actors per role in configuration, which you can read more about here:

http://doc.akka.io/docs/akka/snapshot/java/cluster-usage.html#How_To_Startup_when_Cluster_Size_Reached
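For example, something along these lines in the config (a sketch with hypothetical numbers) would stop members being marked as Up until enough members of each role have joined:

akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}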

Member Events

Akka provides the ability to listen to member events. There are a number of reasons this could be useful, for example

  • Determining if a member has left the cluster
  • If a new member has joined the cluster

Here is a full list of the Cluster member events that you may choose to listen to.

The events to track the life-cycle of members are:

  • ClusterEvent.MemberJoined – A new member has joined the cluster and its status has been changed to Joining.
  • ClusterEvent.MemberUp – A new member has joined the cluster and its status has been changed to Up.
  • ClusterEvent.MemberExited – A member is leaving the cluster and its status has been changed to Exiting Note that the node might already have been shutdown when this event is published on another node.
  • ClusterEvent.MemberRemoved – Member completely removed from the cluster.
  • ClusterEvent.UnreachableMember – A member is considered as unreachable, detected by the failure detector of at least one other node.
  • ClusterEvent.ReachableMember – A member is considered as reachable again, after having been unreachable. All nodes that previously detected it as unreachable have detected it as reachable again.

And this is how you might subscribe to these events

cluster.subscribe(self, classOf[MemberUp])

Which you may use in an actor like this:

class SomeActor extends Actor {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, MemberUp
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  
  def receive = {
    case MemberUp(m) => register(m)
  }

  def register(member: Member): Unit =
    if (member.hasRole("frontend"))
     ...
}

We will see more on this within the demo code which we will walk through later

ClusterClient

What use is a cluster which can't receive commands from the outside world?

Well luckily we don’t have to care about that as Akka comes with 2 things that make this otherwise glib situation ok.

Akka comes with a ClusterClient which allows actors that are not part of the cluster to talk to the cluster. Here is what the official Akka docs have to say about this

An actor system that is not part of the cluster can communicate with actors somewhere in the cluster via this ClusterClient. The client can of course be part of another cluster. It only needs to know the location of one (or more) nodes to use as initial contact points. It will establish a connection to a ClusterReceptionist somewhere in the cluster. It will monitor the connection to the receptionist and establish a new connection if the link goes down. When looking for a new receptionist it uses fresh contact points retrieved from previous establishment, or periodically refreshed contacts, i.e. not necessarily the initial contact points.

 

Receptionist

As mentioned above the ClusterClient makes use of a ClusterReceptionist, but what is that, and how do we make a cluster actor available to the client using that?

The ClusterReceptionist is an Akka contrib extension, and must be configured on ALL the nodes that the ClusterClient will need to talk to.

There are 2 parts to this. Firstly we must ensure that the ClusterReceptionist is started on the nodes that the ClusterClient will need to communicate with. This is easily done using the following config:

akka {
  ....
  ....
  ....
  # enable receptionist at start
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]

}

The other thing that needs doing is that any actor within the cluster that you want to be able to talk to using the ClusterClient will need to register itself as a service with the ClusterClientReceptionist. Here is an example of how to do that

val system = ActorSystem("ClusterSystem", config)
val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
ClusterClientReceptionist(system).registerService(frontend)

Now that you have done that you should be able to communicate with this actor within the cluster using the ClusterClient

 

The Demo Dissection

I have based the demo for this post largely on the "Transformation" demo that Lightbend provide, which you can grab from here :

http://www.lightbend.com/activator/template/akka-sample-cluster-scala

The "Official" example, as it is, provides a cluster which contains "frontend" and "backend" roles. The "frontend" actors will take a text message and pass it to the registered workers (the "backend"s) who will UPPERCASE the message and return it to the "frontend".

I have taken this sample and added the ability to use the ClusterClient with it, which works using Future[T] and the ask pattern, such that the ClusterClient will get a response from the cluster request.

We will dive into all of this in just a moment

For the demo this is what we are trying to build

image

SBT / Dependencies

Before we dive into the demo code (which as I say is based largely on the official lightbend clustering example anyway) I would just like to dive into the SBT file that drives the demo projects

This is the complete SBT file for the entire demo

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor"         % "2.4.8",
    "com.typesafe.akka" %% "akka-remote"        % "2.4.8",
    "com.typesafe.akka" %% "akka-cluster"       % "2.4.8",
    "com.typesafe.akka" %% "akka-cluster-tools" % "2.4.8",
    "com.typesafe.akka" %% "akka-contrib"       % "2.4.8"
  )


lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val root =(project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "Base"
  )
  .aggregate(common, frontend, backend)
  .dependsOn(common, frontend, backend)

lazy val common = (project in file("common")).
  settings(commonSettings: _*).
  settings(
    name := "common"
  )

lazy val frontend = (project in file("frontend")).
  settings(commonSettings: _*).
  settings(
    name := "frontend"
  )
  .aggregate(common)
  .dependsOn(common)

lazy val backend = (project in file("backend")).
  settings(commonSettings: _*).
  settings(
    name := "backend"
  )
  .aggregate(common)
  .dependsOn(common)

There are a few things to note in this

  • We need a few dependencies to get clustering to work. Namely
    • akka-remote
    • akka-cluster
    • akka-cluster-tools
    • akka-contrib
  • There are a few projects
    • root : The cluster client portion
    • common : common files
    • frontend : frontend cluster based actors (the client will talk to these)
    • backend : backend cluster based actors

 

The Projects

Now that we have seen the projects involved from an SBT point of view, let's continue to look at how the actual projects perform their duties

Remember the workflow we are trying to achieve is something like this

  • We should ensure that a frontend (seed node) is started first
  • We should ensure a backend (seed node) is started. This will have the effect of the backend actor registering itself as a worker with the already running frontend actor
  • At this point we could start more frontend/backend non seed node actors, if we chose to
  • We start the client app (root) which will periodically send messages to the frontend actor that is looked up by its known seed node information. We would expect the frontend actor to delegate the work off to one of its known backend actors, and then send the response back to the client (ClusterClient) where we can use the response to send to a local actor, or consume the response directly

Common

The common project simply contains the common objects shared across the other projects, which for this demo app are just the messages as shown below

package sample.cluster.transformation

final case class TransformationJob(text: String)
final case class TransformationResult(text: String)
final case class JobFailed(reason: String, job: TransformationJob)
case object BackendRegistration

 

Root

This is the client app that will talk to the cluster (in particular the "frontend" seed node, which is expected to be running on 127.0.0.1:2551).

This client app uses the following configuration file

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 5000
    }
  }
}

We then use the following main method to kick off the client app

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import scala.io.StdIn
import scala.concurrent.duration._


object DemoClient {
  def main(args : Array[String]) {

    val system = ActorSystem("OTHERSYSTEM")
    val clientJobTransformationSendingActor =
      system.actorOf(Props[ClientJobTransformationSendingActor],
        name = "clientJobTransformationSendingActor")

    val counter = new AtomicInteger
    import system.dispatcher
    system.scheduler.schedule(2.seconds, 2.seconds) {
      clientJobTransformationSendingActor ! Send(counter.incrementAndGet())
      Thread.sleep(1000)
    }

    StdIn.readLine()
    system.terminate()
  }
}




There is not too much to talk about here, we simply create a standard actor, and send it messages on a recurring schedule.

The message looks like this

case class Send(count:Int)

The real work of talking to the cluster is inside the ClientJobTransformationSendingActor which we will look at now

import akka.actor.Actor
import akka.actor.ActorPath
import akka.cluster.client.{ClusterClientSettings, ClusterClient}
import akka.pattern.Patterns
import sample.cluster.transformation.{TransformationResult, TransformationJob}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}


class ClientJobTransformationSendingActor extends Actor {

  val initialContacts = Set(
    ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"))
  val settings = ClusterClientSettings(context.system)
    .withInitialContacts(initialContacts)

  val c = context.system.actorOf(ClusterClient.props(settings), "demo-client")


  def receive = {
    case TransformationResult(result) => {
      println("Client response")
      println(result)
    }
    case Send(counter) => {
        val job = TransformationJob("hello-" + counter)
        implicit val timeout = Timeout(5 seconds)
        val result = Patterns.ask(c,ClusterClient.Send("/user/frontend", job, localAffinity = true), timeout)

        result.onComplete {
          case Success(transformationResult) => {
            println(s"Client saw result: $transformationResult")
            self ! transformationResult
          }
          case Failure(t) => println("An error has occurred: " + t.getMessage)
        }
      }
  }
}

As you can see this is a regular actor, but there are several important things to note here:

  • We set up the ClusterClient with a known set of initial contacts that we can expect to be able to reach within the cluster (remember these nodes MUST have registered themselves as available services with the ClusterClientReceptionist)
  • That we use a new type of actor, a ClusterClient
  • That we use the ClusterClient to send a message to a seed node within the cluster (the frontend in our case). We use the ask pattern, which gives us a Future[T] representing the response (an equivalent sketch follows this list)
  • We use the response to send a local message to ourselves
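
As an aside, the same request could be written with the ? ask operator and mapTo rather than Patterns.ask. This is just an equivalent sketch of the Send handler body, assuming the c, job and self values from the actor above; it is not what the demo code uses:

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

implicit val timeout: Timeout = Timeout(5.seconds)

// ask (?) returns a Future[Any]; mapTo narrows it to the expected reply type
(c ? ClusterClient.Send("/user/frontend", job, localAffinity = true))
  .mapTo[TransformationResult]
  .foreach(result => self ! result)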

 

FrontEnd

As previously stated, the “frontend” role actors serve as the seed nodes for the ClusterClient. There is only one frontend seed node, which, as we just saw, the client app contacts via the ClusterClient.

So what happens when the client app uses the frontend actors via the ClusterClient? It's quite simple: once a connection is made to the frontend seed node, the client app sends a TransformationJob, a simple message containing a bit of text, which the frontend actor will pass on to one of its registered backend workers for processing.

The backend actor (also in the cluster) will simply convert the text contained in the TransformationJob to UPPERCASE and return it to the frontend actor. The frontend actor will then send this TransformationResult back to the sender, which happens to be the ClusterClient. The client app (which made the request using the ask pattern) hooks up a callback on the Future[T] and then sends the TransformationResult on to the client's own local actor.

Happy days.

So that is what we are trying to achieve; let's see what bits and bobs we need for the frontend side of things

Here is the configuration the frontend needs

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }

  # enable receptionist at start
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]

}

There are a couple of important things to note in this, namely:

  • That we configure the seed nodes
  • That we also add the ClusterClientReceptionist extension
  • That we use the ClusterActorRefProvider

And here is the frontend application

package sample.cluster.transformation.frontend

import language.postfixOps
import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory
import akka.cluster.client.ClusterClientReceptionist



object TransformationFrontendApp {

  def main(args: Array[String]): Unit = {

    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
    ClusterClientReceptionist(system).registerService(frontend)
  }

}

The important parts here are that we embellish the loaded config with the role of “frontend”, and that we also register the frontend actor with the ClusterClientReceptionist, such that the actor is available for the ClusterClient to communicate with.

Other than that it is all pretty vanilla akka to be honest

So let's now turn our attention to the actual frontend actor, which is shown below

package sample.cluster.transformation.frontend

import sample.cluster.transformation.{TransformationResult, BackendRegistration, JobFailed, TransformationJob}
import language.postfixOps
import scala.concurrent.Future
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Terminated
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.pattern.pipe
import akka.pattern.ask


class TransformationFrontend extends Actor {

  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0

  def receive = {
    case job: TransformationJob if backends.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob =>
      println(s"Frontend saw TransformationJob : '$job'")
      jobCounter += 1
      implicit val timeout = Timeout(5 seconds)
      val result  = (backends(jobCounter % backends.size) ? job)
        .map(x => x.asInstanceOf[TransformationResult])
      result pipeTo sender
      //pipe(result) to sender

    case BackendRegistration if !backends.contains(sender()) =>
      context watch sender()
      backends = backends :+ sender()

    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}

The crucial parts here are:

  • That when a backend registers it sends a BackendRegistration message; we then watch the sender, and if that backend terminates it is removed from this frontend actor's list of known backends
  • That we palm off the incoming TransformationJob to one of the known backends (chosen round-robin via the job counter), and then use the pipe pattern to pipe the response back to the client (see the sketch below)
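
To isolate the ask + pipe combination used above, here is a minimal sketch; the Forwarder actor is hypothetical and not part of the demo. The point is that the Future reply from a worker is piped straight back to the original sender without ever blocking the actor:

import akka.actor.{Actor, ActorRef}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical actor: forward any message to a worker and pipe the reply
// back to whoever asked us, without blocking this actor's message loop
class Forwarder(worker: ActorRef) extends Actor {
  implicit val timeout: Timeout = Timeout(5.seconds)
  def receive = {
    case msg => (worker ? msg) pipeTo sender()
  }
}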

And with that, all that is left to do is examine the backend code, so let's look at that now

 

BackEnd

As always lets start with the configuration, which for the backend is as follows:

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }

  # enable receptionist at start
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
}




You can see this is pretty much the same as the frontend config, so I won't say any more about it.

Ok, so following what we did with the frontend side of things, let's now look at the backend app

package sample.cluster.transformation.backend

import language.postfixOps
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory

object TransformationBackendApp {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[TransformationBackend], name = "backend")
  }
}

Again this is VERY similar to the frontend app, the only notable exception being that we now use a “backend” role instead of a “frontend” one.

So now let's look at the backend actor code, which is the final piece of the puzzle

package sample.cluster.transformation.backend

import sample.cluster.transformation.{BackendRegistration, TransformationResult, TransformationJob}
import language.postfixOps
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.Member
import akka.cluster.MemberStatus


class TransformationBackend extends Actor {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, MemberUp
  // re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case TransformationJob(text) => {
      val result = text.toUpperCase
      println(s"Backend has transformed the incoming job text of '$text' into '$result'")
      sender() ! TransformationResult(result)
    }
    case state: CurrentClusterState =>
      state.members.filter(_.status == MemberStatus.Up) foreach register
    case MemberUp(m) => register(m)
  }

  def register(member: Member): Unit =
    if (member.hasRole("frontend"))
      context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
        BackendRegistration
}

The key points here are:

  • That we use the cluster events, subscribing to MemberUp, such that if the newly-up member is a “frontend” role actor we register this backend with it by sending it a BackendRegistration message
  • That for any TransformationJob received (from the frontend, which is ultimately working for the client app) we do the work and send a TransformationResult back, which will make its way all the way back to the client

 

And in a nutshell that is how the entire demo hangs together. I hope I have not lost anyone along the way.

Anyway, let's now see how we can run the demo

How do I Run The Demo

You will need to ensure that you run the following 3 projects in this order (as a minimum; you can run more NON seed node frontend/backend instances before you start the root (client) if you like):

  • Frontend (seed node) : frontend with command line args : 2551
  • Backend (seed node) : backend with command line args : 2552
  • Optionally run more frontend/backend projects, but DON’T supply any command line args. This is how you get them to not be treated as seed nodes
  • Root : This is the client app

 

Once you run the projects you should see some output like

The “root” (client) project output:

[INFO] [10/05/2016 07:22:02.831] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/05/2016 07:22:03.302] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://OTHERSYSTEM@127.0.0.1:5000]
[INFO] [10/05/2016 07:22:03.322] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Starting up…
[INFO] [10/05/2016 07:22:03.450] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [10/05/2016 07:22:03.450] [main] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Started up successfully
[INFO] [10/05/2016 07:22:03.463] [OTHERSYSTEM-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [10/05/2016 07:22:03.493] [OTHERSYSTEM-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://OTHERSYSTEM)] Cluster Node [akka.tcp://OTHERSYSTEM@127.0.0.1:5000] – Metrics collection has started successfully
[WARN] [10/05/2016 07:22:03.772] [OTHERSYSTEM-akka.actor.default-dispatcher-19] [akka.tcp://OTHERSYSTEM@127.0.0.1:5000/system/cluster/core/daemon] Trying to join member with wrong ActorSystem name, but was ignored, expected [OTHERSYSTEM] but was [ClusterSystem]
[INFO] [10/05/2016 07:22:03.811] [OTHERSYSTEM-akka.actor.default-dispatcher-19] [akka.tcp://OTHERSYSTEM@127.0.0.1:5000/user/demo-client] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2552/system/receptionist]
[WARN] [10/05/2016 07:22:05.581] [OTHERSYSTEM-akka.remote.default-remote-dispatcher-14] [akka.serialization.Serialization(akka://OTHERSYSTEM)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Client saw result: TransformationResult(HELLO-1)
Client response
HELLO-1
Client saw result: TransformationResult(HELLO-2)
Client response
HELLO-2
Client saw result: TransformationResult(HELLO-3)
Client response
HELLO-3
Client saw result: TransformationResult(HELLO-4)
Client response
HELLO-4

The “frontend” project output:

[INFO] [10/05/2016 07:21:35.592] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/05/2016 07:21:35.883] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [10/05/2016 07:21:35.901] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Starting up…
[INFO] [10/05/2016 07:21:36.028] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [10/05/2016 07:21:36.028] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Started up successfully
[INFO] [10/05/2016 07:21:36.037] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [10/05/2016 07:21:36.040] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Metrics collection has started successfully
[WARN] [10/05/2016 07:21:37.202] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2552]] Caused by: [Connection refused: no further information: /127.0.0.1:2552]
[INFO] [10/05/2016 07:21:37.229] [ClusterSystem-akka.actor.default-dispatcher-17] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:37.229] [ClusterSystem-akka.actor.default-dispatcher-17] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:37.232] [ClusterSystem-akka.actor.default-dispatcher-21] [akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter] Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter#-1346529294] to Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0/endpointWriter#-1346529294] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:38.085] [ClusterSystem-akka.actor.default-dispatcher-22] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:39.088] [ClusterSystem-akka.actor.default-dispatcher-14] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:40.065] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/deadLetters] Message [akka.cluster.InternalClusterAction$InitJoin$] from Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#-2009390233] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[INFO] [10/05/2016 07:21:41.095] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles [frontend]
[INFO] [10/05/2016 07:21:41.123] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [10/05/2016 07:21:50.837] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles [backend]
[INFO] [10/05/2016 07:21:51.096] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] – Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
Frontend saw TransformationJob : ‘TransformationJob(hello-1)’
[WARN] [10/05/2016 07:22:05.669] [ClusterSystem-akka.remote.default-remote-dispatcher-24] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
[WARN] [10/05/2016 07:22:05.689] [ClusterSystem-akka.remote.default-remote-dispatcher-23] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Frontend saw TransformationJob : ‘TransformationJob(hello-2)’
Frontend saw TransformationJob : ‘TransformationJob(hello-3)’
Frontend saw TransformationJob : ‘TransformationJob(hello-4)’

 

The “backend” project output:

[INFO] [10/05/2016 07:21:50.023] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/05/2016 07:21:50.338] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
[INFO] [10/05/2016 07:21:50.353] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Starting up…
[INFO] [10/05/2016 07:21:50.430] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [10/05/2016 07:21:50.430] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Started up successfully
[INFO] [10/05/2016 07:21:50.437] [ClusterSystem-akka.actor.default-dispatcher-6] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the ‘sigar.jar’ to the classpath and the appropriate platform-specific native libary to ‘java.library.path’. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [10/05/2016 07:21:50.441] [ClusterSystem-akka.actor.default-dispatcher-6] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Metrics collection has started successfully
[INFO] [10/05/2016 07:21:50.977] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] – Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[WARN] [10/05/2016 07:21:51.289] [ClusterSystem-akka.remote.default-remote-dispatcher-15] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.BackendRegistration$] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
[WARN] [10/05/2016 07:22:05.651] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Backend has transformed the incoming job text of ‘hello-1’ into ‘HELLO-1’
[WARN] [10/05/2016 07:22:05.677] [ClusterSystem-akka.remote.default-remote-dispatcher-15] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [sample.cluster.transformation.TransformationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Backend has transformed the incoming job text of ‘hello-2’ into ‘HELLO-2’
Backend has transformed the incoming job text of ‘hello-3’ into ‘HELLO-3’
Backend has transformed the incoming job text of ‘hello-4’ into ‘HELLO-4’
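
As an aside, you will have noticed the repeated Java serializer warnings throughout these logs. The warning itself names the setting that controls it; during development (only) you could silence it with the following config tweak, which is my suggestion rather than something the demo projects do:

akka {
  actor {
    # development only: silence the default-Java-serializer warning
    warn-about-java-serializer-usage = off
  }
}

For production you would configure a proper serializer instead.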

 

NAT or Docker Considerations

Akka clustering does not work transparently with Network Address Translation, Load Balancers, or in Docker containers. If this applies to your setup, you may need to further configure Akka as described here:

http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#remote-configuration-nat

 
Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples

Akka : remoting

It has been a while since I wrote a post; the reason for that is actually this post.

I would consider remoting/clustering to be some of the more advanced stuff you could do with Akka. That said this and the next post will outline all of this good stuff for you, and by the end of this post I would hope to have demonstrated enough for you guys to go off and write Akka remoting/clustered apps.

I have decided to split the remoting and clustering stuff into 2 posts, to make it more focused and digestible. I think this is the right thing to do.

 

A Note About All The Demos In This Topic

I wanted the demos in this section to be as close to real life as possible. The official akka examples tend to have a single process. Which I personally think is quite confusing when you are trying to deal with quite hard concepts. As such I decided to go with multi process projects to demonstrate things. I do however only have 1 laptop, so they are hosted on the same node, but they are separate processes/JVMs.

I am hoping by doing this it will make the learning process easier, as it is closer to what you would do in real life rather than have 1 main method that spawns an entire cluster. You just would not have that in real life.

 

What Is Akka Remoting

If you have ever used RMI in Java, or Remoting/WCF in C#, you can kind of think of Akka remoting as something similar to that, where there is the ability to call a remote object's methods as if they were local. It is essentially peer-to-peer.

Obviously in Akka's case the remote object is actually an actor, and you will not actually be calling a method at all, but will instead treat the remote actor just like any other actor: you simply pass messages to it, and the remote actor will receive them and act on them accordingly.

This is actually quite unique. I have worked with Java Remoting and C# Remoting, and done a lot with .NET WCF. What all of these had in common was some code voodoo you had to do: the difference between working with a local object and working with a remote object required a fair bit of code, be it remoting channels, proxies etc.

In Akka there is literally no change in coding style to work with remoting, it is completely configuration driven. This is quite nice.

Akka's Remoting Interaction Models

Akka supports 2 ways of using remoting:

  • Lookup : Where we use actorSelection to lookup an already running remote actor
  • Creation : Where an actor will be created on the remote node

We will be looking at both these approaches

Requirements

As I have stated on numerous occasions I have chosen to use SBT. As such this is my SBT file dependencies section for both these Remoting examples.

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )

It can be seen that the crucial dependency is the akka-remote library.

 

Remote Selection

As stated above “Remote Selection” will try and use actorSelection to look up a remote actor. The remote actor IS expected to be available and running.

In this example there will be 2 projects configured in the SBT file

  • Remote : The remote actor, that is expected to be running before the local actor tries to communicate with it
  • Local : The local actor that will call the remote

Here is the complete SBT file for this section

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )


lazy val commonSettings = Seq(
  name := "AkkaRemoting",
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val remote = (project in file("remote")).
  settings(commonSettings: _*).
  settings(
    // other settings
  )

lazy val local = (project in file("local")).
  settings(commonSettings: _*).
  settings(
    // other settings
  )



This simply creates 2 projects for us, Remote and Local.

Remote Project

Now that we have the relevant projects in place, let's talk about how we expose a remote actor for selection.

We must ensure that the remote actor is available for selection, which requires the use of an IP address and a port.

Here is how we do this

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 4444
    }
    log-sent-messages = on
    log-received-messages = on
  }
}

Also note the akka.actor.provider is set to

akka.remote.RemoteActorRefProvider

Ok, so now that we have the ability to expose the remote actor on an IP address and port, let's have a look at the remote actor code.

Here is the remote actor itself

import akka.actor.Actor

class RemoteActor extends Actor {
  def receive = {
    case msg: String =>
      println(s"RemoteActor received message '$msg'")
      sender ! "Hello from the RemoteActor"
  }
}

Nothing too special, we simply receive a string message and send a response string message back to the sender (the local actor would be the sender in this case)

And here is the main method that drives the remote project

import akka.actor.{Props, ActorSystem}

import scala.io.StdIn

object RemoteDemo extends App  {
  val system = ActorSystem("RemoteDemoSystem")
  val remoteActor = system.actorOf(Props[RemoteActor], name = "RemoteActor")
  remoteActor ! "The RemoteActor is alive"
  StdIn.readLine()
  system.terminate()
  StdIn.readLine()
}

Again pretty standard stuff, no voodoo here

And that is all there is to the remote side of the “remote selection” remoting version. Let's now turn our attention to the local side.

Local Project

So far we have a remote actor which is configured to be up and running at 127.0.0.1:4444.

We now need to open up the local side of things. This is done using the following configuration. Notice the port is 0, which means a random free port will be picked, so it cannot clash with the already-in-use 4444. Obviously if you hosted these actors on physically different boxes there would be nothing to stop you using port 4444 again, but from a sanity point of view I find it better not to do that.

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
}

Plain Selection

We can make use of plain old actor selection to select any actor by a path of our choosing, where we may use resolveOne (if we expect to match only one actor; remember we can use wildcards, so there are times we may match more than one) to give us an ActorRef.

context.actorSelection(path).resolveOne()

When we use resolveOne() we get a Future[ActorRef] that we can work with in any of the normal ways we would handle a Future[T] (see the sketch below). I have chosen to use a for comprehension to capture the resolved ActorRef of the remote actor. I also monitor the remote actor using context.watch, such that if it terminates we will see a Terminated message and can shut down the local actor system.
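
As a quick aside, you do not have to use a for comprehension; any of the normal Future combinators will do. Here is a minimal sketch, assuming it runs inside an actor (so context and self are in scope) and that path is the remote actor path used below:

import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

implicit val resolveTimeout: Timeout = Timeout(5.seconds)

// resolveOne yields a Future[ActorRef]; handle both outcomes explicitly
context.actorSelection(path).resolveOne().onComplete {
  case Success(ref) => self ! ref // hand the resolved ref back to ourselves
  case Failure(ex)  => println(s"Could not resolve '$path': ${ex.getMessage}")
}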

We also make use of become (see the state machines post for more info on that) to swap out the message loop for the local actor, so that it works differently once we have a remote ActorRef.

Once we have an ActorRef representing the remote actor it is pretty standard stuff: we just send messages to it like we would to any other actor.

Here is the entire code for the plain actor selection approach of dealing with a remote actor.

import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Terminated, ActorRef, ReceiveTimeout, Actor}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

case object Init

class LocalActorUsingPlainSelection extends Actor {

  val path = "akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor"
  val atomicInteger = new AtomicInteger();
  context.setReceiveTimeout(3 seconds)

  def receive = identifying

  def identifying: Receive = {
    case Init => {
      implicit val resolveTimeout = Timeout(5 seconds)
      for (ref : ActorRef <- context.actorSelection(path).resolveOne()) {
        println("Resolved remote actor ref using Selection")
        context.watch(ref)
        context.become(active(ref))
        context.setReceiveTimeout(Duration.Undefined)
        self ! Start
      }
    }
    case ReceiveTimeout => println("timeout")
  }

  def active(actor: ActorRef): Receive = {
    case Start =>
      actor ! "Hello from the LocalActorUsingPlainSelection"
    case msg: String =>
      println(s"LocalActorUsingPlainSelection received message: '$msg'")
      if (atomicInteger.get() < 5) {
        sender ! "Hello back to you"
        atomicInteger.getAndAdd(1)
      }
    case Terminated(`actor`) =>
      println("Receiver terminated")
      context.system.terminate()
  }
}

 

Using Identity Messages

Another approach that can be taken rather than relying on straight actor selection is by using some special Akka messages, namely Identify and ActorIdentity.

The idea is that we still use actorSelection for a given path, but rather than using resolveOne we send the ActorSelection a special Identify message. The actor that was chosen by the ActorSelection should see this Identify message and respond with an ActorIdentity message.

At this point the local actor can simply listen for ActorIdentity messages, and when it sees one it can test the message's correlationId to see if it matches the requested path. If it does, you know it is the correct actor and you can then use the ActorRef carried by the ActorIdentity message.

As in the previous example, we also make use of become (see the state machines post for more info on that) to swap out the message loop for the local actor, so that it works differently once we have a remote ActorRef.

Once we have an ActorRef representing the remote actor it is again pretty standard stuff: we just send messages to it like we would to any other actor.

Here is the entire code for the Identity actor approach

import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import scala.concurrent.duration._


case object Start


class LocalActorUsingIdentity extends Actor {

  val path = "akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor"
  val atomicInteger = new AtomicInteger();
  context.setReceiveTimeout(3 seconds)
  sendIdentifyRequest()

  def receive = identifying

  def sendIdentifyRequest(): Unit =
    context.actorSelection(path) ! Identify(path)

  def identifying: Receive = {
    case identity : ActorIdentity =>
      if(identity.correlationId.equals(path)) {
        identity.ref match {
          case Some(remoteRef) => {
            context.watch(remoteRef)
            context.become(active(remoteRef))
            context.setReceiveTimeout(Duration.Undefined)
            self ! Start
          }
          case None => println(s"Remote actor not available: $path")
        }
      }
    case ReceiveTimeout => sendIdentifyRequest()
  }

  def active(actor: ActorRef): Receive = {
    case Start =>
      actor ! "Hello from the LocalActorUsingIdentity"
    case msg: String =>
      println(s"LocalActorUsingIdentity received message: '$msg'")
      if (atomicInteger.get() < 5) {
        sender ! "Hello back to you"
        atomicInteger.getAndAdd(1)
      }
    case Terminated(`actor`) =>
      println("Receiver terminated")
      context.system.terminate()
  }
}

How do I Run The Demo

You will need to ensure that you run the following 2 projects in this order:

  • Remote
  • Local

Once you run the 2 projects you should see some output like this

The Remote project output:

[INFO] [10/03/2016 07:02:54.282] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:02:54.842] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteDemoSystem@127.0.0.1:4444]
[INFO] [10/03/2016 07:02:54.844] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://RemoteDemoSystem@127.0.0.1:4444]
RemoteActor received message ‘The RemoteActor is alive’
[INFO] [10/03/2016 07:02:54.867] [RemoteDemoSystem-akka.actor.default-dispatcher-15] [akka://RemoteDemoSystem/deadLetters]
Message [java.lang.String] from Actor[akka://RemoteDemoSystem/user/RemoteActor#-109465353] to Actor[akka://RemoteDemoSystem/deadLetters]
was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings
‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
RemoteActor received message ‘Hello from the LocalActorUsingPlainSelection’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’
RemoteActor received message ‘Hello back to you’

 

The Local project output:

[INFO] [10/03/2016 07:03:09.489] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:03:09.961] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://LocalDemoSystem@127.0.0.1:64945]
[INFO] [10/03/2016 07:03:09.963] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://LocalDemoSystem@127.0.0.1:64945]
Resolved remote actor ref using Selection
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’
LocalActorUsingPlainSelection received message: ‘Hello from the RemoteActor’

 

Remote Creation

Just as we did with remote selection, remote creation shall be split into a remote project and a local project. However, since this time the local project must know about the type of the remote actor in order to create it in the first place, we introduce a common project which both the remote and local projects depend on.

In this example there will be 3 projects configured in the SBT file

  • Remote : The remote actor, that is expected to be created by the local actor
  • Common : The common files that both Local/Remote projects depend on
  • Local : The local actor that will create and call the remote actor

Here is the complete SBT file for this section

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.8",
    "com.typesafe.akka" %% "akka-remote" % "2.4.8"
  )


lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val root =(project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "Base"
  )
  .aggregate(common, remote)
  .dependsOn(common, remote)

lazy val common = (project in file("common")).
  settings(commonSettings: _*).
  settings(
    name := "common"
  )

lazy val remote = (project in file("remote")).
  settings(commonSettings: _*).
  settings(
    name := "remote"
  )
  .aggregate(common)
  .dependsOn(common)

It can be seen that both the local/remote will aggregate/depend on the common project. This is standard SBT stuff so I will not go into that.

So now that we understand a bit more about the SBT setup, let's focus on the remote side of things.

This may seem odd, since we are expecting the local actor to create the remote one, aren't we?

Well yes we are, but the remote actor system must still be up and running before the local system can create/deploy an actor into it.

So it still makes sense to examine the remote side of things first.

Remote Project

Now that we have the relevant projects in place, lets talk about how we expose a remote actor for creation.

We must ensure that the remote system is available for creation requests, which requires the use of an IP address and a port.

Here is how we do this

akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    netty.tcp {
      hostname = "127.0.0.1",
      # LISTEN on tcp port 2552
      port=2552
    }
  }

}

I also mentioned that the remote actor system MUST be started to allow remote creation to work, as such the entire codebase for the remote end of thing (excluding the actor remote actor which gets created by the local side of thing) is shown below

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem

object CalculatorApplication {

  def main(args: Array[String]): Unit = {
    startRemoteWorkerSystem()
  }

  def startRemoteWorkerSystem(): Unit = {
    ActorSystem("CalculatorWorkerSystem", ConfigFactory.load("calculator"))
    println("Started CalculatorWorkerSystem")
  }

}

All that is happening here is that the remote actor system gets created.

Local Project

Most of the hard work is done in this project, as it is the local side that is responsible for creating and deploying the remote actor into the remote actor system before it can make use of it.

Let's start with the deployment of the remote actor from the local side.

Firstly we need this configuration to allow this to happen

akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider",
    deployment {
      "/creationActor/*" {
        remote = "akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552"
      }
    }
  }

  remote {
    netty.tcp {
      hostname = "127.0.0.1",
      port=2554
    }
  }

}

If you look carefully at this configuration file and the one at the remote end, you will see that the IP address/port/actor system name used within the deployment section all match the remote system. This is how the local actor system is able to create and deploy an actor into the remote actor system (which must be running before the local actor system tries to deploy a remote actor to it).
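
As an aside, the same remote deployment can also be expressed programmatically rather than via configuration. This is an alternative sketched for completeness (the demo uses the config approach above); it assumes system is the local ActorSystem:

import akka.actor.{AddressFromURIString, Deploy, Props}
import akka.remote.RemoteScope

// Sketch: deploy an actor into the (already running) remote system by
// attaching a RemoteScope to its Props, instead of using config
val remoteAddress = AddressFromURIString("akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552")
val remotelyDeployed = system.actorOf(
  Props[CalculatorActor].withDeploy(Deploy(scope = RemoteScope(remoteAddress))),
  name = "remoteCalculator")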

So now that we have seen this config, let's see how it is used by the local side

import sample.remote.calculator.{Divide, Multiply, CreationActor}
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import scala.util.Random
import akka.actor.ActorSystem
import akka.actor.Props

object CreationApplication {

  def main(args: Array[String]): Unit = {
    startRemoteCreationSystem()
  }

  def startRemoteCreationSystem(): Unit = {
    val system =
      ActorSystem("CreationSystem", ConfigFactory.load("remotecreation"))
    val actor = system.actorOf(Props[CreationActor],
      name = "creationActor")

    println("Started CreationSystem")
    import system.dispatcher
    system.scheduler.schedule(1.second, 1.second) {
      if (Random.nextInt(100) % 2 == 0)
        actor ! Multiply(Random.nextInt(20), Random.nextInt(20))
      else
        actor ! Divide(Random.nextInt(10000), (Random.nextInt(99) + 1))
    }
  }

}

It can be seen that the first thing we do is create the CreationActor using the config above. Note that the deployment path in the config is "/creationActor/*", which matches the children of the creationActor rather than the creationActor itself. So the CreationActor runs locally, while any actor it creates (the CalculatorActor we will see shortly) gets deployed into the already running remote actor system, which is why the "Calculating..." lines appear in the remote project's output. The remotely deployed actor can then be used just like any other actor.

For completeness here is the code of the CreationActor

package sample.remote.calculator

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props

class CreationActor extends Actor {

  def receive = {
    case op: MathOp =>
      val calculator = context.actorOf(Props[CalculatorActor])
      calculator ! op
    case result: MathResult => result match {
      case MultiplicationResult(n1, n2, r) =>
        printf("Mul result: %d * %d = %d\n", n1, n2, r)
        context.stop(sender())
      case DivisionResult(n1, n2, r) =>
        printf("Div result: %.0f / %d = %.2f\n", n1, n2, r)
        context.stop(sender())
    }
  }
}

It can be seen that the CreationActor above also creates another actor, the CalculatorActor, which does the real work (and which, thanks to the deployment configuration, ends up running in the remote actor system). Let's see the code for that one:

package sample.remote.calculator

import akka.actor.Props
import akka.actor.Actor

class CalculatorActor extends Actor {
  def receive = {
    case Add(n1, n2) =>
      println("Calculating %d + %d".format(n1, n2))
      sender() ! AddResult(n1, n2, n1 + n2)
    case Subtract(n1, n2) =>
      println("Calculating %d - %d".format(n1, n2))
      sender() ! SubtractResult(n1, n2, n1 - n2)
    case Multiply(n1, n2) =>
      println("Calculating %d * %d".format(n1, n2))
      sender() ! MultiplicationResult(n1, n2, n1 * n2)
    case Divide(n1, n2) =>
      println("Calculating %.0f / %d".format(n1, n2))
      sender() ! DivisionResult(n1, n2, n1 / n2)
  }
}

Nothing special there really, it's just a standard actor.
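
For completeness, the shared message types from the common project are never shown above. Based on how they are used by CreationActor and CalculatorActor, they presumably look something like the sketch below; the exact definitions live in the common project, so treat this as an assumption:

package sample.remote.calculator

// Assumed shared protocol: requests extend MathOp, replies extend MathResult
trait MathOp
final case class Add(n1: Int, n2: Int) extends MathOp
final case class Subtract(n1: Int, n2: Int) extends MathOp
final case class Multiply(n1: Int, n2: Int) extends MathOp
final case class Divide(n1: Double, n2: Int) extends MathOp

trait MathResult
final case class AddResult(n1: Int, n2: Int, result: Int) extends MathResult
final case class SubtractResult(n1: Int, n2: Int, result: Int) extends MathResult
final case class MultiplicationResult(n1: Int, n2: Int, result: Int) extends MathResult
final case class DivisionResult(n1: Double, n2: Int, result: Double) extends MathResult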

So we now have a complete pipeline.

How do I Run The Demo

You will need to ensure that you run the following 2 projects in this order:

  • Remote
  • Root

Once you run the 2 projects you should see some output like this

The Remote project output:

[INFO] [10/03/2016 07:30:58.763] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:30:59.235] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552]
[INFO] [10/03/2016 07:30:59.237] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552]
Started CalculatorWorkerSystem
Calculating 6 * 15
[WARN] [10/03/2016 07:31:10.988] [CalculatorWorkerSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CalculatorWorkerSystem)] Using the default Java serializer for class [sample.remote.calculator.MultiplicationResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Calculating 1346 / 82
[WARN] [10/03/2016 07:31:11.586] [CalculatorWorkerSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CalculatorWorkerSystem)] Using the default Java serializer for class [sample.remote.calculator.DivisionResult] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Calculating 2417 / 31
Calculating 229 / 66
Calculating 9966 / 43
Calculating 4 * 12
Calculating 9 * 5
Calculating 1505 / 91

The Root project output:

[INFO] [10/03/2016 07:31:08.849] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/03/2016 07:31:09.470] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://CreationSystem@127.0.0.1:2554]
[INFO] [10/03/2016 07:31:09.472] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://CreationSystem@127.0.0.1:2554]
Started CreationSystem
[WARN] [10/03/2016 07:31:10.808] [CreationSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [com.typesafe.config.impl.SimpleConfig] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
[WARN] [10/03/2016 07:31:10.848] [CreationSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [sample.remote.calculator.Multiply] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Mul result: 6 * 15 = 90
[WARN] [10/03/2016 07:31:11.559] [CreationSystem-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://CreationSystem)] Using the default Java serializer for class [sample.remote.calculator.Divide] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
Div result: 1346 / 82 = 16.41
Div result: 2417 / 31 = 77.97
Div result: 229 / 66 = 3.47
Div result: 9966 / 43 = 231.77
Mul result: 4 * 12 = 48
Mul result: 9 * 5 = 45
Div result: 1505 / 91 = 16.54
Mul result: 7 * 4 = 28
Div result: 1797 / 95 = 18.92
Mul result: 12 * 17 = 204
Div result: 2998 / 72 = 41.64
Div result: 1157 / 98 = 11.81
Div result: 1735 / 22 = 78.86
Mul result: 4 * 19 = 76
Div result: 6257 / 51 = 122.69
Mul result: 14 * 4 = 56
Div result: 587 / 27 = 21.74
Div result: 2528 / 98 = 25.80
Mul result: 9 * 16 = 144
Mul result: 13 * 10 = 130

 
NAT or Docker Considerations

Akka Remoting does not work transparently with Network Address Translation, Load Balancers, or in Docker containers. If this applies to your setup, you may need to further configure Akka as described here:

http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#remote-configuration-nat

 
Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples