CodeProject

Setting up Prometheus and Grafana Monitoring

In this post I want to talk about how to get started with some very nice monitoring tools, namely

At the job I was at previously we used these tools a lot. They served as the backbone to our monitoring for our system overall, and we were chucking a lot of metrics into these tools, and without these tools I think its fair to say we would not have had such a good insight to where our software had performance issues/bottlenecks. These 2 tools really did help us a lot.

But what exactly are these tools?

Prometheus

Prometheus is the metrics capture engine, that come with an inbuilt query language known as PromQL. Prometheus is setup to scrape metrics from your own apps. It does this by way of a config file that you set up to scrape your chosen applications. You can read the getting started guide here : https://prometheus.io/docs/prometheus/latest/getting_started/ and read more about the queries here : https://prometheus.io/docs/prometheus/latest/querying/basics/

 

As I just said you configure Prometheus to scrape your apps using a config file. This file is called prometheus.yml where a small example is shown below. This example is set to scrape Prometheus own metrics and also another app which is running on port 9000. I will show you that app later on.

# my global config
global:
  scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
    - targets: ['localhost:9090','localhost:9000']

So lets assume you have something like this in place, you should then be able to launch Prometheus using a command like this prometheus.exe –config.file=prometheus.yml, and then navigate to the following url to test out the Prometheus setup http://localhost:9090/

This should show you something like this

image

From here you can try and also go to this url which will show you some of the inbuilt Prometheus metrics : http://localhost:9090/metrics, so taking one of these say go_memstats_alloc_bytes, we could go back to http://localhost:9090/ and build a simple graph

 

image

 

So once you have verified this bit looks ok. We can then turn our attention to getting Grafana to work.

Grafana

As we just saw Prometheus comes with fairly crude graphing. However Grafana offers a richer way to setup graphs, and it also comes with inbuilt support for using Prometheus as a data source. To get started you can download from here  : https://grafana.com/docs/grafana/latest/guides/getting_started/. And to start Grafana you just need to launch the bin\grafana-server.exe, but make sure you also have Prometheus running as shown in the previous step. Once you have both Prometheus and Grafana running, we can launch the Grafana UI from http://localhost:3000/

 

Then what we can do is add Prometheus as a data source into Grafana, which can be done as follows:

image

image

image

So once you have done that we can turn our attention into adding a new Graph, this would be done using the “Add Query” button below.

image

If we stick with the example inbuilt metrics that come with Prometheus we can use the go_memstats_alloc_bytes one and add a new Panel and use the “Add Query” button above, where we can enter the following metric go_memstats_alloc_bytes{instance=”localhost:9090″,job=”prometheus”}, once configured we should see a nice little graph like this

image

I won’t go into every option of how to create graphs in Grafana, but one thing I can tell you that is very very useful is the ability to grab one graphs JSON representation and use it ton create other graphs. or you can also duplicate this is also very useful, both of these can be done by using the dropdown at the top of the graph in question

image

This will save you a lot of time when you are trying to create your own panels

Labels

I also just wanted to spend a bit of time talking about labels in Grafana and Prometheus. Labels allow you to to group items to together on one chart but distinguish them in some way based on the label value. For example suppose we wanted to monitor the number of GET vs PUT through some API, we could have a single metric for this but could apply some label value to it somehow. We will see how we can do this using our own .N ET code in the next section, but for now this is the sort of thing you can get with labels

image

This one was setup like this in Grafana

image

We will see how we did this is the next section.

Custom Metrics Inside Your Own .NET App

To use Prometheus in your own .NET code is actually fairly simple thanks to Prometheus.NET which is a nice Nuget package, which you can read more about at the link just provided. Once you have the Nuget installed, its simply a matter of setting up the metrics you want and adding your app to the list of apps that are scraped by Prometheus, which we saw above in the Yaml file at the start of this post. Essentially Prometheus.NET will expose a webserver at the port you chose, which you can then configure to be scraped.

Let’s see an example of configuring a metric. We will use a Gauge type, which is a metric that can go up and down in value. We will also make use of labels, which is what is driving the chart shown above.

using Prometheus;
using System;
using System.Threading;

namespace PrometheusDemo
{
    class Program
    {

        private static readonly Gauge _gauge =
            Metrics.CreateGauge("sampleapp_ticks_total", 
                "Just keeps on ticking",new string[1] { "operation" });
        private static double _gaugeValue = 0;
        private static Random _rand = new Random();

        static void Main(string[] args)
        {
            var server = new MetricServer(hostname: "localhost", port: 9000);
            server.Start();
            while (true)
            {
                if(_gaugeValue > 100)
                {
                    _gaugeValue = 0;
                    _gauge.Set(0);
                }
                _gaugeValue = _gaugeValue + 1;
                if(_rand.NextDouble() > 0.5)
                {
                    _gauge.WithLabels("PUT").Set(_gaugeValue);
                }
                else
                {
                    _gauge.WithLabels("GET").Set(_gaugeValue);
                }

                Console.WriteLine($"Setting gauge valiue to {_gaugeValue}");
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }

            Console.ReadLine();
        }
    }
}

That is the entire listing, and that is enough to expose the single metric sampleapp_ticks_total with 2 labels

  • POST
  • GET

Which are then used to display 2 individual line graphs for the same metric inside of Prometheus

Conclusion

And that is all there is to it.Obviously for more complex queries, you will need to dig into the Prometheus PromQL query syntax. But this gets you started. The other thing I have not shown here is that Grafana is also capable of creating other types of displays such as these

image

CodeProject, Scala

SCALA mocking

 

Last time we looked at writing unit tests for our code, where we looked at using ScalaTest. This time we will be looking at mocking.

In .NET there are several choices available that I like (and a couple that I don’t), such as :

  • Moq
  • FakeItEasy
  • RhinoMocks (this is one I am not keen on)

I personally am most familiar with Moq, so when I started looking at JVM based mocking frameworks I kind of wanted one that used roughly the same syntax as the ones that I had used in .NET land.

There are several choices available that I think are quite nicely, namely :

  • ScalaMock
  • EasyMock
  • JMock
  • Mockito

Which all play nicely with ScalaTest (which I am sure you are all very pleased to here).

So with that list what did I decide upon. I personally opted for Mockito, as I liked the syntax the best, that is not to say the others are not fine and dandy, it is just that I personally liked Mockito and it seemed to have good documentation and favorable Google search results, so Mockito it is.

So for the rest of this post I will talk about how to use Mockito to write our mocks. I will be used Mockito along side ScalaTest which we looked at last time.

SBT Requirements

As with most of the previous posts you will need to grab the libraries using SBT. As such your SBT file will need to use the following:

libraryDependencies ++= Seq(
  "org.mockito" % "mockito-core" % "1.8.5",
  "org.scalatest" %% "scalatest" % "2.2.5" % "test"
)

 

Our First Example

So with all that stated above. Lets have a look at a simple example. This trivial example mocks out a java.util.ArrayList[String]. And also sets up a few verifications

class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {


  "Testing using Mockito " should "be easy" in {


    //mock creation
    val mockedList = mock[java.util.ArrayList[String]]

    //using mock object
    mockedList.add("one");
    mockedList.clear

    //verification
    verify(mockedList).add("one")
    verify(mockedList).clear

  }
}

One thing you may notice straight away is how the F*k am I able to mock a ArrayList[T], which is a class which is not abstract by the way. This is pretty cool.

 

Stubbing

Using Mockito we can also stub out things just as you would expect with any 1/2 decent mocking framework. Here is an example where we try and mock out a simple trait.

import java.util.Date
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._


trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}



class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Stubbing using Mockito " should "be easy" in {

    var mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.getDate()).thenReturn("01/01/2015")
    assert("01/01/2015" === mockDumbFormatter.getDate())
  }
}

It can be seen above that it is quite easy to mock a trait. You can also see how we stub the mock out using the  Mockito functions

  • when
  • thenReturn

 

Return Values

We just saw an example above of how to use the “thenReturn” Mockito function, which is what you would use to setup your return value. If you want a dynamic return value this could quite easily call some other function which deals with creating the return values. Kind of a return value factory method.

 

Argument Matching

Mockito comes with something that allows you to match against any argument value. It also comes with regex matchers, and allows you to write custom matchers if the ones out of the box don’t quite fit your needs.

Here is an example of writing a mock where we use the standard argument matchers:

import java.util.Date
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._


trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}



class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Stubbing using Mockito " should "be easy" in {

    var mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]())).thenReturn("01/01/2015 Something")
    assert("01/01/2015 Something" === mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", new Date()))
  }
}

Exceptions

To throw exceptions with Mockito we simply need to use the “thenThrow(….) function. Here is how.

import java.util.Date
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._


trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}



class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Stubbing using Mockito " should "be easy" in {

    var mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]()))
	.thenThrow(new RuntimeException())

    //use the ScalaTest intercept to test for exceptions
    intercept[RuntimeException] {
      mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", new Date())
    }
  }
}

See how we also have to use the ScalaTest “intercept” for the actually testing

 

CallBacks

Callbacks are useful when you want to see what a method was called with and then you can make informed decisions about what you could possibly return.

Here is how you do callbacks in Mockito, note the use of the “thenAnswer” function, and how we use an anonymous Answer object.

import java.util.Date
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._


trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}



class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Stubbing using Mockito " should "be easy" in {

    var mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]()))
      .thenAnswer(new Answer[String] {
        override def answer(invocation: InvocationOnMock): String = {
          val result = "called back nicely sir"
          println(result)
          result
        }
      })

    assert("called back nicely sir" === mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", new Date()))



  }
}

 

Verification

The last thing I wanted to talk about was verification. Which may include verifying functions got called, and were called the right number of times.

Here is a simple example of this:

import java.util.Date
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._


trait DumbFormatter {

  def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
    s"date : $date : $inputString"
  }

  def getDate() : String = {
    new Date().toString
  }
}



class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Stubbing using Mockito " should "be easy" in {

    var mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]()))
      .thenReturn("someString")

    val theDate = new Date()
    val theResult = mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", theDate)
    val theResult2 = mockDumbFormatter.formatWithDataTimePrefix("no no no", theDate)

    verify(mockDumbFormatter, atLeastOnce()).formatWithDataTimePrefix("blah blah blah", theDate)
    verify(mockDumbFormatter, times(1)).formatWithDataTimePrefix("no no no", theDate)


  }
}

 

 

Further Reading

You can read more about how to use Mockito from the docs : https://docs.google.com/document/d/15mJ2Qrldx-J14ubTEnBj7nYN2FB8ap7xOn8GRAi24_A/edit

 

 

End Of The Line

Personally my quest goes on, I am going to keep going until I consider myself  good at Scala (which probably means I know nothing).

Anyway behind the scenes I will be studying more and more stuff about how to get myself to that point. As such I guess it is only natural that I may post some more stuff about Scala in the future.

But for now this it it, this is the end of the line for this brief series of posts on Scala. I hope you have all enjoyed the posts, and if you have please feel free to leave a comment, they are always appreciated.

 

CodeProject, Scala

SCALA : TESTING OUR CODE

 

So last time we looked at how to use Slick to connect to a SQL server database.

This time we look at how to use one of the 2 popular Scala testing frameworks.

The 2 big names when it comes to Scala testing are

  • ScalaTest
  • Specs2

I have chosen to use ScalaTest as it seems slightly more popular, when you do a Google search, and I quite liked the syntax. That said Specs2 is also very good. so if you fancy having a look at that you should.

SBT for ScalaTest

So what do we need to get started with ScalaTest. As always we need to grab the JAR, which we do using SBT.

At time of writing this was accomplished using this SBT entry:

name := "ClassesDemo"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.scalatest" %% "scalatest" % "2.2.5" % "test"
)

With that in place, SBT should pull down the JAR from Maven Central for you. So once you are happy that you have the ScalaTest JAR installed, we can not proceed to write some tests.

 

Writing Some Tests

I come from a .NET background, and as such I am using to working with tools such as

  • NUnit
    • TestFixture
    • Setup : To setup the test
    • TearDown : To teardown the test
  • Moq / FakeItEasy : Mocking frameworks

As such I wanted to make sure I could do everything that I was used to in .NET using ScalaTest.

This article will concentrate on the testing side of things, while the next post will be on the mocking side of things.

So let’s carry on for now shall we.

Choosing You Test Style

ScalaTest allows you to use 2 different styles of writing tests.

  • FunSuite : This is more in line with what you get with NUnit say. We would write something like Test(“testing should be easy”)
  • FlatSpec : This is more of a BDD style test declaration, where we would write something like this: “Testing” should “be easy”
     

We will see an examples of both of these styles in just a minute, but before that lets carry on and looks at some of the common things you may want to do with your tests

Setup / TearDown

You may want to run some startup/teardown code that is run. Typically startup would be used to setup mocks for your test cases, and that sort of thing.

In things like NUnit this would simply be done by creating a method and attributing it to say it is the Setup/TearDown methods.

In ScalaTest things are slightly different in that we need to mixin the “BeforeAndAfter”  trait to do this. Lets see an example:

import org.scalatest.{FunSuite, BeforeAndAfter}
import scala.collection.mutable.ListBuffer

class FunSuite_Example_Tests extends FunSuite with BeforeAndAfter {

  val builder = new StringBuilder
  val buffer = new ListBuffer[String]

  before {
    builder.append("ScalaTest is ")
  }

  after {
    builder.clear()
    buffer.clear()
  }
}

It can be seen in this example that the BeforeAndAfter trait, gives you 2 additional functions

  • before
  • after

You can use these to perform your startup/teardown logic.

This example uses the FunSuite style, but the “BeforeAndAfter”  trait mixin is done exactly the same for the FlatSpec style testing.

 

Writing A Test Using FunSuite

I think if you have come from a NUnit / XUnit type of background you will probably identify more with the FunSuite style of testing.

Here is an example of a set of FunSuite tests.

import org.scalatest.{FunSuite, BeforeAndAfter}

