ZeroMQ #6 : Divide And Conquer

Last time we looked at how to send from multiple sockets. Believe it or not we have pretty much introduced most of the core concepts you will need. As a recap here is what we have already covered

So from here on in it is just a matter of going through some of the well known patterns from the ZeroMQ guide.

Now it would be immoral (even fraudulent) of me to not mention this up front, in the main, the information that I present in the remaining posts in this series of posts, will be based quite heavily on the ZeroMQ guide by Pieter Hintjens. Pieter has actually been in touch with me regarding this series of posts, and has been kind enough to let me run each new post by him. I think is generous, and I am extremely pleased to have Pieter on hand, to run them past. What that means to you, is that if there are any misunderstandings/mistakes on my behalf, I am sure Pieter will be pointing them out (at which point I will obviously correct any mistakes made,  hopefully I will not make any). So big thanks go out to Pieter, cheers as would say in England.

It is all good publicity for ZeroMQ though, and as NetMQ is a native port it is not one of the ones covered by the language bindings on the ZeroMQ guide site. So even though I am basing my content on the fantastic work done by Pieter, it will obviously be using NetMQ, so from that point of view the code is still very much relevant.

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

 

What Will We Be Doing This Time?

This time we will continue to look at ZeroMQ patterns. Which is actually what the remaining posts will all pretty much be focussed on.

The pattern that we will look at this time involves dividing a problem domain into smaller chunks and distributed them across workers, and then collating the results back together again.

This pattern is really a “divide and conquer” one, but it has also been called “Parallel Pipeline”. With all the remaining posts, I will be linking back to the original portion of the guide such that you can read more about the problem and Pieter’s solution.

ZeroMQ Guide Divide And Conquer : http://zguide.zeromq.org/page:all#Divide-and-Conquer

The idea is that you have something that generates work, and then distributes the work out to n-many workers. The workers each do some work, and push their results to some other process (could be a thread too) where the workers’ results are accumulated.

In the ZeroMQ guide, it shows an example that has the work generator just tell each worker to sleep for a period of time. I toyed with creating a more elaborate example than this, but in the end felt that the examples simplicity was quite important, so have stuck with the workload for each worker just being a value that tells the work to sleep for a number of Milliseconds (thus simulating some actual work).  This as I say has been borrowed from the ZeroMQ guide.

In real life the work could obviously be anything, though you would more than likely want the work to be something that could be cut up and distributed without the work generator caring/knowing how many workers there are.

Here is what we are trying to achieve :

image 

Ventilator

using System;
using NetMQ;

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://localhost:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");
           
            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to send messages on
                using (var sender = ctx.CreatePushSocket())
                {
                    sender.Bind("tcp://*:5557");

                    using (var sink = ctx.CreatePushSocket())
                    {
                        sink.Connect("tcp://localhost:5558");

                        Console.WriteLine("Press enter when worker are ready");
                        Console.ReadLine();
                        
                        //the first message it "0" and signals start of batch
                        //see the Sink.csproj Program.cs file for where this is used
                        Console.WriteLine("Sending start of batch to Sink");    
                        sink.Send("0");

                        Console.WriteLine("Sending tasks to workers");

                        //initialise random number generator
                        Random rand= new Random(0);

                        //expected costs in Ms
                        int totalMs = 0;
                        
                        //send 100 tasks (workload for tasks, is just some random sleep time that
                        //the workers can perform, in real life each work would do more than sleep
                        for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                        {
                            //Random workload from 1 to 100 msec
                            int workload = rand.Next(0, 100);
                            totalMs += workload;
                            Console.WriteLine("Workload : {0}", workload);
                            sender.Send(workload.ToString());
                        }
                        Console.WriteLine("Total expected cost : {0} msec", totalMs);
                        Console.WriteLine("Press Enter to quit");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
}

 

Worker

using System;
using System.Threading;
using NetMQ;

namespace Worker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Worker
            // Connects PULL socket to tcp://localhost:5557
            // collects workload for socket from Ventilator via that socket
            // Connects PUSH socket to tcp://localhost:5558
            // Sends results to Sink via that socket
            Console.WriteLine("====== WORKER ======");

            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Connect("tcp://localhost:5557");

                     //socket to send messages on
                    using (var sender = ctx.CreatePushSocket())
                    {
                        sender.Connect("tcp://localhost:5558");

                        //process tasks forever
                        while (true)
                        {
                            //workload from the vetilator is a simple delay
                            //to simulate some work being done, see
                            //Ventilator.csproj Proram.cs for the workload sent
                            //In real life some more meaningful work would be done
                            string workload = receiver.ReceiveString();

                            //simulate some work being done
                            Thread.Sleep(int.Parse(workload));

                            //send results to sink, sink just needs to know worker
                            //is done, message content is not important, just the precence of
                            //a message means worker is done.
                            //See Sink.csproj Proram.cs
                            Console.WriteLine("Sending to Sink");
                            sender.Send(string.Empty);
                        }
                    }

                }
            }

        }
    }
}

 

Sink

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace Sink
{
    public class Program
    {
        public static void Main(string[] args)
        {

            // Task Sink
            // Bindd PULL socket to tcp://localhost:5558
            // Collects results from workers via that socket
            Console.WriteLine("====== SINK ======");
           
            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Bind("tcp://localhost:5558");

                    //wait for start of batch (see Ventilator.csproj Program.cs)
                    var startOfBatchTrigger = receiver.ReceiveString();
                    Console.WriteLine("Seen start of batch");

                    //Start our clock now
                    Stopwatch watch = new Stopwatch();
                    watch.Start();

                    for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                    {
                        var workerDoneTrigger = receiver.ReceiveString();
                        if (taskNumber % 10 == 0)
                        {
                            Console.Write(":");
                        }
                        else
                        {
                            Console.Write(".");
                        }
                    }
                    watch.Stop();
                    //Calculate and report duration of batch
                    Console.WriteLine();
                    Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                    Console.ReadLine();
                }
            }
        }
    }
}

 

There is a couple of batch files you can use to spin up different amounts of workers, see:

Run1Worker.bat : One worker

Which when run should give you some output like this in the Sink process console output

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 5695 msec

 

Run2Worker.bat : two workers

Which when run should give you some output like this in the Sink process Console output

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 2959 msec

 

Run4Worker.bat : four workers

Which when run should give you some output like this in the Sink process Console output

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 1492 msec

 

There are a couple of points to be aware of with this pattern

  • The Ventilator uses a NetMQ PushSocket to distribute work to the workers, this is referred to as load balancing
  • The Ventilator and the Sink are the static parts of the system, where as workers are dynamic. It is trivial to add more workers, we can just spin up a new instance of a worker, and in theory the work gets done quicker.
  • We need to synchronize the starting of the batch (when workers are ready), as if we did not do that, the first worker that connected would get more messages that the rest, which is not really load balanced
  • The Sink uses a NetMQ PullSocket to accumulate the results from the workers

