ZeroMQ #4 : Multiple Sockets Polling

Last time we looked at a few things, namely

  • Options
  • Identity
  • SendMore

This time we will talk about how to handle using multiple sockets

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

Handling Multiple Sockets, And Why Would You Need To?

So why would you want to handle multiple sockets anyway? Well there are a variety of reasons, such as:

  • You may have multiple sockets within one process that rely on each other , and the timings are such that you need to know that the socket(s) are ready before it/they can receive anything 
  • You may have a Request, as well as a Publisher socket in one process

To be honest there times you may end up with more than one socket per process. And there may be occasions when you only want to use the socket(s) when they are deemed ready.

ZeroMQ actually has a concept of a “Poller” that can be used to determine if a socket is deemed ready to use.

NetMQ has an implementation of the “Poller”, and it can be used to do the following things:

  • Monitor a single socket, for readiness
  • Monitor a IEnumerable<NetMQSocket> for readiness
  • Allow NetMQSocket(s) to be added dynamically and still report on the readiness of the new sockets
  • Allow NetMQSocket(s) to be remove dynamically
  • Raise a event on the socket instance when it is ready

A good way to look into the NetMQ Poller class is via some tests. I am not going to test everything in this post, but if you want more, NetMQ itself comes with some very very good tests for the Poller. Which is in fact where I lifted these test cases from.

Some Examples

As I just stated I am not the author of these tests, I have taken a subset of the NetMQ Poller test suite, that I think may be pertinent to a introductory discussion around the Poller class.

NOTE : This series of posts is meant as a beginners guide, and advanced ZeroMQ users would likely not get too much from this series of posts.

Single Socket Poll Test

This test cases use the kind of familiar (hopefully by now) Request/Response socket arrangement. We will use the Poller to alert us (via the xxxxSocket.ReceiveReady event that the Poller raises for us) that the ResponseSocket is Ready.

Here is the code for this:

[Test]
public void SingleSocketPollTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        using (var rep = contex.CreateResponseSocket())
        {
            rep.Bind("tcp://127.0.0.1:5002");

            using (var req = contex.CreateRequestSocket())
            using (Poller poller = new Poller())
            {
                req.Connect("tcp://127.0.0.1:5002");

                //The ReceiveReady event is raised by the Poller
                rep.ReceiveReady += (s, a) =>
                {
                    bool more;
                    string m = a.Socket.ReceiveString(out more);

                    Assert.False(more);
                    Assert.AreEqual("Hello", m);

                    a.Socket.Send("World");
                };

                poller.AddSocket(rep);

                Task pollerTask = Task.Factory.StartNew(poller.Start);
                req.Send("Hello");

                bool more2;
                string m1 = req.ReceiveString(out more2);

                Assert.IsFalse(more2);
                Assert.AreEqual("World", m1);

                poller.Stop();

                Thread.Sleep(100);
                Assert.IsTrue(pollerTask.IsCompleted);
            }
        }
    }
}

Add Socket During Work Test

This example shows how we can add extra socket(s) to the Poller at runtime, and the Poller will still raise the xxxxSocket.ReceiveReady event for us

[Test]
public void AddSocketDuringWorkTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");

                bool router1arrived = false;
                bool router2arrived = false;

                bool more;

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    router2.Receive(out more);
                    router2.Receive(out more);
                    router2arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    router1arrived = true;

                    router.Receive(out more);
                    router.Receive(out more);

                    poller.AddSocket(router2);
                };

                poller.AddSocket(router);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                Assert.IsTrue(router1arrived);
                Assert.IsTrue(router2arrived);
            }
        }
    }
}

Add Socket After Removing Test

This example builds on the last one where we add a new socket to the Poller after removing another socket from the Poller :

[Test]
public void AddSocketAfterRemovingTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        using (var router3 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (var dealer3 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");

                bool router1arrived = false;
                bool router2arrived = false;
                bool router3arrived = false;

                bool more;

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    router1arrived = true;

                    router.Receive(out more);
                    router.Receive(out more);

                    poller.RemoveSocket(router);

                };

                poller.AddSocket(router);

                //The ReceiveReady event is raised by the Poller
                router3.ReceiveReady += (s, a) =>
                {
                    router3.Receive(out more);
                    router3.Receive(out more);
                    router3arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    router2arrived = true;
                    router2.Receive(out more);
                    router2.Receive(out more);

                    poller.AddSocket(router3);
                };
                poller.AddSocket(router2);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);
                dealer3.Send("3");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                Assert.IsTrue(router1arrived);
                Assert.IsTrue(router2arrived);
                Assert.IsTrue(router3arrived);
            }
        }
    }
}

