C#, CodeProject, Distributed Systems

ZeroMQ #4 : Multiple Sockets Polling

Last time we looked at a few things, namely

  • Options
  • Identity
  • SendMore

This time we will talk about how to handle using multiple sockets

Where Is The Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

https://github.com/sachabarber/ZeroMqDemos

Handling Multiple Sockets, And Why Would You Need To?

So why would you want to handle multiple sockets anyway? Well there are a variety of reasons, such as:

  • You may have multiple sockets within one process that rely on each other , and the timings are such that you need to know that the socket(s) are ready before it/they can receive anything 
  • You may have a Request, as well as a Publisher socket in one process

To be honest there times you may end up with more than one socket per process. And there may be occasions when you only want to use the socket(s) when they are deemed ready.

ZeroMQ actually has a concept of a “Poller” that can be used to determine if a socket is deemed ready to use.

NetMQ has an implementation of the “Poller”, and it can be used to do the following things:

  • Monitor a single socket, for readiness
  • Monitor a IEnumerable<NetMQSocket> for readiness
  • Allow NetMQSocket(s) to be added dynamically and still report on the readiness of the new sockets
  • Allow NetMQSocket(s) to be remove dynamically
  • Raise a event on the socket instance when it is ready

A good way to look into the NetMQ Poller class is via some tests. I am not going to test everything in this post, but if you want more, NetMQ itself comes with some very very good tests for the Poller. Which is in fact where I lifted these test cases from.

Some Examples

As I just stated I am not the author of these tests, I have taken a subset of the NetMQ Poller test suite, that I think may be pertinent to a introductory discussion around the Poller class.

NOTE : This series of posts is meant as a beginners guide, and advanced ZeroMQ users would likely not get too much from this series of posts.

Single Socket Poll Test

This test cases use the kind of familiar (hopefully by now) Request/Response socket arrangement. We will use the Poller to alert us (via the xxxxSocket.ReceiveReady event that the Poller raises for us) that the ResponseSocket is Ready.

Here is the code for this:

[Test]
public void SingleSocketPollTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        using (var rep = contex.CreateResponseSocket())
        {
            rep.Bind("tcp://127.0.0.1:5002");

            using (var req = contex.CreateRequestSocket())
            using (Poller poller = new Poller())
            {
                req.Connect("tcp://127.0.0.1:5002");

                //The ReceiveReady event is raised by the Poller
                rep.ReceiveReady += (s, a) =>
                {
                    bool more;
                    string m = a.Socket.ReceiveString(out more);

                    Assert.False(more);
                    Assert.AreEqual("Hello", m);

                    a.Socket.Send("World");
                };

                poller.AddSocket(rep);

                Task pollerTask = Task.Factory.StartNew(poller.Start);
                req.Send("Hello");

                bool more2;
                string m1 = req.ReceiveString(out more2);

                Assert.IsFalse(more2);
                Assert.AreEqual("World", m1);

                poller.Stop();

                Thread.Sleep(100);
                Assert.IsTrue(pollerTask.IsCompleted);
            }
        }
    }
}

Add Socket During Work Test

This example shows how we can add extra socket(s) to the Poller at runtime, and the Poller will still raise the xxxxSocket.ReceiveReady event for us

[Test]
public void AddSocketDuringWorkTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");

                bool router1arrived = false;
                bool router2arrived = false;

                bool more;

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    router2.Receive(out more);
                    router2.Receive(out more);
                    router2arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    router1arrived = true;

                    router.Receive(out more);
                    router.Receive(out more);

                    poller.AddSocket(router2);
                };

                poller.AddSocket(router);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                Assert.IsTrue(router1arrived);
                Assert.IsTrue(router2arrived);
            }
        }
    }
}

Add Socket After Removing Test

This example builds on the last one where we add a new socket to the Poller after removing another socket from the Poller :

[Test]
public void AddSocketAfterRemovingTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        using (var router3 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (var dealer3 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");

                bool router1arrived = false;
                bool router2arrived = false;
                bool router3arrived = false;

                bool more;

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    router1arrived = true;

                    router.Receive(out more);
                    router.Receive(out more);

                    poller.RemoveSocket(router);

                };

                poller.AddSocket(router);

                //The ReceiveReady event is raised by the Poller
                router3.ReceiveReady += (s, a) =>
                {
                    router3.Receive(out more);
                    router3.Receive(out more);
                    router3arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    router2arrived = true;
                    router2.Receive(out more);
                    router2.Receive(out more);

                    poller.AddSocket(router3);
                };
                poller.AddSocket(router2);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);
                dealer3.Send("3");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                Assert.IsTrue(router1arrived);
                Assert.IsTrue(router2arrived);
                Assert.IsTrue(router3arrived);
            }
        }
    }
}

Add 2 Sockets After Removing Test

And in this one we add a few sockets to the Poller after removing from the Poller :