import scala.collection.mutable.ListBuffer

class FunSuite_Example_Tests extends FunSuite with BeforeAndAfter {

  val builder = new StringBuilder
  val buffer = new ListBuffer[String]

  before {
    builder.append("ScalaTest is ")
  }

  after {
    builder.clear()
    buffer.clear()
  }

  test("Testing should be easy") {
    builder.append("easy!")
    assert(builder.toString === "ScalaTest is easy!")
    assert(buffer.isEmpty)
    buffer += "sweet"
  }

  test("Testing should be fun") {
    builder.append("fun!")
    assert(builder.toString === "ScalaTest is fun!")
    assert(buffer.isEmpty)
  }
}

It can be see that they follow the very tried and tested approach of tools like NUnit, where you have a test(…) function, where “…” is the text that describes your testcase.

Nothing much more to say there apart from to make sure you mixin the FunSuite trait.

 

Writing A Test Using FlatSpec

ScalaTest also supports another way of writing your tests, which is to use the FlatSpec trait, which you would mixin instead of the FunSuite trait.

When you use FlatSpec you would be writing your tests more like this:

  • “Testing” should “be easy” in {…}
  • it should “be fun” in {…}

Its more of a BDD style way of creating your test cases.

Here is the exact same test suite that we saw above but this time written using the FlatSpec instead of FunSuite.

import scala.collection.mutable.ListBuffer
 
class FlatSpec_Example_Tests extends FlatSpec with BeforeAndAfter {
 
    val builder = new StringBuilder
    val buffer = new ListBuffer[String]
 
     before {
         builder.append("ScalaTest is ")
       }
 
     after {
         builder.clear()
         buffer.clear()
       }
 
    "Testing" should "be easy" in {
         builder.append("easy!")
         assert(builder.toString === "ScalaTest is easy!")
         assert(buffer.isEmpty)
         buffer += "sweet"
       }
 
     it should "be fun" in {
         builder.append("fun!")
         assert(builder.toString === "ScalaTest is fun!")
         assert(buffer.isEmpty)
       }
}

I don’t mind either, I guess it’s down to personal choice/taste at the end of the day.

Using Matchers

Matchers are ScalaTest’s way of providing additonal constraints to assert against. In some testing frameworks we would just use the Assert class for that along with things like

  • Assert.AreEqual(..)
  • Assert.IsNotNull(..)

In ScalaTest you can still use the assert(..) function, but matchers are also a good way of expressing your test conditional.

So what exactly are matchers?

In the words of the ScalaTest creators:

ScalaTest provides a domain specific language (DSL) for expressing assertions in tests using the word should.

So what do we need to do to use these ScalaTest matchers? Well quite simply we need to just mix in Matchers, like this:

import org.scalatest._

class ExampleSpec extends FlatSpec with Matchers { ...}

You can alternatively import the members of the trait, a technique particularly useful when you want to try out matcher expressions in the Scala interpeter. Here’s an example where the members of Matchers are imported:

import org.scalatest._
import Matchers._

class ExampleSpec extends FlatSpec { // Can use matchers here ...

So that give us the ability to use the ScalaTest matchers DSL. So what do these things look like. Lets see a couple of examples:

import org.scalatest._


class FlatSpec_Example_Tests extends FlatSpec with Matchers {

    "Testing" should "probably use some matchers" in {

          //equality examples
          Array(1, 2) should equal (Array(1, 2))
          val resultInt = 23
          resultInt should equal (3) // can customize equality
          resultInt should === (3)   // can customize equality and enforce type constraints
          resultInt should be (3)    // cannot customize equality, so fastest to compile
          resultInt shouldEqual 3    // can customize equality, no parentheses required
          resultInt shouldBe 3       // cannot customize equality, so fastest to compile, no parentheses required

          //length examples
          List(1,2) should have length 2
          "cat" should have length 3

          //string examples
          val helloWorld = "Hello worlld"
          helloWorld should startWith ("Hello")
          helloWorld should endWith ("world")

          val sevenString ="six seven eight"
          sevenString should include ("seven")

          //greater than / less than
          val one = 1
          val zero = 0
          val seven = 7
          one should be < seven
          one should be > zero
          one should be <= seven
          one should be >= zero

          //emptiness
          List() shouldBe empty
          List(1,2) should not be empty
       }

}

 

 

For more information on using matchers, you should consult this documentation, which you can find here:

http://www.scalatest.org/user_guide/using_matchers

 

 

C#, CodeProject

Scheduling With Quartz.Net

The other day I have a requirement to schedule something in my app to run at certain times, and at fixed intervals there after. Typically I would just solve this using either a simple Timer, or turn to my friend Reactive Extensions by way of Observable.Timer(..).

Thing is I decided to have a quick look at something I have always known about but never really used, for scheduling, which is Quartz.net, which actually does have some pretty good documentation up already:

http://www.quartz-scheduler.net/documentation/quartz-2.x/tutorial/index.html

For me I just wanted to get something very basic up and running, so I gave it a blast.

Step 1 : Install Quartz.net

This is as easy as installing the following NuGet package “Quartz

Step 2 Create A Job Class

This again is fairly easy thanks to Quartz nice API. Here is my job class

using System;
using Quartz;

namespace FooBar
{
    public class LoggingJob : IJob
    {
        public void Execute(IJobExecutionContext context)
        {


            Common.Logging.LogManager.Adapter.GetLogger("LoggingJob").Info(
                string.Format("Logging job : {0} {1}, and proceeding to log", 
                    DateTime.Now.ToShortDateString(), DateTime.Now.ToLongTimeString()));

        }
    }
}

That is all you need for a job really. The context object gives you access to a lot of useful stuff.

Step 3 : Setting up a schedule

Again this was mega easy, all I had to do was something like this:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using Common.Logging;

using Quartz;
using Quartz.Impl;

namespace FooBar
{
    class Program
    {

        private static ILog Log = LogManager.GetCurrentClassLogger();