Add 2 Sockets After Removing Test

And in this one we add a few sockets to the Poller after removing from the Poller :

[Test]
public void AddTwoSocketAfterRemovingTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        using (var router3 = contex.CreateRouterSocket())
        using (var router4 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");
            router4.Bind("tcp://127.0.0.1:5005");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (var dealer3 = contex.CreateDealerSocket())
            using (var dealer4 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
                  
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");
                dealer4.Connect("tcp://127.0.0.1:5005");

                int router1arrived = 0;
                int router2arrived = 0;
                bool router3arrived = false;
                bool router4arrived = false;

                bool more;

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    router1arrived++;

                    router.Receive(out more);
                    router.Receive(out more);

                    poller.RemoveSocket(router);

                };

                poller.AddSocket(router);

                //The ReceiveReady event is raised by the Poller
                router3.ReceiveReady += (s, a) =>
                {
                    router3.Receive(out more);
                    router3.Receive(out more);
                    router3arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router4.ReceiveReady += (s, a) =>
                {
                    router4.Receive(out more);
                    router4.Receive(out more);
                    router4arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    router2arrived++;
                    router2.Receive(out more);
                    router2.Receive(out more);

                    if (router2arrived == 1)
                    {
                        poller.AddSocket(router3);

                        poller.AddSocket(router4);
                    }
                };

                poller.AddSocket(router2);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);
                dealer3.Send("3");
                dealer4.Send("4");
                dealer2.Send("2");
                dealer.Send("1");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                router.Receive(true, out more);

                Assert.IsTrue(more);

                router.Receive(true, out more);

                Assert.IsFalse(more);

                Assert.AreEqual(1, router1arrived);
                Assert.AreEqual(2, router2arrived);
                Assert.IsTrue(router3arrived);
                Assert.IsTrue(router4arrived);
            }
        }
    }
}

Cancel Socket Test

This final example shows 3 RouterSockets connected to 3 DealerSockets respectively (we will talk about DealerSocket(s) in a later post, for now you can think of them as typically being used for asynchronous workers). We then add all the routers to the Poller. Within the 1st RouterSocket.ReceiveReady we remove the RouterSocket from the Poller, so it should not receive any more messages back from its respective DealerSocket. Here is the code for this test :

[Test]
public void CancelSocketTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        using (var router3 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (var dealer3 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");

                bool first = true;

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    bool more;

                    // identity
                    byte[] identity = a.Socket.Receive(out more);

                    // message
                    a.Socket.Receive(out more);

                    a.Socket.SendMore(identity);
                    a.Socket.Send("2");
                };

                poller.AddSocket(router2);

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    if (!first)
                    {
                        Assert.Fail("This should happen because we cancelled the socket");
                    }
                    first = false;

                    bool more;

                    // identity
                    a.Socket.Receive(out more);

                    string m = a.Socket.ReceiveString(out more);

                    Assert.False(more);
                    Assert.AreEqual("Hello", m);

                    // cancelling the socket
                    poller.RemoveSocket(a.Socket);
                };

                poller.AddSocket(router);

                //The ReceiveReady event is raised by the Poller
                router3.ReceiveReady += (s, a) =>
                {
                    bool more;

                    // identity
                    byte[] identity = a.Socket.Receive(out more);

                    // message
                    a.Socket.Receive(out more);

                    a.Socket.SendMore(identity).Send("3");
                };

                poller.AddSocket(router3);

                Task pollerTask = Task.Factory.StartNew(poller.Start);

                dealer.Send("Hello");

                // sending this should not arrive on the poller,
                //therefore response for this will never arrive
                dealer.Send("Hello2");

                Thread.Sleep(100);

                // sending this should not arrive on the poller,
                //therefore response for this will never arrive                        
                dealer.Send("Hello3");

                Thread.Sleep(500);

                bool more2;

                // making sure the socket defined before the one cancelled still works
                dealer2.Send("1");
                string msg = dealer2.ReceiveString(out more2);
                Assert.AreEqual("2", msg);

                // making sure the socket defined after the one cancelled still works
                dealer3.Send("1");
                msg = dealer3.ReceiveString(out more2);
                Assert.AreEqual("3", msg);

                // we have to give this some time if we want to make sure
                //it's really not happening and it not only because of time
                Thread.Sleep(300);

                poller.Stop();

                Thread.Sleep(100);
                Assert.IsTrue(pollerTask.IsCompleted);
            }
        }

    }
}

 

