Powershell : Killing all processes of name

Now I am just starting with PowerShell, so I will likely come up with some ridiculously simple examples.

One of thing I really like about PowerShell is the ability to pipe things from one CmdLet  to another.

Imagine you want to get all instances of the the running process “notepad” and kill them.

This is easily achieved using the following code

Get-Process "notepad" | Stop-Process

 

Like  I say this is ridiculously simple, and hardy worthy of a blog post at all, but I aim to build a set of common tasks posts, and this will just form one of those.

So until next time

Powershell selecting from SQL Server

I am just getting into PowerShell, and today a work colleague of mine stated he has a table in SQL server that he needed to examine. The table contained names of files on the disk drive. He then needed to examine the names of the files in the table, and if the file existed rename it to include todays date. He asked if this could be done in PowerShell, at lunch I decided to try it and came up with this:

SQL

Say I have this table

USE [SachaTest]
GO

/****** Object:  Table [dbo].[Files]    Script Date: 10/22/2014 13:59:07 ******/
SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

CREATE TABLE [dbo].[Files](
      [FilePath] [nvarchar](max) NOT NULL
) ON [PRIMARY]

GO

Which was populated like this

INSERT INTO [SachaTest].[dbo].[Files]
           ([FilePath])
     VALUES
           ('C:\Users\barbers\Desktop\PowerShellTest\dummy1.txt')
GO
INSERT INTO [SachaTest].[dbo].[Files]
           ([FilePath])
     VALUES
           ('C:\Users\barbers\Desktop\PowerShellTest\dummy2.txt')
GO

Where I have the following directory on disk

image

I then came up with the following PowerShell file to carry out the work as described in the opening paragraph of this article

<# 
    Establish SQL connection, and grab the stuff from the 
   "Files" table
#>
$SqlConnection = New-Object System.Data.SqlClient.SqlConnection
$SqlConnection.ConnectionString = "Data Source=omnidev;Initial Catalog=SachaTest;
	Integrated Security=True;Timeout=180;MultipleActiveResultSets=true;"
$SqlCmd = New-Object System.Data.SqlClient.SqlCommand
$SqlCmd.CommandText = "select * from Files"
$SqlCmd.Connection = $SqlConnection
$SqlAdapter = New-Object System.Data.SqlClient.SqlDataAdapter
$SqlAdapter.SelectCommand = $SqlCmd
$DataSet = New-Object System.Data.DataSet
$SqlAdapter.Fill($DataSet)
$SqlConnection.Close()
 
 
<#
	Processes each DataRow handed to it, where it will grab the FilePath" column value within the DataRow
	and shall create a new file in the format of currentFilenameDDMMYY
#>
Function ProcessFile (){
    Process {
       
          # use the column called "FilePath" to grab the file name from within the DataRow
          $fileOnDisk = New-Object -TypeName System.IO.FileInfo($_["FilePath"])
          write-host "full name is : " + $fileOnDisk.FullName
          $datePartForFile = (Get-Date -format d).Replace("/","")
          $justFileName = [System.IO.Path]::GetFileNameWithoutExtension($fileOnDisk.FullName)
          $newFileName = $fileOnDisk.DirectoryName + '\' + $justFileName + '_' + 
			$datePartForFile + $fileOnDisk.Extension
          write-host "new file name is : " +  $newFileName
 
          If (Test-Path $newFileName){
               Remove-Item $newFileName
          }
 
          [System.IO.File]::Copy($fileOnDisk.FullName, $newFileName);
    }
}
 
# Skip null objects filter
filter Skip-Null { $_|?{ $_ } }
 