        static void Main(string[] args)
        {
            try
            {
                // construct a scheduler factory
                ISchedulerFactory schedFact = new StdSchedulerFactory();

                // get a scheduler
                IScheduler sched = schedFact.GetScheduler();
                sched.Start();

                IJobDetail job = JobBuilder.Create<LoggingJob>()
                    .WithIdentity("myJob", "group1")
                    .Build();

                ITrigger trigger = TriggerBuilder.Create()
                    .WithDailyTimeIntervalSchedule
                    (s =>
                        s.WithIntervalInSeconds(10)
                            .OnEveryDay()
                            .StartingDailyAt(TimeOfDay.HourAndMinuteOfDay(10, 15))
                    )
                    .Build();

                sched.ScheduleJob(job, trigger);
            }
            catch (ArgumentException e)
            {
                Log.Error(e);
            }


        }
    }
}

And that was enough for my job to get scheduled, every 10 seconds starting at 10:15 AM.

I was fairly happy with Quartz, and I will certainly make more use of it, when I have bigger, bolder, badder scheduling needs.

ASP MVC, CodeProject

Paper Effect Google Maps

A friend of mine Marlon Grech, has his own business and he has a nice parallax effect web site : http://www.thynksoftware.com/ and over there on his “contact us2 page, it has this very cool folding Google maps thing. I have wondered how it was done for a while now, today I decided to find out. A colleague of mine was like WHY!….Ashic if you are reading this, sorry but I think it is cool, but I promise you I will be back to trying to slay the world with sockets tomorrow, a slight distraction shall we say.

Not Really My Idea – Credit Where Credit Is Due

Now the code I present in this post is not my own at all, I have added the ability to toggle the folding of the map, but that really is all I have done. None the less, I think it is still of interest to describe how it was done, and worth a small write up. I think the original authors have done a great job, but did not really explain anything, so hopefully by the time you get to the end of this post, the effect will be a bit more familiar to you.

The original code that forms the basis to this post is here : http://experiments.bonnevoy.com/foldedmap/demo/

The Basic Idea

The idea is actually not too hard to grasp. There is a master DIV, which contains the actual Google map, this DIV has a VERY low opacity, so low you can’t actually see it. Then there are 6 other DIVS that get a slice of the original DIV, this is done by some clever margins, as can be seen in this image and the code that follows it:

image

The relevant HTML is here:

<!doctype html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Folding google maps</title>
    <link rel="stylesheet" type="text/css" href="style.css">
    <script src="http://maps.google.com/maps/api/js?sensor=true"></script>
    <script src="//ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
    <script src="script.js"></script>
    <!–[if lt IE 9]>
    <script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script&gt;
    <![endif]–>
</head>
<body>
    <header>
        <a href="/foldedmap/" id="prev">Click to toggle</a>
        <p>Folded Google Maps (Webkit only)</p>
    </header>
    
    <div id="map_master"></div>
    <div id="container">
        <div id="mapContainer1" class="mapContainer odd first"></div>
        <div id="mapContainer2" class="mapContainer even"></div>
        <div id="mapContainer3" class="mapContainer odd"></div>
        <div id="mapContainer4" class="mapContainer even"></div>
        <div id="mapContainer5" class="mapContainer odd"></div>
                        <div id="mapContainer6" class="mapContainer even last"></div>
    </div>
</body>
</html>

Each of these slices is a fixed width of 160px and a height of 400px. The overflow is also hidden, that part is important, as will see in just a minute. The relevant CSS is here:

.mapContainer {
    background:#fff;
    float:left;
    width: 160px;
    height: 400px;
    overflow: hidden;
    border-top: 10px solid #fff;
    border-bottom: 10px solid #fff;
    -webkit-transform-style: preserve-3d;
    -moz-transform-style: preserve-3d;
    -ms-transform-style: preserve-3d;
    -o-transform-style: preserve-3d;
    transform-style: preserve-3d;
    box-shadow:0px 4px 0 #10335C;
}

 

Each slice of the original gets a certain margin applied, where the overflow for the 6 DIVS is hidden. So by moving the slice into the desired position by way of clever margin positioning the remaining portion of the map for that slice would not be seen, thanks to overflow being hidden. Sneaky

mapSync: function() {
    var map_clone = $('#map_master').clone().css('opacity',1).css('z-index',-1);
    $('#mapContainer1').html(map_clone.clone().css('marginLeft','80px'));
    $('#mapContainer2').html(map_clone.clone().css('marginLeft','240px'));
    $('#mapContainer3').html(map_clone.clone().css('marginLeft','400px'));
    $('#mapContainer4').html(map_clone.clone().css('marginLeft','560px'));
    $('#mapContainer5').html(map_clone.clone().css('marginLeft','720px'));
    $('#mapContainer6').html(map_clone.clone().css('marginLeft','880px'));
},

 

The next part of the trick is to apply some web kit 3D transforms. This is done inside a Timeout function so it happens every 10 milliseconds, up to/down from some predetermined values.

Here is the code that does applies the transforms

applyTransforms: function () {
    var prefixes = ['webkit', 'moz', 'ms', 'o', ''];
    for(i in prefixes) {
        $('.odd').css(prefixes[i] + 'Transform', 'rotateX(' +
            this.rotateX + 'deg) rotateY(' + –this.rotateY + 'deg)');
        $('.even').css(prefixes[i] + 'Transform', 'rotateX(' +
            this.rotateX + 'deg) rotateY(' + this.rotateY + 'deg)');
    }
    $('.mapContainer').css('marginLeft',
        -160 * (1 – Math.cos(this.rotateY / 360 * 2 * Math.PI)) + 'px');
},

 

It can be seen above that the applying of the transforms is done for each of the vendor specific flavours, Mozilla, Webkit etc etc

The code below unfolds/folds the 6 DIVS by calling he applyTransforms function outlined above.

unfoldMap: function () {
    if(this.rotateY > 20)
    {
        this.rotateY -= 0.5;
        this.rotateX += 0.1;
        this.applyTransforms();
    }
    else
    {
        clearInterval(this.fold_out);
        this.isOpen = true;
    }
},

foldMap: function () {
    if(this.rotateY < 90)
    {
        this.rotateY += 0.5;
        this.rotateX -= 0.1;
        this.applyTransforms();
    }
    else
    {
        clearInterval(this.fold_in);
        this.isOpen = false;
    }
}

 

Anyway like I say the code is pretty much as seen in the original link, all I added was the ability to toggle between a unfolded map back to a folded one.

Though I have added a bit more of an explanation, so hopefully it added some value

Where Is The Code?

Anyway if you want a small demo project I have created one for VS2013, that you can grab from GitHub here : https://github.com/sachabarber/FoldingCSSGooogleMap

IOS Code Like This

My old WPF Disciple buddy Colin Eberhardt has a similar post using Objective C for IOS development. May be of interest to some of you : http://www.scottlogic.com/blog/2013/09/20/creating-a-custom-flip-view-controller-transition.html

C#, CodeProject, Distributed Systems

ZeroMQ #7: A Simple Actor Model

Last time we looked at using ZeroMQ to use a “Divide And Conquer” pattern to distribute work to a number of workers and then combine the results again.

Since I wrote that last post I have had a bit of think about this series of posts, and realised that nothing I can say here would be as good or as thorough as the guide, so I have has to rethink my strategy a bit for the posts that I may write on ZeroMQ from here on in. So rather than me regurgitate what has already been said by Pieter on the guide web site, I will instead only be writing about stuff that I think is new, or worthy of a post. Now this could mean that the posts are less frequent, but I hope when there is one it will be of more interest, than me just saying here is a NetMQ version of the “Paranoid Pirate Pattern”, go check this link at the guide for more information.

So where does that leave this series of posts? Well to be honest slightly in limbo, but I have also been in contact with Pieter Hintjens, who was kind enough to give me a little push into something that may be of interest.

Pieter notified by of a Actor Model that was part of the high level C library for ZeroMQ called “czmq”, which is not contained in the NetMQ GitHub repository. So I had a call with Pieter, and looked into that.

This post will discuss a very simple actor model, that I have written to work with NetMQ, Pieter has given it the once over, and I have also talked it through with a regular ZeroMQ user at work, so I think it an ok version of the original C ZeroMQ “czmq” version.

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

What Is An Actor Model?

Here is what Wikipedia has to same in the introduction to what an Actor Model is.

The actor model in computer science is a mathematical model of concurrent computation that treats “actors” as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.

….

….

The Actor model adopts the philosophy that everything is an actor. This is similar to the everything is an object philosophy used by some object-oriented programming languages, but differs in that object-oriented software is typically executed sequentially, while the Actor model is inherently concurrent.

An actor is a computational entity that, in response to a message it receives, can concurrently:

  • send a finite number of messages to other actors;
  • create a finite number of new actors;
  • designate the behavior to be used for the next message it receives.

There is no assumed sequence to the above actions and they could be carried out in parallel.

Decoupling the sender from communications sent was a fundamental advance of the Actor model enabling asynchronous communication and control structures as patterns of passing messages.[8]

Recipients of messages are identified by address, sometimes called “mailing address”. Thus an actor can only communicate with actors whose addresses it has. It can obtain those from a message it receives, or if the address is for an actor it has itself created.

The Actor model is characterized by inherent concurrency of computation within and among actors, dynamic creation of actors, inclusion of actor addresses in messages, and interaction only through direct asynchronous message passing with no restriction on message arrival order.

http://en.wikipedia.org/wiki/Actor_model

 

How I like to think of Actors is that they may be used to alleviate some of synchronization concerns of using shared data structures. This is achieved by your application code talking to actors via message passing/receiving. The actor itself may pass messages to other actors, or work on the passed message itself. By using message passing rather than using shared data structures, it may help to think of the actor (or any subsequent actors its send messages to) working on a copy of the data rather than working on the same shared structures. Which kind of gets rid of the need to worry about nasty things like lock(s) and any nasty timing issues that may arise from carrying out multi threaded code. If the actor is working with its own copy of the data then we should have no issues with other threads wanting to work with the data  the actor has, as the only place that data can be is within the actor itself, that is unless we pass another message to a different actor. If we were to do that though the new message to the other actor would also be a copy of the data, so would also be thread safe.

I hope you see what I am trying to explain there, may be a diagram may help.

Multi Threaded Application Using Shared Data Structure

A fairly common thing to do is have multiple threads running to speed things up, but then you realise that your threads need to mutate the state of some shared data structure, so then you have to involve threading synchronization primitives (most commonly lock(..) statements, to create your user defined critical sections). This will work, but now you are introducing artificial delays due to having to wait for the lock to be released so you can run Thread X’s code.

image 

To take this one step further, lets see some code that may illustrate this further, imagine we had this sort of data structure representing a very slim bank account

namespace ConsoleApplication1
{
    public class Account
    {

        public Account()
        {

        }

        public Account(int id, string name,
            string sortCode, decimal balance)
        {
            Id = id;
            Name = name;
            SortCode = sortCode;
            Balance = balance;
        }

        public int Id { get; set; }
        public string Name { get; set; }
        public string SortCode { get; set; }
        public decimal Balance { get; set; }
    }
}

 

Nothing fancy there, just some fields. So lets now move onto looking at some threading code, I have chosen to just show two threads acting on a shared Account instance.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Management.Instrumentation;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        private object syncLock = new object();
        private Account clientBankAccount;
        public Program()
        {
            clientBankAccount = new Account(1,"sacha barber","112233",0);
        }