And that is about all I wanted to talk about this time. I hope you can see how you could make use of the Poller in your own socket topologies, and why it is a useful tool.

ZeroMQ #3 : Socket Options/Identity And SendMore

Last time we looked at the different socket types within ZeroMQ, and I also told you their equivalent in NetMQ (which is what I use for these posts).

This time we will look at 3 small areas of ZeroMQ, which are none the less very important areas, and should not be overlooked.

These areas are socket Options/Identity and SendMore.

Where Is The Code?

The code for all these posts is hosted in one large solution in github:

https://github.com/sachabarber/ZeroMqDemos

 

Socket Options

Depending on the type of sockets you are using, or the topology you are attempting to create, you may find that you need to set some ZeroMQ options. In NetMQ this is done using the xxxxSocket.Options property.

Here is a listing of the available properties that you may set on a xxxxSocket. It is hard to say exactly which of these values you may need to set, as that obviously depends entirely on what you are trying to achieve. All I can do is list the options, and make you aware of them. So here they are

  • Affinity
  • BackLog
  • CopyMessages
  • DelayAttachOnConnect
  • Endian
  • GetLastEndpoint
  • IPv4Only
  • Identity
  • Linger
  • MaxMsgSize
  • MulticastHops
  • MulticastRate
  • MulticastRecoveryInterval
  • ReceiveHighWaterMark
  • ReceiveMore
  • ReceiveTimeout
  • ReceiveBuffer
  • ReconnectInterval
  • ReconnectIntervalMax
  • SendHighWaterMark
  • SendTimeout
  • SendBuffer
  • TcpAcceptFilter
  • TcpKeepAlive
  • TcpKeepaliveCnt
  • TcpKeepaliveIdle
  • TcpKeepaliveInterval
  • XPubVerbose

To see exactly what all these options mean you will more than likely need to refer to the actual ZeroMQ documentation, i.e the guide.

http://zguide.zeromq.org/page:all

 

Identity

One of the great things (at least in my opinion) when working with ZeroMQ is that we can still stick with a standard request/response arrangement (just like we had in the 1st posts hello world example http://sachabarbs.wordpress.com/2014/08/19/zeromq-1-introduction/) but we may then choose to switch to having an asynchronous server. This is easily achieved using a RouterSocket for the server. The clients stay as RequestSocket(s).

So this is now an interesting arrangement, we have

  • Synchronous clients, thanks to standard RequestSocket type
  • Asynchronous server, thanks to  new socket called RouterSocket

The RouterSocket is a personal favourite of mine, as it is very easy to use (as are many of the ZeroMQ sockets, once you know what they do), but it is a capable of creating a server that can seamlessly talk to 1000nds of clients, all asynchronously, with very little changes to the code we saw in part 1.

Slight Diversion

When you work with RequestSocket(s), they do something clever for you, they always provide a message that has the following frames:

  • Frame[0] address
  • Frame[1] empty frame
  • Frame[2] the message payload

Even though all we did was send a payload (look at the “Hello World” example in part1)

Likewise when you work with ResponseSocket(s), they also do some of the heavy lifting for us, where they always provide a message that has the following frames:

  • Frame[0] return address
  • Frame[1] empty frame
  • Frame[2] the message payload

Even though all we did was send a payload (look at the “Hello World” example in part1)

By understanding how the standard synchronous request/response socket works, it is now fairly easy to create a fully asynchronous server using the RouterSocket, that knows how to dispatch messages back to the correct client. All we need to do is emulate how the standard ResponseSocket works, where we construct the message frames ourselves. Where we would be looking to create the following frames from the RouterSocket (thus emulating the behaviour of the standard ResponseSocket)

  • Frame[0] return address
  • Frame[1] empty frame
  • Frame[2] the message payload

I think the best way to understand this is via an example. The example works like this:

  1. There are 4 clients,  these are standard synchronous RequestSocket(s)
  2. There is a single asynchronous server, which uses a RouterSocket
  3. If the client sends a message with the prefix “_B” it gets a special message from the server, all other clients get a standard response message

Without further ado, here is the full code for this example

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlServer.Server;
using NetMQ;
using NetMQ.Sockets;

namespace ZeroMqIdentity
{
    public class Program : IDisposable
    {
        private List<RequestSocket> clients = new List<RequestSocket>();

        public void Run()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    CreateClient(ctx, "A_");
                    CreateClient(ctx, "B_");
                    CreateClient(ctx, "C_");
                    CreateClient(ctx, "D_");

                    while (true)
                    {

                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i,
                                clientMessage[i].ConvertToString());
                        }

                        var clientAddress = clientMessage[0];
                        var clientOriginalMessage = clientMessage[2].ConvertToString();
                        string response = string.Format("{0} back from server",
                            clientOriginalMessage);

                        // "B_" client is special
                        if (clientOriginalMessage.StartsWith("B_"))
                        {
                            response = string.Format(
                                "special Message for 'B' back from server");
                        }

                        var messageToClient = new NetMQMessage();
                        messageToClient.Append(clientAddress);
                        messageToClient.AppendEmptyFrame();
                        messageToClient.Append(response);
                        server.SendMessage(messageToClient);
                    }
                }
            }

            Console.ReadLine();
        }

        private static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }

        private void CreateClient(NetMQContext ctx, string prefix)
        {
            Task.Run(() =>
            {
                var client = ctx.CreateRequestSocket();
                clients.Add(client);
                client.Connect("tcp://127.0.0.1:5556");
                client.Send(string.Format("{0}Hello", prefix));

                //read client message
                var echoedServerMessage = client.ReceiveString();
                Console.WriteLine(
                    "\r\nClient Prefix is : '{0}', Server Message : '{1}'",
                    prefix, echoedServerMessage);

            });
        }

        public void Dispose()
        {
            foreach (var client in clients)
            {
                client.Dispose();
            }
        }
    }
}

 