ZeroMQ #5 : Sending From Multiple Sockets

Last time we looked at how to use the Poller to work with multiple sockets, and detect their readiness. This time we will continue to work with the familiar request/response model that we have been using thus far. We will however be beefing things up a bit, and shall examine several ways in which you can have more than one thread pushing messages to the server and getting responses, which is a fairly typical requirement (at least in my book it is).

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

 

One Thing Before We Start

As you may have realised by now, ZeroMQ is a messaging library, and as such, promotes the idea of lock free messaging. I also happen to think this is a very good idea. You can achieve an excellent throughput of messages and save yourself a lot of synchronization pain, if you try and avoid shared data structures. By doing this you will also be saving yourself the pain of having to synchronize access to them. So in general try and work with ZeroMQ in the way it wants to be worked with, which is via message passing, and avoiding locks, shared data structures.

 

Setting The Scene For This Post

Ok so we are nearly at the point where we can start to look at some code, but before we do that, let’s just talk a little bit more about what this post is trying to discuss.

In the code I typically write, it is quite common for a bunch of client threads all to be running at once, each capable of talking to the server. If this sounds like a requirement that you have had to deal with, then you may find this post of use, as this is exactly the scenario this post is aimed at solving.

As the aim of this post is to have asynchronous client, we need a asynchronous server too, so we use DealerSocket(s) for the client(s) and a RouterSocket for the server.

As with most things there is more than one way to skin a cat, so we will look at a couple of options, each with the their own pros/cons.

 

Option 1 : Each Thread Has It’ Own DealerSocket

The first options does need a bit of .NET threading knowledge, but if you have that, then the idea is a simple one. For each client thread we also create a dedicated DealerSocket that *should be* used exclusively by that thread.

This is achieved using the ThreadLocal<T> .NET class, which allows us to have a DealerSocket per thread. We add each of the client created DealerSocket(s) to a Poller instance, and listen to the ReceieveReady event on each socket, which allows us to get the message back from the server.

The obvious downside to this approach is that there will be more socket(s) created on the client side. The upside is that it is  very easy to implement, and just works.

Here is an image showing what we are trying to achieve here

image

Here is the code for this scenario:

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace ManualThreadingDemo
{
    public class Program
    {
        public void Run()
        {

            //NOTES
            //1. Use ThreadLocal<DealerSocket> where each thread has
            //  its own client DealerSocket to talk to server
            //2. Each thread can send using it own socket
            //3. Each thread socket is added to poller
            
            ThreadLocal<DealerSocket> clientSocketPerThread =
                new ThreadLocal<DealerSocket>();
            int delay = 3000;
            Poller poller = new Poller();

            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    //start some threads, each with its own DealerSocket
                    //to talk to the server socket. Creates lots of sockets,
                    //but no nasty race conditions no shared state, each
                    //thread has its own socket, happy days
                    for (int i = 0; i < 3; i++)
                    {
                        Task.Factory.StartNew((state) =>
                        {
                            DealerSocket client = null;

                            if (!clientSocketPerThread.IsValueCreated)
                            {
                                client = ctx.CreateDealerSocket();
                                client.Connect("tcp://127.0.0.1:5556");
                                client.ReceiveReady += Client_ReceiveReady;
                                clientSocketPerThread.Value = client;
                                poller.AddSocket(client);
                            }
                            else
                            {
                                client = clientSocketPerThread.Value;
                            }

                            while (true)
                            {
                                var messageToServer = new NetMQMessage();
                                messageToServer.AppendEmptyFrame();
                                messageToServer.Append(state.ToString());
                                client.SendMessage(messageToServer);
                                Thread.Sleep(delay);
                            }

                        },string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                    }

                    //start the poller
                    Task task = Task.Factory.StartNew(poller.Start);

                    //server loop
                    while (true)
                    {
                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i,
                                clientMessage[i].ConvertToString());
                        }

                        if (clientMessage.FrameCount == 3)
                        {
                            var clientAddress = clientMessage[0];
                            var clientOriginalMessage = clientMessage[2].ConvertToString();
                            string response = string.Format("{0} back from server {1}",
                                clientOriginalMessage, DateTime.Now.ToLongTimeString());
                            var messageToClient = new NetMQMessage();
                            messageToClient.Append(clientAddress);
                            messageToClient.AppendEmptyFrame();
                            messageToClient.Append(response);
                            server.SendMessage(messageToClient);
                        }
                    }
                }
            }
        }

        void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            bool hasmore = false;
            e.Socket.Receive(out hasmore);
            if (hasmore)
            {
                string result = e.Socket.ReceiveString(out hasmore);
                Console.WriteLine("REPLY " + result);
            }
        }

        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }

    }
}

 

If you were to run this, you would see something like this:

image

 

Option 2 : Each Thread Delegates Of To A Local Broker

The next example keeps the idea of a separate threads that want to send message(s) to the server. This time however we will use a broker on the client side. The idea being that the client threads will push to a shared data queue, I know I have told you to avoid shared data structures. Thing is, this is not a shared data structure it is just a thread safe queue, that many threads can write to. Where as a a shared data structure may mean several threads all trying to update the current Bid rate of an Fx option quote price. There is a difference. OK the shared queue will have some synchronization somewhere to make it thread safe, thankfully we can rely on the good work of the PFX team at Microsoft for that. Those guys are smart and I am sure the Concurrent collections namespace is pretty well designed and can be trusted to be pretty optimal.

Again we need to call on a bit of .NET know how, so for the centralized queue we use a ConcurrentQueue<T>. All client threads will enqueue  their messages for the server here.

There will also be another thread started. This extra thread is the one that will be processing the messages that have been queued onto the centralized queue. When there is a message taken of the centralized queue it will be sent to the server. The thing is only the thread that reads from the centralized queue will send messages to the server.

As we still want messages to be sent out asynchronously we stick with using a DealerSocket, but since their is now only one place where we send messages to the server we only need a single DealerSocket.

We add the SINGLE DealerSocket(s) to a Poller instance, and listen to the ReceieveReady event on each socket, which allows us to get the message back from the server.

This is more complex than the first example as there are more moving parts, but we no longer have loads of sockets being create. There is just one.

As before here is a diagram of what we are trying to achieve here

image