[Test]
public void AddTwoSocketAfterRemovingTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        using (var router3 = contex.CreateRouterSocket())
        using (var router4 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");
            router4.Bind("tcp://127.0.0.1:5005");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (var dealer3 = contex.CreateDealerSocket())
            using (var dealer4 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
                  
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");
                dealer4.Connect("tcp://127.0.0.1:5005");

                int router1arrived = 0;
                int router2arrived = 0;
                bool router3arrived = false;
                bool router4arrived = false;

                bool more;

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    router1arrived++;

                    router.Receive(out more);
                    router.Receive(out more);

                    poller.RemoveSocket(router);

                };

                poller.AddSocket(router);

                //The ReceiveReady event is raised by the Poller
                router3.ReceiveReady += (s, a) =>
                {
                    router3.Receive(out more);
                    router3.Receive(out more);
                    router3arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router4.ReceiveReady += (s, a) =>
                {
                    router4.Receive(out more);
                    router4.Receive(out more);
                    router4arrived = true;
                };

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    router2arrived++;
                    router2.Receive(out more);
                    router2.Receive(out more);

                    if (router2arrived == 1)
                    {
                        poller.AddSocket(router3);

                        poller.AddSocket(router4);
                    }
                };

                poller.AddSocket(router2);

                Task task = Task.Factory.StartNew(poller.Start);

                dealer.Send("1");
                Thread.Sleep(300);
                dealer2.Send("2");
                Thread.Sleep(300);
                dealer3.Send("3");
                dealer4.Send("4");
                dealer2.Send("2");
                dealer.Send("1");
                Thread.Sleep(300);

                poller.Stop(true);
                task.Wait();

                router.Receive(true, out more);

                Assert.IsTrue(more);

                router.Receive(true, out more);

                Assert.IsFalse(more);

                Assert.AreEqual(1, router1arrived);
                Assert.AreEqual(2, router2arrived);
                Assert.IsTrue(router3arrived);
                Assert.IsTrue(router4arrived);
            }
        }
    }
}

Cancel Socket Test

This final example shows 3 RouterSockets connected to 3 DealerSockets respectively (we will talk about DealerSocket(s) in a later post, for now you can think of them as typically being used for asynchronous workers). We then add all the routers to the Poller. Within the 1st RouterSocket.ReceiveReady we remove the RouterSocket from the Poller, so it should not receive any more messages back from its respective DealerSocket. Here is the code for this test :

[Test]
public void CancelSocketTest()
{
    using (NetMQContext contex = NetMQContext.Create())
    {
        // we are using three responses to make sure we actually
        //move the correct socket and other sockets still work
        using (var router = contex.CreateRouterSocket())
        using (var router2 = contex.CreateRouterSocket())
        using (var router3 = contex.CreateRouterSocket())
        {
            router.Bind("tcp://127.0.0.1:5002");
            router2.Bind("tcp://127.0.0.1:5003");
            router3.Bind("tcp://127.0.0.1:5004");

            using (var dealer = contex.CreateDealerSocket())
            using (var dealer2 = contex.CreateDealerSocket())
            using (var dealer3 = contex.CreateDealerSocket())
            using (Poller poller = new Poller())
            {
                dealer.Connect("tcp://127.0.0.1:5002");
                dealer2.Connect("tcp://127.0.0.1:5003");
                dealer3.Connect("tcp://127.0.0.1:5004");

                bool first = true;

                //The ReceiveReady event is raised by the Poller
                router2.ReceiveReady += (s, a) =>
                {
                    bool more;

                    // identity
                    byte[] identity = a.Socket.Receive(out more);

                    // message
                    a.Socket.Receive(out more);

                    a.Socket.SendMore(identity);
                    a.Socket.Send("2");
                };

                poller.AddSocket(router2);

                //The ReceiveReady event is raised by the Poller
                router.ReceiveReady += (s, a) =>
                {
                    if (!first)
                    {
                        Assert.Fail("This should happen because we cancelled the socket");
                    }
                    first = false;

                    bool more;

                    // identity
                    a.Socket.Receive(out more);

                    string m = a.Socket.ReceiveString(out more);

                    Assert.False(more);
                    Assert.AreEqual("Hello", m);

                    // cancelling the socket
                    poller.RemoveSocket(a.Socket);
                };

                poller.AddSocket(router);

                //The ReceiveReady event is raised by the Poller
                router3.ReceiveReady += (s, a) =>
                {
                    bool more;

                    // identity
                    byte[] identity = a.Socket.Receive(out more);

                    // message
                    a.Socket.Receive(out more);

                    a.Socket.SendMore(identity).Send("3");
                };

                poller.AddSocket(router3);

                Task pollerTask = Task.Factory.StartNew(poller.Start);

                dealer.Send("Hello");

                // sending this should not arrive on the poller,
                //therefore response for this will never arrive
                dealer.Send("Hello2");

                Thread.Sleep(100);

                // sending this should not arrive on the poller,
                //therefore response for this will never arrive                        
                dealer.Send("Hello3");

                Thread.Sleep(500);

                bool more2;

                // making sure the socket defined before the one cancelled still works
                dealer2.Send("1");
                string msg = dealer2.ReceiveString(out more2);
                Assert.AreEqual("2", msg);

                // making sure the socket defined after the one cancelled still works
                dealer3.Send("1");
                msg = dealer3.ReceiveString(out more2);
                Assert.AreEqual("3", msg);

                // we have to give this some time if we want to make sure
                //it's really not happening and it not only because of time
                Thread.Sleep(300);

                poller.Stop();

                Thread.Sleep(100);
                Assert.IsTrue(pollerTask.IsCompleted);
            }
        }

    }
}

 

And that is about all I wanted to talk about this time. I hope you can see how you could make use of the Poller in your own socket topologies, and why it is a useful tool.

Leave a comment