I think to full appreciate this example, one needs to examine the output, which should be something like this (it may not be exactly this, as the RouterSocket is FULLY async, so it may deal with RequestSocket(s) in a different order for you:

========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  ???”
Frame[1] =
Frame[2] = A_Hello
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  @??”
Frame[1] =
Frame[2] = D_Hello

Client Prefix is : ‘A_’, Server Message : ‘A_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  A??”
Frame[1] =
Frame[2] = B_Hello

Client Prefix is : ‘D_’, Server Message : ‘D_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  B??”
Frame[1] =
Frame[2] = C_Hello

Client Prefix is : ‘B_’, Server Message : ‘special Message for ‘B’ back from ser
ver’

Client Prefix is : ‘C_’, Server Message : ‘C_Hello back from server’

 

SendMore

ZeroMQ works using message frames. Using ZeroMQ you are able to create multipart messages which you may use for a variety of reasons, such as

  • Including address information (which we just saw an example of above actually)
  • Designing a protocol for your end purpose
  • Sending serialized data (for example the 1st message frame could be the type of the item, and the next message frame could be the actual serialized data)

When you work with multipart messages you must send/receive all the parts of the message you want to work with.

I think the best way to try and get to understand multipart message is perhaps via a small test. I have stuck to use a all in one demo, which builds on the original “Hello World” request/response demo. We use NUnit to do Asserts on the data between the client/server.

Here is a small test case, where the following points should be observed

  1. We construct the 1st message part and use the xxxxSocket.SendMore() method, to send the 1st message
  2. We construct the 2nd (and final) message part using the xxxxSocket.Send() method
  3. The Server is able to receive the 1st message part, and also assign a value to determine if there are more parts. Which is done by using an overload of the xxxxSocket.Receive(..) that allows us to get an out value for “more”
  4. We may also use an actual NetMqMessage and append to it, which we can then send using xxxxSocket.SendMessage, where the receiving socket would use xxxxSocket.ReceieveMessage(..) and can examine the actual NetMqMessage frames

Anyway here is the code

using System;
using System.Threading;
using NetMQ;
using NUnit.Framework;

namespace SendMore
{
    [TestFixture]
    public class SendMoreTests
    {
        [Test]
        public void SendMoreTest()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateResponseSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    using (var client = ctx.CreateRequestSocket())
                    {
                        client.Connect("tcp://127.0.0.1:5556");

                        //client send message
                        client.SendMore("A");
                        client.Send("Hello");

                        //server receive 1st part
                        bool more;
                        string m = server.ReceiveString(out more);
                        Assert.AreEqual("A", m);
                        Assert.IsTrue(more);

                        //server receive 2nd part
                        string m2 = server.ReceiveString(out more);
                        Assert.AreEqual("Hello", m2);
                        Assert.False(more);

                        //server send message, this time use NetMqMessage
                        //which will be sent as frames if the client calls
                        //ReceieveMessage()
                        var m3 = new NetMQMessage();
                        m3.Append("From");
                        m3.Append("Server");
                        server.SendMessage(m3);

                        //client receive
                        var m4 = client.ReceiveMessage();
                        Assert.AreEqual(2, m4.FrameCount);
                        Assert.AreEqual("From", m4[0].ConvertToString());
                        Assert.AreEqual("Server", m4[1].ConvertToString());

                    }
                }
            }
        }
    }
}

 