Here is the code for this scenario:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace ConcurrentQueueDemo
{
    public class Program
    {
        public void Run()
        {
            //NOTES
            //1. Use many threads each writing to ConcurrentQueue
            //2. Extra thread to read from ConcurrentQueue, and this is the one that
            //   will deal with writing to the server
            ConcurrentQueue<string> messages = new ConcurrentQueue<string>();
            int delay = 3000;
            Poller poller = new Poller();

            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    //start some threads, where each thread, will use a client side
                    //broker (simple thread that monitors a CooncurrentQueue), where
                    //ONLY the client side broker talks to the server
                    for (int i = 0; i < 3; i++)
                    {
                        Task.Factory.StartNew((state) =>
                        {
                            while (true)
                            {
                                messages.Enqueue(state.ToString());
                                Thread.Sleep(delay);
                            }

                        }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                    }

                    //single sender loop
                    Task.Factory.StartNew((state) =>
                    {
                        var client = ctx.CreateDealerSocket();
                        client.Connect("tcp://127.0.0.1:5556");
                        client.ReceiveReady += Client_ReceiveReady;
                        poller.AddSocket(client);

                        while (true)
                        {
                            string clientMessage = null;
                            if (messages.TryDequeue(out clientMessage))
                            {
                                var messageToServer = new NetMQMessage();
                                messageToServer.AppendEmptyFrame();
                                messageToServer.Append(clientMessage);
                                client.SendMessage(messageToServer);
                            }
                        }

                    }, TaskCreationOptions.LongRunning);

                    //start the poller
                    Task task = Task.Factory.StartNew(poller.Start);

                    //server loop
                    while (true)
                    {
                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i,
                                clientMessage[i].ConvertToString());
                        }

                        if (clientMessage.FrameCount == 3)
                        {
                            var clientAddress = clientMessage[0];
                            var clientOriginalMessage = clientMessage[2].ConvertToString();
                            string response = string.Format("{0} back from server {1}",
                                clientOriginalMessage, DateTime.Now.ToLongTimeString());
                            var messageToClient = new NetMQMessage();
                            messageToClient.Append(clientAddress);
                            messageToClient.AppendEmptyFrame();
                            messageToClient.Append(response);
                            server.SendMessage(messageToClient);
                        }
                    }
                }
            }
        }

        void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            bool hasmore = false;
            e.Socket.Receive(out hasmore);
            if (hasmore)
            {
                string result = e.Socket.ReceiveString(out hasmore);
                Console.WriteLine("REPLY " + result);
            }
        }

        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }
    }
}

 

If you were to run this, you would see something like this:

image

 

Option 3 : Use NetMQScheduler

The final option is to use the NetMQ library class : NetMQScheduler. I think the best place to start with that is by reading the link I just included. Then come back here.

…….

…….

Time passes

…….

…….

Oh hello you’re back. Ok so now you know that the NetMQScheduler offers us a way to use TPL to schedule work and that there is a Poller that we pass into the NetMQScheduler. Cool.

The NetMQScheduler is a custom TPL scheduler, which allows us to create tasks that we want done, and it will take care of the threading aspects of them. Since we told the NetMQScheduler about the Poller we want to use we are able to hook up the ReceiveReady event and use that to get messages back from the server.

The difference here is that since we are using TPL and NetMQ we need to use TPL Task(s) and the NetMQScheduler instance whenever we want to Send/Receive.

To be honest, I think I like this design the least, as it mixes up too many concepts, and the TPL stuff tends to be mixing a bit too much with the ZeroMQ goodness for my taste. I did however just want to show this example for completeness.

So the code for this example has two parts. A simple client, and then the code that spins up a client instance and then multiple threads that use the client instance to send messages to the server. There is also a basic server loop (which I will show below under the title “The Rest”)

Client Code

Here is the client code, where it can be seen that we create a NetMQScheduler which gets handed a new Poller instance to use internally. The idea is that anyone can send a message simply by calling the clients SendMessage(..) method

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;

namespace NetMQSchedulerDemo
{
    public class Client : IDisposable
    {
        private readonly NetMQContext context;
        private readonly string address;
        private Poller poller;
        private NetMQScheduler scheduler;
        private NetMQSocket clientSocket;

        public Client(NetMQContext context, string address)
        {
            this.context = context;
            this.address = address;
        }

        public void Start()
        {
            poller = new Poller();
            clientSocket = context.CreateDealerSocket();
            clientSocket.ReceiveReady += clientSocket_ReceiveReady;
            clientSocket.Connect(address);
            scheduler = new NetMQScheduler(context, poller);
            Task.Factory.StartNew(poller.Start, TaskCreationOptions.LongRunning);
        }

        void clientSocket_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            string result = e.Socket.ReceiveString();
            Console.WriteLine("REPLY " + result);
        }

        public async Task SendMessage(NetMQMessage message)
        {
            // instead of creating inproc socket which listen to messages and then send
            //to the server we just creating task and run a code on
            // the poller thread which the the thread of the clientSocket
            Task task = new Task(() => clientSocket.SendMessage(message));
            task.Start(scheduler);
            await task;
            await ReceiveMessage();
        }

        public async Task ReceiveMessage()
        {
            Task task = new Task(() =>
            {
                var result = clientSocket.ReceiveString();
                Console.WriteLine("REPLY " + result);
            });
            task.Start(scheduler);
            await task;
        }

        public void Dispose()
        {
            scheduler.Dispose();
            clientSocket.Dispose();
            poller.Stop();
        }
    }
}

The Rest

And here is the rest of the code that is responsible for spinning up the client and extra threads to push messages through the client (using the SendMessage(..) method above)

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlServer.Server;
using NetMQ;
using NetMQ.Sockets;
using NetMQSchedulerDemo;
using NUnit.Framework;

namespace NetMQSchedulerDemo
{
    public class Program
    {
        public void Run()
        {
            //NOTES
            //1. Use NetMQs NetMQScheduler to communicate with the
            //   server. All Send/Receive MUST be done via the
            //   NetMQScheduler and TPL Tasks. See the Client class
            //   for more information on this

            int delay = 3000;

            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    using (var client = new Client(ctx, "tcp://127.0.0.1:5556"))
                    {
                        client.Start();

                        //start some theads, each thread will use the
                        //Clients NetMQScheduler to send/receieve messages
                        //to/from the server
                        for (int i = 0; i < 2; i++)
                        {
                            Task.Factory.StartNew(async (state) =>
                            {
                                while (true)
                                {
                                    var messageToServer = new NetMQMessage();
                                    messageToServer.AppendEmptyFrame();
                                    messageToServer.Append(state.ToString());
                                    await client.SendMessage(messageToServer);
                                    Thread.Sleep(delay);
                                }
                            }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                        }

                        //server loop
                        while (true)
                        {
                            var clientMessage = server.ReceiveMessage();
                            Console.WriteLine("========================");
                            Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                            Console.WriteLine("========================");
                            for (int i = 0; i < clientMessage.FrameCount; i++)
                            {
                                Console.WriteLine("Frame[{0}] = {1}", i,
                                    clientMessage[i].ConvertToString());
                            }

                            if (clientMessage.FrameCount == 3)
                            {
                                var clientAddress = clientMessage[0];
                                var clientOriginalMessage = clientMessage[2].ConvertToString();
                                string response = string.Format("{0} back from server {1}",
                                    clientOriginalMessage, DateTime.Now.ToLongTimeString());
                                var messageToClient = new NetMQMessage();
                                messageToClient.Append(clientAddress);
                                messageToClient.AppendEmptyFrame();
                                messageToClient.Append(response);
                                server.SendMessage(messageToClient);
                            }
                        }
                    }
                }
            }
        }