<#
	Loop through the DataSet.Tables[0] (where this will be the Files table, 
	which only has one column called "FilePath"
	then process each file by calling the "ProcessFile" function
#>
 
$DataSet.Tables[0] |
Select-Object $_.Rows |
Skip-Null |
ProcessFile

Which when run gives the following results

image

REST : Simple REST Framework

My wife has just given birth 2 my second son, and I took 2 weeks off to help around the house. Though I did find a little bit of time to try and create a very simple REST framework from scratch using just the base class library of .NET (i.e., no 3rd party libraries), ok I did use JSON.NET for serializing JSON, but other than that it is all my own code.

I found a bit more time this week, and decided to write it up over here:

http://www.codeproject.com/Articles/826383/REST-A-Simple-REST-framework

Now it is no way production quality code, and I would not use it ever, but it was a fun exercise, and I enjoyed the ride, may be interesting read for some of you.

Hope you enjoy

PS : I start a 10 week F# course next week, so you may see some more F# posts from me in the future.

Paper Effect Google Maps

A friend of mine Marlon Grech, has his own business and he has a nice parallax effect web site : http://www.thynksoftware.com/ and over there on his “contact us2 page, it has this very cool folding Google maps thing. I have wondered how it was done for a while now, today I decided to find out. A colleague of mine was like WHY!….Ashic if you are reading this, sorry but I think it is cool, but I promise you I will be back to trying to slay the world with sockets tomorrow, a slight distraction shall we say.

Not Really My Idea – Credit Where Credit Is Due

Now the code I present in this post is not my own at all, I have added the ability to toggle the folding of the map, but that really is all I have done. None the less, I think it is still of interest to describe how it was done, and worth a small write up. I think the original authors have done a great job, but did not really explain anything, so hopefully by the time you get to the end of this post, the effect will be a bit more familiar to you.

The original code that forms the basis to this post is here : http://experiments.bonnevoy.com/foldedmap/demo/

The Basic Idea

The idea is actually not too hard to grasp. There is a master DIV, which contains the actual Google map, this DIV has a VERY low opacity, so low you can’t actually see it. Then there are 6 other DIVS that get a slice of the original DIV, this is done by some clever margins, as can be seen in this image and the code that follows it:

image

The relevant HTML is here:

<!doctype html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Folding google maps</title>
    <link rel="stylesheet" type="text/css" href="style.css">
    <script src="http://maps.google.com/maps/api/js?sensor=true"></script>
    <script src="//ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
    <script src="script.js"></script>
    <!–[if lt IE 9]>
    <script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script&gt;
    <![endif]–>
</head>
<body>
    <header>
        <a href="/foldedmap/" id="prev">Click to toggle</a>
        <p>Folded Google Maps (Webkit only)</p>
    </header>
    
    <div id="map_master"></div>
    <div id="container">
        <div id="mapContainer1" class="mapContainer odd first"></div>
        <div id="mapContainer2" class="mapContainer even"></div>
        <div id="mapContainer3" class="mapContainer odd"></div>
        <div id="mapContainer4" class="mapContainer even"></div>
        <div id="mapContainer5" class="mapContainer odd"></div>
                        <div id="mapContainer6" class="mapContainer even last"></div>
    </div>
</body>
</html>

Each of these slices is a fixed width of 160px and a height of 400px. The overflow is also hidden, that part is important, as will see in just a minute. The relevant CSS is here:

.mapContainer {
    background:#fff;
    float:left;
    width: 160px;
    height: 400px;
    overflow: hidden;
    border-top: 10px solid #fff;
    border-bottom: 10px solid #fff;
    -webkit-transform-style: preserve-3d;
    -moz-transform-style: preserve-3d;
    -ms-transform-style: preserve-3d;
    -o-transform-style: preserve-3d;
    transform-style: preserve-3d;
    box-shadow:0px 4px 0 #10335C;
}

 

Each slice of the original gets a certain margin applied, where the overflow for the 6 DIVS is hidden. So by moving the slice into the desired position by way of clever margin positioning the remaining portion of the map for that slice would not be seen, thanks to overflow being hidden. Sneaky

mapSync: function() {
    var map_clone = $('#map_master').clone().css('opacity',1).css('z-index',-1);
    $('#mapContainer1').html(map_clone.clone().css('marginLeft','-80px'));
    $('#mapContainer2').html(map_clone.clone().css('marginLeft','-240px'));
    $('#mapContainer3').html(map_clone.clone().css('marginLeft','-400px'));
    $('#mapContainer4').html(map_clone.clone().css('marginLeft','-560px'));
    $('#mapContainer5').html(map_clone.clone().css('marginLeft','-720px'));
    $('#mapContainer6').html(map_clone.clone().css('marginLeft','-880px'));
},

 

The next part of the trick is to apply some web kit 3D transforms. This is done inside a Timeout function so it happens every 10 milliseconds, up to/down from some predetermined values.

Here is the code that does applies the transforms

applyTransforms: function () {
    var prefixes = ['webkit', 'moz', 'ms', 'o', ''];
    for(i in prefixes) {
        $('.odd').css(prefixes[i] + 'Transform', 'rotateX(' +
            this.rotateX + 'deg) rotateY(' + -this.rotateY + 'deg)');
        $('.even').css(prefixes[i] + 'Transform', 'rotateX(' +
            this.rotateX + 'deg) rotateY(' + this.rotateY + 'deg)');
    }
    $('.mapContainer').css('marginLeft',
        -160 * (1 – Math.cos(this.rotateY / 360 * 2 * Math.PI)) + 'px');
},

 

It can be seen above that the applying of the transforms is done for each of the vendor specific flavours, Mozilla, Webkit etc etc

The code below unfolds/folds the 6 DIVS by calling he applyTransforms function outlined above.

unfoldMap: function () {
    if(this.rotateY > 20)
    {
        this.rotateY -= 0.5;
        this.rotateX += 0.1;
        this.applyTransforms();
    }
    else
    {
        clearInterval(this.fold_out);
        this.isOpen = true;
    }
},

foldMap: function () {
    if(this.rotateY < 90)
    {
        this.rotateY += 0.5;
        this.rotateX -= 0.1;
        this.applyTransforms();
    }
    else
    {
        clearInterval(this.fold_in);
        this.isOpen = false;
    }
}

 

Anyway like I say the code is pretty much as seen in the original link, all I added was the ability to toggle between a unfolded map back to a folded one.

Though I have added a bit more of an explanation, so hopefully it added some value

Where Is The Code?

Anyway if you want a small demo project I have created one for VS2013, that you can grab from GitHub here : https://github.com/sachabarber/FoldingCSSGooogleMap

IOS Code Like This

My old WPF Disciple buddy Colin Eberhardt has a similar post using Objective C for IOS development. May be of interest to some of you : http://www.scottlogic.com/blog/2013/09/20/creating-a-custom-flip-view-controller-transition.html

ZeroMQ #7: A Simple Actor Model

Last time we looked at using ZeroMQ to use a “Divide And Conquer” pattern to distribute work to a number of workers and then combine the results again.

Since I wrote that last post I have had a bit of think about this series of posts, and realised that nothing I can say here would be as good or as thorough as the guide, so I have has to rethink my strategy a bit for the posts that I may write on ZeroMQ from here on in. So rather than me regurgitate what has already been said by Pieter on the guide web site, I will instead only be writing about stuff that I think is new, or worthy of a post. Now this could mean that the posts are less frequent, but I hope when there is one it will be of more interest, than me just saying here is a NetMQ version of the “Paranoid Pirate Pattern”, go check this link at the guide for more information.

So where does that leave this series of posts? Well to be honest slightly in limbo, but I have also been in contact with Pieter Hintjens, who was kind enough to give me a little push into something that may be of interest.

Pieter notified by of a Actor Model that was part of the high level C library for ZeroMQ called “czmq”, which is not contained in the NetMQ GitHub repository. So I had a call with Pieter, and looked into that.

This post will discuss a very simple actor model, that I have written to work with NetMQ, Pieter has given it the once over, and I have also talked it through with a regular ZeroMQ user at work, so I think it an ok version of the original C ZeroMQ “czmq” version.

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

What Is An Actor Model?

Here is what Wikipedia has to same in the introduction to what an Actor Model is.

The actor model in computer science is a mathematical model of concurrent computation that treats “actors” as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.

….

….

The Actor model adopts the philosophy that everything is an actor. This is similar to the everything is an object philosophy used by some object-oriented programming languages, but differs in that object-oriented software is typically executed sequentially, while the Actor model is inherently concurrent.

An actor is a computational entity that, in response to a message it receives, can concurrently:

  • send a finite number of messages to other actors;
  • create a finite number of new actors;
  • designate the behavior to be used for the next message it receives.

There is no assumed sequence to the above actions and they could be carried out in parallel.

Decoupling the sender from communications sent was a fundamental advance of the Actor model enabling asynchronous communication and control structures as patterns of passing messages.[8]

Recipients of messages are identified by address, sometimes called “mailing address”. Thus an actor can only communicate with actors whose addresses it has. It can obtain those from a message it receives, or if the address is for an actor it has itself created.

The Actor model is characterized by inherent concurrency of computation within and among actors, dynamic creation of actors, inclusion of actor addresses in messages, and interaction only through direct asynchronous message passing with no restriction on message arrival order.

http://en.wikipedia.org/wiki/Actor_model

 

How I like to think of Actors is that they may be used to alleviate some of synchronization concerns of using shared data structures. This is achieved by your application code talking to actors via message passing/receiving. The actor itself may pass messages to other actors, or work on the passed message itself. By using message passing rather than using shared data structures, it may help to think of the actor (or any subsequent actors its send messages to) working on a copy of the data rather than working on the same shared structures. Which kind of gets rid of the need to worry about nasty things like lock(s) and any nasty timing issues that may arise from carrying out multi threaded code. If the actor is working with its own copy of the data then we should have no issues with other threads wanting to work with the data  the actor has, as the only place that data can be is within the actor itself, that is unless we pass another message to a different actor. If we were to do that though the new message to the other actor would also be a copy of the data, so would also be thread safe.

I hope you see what I am trying to explain there, may be a diagram may help.

Multi Threaded Application Using Shared Data Structure

A fairly common thing to do is have multiple threads running to speed things up, but then you realise that your threads need to mutate the state of some shared data structure, so then you have to involve threading synchronization primitives (most commonly lock(..) statements, to create your user defined critical sections). This will work, but now you are introducing artificial delays due to having to wait for the lock to be released so you can run Thread X’s code.

image 

To take this one step further, lets see some code that may illustrate this further, imagine we had this sort of data structure representing a very slim bank account

namespace ConsoleApplication1
{
    public class Account
    {

        public Account()
        {

        }

        public Account(int id, string name,
            string sortCode, decimal balance)
        {
            Id = id;
            Name = name;
            SortCode = sortCode;
            Balance = balance;
        }

        public int Id { get; set; }
        public string Name { get; set; }
        public string SortCode { get; set; }
        public decimal Balance { get; set; }
    }
}

 

Nothing fancy there, just some fields. So lets now move onto looking at some threading code, I have chosen to just show two threads acting on a shared Account instance.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Management.Instrumentation;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        private object syncLock = new object();
        private Account clientBankAccount;
        public Program()
        {
            clientBankAccount = new Account(1,"sacha barber","112233",0);
        }

        public async Task Run()
        {
            try
            {
                await Task.Run(() =>
                {
                    Console.WriteLine("Tread Id {0}, Account balance before: {1}",
                        Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
                        
                    lock (syncLock)
                    {
                        Console.WriteLine("Tread Id {0}, Adding 10 to balance",
                           Thread.CurrentThread.ManagedThreadId);
                        clientBankAccount.Balance += 10;
                        Console.WriteLine("Tread Id {0}, Account balance before: {1}",
                            Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
                    }
                });

                await Task.Run(() =>
                {
                    Console.WriteLine("Tread Id {0}, Account balance before: {1}",
                        Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
                    lock (syncLock)
                    {
                        Console.WriteLine("Tread Id {0}, Subtracting 4 to balance",
                           Thread.CurrentThread.ManagedThreadId);
                        clientBankAccount.Balance -= 4;
                        Console.WriteLine("Tread Id {0}, Account balance before: {1}",
                            Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
                    }
                });
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }

        }

        static void Main(string[] args)
        {
           Program p = new Program();
           p.Run().Wait();
           Console.ReadLine();
        }
    }
}

 

I have possible picked an example that you think may not actually happen in real life, and to be honest this scenario may not popup in real life, as who would do something as silly as crediting an account in one thread, and debiting it in another…we are all diligent developers, we would not let this one into the code would we?

To be honest whether the actual example has real world merit or not, the point remains the same, since we have more than one thread accessing a shared data structure, access to it must be synchronized, which is typically done using a lock(..) statement, as can be seen in the code.

Now don’t get me wrong the above code does work, as shown in the output below:

image

Perhaps there might be a more interesting way though!

Actor Model

The actor model, takes a different approach, where by message passing is used, which may involve some form of serialization as the messages are pass down the wire, which kind of guarantees no shared structures to contend with. Now I am not saying all Actor frameworks use message passing down the wire (serialization) but the code presented in this article does.

The basic idea is that each thread would talk to an Actor, and send/receive message with the actor.

If you wanted to get even more isolation, you could use thread local storage where each thread could have its own copy of the actor which it, and it alone talks to.

image

Anyway enough talking, I am sure some of you want to see the code right?

The Implementation

The idea is that the Actor itself may be treated like a ZeroMQ (NetMQ in my case) sockets, and may therefor be used to Send/Receive messages. Now you may be wondering if I send the Actor a message, who is listening to that message, and how is it that I am able to receive a message from the Actor?

The answer to that lies inside the implementation of the simple Actor framework in this post. Internally the Actor spins up another thread. Within that new thread is one end of a PairSocket where the Actor itself is the other end of the pipe which is also a PairSocket (recall I said the Actor is able to act as a socket). The Actor and the other end of the pipe communicate via message passing, and they use an in process (inproc) protocol to do so.

The initial message passed to the Actor forms some some of protocol  that both the other end of the Actor pipe (i.e. the thread the Actor created ZeroMQ czmq implementation), which I am calling the “shim” (borrowed from the) MUST know how to deal with the protocol that the user code sends via the Actor.

The way I have chosen to do this, is you (i.e. the user of the simple Actor library will need to create a “shim” handler class. This “shim” handler class may be a very simple protocol or a very complicated one (for the demo I have stuck to very simple ones), as long as it understands the message/command being sent from the Actor, and knows what to do with it. That is up to you to come up with, I have no silver bullet for that

One final thing to explain is that the “shim” handler may be passed some initial arguments (outside of the message passing) should you want to make use of this feature. It is something you may not want/need to use, but it is there should you want to use it.

Actor Code

Here is the code for the Actor itself, where it can be seen that it may be treated as a socket using the Send/Receive methods. The Actor also creates a new thread which is used to run the code in the shim handler. The Actor also creates the shim and the pair of PairSocket(s) for message passing.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
using NetMQ.zmq;

namespace NetMQActors
{
    /// <summary>
    /// The Actor represents one end of a two way pipe between 2 PairSocket(s). Where
    /// the actor may be passed messages, that are sent to the other end of the pipe
    /// which I am calling the "shim"
    /// </summary>
    public class Actor : IOutgoingSocket, IReceivingSocket, IDisposable
    {
        private readonly PairSocket self;
        private readonly Shim shim;
        private Random rand = new Random();
        private CancellationTokenSource cts = new CancellationTokenSource();

        private string GetEndPointName()
        {
            return string.Format("inproc://zactor-{0}-{1}",
                rand.Next(0, 10000), rand.Next(0, 10000));
        }

        public Actor(NetMQContext context, IShimHandler shimHandler, object[] args)
        {
            this.self = context.CreatePairSocket();
            this.shim = new Shim(shimHandler, context.CreatePairSocket());
            this.self.Options.SendHighWatermark = 1000;
            this.self.Options.SendHighWatermark = 1000;

            //now binding and connect pipe ends
            string endPoint = string.Empty;
            while (true)
            {
                Action bindAction = () =>
                {
                    endPoint = GetEndPointName();
                    self.Bind(endPoint);
                };

                try
                {
                    bindAction();
                    break;
                }
                catch (NetMQException nex)
                {
                    if (nex.ErrorCode == ErrorCode.EFAULT)
                    {
                        bindAction();
                    }
                }

            }

            shim.Pipe.Connect(endPoint);

            //Create Shim thread handler
            CreateShimThread(args);
        }

        private void CreateShimThread(object[] args)
        {
            Task shimTask = Task.Factory.StartNew(
                (state) => this.shim.Handler.Run(this.shim.Pipe, (object[])state, cts.Token),
                args,
                cts.Token,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);

            shimTask.ContinueWith(ant =>
            {
                if (ant.Exception == null) return;

                Exception baseException = ant.Exception.Flatten().GetBaseException();
                if (baseException.GetType() == typeof (NetMQException))
                {
                    Console.WriteLine(string.Format("NetMQException caught : {0}",
                        baseException.Message));
                }
                else if (baseException.GetType() == typeof (ObjectDisposedException))
                {
                    Console.WriteLine(string.Format("ObjectDisposedException caught : {0}",
                        baseException.Message));
                }
                else
                {
                    Console.WriteLine(string.Format("Exception caught : {0}",
                        baseException.Message));
                }
            }, TaskContinuationOptions.OnlyOnFaulted);
        }

        ~Actor()
        {
            Dispose(false);
        }

        public void Dispose(){
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            //cancel shim thread
            cts.Cancel();

            // release other disposable objects
            if (disposing)
            {
                if (self != null) self.Dispose();
                if (shim != null) shim.Dispose();
            }
        }

        public void Send(byte[] data, int length, bool dontWait = false, bool sendMore = false)
        {
            self.Send(data, length, dontWait, sendMore);
        }

        public byte[] Receive(bool dontWait, out bool hasMore)
        {
            return self.Receive(dontWait, out hasMore);
        }
    }
}

 

Shim Code

The shim represents the other end of the pipe, the shim essentially is a property bag, but it does hold a reference to the IShimHandler that the thread in the actual Actor will run. The IShimHandler is the one that MUST understand the protocol, and carry out any work.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using NetMQ.Sockets;

namespace NetMQActors
{
    /// <summary>
    /// Shim represents one end of the in process pipe, where the Shim expects
    /// to be supplied with a <c>IShimHandler</c> that it would use for running the pipe
    /// protocol with the original Actor PairSocket the other end of the pipe
    /// </summary>
    public class Shim : IDisposable
    {
        public Shim(IShimHandler shimHandler, PairSocket pipe)
        {
            this.Handler = shimHandler;
            this.Pipe = pipe;
        }

        public IShimHandler Handler { get; private set; }
        public PairSocket Pipe { get; private set; }

        ~Shim()
        {
            Dispose(false);
        }

        public void Dispose(){
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                // release disposable objects
                if (Pipe != null) Pipe.Dispose();
            }
        }
    }
}

 

Handler Interface

You shim handler code MUST implement this interface

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ.Sockets;

namespace NetMQActors
{
    /// <summary>
    /// Simple interface that all shims should implement
    /// </summary>
    public interface IShimHandler
    {
        void Run(PairSocket shim, object[] args, CancellationToken token);
    }
}

An Example : Simple EchoShim Handler

This example shows how to create a simple echo shim handler that can be used with standard actor code above. The EchoShimHandler presented here, uses an EXTREMELY simple protocol is does the following:

  • It expects the initial arguments to be 1 in length
  • It expects the initial argument element 0 to be “Hello World”
  • It expects a multi part message, where the 1st message frame is the command string “ECHO”

If all of those criteria are satisfied, then the EchoShimHandler will write the its end of the PairSocket pipe. The user of the Actor at the other end of the pipe (i.e. the other PairSocket), can then receive the value from the EchoShimHandler. Remember in this mini library the Actor may act as a regular NetMQ socket.

Here is the code for the EchoShimHandler

using System;
using System.Linq;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using NetMQ.zmq;

namespace NetMQActors
{
    /// <summary>
    /// This hander class is specific implementation that you would need
    /// to implement per actor. This essentially contains your commands/protocol
    /// and should deal with any command workload, as well as sending back to the
    /// other end of the PairSocket which calling code would receive by using the
    /// Actor classes various RecieveXXX() methods
    ///
    /// This is a VERY simple protocol but it just demonstrates what you would need
    /// to do to implement your own Shim handler
    /// </summary>
    public class EchoShimHandler : IShimHandler
    {
        public void Run(PairSocket shim, object[] args, CancellationToken token)
        {
            if (args == null || args.Count() != 1 || (string)args[0] != "Hello World")
                throw new InvalidOperationException(
                    "Args were not correct, expected 'Hello World'");

            while (!token.IsCancellationRequested)
            {
                //Message for this actor/shim handler is expected to be
                //Frame[0] : Command
                //Frame[1] : Payload
                //
                //Result back to actor is a simple echoing of the Payload, where
                //the payload is prefixed with "ECHO BACK "
                NetMQMessage msg = null;

                //this may throw NetMQException if we have disposed of the actor
                //end of the pipe, and the CancellationToken.IsCancellationRequested
                //did not get picked up this loop cycle
                msg = shim.ReceiveMessage();

                if (msg == null)
                    break;

                if (msg[0].ConvertToString() == "ECHO")
                {
                    shim.Send(string.Format("ECHO BACK : {0}",
                        msg[1].ConvertToString()));
                }
                else
                {
                    throw NetMQException.Create("Unexpected command",
                        ErrorCode.EFAULT);
                }
            }
        }
    }
}

 

And here is the Actor test code that goes with this, where it can be seen that we are able send/receive using the Actor. There is also an example here that shows us trying to use a previously disposed Actor, which we expect to fail, and it does

//Round 1 : Should work fine
EchoShimHandler echoShimHandler = new EchoShimHandler();

Actor actor = new Actor(NetMQContext.Create(), echoShimHandler, new object[] { "Hello World" });
actor.SendMore("ECHO");
string actorMessage = "This is a string";
actor.Send(actorMessage);
var result = actor.ReceiveString();
Console.WriteLine("ROUND1");
Console.WriteLine("========================");
string expectedEchoHandlerResult = string.Format("ECHO BACK : {0}", actorMessage);
Console.WriteLine("ExpectedEchoHandlerResult: '{0}'\r\nGot : '{1}'\r\n",
    expectedEchoHandlerResult, result);
actor.Dispose();

//Round 2 : Should NOT work, as we are now using Disposed actor
try
{
    Console.WriteLine("ROUND2");
    Console.WriteLine("========================");
    actor.SendMore("ECHO");
    actor.Send("This is a string");
    result = actor.ReceiveString();
}
catch (NetMQException nex)
{
    Console.WriteLine("NetMQException : Actor has been disposed so this is expected\r\n");
}

//Round 3 : Should work fine
echoShimHandler = new EchoShimHandler();

actor = new Actor(NetMQContext.Create(), echoShimHandler, new object[] { "Hello World" });
actor.SendMore("ECHO");
actorMessage = "Another Go";
actor.Send(actorMessage);
result = actor.ReceiveString();
Console.WriteLine("ROUND3");
Console.WriteLine("========================");
expectedEchoHandlerResult = string.Format("ECHO BACK : {0}", actorMessage);
Console.WriteLine("ExpectedEchoHandlerResult: '{0}'\r\nGot : '{1}'\r\n",
    expectedEchoHandlerResult, result);
actor.Dispose();

 

Which would give output something like this

image

 

Another Example : Sending JSON Objects

This example shows how to create a simple account shim handler that can be used with standard actor code above. The AccountShimHandler presented here, uses another simple protocol (on purpose, you may choose to make this as simple or as complex as you wish) is does the following:

  • It expects the initial arguments to be 1 in length
  • It expects the initial argument element 0 to be a JSON serialized string of an AccountAction
  • It expects a multi part message, where the 1st message frame is the command string “AMEND ACCOUNT”
  • It expects the 2nd message frame to be a JSON serialized string of an Account

If all these criteria are met, then the AccountShimHandler deserialize the JSON Account object into an actual Account object, and will either debit/credit the Account that was passed into the AccountShimHandler, and then serialize the modified Account object back into JSON and send it back to the Actor via the PairSocket in the AccountShimHandler

The AccountAction class looks like this

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NetMQActors.Models
{
    public enum TransactionType {  Debit=1, Credit=2}
    public class AccountAction
    {
        public AccountAction()
        {
            
        }

        public AccountAction(TransactionType transactionType, decimal amount)
        {
            TransactionType = transactionType;
            Amount = amount;
        }

        public TransactionType TransactionType { get; set; }
        public decimal Amount { get; set; }
    }
}

 

The Account class looks like this

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NetMQActors.Models
{
    public class Account
    {

        public Account()
        {
            
        }

        public Account(int id, string name, string sortCode, decimal balance)
        {
            Id = id;
            Name = name;
            SortCode = sortCode;
            Balance = balance;
        }

        public int Id { get; set; }
        public string Name { get; set; }
        public string SortCode { get; set; }
        public decimal Balance { get; set; }
    }
}

 

Here is the code for the AccountShimHandler

using System;
using System.Linq;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using NetMQ.zmq;
using NetMQActors.Models;
using Newtonsoft.Json;

namespace NetMQActors
{
    /// <summary>
    /// This hander class is specific implementation that you would need
    /// to implement per actor. This essentially contains your commands/protocol
    /// and should deal with any command workload, as well as sending back to the
    /// other end of the PairSocket which calling code would receive by using the
    /// Actor classes various RecieveXXX() methods
    ///
    /// This is a VERY simple protocol but it just demonstrates what you would need
    /// to do to implement your own Shim handler
    /// </summary>
    public class AccountShimHandler : IShimHandler
    {

        private void AmmendAccount(AccountAction action, Account account)
        {
            decimal currentAmount = account.Balance;
            account.Balance = action.TransactionType == TransactionType.Debit
                ? currentAmount – action.Amount
                : currentAmount + action.Amount;
        }

        public void Run(PairSocket shim, object[] args, CancellationToken token)
        {
            if (args == null || args.Count() != 1)
                throw new InvalidOperationException(
                    "Args were not correct, expected one argument");

            AccountAction accountAction = JsonConvert.DeserializeObject<AccountAction>(args[0].ToString());

            while (!token.IsCancellationRequested)
            {
                //Message for this actor/shim handler is expected to be
                //Frame[0] : Command
                //Frame[1] : Payload
                //
                //Result back to actor is a simple echoing of the Payload, where
                //the payload is prefixed with "AMEND ACCOUNT"
                NetMQMessage msg = null;

                //this may throw NetMQException if we have disposed of the actor
                //end of the pipe, and the CancellationToken.IsCancellationRequested
                //did not get picked up this loop cycle
                msg = shim.ReceiveMessage();

                if (msg == null)
                    break;

                if (msg[0].ConvertToString() == "AMEND ACCOUNT")
                {
                    string json = msg[1].ConvertToString();
                    Account account = JsonConvert.DeserializeObject<Account>(json);
                    AmmendAccount(accountAction, account);
                    shim.Send(JsonConvert.SerializeObject(account));
                }
                else
                {
                    throw NetMQException.Create("Unexpected command",
                        ErrorCode.EFAULT);
                }
            }
        }
    }
}

 

And here is the relevant Actor code

//Round 4 : Should work fine
AccountShimHandler accountShimHandler = new AccountShimHandler();

AccountAction accountAction = new AccountAction(TransactionType.Credit, 10);
Account account = new Account(1, "Test Account", "11223", 0);

Actor accountActor = new Actor(NetMQContext.Create(), accountShimHandler,
    new object[] { JsonConvert.SerializeObject(accountAction) });
accountActor.SendMore("AMEND ACCOUNT");
accountActor.Send(JsonConvert.SerializeObject(account));
Account updatedAccount =
    JsonConvert.DeserializeObject<Account>(accountActor.ReceiveString());
Console.WriteLine("ROUND4");
Console.WriteLine("========================");
decimal expectedAccountBalance = 10.0m;
Console.WriteLine(
    "Exected Account Balance: '{0}'\r\nGot : '{1}'\r\n" +
    "Are Same Account Object : '{2}'\r\n",
    expectedAccountBalance, updatedAccount.Balance,
    ReferenceEquals(accountActor, updatedAccount));
accountActor.Dispose();

 

Which gives a result something like this, if you read the code above you will see the Account object we send, and the receive are NOT the same object. This is due to the fact they have been sent down the wire using NetMQ sockets.

image

Some Actor Frameworks To Look At

There are a couple of Actor frameworks out there that I am aware of. Namely the following ones, there will be more, but these are the main ones I am aware of.

I have only really given Akka a cursory look, but I remember when Axum first came out and gave it a good try, and thought this is neat, no concurrency hell to worry about here. Cool.

For me at least, I wish there was an Actor model in .NET

ZeroMQ #6 : Divide And Conquer

Last time we looked at how to send from multiple sockets. Believe it or not we have pretty much introduced most of the core concepts you will need. As a recap here is what we have already covered

So from here on in it is just a matter of going through some of the well known patterns from the ZeroMQ guide.

Now it would be immoral (even fraudulent) of me to not mention this up front, in the main, the information that I present in the remaining posts in this series of posts, will be based quite heavily on the ZeroMQ guide by Pieter Hintjens. Pieter has actually been in touch with me regarding this series of posts, and has been kind enough to let me run each new post by him. I think is generous, and I am extremely pleased to have Pieter on hand, to run them past. What that means to you, is that if there are any misunderstandings/mistakes on my behalf, I am sure Pieter will be pointing them out (at which point I will obviously correct any mistakes made,  hopefully I will not make any). So big thanks go out to Pieter, cheers as would say in England.

It is all good publicity for ZeroMQ though, and as NetMQ is a native port it is not one of the ones covered by the language bindings on the ZeroMQ guide site. So even though I am basing my content on the fantastic work done by Pieter, it will obviously be using NetMQ, so from that point of view the code is still very much relevant.

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

 

What Will We Be Doing This Time?

This time we will continue to look at ZeroMQ patterns. Which is actually what the remaining posts will all pretty much be focussed on.

The pattern that we will look at this time involves dividing a problem domain into smaller chunks and distributed them across workers, and then collating the results back together again.

This pattern is really a “divide and conquer” one, but it has also been called “Parallel Pipeline”. With all the remaining posts, I will be linking back to the original portion of the guide such that you can read more about the problem and Pieter’s solution.

ZeroMQ Guide Divide And Conquer : http://zguide.zeromq.org/page:all#Divide-and-Conquer

The idea is that you have something that generates work, and then distributes the work out to n-many workers. The workers each do some work, and push their results to some other process (could be a thread too) where the workers’ results are accumulated.

In the ZeroMQ guide, it shows an example that has the work generator just tell each worker to sleep for a period of time. I toyed with creating a more elaborate example than this, but in the end felt that the examples simplicity was quite important, so have stuck with the workload for each worker just being a value that tells the work to sleep for a number of Milliseconds (thus simulating some actual work).  This as I say has been borrowed from the ZeroMQ guide.

In real life the work could obviously be anything, though you would more than likely want the work to be something that could be cut up and distributed without the work generator caring/knowing how many workers there are.

Here is what we are trying to achieve :

image 

Ventilator

using System;
using NetMQ;

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://localhost:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");
           
            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to send messages on
                using (var sender = ctx.CreatePushSocket())
                {
                    sender.Bind("tcp://*:5557");

                    using (var sink = ctx.CreatePushSocket())
                    {
                        sink.Connect("tcp://localhost:5558");

                        Console.WriteLine("Press enter when worker are ready");
                        Console.ReadLine();
                        
                        //the first message it "0" and signals start of batch
                        //see the Sink.csproj Program.cs file for where this is used
                        Console.WriteLine("Sending start of batch to Sink");    
                        sink.Send("0");

                        Console.WriteLine("Sending tasks to workers");

                        //initialise random number generator
                        Random rand= new Random(0);

                        //expected costs in Ms
                        int totalMs = 0;
                        
                        //send 100 tasks (workload for tasks, is just some random sleep time that
                        //the workers can perform, in real life each work would do more than sleep
                        for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                        {
                            //Random workload from 1 to 100 msec
                            int workload = rand.Next(0, 100);
                            totalMs += workload;
                            Console.WriteLine("Workload : {0}", workload);
                            sender.Send(workload.ToString());
                        }
                        Console.WriteLine("Total expected cost : {0} msec", totalMs);
                        Console.WriteLine("Press Enter to quit");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
}

 

Worker

using System;
using System.Threading;
using NetMQ;

namespace Worker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Worker
            // Connects PULL socket to tcp://localhost:5557
            // collects workload for socket from Ventilator via that socket
            // Connects PUSH socket to tcp://localhost:5558
            // Sends results to Sink via that socket
            Console.WriteLine("====== WORKER ======");

            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Connect("tcp://localhost:5557");

                     //socket to send messages on
                    using (var sender = ctx.CreatePushSocket())
                    {
                        sender.Connect("tcp://localhost:5558");

                        //process tasks forever
                        while (true)
                        {
                            //workload from the vetilator is a simple delay
                            //to simulate some work being done, see
                            //Ventilator.csproj Proram.cs for the workload sent
                            //In real life some more meaningful work would be done
                            string workload = receiver.ReceiveString();

                            //simulate some work being done
                            Thread.Sleep(int.Parse(workload));

                            //send results to sink, sink just needs to know worker
                            //is done, message content is not important, just the precence of
                            //a message means worker is done.
                            //See Sink.csproj Proram.cs
                            Console.WriteLine("Sending to Sink");
                            sender.Send(string.Empty);
                        }
                    }

                }
            }

        }
    }
}

 