        public async Task Run()
        {
            try
            {
                await Task.Run(() =>
                {
                    Console.WriteLine("Tread Id {0}, Account balance before: {1}",
                        Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
                        
                    lock (syncLock)
                    {
                        Console.WriteLine("Tread Id {0}, Adding 10 to balance",
                           Thread.CurrentThread.ManagedThreadId);
                        clientBankAccount.Balance += 10;
                        Console.WriteLine("Tread Id {0}, Account balance before: {1}",
                            Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
                    }
                });

                await Task.Run(() =>
                {
                    Console.WriteLine("Tread Id {0}, Account balance before: {1}",
                        Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
                    lock (syncLock)
                    {
                        Console.WriteLine("Tread Id {0}, Subtracting 4 to balance",
                           Thread.CurrentThread.ManagedThreadId);
                        clientBankAccount.Balance -= 4;
                        Console.WriteLine("Tread Id {0}, Account balance before: {1}",
                            Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
                    }
                });
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }

        }

        static void Main(string[] args)
        {
           Program p = new Program();
           p.Run().Wait();
           Console.ReadLine();
        }
    }
}

 

I have possible picked an example that you think may not actually happen in real life, and to be honest this scenario may not popup in real life, as who would do something as silly as crediting an account in one thread, and debiting it in another…we are all diligent developers, we would not let this one into the code would we?

To be honest whether the actual example has real world merit or not, the point remains the same, since we have more than one thread accessing a shared data structure, access to it must be synchronized, which is typically done using a lock(..) statement, as can be seen in the code.

Now don’t get me wrong the above code does work, as shown in the output below:

image

Perhaps there might be a more interesting way though!

Actor Model

The actor model, takes a different approach, where by message passing is used, which may involve some form of serialization as the messages are pass down the wire, which kind of guarantees no shared structures to contend with. Now I am not saying all Actor frameworks use message passing down the wire (serialization) but the code presented in this article does.

The basic idea is that each thread would talk to an Actor, and send/receive message with the actor.

If you wanted to get even more isolation, you could use thread local storage where each thread could have its own copy of the actor which it, and it alone talks to.

image

Anyway enough talking, I am sure some of you want to see the code right?

The Implementation

The idea is that the Actor itself may be treated like a ZeroMQ (NetMQ in my case) sockets, and may therefor be used to Send/Receive messages. Now you may be wondering if I send the Actor a message, who is listening to that message, and how is it that I am able to receive a message from the Actor?

The answer to that lies inside the implementation of the simple Actor framework in this post. Internally the Actor spins up another thread. Within that new thread is one end of a PairSocket where the Actor itself is the other end of the pipe which is also a PairSocket (recall I said the Actor is able to act as a socket). The Actor and the other end of the pipe communicate via message passing, and they use an in process (inproc) protocol to do so.

The initial message passed to the Actor forms some some of protocol  that both the other end of the Actor pipe (i.e. the thread the Actor created ZeroMQ czmq implementation), which I am calling the “shim” (borrowed from the) MUST know how to deal with the protocol that the user code sends via the Actor.

The way I have chosen to do this, is you (i.e. the user of the simple Actor library will need to create a “shim” handler class. This “shim” handler class may be a very simple protocol or a very complicated one (for the demo I have stuck to very simple ones), as long as it understands the message/command being sent from the Actor, and knows what to do with it. That is up to you to come up with, I have no silver bullet for that

One final thing to explain is that the “shim” handler may be passed some initial arguments (outside of the message passing) should you want to make use of this feature. It is something you may not want/need to use, but it is there should you want to use it.

Actor Code

Here is the code for the Actor itself, where it can be seen that it may be treated as a socket using the Send/Receive methods. The Actor also creates a new thread which is used to run the code in the shim handler. The Actor also creates the shim and the pair of PairSocket(s) for message passing.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
using NetMQ.zmq;

namespace NetMQActors
{
    /// <summary>
    /// The Actor represents one end of a two way pipe between 2 PairSocket(s). Where
    /// the actor may be passed messages, that are sent to the other end of the pipe
    /// which I am calling the "shim"
    /// </summary>
    public class Actor : IOutgoingSocket, IReceivingSocket, IDisposable
    {
        private readonly PairSocket self;
        private readonly Shim shim;
        private Random rand = new Random();
        private CancellationTokenSource cts = new CancellationTokenSource();

        private string GetEndPointName()
        {
            return string.Format("inproc://zactor-{0}-{1}",
                rand.Next(0, 10000), rand.Next(0, 10000));
        }

        public Actor(NetMQContext context, IShimHandler shimHandler, object[] args)
        {
            this.self = context.CreatePairSocket();
            this.shim = new Shim(shimHandler, context.CreatePairSocket());
            this.self.Options.SendHighWatermark = 1000;
            this.self.Options.SendHighWatermark = 1000;

            //now binding and connect pipe ends
            string endPoint = string.Empty;
            while (true)
            {
                Action bindAction = () =>
                {
                    endPoint = GetEndPointName();
                    self.Bind(endPoint);
                };

                try
                {
                    bindAction();
                    break;
                }
                catch (NetMQException nex)
                {
                    if (nex.ErrorCode == ErrorCode.EFAULT)
                    {
                        bindAction();
                    }
                }

            }

            shim.Pipe.Connect(endPoint);

            //Create Shim thread handler
            CreateShimThread(args);
        }

        private void CreateShimThread(object[] args)
        {
            Task shimTask = Task.Factory.StartNew(
                (state) => this.shim.Handler.Run(this.shim.Pipe, (object[])state, cts.Token),
                args,
                cts.Token,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);

            shimTask.ContinueWith(ant =>
            {
                if (ant.Exception == null) return;

                Exception baseException = ant.Exception.Flatten().GetBaseException();
                if (baseException.GetType() == typeof (NetMQException))
                {
                    Console.WriteLine(string.Format("NetMQException caught : {0}",
                        baseException.Message));
                }
                else if (baseException.GetType() == typeof (ObjectDisposedException))
                {
                    Console.WriteLine(string.Format("ObjectDisposedException caught : {0}",
                        baseException.Message));
                }
                else
                {
                    Console.WriteLine(string.Format("Exception caught : {0}",
                        baseException.Message));
                }
            }, TaskContinuationOptions.OnlyOnFaulted);
        }

        ~Actor()
        {
            Dispose(false);
        }

        public void Dispose(){
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            //cancel shim thread
            cts.Cancel();

            // release other disposable objects
            if (disposing)
            {
                if (self != null) self.Dispose();
                if (shim != null) shim.Dispose();
            }
        }

        public void Send(byte[] data, int length, bool dontWait = false, bool sendMore = false)
        {
            self.Send(data, length, dontWait, sendMore);
        }

        public byte[] Receive(bool dontWait, out bool hasMore)
        {
            return self.Receive(dontWait, out hasMore);
        }
    }
}

 

Shim Code

The shim represents the other end of the pipe, the shim essentially is a property bag, but it does hold a reference to the IShimHandler that the thread in the actual Actor will run. The IShimHandler is the one that MUST understand the protocol, and carry out any work.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using NetMQ.Sockets;

namespace NetMQActors
{
    /// <summary>
    /// Shim represents one end of the in process pipe, where the Shim expects
    /// to be supplied with a <c>IShimHandler</c> that it would use for running the pipe
    /// protocol with the original Actor PairSocket the other end of the pipe
    /// </summary>
    public class Shim : IDisposable
    {
        public Shim(IShimHandler shimHandler, PairSocket pipe)
        {
            this.Handler = shimHandler;
            this.Pipe = pipe;
        }

        public IShimHandler Handler { get; private set; }
        public PairSocket Pipe { get; private set; }

        ~Shim()
        {
            Dispose(false);
        }

        public void Dispose(){
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                // release disposable objects
                if (Pipe != null) Pipe.Dispose();
            }
        }
    }
}

 

Handler Interface

You shim handler code MUST implement this interface

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ.Sockets;

namespace NetMQActors
{
    /// <summary>
    /// Simple interface that all shims should implement
    /// </summary>
    public interface IShimHandler
    {
        void Run(PairSocket shim, object[] args, CancellationToken token);
    }
}

An Example : Simple EchoShim Handler

This example shows how to create a simple echo shim handler that can be used with standard actor code above. The EchoShimHandler presented here, uses an EXTREMELY simple protocol is does the following:

  • It expects the initial arguments to be 1 in length
  • It expects the initial argument element 0 to be “Hello World”
  • It expects a multi part message, where the 1st message frame is the command string “ECHO”

If all of those criteria are satisfied, then the EchoShimHandler will write the its end of the PairSocket pipe. The user of the Actor at the other end of the pipe (i.e. the other PairSocket), can then receive the value from the EchoShimHandler. Remember in this mini library the Actor may act as a regular NetMQ socket.

Here is the code for the EchoShimHandler

using System;
using System.Linq;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using NetMQ.zmq;

namespace NetMQActors
{
    /// <summary>
    /// This hander class is specific implementation that you would need
    /// to implement per actor. This essentially contains your commands/protocol
    /// and should deal with any command workload, as well as sending back to the
    /// other end of the PairSocket which calling code would receive by using the
    /// Actor classes various RecieveXXX() methods
    ///
    /// This is a VERY simple protocol but it just demonstrates what you would need
    /// to do to implement your own Shim handler
    /// </summary>
    public class EchoShimHandler : IShimHandler
    {
        public void Run(PairSocket shim, object[] args, CancellationToken token)
        {
            if (args == null || args.Count() != 1 || (string)args[0] != "Hello World")
                throw new InvalidOperationException(
                    "Args were not correct, expected 'Hello World'");

            while (!token.IsCancellationRequested)
            {
                //Message for this actor/shim handler is expected to be
                //Frame[0] : Command
                //Frame[1] : Payload
                //
                //Result back to actor is a simple echoing of the Payload, where
                //the payload is prefixed with "ECHO BACK "
                NetMQMessage msg = null;

                //this may throw NetMQException if we have disposed of the actor
                //end of the pipe, and the CancellationToken.IsCancellationRequested
                //did not get picked up this loop cycle
                msg = shim.ReceiveMessage();

                if (msg == null)
                    break;

                if (msg[0].ConvertToString() == "ECHO")
                {
                    shim.Send(string.Format("ECHO BACK : {0}",
                        msg[1].ConvertToString()));
                }
                else
                {
                    throw NetMQException.Create("Unexpected command",
                        ErrorCode.EFAULT);
                }
            }
        }
    }
}

 

And here is the Actor test code that goes with this, where it can be seen that we are able send/receive using the Actor. There is also an example here that shows us trying to use a previously disposed Actor, which we expect to fail, and it does

//Round 1 : Should work fine
EchoShimHandler echoShimHandler = new EchoShimHandler();

Actor actor = new Actor(NetMQContext.Create(), echoShimHandler, new object[] { "Hello World" });
actor.SendMore("ECHO");
string actorMessage = "This is a string";
actor.Send(actorMessage);
var result = actor.ReceiveString();
Console.WriteLine("ROUND1");
Console.WriteLine("========================");
string expectedEchoHandlerResult = string.Format("ECHO BACK : {0}", actorMessage);
Console.WriteLine("ExpectedEchoHandlerResult: '{0}'\r\nGot : '{1}'\r\n",
    expectedEchoHandlerResult, result);
actor.Dispose();

//Round 2 : Should NOT work, as we are now using Disposed actor
try
{
    Console.WriteLine("ROUND2");
    Console.WriteLine("========================");
    actor.SendMore("ECHO");
    actor.Send("This is a string");
    result = actor.ReceiveString();
}
catch (NetMQException nex)
{
    Console.WriteLine("NetMQException : Actor has been disposed so this is expected\r\n");
}

//Round 3 : Should work fine
echoShimHandler = new EchoShimHandler();

actor = new Actor(NetMQContext.Create(), echoShimHandler, new object[] { "Hello World" });
actor.SendMore("ECHO");
actorMessage = "Another Go";
actor.Send(actorMessage);
result = actor.ReceiveString();
Console.WriteLine("ROUND3");
Console.WriteLine("========================");
expectedEchoHandlerResult = string.Format("ECHO BACK : {0}", actorMessage);
Console.WriteLine("ExpectedEchoHandlerResult: '{0}'\r\nGot : '{1}'\r\n",
    expectedEchoHandlerResult, result);
actor.Dispose();

 

Which would give output something like this

image

 

Another Example : Sending JSON Objects

This example shows how to create a simple account shim handler that can be used with standard actor code above. The AccountShimHandler presented here, uses another simple protocol (on purpose, you may choose to make this as simple or as complex as you wish) is does the following:

  • It expects the initial arguments to be 1 in length
  • It expects the initial argument element 0 to be a JSON serialized string of an AccountAction
  • It expects a multi part message, where the 1st message frame is the command string “AMEND ACCOUNT”
  • It expects the 2nd message frame to be a JSON serialized string of an Account

If all these criteria are met, then the AccountShimHandler deserialize the JSON Account object into an actual Account object, and will either debit/credit the Account that was passed into the AccountShimHandler, and then serialize the modified Account object back into JSON and send it back to the Actor via the PairSocket in the AccountShimHandler

The AccountAction class looks like this

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NetMQActors.Models
{
    public enum TransactionType {  Debit=1, Credit=2}
    public class AccountAction
    {
        public AccountAction()
        {
            
        }

        public AccountAction(TransactionType transactionType, decimal amount)
        {
            TransactionType = transactionType;
            Amount = amount;
        }

        public TransactionType TransactionType { get; set; }
        public decimal Amount { get; set; }
    }
}

 

The Account class looks like this

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NetMQActors.Models
{
    public class Account
    {

        public Account()
        {
            
        }

        public Account(int id, string name, string sortCode, decimal balance)
        {
            Id = id;
            Name = name;
            SortCode = sortCode;
            Balance = balance;
        }

        public int Id { get; set; }
        public string Name { get; set; }
        public string SortCode { get; set; }
        public decimal Balance { get; set; }
    }
}

 

Here is the code for the AccountShimHandler

using System;
using System.Linq;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using NetMQ.zmq;
using NetMQActors.Models;
using Newtonsoft.Json;

namespace NetMQActors
{
    /// <summary>
    /// This hander class is specific implementation that you would need
    /// to implement per actor. This essentially contains your commands/protocol
    /// and should deal with any command workload, as well as sending back to the
    /// other end of the PairSocket which calling code would receive by using the
    /// Actor classes various RecieveXXX() methods
    ///
    /// This is a VERY simple protocol but it just demonstrates what you would need
    /// to do to implement your own Shim handler
    /// </summary>
    public class AccountShimHandler : IShimHandler
    {

        private void AmmendAccount(AccountAction action, Account account)
        {
            decimal currentAmount = account.Balance;
            account.Balance = action.TransactionType == TransactionType.Debit
                ? currentAmount – action.Amount
                : currentAmount + action.Amount;
        }

        public void Run(PairSocket shim, object[] args, CancellationToken token)
        {
            if (args == null || args.Count() != 1)
                throw new InvalidOperationException(
                    "Args were not correct, expected one argument");

            AccountAction accountAction = JsonConvert.DeserializeObject<AccountAction>(args[0].ToString());

            while (!token.IsCancellationRequested)
            {
                //Message for this actor/shim handler is expected to be
                //Frame[0] : Command
                //Frame[1] : Payload
                //
                //Result back to actor is a simple echoing of the Payload, where
                //the payload is prefixed with "AMEND ACCOUNT"
                NetMQMessage msg = null;

                //this may throw NetMQException if we have disposed of the actor
                //end of the pipe, and the CancellationToken.IsCancellationRequested
                //did not get picked up this loop cycle
                msg = shim.ReceiveMessage();

                if (msg == null)
                    break;

                if (msg[0].ConvertToString() == "AMEND ACCOUNT")
                {
                    string json = msg[1].ConvertToString();
                    Account account = JsonConvert.DeserializeObject<Account>(json);
                    AmmendAccount(accountAction, account);
                    shim.Send(JsonConvert.SerializeObject(account));
                }
                else
                {
                    throw NetMQException.Create("Unexpected command",
                        ErrorCode.EFAULT);
                }
            }
        }
    }
}

 

And here is the relevant Actor code

//Round 4 : Should work fine
AccountShimHandler accountShimHandler = new AccountShimHandler();

AccountAction accountAction = new AccountAction(TransactionType.Credit, 10);
Account account = new Account(1, "Test Account", "11223", 0);

Actor accountActor = new Actor(NetMQContext.Create(), accountShimHandler,
    new object[] { JsonConvert.SerializeObject(accountAction) });
accountActor.SendMore("AMEND ACCOUNT");
accountActor.Send(JsonConvert.SerializeObject(account));
Account updatedAccount =
    JsonConvert.DeserializeObject<Account>(accountActor.ReceiveString());
Console.WriteLine("ROUND4");
Console.WriteLine("========================");
decimal expectedAccountBalance = 10.0m;
Console.WriteLine(
    "Exected Account Balance: '{0}'\r\nGot : '{1}'\r\n" +
    "Are Same Account Object : '{2}'\r\n",
    expectedAccountBalance, updatedAccount.Balance,
    ReferenceEquals(accountActor, updatedAccount));
accountActor.Dispose();

 

Which gives a result something like this, if you read the code above you will see the Account object we send, and the receive are NOT the same object. This is due to the fact they have been sent down the wire using NetMQ sockets.

image

Some Actor Frameworks To Look At

There are a couple of Actor frameworks out there that I am aware of. Namely the following ones, there will be more, but these are the main ones I am aware of.

I have only really given Akka a cursory look, but I remember when Axum first came out and gave it a good try, and thought this is neat, no concurrency hell to worry about here. Cool.

For me at least, I wish there was an Actor model in .NET

C#, CodeProject, Distributed Systems

ZeroMQ #6 : Divide And Conquer

Last time we looked at how to send from multiple sockets. Believe it or not we have pretty much introduced most of the core concepts you will need. As a recap here is what we have already covered

So from here on in it is just a matter of going through some of the well known patterns from the ZeroMQ guide.

Now it would be immoral (even fraudulent) of me to not mention this up front, in the main, the information that I present in the remaining posts in this series of posts, will be based quite heavily on the ZeroMQ guide by Pieter Hintjens. Pieter has actually been in touch with me regarding this series of posts, and has been kind enough to let me run each new post by him. I think is generous, and I am extremely pleased to have Pieter on hand, to run them past. What that means to you, is that if there are any misunderstandings/mistakes on my behalf, I am sure Pieter will be pointing them out (at which point I will obviously correct any mistakes made,  hopefully I will not make any). So big thanks go out to Pieter, cheers as would say in England.

It is all good publicity for ZeroMQ though, and as NetMQ is a native port it is not one of the ones covered by the language bindings on the ZeroMQ guide site. So even though I am basing my content on the fantastic work done by Pieter, it will obviously be using NetMQ, so from that point of view the code is still very much relevant.

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

 

What Will We Be Doing This Time?

This time we will continue to look at ZeroMQ patterns. Which is actually what the remaining posts will all pretty much be focussed on.

The pattern that we will look at this time involves dividing a problem domain into smaller chunks and distributed them across workers, and then collating the results back together again.

This pattern is really a “divide and conquer” one, but it has also been called “Parallel Pipeline”. With all the remaining posts, I will be linking back to the original portion of the guide such that you can read more about the problem and Pieter’s solution.

ZeroMQ Guide Divide And Conquer : http://zguide.zeromq.org/page:all#Divide-and-Conquer

The idea is that you have something that generates work, and then distributes the work out to n-many workers. The workers each do some work, and push their results to some other process (could be a thread too) where the workers’ results are accumulated.

In the ZeroMQ guide, it shows an example that has the work generator just tell each worker to sleep for a period of time. I toyed with creating a more elaborate example than this, but in the end felt that the examples simplicity was quite important, so have stuck with the workload for each worker just being a value that tells the work to sleep for a number of Milliseconds (thus simulating some actual work).  This as I say has been borrowed from the ZeroMQ guide.

In real life the work could obviously be anything, though you would more than likely want the work to be something that could be cut up and distributed without the work generator caring/knowing how many workers there are.

Here is what we are trying to achieve :

image 

Ventilator

using System;
using NetMQ;

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://localhost:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");
           
            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to send messages on
                using (var sender = ctx.CreatePushSocket())
                {
                    sender.Bind("tcp://*:5557");

                    using (var sink = ctx.CreatePushSocket())
                    {
                        sink.Connect("tcp://localhost:5558");

                        Console.WriteLine("Press enter when worker are ready");
                        Console.ReadLine();
                        
                        //the first message it "0" and signals start of batch
                        //see the Sink.csproj Program.cs file for where this is used
                        Console.WriteLine("Sending start of batch to Sink");    
                        sink.Send("0");

                        Console.WriteLine("Sending tasks to workers");

                        //initialise random number generator
                        Random rand= new Random(0);

                        //expected costs in Ms
                        int totalMs = 0;
                        
                        //send 100 tasks (workload for tasks, is just some random sleep time that
                        //the workers can perform, in real life each work would do more than sleep
                        for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                        {
                            //Random workload from 1 to 100 msec
                            int workload = rand.Next(0, 100);
                            totalMs += workload;
                            Console.WriteLine("Workload : {0}", workload);
                            sender.Send(workload.ToString());
                        }
                        Console.WriteLine("Total expected cost : {0} msec", totalMs);
                        Console.WriteLine("Press Enter to quit");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
}

 

Worker

using System;
using System.Threading;
using NetMQ;

namespace Worker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Worker
            // Connects PULL socket to tcp://localhost:5557
            // collects workload for socket from Ventilator via that socket
            // Connects PUSH socket to tcp://localhost:5558
            // Sends results to Sink via that socket
            Console.WriteLine("====== WORKER ======");

            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Connect("tcp://localhost:5557");

                     //socket to send messages on
                    using (var sender = ctx.CreatePushSocket())
                    {
                        sender.Connect("tcp://localhost:5558");

                        //process tasks forever
                        while (true)
                        {
                            //workload from the vetilator is a simple delay
                            //to simulate some work being done, see
                            //Ventilator.csproj Proram.cs for the workload sent
                            //In real life some more meaningful work would be done
                            string workload = receiver.ReceiveString();

                            //simulate some work being done
                            Thread.Sleep(int.Parse(workload));

                            //send results to sink, sink just needs to know worker
                            //is done, message content is not important, just the precence of
                            //a message means worker is done.
                            //See Sink.csproj Proram.cs
                            Console.WriteLine("Sending to Sink");
                            sender.Send(string.Empty);
                        }
                    }

                }
            }

        }
    }
}

 