        [STAThread]
        public static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }
    }
}

 

If you run this code you may see something like this:

image

ZeroMQ #4 : Multiple Sockets Polling

Last time we looked at a few things, namely

  • Options
  • Identity
  • SendMore

This time we will talk about how to handle using multiple sockets

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

Handling Multiple Sockets, And Why Would You Need To?

So why would you want to handle multiple sockets anyway? Well there are a variety of reasons, such as:

  • You may have multiple sockets within one process that rely on each other , and the timings are such that you need to know that the socket(s) are ready before it/they can receive anything 
  • You may have a Request, as well as a Publisher socket in one process

To be honest there times you may end up with more than one socket per process. And there may be occasions when you only want to use the socket(s) when they are deemed ready.

ZeroMQ actually has a concept of a “Poller” that can be used to determine if a socket is deemed ready to use.

NetMQ has an implementation of the “Poller”, and it can be used to do the following things:

  • Monitor a single socket, for readiness
  • Monitor a IEnumerable<NetMQSocket> for readiness
  • Allow NetMQSocket(s) to be added dynamically and still report on the readiness of the new sockets
  • Allow NetMQSocket(s) to be remove dynamically
  • Raise a event on the socket instance when it is ready

A good way to look into the NetMQ Poller class is via some tests. I am not going to test everything in this post, but if you want more, NetMQ itself comes with some very very good tests for the Poller. Which is in fact where I lifted these test cases from.

Some Examples

As I just stated I am not the author of these tests, I have taken a subset of the NetMQ Poller test suite, that I think may be pertinent to a introductory discussion around the Poller class.

NOTE : This series of posts is meant as a beginners guide, and advanced ZeroMQ users would likely not get too much from this series of posts.

Single Socket Poll Test

This test cases use the kind of familiar (hopefully by now) Request/Response socket arrangement. We will use the Poller to alert us (via the xxxxSocket.ReceiveReady event that the Poller raises for us) that the ResponseSocket is Ready.

Here is the code for this:

[Test]
public void SingleSocketPollTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        using (var rep = contex.CreateResponseSocket())
        {
            rep.Bind("tcp://127.0.0.1:5002");

            using (var req = contex.CreateRequestSocket())
            using (Poller poller = new Poller())
            {
                req.Connect("tcp://127.0.0.1:5002");

                //The ReceiveReady event is raised by the Poller
                rep.ReceiveReady += (s, a) =>
                {
                    bool more;
                    string m = a.Socket.ReceiveString(out more);

                    Assert.False(more);
                    Assert.AreEqual("Hello", m);

                    a.Socket.Send("World");
                };

                poller.AddSocket(rep);

                Task pollerTask = Task.Factory.StartNew(poller.Start);
                req.Send("Hello");

                bool more2;
                string m1 = req.ReceiveString(out more2);

                Assert.IsFalse(more2);
                Assert.AreEqual("World", m1);

                poller.Stop();

                Thread.Sleep(100);
                Assert.IsTrue(pollerTask.IsCompleted);
            }
        }
    }
}

Add Socket During Work Test

This example shows how we can add extra socket(s) to the Poller at runtime, and the Poller will still raise the xxxxSocket.ReceiveReady event for us

[Test]
public void AddSocketDuringWorkTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");

                bool router1arrived = false;
                bool router2arrived = false;

                bool more;

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    router2.Receive(out more);
                    router2.Receive(out more);
                    router2arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    router1arrived = true;

                    router.Receive(out more);
                    router.Receive(out more);

                    poller.AddSocket(router2);
                };

                poller.AddSocket(router);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                Assert.IsTrue(router1arrived);
                Assert.IsTrue(router2arrived);
            }
        }
    }
}

Add Socket After Removing Test

This example builds on the last one where we add a new socket to the Poller after removing another socket from the Poller :

[Test]
public void AddSocketAfterRemovingTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        using (var router3 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (var dealer3 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");

                bool router1arrived = false;
                bool router2arrived = false;
                bool router3arrived = false;

                bool more;

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    router1arrived = true;

                    router.Receive(out more);
                    router.Receive(out more);

                    poller.RemoveSocket(router);

                };

                poller.AddSocket(router);

                //The ReceiveReady event is raised by the Poller
                router3.ReceiveReady += (s, a) =>
                {
                    router3.Receive(out more);
                    router3.Receive(out more);
                    router3arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    router2arrived = true;
                    router2.Receive(out more);
                    router2.Receive(out more);

                    poller.AddSocket(router3);
                };
                poller.AddSocket(router2);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);
                dealer3.Send("3");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                Assert.IsTrue(router1arrived);
                Assert.IsTrue(router2arrived);
                Assert.IsTrue(router3arrived);
            }
        }
    }
}

Add 2 Sockets After Removing Test

And in this one we add a few sockets to the Poller after removing from the Poller :

[Test]
public void AddTwoSocketAfterRemovingTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        using (var router3 = contex.CreateRouterSocket())
        using (var router4 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");
            router4.Bind("tcp://127.0.0.1:5005");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (var dealer3 = contex.CreateDealerSocket())
            using (var dealer4 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
                  
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");
                dealer4.Connect("tcp://127.0.0.1:5005");

                int router1arrived = 0;
                int router2arrived = 0;
                bool router3arrived = false;
                bool router4arrived = false;

                bool more;

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    router1arrived++;

                    router.Receive(out more);
                    router.Receive(out more);

                    poller.RemoveSocket(router);

                };

                poller.AddSocket(router);

                //The ReceiveReady event is raised by the Poller
                router3.ReceiveReady += (s, a) =>
                {
                    router3.Receive(out more);
                    router3.Receive(out more);
                    router3arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router4.ReceiveReady += (s, a) =>
                {
                    router4.Receive(out more);
                    router4.Receive(out more);
                    router4arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    router2arrived++;
                    router2.Receive(out more);
                    router2.Receive(out more);

                    if (router2arrived == 1)
                    {
                        poller.AddSocket(router3);

                        poller.AddSocket(router4);
                    }
                };

                poller.AddSocket(router2);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);
                dealer3.Send("3");
                dealer4.Send("4");
                dealer2.Send("2");
                dealer.Send("1");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                router.Receive(true, out more);

                Assert.IsTrue(more);

                router.Receive(true, out more);

                Assert.IsFalse(more);

                Assert.AreEqual(1, router1arrived);
                Assert.AreEqual(2, router2arrived);
                Assert.IsTrue(router3arrived);
                Assert.IsTrue(router4arrived);
            }
        }
    }
}