Sink

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace Sink
{
    public class Program
    {
        public static void Main(string[] args)
        {

            // Task Sink
            // Bindd PULL socket to tcp://localhost:5558
            // Collects results from workers via that socket
            Console.WriteLine("====== SINK ======");
           
            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Bind("tcp://localhost:5558");

                    //wait for start of batch (see Ventilator.csproj Program.cs)
                    var startOfBatchTrigger = receiver.ReceiveString();
                    Console.WriteLine("Seen start of batch");

                    //Start our clock now
                    Stopwatch watch = new Stopwatch();
                    watch.Start();

                    for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                    {
                        var workerDoneTrigger = receiver.ReceiveString();
                        if (taskNumber % 10 == 0)
                        {
                            Console.Write(":");
                        }
                        else
                        {
                            Console.Write(".");
                        }
                    }
                    watch.Stop();
                    //Calculate and report duration of batch
                    Console.WriteLine();
                    Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                    Console.ReadLine();
                }
            }
        }
    }
}

 

There is a couple of batch files you can use to spin up different amounts of workers, see:

Run1Worker.bat : One worker

Which when run should give you some output like this in the Sink process console output

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 5695 msec

 

Run2Worker.bat : two workers

Which when run should give you some output like this in the Sink process Console output

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 2959 msec

 