Here are a couple of REALLY important points from the Zero Guide when working with SendMore and multi part messages, this talks about the ZeroMQ C++ core implementation, not the NetMQ version, but the points are just as valid when using NetMQ.

http://zguide.zeromq.org/page:all#toc36

Some things to know about multipart messages:

  • When you send a multipart message, the first part (and all following parts) are only actually sent on the wire when you send the final part.
  • If you are using  zmq_poll(), when you receive the first part of a message, all the rest has also arrived.
  • You will receive all parts of a message, or none at all.
  • Each part of a message is a separate zmq_msg item.
  • You will receive all parts of a message whether or not you check the more property.
  • On sending, ØMQ queues message frames in memory until the last is received, then sends them all.
  • There is no way to cancel a partially sent message, except by closing the socket.

That’s all I wanted to talk about in the post, so until the next time then.

ZeroMQ #2 : The Socket Types

Last time we introduced ZeroMQ and also talked about the fact that there was a native C# port by way of the NetMQ library, which as I  said we will be using from here on out. I also mentioned that the power of ZeroMQ comes from a bunch of pre-canned sockets, which you can use as building blocks to build massive or small topologies.

From the ZeroMQ guide it states this:

The built-in core ØMQ patterns are:

  • Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
  • Pub-sub, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
  • Pipeline, which connects nodes in a fan-out/fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.
  • Exclusive pair, which connects two sockets exclusively. This is a pattern for connecting two threads in a process, not to be confused with “normal” pairs of sockets.

There are of course certain well known patterns, that the chaps that wrote Zero have come across, and talk about in the book and the guide (links here again in case you missed them last time)

Book

http://www.amazon.co.uk/ZeroMQ-Messaging-Applications-Pieter-Hintjens/dp/1449334067

Online guide

http://zguide.zeromq.org/page:all

I personally think it is a very clever thing to have done, to give certain sections of topologies an actual pattern name, as it means you can Google around a certain pattern name. For example I may Google “Lazy Pirate Pattern C#”, and I would know that the results would almost certainly be talking about the exact socket arrangement I had in mind. So yeah good idea giving these things names.

Standard ZeroMQ Socket Types

Anyway enough chit chat, lets get to the crux of what I wanted to talk about this time, which is the different socket types within ZeroMQ.

Zero actual has the following socket types:

PUB

This is known as a PublisherSocket in NetMQ, and can be used to publish messages.

SUB

This is known as a SubscriberSocket in NetMQ, and can be used to subscribe to message(s) (you can fill in a subscription topic which indicates which published messages you care about)

XPUB

This is known as a XPublisherSocket in NetMQ and can be used to publish messages. XPUB and XSUB are used where you may have to bridge different networks.

XSUB

This is known as a XSubscriberSocket in NetMQ. and can be used to subscribe to message(s) (you can fill in a subscription topic which indicates which published messages you care about). XPUB and XSUB are used where you may have to bridge different networks.

REQ

This is known as a RequestSocket in NetMQ. Is a synchronous blocking socket, that would initiate a request message.

REP

This is known as a ResponseSocket in NetMQ. Is a synchronous blocking socket, that would provide a response to a message.

ROUTER

This is known as a RouterSocket in NetMQ. Router is typically a broker socket (but not limited to), and provides routing where it would more than likely know how to route messages back to the calling socket, thus its name of “Router”. It is fully asynchronous (non blocking).

The ROUTER socket, unlike other sockets, tracks every connection it has, and tells the caller about these. The way it tells the caller is to stick the connection identity in front of each message received. An identity, sometimes called an address, is just a binary string with no meaning except “this is a unique handle to the connection”. Then, when you send a message via a ROUTER socket, you first send an identity frame.

http://zguide.zeromq.org/page:all

DEALER

This is known as a DealerSocket in NetMQ. Dealer is typically a worker socket, and doesn’t provide any routing (ie it doesn’t know about the calling sockets identity), but it is fully asynchronous (non blocking)

PUSH

This is known as a PushSocket in NetMQ. This would typically be used to push messages at worker, within a pipeline pattern