Cancel Socket Test

This final example shows 3 RouterSockets connected to 3 DealerSockets respectively (we will talk about DealerSocket(s) in a later post, for now you can think of them as typically being used for asynchronous workers). We then add all the routers to the Poller. Within the 1st RouterSocket.ReceiveReady we remove the RouterSocket from the Poller, so it should not receive any more messages back from its respective DealerSocket. Here is the code for this test :

[Test]
public void CancelSocketTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        using (var router3 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (var dealer3 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");

                bool first = true;

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    bool more;

                    // identity
                    byte[] identity = a.Socket.Receive(out more);

                    // message
                    a.Socket.Receive(out more);

                    a.Socket.SendMore(identity);
                    a.Socket.Send("2");
                };

                poller.AddSocket(router2);

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    if (!first)
                    {
                        Assert.Fail("This should happen because we cancelled the socket");
                    }
                    first = false;

                    bool more;

                    // identity
                    a.Socket.Receive(out more);

                    string m = a.Socket.ReceiveString(out more);

                    Assert.False(more);
                    Assert.AreEqual("Hello", m);

                    // cancelling the socket
                    poller.RemoveSocket(a.Socket);
                };

                poller.AddSocket(router);

                //The ReceiveReady event is raised by the Poller
                router3.ReceiveReady += (s, a) =>
                {
                    bool more;

                    // identity
                    byte[] identity = a.Socket.Receive(out more);

                    // message
                    a.Socket.Receive(out more);

                    a.Socket.SendMore(identity).Send("3");
                };

                poller.AddSocket(router3);

                Task pollerTask = Task.Factory.StartNew(poller.Start);

                dealer.Send("Hello");

                // sending this should not arrive on the poller,
                //therefore response for this will never arrive
                dealer.Send("Hello2");

                Thread.Sleep(100);

                // sending this should not arrive on the poller,
                //therefore response for this will never arrive                        
                dealer.Send("Hello3");

                Thread.Sleep(500);

                bool more2;

                // making sure the socket defined before the one cancelled still works
                dealer2.Send("1");
                string msg = dealer2.ReceiveString(out more2);
                Assert.AreEqual("2", msg);

                // making sure the socket defined after the one cancelled still works
                dealer3.Send("1");
                msg = dealer3.ReceiveString(out more2);
                Assert.AreEqual("3", msg);

                // we have to give this some time if we want to make sure
                //it's really not happening and it not only because of time
                Thread.Sleep(300);

                poller.Stop();

                Thread.Sleep(100);
                Assert.IsTrue(pollerTask.IsCompleted);
            }
        }

    }
}

 

And that is about all I wanted to talk about this time. I hope you can see how you could make use of the Poller in your own socket topologies, and why it is a useful tool.

ZeroMQ #3 : Socket Options/Identity And SendMore

Last time we looked at the different socket types within ZeroMQ, and I also told you their equivalent in NetMQ (which is what I use for these posts).

This time we will look at 3 small areas of ZeroMQ, which are none the less very important areas, and should not be overlooked.

These areas are socket Options/Identity and SendMore.

Where Is The Code?

The code for all these posts is hosted in one large solution in github:

https://github.com/sachabarber/ZeroMqDemos

 

Socket Options

Depending on the type of sockets you are using, or the topology you are attempting to create, you may find that you need to set some ZeroMQ options. In NetMQ this is done using the xxxxSocket.Options property.

Here is a listing of the available properties that you may set on a xxxxSocket. It is hard to say exactly which of these values you may need to set, as that obviously depends entirely on what you are trying to achieve. All I can do is list the options, and make you aware of them. So here they are

  • Affinity
  • BackLog
  • CopyMessages
  • DelayAttachOnConnect
  • Endian
  • GetLastEndpoint
  • IPv4Only
  • Identity
  • Linger
  • MaxMsgSize
  • MulticastHops
  • MulticastRate
  • MulticastRecoveryInterval
  • ReceiveHighWaterMark
  • ReceiveMore
  • ReceiveTimeout
  • ReceiveBuffer
  • ReconnectInterval
  • ReconnectIntervalMax
  • SendHighWaterMark
  • SendTimeout
  • SendBuffer
  • TcpAcceptFilter
  • TcpKeepAlive
  • TcpKeepaliveCnt
  • TcpKeepaliveIdle
  • TcpKeepaliveInterval
  • XPubVerbose

To see exactly what all these options mean you will more than likely need to refer to the actual ZeroMQ documentation, i.e the guide.

http://zguide.zeromq.org/page:all

 

Identity

One of the great things (at least in my opinion) when working with ZeroMQ is that we can still stick with a standard request/response arrangement (just like we had in the 1st posts hello world example http://sachabarbs.wordpress.com/2014/08/19/zeromq-1-introduction/) but we may then choose to switch to having an asynchronous server. This is easily achieved using a RouterSocket for the server. The clients stay as RequestSocket(s).

So this is now an interesting arrangement, we have

  • Synchronous clients, thanks to standard RequestSocket type
  • Asynchronous server, thanks to  new socket called RouterSocket

The RouterSocket is a personal favourite of mine, as it is very easy to use (as are many of the ZeroMQ sockets, once you know what they do), but it is a capable of creating a server that can seamlessly talk to 1000nds of clients, all asynchronously, with very little changes to the code we saw in part 1.

Slight Diversion

When you work with RequestSocket(s), they do something clever for you, they always provide a message that has the following frames:

  • Frame[0] address
  • Frame[1] empty frame
  • Frame[2] the message payload

Even though all we did was send a payload (look at the “Hello World” example in part1)

Likewise when you work with ResponseSocket(s), they also do some of the heavy lifting for us, where they always provide a message that has the following frames:

  • Frame[0] return address
  • Frame[1] empty frame
  • Frame[2] the message payload

Even though all we did was send a payload (look at the “Hello World” example in part1)

By understanding how the standard synchronous request/response socket works, it is now fairly easy to create a fully asynchronous server using the RouterSocket, that knows how to dispatch messages back to the correct client. All we need to do is emulate how the standard ResponseSocket works, where we construct the message frames ourselves. Where we would be looking to create the following frames from the RouterSocket (thus emulating the behaviour of the standard ResponseSocket)

  • Frame[0] return address
  • Frame[1] empty frame
  • Frame[2] the message payload

I think the best way to understand this is via an example. The example works like this:

  1. There are 4 clients,  these are standard synchronous RequestSocket(s)
  2. There is a single asynchronous server, which uses a RouterSocket
  3. If the client sends a message with the prefix “_B” it gets a special message from the server, all other clients get a standard response message

Without further ado, here is the full code for this example

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlServer.Server;
using NetMQ;
using NetMQ.Sockets;

namespace ZeroMqIdentity
{
    public class Program : IDisposable
    {
        private List<RequestSocket> clients = new List<RequestSocket>();

        public void Run()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    CreateClient(ctx, "A_");
                    CreateClient(ctx, "B_");
                    CreateClient(ctx, "C_");
                    CreateClient(ctx, "D_");

                    while (true)
                    {

                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i,
                                clientMessage[i].ConvertToString());
                        }

                        var clientAddress = clientMessage[0];
                        var clientOriginalMessage = clientMessage[2].ConvertToString();
                        string response = string.Format("{0} back from server",
                            clientOriginalMessage);

                        // "B_" client is special
                        if (clientOriginalMessage.StartsWith("B_"))
                        {
                            response = string.Format(
                                "special Message for 'B' back from server");
                        }

                        var messageToClient = new NetMQMessage();
                        messageToClient.Append(clientAddress);
                        messageToClient.AppendEmptyFrame();
                        messageToClient.Append(response);
                        server.SendMessage(messageToClient);
                    }
                }
            }

            Console.ReadLine();
        }

        private static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }

        private void CreateClient(NetMQContext ctx, string prefix)
        {
            Task.Run(() =>
            {
                var client = ctx.CreateRequestSocket();
                clients.Add(client);
                client.Connect("tcp://127.0.0.1:5556");
                client.Send(string.Format("{0}Hello", prefix));

                //read client message
                var echoedServerMessage = client.ReceiveString();
                Console.WriteLine(
                    "\r\nClient Prefix is : '{0}', Server Message : '{1}'",
                    prefix, echoedServerMessage);

            });
        }

        public void Dispose()
        {
            foreach (var client in clients)
            {
                client.Dispose();
            }
        }
    }
}

 