Run4Worker.bat : four workers

Which when run should give you some output like this in the Sink process Console output

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 1492 msec

 

There are a couple of points to be aware of with this pattern

  • The Ventilator uses a NetMQ PushSocket to distribute work to the workers, this is referred to as load balancing
  • The Ventilator and the Sink are the static parts of the system, where as workers are dynamic. It is trivial to add more workers, we can just spin up a new instance of a worker, and in theory the work gets done quicker.
  • We need to synchronize the starting of the batch (when workers are ready), as if we did not do that, the first worker that connected would get more messages that the rest, which is not really load balanced
  • The Sink uses a NetMQ PullSocket to accumulate the results from the workers

ZeroMQ #5 : Sending From Multiple Sockets

Last time we looked at how to use the Poller to work with multiple sockets, and detect their readiness. This time we will continue to work with the familiar request/response model that we have been using thus far. We will however be beefing things up a bit, and shall examine several ways in which you can have more than one thread pushing messages to the server and getting responses, which is a fairly typical requirement (at least in my book it is).

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

 

One Thing Before We Start

As you may have realised by now, ZeroMQ is a messaging library, and as such, promotes the idea of lock free messaging. I also happen to think this is a very good idea. You can achieve an excellent throughput of messages and save yourself a lot of synchronization pain, if you try and avoid shared data structures. By doing this you will also be saving yourself the pain of having to synchronize access to them. So in general try and work with ZeroMQ in the way it wants to be worked with, which is via message passing, and avoiding locks, shared data structures.

 