PULL

This is known as a PullSocket in NetMQ. This would one part of a work within a pipeline pattern, which would pull from a PUSH socket and the n do some work.

PAIR

This is known as a PairSocket in NetMQ.

 

Standard ZeroMQ Socket Pairs

There are pretty strict recommendations about the pairing of the sockets we just discussed. The standard pairs of sockets that you should stick to using are shown below.

Any other combination will produce undocumented and unreliable results, and future versions of ZeroMQ will probably return errors if you try them

PUB and SUB

A standard Pub/Sub arrangement

XPUB and XSUB

A standard Pub/Sub arrangement

REQ and RES

A standard synchronous request/response arrangement

The REQ client must initiate the message flow. A REP server cannot talk to a REQ client that hasn’t first sent it a request. Technically, it’s not even possible, and the API also returns an EFSM error if you try it.

http://zguide.zeromq.org/page:all#toc59

REQ and ROUTER

A standard synchronous request with an asynchronous server responding, where the router will know how to do the routing back the correct request socket

In the same way that we can replace REQ with DEALER ….we can replace REP with ROUTER. This gives us an asynchronous server that can talk to multiple REQ clients at the same time. If we rewrote the “Hello World” server using ROUTER, we’d be able to process any number of “Hello” requests in parallel.

We can use ROUTER in two distinct ways:

  • As a proxy that switches messages between frontend and backend sockets.
  • As an application that reads the message and acts on it.

In the first case, the ROUTER simply reads all frames, including the artificial identity frame, and passes them on blindly. In the second case the ROUTER must know the format of the reply envelope it’s being sent. As the other peer is a REQ socket, the ROUTER gets the identity frame, an empty frame, and then the data frame.

http://zguide.zeromq.org/page:all#toc59

We will see more on what this means in subsequent posts

DEALER and REP

An asynchronous request with a synchronous server responding. When we use a standard REQ (ie not a DEALER for the client) socket, it does one extra thing for us, which is to include an empty frame. So when we switch to using a Dealer for the client, we need to do that part ourselves, by using SendMore, which we will get into within the next post.

If we rewrote the “Hello World” client using DEALER, we’d be able to send off any number of “Hello” requests without waiting for replies.

When we use a DEALER to talk to a REP socket, we must accurately emulate the envelope that the REQ socket would have sent, or the REP socket will discard the message as invalid. So, to send a message, we:

  • Send an empty message frame with the MORE flag set; then
  • Send the message body.

And when we receive a message, we:

  • Receive the first frame and if it’s not empty, discard the whole message;
  • Receive the next frame and pass that to the application.

http://zguide.zeromq.org/page:all#toc59

DEALER and ROUTER

An asynchronous request with an asynchronous server responding, where the router will know how to do the routing back the correct request socket

With DEALER and ROUTER to get the most powerful socket combination, which is DEALER talking to ROUTER. It gives us asynchronous clients talking to asynchronous servers, where both sides have full control over the message formats.

Because both DEALER and ROUTER can work with arbitrary message formats, if you hope to use these safely, you have to become a little bit of a protocol designer. At the very least you must decide whether you wish to emulate the REQ/REP reply envelope. It depends on whether you actually need to send replies or not.

http://zguide.zeromq.org/page:all#toc59

DEALER and DEALER

An asynchronous request with an asynchronous server responding (this should be used if the DEALER is talking to one and only one peer).

With a DEALER/DEALER, your worker can suddenly go full asynchronous, sending any number of replies back. The cost is that you have to manage the reply envelopes yourself, and get them right, or nothing at all will work. We’ll see a worked example later. Let’s just say for now that DEALER to DEALER is one of the trickier patterns to get right, and happily it’s rare that we need it.

http://zguide.zeromq.org/page:all#toc59

ROUTER and ROUTER

An asynchronous request with an asynchronous server responding.

This sounds perfect for N-to-N connections, but it’s the most difficult combination to use. You should avoid it until you are well advanced with ØMQ

http://zguide.zeromq.org/page:all#toc59

PUSH and PULL

Push socket connected to a Pull, which you may see in a divide and conquer type arrangement.

PAIR and PAIR

  • Pair sockets should ONLY talk to another pair, it is a well defined pair, Typically you would use this for connecting two threads in a process

f

ZeroMQ # 1: Hello World

What Is ZeroMQ

ZeroMQ is a C library that contains a bunch of insane sockets, that provide a very very cool abstraction over the typical socket code you would find yourself writing. It provides building blocks by way of a standard set of sockets that have been built with certain scenarios in mind.

