Category Archives: RX

RX Over the wire

 

Introduction

By now you know I’m an RX fanboy; in fact, I think it’s the bee’s knees. Of late I have also gotten into Akka and Akka Streams, which is one implementation of the Reactive Streams API.

I also have a colleague who recently attended ReactConf and came back raving about all the good work that Netflix were doing with an uber duper specific socket that they have developed to allow RX type operators over the wire.

Netflix call their implementation ReactiveSocket: http://reactivesocket.io/

It offers these interaction models:

  • request/response (stream of 1)
  • request/stream (finite stream of many)
  • fire-and-forget (no response)
  • event subscription (infinite stream of many)
  • channel (bi-directional streams)

Mmmm, sounds pretty cool. Thing is I was sure I had seen this before, and a long time ago too (well a long time in software years).

IQbservable – The Dual of IQueryable

Back in 2010 Bart De Smet of the RX team posted a couple of intriguing resources around this VERY badly named interface.

The most informative one is this video:

https://channel9.msdn.com/shows/Going+Deep/Bart-De-Smet-Observations-on-IQbservable-The-Dual-of-IQueryable/

For those that cannot be bothered to watch the video, here are some of the highlights.

IQbservable allows the following:

  • Combines LINQ Queryable and RX Observable functionality
  • Queryable – allows you to create a query client side using LINQ, and pass that to a datasource (server, database, web service etc)
  • Observable – instead of blocking until the data comes back, it just notifies you when the data arrives
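To make the "query as something you can pass around" idea concrete, here is a minimal sketch using only the .NET base class library. It shows the same lambda held once as a compiled delegate and once as an expression tree. The class and field names are made up for this illustration, and no wire transport is involved; it only shows why an expression tree is something a library could inspect and ship to a server.

```csharp
using System;
using System.Linq.Expressions;

public static class QueryAsData
{
    // A compiled delegate: opaque code that can only be invoked locally.
    public static readonly Func<long, bool> AsDelegate =
        value => value <= 5 || value >= 8;

    // The same lambda captured as an expression tree: a data structure
    // that can be inspected, rewritten or handed to something else to run.
    public static readonly Expression<Func<long, bool>> AsExpression =
        value => value <= 5 || value >= 8;

    public static void Main()
    {
        // The tree exposes its shape: an OrElse node with two comparisons.
        var body = (BinaryExpression)AsExpression.Body;
        Console.WriteLine(body.NodeType);        // OrElse
        Console.WriteLine(body.Left.NodeType);   // LessThanOrEqual
        Console.WriteLine(body.Right.NodeType);  // GreaterThanOrEqual

        // Whoever holds the tree can compile it and run it against its own data.
        Func<long, bool> compiled = AsExpression.Compile();
        Console.WriteLine(compiled(6));  // False
        Console.WriteLine(compiled(9));  // True
    }
}
```

The delegate and the expression behave identically when invoked, but only the expression carries enough structure to be analysed somewhere other than where it was written.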

So those are the key take away points. But how about a nice diagram or 2 to really set the scene?

I think these are the best 2 diagrams (at least for my money)

image

image

So that is what IQbservable is all about. So what is the rest of this post about then?

Well, it just so happens that Dave Sexton, one of the Reactive Extensions Extensions guys (meant to be slightly tongue in cheek), has written an extremely useful and fairly lightweight library that does much of what is described above.

Dave calls it Qactive. He has done a great job of it, and has written serializable expression trees and parsers, which allow us to create client side queries in LINQ that are sent across the wire.

In the rest of this post I will be showcasing a very simple demo based on Qactive, and I’ll point out some more links that Dave Sexton provides, which are invaluable reading.

 Dave has 2 of his own posts covering more than I do here, which you can go to here:

 

A Small Demo

So it may not come as a surprise to know that we need a server side and a client side. We will look at both of these for a simple example, and in the download at my GitHub there is also a forms based server that allows you to push items to the client on demand.

What Is Not Included

There is no form of fault tolerance. If you want that, you could do worse than to read my SignalR + RX code, which shows you how to make a resilient connection using RX:

https://www.codeproject.com/Articles/851437/SignalR-plus-RX-Streaming-Data-Demo-App-of:

We will now proceed to look at a simple server/client. This is the simplest of examples: the server produces values on a timer, and the client supplies a LINQ where clause (a filter) that is applied to the server side stream ON THE SERVER.

Simple Server

This is all we need for a simple server

using System;
using System.Net;
using System.Reactive.Linq;
using Qactive;

namespace Server
{
    class Program
    {
        static void Main(string[] args)
        {
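            // Source stream: emits 0, 1, 2, ... once per second
            IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1));

            // Expose the stream to remote clients over TCP on localhost:3205
            var service = source.ServeQbservableTcp(
                new IPEndPoint(IPAddress.Loopback, 3205),
                new QbservableServiceOptions()
                {
                    // Permissive settings, fine for a local demo
                    SendServerErrorsToClients = true,
                    EnableDuplex = true,
                    AllowExpressionsUnrestricted = true
                }
            );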
            using (service.Subscribe(
              client => Console.WriteLine(
                  "Client shutdown."),
              ex => Console.WriteLine(
                  "Fatal error: {0}", ex.Message),
              () => Console.WriteLine(
                  "This will never be printed because a service host never completes.")))
            {
                Console.ReadKey();
            }
        }
    }
}

Take away points here are:

  • We use a TCP/IP IPEndPoint to bind the server to
  • We can subscribe to the IObservable returned by ServeQbservableTcp to see what is happening with the connected clients

On Demand Server Notification

If you want to see a server that allows you to send notifications to the clients on demand, have a look at the FormsServer in my GitHub repo.
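One common way to get an "on demand" source in RX is a Subject, which is both an observer and an observable. The sketch below is not the FormsServer code; it is a minimal illustration, with made up names, of a source that only emits when something (say a button click handler) calls OnNext. The assumption is that such a subject could stand in for the Observable.Interval source in the simple server.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class OnDemandSource
{
    public static List<long> Demo()
    {
        var received = new List<long>();

        // The subject is the stream; nothing is emitted until OnNext is called.
        using (var subject = new Subject<long>())
        using (subject.Subscribe(value => received.Add(value)))
        {
            // In a forms app these calls would sit in an event handler.
            subject.OnNext(1);
            subject.OnNext(42);
        }

        return received;
    }

    public static void Main()
    {
        foreach (var value in Demo())
        {
            Console.WriteLine("Pushed on demand: " + value);
        }
    }
}
```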

 

Simple Client

This is all we need for a simple client

using System;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using Qactive;

namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            var client = new TcpQbservableClient<long>(new IPEndPoint(IPAddress.Loopback, 3205));

            // this expression tree filtering will happen server side
            //THAT IS AWESOME
            IQbservable<string> query =
              from value in client.Query()
              where value <= 5 || value >= 8
              select string.Format("The incoming value has been doubled to {0}", value * 2);

            using (query.Subscribe(
              value => Console.WriteLine("Client observed: " + value),
              ex => Console.WriteLine("Error: {0}", ex.Message),
              () => Console.WriteLine("Completed")))
            {
                Console.ReadKey();
            }
        }
    }
}

 

Take away points here are:

  • The client is also using a TCP/IP IPEndPoint
  • The client IS ABLE to use LINQ expressions which WILL be serialized and sent to the server where they will be applied
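To see which ticks the where clause lets through, here is a small local preview that needs neither the server nor Qactive. It runs the same where/select over a plain in-memory range standing in for the first few values of the server's Interval stream (which starts at 0). The class and method names are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FilterPreview
{
    // Same where/select as the client query, applied to the values
    // 0 .. tickCount-1 instead of the remote stream.
    public static List<string> Preview(int tickCount) =>
        (from value in Enumerable.Range(0, tickCount).Select(i => (long)i)
         where value <= 5 || value >= 8
         select string.Format("The incoming value has been doubled to {0}", value * 2))
        .ToList();

    public static void Main()
    {
        foreach (var line in Preview(10))
        {
            Console.WriteLine("Client observed: " + line);
        }
    }
}
```

For the first ten ticks, the values 6 and 7 are dropped and the other eight come through doubled. With Qactive the difference is that this filtering runs on the server, so the dropped values never cross the wire.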

Here is the output when the simple server is run, and 2 clients are started one after another.

image

 

Why Is This Cool?

This is very cool (uber cool in fact): we have just created a push based notification system that supports server side filtering (thanks to LINQ) in about 20 lines of code.

If you can’t see what is cool about that, well I can’t help you.

There may be some amongst you who say, “Well, any messaging framework that has a server and a client could do that in the same amount of code.” What, including the delegates pushed down to the server side (thanks to serializable expression trees)? Mmmmm, don’t think so.

The only thing I can think of that even comes close is OData, but that requires a fair bit of infrastructure/baggage to make it work.

 

 

Where Can I Get The Code?

As usual I have posted the code to my GitHub account:

https://github.com/sachabarber/RxOverTheWire

Apache Kafka 0.9 Scala Producer/Consumer

For my job at the moment, I am spending roughly 50% of my time working on .NET and the other 50% working with Scala. As such, a lot of Scala/JVM toys have piqued my interest of late. My latest quest was to try and learn Apache Kafka well enough that I at least understood the core concepts. I have even read a book or two on Apache Kafka now, so I feel I am at least talking partial sense in this article.

So what is Apache Kafka, exactly?

Here is what the Apache Kafka folks have to say about their own tool.

Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.

Fast
A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients.

Scalable
Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers

Durable
Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact.

Distributed by Design
Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees.

Taken from http://kafka.apache.org/ on 11/03/16

Apache Kafka was designed and built by a team of engineers at LinkedIn, where I am sure you will agree they probably had to deal with quite a bit of data.

 

I decided to learn a bit more about all this, and have written an article on it over at CodeProject:

 

http://www.codeproject.com/Articles/1085758/Apache-Kafka-Scala-Producer-Consumer-With-Some-RxS

 

In this article I will talk you through some of the core Apache Kafka concepts, and will also show how to create a Scala Apache Kafka Producer and a Scala Apache Kafka Consumer. I will also sprinkle some RxScala pixie dust on top of the Apache Kafka Consumer code, so that RX operators can be applied to the incoming Apache Kafka messages.

NetMQ + RX Streaming Data Demo App

So last time I showed you a slightly cut down demo app (well, a cut down version based on the great work done by our friends at Adaptive, who created the awesome Reactive Trader demo). The demo app we looked at last time used SignalR + RX. I also promised that there would be a part 2 of that article, where I showed how to do the same sort of thing using NetMQ (the pure C# port of ZeroMQ, the extremely popular socket library) + RX.

I am pleased to say that Doron (author of NetMQ) and I have now published this article, and it is available to read at the following URL:

http://www.codeproject.com/Articles/853476/NetMQplus-RX-Streaming-Data-Demo-App-of

If you looked at the 1st part in this mini series of articles you will know that we cover things like

  • Creating a streaming API of TickerDto objects
  • We make a resilient RX stream, that may be repeated
  • We expect the client(s) to know when the server is unavailable
  • We expect the client(s) to know when the server is available again
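As a rough idea of what a "resilient stream that may be repeated" can look like in RX, here is a minimal sketch built from Observable.Defer and Retry. It is not the code from either article: the WithRetry helper and the flaky connect factory are made up for this illustration, and it does not cover detecting when the server goes away or comes back, which the articles handle separately.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ResilientStream
{
    // Defer makes every (re)subscription call the connect factory afresh;
    // Retry resubscribes after a failure, up to maxAttempts attempts in total.
    public static IObservable<T> WithRetry<T>(Func<IObservable<T>> connect, int maxAttempts) =>
        Observable.Defer(connect).Retry(maxAttempts);

    public static List<string> Demo()
    {
        var log = new List<string>();
        var attempts = 0;

        // Hypothetical flaky source: fails twice, then delivers two ticks.
        Func<IObservable<string>> connect = () =>
        {
            attempts++;
            log.Add("connect attempt " + attempts);
            return attempts < 3
                ? Observable.Throw<string>(new InvalidOperationException("server unavailable"))
                : new[] { "tick A", "tick B" }.ToObservable();
        };

        WithRetry(connect, 5).Subscribe(
            tick => log.Add(tick),
            ex => log.Add("gave up: " + ex.Message),
            () => log.Add("completed"));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Demo())
        {
            Console.WriteLine(line);
        }
    }
}
```

A real connection would usually also wait between attempts rather than retrying immediately; that is left out here to keep the sketch short.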

We show all of this, just like we did with the 1st demo app (SignalR + RX), but you will also learn a few things along the way such as:

  • How to create a generic NetMQ heartbeat
  • How to create a NetMQ Pub/Sub topology
  • The NetMQ Actor model
  • The NetMQ Poller
  • How to use NetMQ, which will actually put you in very good stead to use ZeroMQ in any of the other language bindings (and it has loads)
  • A few of the different types of sockets within NetMQ
  • How to actually use NetMQ in a real life project

We are hoping you will actually get quite a lot from this article. So if you do enjoy it, please leave a vote over at the CodeProject web site.

SignalR + RX Streaming Data Demo App

I work in finance, and at the moment work in the Foreign Exchange (FX) space, where we have lots of requirements that involve streaming of various things, such as:

  • Rates
  • Trades
  • Other financial instruments

Historically, what we would have done to accommodate these requirements was to use sockets and push our own data (usually JSON or XML) down the wire, and then have some quite gnarly listeners above that, where each listener would want some slightly different view of the original streaming data.

Thing is, time never stands still in the world of software development, and new things come out all the time; literally every other day a new framework comes along that improves on what came before it in some way or another.

One particular thing that I think a lot of people have actually misunderstood is Reactive Extensions (RX), which in fairness has been around a while now. I think a lot of people think of RX as LINQ to events, which it does have. However, the truth is that RX is a superb framework for creating streaming applications, and the more you get into RX the more you tend to see everything as a stream, from an event, down to picking a file or an ICommand.Execute in a ViewModel, say.

RX comes with many many tools in its arsenal, such as:

  • 2 core interfaces IObservable / IObserver
  • Lots of LINQ like extension methods for IObservable
  • Time / window based operations (LINQ doesn’t really have these)
  • Concurrency schedulers
  • Stream error handling controls (Catch etc etc)
  • Ability to create IObservable from a great many other things, such as
    • IEnumerable
    • Task
    • Events
    • IObservable Factories
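As a quick taste of that last group, here is a minimal sketch that turns an IEnumerable, a Task, an event and a factory call into IObservable streams and subscribes to each. The Ticker class and its event are hypothetical, invented just for this example; the RX calls themselves (ToObservable, FromEventPattern, Observable.Return) come from System.Reactive.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ObservableSources
{
    // Hypothetical event args and event source, used only for this illustration.
    public sealed class TickEventArgs : EventArgs
    {
        public TickEventArgs(int value) { Value = value; }
        public int Value { get; }
    }

    public sealed class Ticker
    {
        public event EventHandler<TickEventArgs> Ticked;
        public void Raise(int value) => Ticked?.Invoke(this, new TickEventArgs(value));
    }

    public static List<string> Demo()
    {
        var log = new List<string>();

        // From an IEnumerable
        new[] { 1, 2, 3 }.ToObservable()
            .Subscribe(v => log.Add("enumerable: " + v));

        // From an already completed Task
        Task.FromResult(42).ToObservable()
            .Subscribe(v => log.Add("task: " + v));

        // From an event; disposing the subscription removes the handler
        var ticker = new Ticker();
        using (Observable.FromEventPattern<TickEventArgs>(
                   h => ticker.Ticked += h,
                   h => ticker.Ticked -= h)
               .Select(e => e.EventArgs.Value)
               .Subscribe(v => log.Add("event: " + v)))
        {
            ticker.Raise(7);
        }

        // From a factory method
        Observable.Return("hello")
            .Subscribe(v => log.Add("factory: " + v));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Demo())
        {
            Console.WriteLine(line);
        }
    }
}
```

Once everything is an IObservable, the same operators (Where, Select, Retry and so on) apply regardless of where the values came from.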

I personally feel that a lot of people could make use of RX to great gain, if they were to put in the time to learn a bit more about it.

There is also another great library from Microsoft which facilitates push based technology, namely SignalR. Quite a lot has been written about SignalR, but you rarely see that from a desktop point of view.

Most people would probably associate a SignalR Hub with a web site, but there are various ways you can use SignalR, which also make it a good choice for desktop (i.e. non web) development. This is possible thanks to the SignalR OWIN hosting API, which lets you self host a SignalR Hub. There is also a .NET client we can use to communicate with the hub. SignalR also has quite a lot of events within the .NET client, and guess what we can do with those events? Yep, that’s right, we can create IObservable streams out of them, which opens up the gates to use RX.

This article will show one area where RX excels, which is streaming APIs, where we will be using RX/SignalR.

 

Read the full article here : http://www.codeproject.com/Articles/851437/SignalR-plus-RX-Streaming-Data-Demo-App