Setting The Scene For This Post

Ok so we are nearly at the point where we can start to look at some code, but before we do that, let’s just talk a little bit more about what this post is trying to discuss.

In the code I typically write, it is quite common for a bunch of client threads all to be running at once, each capable of talking to the server. If this sounds like a requirement that you have had to deal with, then you may find this post of use, as this is exactly the scenario this post is aimed at solving.

As the aim of this post is to have asynchronous client, we need a asynchronous server too, so we use DealerSocket(s) for the client(s) and a RouterSocket for the server.

As with most things there is more than one way to skin a cat, so we will look at a couple of options, each with the their own pros/cons.

 

Option 1 : Each Thread Has It’ Own DealerSocket

The first options does need a bit of .NET threading knowledge, but if you have that, then the idea is a simple one. For each client thread we also create a dedicated DealerSocket that *should be* used exclusively by that thread.

This is achieved using the ThreadLocal<T> .NET class, which allows us to have a DealerSocket per thread. We add each of the client created DealerSocket(s) to a Poller instance, and listen to the ReceieveReady event on each socket, which allows us to get the message back from the server.

The obvious downside to this approach is that there will be more socket(s) created on the client side. The upside is that it is  very easy to implement, and just works.

Here is an image showing what we are trying to achieve here

image

Here is the code for this scenario:

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace ManualThreadingDemo
{
    public class Program
    {
        public void Run()
        {

            //NOTES
            //1. Use ThreadLocal<DealerSocket> where each thread has
            //  its own client DealerSocket to talk to server
            //2. Each thread can send using it own socket
            //3. Each thread socket is added to poller
            
            ThreadLocal<DealerSocket> clientSocketPerThread =
                new ThreadLocal<DealerSocket>();
            int delay = 3000;
            Poller poller = new Poller();

            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    //start some threads, each with its own DealerSocket
                    //to talk to the server socket. Creates lots of sockets,
                    //but no nasty race conditions no shared state, each
                    //thread has its own socket, happy days
                    for (int i = 0; i < 3; i++)
                    {
                        Task.Factory.StartNew((state) =>
                        {
                            DealerSocket client = null;

                            if (!clientSocketPerThread.IsValueCreated)
                            {
                                client = ctx.CreateDealerSocket();
                                client.Connect("tcp://127.0.0.1:5556");
                                client.ReceiveReady += Client_ReceiveReady;
                                clientSocketPerThread.Value = client;
                                poller.AddSocket(client);
                            }
                            else
                            {
                                client = clientSocketPerThread.Value;
                            }

                            while (true)
                            {
                                var messageToServer = new NetMQMessage();
                                messageToServer.AppendEmptyFrame();
                                messageToServer.Append(state.ToString());
                                client.SendMessage(messageToServer);
                                Thread.Sleep(delay);
                            }

                        },string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                    }

                    //start the poller
                    Task task = Task.Factory.StartNew(poller.Start);

                    //server loop
                    while (true)
                    {
                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i,
                                clientMessage[i].ConvertToString());
                        }

                        if (clientMessage.FrameCount == 3)
                        {
                            var clientAddress = clientMessage[0];
                            var clientOriginalMessage = clientMessage[2].ConvertToString();
                            string response = string.Format("{0} back from server {1}",
                                clientOriginalMessage, DateTime.Now.ToLongTimeString());
                            var messageToClient = new NetMQMessage();
                            messageToClient.Append(clientAddress);
                            messageToClient.AppendEmptyFrame();
                            messageToClient.Append(response);
                            server.SendMessage(messageToClient);
                        }
                    }
                }
            }
        }

        void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            bool hasmore = false;
            e.Socket.Receive(out hasmore);
            if (hasmore)
            {
                string result = e.Socket.ReceiveString(out hasmore);
                Console.WriteLine("REPLY " + result);
            }
        }

        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }

    }
}

 