I think to full appreciate this example, one needs to examine the output, which should be something like this (it may not be exactly this, as the RouterSocket is FULLY async, so it may deal with RequestSocket(s) in a different order for you:

========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  ???”
Frame[1] =
Frame[2] = A_Hello
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  @??”
Frame[1] =
Frame[2] = D_Hello

Client Prefix is : ‘A_’, Server Message : ‘A_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  A??”
Frame[1] =
Frame[2] = B_Hello

Client Prefix is : ‘D_’, Server Message : ‘D_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  B??”
Frame[1] =
Frame[2] = C_Hello

Client Prefix is : ‘B_’, Server Message : ‘special Message for ‘B’ back from ser
ver’

Client Prefix is : ‘C_’, Server Message : ‘C_Hello back from server’

 

SendMore

ZeroMQ works using message frames. Using ZeroMQ you are able to create multipart messages which you may use for a variety of reasons, such as

  • Including address information (which we just saw an example of above actually)
  • Designing a protocol for your end purpose
  • Sending serialized data (for example the 1st message frame could be the type of the item, and the next message frame could be the actual serialized data)

When you work with multipart messages you must send/receive all the parts of the message you want to work with.

I think the best way to try and get to understand multipart message is perhaps via a small test. I have stuck to use a all in one demo, which builds on the original “Hello World” request/response demo. We use NUnit to do Asserts on the data between the client/server.

Here is a small test case, where the following points should be observed

  1. We construct the 1st message part and use the xxxxSocket.SendMore() method, to send the 1st message
  2. We construct the 2nd (and final) message part using the xxxxSocket.Send() method
  3. The Server is able to receive the 1st message part, and also assign a value to determine if there are more parts. Which is done by using an overload of the xxxxSocket.Receive(..) that allows us to get an out value for “more”
  4. We may also use an actual NetMqMessage and append to it, which we can then send using xxxxSocket.SendMessage, where the receiving socket would use xxxxSocket.ReceieveMessage(..) and can examine the actual NetMqMessage frames

Anyway here is the code

using System;
using System.Threading;
using NetMQ;
using NUnit.Framework;

namespace SendMore
{
    [TestFixture]
    public class SendMoreTests
    {
        [Test]
        public void SendMoreTest()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateResponseSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    using (var client = ctx.CreateRequestSocket())
                    {
                        client.Connect("tcp://127.0.0.1:5556");

                        //client send message
                        client.SendMore("A");
                        client.Send("Hello");

                        //server receive 1st part
                        bool more;
                        string m = server.ReceiveString(out more);
                        Assert.AreEqual("A", m);
                        Assert.IsTrue(more);

                        //server receive 2nd part
                        string m2 = server.ReceiveString(out more);
                        Assert.AreEqual("Hello", m2);
                        Assert.False(more);

                        //server send message, this time use NetMqMessage
                        //which will be sent as frames if the client calls
                        //ReceieveMessage()
                        var m3 = new NetMQMessage();
                        m3.Append("From");
                        m3.Append("Server");
                        server.SendMessage(m3);

                        //client receive
                        var m4 = client.ReceiveMessage();
                        Assert.AreEqual(2, m4.FrameCount);
                        Assert.AreEqual("From", m4[0].ConvertToString());
                        Assert.AreEqual("Server", m4[1].ConvertToString());

                    }
                }
            }
        }
    }
}

 

Here are a couple of REALLY important points from the Zero Guide when working with SendMore and multi part messages, this talks about the ZeroMQ C++ core implementation, not the NetMQ version, but the points are just as valid when using NetMQ.

http://zguide.zeromq.org/page:all#toc36

Some things to know about multipart messages:

  • When you send a multipart message, the first part (and all following parts) are only actually sent on the wire when you send the final part.
  • If you are using  zmq_poll(), when you receive the first part of a message, all the rest has also arrived.
  • You will receive all parts of a message, or none at all.
  • Each part of a message is a separate zmq_msg item.
  • You will receive all parts of a message whether or not you check the more property.
  • On sending, ØMQ queues message frames in memory until the last is received, then sends them all.
  • There is no way to cancel a partially sent message, except by closing the socket.

That’s all I wanted to talk about in the post, so until the next time then.

ZeroMQ #2 : The Socket Types

Last time we introduced ZeroMQ and also talked about the fact that there was a native C# port by way of the NetMQ library, which as I  said we will be using from here on out. I also mentioned that the power of ZeroMQ comes from a bunch of pre-canned sockets, which you can use as building blocks to build massive or small topologies.

From the ZeroMQ guide it states this:

The built-in core ØMQ patterns are:

  • Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
  • Pub-sub, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
  • Pipeline, which connects nodes in a fan-out/fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.
  • Exclusive pair, which connects two sockets exclusively. This is a pattern for connecting two threads in a process, not to be confused with “normal” pairs of sockets.

There are of course certain well known patterns, that the chaps that wrote Zero have come across, and talk about in the book and the guide (links here again in case you missed them last time)

Book

http://www.amazon.co.uk/ZeroMQ-Messaging-Applications-Pieter-Hintjens/dp/1449334067

Online guide

http://zguide.zeromq.org/page:all