Sink

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace Sink
{
    public class Program
    {
        public static void Main(string[] args)
        {

            // Task Sink
            // Bindd PULL socket to tcp://localhost:5558
            // Collects results from workers via that socket
            Console.WriteLine("====== SINK ======");
           
            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Bind("tcp://localhost:5558");

                    //wait for start of batch (see Ventilator.csproj Program.cs)
                    var startOfBatchTrigger = receiver.ReceiveString();
                    Console.WriteLine("Seen start of batch");

                    //Start our clock now
                    Stopwatch watch = new Stopwatch();
                    watch.Start();

                    for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                    {
                        var workerDoneTrigger = receiver.ReceiveString();
                        if (taskNumber % 10 == 0)
                        {
                            Console.Write(":");
                        }
                        else
                        {
                            Console.Write(".");
                        }
                    }
                    watch.Stop();
                    //Calculate and report duration of batch
                    Console.WriteLine();
                    Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                    Console.ReadLine();
                }
            }
        }
    }
}

 

There is a couple of batch files you can use to spin up different amounts of workers, see:

Run1Worker.bat : One worker

Which when run should give you some output like this in the Sink process console output

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 5695 msec

 

Run2Worker.bat : two workers

Which when run should give you some output like this in the Sink process Console output

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 2959 msec

 

Run4Worker.bat : four workers

Which when run should give you some output like this in the Sink process Console output

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 1492 msec

 

There are a couple of points to be aware of with this pattern

  • The Ventilator uses a NetMQ PushSocket to distribute work to the workers, this is referred to as load balancing
  • The Ventilator and the Sink are the static parts of the system, where as workers are dynamic. It is trivial to add more workers, we can just spin up a new instance of a worker, and in theory the work gets done quicker.
  • We need to synchronize the starting of the batch (when workers are ready), as if we did not do that, the first worker that connected would get more messages that the rest, which is not really load balanced
  • The Sink uses a NetMQ PullSocket to accumulate the results from the workers