If you were to run this, you would see something like this:

image

 

Option 2 : Each Thread Delegates Of To A Local Broker

The next example keeps the idea of a separate threads that want to send message(s) to the server. This time however we will use a broker on the client side. The idea being that the client threads will push to a shared data queue, I know I have told you to avoid shared data structures. Thing is, this is not a shared data structure it is just a thread safe queue, that many threads can write to. Where as a a shared data structure may mean several threads all trying to update the current Bid rate of an Fx option quote price. There is a difference. OK the shared queue will have some synchronization somewhere to make it thread safe, thankfully we can rely on the good work of the PFX team at Microsoft for that. Those guys are smart and I am sure the Concurrent collections namespace is pretty well designed and can be trusted to be pretty optimal.

Again we need to call on a bit of .NET know how, so for the centralized queue we use a ConcurrentQueue<T>. All client threads will enqueue  their messages for the server here.

There will also be another thread started. This extra thread is the one that will be processing the messages that have been queued onto the centralized queue. When there is a message taken of the centralized queue it will be sent to the server. The thing is only the thread that reads from the centralized queue will send messages to the server.

As we still want messages to be sent out asynchronously we stick with using a DealerSocket, but since their is now only one place where we send messages to the server we only need a single DealerSocket.

We add the SINGLE DealerSocket(s) to a Poller instance, and listen to the ReceieveReady event on each socket, which allows us to get the message back from the server.