I personally think it is a very clever thing to have done, to give certain sections of topologies an actual pattern name, as it means you can Google around a certain pattern name. For example I may Google “Lazy Pirate Pattern C#”, and I would know that the results would almost certainly be talking about the exact socket arrangement I had in mind. So yeah good idea giving these things names.

Standard ZeroMQ Socket Types

Anyway enough chit chat, lets get to the crux of what I wanted to talk about this time, which is the different socket types within ZeroMQ.

Zero actual has the following socket types:

PUB

This is known as a PublisherSocket in NetMQ, and can be used to publish messages.

SUB

This is known as a SubscriberSocket in NetMQ, and can be used to subscribe to message(s) (you can fill in a subscription topic which indicates which published messages you care about)

XPUB

This is known as a XPublisherSocket in NetMQ and can be used to publish messages. XPUB and XSUB are used where you may have to bridge different networks.

XSUB

This is known as a XSubscriberSocket in NetMQ. and can be used to subscribe to message(s) (you can fill in a subscription topic which indicates which published messages you care about). XPUB and XSUB are used where you may have to bridge different networks.

REQ

This is known as a RequestSocket in NetMQ. Is a synchronous blocking socket, that would initiate a request message.

REP

This is known as a ResponseSocket in NetMQ. Is a synchronous blocking socket, that would provide a response to a message.

ROUTER

This is known as a RouterSocket in NetMQ. Router is typically a broker socket (but not limited to), and provides routing where it would more than likely know how to route messages back to the calling socket, thus its name of “Router”. It is fully asynchronous (non blocking).

The ROUTER socket, unlike other sockets, tracks every connection it has, and tells the caller about these. The way it tells the caller is to stick the connection identity in front of each message received. An identity, sometimes called an address, is just a binary string with no meaning except “this is a unique handle to the connection”. Then, when you send a message via a ROUTER socket, you first send an identity frame.

http://zguide.zeromq.org/page:all

DEALER

This is known as a DealerSocket in NetMQ. Dealer is typically a worker socket, and doesn’t provide any routing (ie it doesn’t know about the calling sockets identity), but it is fully asynchronous (non blocking)

PUSH

This is known as a PushSocket in NetMQ. This would typically be used to push messages at worker, within a pipeline pattern

PULL

This is known as a PullSocket in NetMQ. This would one part of a work within a pipeline pattern, which would pull from a PUSH socket and the n do some work.

PAIR

This is known as a PairSocket in NetMQ.

 

Standard ZeroMQ Socket Pairs

There are pretty strict recommendations about the pairing of the sockets we just discussed. The standard pairs of sockets that you should stick to using are shown below.

Any other combination will produce undocumented and unreliable results, and future versions of ZeroMQ will probably return errors if you try them

PUB and SUB

A standard Pub/Sub arrangement

XPUB and XSUB

A standard Pub/Sub arrangement

REQ and RES

A standard synchronous request/response arrangement

The REQ client must initiate the message flow. A REP server cannot talk to a REQ client that hasn’t first sent it a request. Technically, it’s not even possible, and the API also returns an EFSM error if you try it.

http://zguide.zeromq.org/page:all#toc59

REQ and ROUTER

A standard synchronous request with an asynchronous server responding, where the router will know how to do the routing back the correct request socket

In the same way that we can replace REQ with DEALER ….we can replace REP with ROUTER. This gives us an asynchronous server that can talk to multiple REQ clients at the same time. If we rewrote the “Hello World” server using ROUTER, we’d be able to process any number of “Hello” requests in parallel.

We can use ROUTER in two distinct ways:

  • As a proxy that switches messages between frontend and backend sockets.
  • As an application that reads the message and acts on it.

In the first case, the ROUTER simply reads all frames, including the artificial identity frame, and passes them on blindly. In the second case the ROUTER must know the format of the reply envelope it’s being sent. As the other peer is a REQ socket, the ROUTER gets the identity frame, an empty frame, and then the data frame.

http://zguide.zeromq.org/page:all#toc59

We will see more on what this means in subsequent posts

DEALER and REP

An asynchronous request with a synchronous server responding. When we use a standard REQ (ie not a DEALER for the client) socket, it does one extra thing for us, which is to include an empty frame. So when we switch to using a Dealer for the client, we need to do that part ourselves, by using SendMore, which we will get into within the next post.

If we rewrote the “Hello World” client using DEALER, we’d be able to send off any number of “Hello” requests without waiting for replies.

When we use a DEALER to talk to a REP socket, we must accurately emulate the envelope that the REQ socket would have sent, or the REP socket will discard the message as invalid. So, to send a message, we:

  • Send an empty message frame with the MORE flag set; then
  • Send the message body.

And when we receive a message, we:

  • Receive the first frame and if it’s not empty, discard the whole message;
  • Receive the next frame and pass that to the application.

http://zguide.zeromq.org/page:all#toc59

DEALER and ROUTER

An asynchronous request with an asynchronous server responding, where the router will know how to do the routing back the correct request socket

With DEALER and ROUTER to get the most powerful socket combination, which is DEALER talking to ROUTER. It gives us asynchronous clients talking to asynchronous servers, where both sides have full control over the message formats.

Because both DEALER and ROUTER can work with arbitrary message formats, if you hope to use these safely, you have to become a little bit of a protocol designer. At the very least you must decide whether you wish to emulate the REQ/REP reply envelope. It depends on whether you actually need to send replies or not.

http://zguide.zeromq.org/page:all#toc59

DEALER and DEALER

An asynchronous request with an asynchronous server responding (this should be used if the DEALER is talking to one and only one peer).

With a DEALER/DEALER, your worker can suddenly go full asynchronous, sending any number of replies back. The cost is that you have to manage the reply envelopes yourself, and get them right, or nothing at all will work. We’ll see a worked example later. Let’s just say for now that DEALER to DEALER is one of the trickier patterns to get right, and happily it’s rare that we need it.

http://zguide.zeromq.org/page:all#toc59

ROUTER and ROUTER

An asynchronous request with an asynchronous server responding.

This sounds perfect for N-to-N connections, but it’s the most difficult combination to use. You should avoid it until you are well advanced with ØMQ

http://zguide.zeromq.org/page:all#toc59

PUSH and PULL

Push socket connected to a Pull, which you may see in a divide and conquer type arrangement.

PAIR and PAIR

  • Pair sockets should ONLY talk to another pair, it is a well defined pair, Typically you would use this for connecting two threads in a process

f

ZeroMQ # 1: Hello World

What Is ZeroMQ

ZeroMQ is a C library that contains a bunch of insane sockets, that provide a very very cool abstraction over the typical socket code you would find yourself writing. It provides building blocks by way of a standard set of sockets that have been built with certain scenarios in mind.

The people that make it were instrumental in the advanced message queue protocol (AMQP) being written, and are very big in the messaging world.

