RX

RX Over the wire

 

Introduction

Now you know I’m a RX fan boy, I think it’s the bees knees in fact. Of late I have also gotten into Akka, and Akka Streams, which is one implementation of the Reactive Streams API.

I also have a colleague who recently attend the ReactConf and came back raving about all the good work that NetFlix were doing using this uber duper specific socket that they have developed to allow RX type operators over the wire using a socket.

NetFlix call their implementation ReactiveSocket : http://reactivesocket.io/ 

Which offers these things

  • request/response (stream of 1)
  • request/stream (finite stream of many)
  • fire-and-forget (no response)
  • event subscription (infinite stream of many)
  • channel (bi-directional streams)

Mmmm, sounds pretty cool. Thing is I was sure I had seen this before, and a long time ago too (well a long time in software years).

IQbservable – The Dual of IQueryable

Back in 2010 Bart De Smet of the RX team posted a couple of intriguing resources around this VERY badly named interface.

Most informative one being this video:

https://channel9.msdn.com/shows/Going+Deep/Bart-De-Smet-Observations-on-IQbservable-The-Dual-of-IQueryable/

For those that can not be bothered to watch the video here are some of the highlights

IQbservable allows the following

  • Combines LINQ Queryable and RX Observable functionality
  • Queryable – allows you to create a query client side using LINQ, and pass that to a datasource (server, database, web service etc)
  • Observable – instead of blocking until the data comes back, will just notify you know when it gets the data

So those are the key take away points. But how about a nice diagram or 2 to really set the scene

I think these are the best 2 diagrams (at least for my money)

image

image

So that is what IQbservable is all about. So what is the rest of this post about then?

Well it just so happens that Dave Sexton one of the Reactive Extension Extensions guys (meant to be slightly tongue in cheek) has written an extremely useful and fairly lightweight library that does much of what is described above.

Dave calls it QActive. He has done a great job of it, and has written serialized expression trees and parsers, which allow us to create client client queries in LINQ which are sent across the wire.

In the rest of this post I will be showcasing a very simple demo based on QActive, and I’ll point out some more links that Dave Sexton provides, which are invaluable reading

 Dave has 2 of his own posts covering more than I do here, which you can go to here:

 

A Small Demo

So it may not come as a surprise to know that we need a server side and a client side. We will look at both of these for a simple example, and in the download at my github there is also an forms based server that allows you to push items to the client on demand

What Is Not Inlcuded

There is no form of fault tolerance, if you want that you could do worse than to read my SignalR + RX code, which shows you how to make a resilient connection using RX

https://www.codeproject.com/Articles/851437/SignalR-plus-RX-Streaming-Data-Demo-App-of:

We will now proceed to look at a simple server/client. This is the most simplistic of examples that has a server that runs on a timer, and the client provides a LINQ where (filter) to this that will be applied to the server side stream ON THE SERVER.

Simple Server

This is all we need for a simple server

using System;
using System.Net;
using System.Reactive.Linq;
using Qactive;

namespace Server
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1));
            var service = source.ServeQbservableTcp(
                new IPEndPoint(IPAddress.Loopback, 3205),
                new QbservableServiceOptions()
                {
                    SendServerErrorsToClients = true,
                    EnableDuplex = true,
                    AllowExpressionsUnrestricted = true
                }
            );
            using (service.Subscribe(
              client => Console.WriteLine(
                  "Client shutdown."),
              ex => Console.WriteLine(
                  "Fatal error: {0}", ex.Message),
              () => Console.WriteLine(
                  "This will never be printed because a service host never completes.")))
            {
                Console.ReadKey();
            }
        }
    }
}

Take away points here are:

  • We use a TCIP IPEndpoint to bind the server too
  • We can subscribe to the IObservable to see what is happening with the connected client

On Demand Server Notification

If you want to see a server that allows you to send notifications to the clients on demand have a look at the FormsServer in my GitHub repo.

 

Simple Client

This is all we need for a simple client

using System;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using Qactive;

namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            var client = new TcpQbservableClient<long>(new IPEndPoint(IPAddress.Loopback, 3205));

            //thie expression tree filtering will happen server side
            //THAT IS AWESOME
            IQbservable<string> query =
              from value in client.Query()
              where value <= 5 || value >= 8
              select string.Format("The incoming value has been doubled to {0}", value * 2);

            using (query.Subscribe(
              value => Console.WriteLine("Client observed: " + value),
              ex => Console.WriteLine("Error: {0}", ex.Message),
              () => Console.WriteLine("Completed")))
            {
                Console.ReadKey();
            }
        }
    }
}

 

Take away points here are:

  • The client is also using a TCIP IPEndpoint
  • The client IS ABLE to use LINQ expressions which WILL be serialized and sent to the server where they will be applied

Here is the output when the simple server is run, and 2 clients are started one after another.

image

 

Why Is This Cool?

This is very cool (uber cool in fact), we have just created a push based notification system that supports server side filtering (thanks to LINQ) in about 20 lines of code.

If you can’t see what is cool about that, well I can’t help you.

There may be some amongst you that go well any messaging framework that has a server and a client would/could do that in the same amount of code, what and the server side push down delegates (thanks to serializable expression trees)….mmmmm Don’t think so.

Only thing I can think of that even comes close is OData, but that requires a fair bit of infrastructure/baggage to make it work.

 

 

Where Can I Get The Code?

As usual I have posted the code  to my GitHub account :

https://github.com/sachabarber/RxOverTheWire

9 thoughts on “RX Over the wire

  1. another AMAZING RX article. I followed the development of Rx from day one and watched all the videos and I am always impressed with what Rx can do. Eric Meijer and his team really deserve an award for this elegant library.

    1. Bloody right they do, rx is one of favourite libs ever

      Have a look at my Akka streams article to compare that’s also tres cool

  2. Thank you for your post. I’m also a big fan of Rx and also had the privilege of talking to Bart De Smet about it at TechED SA a few years back. Do you maybe know of a project that utilises Rx over a queue technology like RabbitMQ or MQTT?

  3. How does this compare to 1. a message broker with topics (effectively server side filtering)
    2. Akka actors based messaging

    Do they solve the same problems?
    Do you think this is a scale-ble model?

    1. It’s sending predicates over the wire, like a WHERE clause and LINQ selects so very different to topics

      Akka streams are similar but you would have to use Akka streams over Web request

      I blogged about too recently check my Akka series

    1. Have not used that part of F#, but from what I see they are not equivalent at all. This provides predicates over the wire with FULL LINQ support from source data

  4. Thanks a million!

    I wonder
    1-Can it be an alternative for Robbitmq (as a messaging infrastructure for microservices)
    2-Can I query the server from other languages as well !?

Leave a comment