This is more complex than the first example as there are more moving parts, but we no longer have loads of sockets being create. There is just one.

As before here is a diagram of what we are trying to achieve here

image

Here is the code for this scenario:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace ConcurrentQueueDemo
{
    public class Program
    {
        public void Run()
        {
            //NOTES
            //1. Use many threads each writing to ConcurrentQueue
            //2. Extra thread to read from ConcurrentQueue, and this is the one that
            //   will deal with writing to the server
            ConcurrentQueue<string> messages = new ConcurrentQueue<string>();
            int delay = 3000;
            Poller poller = new Poller();

            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    //start some threads, where each thread, will use a client side
                    //broker (simple thread that monitors a CooncurrentQueue), where
                    //ONLY the client side broker talks to the server
                    for (int i = 0; i < 3; i++)
                    {
                        Task.Factory.StartNew((state) =>
                        {
                            while (true)
                            {
                                messages.Enqueue(state.ToString());
                                Thread.Sleep(delay);
                            }

                        }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                    }

                    //single sender loop
                    Task.Factory.StartNew((state) =>
                    {
                        var client = ctx.CreateDealerSocket();
                        client.Connect("tcp://127.0.0.1:5556");
                        client.ReceiveReady += Client_ReceiveReady;
                        poller.AddSocket(client);

                        while (true)
                        {
                            string clientMessage = null;
                            if (messages.TryDequeue(out clientMessage))
                            {
                                var messageToServer = new NetMQMessage();
                                messageToServer.AppendEmptyFrame();
                                messageToServer.Append(clientMessage);
                                client.SendMessage(messageToServer);
                            }
                        }

                    }, TaskCreationOptions.LongRunning);

                    //start the poller
                    Task task = Task.Factory.StartNew(poller.Start);

                    //server loop
                    while (true)
                    {
                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i,
                                clientMessage[i].ConvertToString());
                        }

                        if (clientMessage.FrameCount == 3)
                        {
                            var clientAddress = clientMessage[0];
                            var clientOriginalMessage = clientMessage[2].ConvertToString();
                            string response = string.Format("{0} back from server {1}",
                                clientOriginalMessage, DateTime.Now.ToLongTimeString());
                            var messageToClient = new NetMQMessage();
                            messageToClient.Append(clientAddress);
                            messageToClient.AppendEmptyFrame();
                            messageToClient.Append(response);
                            server.SendMessage(messageToClient);
                        }
                    }
                }
            }
        }

        void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            bool hasmore = false;
            e.Socket.Receive(out hasmore);
            if (hasmore)
            {
                string result = e.Socket.ReceiveString(out hasmore);
                Console.WriteLine("REPLY " + result);
            }
        }

        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }
    }
}

 