The people that make it were instrumental in the advanced message queue protocol (AMQP) being written, and are very big in the messaging world.

There is a completely awesome book that everyone should read, it is by Pieter Hintjens

http://www.amazon.co.uk/ZeroMQ-Messaging-Applications-Pieter-Hintjens/dp/1449334067

There is also a online version of the PDF book that has full code samples, which is known as the guide:

http://zguide.zeromq.org/page:all

 

Why Use ZeroMQ/Messaging At All

If you have ever written any asynchronous code, and have had to deal with shared state, and have had to deal with that, you will know that that brings locks/semaphores etc etc.

Now imagine a world where you don’t care about locks, semaphores etc etc, you just pass a message, and there is no shared state to worry about. Welcome to messaging. That is how you can write systems with a high throughput of messages per second, without the overhead of shared state management.

Zero is at its core a messaging framework. It can be setup in a brokerless manner or also used broker, or even peer to peer. It is sockets that make it powerful. They are the fundamental building blocks, which you may use to create large distributed architectures, or very small ones.

I would urge you all to read the book, or check out the guide, they have changed the way I think about certain tasks for sure.

Where Do I Get The ZeroMQ library?

First things first, I mentioned that ZeroMQ is written in C, but has many many language bindings. There is in fact a C# binding that you could use, which is the zmqcli  binding. The thing with that, is that the errors you get are quite confusing at times, as it has a tendency to show the actual C error code.

I wanted (if possible) to use an entirely native port of ZeroMQ, luckily for me there is just such a thing, by way of the NetMQ project. That is the library I will be using throughout all my posts.

You can install NetMQ using the following Nuget package manager command line:

Install-Package NetMQ

So without further ado lets get down to see a very simple example.

What Does The Example Do

The example is dead straight forward, we send a message from the client to the server, and the server sends a message back. This is a (extremely simple) example of the Request/Response pattern, of which we will see a lot more examples. Zero also supports publish/subscribe which we will look at too (though not in as much detail as request/response).

So let’s see some code shall we:

using System;
using NetMQ;

namespace HelloWorldDemo
{
    class Program
    {
        private static void Main(string[] args)
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateResponseSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    using (var client = ctx.CreateRequestSocket())
                    {
                        client.Connect("tcp://127.0.0.1:5556");

                        client.Send("Hello");

                        string fromClientMessage = server.ReceiveString();

                        Console.WriteLine("From Client: {0}", fromClientMessage);

                        server.Send("Hi Back");

                        string fromServerMessage = client.ReceiveString();

                        Console.WriteLine("From Server: {0}", fromServerMessage);

                        Console.ReadLine();
                    }
                }
            }
        }
    }
}

 

Believe it or not that code is enough to have a fully functioning request (client) / response (server) pattern, up and working. Don’t believe me here is some output to prove it

image

Ok, so it does work, that is quite mad. So how does it work?

Well there are a couple of take away points there

  1. We are able to create a request/response pattern, by using specialized sockets that are geared towards working in the request/response scenario.
  2. We can use tcp as the protocol (ZeroMQ also supports others such as inproc)
  3. We did not have to spin up any extra thread on the server to deal with the freshly connected client socket and then continue to accept other client sockets. In fact this code could pretty much talk to 1000nds of clients without much alteration at all (in fact I will show you an example of the using separate processes)
  4. There is this magical NetMQContext. This is mandatory and must be used whenever you use ZeroMQ sockets.

There are a couple of specific things to talk about in request/response, in that those type of sockets are assumed to be 1:1 request/response. So If you call ReceiveString() twice on the clients request socket without the servers response socket sending something you will get an Exception, as can be seen in the screen shot below.

image

 

Running In Separate Threads

This demo is to show you how you could use an internal processes messaging system. You obviously need to use new threads in this example as we need to not block on the receive() method of the sockets

image

Program

This just kicks of a few clients for the server to deal with

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HelloWorldDemoSeparateThreads
{
    public class Program
    {
        public static void Main(string[] args)
        {
            Server server = new Server();
            server.Run();

            foreach (Client client in Enumerable.Range(0, 5).Select(
                x => new Client(string.Format("client {0}", x))))
            {
                client.Run();
            }

            Console.ReadLine();
        }
    }
}

 

