CodeProject, Scala

SCALA mocking

 

Last time we looked at writing unit tests for our code using ScalaTest. This time we will be looking at mocking.

In .NET there are several choices available that I like (and a couple that I don’t), such as :

  • Moq
  • FakeItEasy
  • RhinoMocks (this is one I am not keen on)

I personally am most familiar with Moq, so when I started looking at JVM based mocking frameworks I kind of wanted one that used roughly the same syntax as the ones that I had used in .NET land.

There are several choices available that I think are quite nice, namely :

  • ScalaMock
  • EasyMock
  • JMock
  • Mockito

These all play nicely with ScalaTest (which I am sure you are all very pleased to hear).

So with that list, what did I decide upon? I personally opted for Mockito, as I liked the syntax the best. That is not to say the others are not fine and dandy, it is just that I personally liked Mockito and it seemed to have good documentation and favorable Google search results, so Mockito it is.

So for the rest of this post I will talk about how to use Mockito to write our mocks. I will be using Mockito alongside ScalaTest, which we looked at last time.

SBT Requirements

As with most of the previous posts you will need to grab the libraries using SBT. As such your SBT file will need to use the following:

libraryDependencies ++= Seq(
  "org.mockito" % "mockito-core" % "1.8.5",
  "org.scalatest" %% "scalatest" % "2.2.5" % "test"
)

 

Our First Example

So with all that stated above, let's have a look at a simple example. This trivial example mocks out a java.util.ArrayList[String], and also sets up a few verifications.

import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._

class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Testing using Mockito " should "be easy" in {

    //mock creation
    val mockedList = mock[java.util.ArrayList[String]]

    //using mock object
    mockedList.add("one")
    mockedList.clear()

    //verification
    verify(mockedList).add("one")
    verify(mockedList).clear()

  }
}

One thing you may notice straight away is that I am able to mock an ArrayList[T], which is a concrete class and not an interface or abstract class (Mockito can mock any non-final class). This is pretty cool.

 

Stubbing

Using Mockito we can also stub out things just as you would expect with any half decent mocking framework. Here is an example where we try and mock out a simple trait.

import java.util.Date
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._


trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}



class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Stubbing using Mockito " should "be easy" in {

    var mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.getDate()).thenReturn("01/01/2015")
    assert("01/01/2015" === mockDumbFormatter.getDate())
  }
}

It can be seen above that it is quite easy to mock a trait. You can also see how we stub the mock out using the  Mockito functions

  • when
  • thenReturn

 

Return Values

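We just saw an example above of how to use the “thenReturn” Mockito function, which is what you would use to set up your return value. The value does not have to be a literal, it could quite easily come from some other function which deals with creating the return values. Kind of a return value factory method. Bear in mind that the argument to “thenReturn” is evaluated once, when the stub is set up, so every call to the mock gets that same value back.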
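
To make that last point concrete, here is a small sketch rather than anything definitive. It assumes the same DumbFormatter trait used elsewhere in this post, and the nextDate() helper is a made-up "return value factory" purely for illustration. It also shows the overload of “thenReturn” that takes several values and hands them out on consecutive calls.

```scala
import java.util.Date
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._

trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}

class ReturnValue_Tests extends FlatSpec with MockitoSugar {

  "thenReturn" should "hand back the value it was given at stubbing time" in {

    // hypothetical return value factory, it counts how often it is called
    var factoryCalls = 0
    def nextDate() : String = {
      factoryCalls += 1
      s"0$factoryCalls/01/2015"
    }

    val mockDumbFormatter = mock[DumbFormatter]

    // nextDate() runs once, right here, not on every call to the mock
    when(mockDumbFormatter.getDate()).thenReturn(nextDate())

    assert("01/01/2015" === mockDumbFormatter.getDate())
    assert("01/01/2015" === mockDumbFormatter.getDate())
    assert(factoryCalls === 1)
  }

  it should "also accept several values for consecutive calls" in {

    val mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.getDate()).thenReturn("01/01/2015", "02/01/2015")

    assert("01/01/2015" === mockDumbFormatter.getDate())
    assert("02/01/2015" === mockDumbFormatter.getDate())

    // once the list is used up, the last value keeps being returned
    assert("02/01/2015" === mockDumbFormatter.getDate())
  }
}
```

If the value really has to be worked out on every call, “thenAnswer” (covered in the CallBacks section) is the tool for that.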

 

Argument Matching

Mockito comes with something that allows you to match against any argument value. It also comes with regex matchers, and allows you to write custom matchers if the ones out of the box don’t quite fit your needs.

Here is an example of writing a mock where we use the standard argument matchers:

import java.util.Date
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._


trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}



class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Stubbing using Mockito " should "be easy" in {

    var mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]())).thenReturn("01/01/2015 Something")
    assert("01/01/2015 Something" === mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", new Date()))
  }
}
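
The regex matcher mentioned at the start of this section can be sketched in much the same way. This is only a minimal example under a couple of assumptions: it reuses the DumbFormatter trait from above, and the pattern is anchored with ^ and $ so that it has to match the whole argument. A call whose argument does not match is simply not stubbed, so the mock falls back to its default, which is null for a String.

```scala
import java.util.Date
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._

trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}

class RegexMatcher_Tests extends FlatSpec with MockitoSugar {

  "A regex matcher" should "only stub the calls whose argument matches" in {

    val mockDumbFormatter = mock[DumbFormatter]

    // stub only the calls where the first argument is made up of digits
    when(mockDumbFormatter.formatWithDataTimePrefix(matches("^\\d+$"), any[Date]()))
      .thenReturn("digits only")

    assert("digits only" === mockDumbFormatter.formatWithDataTimePrefix("12345", new Date()))

    // no stub applies here, so we get the default value for a String
    assert(mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", new Date()) == null)
  }
}
```

Note that once one argument uses a matcher, all the arguments of that call have to be matchers, which is why the date is passed as any[Date]().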

Exceptions

To throw exceptions with Mockito we simply need to use the “thenThrow(…)” function. Here is how.

import java.util.Date
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._


trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}



class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Stubbing using Mockito " should "be easy" in {

    var mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]()))
      .thenThrow(new RuntimeException())

    //use the ScalaTest intercept to test for exceptions
    intercept[RuntimeException] {
      mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", new Date())
    }
  }
}

See how we also have to use the ScalaTest “intercept” for the actual testing.

 

CallBacks

Callbacks are useful when you want to see what a method was called with and then you can make informed decisions about what you could possibly return.

Here is how you do callbacks in Mockito, note the use of the “thenAnswer” function, and how we use an anonymous Answer object.

import java.util.Date
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._


trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}



class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Stubbing using Mockito " should "be easy" in {

    var mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]()))
      .thenAnswer(new Answer[String] {
        override def answer(invocation: InvocationOnMock): String = {
          val result = "called back nicely sir"
          println(result)
          result
        }
      })

    assert("called back nicely sir" === mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", new Date()))



  }
}
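
The callback above always returns the same string. To actually look at what the method was called with, the InvocationOnMock passed to the Answer exposes the arguments of the call. Here is a rough sketch of that, again assuming the DumbFormatter trait from this post. The rule used to pick the return value is invented just for the example.

```scala
import java.util.Date
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._

trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}

class Callback_Tests extends FlatSpec with MockitoSugar {

  "A callback" should "be able to inspect the arguments of the call" in {

    val mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.formatWithDataTimePrefix(anyString(), any[Date]()))
      .thenAnswer(new Answer[String] {
        override def answer(invocation: InvocationOnMock): String = {
          // the arguments arrive as an Array[AnyRef], in declaration order
          val input = invocation.getArguments()(0).asInstanceOf[String]

          // made up rule, decide what to return based on the input
          if (input.isEmpty) "nothing to format" else input.toUpperCase
        }
      })

    assert("BLAH" === mockDumbFormatter.formatWithDataTimePrefix("blah", new Date()))
    assert("nothing to format" === mockDumbFormatter.formatWithDataTimePrefix("", new Date()))
  }
}
```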

 

Verification

The last thing I wanted to talk about was verification, which may include verifying that functions got called, and that they were called the right number of times.

Here is a simple example of this:

import java.util.Date
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._


trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}



class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Stubbing using Mockito " should "be easy" in {

    var mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]()))
      .thenReturn("someString")

    val theDate = new Date()
    val theResult = mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", theDate)
    val theResult2 = mockDumbFormatter.formatWithDataTimePrefix("no no no", theDate)

    verify(mockDumbFormatter, atLeastOnce()).formatWithDataTimePrefix("blah blah blah", theDate)
    verify(mockDumbFormatter, times(1)).formatWithDataTimePrefix("no no no", theDate)


  }
}
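
Verification also works the other way around, checking that something was not called. The following is a small sketch using the “never()” and “times(n)” verification modes along with “verifyNoMoreInteractions”, all of which come from org.mockito.Mockito. It assumes the same DumbFormatter trait as the rest of this post.

```scala
import java.util.Date
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._

trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}

class Verification_Tests extends FlatSpec with MockitoSugar {

  "Verifying using Mockito " should "cover calls that never happened too" in {

    val mockDumbFormatter = mock[DumbFormatter]

    mockDumbFormatter.getDate()
    mockDumbFormatter.getDate()

    // getDate() was called exactly twice
    verify(mockDumbFormatter, times(2)).getDate()

    // formatWithDataTimePrefix(..) was never called, whatever the arguments
    verify(mockDumbFormatter, never()).formatWithDataTimePrefix(anyString(), any[Date]())

    // and nothing else happened on the mock that we have not verified
    verifyNoMoreInteractions(mockDumbFormatter)
  }
}
```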

 

 

Further Reading

You can read more about how to use Mockito from the docs : https://docs.google.com/document/d/15mJ2Qrldx-J14ubTEnBj7nYN2FB8ap7xOn8GRAi24_A/edit

 

 

End Of The Line

Personally my quest goes on, I am going to keep going until I consider myself  good at Scala (which probably means I know nothing).

Anyway behind the scenes I will be studying more and more stuff about how to get myself to that point. As such I guess it is only natural that I may post some more stuff about Scala in the future.

But for now this is it, this is the end of the line for this brief series of posts on Scala. I hope you have all enjoyed the posts, and if you have please feel free to leave a comment, they are always appreciated.

 

CodeProject, Scala

SCALA : TESTING OUR CODE

 

So last time we looked at how to use Slick to connect to a SQL Server database.

This time we look at how to use one of the 2 popular Scala testing frameworks.

The 2 big names when it comes to Scala testing are

  • ScalaTest
  • Specs2

I have chosen to use ScalaTest as it seems slightly more popular when you do a Google search, and I quite liked the syntax. That said, Specs2 is also very good, so if you fancy having a look at that you should.

SBT for ScalaTest

So what do we need to get started with ScalaTest? As always we need to grab the JAR, which we do using SBT.

At time of writing this was accomplished using this SBT entry:

name := "ClassesDemo"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.scalatest" %% "scalatest" % "2.2.5" % "test"
)

With that in place, SBT should pull down the JAR from Maven Central for you. So once you are happy that you have the ScalaTest JAR installed, we can now proceed to write some tests.

 

Writing Some Tests

I come from a .NET background, and as such I am used to working with tools such as

  • NUnit
    • TestFixture
    • Setup : To setup the test
    • TearDown : To teardown the test
  • Moq / FakeItEasy : Mocking frameworks

As such I wanted to make sure I could do everything that I was used to in .NET using ScalaTest.

This article will concentrate on the testing side of things, while the next post will be on the mocking side of things.

So let’s carry on for now shall we.

Choosing Your Test Style

ScalaTest supports several different styles of writing tests. The 2 that I will look at here are:

  • FunSuite : This is more in line with what you get with NUnit, say. We would write something like test(“testing should be easy”)
  • FlatSpec : This is more of a BDD style test declaration, where we would write something like this: “Testing” should “be easy”
     

We will see examples of both of these styles in just a minute, but before that let's carry on and look at some of the common things you may want to do with your tests.

Setup / TearDown

You may want to run some startup/teardown code around each test. Typically startup would be used to set up mocks for your test cases, and that sort of thing.

In things like NUnit this would simply be done by creating a method and attributing it to say it is the Setup/TearDown methods.

In ScalaTest things are slightly different in that we need to mix in the “BeforeAndAfter” trait to do this. Let's see an example:

import org.scalatest.{FunSuite, BeforeAndAfter}
import scala.collection.mutable.ListBuffer

class FunSuite_Example_Tests extends FunSuite with BeforeAndAfter {

  val builder = new StringBuilder
  val buffer = new ListBuffer[String]

  before {
    builder.append("ScalaTest is ")
  }

  after {
    builder.clear()
    buffer.clear()
  }
}

It can be seen in this example that the BeforeAndAfter trait gives you 2 additional functions

  • before
  • after

You can use these to perform your startup/teardown logic.

This example uses the FunSuite style, but the “BeforeAndAfter”  trait mixin is done exactly the same for the FlatSpec style testing.

 

Writing A Test Using FunSuite

I think if you have come from an NUnit / XUnit type of background you will probably identify more with the FunSuite style of testing.

Here is an example of a set of FunSuite tests.

import org.scalatest.{FunSuite, BeforeAndAfter}

import scala.collection.mutable.ListBuffer

class FunSuite_Example_Tests extends FunSuite with BeforeAndAfter {

  val builder = new StringBuilder
  val buffer = new ListBuffer[String]

  before {
    builder.append("ScalaTest is ")
  }

  after {
    builder.clear()
    buffer.clear()
  }

  test("Testing should be easy") {
    builder.append("easy!")
    assert(builder.toString === "ScalaTest is easy!")
    assert(buffer.isEmpty)
    buffer += "sweet"
  }

  test("Testing should be fun") {
    builder.append("fun!")
    assert(builder.toString === "ScalaTest is fun!")
    assert(buffer.isEmpty)
  }
}

It can be seen that they follow the very tried and tested approach of tools like NUnit, where you have a test(…) function, where “…” is the text that describes your test case.

Nothing much more to say there apart from to make sure you mixin the FunSuite trait.

 

Writing A Test Using FlatSpec

ScalaTest also supports another way of writing your tests, which is to use the FlatSpec trait, which you would mixin instead of the FunSuite trait.

When you use FlatSpec you would be writing your tests more like this:

  • “Testing” should “be easy” in {…}
  • it should “be fun” in {…}

It's more of a BDD style way of creating your test cases.

Here is the exact same test suite that we saw above but this time written using the FlatSpec instead of FunSuite.

import org.scalatest.{FlatSpec, BeforeAndAfter}

import scala.collection.mutable.ListBuffer

class FlatSpec_Example_Tests extends FlatSpec with BeforeAndAfter {

  val builder = new StringBuilder
  val buffer = new ListBuffer[String]

  before {
    builder.append("ScalaTest is ")
  }

  after {
    builder.clear()
    buffer.clear()
  }

  "Testing" should "be easy" in {
    builder.append("easy!")
    assert(builder.toString === "ScalaTest is easy!")
    assert(buffer.isEmpty)
    buffer += "sweet"
  }

  it should "be fun" in {
    builder.append("fun!")
    assert(builder.toString === "ScalaTest is fun!")
    assert(buffer.isEmpty)
  }
}

I don’t mind either, I guess it’s down to personal choice/taste at the end of the day.

Using Matchers

Matchers are ScalaTest’s way of providing additional constraints to assert against. In some testing frameworks we would just use the Assert class for that along with things like

  • Assert.AreEqual(..)
  • Assert.IsNotNull(..)

In ScalaTest you can still use the assert(..) function, but matchers are also a good way of expressing your test conditions.

So what exactly are matchers?

In the words of the ScalaTest creators:

ScalaTest provides a domain specific language (DSL) for expressing assertions in tests using the word should.

So what do we need to do to use these ScalaTest matchers? Well quite simply we need to just mix in Matchers, like this:

import org.scalatest._

class ExampleSpec extends FlatSpec with Matchers { ...}

You can alternatively import the members of the trait, a technique particularly useful when you want to try out matcher expressions in the Scala interpreter. Here’s an example where the members of Matchers are imported:

import org.scalatest._
import Matchers._

class ExampleSpec extends FlatSpec { // Can use matchers here ...

So that gives us the ability to use the ScalaTest matchers DSL. So what do these things look like? Let's see a couple of examples:

import org.scalatest._


class FlatSpec_Example_Tests extends FlatSpec with Matchers {

    "Testing" should "probably use some matchers" in {

          //equality examples
          Array(1, 2) should equal (Array(1, 2))
          val resultInt = 3
          resultInt should equal (3) // can customize equality
          resultInt should === (3)   // can customize equality and enforce type constraints
          resultInt should be (3)    // cannot customize equality, so fastest to compile
          resultInt shouldEqual 3    // can customize equality, no parentheses required
          resultInt shouldBe 3       // cannot customize equality, so fastest to compile, no parentheses required

          //length examples
          List(1,2) should have length 2
          "cat" should have length 3

          //string examples
          val helloWorld = "Hello world"
          helloWorld should startWith ("Hello")
          helloWorld should endWith ("world")

          val sevenString ="six seven eight"
          sevenString should include ("seven")

          //greater than / less than
          val one = 1
          val zero = 0
          val seven = 7
          one should be < seven
          one should be > zero
          one should be <= seven
          one should be >= zero

          //emptiness
          List() shouldBe empty
          List(1,2) should not be empty
       }

}

 

 

For more information on using matchers, you should consult this documentation, which you can find here:

http://www.scalatest.org/user_guide/using_matchers

 

 

C#, CodeProject

Scheduling With Quartz.Net

The other day I had a requirement to schedule something in my app to run at certain times, and at fixed intervals thereafter. Typically I would just solve this using either a simple Timer, or turn to my friend Reactive Extensions by way of Observable.Timer(..).

Thing is I decided to have a quick look at something I have always known about but never really used, for scheduling, which is Quartz.net, which actually does have some pretty good documentation up already:

http://www.quartz-scheduler.net/documentation/quartz-2.x/tutorial/index.html

For me I just wanted to get something very basic up and running, so I gave it a blast.

Step 1 : Install Quartz.net

This is as easy as installing the following NuGet package: “Quartz”

Step 2 : Create A Job Class

This again is fairly easy thanks to Quartz's nice API. Here is my job class:

using System;
using Quartz;

namespace FooBar
{
    public class LoggingJob : IJob
    {
        public void Execute(IJobExecutionContext context)
        {


            Common.Logging.LogManager.Adapter.GetLogger("LoggingJob").Info(
                string.Format("Logging job : {0} {1}, and proceeding to log", 
                    DateTime.Now.ToShortDateString(), DateTime.Now.ToLongTimeString()));

        }
    }
}

That is all you need for a job really. The context object gives you access to a lot of useful stuff.

Step 3 : Setting up a schedule

Again this was mega easy, all I had to do was something like this:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using Common.Logging;

using Quartz;
using Quartz.Impl;

namespace FooBar
{
    class Program
    {

        private static ILog Log = LogManager.GetCurrentClassLogger();

        static void Main(string[] args)
        {
            try
            {
                // construct a scheduler factory
                ISchedulerFactory schedFact = new StdSchedulerFactory();

                // get a scheduler
                IScheduler sched = schedFact.GetScheduler();
                sched.Start();

                IJobDetail job = JobBuilder.Create<LoggingJob>()
                    .WithIdentity("myJob", "group1")
                    .Build();

                ITrigger trigger = TriggerBuilder.Create()
                    .WithDailyTimeIntervalSchedule
                    (s =>
                        s.WithIntervalInSeconds(10)
                            .OnEveryDay()
                            .StartingDailyAt(TimeOfDay.HourAndMinuteOfDay(10, 15))
                    )
                    .Build();

                sched.ScheduleJob(job, trigger);
            }
            catch (ArgumentException e)
            {
                Log.Error(e);
            }


        }
    }
}

And that was enough for my job to get scheduled, every 10 seconds starting at 10:15 AM.

I was fairly happy with Quartz, and I will certainly make more use of it, when I have bigger, bolder, badder scheduling needs.

ASP MVC, CodeProject

Paper Effect Google Maps

A friend of mine, Marlon Grech, has his own business and he has a nice parallax effect web site : http://www.thynksoftware.com/ and over there on his “contact us” page, it has this very cool folding Google maps thing. I have wondered how it was done for a while now, and today I decided to find out. A colleague of mine was like WHY!…. Ashic, if you are reading this, sorry but I think it is cool, and I promise you I will be back to trying to slay the world with sockets tomorrow. A slight distraction, shall we say.

Not Really My Idea – Credit Where Credit Is Due

Now the code I present in this post is not my own at all, I have added the ability to toggle the folding of the map, but that really is all I have done. None the less, I think it is still of interest to describe how it was done, and worth a small write up. I think the original authors have done a great job, but did not really explain anything, so hopefully by the time you get to the end of this post, the effect will be a bit more familiar to you.

The original code that forms the basis to this post is here : http://experiments.bonnevoy.com/foldedmap/demo/

The Basic Idea

The idea is actually not too hard to grasp. There is a master DIV, which contains the actual Google map, this DIV has a VERY low opacity, so low you can’t actually see it. Then there are 6 other DIVS that get a slice of the original DIV, this is done by some clever margins, as can be seen in this image and the code that follows it:

image

The relevant HTML is here:

<!doctype html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Folding google maps</title>
    <link rel="stylesheet" type="text/css" href="style.css">
    <script src="http://maps.google.com/maps/api/js?sensor=true"></script>
    <script src="//ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
    <script src="script.js"></script>
    <!--[if lt IE 9]>
    <script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script>
    <![endif]-->
</head>
<body>
    <header>
        <a href="/foldedmap/" id="prev">Click to toggle</a>
        <p>Folded Google Maps (Webkit only)</p>
    </header>
    
    <div id="map_master"></div>
    <div id="container">
        <div id="mapContainer1" class="mapContainer odd first"></div>
        <div id="mapContainer2" class="mapContainer even"></div>
        <div id="mapContainer3" class="mapContainer odd"></div>
        <div id="mapContainer4" class="mapContainer even"></div>
        <div id="mapContainer5" class="mapContainer odd"></div>
        <div id="mapContainer6" class="mapContainer even last"></div>
    </div>
</body>
</html>

Each of these slices has a fixed width of 160px and a height of 400px. The overflow is also hidden, and that part is important, as we will see in just a minute. The relevant CSS is here:

.mapContainer {
    background:#fff;
    float:left;
    width: 160px;
    height: 400px;
    overflow: hidden;
    border-top: 10px solid #fff;
    border-bottom: 10px solid #fff;
    -webkit-transform-style: preserve-3d;
    -moz-transform-style: preserve-3d;
    -ms-transform-style: preserve-3d;
    -o-transform-style: preserve-3d;
    transform-style: preserve-3d;
    box-shadow:0px 4px 0 #10335C;
}

 

Each slice of the original gets a certain margin applied, where the overflow for the 6 DIVS is hidden. So by moving the slice into the desired position by way of clever margin positioning the remaining portion of the map for that slice would not be seen, thanks to overflow being hidden. Sneaky

mapSync: function() {
    var map_clone = $('#map_master').clone().css('opacity',1).css('z-index',-1);
    $('#mapContainer1').html(map_clone.clone().css('marginLeft','80px'));
    $('#mapContainer2').html(map_clone.clone().css('marginLeft','240px'));
    $('#mapContainer3').html(map_clone.clone().css('marginLeft','400px'));
    $('#mapContainer4').html(map_clone.clone().css('marginLeft','560px'));
    $('#mapContainer5').html(map_clone.clone().css('marginLeft','720px'));
    $('#mapContainer6').html(map_clone.clone().css('marginLeft','880px'));
},

 

The next part of the trick is to apply some WebKit 3D transforms. This is done inside a timer callback so it happens every 10 milliseconds, up to/down from some predetermined values.

Here is the code that applies the transforms:

applyTransforms: function () {
    var prefixes = ['webkit', 'moz', 'ms', 'o', ''];
    for (var i in prefixes) {
        $('.odd').css(prefixes[i] + 'Transform', 'rotateX(' +
            this.rotateX + 'deg) rotateY(' + -this.rotateY + 'deg)');
        $('.even').css(prefixes[i] + 'Transform', 'rotateX(' +
            this.rotateX + 'deg) rotateY(' + this.rotateY + 'deg)');
    }
    $('.mapContainer').css('marginLeft',
        -160 * (1 - Math.cos(this.rotateY / 360 * 2 * Math.PI)) + 'px');
},

 

It can be seen above that the transforms are applied for each of the vendor specific flavours (Mozilla, WebKit etc). The odd and even slices get opposite rotateY values, which is what produces the zig-zag fold.
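
The marginLeft line in that function is worth a second look. One way to read it (my interpretation, the original authors do not explain it) is that a 160px wide slice rotated about its Y axis only takes up roughly 160 * cos(angle) pixels on screen, so each slice is pulled left by the missing 160 * (1 - cos(angle)) pixels to keep the slices touching. Here is the same arithmetic as a few lines of Scala, just to get a feel for the numbers. The object and function names are made up for this sketch, and 20 and 90 degrees are the limits used by the fold/unfold code further down.

```scala
object FoldMargin {

  // width of one map slice, as set in the CSS
  val SliceWidthPx = 160.0

  // mirrors: -160 * (1 - Math.cos(rotateY / 360 * 2 * Math.PI))
  def marginLeftPx(rotateYDegrees: Double): Double =
    -SliceWidthPx * (1 - math.cos(math.toRadians(rotateYDegrees)))

  def main(args: Array[String]): Unit = {
    // fully folded, half way, fully unfolded
    Seq(90.0, 55.0, 20.0).foreach { angle =>
      println(f"rotateY = $angle%5.1f deg -> marginLeft = ${marginLeftPx(angle)}%8.2f px")
    }
  }
}
```

At 90 degrees the margin is about -160px, so the slices sit on top of each other, while at 20 degrees it is only about -9.65px and the map is nearly flat.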

The code below unfolds/folds the 6 DIVS by calling the applyTransforms function outlined above.

unfoldMap: function () {
    if(this.rotateY > 20)
    {
        this.rotateY -= 0.5;
        this.rotateX += 0.1;
        this.applyTransforms();
    }
    else
    {
        clearInterval(this.fold_out);
        this.isOpen = true;
    }
},

foldMap: function () {
    if(this.rotateY < 90)
    {
        this.rotateY += 0.5;
        this.rotateX -= 0.1;
        this.applyTransforms();
    }
    else
    {
        clearInterval(this.fold_in);
        this.isOpen = false;
    }
}

 

Anyway, like I say, the code is pretty much as seen in the original link, all I added was the ability to toggle between an unfolded map and a folded one.

Though I have added a bit more of an explanation, so hopefully it added some value.

Where Is The Code?

Anyway if you want a small demo project I have created one for VS2013, that you can grab from GitHub here : https://github.com/sachabarber/FoldingCSSGooogleMap

IOS Code Like This

My old WPF Disciple buddy Colin Eberhardt has a similar post using Objective C for IOS development. May be of interest to some of you : http://www.scottlogic.com/blog/2013/09/20/creating-a-custom-flip-view-controller-transition.html

C#, CodeProject, Distributed Systems

ZeroMQ #7: A Simple Actor Model

Last time we looked at using ZeroMQ to use a “Divide And Conquer” pattern to distribute work to a number of workers and then combine the results again.

Since I wrote that last post I have had a bit of a think about this series of posts, and realised that nothing I can say here would be as good or as thorough as the guide, so I have had to rethink my strategy a bit for the posts that I may write on ZeroMQ from here on in. So rather than me regurgitate what has already been said by Pieter on the guide web site, I will instead only be writing about stuff that I think is new, or worthy of a post. Now this could mean that the posts are less frequent, but I hope when there is one it will be of more interest than me just saying here is a NetMQ version of the “Paranoid Pirate Pattern”, go check this link at the guide for more information.

So where does that leave this series of posts? Well to be honest slightly in limbo, but I have also been in contact with Pieter Hintjens, who was kind enough to give me a little push into something that may be of interest.

Pieter notified me of an Actor Model that is part of the high level C library for ZeroMQ called “czmq”, which is not contained in the NetMQ GitHub repository. So I had a call with Pieter, and looked into that.

This post will discuss a very simple actor model that I have written to work with NetMQ. Pieter has given it the once over, and I have also talked it through with a regular ZeroMQ user at work, so I think it is an OK version of the original C ZeroMQ “czmq” version.

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

What Is An Actor Model?

Here is what Wikipedia has to say in the introduction to what an Actor Model is.

The actor model in computer science is a mathematical model of concurrent computation that treats “actors” as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.

….

….

The Actor model adopts the philosophy that everything is an actor. This is similar to the everything is an object philosophy used by some object-oriented programming languages, but differs in that object-oriented software is typically executed sequentially, while the Actor model is inherently concurrent.

An actor is a computational entity that, in response to a message it receives, can concurrently:

  • send a finite number of messages to other actors;
  • create a finite number of new actors;
  • designate the behavior to be used for the next message it receives.

There is no assumed sequence to the above actions and they could be carried out in parallel.

Decoupling the sender from communications sent was a fundamental advance of the Actor model enabling asynchronous communication and control structures as patterns of passing messages.[8]

Recipients of messages are identified by address, sometimes called “mailing address”. Thus an actor can only communicate with actors whose addresses it has. It can obtain those from a message it receives, or if the address is for an actor it has itself created.

The Actor model is characterized by inherent concurrency of computation within and among actors, dynamic creation of actors, inclusion of actor addresses in messages, and interaction only through direct asynchronous message passing with no restriction on message arrival order.

http://en.wikipedia.org/wiki/Actor_model

 

How I like to think of Actors is that they may be used to alleviate some of the synchronization concerns of using shared data structures. This is achieved by your application code talking to actors via message passing/receiving. The actor itself may pass messages to other actors, or work on the passed message itself. By using message passing rather than shared data structures, it may help to think of the actor (or any subsequent actors it sends messages to) as working on a copy of the data rather than on the same shared structures. This kind of gets rid of the need to worry about nasty things like lock(s) and any nasty timing issues that may arise from running multi threaded code. If the actor is working with its own copy of the data then we should have no issues with other threads wanting to work with the data the actor has, as the only place that data can be is within the actor itself, that is unless we pass another message to a different actor. If we were to do that, though, the new message to the other actor would also be a copy of the data, so it would also be thread safe.
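
To give a rough feel for that idea before getting to the real thing, here is a deliberately tiny sketch. It is written in Scala (as used in the earlier posts on this blog) with nothing more than a plain JVM thread and a blocking queue as the mailbox, so it is not NetMQ or czmq code, and every name in it is made up for illustration. The point is only that the balance lives inside the actor's own thread, and other threads can do nothing except post immutable messages to it.

```scala
import java.util.concurrent.LinkedBlockingQueue

// immutable messages, the only way to talk to the actor
sealed trait AccountMessage
final case class Deposit(amount: BigDecimal) extends AccountMessage
final case class GetBalance(replyTo: LinkedBlockingQueue[BigDecimal]) extends AccountMessage
case object Stop extends AccountMessage

final class AccountActor(initialBalance: BigDecimal) {

  private val mailbox = new LinkedBlockingQueue[AccountMessage]()

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      // this state is only ever touched by the actor's own thread, so no lock is needed
      var balance = initialBalance
      var running = true
      while (running) {
        mailbox.take() match {
          case Deposit(amount)     => balance += amount
          case GetBalance(replyTo) => replyTo.put(balance)
          case Stop                => running = false
        }
      }
    }
  })
  worker.start()

  def send(message: AccountMessage): Unit = mailbox.put(message)

  def awaitStop(): Unit = worker.join()
}

object ActorDemo {
  def main(args: Array[String]): Unit = {
    val actor = new AccountActor(BigDecimal(0))

    // two threads each post 100 deposits of 10, without any locking on their side
    val senders = (1 to 2).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit =
          (1 to 100).foreach(_ => actor.send(Deposit(BigDecimal(10))))
      })
    }
    senders.foreach(_.start())
    senders.foreach(_.join())

    // ask for the balance by message too, the reply comes back on its own queue
    val reply = new LinkedBlockingQueue[BigDecimal]()
    actor.send(GetBalance(reply))
    println("Final balance: " + reply.take())

    actor.send(Stop)
    actor.awaitStop()
  }
}
```

Because the mailbox is first in, first out and the balance request is only sent once both senders have finished, the reply reflects all 200 deposits, so the balance should come back as 2000.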

I hope you see what I am trying to explain there. Maybe a diagram will help.

Multi Threaded Application Using Shared Data Structure

A fairly common thing to do is have multiple threads running to speed things up, but then you realise that your threads need to mutate the state of some shared data structure, so then you have to involve threading synchronization primitives (most commonly lock(..) statements, to create your user defined critical sections). This will work, but now you are introducing artificial delays due to having to wait for the lock to be released so you can run Thread X’s code.

image 

To take this one step further, let's see some code that illustrates this. Imagine we had this sort of data structure representing a very slim bank account:

namespace ConsoleApplication1
{
    public class Account
    {

        public Account()
        {

        }

        public Account(int id, string name,
            string sortCode, decimal balance)
        {
            Id = id;
            Name = name;
            SortCode = sortCode;
            Balance = balance;
        }

        public int Id { get; set; }
        public string Name { get; set; }
        public string SortCode { get; set; }
        public decimal Balance { get; set; }
    }
}

 

Nothing fancy there, just some properties. So let's now move on to some threading code. I have chosen to just show two threads acting on a shared Account instance.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        private object syncLock = new object();
        private Account clientBankAccount;
        public Program()
        {
            clientBankAccount = new Account(1,"sacha barber","112233",0);
        }

        public async Task Run()
        {
            try
            {
                await Task.Run(() =>
                {
                    Console.WriteLine("Thread Id {0}, Account balance before: {1}",
                        Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);

                    lock (syncLock)
                    {
                        Console.WriteLine("Thread Id {0}, Adding 10 to balance",
                           Thread.CurrentThread.ManagedThreadId);
                        clientBankAccount.Balance += 10;
                        Console.WriteLine("Thread Id {0}, Account balance after: {1}",
                            Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
                    }
                });

                await Task.Run(() =>
                {
                    Console.WriteLine("Thread Id {0}, Account balance before: {1}",
                        Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
                    lock (syncLock)
                    {
                        Console.WriteLine("Thread Id {0}, Subtracting 4 from balance",
                           Thread.CurrentThread.ManagedThreadId);
                        clientBankAccount.Balance -= 4;
                        Console.WriteLine("Thread Id {0}, Account balance after: {1}",
                            Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
                    }
                });
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }

        }

        static void Main(string[] args)
        {
           Program p = new Program();
           p.Run().Wait();
           Console.ReadLine();
        }
    }
}

 

I have possibly picked an example that you think may not actually happen in real life, and to be honest this scenario may not pop up in real life, as who would do something as silly as crediting an account in one thread, and debiting it in another… we are all diligent developers, we would not let this one into the code, would we?

To be honest, whether the actual example has real world merit or not, the point remains the same: since we have more than one thread accessing a shared data structure, access to it must be synchronized, which is typically done using a lock(..) statement, as can be seen in the code.

Now don’t get me wrong the above code does work, as shown in the output below:

image
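As a side note, the listing above awaits the first task before starting the second, so the two never actually overlap. A minimal sketch of genuinely concurrent access might look like the following. The LockedBalanceSketch class and its Run method are hypothetical names that are not part of the demo code. Every task takes the same lock around its read-modify-write, so the final balance does not depend on how the tasks get scheduled.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical sketch class, not part of the demo code in this post
public static class LockedBalanceSketch
{
    private static readonly object SyncLock = new object();

    // Runs 'credits' tasks that add 10 and 'debits' tasks that subtract 4,
    // all concurrently, against one shared balance guarded by a single lock
    public static decimal Run(int credits, int debits)
    {
        decimal balance = 0m;

        Task[] tasks = new Task[credits + debits];
        for (int i = 0; i < tasks.Length; i++)
        {
            // declared inside the loop so each task captures its own value
            decimal delta = i < credits ? 10m : -4m;
            tasks[i] = Task.Run(() =>
            {
                lock (SyncLock)
                {
                    balance += delta;
                }
            });
        }

        // only read the balance once every task has finished
        Task.WaitAll(tasks);
        return balance;
    }

    public static void Main()
    {
        Console.WriteLine("Final balance: {0}", Run(100, 100));
    }
}
```

With 100 credits of 10 and 100 debits of 4 the final balance is 600. Take the lock away and that result is no longer guaranteed, as adding to a decimal is not an atomic operation.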

Perhaps there might be a more interesting way though!

Actor Model

The actor model takes a different approach, whereby message passing is used. This may involve some form of serialization as the messages are passed down the wire, which kind of guarantees there are no shared structures to contend with. Now I am not saying all Actor frameworks use message passing down the wire (serialization), but the code presented in this article does.

The basic idea is that each thread would talk to an Actor, and send/receive messages with the actor.

If you wanted to get even more isolation, you could use thread local storage where each thread could have its own copy of the actor which it, and it alone talks to.

image

Anyway enough talking, I am sure some of you want to see the code right?

The Implementation

The idea is that the Actor itself may be treated like a ZeroMQ (NetMQ in my case) socket, and may therefore be used to Send/Receive messages. Now you may be wondering: if I send the Actor a message, who is listening to that message, and how is it that I am able to receive a message from the Actor?

The answer to that lies inside the implementation of the simple Actor framework in this post. Internally the Actor spins up another thread. Within that new thread is one end of a PairSocket where the Actor itself is the other end of the pipe which is also a PairSocket (recall I said the Actor is able to act as a socket). The Actor and the other end of the pipe communicate via message passing, and they use an in process (inproc) protocol to do so.

The initial message passed to the Actor forms some sort of protocol. The other end of the Actor pipe (i.e. the thread the Actor created), which I am calling the “shim” (a term borrowed from the ZeroMQ czmq implementation), MUST know how to deal with the protocol that the user code sends via the Actor.

The way I have chosen to do this is that you (i.e. the user of the simple Actor library) will need to create a “shim” handler class. This “shim” handler class may implement a very simple protocol or a very complicated one (for the demo I have stuck to very simple ones), as long as it understands the message/command being sent from the Actor, and knows what to do with it. That is up to you to come up with; I have no silver bullet for that.

One final thing to explain is that the “shim” handler may be passed some initial arguments (outside of the message passing) should you want to make use of this feature. It is something you may not want/need to use, but it is there should you want to use it.
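To make the shape of this easier to picture before the real NetMQ code, here is a minimal sketch of the same idea with the sockets swapped out. It is an assumption-laden stand-in, not the library itself: two in-memory BlockingCollection queues play the part of the PairSocket pipe, a background task plays the part of the shim thread, and a plain function plays the part of the shim handler. The QueueActorSketch and QueueActorDemo names are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the Actor: two in-memory queues replace the
// PairSocket pipe, and a background task replaces the shim thread
public sealed class QueueActorSketch : IDisposable
{
    private readonly BlockingCollection<string> toShim = new BlockingCollection<string>();
    private readonly BlockingCollection<string> fromShim = new BlockingCollection<string>();
    private readonly Task shimTask;

    public QueueActorSketch(Func<string, string> handler)
    {
        // the "shim" end of the pipe: take each message, let the handler
        // work on it, then send the reply back to the actor end
        shimTask = Task.Factory.StartNew(() =>
        {
            foreach (string message in toShim.GetConsumingEnumerable())
            {
                fromShim.Add(handler(message));
            }
        }, TaskCreationOptions.LongRunning);
    }

    // the actor end of the pipe behaves like a socket: Send and Receive
    public void Send(string message)
    {
        toShim.Add(message);
    }

    public string Receive()
    {
        return fromShim.Take();
    }

    public void Dispose()
    {
        // completing the queue ends the shim loop, then we wait for it to exit
        toShim.CompleteAdding();
        shimTask.Wait();
    }
}

public static class QueueActorDemo
{
    public static void Main()
    {
        using (var actor = new QueueActorSketch(m => "ECHO BACK : " + m))
        {
            actor.Send("This is a string");
            Console.WriteLine(actor.Receive());
        }
    }
}
```

The calling code never touches the handler or its thread directly; it only sends to and receives from the actor end. This sketch passes string references through memory, whereas the NetMQ version sends bytes through sockets, so treat it only as a picture of the plumbing.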

Actor Code

Here is the code for the Actor itself, where it can be seen that it may be treated as a socket using the Send/Receive methods. The Actor also creates a new thread which is used to run the code in the shim handler. The Actor also creates the shim and the pair of PairSocket(s) for message passing.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
using NetMQ.zmq;

namespace NetMQActors
{
    /// <summary>
    /// The Actor represents one end of a two way pipe between 2 PairSocket(s). Where
    /// the actor may be passed messages, that are sent to the other end of the pipe
    /// which I am calling the "shim"
    /// </summary>
    public class Actor : IOutgoingSocket, IReceivingSocket, IDisposable
    {
        private readonly PairSocket self;
        private readonly Shim shim;
        private Random rand = new Random();
        private CancellationTokenSource cts = new CancellationTokenSource();

        private string GetEndPointName()
        {
            return string.Format("inproc://zactor-{0}-{1}",
                rand.Next(0, 10000), rand.Next(0, 10000));
        }

        public Actor(NetMQContext context, IShimHandler shimHandler, object[] args)
        {
            this.self = context.CreatePairSocket();
            this.shim = new Shim(shimHandler, context.CreatePairSocket());
            this.self.Options.SendHighWatermark = 1000;
            this.self.Options.ReceiveHighWatermark = 1000;

            //now binding and connect pipe ends
            string endPoint = string.Empty;
            while (true)
            {
                Action bindAction = () =>
                {
                    endPoint = GetEndPointName();
                    self.Bind(endPoint);
                };

                try
                {
                    bindAction();
                    break;
                }
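                catch (NetMQException nex)
                {
                    //the intent appears to be to retry on EFAULT, so for that
                    //error just go round the loop again with a new random
                    //endpoint name; anything else is unexpected, so rethrow
                    if (nex.ErrorCode != ErrorCode.EFAULT)
                    {
                        throw;
                    }
                }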

            }

            shim.Pipe.Connect(endPoint);

            //Create Shim thread handler
            CreateShimThread(args);
        }

        private void CreateShimThread(object[] args)
        {
            Task shimTask = Task.Factory.StartNew(
                (state) => this.shim.Handler.Run(this.shim.Pipe, (object[])state, cts.Token),
                args,
                cts.Token,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);

            shimTask.ContinueWith(ant =>
            {
                if (ant.Exception == null) return;

                Exception baseException = ant.Exception.Flatten().GetBaseException();
                if (baseException.GetType() == typeof (NetMQException))
                {
                    Console.WriteLine(string.Format("NetMQException caught : {0}",
                        baseException.Message));
                }
                else if (baseException.GetType() == typeof (ObjectDisposedException))
                {
                    Console.WriteLine(string.Format("ObjectDisposedException caught : {0}",
                        baseException.Message));
                }
                else
                {
                    Console.WriteLine(string.Format("Exception caught : {0}",
                        baseException.Message));
                }
            }, TaskContinuationOptions.OnlyOnFaulted);
        }

        ~Actor()
        {
            Dispose(false);
        }

        public void Dispose(){
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            //cancel shim thread
            cts.Cancel();

            // release other disposable objects
            if (disposing)
            {
                if (self != null) self.Dispose();
                if (shim != null) shim.Dispose();
            }
        }

        public void Send(byte[] data, int length, bool dontWait = false, bool sendMore = false)
        {
            self.Send(data, length, dontWait, sendMore);
        }

        public byte[] Receive(bool dontWait, out bool hasMore)
        {
            return self.Receive(dontWait, out hasMore);
        }
    }
}

 

Shim Code

The shim represents the other end of the pipe. The shim is essentially a property bag, but it does hold a reference to the IShimHandler that the thread created by the Actor will run. The IShimHandler is the one that MUST understand the protocol, and carry out any work.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using NetMQ.Sockets;

namespace NetMQActors
{
    /// <summary>
    /// Shim represents one end of the in process pipe, where the Shim expects
    /// to be supplied with a <c>IShimHandler</c> that it would use for running the pipe
    /// protocol with the original Actor PairSocket the other end of the pipe
    /// </summary>
    public class Shim : IDisposable
    {
        public Shim(IShimHandler shimHandler, PairSocket pipe)
        {
            this.Handler = shimHandler;
            this.Pipe = pipe;
        }

        public IShimHandler Handler { get; private set; }
        public PairSocket Pipe { get; private set; }

        ~Shim()
        {
            Dispose(false);
        }

        public void Dispose(){
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                // release disposable objects
                if (Pipe != null) Pipe.Dispose();
            }
        }
    }
}

 

Handler Interface

Your shim handler code MUST implement this interface:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ.Sockets;

namespace NetMQActors
{
    /// <summary>
    /// Simple interface that all shims should implement
    /// </summary>
    public interface IShimHandler
    {
        void Run(PairSocket shim, object[] args, CancellationToken token);
    }
}

An Example : Simple EchoShim Handler

This example shows how to create a simple echo shim handler that can be used with the standard Actor code above. The EchoShimHandler presented here uses an EXTREMELY simple protocol that does the following:

  • It expects the initial arguments to be 1 in length
  • It expects the initial argument element 0 to be “Hello World”
  • It expects a multi part message, where the 1st message frame is the command string “ECHO”

If all of those criteria are satisfied, then the EchoShimHandler will write to its end of the PairSocket pipe. The user of the Actor at the other end of the pipe (i.e. the other PairSocket) can then receive the value from the EchoShimHandler. Remember, in this mini library the Actor may act as a regular NetMQ socket.
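Before looking at the real handler, the rules of this protocol can be sketched as a plain function with no sockets involved. This is only an illustration: EchoProtocolSketch and Handle are hypothetical names, the frames are modelled as a simple list of strings rather than a NetMQMessage, and the sketch assumes exactly two frames (command, then payload).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the echo protocol rules, with no sockets involved
public static class EchoProtocolSketch
{
    // Checks the initial arguments and the command frame, then builds the reply
    public static string Handle(object[] args, IList<string> frames)
    {
        // rules 1 and 2: exactly one initial argument, which is "Hello World"
        if (args == null || args.Length != 1 || !"Hello World".Equals(args[0]))
            throw new InvalidOperationException(
                "Args were not correct, expected 'Hello World'");

        // assumption for this sketch: frame 0 is the command, frame 1 the payload
        if (frames == null || frames.Count != 2)
            throw new ArgumentException("Expected a command frame and a payload frame");

        // rule 3: the command frame must be "ECHO"
        if (frames[0] != "ECHO")
            throw new InvalidOperationException("Unexpected command: " + frames[0]);

        return string.Format("ECHO BACK : {0}", frames[1]);
    }

    public static void Main()
    {
        string reply = Handle(
            new object[] { "Hello World" },
            new[] { "ECHO", "This is a string" });
        Console.WriteLine(reply);
    }
}
```

Keeping the protocol checks in one small function like this is just one way to lay things out; it makes the rules easy to exercise without having to spin up an Actor at all.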

Here is the code for the EchoShimHandler

using System;
using System.Linq;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using NetMQ.zmq;

namespace NetMQActors
{
    /// <summary>
    /// This handler class is a specific implementation that you would need
    /// to implement per actor. This essentially contains your commands/protocol
    /// and should deal with any command workload, as well as sending back to the
    /// other end of the PairSocket, which calling code would receive by using the
    /// Actor class's various ReceiveXXX() methods
    ///
    /// This is a VERY simple protocol but it just demonstrates what you would need
    /// to do to implement your own Shim handler
    /// </summary>
    public class EchoShimHandler : IShimHandler
    {
        public void Run(PairSocket shim, object[] args, CancellationToken token)
        {
            if (args == null || args.Count() != 1 || (string)args[0] != "Hello World")
                throw new InvalidOperationException(
                    "Args were not correct, expected 'Hello World'");

            while (!token.IsCancellationRequested)
            {
                //Message for this actor/shim handler is expected to be
                //Frame[0] : Command
                //Frame[1] : Payload
                //
                //Result back to actor is a simple echoing of the Payload, where
                //the payload is prefixed with "ECHO BACK "
                NetMQMessage msg = null;

                //this may throw NetMQException if we have disposed of the actor
                //end of the pipe, and the CancellationToken.IsCancellationRequested
                //did not get picked up this loop cycle
                msg = shim.ReceiveMessage();

                if (msg == null)
                    break;

                if (msg[0].ConvertToString() == "ECHO")
                {
                    shim.Send(string.Format("ECHO BACK : {0}",
                        msg[1].ConvertToString()));
                }
                else
                {
                    throw NetMQException.Create("Unexpected command",
                        ErrorCode.EFAULT);
                }
            }
        }
    }
}

 

And here is the Actor test code that goes with this, where it can be seen that we are able to send/receive using the Actor. There is also an example here that shows us trying to use a previously disposed Actor, which we expect to fail, and it does.

//Round 1 : Should work fine
EchoShimHandler echoShimHandler = new EchoShimHandler();

Actor actor = new Actor(NetMQContext.Create(), echoShimHandler, new object[] { "Hello World" });
actor.SendMore("ECHO");
string actorMessage = "This is a string";
actor.Send(actorMessage);
var result = actor.ReceiveString();
Console.WriteLine("ROUND1");
Console.WriteLine("========================");
string expectedEchoHandlerResult = string.Format("ECHO BACK : {0}", actorMessage);
Console.WriteLine("ExpectedEchoHandlerResult: '{0}'\r\nGot : '{1}'\r\n",
    expectedEchoHandlerResult, result);
actor.Dispose();

//Round 2 : Should NOT work, as we are now using Disposed actor
try
{
    Console.WriteLine("ROUND2");
    Console.WriteLine("========================");
    actor.SendMore("ECHO");
    actor.Send("This is a string");
    result = actor.ReceiveString();
}
catch (NetMQException nex)
{
    Console.WriteLine("NetMQException : Actor has been disposed so this is expected\r\n");
}

//Round 3 : Should work fine
echoShimHandler = new EchoShimHandler();

actor = new Actor(NetMQContext.Create(), echoShimHandler, new object[] { "Hello World" });
actor.SendMore("ECHO");
actorMessage = "Another Go";
actor.Send(actorMessage);
result = actor.ReceiveString();
Console.WriteLine("ROUND3");
Console.WriteLine("========================");
expectedEchoHandlerResult = string.Format("ECHO BACK : {0}", actorMessage);
Console.WriteLine("ExpectedEchoHandlerResult: '{0}'\r\nGot : '{1}'\r\n",
    expectedEchoHandlerResult, result);
actor.Dispose();

 

Which would give output something like this

image

 

Another Example : Sending JSON Objects

This example shows how to create a simple account shim handler that can be used with the standard Actor code above. The AccountShimHandler presented here uses another simple protocol (on purpose; you may choose to make this as simple or as complex as you wish) that does the following:

  • It expects the initial arguments to be 1 in length
  • It expects the initial argument element 0 to be a JSON serialized string of an AccountAction
  • It expects a multi part message, where the 1st message frame is the command string “AMEND ACCOUNT”
  • It expects the 2nd message frame to be a JSON serialized string of an Account

If all these criteria are met, then the AccountShimHandler deserializes the JSON Account into an actual Account object, either debits or credits that Account according to the AccountAction that was passed into the AccountShimHandler, and then serializes the modified Account object back into JSON and sends it back to the Actor via the PairSocket in the AccountShimHandler.

The AccountAction class looks like this

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NetMQActors.Models
{
    public enum TransactionType {  Debit=1, Credit=2}
    public class AccountAction
    {
        public AccountAction()
        {
            
        }

        public AccountAction(TransactionType transactionType, decimal amount)
        {
            TransactionType = transactionType;
            Amount = amount;
        }

        public TransactionType TransactionType { get; set; }
        public decimal Amount { get; set; }
    }
}

 

The Account class looks like this

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NetMQActors.Models
{
    public class Account
    {

        public Account()
        {
            
        }

        public Account(int id, string name, string sortCode, decimal balance)
        {
            Id = id;
            Name = name;
            SortCode = sortCode;
            Balance = balance;
        }

        public int Id { get; set; }
        public string Name { get; set; }
        public string SortCode { get; set; }
        public decimal Balance { get; set; }
    }
}

 

Here is the code for the AccountShimHandler

using System;
using System.Linq;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using NetMQ.zmq;
using NetMQActors.Models;
using Newtonsoft.Json;

namespace NetMQActors
{
    /// <summary>
    /// This handler class is a specific implementation that you would need
    /// to implement per actor. This essentially contains your commands/protocol
    /// and should deal with any command workload, as well as sending back to the
    /// other end of the PairSocket, which calling code would receive by using the
    /// Actor class's various ReceiveXXX() methods
    ///
    /// This is a VERY simple protocol but it just demonstrates what you would need
    /// to do to implement your own Shim handler
    /// </summary>
    public class AccountShimHandler : IShimHandler
    {

        private void AmmendAccount(AccountAction action, Account account)
        {
            decimal currentAmount = account.Balance;
            account.Balance = action.TransactionType == TransactionType.Debit
                ? currentAmount - action.Amount
                : currentAmount + action.Amount;
        }

        public void Run(PairSocket shim, object[] args, CancellationToken token)
        {
            if (args == null || args.Count() != 1)
                throw new InvalidOperationException(
                    "Args were not correct, expected one argument");

            AccountAction accountAction = JsonConvert.DeserializeObject<AccountAction>(args[0].ToString());

            while (!token.IsCancellationRequested)
            {
                //Message for this actor/shim handler is expected to be
                //Frame[0] : Command
                //Frame[1] : Payload
                //
                //Result back to actor is the amended Account, serialized
                //back into JSON
                NetMQMessage msg = null;

                //this may throw NetMQException if we have disposed of the actor
                //end of the pipe, and the CancellationToken.IsCancellationRequested
                //did not get picked up this loop cycle
                msg = shim.ReceiveMessage();

                if (msg == null)
                    break;

                if (msg[0].ConvertToString() == "AMEND ACCOUNT")
                {
                    string json = msg[1].ConvertToString();
                    Account account = JsonConvert.DeserializeObject<Account>(json);
                    AmmendAccount(accountAction, account);
                    shim.Send(JsonConvert.SerializeObject(account));
                }
                else
                {
                    throw NetMQException.Create("Unexpected command",
                        ErrorCode.EFAULT);
                }
            }
        }
    }
}

 

And here is the relevant Actor code

//Round 4 : Should work fine
AccountShimHandler accountShimHandler = new AccountShimHandler();

AccountAction accountAction = new AccountAction(TransactionType.Credit, 10);
Account account = new Account(1, "Test Account", "11223", 0);

Actor accountActor = new Actor(NetMQContext.Create(), accountShimHandler,
    new object[] { JsonConvert.SerializeObject(accountAction) });
accountActor.SendMore("AMEND ACCOUNT");
accountActor.Send(JsonConvert.SerializeObject(account));
Account updatedAccount =
    JsonConvert.DeserializeObject<Account>(accountActor.ReceiveString());
Console.WriteLine("ROUND4");
Console.WriteLine("========================");
decimal expectedAccountBalance = 10.0m;
Console.WriteLine(
    "Expected Account Balance: '{0}'\r\nGot : '{1}'\r\n" +
    "Are Same Account Object : '{2}'\r\n",
    expectedAccountBalance, updatedAccount.Balance,
    ReferenceEquals(account, updatedAccount));
accountActor.Dispose();

 

Which gives a result something like this. If you read the code above you will see that the Account object we send and the one we receive are NOT the same object. This is due to the fact that they have been sent down the wire using NetMQ sockets.

image
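The reason the two objects differ can be shown without NetMQ or JSON at all. The sketch below is a stand-in under stated assumptions: AccountCopy and WireCopySketch are hypothetical names, and a made-up "id|name|balance" text format takes the place of the JSON string. The point it illustrates is the same one: whatever comes back out of the wire format is a brand new object, so changing it cannot touch the original.

```csharp
using System;
using System.Globalization;

// Hypothetical cut-down account type used only by this sketch
public class AccountCopy
{
    public int Id { get; set; }
    public string Name { get; set; }
    public decimal Balance { get; set; }
}

public static class WireCopySketch
{
    // Made-up wire format "id|name|balance" standing in for the JSON text;
    // assumes the name never contains the '|' separator
    public static string Encode(AccountCopy account)
    {
        return string.Join("|",
            account.Id.ToString(CultureInfo.InvariantCulture),
            account.Name,
            account.Balance.ToString(CultureInfo.InvariantCulture));
    }

    // Always builds a new object from the text, never reuses the sender's one
    public static AccountCopy Decode(string wire)
    {
        string[] parts = wire.Split('|');
        return new AccountCopy
        {
            Id = int.Parse(parts[0], CultureInfo.InvariantCulture),
            Name = parts[1],
            Balance = decimal.Parse(parts[2], CultureInfo.InvariantCulture)
        };
    }

    public static void Main()
    {
        var sent = new AccountCopy { Id = 1, Name = "Test Account", Balance = 0m };
        AccountCopy received = Decode(Encode(sent));

        // what a handler would do before replying
        received.Balance += 10m;

        Console.WriteLine("Sent balance     : {0}", sent.Balance);
        Console.WriteLine("Received balance : {0}", received.Balance);
        Console.WriteLine("Same object      : {0}", ReferenceEquals(sent, received));
    }
}
```

The sent account keeps its balance of 0 while the received copy ends up at 10, which is exactly the isolation the actor approach relies on.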

Some Actor Frameworks To Look At

There are a couple of Actor frameworks out there that I am aware of. Namely the following ones, there will be more, but these are the main ones I am aware of.

I have only really given Akka a cursory look, but I remember when Axum first came out. I gave it a good try and thought: this is neat, no concurrency hell to worry about here. Cool.

For me at least, I wish there was an Actor model in .NET

C#, CodeProject, Distributed Systems

ZeroMQ #6 : Divide And Conquer

Last time we looked at how to send from multiple sockets. Believe it or not we have pretty much introduced most of the core concepts you will need. As a recap here is what we have already covered

So from here on in it is just a matter of going through some of the well known patterns from the ZeroMQ guide.

Now it would be immoral (even fraudulent) of me not to mention this up front: in the main, the information that I present in the remaining posts in this series will be based quite heavily on the ZeroMQ guide by Pieter Hintjens. Pieter has actually been in touch with me regarding this series of posts, and has been kind enough to let me run each new post by him. I think this is generous, and I am extremely pleased to have Pieter on hand to run them past. What that means to you is that if there are any misunderstandings/mistakes on my behalf, I am sure Pieter will be pointing them out (at which point I will obviously correct any mistakes made; hopefully I will not make any). So big thanks go out to Pieter. Cheers, as we would say in England.

It is all good publicity for ZeroMQ though, and as NetMQ is a native port it is not one of the ones covered by the language bindings on the ZeroMQ guide site. So even though I am basing my content on the fantastic work done by Pieter, it will obviously be using NetMQ, so from that point of view the code is still very much relevant.

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

 

What Will We Be Doing This Time?

This time we will continue to look at ZeroMQ patterns. Which is actually what the remaining posts will all pretty much be focussed on.

The pattern that we will look at this time involves dividing a problem domain into smaller chunks, distributing them across workers, and then collating the results back together again.

This pattern is really a “divide and conquer” one, but it has also been called “Parallel Pipeline”. With all the remaining posts, I will be linking back to the original portion of the guide such that you can read more about the problem and Pieter’s solution.

ZeroMQ Guide Divide And Conquer : http://zguide.zeromq.org/page:all#Divide-and-Conquer

The idea is that you have something that generates work, and then distributes the work out to n-many workers. The workers each do some work, and push their results to some other process (could be a thread too) where the workers’ results are accumulated.

In the ZeroMQ guide, the example has the work generator just tell each worker to sleep for a period of time. I toyed with creating a more elaborate example than this, but in the end felt that the example's simplicity was quite important, so I have stuck with the workload for each worker just being a value that tells the worker to sleep for a number of milliseconds (thus simulating some actual work). This, as I say, has been borrowed from the ZeroMQ guide.

In real life the work could obviously be anything, though you would more than likely want the work to be something that could be cut up and distributed without the work generator caring/knowing how many workers there are.

Here is what we are trying to achieve :

image 
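Before splitting this across three processes, the flow can be sketched inside a single process. This is an in-memory stand-in under stated assumptions, not the NetMQ version: two BlockingCollection queues take the place of the PUSH/PULL socket pairs, tasks take the place of the worker processes, and PipelineSketch and Run are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical single-process sketch of ventilator -> workers -> sink
public static class PipelineSketch
{
    // Pushes every workload onto one queue, lets 'workerCount' workers pull
    // from it, and returns how many results the sink collected
    public static int Run(int[] workloadsMs, int workerCount)
    {
        var tasks = new BlockingCollection<int>();    // ventilator -> workers
        var results = new BlockingCollection<int>();  // workers -> sink

        Task[] workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (int workload in tasks.GetConsumingEnumerable())
                {
                    Thread.Sleep(workload); // simulate some work being done
                    results.Add(workload);  // tell the sink this task is done
                }
            }))
            .ToArray();

        // ventilator: hand out the work without caring how many workers exist
        foreach (int workload in workloadsMs)
        {
            tasks.Add(workload);
        }
        tasks.CompleteAdding();

        // once every worker has drained the queue no more results can arrive
        Task.WaitAll(workers);
        results.CompleteAdding();

        // sink: count what the workers reported
        return results.GetConsumingEnumerable().Count();
    }

    public static void Main()
    {
        int[] workloads = { 5, 10, 15, 20, 25, 30 };
        Console.WriteLine("Sink collected {0} results", Run(workloads, 3));
    }
}
```

Each workload is taken by exactly one worker, whichever happens to be free, which is the same load-spreading behaviour the PUSH/PULL sockets give us across processes in the code that follows.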

Ventilator

using System;
using NetMQ;

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://localhost:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");
           
            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to send messages on
                using (var sender = ctx.CreatePushSocket())
                {
                    sender.Bind("tcp://*:5557");

                    using (var sink = ctx.CreatePushSocket())
                    {
                        sink.Connect("tcp://localhost:5558");

                        Console.WriteLine("Press enter when workers are ready");
                        Console.ReadLine();
                        
                        //the first message is "0" and signals start of batch
                        //see the Sink.csproj Program.cs file for where this is used
                        Console.WriteLine("Sending start of batch to Sink");    
                        sink.Send("0");

                        Console.WriteLine("Sending tasks to workers");

                        //initialise random number generator
                        Random rand= new Random(0);

                        //expected costs in Ms
                        int totalMs = 0;
                        
                        //send 100 tasks (the workload for each task is just a random sleep time that
                        //the workers can perform; in real life each worker would do more than sleep)
                        for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                        {
                            //Random workload from 1 to 100 msec (the upper bound of Next is exclusive)
                            int workload = rand.Next(1, 101);
                            totalMs += workload;
                            Console.WriteLine("Workload : {0}", workload);
                            sender.Send(workload.ToString());
                        }
                        Console.WriteLine("Total expected cost : {0} msec", totalMs);
                        Console.WriteLine("Press Enter to quit");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
}

 

Worker

using System;
using System.Threading;
using NetMQ;

namespace Worker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Worker
            // Connects PULL socket to tcp://localhost:5557
            // Collects workloads from Ventilator via that socket
            // Connects PUSH socket to tcp://localhost:5558
            // Sends results to Sink via that socket
            Console.WriteLine("====== WORKER ======");

            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Connect("tcp://localhost:5557");

                     //socket to send messages on
                    using (var sender = ctx.CreatePushSocket())
                    {
                        sender.Connect("tcp://localhost:5558");

                        //process tasks forever
                        while (true)
                        {
                            //workload from the ventilator is a simple delay
                            //to simulate some work being done, see
                            //Ventilator.csproj Program.cs for the workload sent
                            //In real life some more meaningful work would be done
                            string workload = receiver.ReceiveString();

                            //simulate some work being done
                            Thread.Sleep(int.Parse(workload));

                            //send results to sink, sink just needs to know worker
                            //is done, message content is not important, just the presence of
                            //a message means worker is done.
                            //See Sink.csproj Program.cs
                            Console.WriteLine("Sending to Sink");
                            sender.Send(string.Empty);
                        }
                    }

                }
            }

        }
    }
}

 

Sink

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace Sink
{
    public class Program
    {
        public static void Main(string[] args)
        {

            // Task Sink
            // Binds PULL socket to tcp://localhost:5558
            // Collects results from workers via that socket
            Console.WriteLine("====== SINK ======");
           
            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Bind("tcp://localhost:5558");

                    //wait for start of batch (see Ventilator.csproj Program.cs)
                    var startOfBatchTrigger = receiver.ReceiveString();
                    Console.WriteLine("Seen start of batch");

                    //Start our clock now
                    Stopwatch watch = new Stopwatch();
                    watch.Start();

                    for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                    {
                        var workerDoneTrigger = receiver.ReceiveString();
                        if (taskNumber % 10 == 0)
                        {
                            Console.Write(":");
                        }
                        else
                        {
                            Console.Write(".");
                        }
                    }
                    watch.Stop();
                    //Calculate and report duration of batch
                    Console.WriteLine();
                    Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                    Console.ReadLine();
                }
            }
        }
    }
}

 

There are a couple of batch files you can use to spin up different numbers of workers:

Run1Worker.bat : One worker

When run, this should give you output like the following in the Sink process console:

====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........
:.........:.........
Total elapsed time 5695 msec

 

Run2Worker.bat : Two workers

When run, this should give you output like the following in the Sink process console:

====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........
:.........:.........
Total elapsed time 2959 msec

 

Run4Worker.bat : Four workers

When run, this should give you output like the following in the Sink process console:

====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........
:.........:.........
Total elapsed time 1492 msec

 

There are a couple of points to be aware of with this pattern:

  • The Ventilator uses a NetMQ PushSocket to distribute work to the workers; this is referred to as load balancing.
  • The Ventilator and the Sink are the static parts of the system, whereas the workers are dynamic. It is trivial to add more workers: we can just spin up a new instance of a worker, and in theory the work gets done quicker.
  • We need to synchronize the start of the batch (when the workers are ready). If we did not do that, the first worker that connected would get more messages than the rest, which is not really load balanced.
  • The Sink uses a NetMQ PullSocket to accumulate the results from the workers.
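
To put some numbers on the "in theory the work gets done quicker" point, here is a minimal sketch that works out the speedup and per-worker efficiency from the three Sink timings shown above. The Speedup class and its method names are my own invention for illustration; they are not part of the demo code.

```csharp
using System;

public static class Speedup
{
    // Speedup = elapsed time with one worker / elapsed time with n workers
    public static double Of(long baselineMs, long elapsedMs)
    {
        return (double)baselineMs / elapsedMs;
    }

    // Efficiency = speedup / number of workers (1.0 would be perfect linear scaling)
    public static double Efficiency(long baselineMs, long elapsedMs, int workers)
    {
        return Of(baselineMs, elapsedMs) / workers;
    }

    public static void Main()
    {
        // Timings copied from the Sink console output above
        long baseline = 5695; // Run1Worker.bat
        var runs = new[]
        {
            new { Workers = 2, Ms = 2959L }, // Run2Worker.bat
            new { Workers = 4, Ms = 1492L }  // Run4Worker.bat
        };

        foreach (var run in runs)
        {
            Console.WriteLine("{0} workers: speedup {1:F2}, efficiency {2:F2}",
                run.Workers,
                Of(baseline, run.Ms),
                Efficiency(baseline, run.Ms, run.Workers));
        }
    }
}
```

With the timings above that is a speedup of roughly 1.9 for two workers and 3.8 for four, so for this simple sleep based workload the scaling is close to linear.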
C#, CodeProject, Distributed Systems

ZeroMQ #5 : Sending From Multiple Sockets

Last time we looked at how to use the Poller to work with multiple sockets, and detect their readiness. This time we will continue to work with the familiar request/response model that we have been using thus far. We will however be beefing things up a bit, and shall examine several ways in which you can have more than one thread pushing messages to the server and getting responses, which is a fairly typical requirement (at least in my book it is).

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

 

One Thing Before We Start

As you may have realised by now, ZeroMQ is a messaging library, and as such it promotes the idea of lock free messaging. I also happen to think this is a very good idea. If you avoid shared data structures you can achieve excellent message throughput, and you save yourself the pain of having to synchronize access to them. So in general try and work with ZeroMQ in the way it wants to be worked with, which is via message passing, avoiding locks and shared data structures.

 

Setting The Scene For This Post

Ok so we are nearly at the point where we can start to look at some code, but before we do that, let’s just talk a little bit more about what this post is trying to discuss.

In the code I typically write, it is quite common for a bunch of client threads all to be running at once, each capable of talking to the server. If this sounds like a requirement that you have had to deal with, then you may find this post of use, as this is exactly the scenario this post is aimed at solving.

As the aim of this post is to have an asynchronous client, we need an asynchronous server too, so we use DealerSocket(s) for the client(s) and a RouterSocket for the server.

As with most things there is more than one way to skin a cat, so we will look at a couple of options, each with their own pros/cons.

 

Option 1 : Each Thread Has Its Own DealerSocket

The first option does need a bit of .NET threading knowledge, but if you have that, then the idea is a simple one. For each client thread we also create a dedicated DealerSocket that *should be* used exclusively by that thread.

This is achieved using the ThreadLocal<T> .NET class, which allows us to have a DealerSocket per thread. We add each of the client created DealerSocket(s) to a Poller instance, and listen to the ReceiveReady event on each socket, which allows us to get the message back from the server.

The obvious downside to this approach is that there will be more socket(s) created on the client side. The upside is that it is very easy to implement, and just works.
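
If ThreadLocal<T> is new to you, here is a small stand-alone sketch of just that part, with no NetMQ involved. FakeSocket is a made-up stand-in for a DealerSocket, and I am using the ThreadLocal<T> constructor that takes a value factory, which is a slightly different style from checking IsValueCreated by hand as the demo code does. The sketch also uses dedicated Thread instances so that every caller really is a different thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

public static class ThreadLocalSketch
{
    // Hypothetical stand-in for a DealerSocket, used only in this sketch
    public sealed class FakeSocket
    {
        private static int nextId;
        public readonly int Id = Interlocked.Increment(ref nextId);
    }

    // Starts threadCount threads, reads the per-thread socket several times on
    // each one, and returns how many distinct sockets were created overall
    public static int CountDistinctSockets(int threadCount)
    {
        var seenIds = new ConcurrentBag<int>();

        using (var socketPerThread = new ThreadLocal<FakeSocket>(() => new FakeSocket()))
        {
            var threads = Enumerable.Range(0, threadCount)
                .Select(_ => new Thread(() =>
                {
                    // The factory runs on the first access from each thread,
                    // later reads on the same thread return the same instance
                    for (int i = 0; i < 3; i++)
                    {
                        seenIds.Add(socketPerThread.Value.Id);
                    }
                }))
                .ToList();

            threads.ForEach(t => t.Start());
            threads.ForEach(t => t.Join());
        }

        return seenIds.Distinct().Count();
    }

    public static void Main()
    {
        Console.WriteLine("Distinct sockets for 3 threads: " + CountDistinctSockets(3));
    }
}
```

Each thread reads the value three times but only ever sees its own instance, so three threads means three sockets. That is the property Option 1 relies on.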

Here is an image showing what we are trying to achieve here

image

Here is the code for this scenario:

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace ManualThreadingDemo
{
    public class Program
    {
        public void Run()
        {

            //NOTES
            //1. Use ThreadLocal<DealerSocket> where each thread has
            //  its own client DealerSocket to talk to server
            //2. Each thread can send using its own socket
            //3. Each thread socket is added to poller
            
            ThreadLocal<DealerSocket> clientSocketPerThread =
                new ThreadLocal<DealerSocket>();
            int delay = 3000;
            Poller poller = new Poller();

            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    //start some threads, each with its own DealerSocket
                    //to talk to the server socket. Creates lots of sockets,
                    //but no nasty race conditions no shared state, each
                    //thread has its own socket, happy days
                    for (int i = 0; i < 3; i++)
                    {
                        Task.Factory.StartNew((state) =>
                        {
                            DealerSocket client = null;

                            if (!clientSocketPerThread.IsValueCreated)
                            {
                                client = ctx.CreateDealerSocket();
                                client.Connect("tcp://127.0.0.1:5556");
                                client.ReceiveReady += Client_ReceiveReady;
                                clientSocketPerThread.Value = client;
                                poller.AddSocket(client);
                            }
                            else
                            {
                                client = clientSocketPerThread.Value;
                            }

                            while (true)
                            {
                                var messageToServer = new NetMQMessage();
                                messageToServer.AppendEmptyFrame();
                                messageToServer.Append(state.ToString());
                                client.SendMessage(messageToServer);
                                Thread.Sleep(delay);
                            }

                        },string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                    }

                    //start the poller
                    Task task = Task.Factory.StartNew(poller.Start);

                    //server loop
                    while (true)
                    {
                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i,
                                clientMessage[i].ConvertToString());
                        }

                        if (clientMessage.FrameCount == 3)
                        {
                            var clientAddress = clientMessage[0];
                            var clientOriginalMessage = clientMessage[2].ConvertToString();
                            string response = string.Format("{0} back from server {1}",
                                clientOriginalMessage, DateTime.Now.ToLongTimeString());
                            var messageToClient = new NetMQMessage();
                            messageToClient.Append(clientAddress);
                            messageToClient.AppendEmptyFrame();
                            messageToClient.Append(response);
                            server.SendMessage(messageToClient);
                        }
                    }
                }
            }
        }

        void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            bool hasmore = false;
            e.Socket.Receive(out hasmore);
            if (hasmore)
            {
                string result = e.Socket.ReceiveString(out hasmore);
                Console.WriteLine("REPLY " + result);
            }
        }

        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }

    }
}

 

If you were to run this, you would see something like this:

image

 

Option 2 : Each Thread Delegates Off To A Local Broker

The next example keeps the idea of separate threads that want to send message(s) to the server. This time however we will use a broker on the client side. The idea being that the client threads will push to a shared queue. I know I have told you to avoid shared data structures. Thing is, this is not a shared data structure in that sense, it is just a thread safe queue that many threads can write to. Whereas a shared data structure may mean several threads all trying to update the current bid rate of an FX option quote price. There is a difference. OK, the shared queue will have some synchronization somewhere to make it thread safe, but thankfully we can rely on the good work of the PFX team at Microsoft for that. Those guys are smart, and I am sure the concurrent collections namespace is pretty well designed and can be trusted to be pretty optimal.

Again we need to call on a bit of .NET know how, so for the centralized queue we use a ConcurrentQueue<T>. All client threads will enqueue their messages for the server here.

There will also be another thread started. This extra thread is the one that will be processing the messages that have been queued onto the centralized queue. When a message is taken off the centralized queue it will be sent to the server. The key point is that only the thread that reads from the centralized queue will send messages to the server.

As we still want messages to be sent out asynchronously we stick with using a DealerSocket, but since there is now only one place where we send messages to the server we only need a single DealerSocket.

We add the SINGLE DealerSocket to a Poller instance, and listen to its ReceiveReady event, which allows us to get the message back from the server.

This is more complex than the first example as there are more moving parts, but we no longer have loads of sockets being created. There is just one.
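
Before the NetMQ version, here is a stripped down sketch of just the queue and the single sender, with no sockets at all. Note that I have swapped in a BlockingCollection<string> wrapped around a ConcurrentQueue<string>, which is not what the demo code uses, purely so that the sender can block while the queue is empty and the sketch can shut down cleanly. BrokerSketch and its Run method are names I made up for illustration, and the "send" is just an add to a list.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BrokerSketch
{
    // Runs producerCount producers that each enqueue messagesPerProducer messages,
    // plus ONE consumer, which is the only code that ever "sends"
    public static List<string> Run(int producerCount, int messagesPerProducer)
    {
        var sent = new List<string>(); // only ever touched by the single consumer

        using (var queue = new BlockingCollection<string>(new ConcurrentQueue<string>()))
        {
            var consumer = Task.Factory.StartNew(() =>
            {
                // Blocks while the queue is empty, and finishes once
                // CompleteAdding has been called and the queue is drained
                foreach (var message in queue.GetConsumingEnumerable())
                {
                    // A real broker would send on its single DealerSocket here
                    sent.Add(message);
                }
            }, TaskCreationOptions.LongRunning);

            var producers = Enumerable.Range(0, producerCount)
                .Select(id => Task.Factory.StartNew(() =>
                {
                    for (int n = 0; n < messagesPerProducer; n++)
                    {
                        queue.Add(string.Format("client {0} message {1}", id, n));
                    }
                }))
                .ToArray();

            Task.WaitAll(producers);
            queue.CompleteAdding(); // tell the consumer no more messages are coming
            consumer.Wait();
        }

        return sent;
    }

    public static void Main()
    {
        var sent = Run(3, 5);
        Console.WriteLine("Messages sent by the single broker thread: " + sent.Count);
    }
}
```

The producers never touch the list of sent messages, and the consumer is the only reader of the queue, so the only synchronization in play is whatever the concurrent collection does internally.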

As before here is a diagram of what we are trying to achieve here

image

Here is the code for this scenario:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace ConcurrentQueueDemo
{
    public class Program
    {
        public void Run()
        {
            //NOTES
            //1. Use many threads each writing to ConcurrentQueue
            //2. Extra thread to read from ConcurrentQueue, and this is the one that
            //   will deal with writing to the server
            ConcurrentQueue<string> messages = new ConcurrentQueue<string>();
            int delay = 3000;
            Poller poller = new Poller();

            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    //start some threads, where each thread will use a client side
                    //broker (simple thread that monitors a ConcurrentQueue), where
                    //ONLY the client side broker talks to the server
                    for (int i = 0; i < 3; i++)
                    {
                        Task.Factory.StartNew((state) =>
                        {
                            while (true)
                            {
                                messages.Enqueue(state.ToString());
                                Thread.Sleep(delay);
                            }

                        }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                    }

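                    //single sender loop (no state argument is used here, so that
                    //TaskCreationOptions.LongRunning binds as the options rather than as state)
                    Task.Factory.StartNew(() =>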
                    {
                        var client = ctx.CreateDealerSocket();
                        client.Connect("tcp://127.0.0.1:5556");
                        client.ReceiveReady += Client_ReceiveReady;
                        poller.AddSocket(client);

                        while (true)
                        {
                            string clientMessage = null;
                            if (messages.TryDequeue(out clientMessage))
                            {
                                var messageToServer = new NetMQMessage();
                                messageToServer.AppendEmptyFrame();
                                messageToServer.Append(clientMessage);
                                client.SendMessage(messageToServer);
                            }
                        }

                    }, TaskCreationOptions.LongRunning);

                    //start the poller
                    Task task = Task.Factory.StartNew(poller.Start);

                    //server loop
                    while (true)
                    {
                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i,
                                clientMessage[i].ConvertToString());
                        }

                        if (clientMessage.FrameCount == 3)
                        {
                            var clientAddress = clientMessage[0];
                            var clientOriginalMessage = clientMessage[2].ConvertToString();
                            string response = string.Format("{0} back from server {1}",
                                clientOriginalMessage, DateTime.Now.ToLongTimeString());
                            var messageToClient = new NetMQMessage();
                            messageToClient.Append(clientAddress);
                            messageToClient.AppendEmptyFrame();
                            messageToClient.Append(response);
                            server.SendMessage(messageToClient);
                        }
                    }
                }
            }
        }

        void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            bool hasmore = false;
            e.Socket.Receive(out hasmore);
            if (hasmore)
            {
                string result = e.Socket.ReceiveString(out hasmore);
                Console.WriteLine("REPLY " + result);
            }
        }

        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }
    }
}

 

If you were to run this, you would see something like this:

image

 

Option 3 : Use NetMQScheduler

The final option is to use the NetMQ library class : NetMQScheduler. I think the best place to start with that is by reading the link I just included. Then come back here.

…….

…….

Time passes

…….

…….

Oh hello you’re back. Ok so now you know that the NetMQScheduler offers us a way to use TPL to schedule work and that there is a Poller that we pass into the NetMQScheduler. Cool.

The NetMQScheduler is a custom TPL scheduler, which allows us to create tasks that we want done, and it will take care of the threading aspects of them. Since we told the NetMQScheduler about the Poller we want to use we are able to hook up the ReceiveReady event and use that to get messages back from the server.

The difference here is that since we are using TPL and NetMQ we need to use TPL Task(s) and the NetMQScheduler instance whenever we want to Send/Receive.

To be honest, I think I like this design the least, as it mixes up too many concepts, and the TPL stuff tends to be mixing a bit too much with the ZeroMQ goodness for my taste. I did however just want to show this example for completeness.
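
If the idea of a custom TPL scheduler is a bit abstract, the following sketch may help. To be clear, this is NOT how NetMQScheduler is implemented. It is a deliberately tiny TaskScheduler of my own (SingleThreadScheduler and SchedulerSketch are invented names) that pushes every task onto one dedicated thread, which is the general idea that lets many callers safely share one socket.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler for illustration: every task runs on one dedicated thread
public sealed class SingleThreadScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> pending = new BlockingCollection<Task>();
    private readonly Thread worker;

    public SingleThreadScheduler()
    {
        worker = new Thread(() =>
        {
            // Runs queued tasks one at a time until Dispose completes the queue
            foreach (var task in pending.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        });
        worker.IsBackground = true;
        worker.Start();
    }

    public int WorkerThreadId
    {
        get { return worker.ManagedThreadId; }
    }

    protected override void QueueTask(Task task)
    {
        pending.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Only allow inlining when we are already on the worker thread
        return Thread.CurrentThread == worker && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return pending.ToArray();
    }

    public void Dispose()
    {
        pending.CompleteAdding();
        worker.Join();
        pending.Dispose();
    }
}

public static class SchedulerSketch
{
    // Schedules callerCount tasks and reports whether they all ran on the worker thread
    public static bool AllRanOnWorker(int callerCount)
    {
        using (var scheduler = new SingleThreadScheduler())
        {
            var tasks = new Task<int>[callerCount];
            for (int i = 0; i < callerCount; i++)
            {
                tasks[i] = Task.Factory.StartNew(
                    () => Thread.CurrentThread.ManagedThreadId,
                    CancellationToken.None, TaskCreationOptions.None, scheduler);
            }

            Task.WaitAll(tasks);
            return Array.TrueForAll(tasks, t => t.Result == scheduler.WorkerThreadId);
        }
    }

    public static void Main()
    {
        Console.WriteLine("All tasks ran on the worker thread: " + AllRanOnWorker(4));
    }
}
```

In the NetMQ case the dedicated thread is the Poller thread, so anything started on the NetMQScheduler runs where the socket lives.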

So the code for this example has two parts. A simple client, and then the code that spins up a client instance and then multiple threads that use the client instance to send messages to the server. There is also a basic server loop (which I will show below under the title “The Rest”)

Client Code

Here is the client code, where it can be seen that we create a NetMQScheduler which gets handed a new Poller instance to use internally. The idea is that anyone can send a message simply by calling the client's SendMessage(..) method.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;

namespace NetMQSchedulerDemo
{
    public class Client : IDisposable
    {
        private readonly NetMQContext context;
        private readonly string address;
        private Poller poller;
        private NetMQScheduler scheduler;
        private NetMQSocket clientSocket;

        public Client(NetMQContext context, string address)
        {
            this.context = context;
            this.address = address;
        }

        public void Start()
        {
            poller = new Poller();
            clientSocket = context.CreateDealerSocket();
            clientSocket.ReceiveReady += clientSocket_ReceiveReady;
            clientSocket.Connect(address);
            scheduler = new NetMQScheduler(context, poller);
            Task.Factory.StartNew(poller.Start, TaskCreationOptions.LongRunning);
        }

        void clientSocket_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            string result = e.Socket.ReceiveString();
            Console.WriteLine("REPLY " + result);
        }

        public async Task SendMessage(NetMQMessage message)
        {
            // Instead of creating an inproc socket that listens for messages and then
            // sends them to the server, we just create a task and run the code on
            // the poller thread, which is the thread of the clientSocket
            Task task = new Task(() => clientSocket.SendMessage(message));
            task.Start(scheduler);
            await task;
            await ReceiveMessage();
        }

        public async Task ReceiveMessage()
        {
            Task task = new Task(() =>
            {
                var result = clientSocket.ReceiveString();
                Console.WriteLine("REPLY " + result);
            });
            task.Start(scheduler);
            await task;
        }

        public void Dispose()
        {
            scheduler.Dispose();
            clientSocket.Dispose();
            poller.Stop();
        }
    }
}

The Rest

And here is the rest of the code that is responsible for spinning up the client and extra threads to push messages through the client (using the SendMessage(..) method above)

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace NetMQSchedulerDemo
{
    public class Program
    {
        public void Run()
        {
            //NOTES
            //1. Use NetMQ's NetMQScheduler to communicate with the
            //   server. All Send/Receive MUST be done via the
            //   NetMQScheduler and TPL Tasks. See the Client class
            //   for more information on this

            int delay = 3000;

            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    using (var client = new Client(ctx, "tcp://127.0.0.1:5556"))
                    {
                        client.Start();

                        //start some threads, each thread will use the
                        //Client's NetMQScheduler to send/receive messages
                        //to/from the server
                        for (int i = 0; i < 2; i++)
                        {
                            Task.Factory.StartNew(async (state) =>
                            {
                                while (true)
                                {
                                    var messageToServer = new NetMQMessage();
                                    messageToServer.AppendEmptyFrame();
                                    messageToServer.Append(state.ToString());
                                    await client.SendMessage(messageToServer);
                                    Thread.Sleep(delay);
                                }
                            }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                        }

                        //server loop
                        while (true)
                        {
                            var clientMessage = server.ReceiveMessage();
                            Console.WriteLine("========================");
                            Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                            Console.WriteLine("========================");
                            for (int i = 0; i < clientMessage.FrameCount; i++)
                            {
                                Console.WriteLine("Frame[{0}] = {1}", i,
                                    clientMessage[i].ConvertToString());
                            }

                            if (clientMessage.FrameCount == 3)
                            {
                                var clientAddress = clientMessage[0];
                                var clientOriginalMessage = clientMessage[2].ConvertToString();
                                string response = string.Format("{0} back from server {1}",
                                    clientOriginalMessage, DateTime.Now.ToLongTimeString());
                                var messageToClient = new NetMQMessage();
                                messageToClient.Append(clientAddress);
                                messageToClient.AppendEmptyFrame();
                                messageToClient.Append(response);
                                server.SendMessage(messageToClient);
                            }
                        }
                    }
                }
            }
        }

        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }
    }
}

 

If you run this code you may see something like this:

image