If you were to run this, you would see something like this:

image

 

Option 3 : Use NetMQScheduler

The final option is to use the NetMQ library class : NetMQScheduler. I think the best place to start with that is by reading the link I just included. Then come back here.

…….

…….

Time passes

…….

…….

Oh hello you’re back. Ok so now you know that the NetMQScheduler offers us a way to use TPL to schedule work and that there is a Poller that we pass into the NetMQScheduler. Cool.

The NetMQScheduler is a custom TPL scheduler, which allows us to create tasks that we want done, and it will take care of the threading aspects of them. Since we told the NetMQScheduler about the Poller we want to use we are able to hook up the ReceiveReady event and use that to get messages back from the server.

The difference here is that since we are using TPL and NetMQ we need to use TPL Task(s) and the NetMQScheduler instance whenever we want to Send/Receive.

To be honest, I think I like this design the least, as it mixes up too many concepts, and the TPL stuff tends to be mixing a bit too much with the ZeroMQ goodness for my taste. I did however just want to show this example for completeness.

So the code for this example has two parts. A simple client, and then the code that spins up a client instance and then multiple threads that use the client instance to send messages to the server. There is also a basic server loop (which I will show below under the title “The Rest”)

Client Code

Here is the client code, where it can be seen that we create a NetMQScheduler which gets handed a new Poller instance to use internally. The idea is that anyone can send a message simply by calling the clients SendMessage(..) method

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;

namespace NetMQSchedulerDemo
{
    public class Client : IDisposable
    {
        private readonly NetMQContext context;
        private readonly string address;
        private Poller poller;
        private NetMQScheduler scheduler;
        private NetMQSocket clientSocket;

        public Client(NetMQContext context, string address)
        {
            this.context = context;
            this.address = address;
        }

        public void Start()
        {
            poller = new Poller();
            clientSocket = context.CreateDealerSocket();
            clientSocket.ReceiveReady += clientSocket_ReceiveReady;
            clientSocket.Connect(address);
            scheduler = new NetMQScheduler(context, poller);
            Task.Factory.StartNew(poller.Start, TaskCreationOptions.LongRunning);
        }

        void clientSocket_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            string result = e.Socket.ReceiveString();
            Console.WriteLine("REPLY " + result);
        }

        public async Task SendMessage(NetMQMessage message)
        {
            // instead of creating inproc socket which listen to messages and then send
            //to the server we just creating task and run a code on
            // the poller thread which the the thread of the clientSocket
            Task task = new Task(() => clientSocket.SendMessage(message));
            task.Start(scheduler);
            await task;
            await ReceiveMessage();
        }

        public async Task ReceiveMessage()
        {
            Task task = new Task(() =>
            {
                var result = clientSocket.ReceiveString();
                Console.WriteLine("REPLY " + result);
            });
            task.Start(scheduler);
            await task;
        }

        public void Dispose()
        {
            scheduler.Dispose();
            clientSocket.Dispose();
            poller.Stop();
        }
    }
}

The Rest

And here is the rest of the code that is responsible for spinning up the client and extra threads to push messages through the client (using the SendMessage(..) method above)

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlServer.Server;
using NetMQ;
using NetMQ.Sockets;
using NetMQSchedulerDemo;
using NUnit.Framework;

namespace NetMQSchedulerDemo
{
    public class Program
    {
        public void Run()
        {
            //NOTES
            //1. Use NetMQs NetMQScheduler to communicate with the
            //   server. All Send/Receive MUST be done via the
            //   NetMQScheduler and TPL Tasks. See the Client class
            //   for more information on this

            int delay = 3000;

            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    using (var client = new Client(ctx, "tcp://127.0.0.1:5556"))
                    {
                        client.Start();

                        //start some theads, each thread will use the
                        //Clients NetMQScheduler to send/receieve messages
                        //to/from the server
                        for (int i = 0; i < 2; i++)
                        {
                            Task.Factory.StartNew(async (state) =>
                            {
                                while (true)
                                {
                                    var messageToServer = new NetMQMessage();
                                    messageToServer.AppendEmptyFrame();
                                    messageToServer.Append(state.ToString());
                                    await client.SendMessage(messageToServer);
                                    Thread.Sleep(delay);
                                }
                            }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                        }

                        //server loop
                        while (true)
                        {
                            var clientMessage = server.ReceiveMessage();
                            Console.WriteLine("========================");
                            Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                            Console.WriteLine("========================");
                            for (int i = 0; i < clientMessage.FrameCount; i++)
                            {
                                Console.WriteLine("Frame[{0}] = {1}", i,
                                    clientMessage[i].ConvertToString());
                            }

                            if (clientMessage.FrameCount == 3)
                            {
                                var clientAddress = clientMessage[0];
                                var clientOriginalMessage = clientMessage[2].ConvertToString();
                                string response = string.Format("{0} back from server {1}",
                                    clientOriginalMessage, DateTime.Now.ToLongTimeString());
                                var messageToClient = new NetMQMessage();
                                messageToClient.Append(clientAddress);
                                messageToClient.AppendEmptyFrame();
                                messageToClient.Append(response);
                                server.SendMessage(messageToClient);
                            }
                        }
                    }
                }
            }
        }

        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }
    }
}

 

If you run this code you may see something like this:

image