Client

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace HelloWorldDemoSeparateThreads
{
    sealed class Client
    {
        private readonly string clientName;

        public Client(string clientName)
        {
            this.clientName = clientName;
        }

        public void Run()
        {
            Task.Run(() =>
            {
                using (NetMQContext ctx = NetMQContext.Create())
                {
                    using (var client = ctx.CreateRequestSocket())
                    {
                        client.Connect("tcp://127.0.0.1:5556");
                        while (true)
                        {
                            client.Send(string.Format("Hello from client {0}", clientName));
                            string fromServerMessage = client.ReceiveString();
                            Console.WriteLine("From Server: {0} running on ThreadId : {1}",
                                fromServerMessage, Thread.CurrentThread.ManagedThreadId);
                            Thread.Sleep(5000);
                        }
                    }
                }
            });

        }
    }
}

 

Server

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace HelloWorldDemoSeparateThreads
{
    sealed class Server
    {
        public void Run()
        {
            Task.Run(() =>
            {
                using (NetMQContext ctx = NetMQContext.Create())
                {
                    using (var server = ctx.CreateResponseSocket())
                    {
                        server.Bind("tcp://127.0.0.1:5556");

                        while (true)
                        {
                            string fromClientMessage = server.ReceiveString();
                            Console.WriteLine("From Client: {0} running on ThreadId : {1}",
                                fromClientMessage, Thread.CurrentThread.ManagedThreadId);
                            server.Send("Hi Back");
                        }

                    }
                }
            });

        }
    }
}

Running In Separate Processes

Client

using System;
using System.Threading;
using NetMQ;

namespace HelloWorldSeparateClient
{
    sealed class Client
    {
        private string clientName;

        public static void Main(string[] args)
        {
            Client c = new Client();
            c.clientName = args[0];
            c.Run();
        }

        public void Run()
        {
            
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var client = ctx.CreateRequestSocket())
                {
                    client.Connect("tcp://127.0.0.1:5556");
                    while (true)
                    {
                        client.Send(string.Format("Hello from client {0}", clientName));
                        string fromServerMessage = client.ReceiveString();
                        Console.WriteLine("From Server: {0} running on ThreadId : {1}",
                            fromServerMessage, Thread.CurrentThread.ManagedThreadId);
                        Thread.Sleep(5000);
                    }
                }
            }
            

        }
    }

}

 

Server

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace HelloWorldSeparateServer
{
    sealed class Server
    {

        public static void Main(string[] args)
        {
            Server s = new Server();
            s.Run();
        }

        public void Run()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateResponseSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    while (true)
                    {
                        string fromClientMessage = server.ReceiveString();
                        Console.WriteLine("From Client: {0} running on ThreadId : {1}",
                            fromClientMessage, Thread.CurrentThread.ManagedThreadId);
                        server.Send("Hi Back");
                    }

                }
             }
        }
    }
}

 

Here is the code running in separate processes, where it can be seen that we did not have to do a damn thing to change the server code at all we just moved it around to run in a new process rather than a new thread.

image

 

 

Where Is The Code For This Article?

The code for all these posts is hosted in one large solution in github:

https://github.com/sachabarber/ZeroMqDemos

Sketcher 1 of 3

It has been quite a while since I wrote my last article, I have been busy writing a long series of blob posts on “F# For Beginners“, and doing a lot of reading (which I hope to have more blog posts/articles about very soon).

Anyway we digress, so what is this article about. Well it is a small Angular.js Single Page App, demo app that works with ASP MVC / Web Api / SignalR / Azure storage that I have written that is something like twitter but for images.

The basic idea is that you can:

  • Create sketches (a bit like paint, but no where near as advanced) which are stored in Azure blob storage
  • Choose which friends you want to receive information from (like when they post a new sketch)
  • Comment on other peoples sketches

That is it really, but in essence that is all that Twitter is really, you post some text, people follow you, and they may comment on something you posted.

I just like quite visual things, so decided to do it with images instead.

This is NOT a new idea it has been done a million times before (for example Facebook walls / Reddit etc etc), that is not the point of this article.

The point of this article is that it was something fun I could attempt to write that I could share with others. I think there is a fair bit you could get out of it, if you looked into the code. For example the demo code makes use of the following stuff:

  • Azure blob storage
  • Azure table storage
  • Angular.js Single Page app (SPA)
  • SignalR with Angular.js
  • Using Angular.js with ASP MVC
  • Responsive design Bootstrap web site
  • There is enough meat in it to make it quite a neat demo application

 

If this sounds like it could be of interest, you can read the full text for part 1 right here:

http://www.codeproject.com/Articles/806514/Sketcher-One-Of-n