There is a completely awesome book that everyone should read, it is by Pieter Hintjens

http://www.amazon.co.uk/ZeroMQ-Messaging-Applications-Pieter-Hintjens/dp/1449334067

There is also a online version of the PDF book that has full code samples, which is known as the guide:

http://zguide.zeromq.org/page:all

 

Why Use ZeroMQ/Messaging At All

If you have ever written any asynchronous code, and have had to deal with shared state, and have had to deal with that, you will know that that brings locks/semaphores etc etc.

Now imagine a world where you don’t care about locks, semaphores etc etc, you just pass a message, and there is no shared state to worry about. Welcome to messaging. That is how you can write systems with a high throughput of messages per second, without the overhead of shared state management.

Zero is at its core a messaging library. It can be setup in a brokerless manner or also used broker, or even peer to peer. It is sockets that make it powerful. They are the fundamental building blocks, which you may use to create large distributed architectures, or very small ones.

I would urge you all to read the book, or check out the guide, they have changed the way I think about certain tasks for sure.

Where Do I Get The ZeroMQ library?

First things first, I mentioned that ZeroMQ is written in C, but has many many language bindings. There is in fact a C# binding that you could use, which is the zmqcli  binding. The thing with that, is that the errors you get are quite confusing at times, as it has a tendency to show the actual C error code.

I wanted (if possible) to use an entirely native port of ZeroMQ, luckily for me there is just such a thing, by way of the NetMQ project. That is the library I will be using throughout all my posts.

You can install NetMQ using the following Nuget package manager command line:

Install-Package NetMQ

So without further ado lets get down to see a very simple example.

What Does The Example Do

The example is dead straight forward, we send a message from the client to the server, and the server sends a message back. This is a (extremely simple) example of the Request/Response pattern, of which we will see a lot more examples. Zero also supports publish/subscribe which we will look at too (though not in as much detail as request/response).

So let’s see some code shall we:

using System;
using NetMQ;namespace HelloWorldDemo
{
class Program
{
private static void Main(string[] args)
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateResponseSocket())
{
server.Bind(“tcp://127.0.0.1:5556″);

using (var client = ctx.CreateRequestSocket())
{
client.Connect(“tcp://127.0.0.1:5556″);

client.Send(“Hello”);

string fromClientMessage = server.ReceiveString();

Console.WriteLine(“From Client: {0}, fromClientMessage);

server.Send(“Hi Back”);

string fromServerMessage = client.ReceiveString();

Console.WriteLine(“From Server: {0}, fromServerMessage);

Console.ReadLine();
}
}
}
}
}
}

 

Believe it or not that code is enough to have a fully functioning request (client) / response (server) pattern, up and working. Don’t believe me here is some output to prove it

image

Ok, so it does work, that is quite mad. So how does it work?

Well there are a couple of take away points there

  1. We are able to create a request/response pattern, by using specialized sockets that are geared towards working in the request/response scenario.
  2. We can use tcp as the protocol (ZeroMQ also supports others such as inproc)
  3. We did not have to spin up any extra thread on the server to deal with the freshly connected client socket and then continue to accept other client sockets. In fact this code could pretty much talk to 1000nds of clients without much alteration at all (in fact I will show you an example of the using separate processes)
  4. There is this magical NetMQContext. This is mandatory and must be used whenever you use ZeroMQ sockets.

There are a couple of specific things to talk about in request/response, in that those type of sockets are assumed to be 1:1 request/response. So If you call ReceiveString() twice on the clients request socket without the servers response socket sending something you will get an Exception, as can be seen in the screen shot below.

image

 

Running In Separate Threads

This demo is to show you how you could use an internal processes messaging system. You obviously need to use new threads in this example as we need to not block on the receive() method of the sockets

image

Program

This just kicks of a few clients for the server to deal with

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace HelloWorldDemoSeparateThreads
{
public class Program
{
public static void Main(string[] args)
{
Server server = new Server();
server.Run();

foreach (Client client in Enumerable.Range(0, 5).Select(
x => new Client(string.Format(“client {0}, x))))
{
client.Run();
}

Console.ReadLine();
}
}
}

 

Client

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;namespace HelloWorldDemoSeparateThreads
{
sealed class Client
{
private readonly string clientName;

public Client(string clientName)
{
this.clientName = clientName;
}

public void Run()
{
Task.Run(() =>
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var client = ctx.CreateRequestSocket())
{
client.Connect(“tcp://127.0.0.1:5556″);
while (true)
{
client.Send(string.Format(“Hello from client {0}, clientName));
string fromServerMessage = client.ReceiveString();
Console.WriteLine(“From Server: {0} running on ThreadId : {1},
fromServerMessage, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(5000);
}
}
}
});

}
}
}

 

Server

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;namespace HelloWorldDemoSeparateThreads
{
sealed class Server
{
public void Run()
{
Task.Run(() =>
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateResponseSocket())
{
server.Bind(“tcp://127.0.0.1:5556″);

while (true)
{
string fromClientMessage = server.ReceiveString();
Console.WriteLine(“From Client: {0} running on ThreadId : {1},
fromClientMessage, Thread.CurrentThread.ManagedThreadId);
server.Send(“Hi Back”);
}

}
}
});

}
}
}

Running In Separate Processes

Client

using System;
using System.Threading;
using NetMQ;namespace HelloWorldSeparateClient
{
sealed class Client
{
private string clientName;

public static void Main(string[] args)
{
Client c = new Client();
c.clientName = args[0];
c.Run();
}

public void Run()
{

using (NetMQContext ctx = NetMQContext.Create())
{
using (var client = ctx.CreateRequestSocket())
{
client.Connect(“tcp://127.0.0.1:5556″);
while (true)
{
client.Send(string.Format(“Hello from client {0}, clientName));
string fromServerMessage = client.ReceiveString();
Console.WriteLine(“From Server: {0} running on ThreadId : {1},
fromServerMessage, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(5000);
}
}
}
}
}

}

 

Server

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;namespace HelloWorldSeparateServer
{
sealed class Server
{

public static void Main(string[] args)
{
Server s = new Server();
s.Run();
}

public void Run()
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateResponseSocket())
{
server.Bind(“tcp://127.0.0.1:5556″);

while (true)
{
string fromClientMessage = server.ReceiveString();
Console.WriteLine(“From Client: {0} running on ThreadId : {1}”,
fromClientMessage, Thread.CurrentThread.ManagedThreadId);
server.Send(“Hi Back”);
}

}
}
}
}
}

 

Here is the code running in separate processes, where it can be seen that we did not have to do a damn thing to change the server code at all we just moved it around to run in a new process rather than a new thread.

image

 

 

Where Is The Code For This Article?

The code for all these posts is hosted in one large solution in github:

https://github.com/sachabarber/ZeroMqDemos