Author Archives: sachabarber

MADCAP IDEA PART 2 : ADDING DI/IOC TO THE CLIENT SIDE FRONT END WEB SITE

 

Last Time

Last time we built a bare bones react/webpack/babel/typescript/bootstrap web site. You can read that post here should you wish to: https://sachabarbs.wordpress.com/2017/05/15/madcap-idea-part-1-start-of-the-client-side-portion-of-the-web-site/

 

PreAmble

This post will be about adding DI/IOC to the bear bones no thrills client portion of the web site that we built last time. Just as a reminder this is part of my ongoing set of posts which I talk about here :

https://sachabarbs.wordpress.com/2017/05/01/madcap-idea/, where we will be building up to a point where we have a full app using lots of different stuff, such as these

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

 

What Is DI/IOC?

In software engineering, dependency injection is a technique whereby one object supplies the dependencies of another object. A dependency is an object that can be used (a service). An injection is the passing of a dependency to a dependent object (a client) that would use it. The service is made part of the client’s state. Passing the service to the client, rather than allowing a client to build or find the service, is the fundamental requirement of the pattern.

This fundamental requirement means that using values (services) produced within the class from new or static methods is prohibited. The class should accept values passed in from outside.

The intent behind dependency injection is to decouple objects to the extent that no client code has to be changed simply because an object it depends on needs to be changed to a different one.

Dependency injection is one form of the broader technique of inversion of control. Rather than low level code calling up to high level code, high level code can receive lower level code that it can call down to. This inverts the typical control pattern seen in procedural programming.

As with other forms of inversion of control, dependency injection supports the dependency inversion principle. The client delegates the responsibility of providing its dependencies to external code (the injector). The client is not allowed to call the injector code. It is the injecting code that constructs the services and calls the client to inject them. This means the client code does not need to know about the injecting code. The client does not need to know how to construct the services. The client does not need to know which actual services it is using. The client only needs to know about the intrinsic interfaces of the services because these define how the client may use the services. This separates the responsibilities of use and construction.

There are three common means for a client to accept a dependency injection: setter-, interface- and constructor-based injection. Setter and constructor injection differ mainly by when they can be used. Interface injection differs in that the dependency is given a chance to control its own injection. All require that separate construction code (the injector) take responsibility for introducing a client and its dependencies to each other

 

From https://en.wikipedia.org/wiki/Dependency_injection

 

 

What Choices Do We To Do This When Working With React?

There are several choices we have when working with React, such as

  • Using react context
  • Using the module system
  • Using a 3rd party DI/IOC system (I will be covering this, in this post)

 

There is VERY good post on these different techniques here : https://github.com/krasimir/react-in-patterns/tree/master/patterns/dependency-injection it is a great read and I suggest you take a look to gain a better understanding of some of the more obscure areas of react (context I am looking at you)

 

 

Which 3rd Party DI/IOC Library Did I Chose And Why?

I have decided to use the https://github.com/inversify/InversifyJS (Inversify JS) DI/IOC library. Why did I chose this one, well there were quite a few reasons:

 

  • It is written in TypeScript, and I wanted to use TypeScript where possible
  • Is looks like a good full featured container, that reminds me of many other containers that I have worked with in .NET/Scala
  • It is quite mature and is in version 2.0
  • It has quite a bit of good press around it
  • The documentation is good
  • It did what I wanted

 

So those were my reasons, so what do we have to do to get Inversify JS to work?

 

Installation

 

Node Module Installation

The first steps is to make sure we have the correct node packages, to do this we just need to issue the following  NPM (node package manager) command line :

npm install inversify reflect-metadata --save

 

Once this is done you should have the following entries in your package.json file

 

image

 

 

Changes To The TypeScript “tsconfig.json” File

The other thing that Inversify JS needs is a couple of specific TypeScript settings. These need to go in the tsconfig.json file, the new lines since last time are these

 

image

 

 

 

 

That is all the setup we need, so let’s now have a look at using the Inversify JS DI/IOC classes, decorators etc etc

 

 

Creating Some Injectable Thing

Lets start with creating something that can be injected with other values, and can be resolved from the container.

 

In Inversify JS  a type has to be marked as @Injectable, which is not something you have to do in other DI/IOC offerings. This is more than likely a requirement because JavaScript is a dynamic language and these decorators are used to create extra metadata to ease in the resolution mechanisms used by ALL IOC containers.

 

To actual get something injected into a class we need to use the @Inject decorater.

 

Here is an example of a class that can be resolved from the Inversify JS  IOC container, and will also have it constructor dependencies resolves by the Inversify JS  IOC container.

 

import { injectable, inject } from "inversify";
import { TYPES } from "../types";

@injectable()
export class Foo {

    private _num: number;

    constructor(@inject(TYPES.SomeNumber) num: number) {
        this._num = num;
    }

    getNum() {
        return this._num * 2;
    }

}

 

You can see that we also use some TYPES. What are these, lets take a look at that.

export const TYPES = {
    Foo: Symbol("Foo"),
    SomeNumber: Symbol("SomeNumber")
};

 

It can be seen this TYPES constant just offers us a way of using Symbol as runtime identifiers for our dependencies.

 

Ok so now that we have a class that expects to have its dependencies satisfied from the Inversify JS IOC Container, and is itself resolvable, lets see how we can configure the container.

 

Creating The Container

I have chosen to create a singleton object for my container, which looks like this

import "reflect-metadata";
import { Container } from "inversify";
import { TYPES } from "../types";
import { Foo } from "../domain/Foo";

export class ContainerOperations {
    private static instance: ContainerOperations;
    private _container:Container = new Container();

    private constructor() {
        
    }

    static getInstance() {
        if (!ContainerOperations.instance) {
            ContainerOperations.instance = new ContainerOperations();
            ContainerOperations.instance.createInversifyContainer();
        }
        return ContainerOperations.instance;
    }

    private createInversifyContainer() {
        this.container.bind<number>(TYPES.SomeNumber).toConstantValue(22);
        this.container.bind<Foo>(TYPES.Foo).to(Foo);
    }

    public get container(): Container {
        return this._container;
    }
}

 

There are a couple of things to note in the above code:

  • We import “reflect-metadata”, “Container” and “TYPES”
  • We have our singleton that simple wraps the Inversify JS IOC container.
  • We configure the the container registrations (bindings in Inversify JS speak)

 

And that is all there is to that part. So all that is left, is to actually resolve something from the container. We will look at that next

 

Resolving Something From The Container

 

As I am using react, I will likely be using the Inversify JS IOC container to assist me with creating the props for the react components. That is not strictly relevant to this discussion, so lets just see how we can resolve an instance of the Foo IOC registered class using Inversify JS

 

We do this as follows:

import { Foo } from "./domain/Foo";
import { TYPES } from "./types";
import { ContainerOperations } from "./ioc/ContainerOperations"; 

let foo = ContainerOperations.getInstance().container.get<Foo>(TYPES.Foo);

 

It can be seen that we simple make use of the singleton (that wraps the container) to resolve our Foo class. Happy days

 

Conclusion

I have to say I did struggle a bit with getting Inversify JS up and running in my project. But I also have to say that I asked a question on the Inversify forum and the author Remo Jansen was absolutely brilliant in helping me to get my stuff to run. To the point where I pointed him at my GitHub repo, and he looked at, got it to work, and sent me a pull request.

 

Remo I tip my hat to you sir, top library, top fella. And as promised I owe you that beer

 

So once it was installed, I found it very easy to work with, it soon felt like many other IOC frameworks I have used (NInject, Funq,Munq, Castle, AutoFac, Unity, MEF take your pick). I was very happy with the results.

 

As I previously stated I will be continuing to write posts which will be tracked on Trello : https://trello.com/b/F4ykCOOM/kafka-play-akka-react-webpack-tasks

MadCap Idea part 1 : Start of the Client side portion of the web site

 

PreAmble

This post will be about building the bear bones no thrills client portion of the web site that is part of my ongoing (well this is the first, so ongoing after this) set of posts which I talk about here :

 

https://sachabarbs.wordpress.com/2017/05/01/madcap-idea/, where we will be building up to a point where we have a full app using lots of different stuff, such as these

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

 

 

Introduction

So let me just apologize for how long this one has taken to put together, I never envisaged that this post would take me quite as long as it has. That said it has only taken 5-6 days where I have spent a maximum of 2 hours on it, and when I started this post I had a VERY rough idea of how webpack worked and what it did, but I had NEVER tried to create a webpack project from scratch, so not so bad in the end, I am fairly happy with the results.

 

Where is the code?

If you prefer to just have a look at the end result you can see that here : https://github.com/sachabarber/MadCapIdea

 

What did I want to get to work VS what is working?

Before I started this post/code I had a set list of requirements in mind, which I will show in the table below. I will also show whether I managed to get that feature to work or not

Feature Does It Work
I wanted to use Web pack to manage the build Yes
I wanted to use Typescript via Babel to regular JavaScript Yes
I wanted 3rd party libraries to be used with their typing information No
I wanted to be able to bundle my eventually transpiled JavaScript code into a single bundle Yes
I wanted to be able to use SCSS/SASS for my CSS needs and have them transpiled to CSS Yes
I wanted to be able to bundle my eventually transpiled CSS code into a single bundle Yes
I wanted to be able to import/export stuff (JS /CSS / Files etc etc) using ES6 modules Yes
I wanted to be able to trace back from minified JavaScript bundle back to my original TypeScript via SourceMaps Yes
I wanted to be able to use a fully SourceMap enabled DEVELOP version of my webpack setup Yes
I wanted to be able to run a streamlined (minification, no comments, no console.log, no SourceMap) PRODUCTION version of my webpack setup Yes
I wanted to be able to use React Yes
I wanted to be able to use Bootstrap-React Yes
I wanted to have the option to use JQuery/Lodash as I would in a simple standard JavaScript project, ie as “$” and “_” respectively Yes

 

As you can see I did actually manage to get ALL of this to work with the one exception of the typings for 3rd party libraries. The code still works at runtime, but there is just something hinky going on outside of runtime.

 

<rant>

I would just like to spend a moment ranting about just how much disinformation is out there on the whole module/typescript/webpack space. I must have read about 100 posts, all with different setups, all with different tsconfig.json files, all suggesting different webpack setups. On one hand my god weback/TypeScript are cool, but you have to be VERY careful what you apply. If you get your tsconfig.json into an invalid state, you just may find that editing TypeScript no longer works inside Visual Studio.

 

I have lost track of just how many different approaches I took to try and resolve the unknown module issue in TSX (TypeScript JSX react files). Even the official walk through on the TypeScript.org web site doesn’t work for me. Things I tried and failed at were

  • Including typings in my tsconfig.json
  • Include reference headers in my TypeScript files
  • Including a reference.ts file for 3rd party typings
  • Messing about with moving types\xxxxFramework into full blown NPM Dependencies rather than NPM DevDependencies
  • Various module settings inside of tsconfig.json

All failed, so if anyone out there that is a TypeScript / React / WebPack guru, please let me know what I am doing wrong.  The funny thing is that EVERYTHING is 100% fine at runtime.

</rant>

 

Ah that feels better, anyway now that, that is out of my system, lets continue shall we….

 

 

Webpack fundamentals

So what exactly is webpack. I think this image from the webpack web site https://webpack.js.org/ does a fairly good job of described at a glance what webpack is all about

 

image

 

So clear? No? Ok lets try some words as well

 

  • Webpack at its heart is a bundler which is able to offer module support, and is able to bundle a lot of different things into bundles
  • Webpack is able to bundle lots of thing via a technique called “loaders”, loaders can be piped one to the next (just like a command line)
  • Webpack also offers module support for
    • AMD
    • CommonJS
    • ES6 modules
  • Webpack comes with SourceMap support (one of my favorite things ever, maybe even better than ice cream, but no where near as good as BBQ food and beers)
  • Webpack comes with inbuilt minification support (thanks to Uglify.js : https://www.npmjs.com/package/uglifyjs)
  • Webpack works seamlessly with NPM (Node package manager)
  • Webpack is able to watch your files and produce new packages based on the diff of what you edited compared to what was previously built
  • Webpack supports the idea of base/different configs such that you may have different environment configs DEV|PROD (typically you want loads of debugging aids in dev)

 

So in a nutshell that is what webpack is all about. We will dive into some of the sub areas in a bit more details below before we examine the actual use cases that I set out to solve

 

This may all seem a bit overwhelming, but with webpack it mainly boils down to a config file (typically called webpack.config.js). Here is a minimal example

const { resolve } = require('path');

const webpack = require('webpack');

// plugins
const HtmlWebpackPlugin = require('html-webpack-plugin');

module.exports = (env) => {

  return {
    context: resolve('src'),
    entry: {
      app: './main.ts'
    },
    output: {
      filename: '[name].[hash].js',
      path: resolve('dist'),
      // Include comments with information about the modules.
      pathinfo: true,
    },

    resolve: {
        extensions: [
            '',
            '.js',
            '.ts',
            '.tsx'
        ]
    },

    devtool: 'cheap-module-source-map',

    module: {
      loaders: [
        { test: /\.tsx?$/, loaders: [ 'awesome-typescript-loader' ], exclude: /node_modules/ }
      ],
    },

    plugins: [

      new HtmlWebpackPlugin({
        template: resolve('src','index.html')
      })

    ]

  }
};

 

We will be diving into this, and a lot more within this post.

 

Node

As stated Node/NPM is a fairly vital part of working with webpack, so you will need to ensure you have done the following as a minimum

  • Installed node
  • Installed NPM
  • Installed webpack globally : npm install webpack –g

 

Most of the stuff I talk about in this post requires installing via NPM. But You have a copy of all the requirements inside the package.json file. Which at the time of writing this post looked like this

 

{
  "name": "task1webpackconfig",
  "version": "1.0.0",
  "description": "webpack 2 + TypeScript 2 + Babel example",
  "repository": {
    "type": "git",
    "url": "git+https://github.com/sachabarber/MadCapIdea.git"
  },
  "keywords": [
    "babel",
    "typescript",
    "webpack",
    "bundling",
    "javascript",
    "npm"
  ],
  "author": "sacha barber",
  "homepage": "https://github.com/sachabarber/MadCapIdea#readme",
  "dependencies": {
    "bootstrap": "^3.3.7",
    "jquery": "^3.2.1",
    "lodash": "^4.17.4",
    "react": "^15.5.4",
    "react-bootstrap": "^0.31.0",
    "react-dom": "^15.5.4",
    "webpack": "^2.5.0",
    "webpack-merge": "^4.1.0"
  },
  "devDependencies": {
    "@types/jquery": "^2.0.43",
    "@types/lodash": "^4.14.63",
    "@types/react": "^15.0.24",
    "@types/react-dom": "^15.5.0",
    "awesome-typescript-loader": "^3.1.3",
    "babel-core": "^6.24.1",
    "babel-loader": "^7.0.0",
    "babel-preset-es2015": "^6.24.1",
    "babel-preset-es2015-native-modules": "^6.9.4",
    "babel-preset-react": "^6.24.1",
    "css-loader": "^0.28.1",
    "extract-text-webpack-plugin": "^2.1.0",
    "html-webpack-plugin": "^2.28.0",
    "node-sass": "^4.5.2",
    "on-build-webpack": "^0.1.0",
    "sass-loader": "^6.0.3",
    "source-map-loader": "^0.2.1",
    "typescript": "^2.3.2",
    "webpack": "^2.4.1"
  },
  "scripts": {
    "build-dev": "webpack -d --config webpack.develop.js",
    "build-prod": "webpack --config webpack.production.js"
  }
}

 

 

 

Loaders

Loaders are probably the MOST important webpack concept to learn. There is practically a loader for EVERYTHING. But what exactly is a loader?

Well quite simply, a loader is a way to take some source file contents, and bundle it up in the final artifact. However things can get more sophisticated as some loaders are also able to transpile (act of converting code written in one language into another language say TypeScript –> JavaScript, or ES6 JavaScript –> ES5 JavaScript).

 

Loaders may also be piped together where the loaders declared run from right most to left most (or bottom to top, if you have them over multiple lines). This is EXTREMELY powerful, as it enables this sort of workflow in the demo code

  • Write code in TypeScript (using good stuff like classes (ok ES6 has those but you get me), interfaces, async-await etc etc)
  • Have that run through Babel.Js (bring future JS functions to you by converting your future JS into JS that runs in browsers now)
  • Finally into plain old JS that is compatible with today’s browsers (they will all catch up one day, actually they won’t so yeah babel.js is here to help)

Loaders are not just for JS, they can be used for CSS/Images/Fonts all sorts of things

 

We will see examples on this stuff when we get into the guts of things

Code dissection

In this section we will dissect the code contained at the github repo, and talk through all my initial requirements and see how they ended up being implemented

 

Bundles

One of the main reason to want to use webpack is for its bundling abilities, where I wanted to be able to bundle the following things

  • Typescript which is transpiled to JavaScript (thanks to a TypeScript loader)
  • SCSS/SASS/Css (thanks to a Sass loader)
  • Images(thanks to a Url loader)

So that is what we are trying to bundle, but there are a few things that need to be done to make that happen, so lets start with the loaders (I will be covering images and fonts later, so for now lets just talk about JavaScript and CSS bundling)

 

JavaScript Bundling

As I say I wanted the option to use TypeScript or regular JavaScript, and I also wanted to be able to use SASS or regular CSS so we start with these loaders which will traverse the source code and find all the relevant files (see the little regex that’s used to find the files) and will then bundle these files

 

let _ = require('lodash');
let webpack = require('webpack');
let path = require('path');
let fs = require("fs");
let WebpackOnBuildPlugin = require('on-build-webpack');
let ExtractTextPlugin = require('extract-text-webpack-plugin');
let HtmlWebpackPlugin = require('html-webpack-plugin');

let babelOptions = {
    "presets": ["es2015", "react"]
};

function isVendor(module) {
    return module.context && module.context.indexOf('node_modules') !== -1;
}

let entries = {
    index: './src/index.tsx'

};

let buildDir = path.resolve(__dirname, 'dist');

module.exports = {

    context: __dirname,

    entry: entries,

    output: {
        filename: '[name].bundle.[hash].js',
        path: buildDir
    },

    
    
    resolve: {
        extensions: [".tsx", ".ts", ".js", ".jsx"],
        modules: [path.resolve(__dirname, "src"), "node_modules"]
    },

    plugins: [

       

        // creates a common vendor js file for libraries in node_modules
        new webpack.optimize.CommonsChunkPlugin({
            names: ['vendor'],
            minChunks: function (module, count) {
                return isVendor(module);
            }
        }),

        // creates a common vendor js file for libraries in node_modules
        new webpack.optimize.CommonsChunkPlugin({
            name: "commons",
            chunks: _.keys(entries),
            minChunks: function (module, count) {
                return !isVendor(module) && count > 1;
            }
        }),


        //scss/sass files extracted to common css bundle
        new ExtractTextPlugin({
            filename: '[name].bundle.css',
            allChunks: true,
        }),

        new HtmlWebpackPlugin({
            filename: 'index.html',
            template: 'template.html',
        })
    ],

    module: {
        rules: [
            // All files with a '.ts' or '.tsx' extension will be handled by 'awesome-typescript-loader' 1st 
            // then 'babel-loader'
            // NOTE : loaders run right to left (think of them as a cmd line pipe)
            {
                test: /\.ts(x?)$/,
                exclude: /node_modules/,
                use: [
                  {
                      loader: 'babel-loader',
                      options: babelOptions
                  },
                  {
                      loader: 'awesome-typescript-loader'
                  }
                ]
            },


            // All files with a .css extenson will be handled by 'css-loader'
            {
                test: /\.css$/,
                loader: ExtractTextPlugin.extract(['css-loader?importLoaders=1']),
            },

            // All files with a .scss|.sass extenson will be handled by 'sass-loader'
            {
                test: /\.(sass|scss)$/,
                loader: ExtractTextPlugin.extract(['css-loader', 'sass-loader'])
            },


            // All files with a '.js' extension will be handled by 'babel-loader'.
            {
                test: /\.js$/,
                exclude: /node_modules/,
                use: [
                  {
                      loader: 'babel-loader',
                      options: babelOptions
                  }
                ]
            },


            // All output '.js' files will have any sourcemaps re-processed by 'source-map-loader'.
            {
                enforce: "pre",
                test: /\.js$/,
                loader: "source-map-loader"
            }
        ]
    }
};

 

 

The bulk of the code above is made up of loaders. But there are a few things above that deserve special call outs, namely

 

Resolve

This tells us what type of files webpack should try and resolve

 

resolve: {
	extensions: [".tsx", ".ts", ".js", ".jsx"],
	modules: [path.resolve(__dirname, "src"), "node_modules"]
},

 

Entry

These are the main entry points into the code. So for me this is the index.tsx file.

 

let entries = {
    index: './src/index.tsx'

};


entry: entries,

 

 

Output

This is where you tell webpack what the name of your final bundles will be, which will contain all the code files that matches the regex test that was setup in the loaders. It is VERY important to note that ALL the files that matches the loader regex will become part of the bundle file.

 

output: {
	filename: '[name].bundle.[hash].js',
	path: buildDir
},

 

 

TypeScript

I wanted the option to be able to use TypeScript IF I WANTED to. So to do this we need a webpack loader, there are a couple  of TypeScript loaders for webpack. But I went with awesome-typescript-loader. I also want to run my TypeScript files through Babel. We will get onto what Babel brings to the party in just a second, but for now just understand that TypeScript and Babel act as transpilers where they take JavaScript using features that is not available in regular JavaScript and transpile that code into regular JavaScript that today’s browsers understand. Obviously since the final product of both TypeScript and Babel is regular JavaScript we also need a loader for that too.

 

Here is my TypeScript/Babel/JavaScript setup.

// All files with a '.ts' or '.tsx' extension will be handled by 'awesome-typescript-loader' 1st 
// then 'babel-loader'
// NOTE : loaders run right to left (think of them as a cmd line pipe)
{
	test: /\.ts(x?)$/,
	exclude: /node_modules/,
	use: [
	  {
		  loader: 'babel-loader',
		  options: babelOptions
	  },
	  {
		  loader: 'awesome-typescript-loader'
	  }
	]
},

// All files with a '.js' extension will be handled by 'babel-loader'.
{
	test: /\.js$/,
	exclude: /node_modules/,
	use: [
	  {
		  loader: 'babel-loader',
		  options: babelOptions
	  }
	]
}

 

The other thing you need when working with TypeScript is a tsconfig.json file. Here is mine, you can see that I have configured mine to be react friendly.

 

{
  "compilerOptions": {
    "allowSyntheticDefaultImports": true,
    "moduleResolution": "node",
    "outDir": "./dist/",
    "sourceMap": true,
    "noImplicitAny": false,
    "module": "es2015",
    "target": "es5",
    "jsx": "react",
    "types" : ["jquery", "lodash", "react", "react-dom"]
  },
    "include": [
        "./src/**/*"
    ]
}

 

NOTE: You need to be a bit careful with this file, if you mess it up, you may find yourself in a quite sad position where you can no longer edit TypeScript files in Visual Studio.

 

Babel

I just showed you the babel loader, so I won’t repeat that. But just what is this Babel you speak of. Well here the blurb from the Babel.js website

 

Babel has support for the latest version of JavaScript through syntax transformers. These plugins allow you to use new syntax, right now without waiting for browser support.

 

This is the sort of stuff that Babel allows you to write right now.

 

image

 

The only other thing you need for Babel is to give a little config file called .babelrc which for me just contains this

 

{ "presets": ["es2015","react"] }

 

And that is pretty much all there is to it, you can now use these features in your JavaScript. Neato

 

SCSS

I don’t mind CSS, but these days there are better tools out there, namely LESS/SASS. What these tools offer you are things like this

  • Modular CSS (multiple files in a heirachy)
  • Nested CSS rules
  • Variables
  • etc etc

 

So it seems strange NOT to want to work with this. As with most things in webpack, it starts with a loader, where we have support for SASS and also plain CSS. Remember loaders run from right to left, so in the case of the SASS/SCSS file match, the files will 1st run through the sass-loader the the css-loader. However for plain old CSS they just go through the css-loader

 

// All files with a .css extenson will be handled by 'css-loader'
{
	test: /\.css$/,
	loader: ExtractTextPlugin.extract(['css-loader?importLoaders=1']),
},

// All files with a .scss|.sass extenson will be handled by 'sass-loader'
{
	test: /\.(sass|scss)$/,
	loader: ExtractTextPlugin.extract(['css-loader', 'sass-loader'])
},

 

The other part of the puzzle to get CSS to work is this ExtractTextPlugin that you can see mentioned in the loader sections just above. What the ExtractTextPlugin  does is to extract all the text from the individual CSS files (yep that’s right SASS/SCSS is transpiled to regular CSS) into a single CSS file.

 

//scss/sass files extracted to common css bundle
new ExtractTextPlugin({
    filename: '[name].bundle.[hash].css',
    allChunks: true,
}),

 

Don’t be too scared by the [name] and [hash] stuff just yet we will get onto to that later.

 

 

Bootstrap

So for those of you living under a rock there is a great library (started by Twitter engineers) to help create responsive uniform looking sites. This library is called twitter Bootstrap. It comes with various components and CSS, and use typeography for its icons.

 

Now Bootstrap is great, but I wanted to use React, and React has the concept of a virtual DOM, and generally speaking tries to work with it own Virtual DOM rather than the real DOM. This has led to a specialized version of Bootstrap specifically for use with React. Naturally I needed to get that to work. It is called React-Bootstrap.

 

So once we have it installed via NPM we just need to worry about a few small thing

 

Images

These are loaded by (surprise surprise) another bootstrap loader section

{ 
	test: /\.png$/, 
	loader: "url-loader?limit=100000" 
},

{ 
	test: /\.jpg$/, 
	loader: "file-loader" 
},

{
	test: /\.svg(\?.*)?$/,
	loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=image/svg+xml'
},

 

Fonts

Fonts are also loaded by more webpack loaders

{
	test: /\.woff(\?.*)?$/,
	loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/font-woff'
},

{
	test: /\.woff2(\?.*)?$/,
	loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/font-woff2'
},

{
	test: /\.ttf(\?.*)?$/,
	loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/octet-stream'
},

{
	test: /\.eot(\?.*)?$/, loader: 'file-loader?prefix=fonts/&name=fonts/[name].[ext]'
},

 

 

Css

So once you have all the other stuff done you can proceed to just use react-bootstrap. Here is a small example from one of my TypeScript files

 

import * as React from "react";
import * as ReactDOM from "react-dom";
import { Button } from 'react-bootstrap';

import 'bootstrap/dist/css/bootstrap.css';

export class Hello extends React.Component<HelloProps, undefined> {
    render() {
        return 
<div>
                <Button bsStyle="primary" bsSize="large">Large button</Button>
                
<h1 id="helloText">Hello from {this.props.compiler} and {this.props.framework}!</h1>

               </div>

;
    }
}

 

Which when rendered looks like this:

image

 

 

 

Lodash

Lodash is a kind of new underscore-esque library, which offers many convenience methods on collections. It is like the LINQ to obejcts of the JavaScript world. To work with Lodash you can simply import it as follows

 

import * as _ from "lodash";

 

Which we could verify quite simply with something like this, where the image below is me finding the original line in my TypeScript file within the SourceMap that was sent to the browser and putting a break point on the line I wanted to debug

 

console.log(_.VERSION);

image

 

 

JQuery

Ah the blessed Jquery, love it or hate it, there is certainly a lot of it on the web. And at times it is still very convenient, so we should really allow for it too. Thing with JQuery is that it wants to be available as global variable $ or via a property on window. Is this even possible with webpack? Well yes it is, we simply add the following bit of config within the webpack Plugins section

 

//The ProvidePlugin makes a module available as a variable in every other
//module required by webpack
new webpack.ProvidePlugin({
    $: "jquery",
    jQuery: "jquery",
    "window.jQuery": "jquery"
}),

 

That then allows us to to use Jquery like this without having to ever import it anywhere, its just automatically globally available

 

console.log("jquery");
console.log($);
console.log($.fn.jquery);

 

Again I am using the emitted SourceMap to find my original TypeScript code

 

image

 

 

Source Map Support

I also wanted to be able to debug my ORIGINAL TypeScript/JavaScript, so using SourceMaps WAS A MUST. By using source maps in webpack I am able to send the transpiled/bundled (but not minified I only do that in production mode), and also view the original code, and set break points in the original code.

 

This is the JavaScript bundle that webpack sent

 

image

 

And here is me inside the SourceMap file, see how I am in the original content here (ie the code I wrote)

 

image

 

 

This is enabled via the webpack setting

devtool: "source-map"

 

ES6 style code and modules

Another feature of using webpack is that you may using AMD/CommonJS modules (or if you included TypeScript/Babel ES6 modules). I am using TypeScript and Babel so I went with ES6 style modules, which means I can export/import things like this:

import * as React from "react";
import * as ReactDOM from "react-dom";
import * as _ from "lodash";
import { Button } from 'react-bootstrap';

import 'bootstrap/dist/css/bootstrap.css';

export interface HelloProps { compiler: string; framework: string; }

export class Foo {

    private _num: number;

    constructor(num: number) {
        this._num = num;
    }

    getNum() {
        return this._num * 2;
    }

}

 

 

Html Plugin

Ok hope you all recall but a while ago I promised to explain what was meant by [name] and [hash] in my webpack config.

  • [name] : simply gets replaced by the current bundle name
  • [hash] : produces a hash of the bundle

I think name is self explanatory, but [hash] is an interesting one.  The idea of producing a hash for your bundles is great. That means if the file contents change the hash produced is different, so the browser cache would be invalidated.

 

That’s cool. But hang on how do we normally include script/css references in our Html page, either in Script/head tags right? And if the hash is changing all the time, how can we possible link to files where we don’t know what the hash will be.

 

Luckily we just use the HtmlWebpackPlugin, which does a great job of taking a template for the original HTML we want to end up with, and putting the final bundle generated references into a copy of the template and copying that final HTML file to the desired output directory.

 

So for me I have this webpack config

new HtmlWebpackPlugin({
    filename: 'index.html',
    template: 'template.html',
})

 

Where my template.html file looks like this

<!DOCTYPE html>
<html>
    <head>
        <meta charset="UTF-8" />
        <title>Hello React!</title>
    </head>
    <body>
        
<div id="example"></div>

        <!-- Main -->
    </body>
</html>

 

And once webpack / HtmlWebpackPlugin have run their magic, the resultant HTML (ie final HTML file) looks like this:

<!DOCTYPE html>
<html>
    <head>
        <meta charset="UTF-8" />
        <title>Hello React!</title>
		<link href="vendor.bundle.b8e27b8c09179b83b9b1.css" rel="stylesheet">
		<link href="indexCss.bundle.b8e27b8c09179b83b9b1.css" rel="stylesheet"></head>
    <body>
        
<div id="example"></div>

        <!-- Main -->
		<img src="data:image/gif;base64,R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7" data-wp-preserve="%3Cscript%20type%3D%22text%2Fjavascript%22%20src%3D%22vendor.bundle.b8e27b8c09179b83b9b1.js%22%3E%3C%2Fscript%3E" data-mce-resize="false" data-mce-placeholder="1" class="mce-object" width="20" height="20" alt="&lt;script&gt;" title="&lt;script&gt;" />
		<script type="text/javascript" src="index.bundle.b8e27b8c09179b83b9b1.js">
	</body>
</html>

 

See how it just inserts the CSS/JS bundles for me, and my hashing for the bundles now seemlessly happens and I don’t have to worry about it ever again

 

Separate Configs

The final thing I wanted to cover was how to have different DEV/PROD webpack configs. Up until now I have just been showing you a base config file. But we can use webpack-merge to allow us to create bespoke webpack config files for specific environments.

For example here is my Develop webpack file (which is the same as the base config file)

 

let commonConfig = require('./webpack.config.js');
let webpack = require('webpack');
let Merge = require('webpack-merge');

module.exports = function (env) {
    return Merge(commonConfig, {})
}

 

Whilst this is my Production webpack config file where I want

  • No SourceMap files
  • No console.log
  • No comments
  • Minification

 

let commonConfig = require('./webpack.config.js');
let webpack = require('webpack');
let Merge = require('webpack-merge');

module.exports = function (env) {
    return Merge(commonConfig, {
        plugins: [
          new webpack.LoaderOptionsPlugin({
              minimize: true,
              debug: false
          }),
          new webpack.optimize.UglifyJsPlugin({
              // Eliminate comments
              comments: false,
              beautify: false,
              mangle: {
                  screw_ie8: true,
                  keep_fnames: true
              },
              compress: {
                  screw_ie8: true,

                  // remove warnings
                  warnings: false,

                  // Drop console statements
                  drop_console: true
              },
              comments: false,
              sourceMap: false
          })
        ]
    })
}

 

 

 

Conclusion

So that is all I wanted to say this time, as I stated in the 1st post I will be continuing to write posts which will be tracked on Trello : https://trello.com/b/F4ykCOOM/kafka-play-akka-react-webpack-tasks

MADCAP IDEA

INTRODUCTION

So this is somewhat of strange post, or should I say what will hopefully become a decent set of posts, thing is, I have no idea how this will end up really,
as I have not embarked on a mission like this before. So please bear with me.

SO JUST WHAT IS IT THAT I AM TALKING ABOUT?

Well the way I typically like to run my blog/code project articles / life, is that I pick a technology and
just concentrate on it for a while and write about it. This time however I have decided to treat my blogging/articles as a bit more
of a work like escapade, where I will be assigning mini tasks (think JIRA tickets) to myself, some of which I know nothing about, that should/could in reality be treated
as “spikes” and end up in complete dead ends. It is about the journey after all.

I WILL have a complete list of “tickets” (AKA tasks), which may or may not be completely fleshed out in advance. I will stick to “DOING” those “tickets”
and there is an end goal in sight, and I will outline that in a top level story. I cannot however commit to any timelines, this is as much my journey as it is yours (in fact I mainly
do this stuff for myself, and would highly reccomend it as a way of self improvement). That said I hope people get something out of the series of posts that WILL UNDOUBTEDLY
come from this idea.

You can think of the tasks as “technical tasks” which make up the high level “stories” (in JIRA speak).

This may come across a bit weird, but the technogies I plan to cover in the final product is pretty much a full app, so it’s a little hard to describe in
one blog post/article. So I am hoping that by breaking it down into small chunks, each story/sub task will be a useful learning experience in
it’s own right.

SOURCE CONTROL : ORGANISATION

The idea is that each story/sub task will be a folder/subfolder which is completely independent of other stories/sub tasks (up until the final goal, which is of course
a working showcase that demostrates it all working together).

NOTE TO SELF : I am going to try really hard to do this (aren’t we sacha), as I think one topic -> one source control repo (more than likely GIT), is a good way to
correlate ideas/words on the post/article

 

WHAT DO I WANT TO WRITE


In essence I want to write a very (pardon the pun) but uber simple “uber” type app. Where there are the following funtional requirements

  • There should be a web interface that a client can use. Clients may be a “driver” or a “pickup client” requireing a delivery
  • There should be a web interface that a “pickup client” can use, that shows a “pickup client” location on a map, which the “pickup client” choses.
    The “pickup client” may request a pickup job, in which case “drivers” that are in the area bid for a job.
    The “pickup client” location should be visible to a “driver” on a map
  • A “driver” may bid for a “pickup client” job, and the bidding “driver(s)” location should be visible to the “pickup client”.
  • The acceptance of the bidding “driver” is down to the “pickup client”
  • Once a “pickup client” accepts a “driver” ONLY the assigned “driver(s)” current map position will be shown to the “pickup client”
  • When a “pickup client” is happy that they have been picked up by a “driver”, the “pickup client” may rate the driver from 1-10, and the “driver” may also rate the “pickup client” from 1-10.
  • The rating should only be available once a “pickup client” has marked a job as “completed”
  • A “driver” or a “pickup client” should ALWAYS be able to view their previous ratings. 

Whilst this may sound child’s play to a lot of you (me included if I stuck to using simply CRUD operations), I just want to point out that this app is meant as a learning experience so I will not be using a simple SignalR Hub, and a couple of database tables.

I intend to write this project using a completely different set of technologies from the norm. Some of the technology choices could easily scale to hundreds of thousands of requests per second (Kafka has your back here)

POTENTIAL TECNHOLOGIES INVOLVED

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

Some of this will undoubtedly be covered in other blogs (such as React/Webpack), however some of it I am hoping will be quite novel/insightful material.

Who knows though there may be some of you out there that haven’t heard of Webpack, so some of that may even be new, we shall se, hopefully enough stuff for everyone.

STORIES

I will maintain a list of stories and their sub tasks using Trello here : https://trello.com/b/F4ykCOOM/kafka-play-akka-react-webpack-tasks which at the time of writing this post was the items shown below

 

TOP LEVEL STORIES

Web Site

Play Back End

  • Create a back end play app
  • Create test Kafka consumer that is able to read from JSON payload from a Kafka topic
  • Create test publisher that publishes JSON payload to a Kafka topic
  • Create Akka Publisher flow to test EventSource JS call
  • Create login API
  • Create check ranking API, which will use Kafka Active queries over KTable (or Global KTable) in the materialized streams
  • Create publish job API, which will publish out on Kafka publisher where it will send a JSON payload
  • Create receive job update API, will read JSON from Kafka Consumer where it will read in JSON payload, with the intention of updating the map of the drivers position
  • Create “Accept Job” API which will publish out on Kafka publisher where it will send JSON payload
  • Create “Bid for Job” API which will publish out on Kafka publisher where it will send JSON payload
  • Create Complete job API, which will publish out on Kafka publisher where it will send a JSON payload
  • Create ranking API, which will publish out on Kafka publisher where it will send a JSON payload
  • Create publish driver job co-ordinate update API, which will publish out on Kafka publisher where it will send a JSON payload

Kafka Streams

Create test app that tests out listening to any single Kafka publisher JSON topic, and creates streams app from it, and pushes out to an output topic

  • Create a windowed Kafka stream app that will window over all “driver bidding” jobs for a give period, and will output to an output stream, such that all the job bids can be consumed by Kafka Consumer
  • Create a paired stream of accepted job (id, client, driver id) and an updated driver position which will come in on a different stream
  • Create a ranking streams app which will store a successful ranking in a Kafka Stream KTable
  • Create a way to use Active Queries for allowing clients/drivers to query their rankings

 

 

HOW WILL PROGRESS BE TRACKED

I will simply use Trellos “Label” facility, such that done tasks will be “Green”, and there will obviously be a post/GitHub code repo folder that goes with that.

 

CAVEATS

1. I will not be concerned with connection failures, the aim of the project is to try and create a real world like project, but not actually create a end-end production grade application
2. I will be treating every run as if it were the first, I will not be storing ANY permanent state (apart from ratings potentially)
3. I will be doing things at my own pace (I have 2 kids) so it comes when it comes

4. I will try and use varied technology choices, which will in places mean that there could potentially be more work required to make it production quality

 

 

 

Crossbar.io quick look at it

A while ago someone posted another SignalR article on www.codeproject.com, and I stated hey you should take a look at this stuff : crossbar.io, and not liking to me someone that is not willing to die by the sword (afting living by it of course) I decided to put my money where my mouth was/is, and write a small article on crossbar.io.

So what its crossbar.io? Well quite simply it is a message broker that has many language bindings, that should all be able to communicate together seamlessly.

Here is what the people being crossbar.io have to say about their own product

Crossbar.io is an open source networking platform for distributed and microservice applications. It implements the open Web Application Messaging Protocol (WAMP), is feature rich, scalable, robust and secure. Let Crossbar.io take care of the hard parts of messaging so you can focus on your app’s features.

I decided to take a look at this a bit more which I have written up here : https://www.codeproject.com/Articles/1183744/Crossbar-io-a-quick-look-at-it

A Look At Docker

A while ago I worked on a project that used this tech stack

  • Akka HTTP : (actually we used Spray.IO but it is practically the same thing for the purpose of this article). For those that don’t know what Akka HTTP is, it is a simple Akka based framework that is also able to expose a REST interface to communicate with the actor system
  • Cassandra database : Apache Cassandra is a free and open-source distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.

It is a multi node cluster

This was a pain to test, and we were always stepping on each others toes, as you can imagine running up a 5 node cluster of VMs just to satisfy my each developers own testing needs was a bit much. So we ended up with some dedicated test environments, running 5 Cassandra nodes. These was still a PITA to be honest.

This got me thinking perhaps I could use Docker to help me out here, perhaps I could run Cassandra in a Docker container, hell perhaps I could even run my own code that uses Cassandra in a Docker container, and just point my UI at the Akka HTTP REST server running in Docker. mmmmm

I started to dig around, and of course this is entirely possible (otherwise I would not be writing this article now would I).

This is certainly not a new thing here for Codeproject, there are numerous Docker articles,  but I never found one that talked about Cassandra, so I thought why not write another one.

 

Which I have just published here : https://www.codeproject.com/Articles/1175248/A-look-at-Docker

Update scheduled Quartz.Net job by monitoring App.Config

 

Introduction

So I was back in .NET land the other day at work, where I had to schedule some code to run periodically on some schedule.

The business also needed this schedule to be adjustable, so that they could adjust it when things were busier and wind it down when they are not

This adjusting of the schedule time would be done via a setting in the App.Config, where the App.Config is monitored for changes. If there is a change then we would look to use the new schedule value from the App.Config to run the job. Ideally the app must not go down to afford this change of job schedule time.

There are some good job / scheduling libraries out there, but for this I just wanted to use something light weight so I went with Quartz.net

Its easy to setup and use, and has a fairly nice API, supports IOC and CRON schedules. In short it fits the bill

In a netshell this post will simple talk about how you can adjust the schedule of a ALREADY scheduled job, there will also be some special caveats that I personally had to deal with in my requirements, which may or may not be an issue for you

 

Some Weird Issues That I Needed To Cope With

So let me just talk about some of the issues that I had to deal with

The guts of the job code that I run on my schedule is actually writing to Azure Blob Storage and then to Azure SQL DW tables. And as such has several writes to several components one after another.

So this run of the current job run MUST be allowed to complete in FULL (or fail using Exception handling that’s ok to). It would not be acceptable to just stop the Quartz job while there is work in flight.

I guess some folk may be thinking of some sort of transaction here, that must either commit or rollback. Unfortunately that doesn’t work with Azure Blob Storage uploads.

So I had to think of another plan.

So here is what I came up with. I would use threading primitives namely an AutoResetEvent that would control when the Quartz.net job could be changed to use a new schedule.

if a change in the App.Config was seen, then we know that we “should” be using a new schedule time, however the scheduled job MAY have work in flight. So we need to wait for that work to complete (or fail) before we could think about swapping the Quartz.net scheduler time.

So that is what I went for, there are a few other things to be aware of such as I needed threading primitives that worked with Async/Await code. Luckily Stephen Toub from the TPL team has done that for us : asyncautoresetevent

There is also the well known fact that the FileSystemWatcher class fires events twice : http://lmgtfy.com/?q=filesystemwatcher+firing+twice

So as we go through the code you will see how I dealt with those

The Code

Ok so now that we have talked about the problem, lets go through the code.

There are several NuGet packages I am using to make my life easier

So lets start with the entry point, which for me is the simple Program class shown below

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Security.Principal;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using SachaBarber.QuartzJobUpdate.Services;
using Topshelf;


namespace SachaBarber.QuartzJobUpdate
{
    static class Program
    {
        private static ILogger _log = null;


        [STAThread]
        public static void Main()
        {
            try
            {
                var container = ContainerOperations.Container;
                _log = container.Resolve<ILogger>();
                _log.Log("Starting");

                AppDomain.CurrentDomain.UnhandledException += AppDomainUnhandledException;
                TaskScheduler.UnobservedTaskException += TaskSchedulerUnobservedTaskException;
                Thread.CurrentPrincipal = new WindowsPrincipal(WindowsIdentity.GetCurrent());
                
                HostFactory.Run(c =>                                 
                {
                    c.Service<SomeWindowsService>(s =>                        
                    {
                        s.ConstructUsing(() => container.Resolve<SomeWindowsService>());
                        s.WhenStarted(tc => tc.Start());             
                        s.WhenStopped(tc => tc.Stop());               
                    });
                    c.RunAsLocalSystem();                            

                    c.SetDescription("Uploads Calc Payouts/Summary data into Azure blob storage for RiskStore DW ingestion");       
                    c.SetDisplayName("SachaBarber.QuartzJobUpdate");                       
                    c.SetServiceName("SachaBarber.QuartzJobUpdate");                      
                });
            }
            catch (Exception ex)
            {
                _log.Log(ex.Message);
            }
            finally
            {
                _log.Log("Closing");
            }
        }
      

        private static void AppDomainUnhandledException(object sender, UnhandledExceptionEventArgs e)
        {
            ProcessUnhandledException((Exception)e.ExceptionObject);
        }

        private static void TaskSchedulerUnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
        {
            ProcessUnhandledException(e.Exception);
            e.SetObserved();
        }

        private static void ProcessUnhandledException(Exception ex)
        {
            if (ex is TargetInvocationException)
            {
                ProcessUnhandledException(ex.InnerException);
                return;
            }
            _log.Log("Error");
        }
    }
}

All this does it host the actual windows service class for me using TopShelf. Where the actual service class looks like this

using System;
using System.Configuration;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Xml.Linq;
using Autofac;
using SachaBarber.QuartzJobUpdate.Async;
using SachaBarber.QuartzJobUpdate.Configuration;
using SachaBarber.QuartzJobUpdate.Jobs;
using SachaBarber.QuartzJobUpdate.Services;
//logging
using Quartz;

namespace SachaBarber.QuartzJobUpdate
{
    public class SomeWindowsService
    {
        private readonly ILogger _log;
        private readonly ISchedulingAssistanceService _schedulingAssistanceService;
        private readonly IRxSchedulerService _rxSchedulerService;
        private readonly IObservableFileSystemWatcher _observableFileSystemWatcher;
        private IScheduler _quartzScheduler;
        private readonly AsyncLock _lock = new AsyncLock();
        private readonly SerialDisposable _configWatcherDisposable = new SerialDisposable();
        private static readonly JobKey _someScheduledJobKey = new JobKey("SomeScheduledJobKey");
        private static readonly TriggerKey _someScheduledJobTriggerKey = new TriggerKey("SomeScheduledJobTriggerKey");

        public SomeWindowsService(
            ILogger log,
            ISchedulingAssistanceService schedulingAssistanceService, 
            IRxSchedulerService rxSchedulerService,
            IObservableFileSystemWatcher observableFileSystemWatcher)
        {
            _log = log;
            _schedulingAssistanceService = schedulingAssistanceService;
            _rxSchedulerService = rxSchedulerService;
            _observableFileSystemWatcher = observableFileSystemWatcher;
        }

        public void Start()
        {
            try
            {
                var ass = typeof (SomeWindowsService).Assembly;
                var configFile = $"{ass.Location}.config"; 
                CreateConfigWatcher(new FileInfo(configFile));


                _log.Log("Starting SomeWindowsService");

                _quartzScheduler = ContainerOperations.Container.Resolve<IScheduler>();
                _quartzScheduler.JobFactory = new AutofacJobFactory(ContainerOperations.Container);
                _quartzScheduler.Start();

                //create the Job
                CreateScheduledJob();
            }
            catch (JobExecutionException jeex)
            {
                _log.Log(jeex.Message);
            }
            catch (SchedulerConfigException scex)
            {
                _log.Log(scex.Message);
            }
            catch (SchedulerException sex)
            {
                _log.Log(sex.Message);
            }

        }

        public void Stop()
        {
            _log.Log("Stopping SomeWindowsService");
            _quartzScheduler?.Shutdown();
            _configWatcherDisposable.Dispose();
            _observableFileSystemWatcher.Dispose();
        }


        private void CreateConfigWatcher(FileInfo configFileInfo)
        {
            FileSystemWatcher watcher = new FileSystemWatcher();
            watcher.Path = configFileInfo.DirectoryName;
            watcher.NotifyFilter = 
                NotifyFilters.LastAccess | 
                NotifyFilters.LastWrite | 
                NotifyFilters.FileName | 
                NotifyFilters.DirectoryName;
            watcher.Filter = configFileInfo.Name;
            _observableFileSystemWatcher.SetFile(watcher);
            //FSW is notorious for firing twice see here : 
            //http://stackoverflow.com/questions/1764809/filesystemwatcher-changed-event-is-raised-twice
            //so lets use Rx to Throttle it a bit
            _configWatcherDisposable.Disposable = _observableFileSystemWatcher.Changed.SubscribeOn(
                _rxSchedulerService.TaskPool).Throttle(TimeSpan.FromMilliseconds(500)).Subscribe(
                    async x =>
                    {
                        //at this point the config has changed, start a critical section
                        using (var releaser = await _lock.LockAsync())
                        {
                            //tell current scheduled job that we need to read new config, and wait for it
                            //to signal us that we may continue
                            _log.Log($"Config file {configFileInfo.Name} has changed, attempting to read new config data");
                            _schedulingAssistanceService.RequiresNewSchedulerSetup = true;
                            _schedulingAssistanceService.SchedulerRestartGate.WaitAsync().GetAwaiter().GetResult();
                            //recreate the AzureBlobConfiguration, and recreate the scheduler using new settings
                            ConfigurationManager.RefreshSection("schedulingConfiguration");
                            var newSchedulingConfiguration = SimpleConfig.Configuration.Load<SchedulingConfiguration>();
                            _log.Log($"SchedulingConfiguration section is now : {newSchedulingConfiguration}");
                            ContainerOperations.ReInitialiseSchedulingConfiguration(newSchedulingConfiguration);
                            ReScheduleJob();
                        }
                    },
                    ex =>
                    {
                        _log.Log($"Error encountered attempting to read new config data from config file {configFileInfo.Name}");
                    });
        }

        private void CreateScheduledJob(IJobDetail existingJobDetail = null)
        {
            var azureBlobConfiguration = ContainerOperations.Container.Resolve<SchedulingConfiguration>();
            IJobDetail job = JobBuilder.Create<SomeQuartzJob>()
                    .WithIdentity(_someScheduledJobKey)
                    .Build();

            ITrigger trigger = TriggerBuilder.Create()
                .WithIdentity(_someScheduledJobTriggerKey)
                .WithSimpleSchedule(x => x
                    .RepeatForever()
                    .WithIntervalInSeconds(azureBlobConfiguration.ScheduleTimerInMins)
                )
                .StartAt(DateTimeOffset.Now.AddSeconds(azureBlobConfiguration.ScheduleTimerInMins))
                .Build();

            _quartzScheduler.ScheduleJob(job, trigger);
        }

        private void ReScheduleJob()
        {
            if (_quartzScheduler != null)
            {
                _quartzScheduler.DeleteJob(_someScheduledJobKey);
                CreateScheduledJob();
            }
        }
    }


}

There is a fair bit going on here. So lets list some of the work this code does

  • It creates the initial Quartz.Net job and scheduled it using the values from a custom config section which are read into an object
  • It watches the config file for changes (we will go through that in a moment) and will wait on the AsyncAutoResetEvent to be signalled, at which point it will recreate the Quartz.net job

So lets have a look at some of the small helper parts

This is a simple Rx based file system watcher. The reason Rx is good here is that you can Throttle the events (see this post FileSystemWatcher raises 2 events)

using System;
using System.IO;
using System.Reactive.Linq;

namespace SachaBarber.QuartzJobUpdate.Services
{
    public class ObservableFileSystemWatcher : IObservableFileSystemWatcher
    {
        private FileSystemWatcher _watcher;

        public void SetFile(FileSystemWatcher watcher)
        {
            _watcher = watcher;

            Changed = Observable
                .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
                (h => _watcher.Changed += h, h => _watcher.Changed -= h)
                .Select(x => x.EventArgs);

            Renamed = Observable
                .FromEventPattern<RenamedEventHandler, RenamedEventArgs>
                (h => _watcher.Renamed += h, h => _watcher.Renamed -= h)
                .Select(x => x.EventArgs);

            Deleted = Observable
                .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
                (h => _watcher.Deleted += h, h => _watcher.Deleted -= h)
                .Select(x => x.EventArgs);

            Errors = Observable
                .FromEventPattern<ErrorEventHandler, ErrorEventArgs>
                (h => _watcher.Error += h, h => _watcher.Error -= h)
                .Select(x => x.EventArgs);

            Created = Observable
                .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
                (h => _watcher.Created += h, h => _watcher.Created -= h)
                .Select(x => x.EventArgs);

            All = Changed.Merge(Renamed).Merge(Deleted).Merge(Created);
            _watcher.EnableRaisingEvents = true;
        }

        public void Dispose()
        {
            _watcher.EnableRaisingEvents = false;
            _watcher.Dispose();
        }

        public IObservable<FileSystemEventArgs> Changed { get; private set; }
        public IObservable<RenamedEventArgs> Renamed { get; private set; }
        public IObservable<FileSystemEventArgs> Deleted { get; private set; }
        public IObservable<ErrorEventArgs> Errors { get; private set; }
        public IObservable<FileSystemEventArgs> Created { get; private set; }
        public IObservable<FileSystemEventArgs> All { get; private set; }
    }
}

And this is a small utility class that will contain the results of the custom config section that may be read using SimpleConfig

namespace SachaBarber.QuartzJobUpdate.Configuration
{
    public class SchedulingConfiguration
    {
        public int ScheduleTimerInMins { get; set; }

        public override string ToString()
        {
            return $"ScheduleTimerInMins: {ScheduleTimerInMins}";
        }
    }
}

Which you read from the App.Config like this

 var newSchedulingConfiguration = SimpleConfig.Configuration.Load<SchedulingConfiguration>();

And this is the Async/Await compatible AutoResetEvent that I took from Stephen Toubs blog

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SachaBarber.QuartzJobUpdate.Async
{
    public class AsyncAutoResetEvent
    {
        private static readonly Task Completed = Task.FromResult(true);
        private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
        private bool _signaled;

        public Task WaitAsync()
        {
            lock (_waits)
            {
                if (_signaled)
                {
                    _signaled = false;
                    return Completed;
                }
                else
                {
                    var tcs = new TaskCompletionSource<bool>();
                    _waits.Enqueue(tcs);
                    return tcs.Task;
                }
            }
        }

        public void Set()
        {
            TaskCompletionSource<bool> toRelease = null;
            lock (_waits)
            {
                if (_waits.Count > 0)
                    toRelease = _waits.Dequeue();
                else if (!_signaled)
                    _signaled = true;
            }
            toRelease?.SetResult(true);
        }
    }
}

So the last part of the puzzle is how does the AsynAutoReset event get signalled?

Well as we said above we need to wait for any in progress work to complete first. So the way I tackled that was that within the job code that gets run every Quartz.Net scheduler tick time, we just check whether we have be requested to swap out the current schedule time, and if so we should signal the waiting code of the (shared) AsyncAutoResetEvent, otherwise we just carry on and do the regular job work.

The way that we get the AsyncAutoResetEvent that is used by the waiting code and also the job code (to signal it) is via using a singleton registration in an IOC container. I am using AutoFac which I set up like this, but you could have your own singleton, or IOC container of choice that you could use.

The trick is to make sure that both classes that need to access the AsyncAutoResetEvent use a single instance.

using System;
using System.Reflection;
using Autofac;
using SachaBarber.QuartzJobUpdate.Configuration;
using SachaBarber.QuartzJobUpdate.Services;
using Quartz;
using Quartz.Impl;

namespace SachaBarber.QuartzJobUpdate
{
    public class ContainerOperations
    {
        private static Lazy<IContainer> _containerSingleton = 
            new Lazy<IContainer>(CreateContainer);

        public static IContainer Container => _containerSingleton.Value;

        public static void ReInitialiseSchedulingConfiguration(
            SchedulingConfiguration newSchedulingConfiguration)
        {
            var currentSchedulingConfiguration = 
                Container.Resolve<SchedulingConfiguration>();
            currentSchedulingConfiguration.ScheduleTimerInMins = 
                newSchedulingConfiguration.ScheduleTimerInMins;
        }
        

        private static IContainer CreateContainer()
        {
            var builder = new ContainerBuilder();
            builder.RegisterType<ObservableFileSystemWatcher>()
                .As<IObservableFileSystemWatcher>().ExternallyOwned();
            builder.RegisterType<RxSchedulerService>()
                .As<IRxSchedulerService>().ExternallyOwned();
            builder.RegisterType<Logger>().As<ILogger>().ExternallyOwned();
            builder.RegisterType<SomeWindowsService>();
            builder.RegisterInstance(new SchedulingAssistanceService())
                .As<ISchedulingAssistanceService>();
            builder.RegisterInstance(
                SimpleConfig.Configuration.Load<SchedulingConfiguration>());

            // Quartz/jobs
            builder.Register(c => new StdSchedulerFactory().GetScheduler())
                .As<Quartz.IScheduler>();
            builder.RegisterAssemblyTypes(Assembly.GetExecutingAssembly())
                .Where(x => typeof(IJob).IsAssignableFrom(x));
            return builder.Build();
        }

        
    }
}

Where the shared instance in my case is this class

using SachaBarber.QuartzJobUpdate.Async;

namespace SachaBarber.QuartzJobUpdate.Services
{
    public class SchedulingAssistanceService : ISchedulingAssistanceService
    {
        public SchedulingAssistanceService()
        {
            SchedulerRestartGate = new AsyncAutoResetEvent();
            RequiresNewSchedulerSetup = false;
        }    

        public AsyncAutoResetEvent SchedulerRestartGate { get; }
        public bool RequiresNewSchedulerSetup { get; set; }
    }
}

Here is the actual job code that will check to see if a change in the App.Config has been detected. Which would require this code to signal the waiting code that it may continue.

using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using Quartz;

namespace SachaBarber.QuartzJobUpdate.Services
{
    public class SomeQuartzJob : IJob
    {
        private readonly ILogger _log;
        private readonly ISchedulingAssistanceService _schedulingAssistanceService;

        public SomeQuartzJob(
            ILogger log, 
            ISchedulingAssistanceService schedulingAssistanceService)
        {
            _log = log;
            _schedulingAssistanceService = schedulingAssistanceService;
        }


        public void Execute(IJobExecutionContext context)
        {
            try
            {
                ExecuteAsync(context).GetAwaiter().GetResult();
            }
            catch (JobExecutionException jeex)
            {
                _log.Log(jeex.Message);
                throw;
            }
            catch (SchedulerConfigException scex)
            {
                _log.Log(scex.Message);
                throw;
            }
            catch (SchedulerException sex)
            {
                _log.Log(sex.Message);
                throw;
            }
            catch (ArgumentNullException anex)
            {
                _log.Log(anex.Message);
                throw;
            }
            catch (OperationCanceledException ocex)
            {
                _log.Log(ocex.Message);
                throw;
            }
            catch (IOException ioex)
            {
                _log.Log(ioex.Message);
                throw;
            }
        }


        /// <summary>
        /// This is called every time the Quartz.net scheduler CRON time ticks
        /// </summary>
        public async Task ExecuteAsync(IJobExecutionContext context)
        {
            await Task.Run(async () =>
            {
                if (_schedulingAssistanceService.RequiresNewSchedulerSetup)
                {
                    //signal the waiting scheduler restart code that it can now restart the scheduler
                    _schedulingAssistanceService.RequiresNewSchedulerSetup = false;
                    _log.Log("Job has been asked to stop, to allow job reschedule due to change in config");
                    _schedulingAssistanceService.SchedulerRestartGate.Set();
                }
                else
                {
                    await Task.Delay(1000);
                    _log.Log("Doing the uninterruptible work now");
                }
            });
        }
    }
}

So when the AsyncAutoResetEvent is signalled the waiting code (inside the subscribe code of the Rx file system watcher inside the SomeWindowsService.cs code) will proceed to swap out the Quartz.Net scheduler time.

It can do this safely as we know there is NO work in flight as the job has told this waiting to code to proceed, which it can only do if there is no work in flight.

This swapping over of the scheduler time to use the newly read App.Config values is also protected in an AsyncLock class (again taken from Stephen Toub)

using System;
using System.Threading;
using System.Threading.Tasks;

namespace SachaBarber.QuartzJobUpdate.Async
{
    /// <summary>
    /// See http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266988.aspx
    /// from the fabulous Stephen Toub
    /// </summary>    
    public class AsyncLock
    {
        private readonly AsyncSemaphore m_semaphore;
        private readonly Task<Releaser> m_releaser;

        public AsyncLock()
        {
            m_semaphore = new AsyncSemaphore(1);
            m_releaser = Task.FromResult(new Releaser(this));
        }

        public Task<Releaser> LockAsync()
        {
            var wait = m_semaphore.WaitAsync();
            return wait.IsCompleted ?
                m_releaser :
                wait.ContinueWith((_, state) => new Releaser((AsyncLock)state),
                    this, CancellationToken.None,
                    TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        }

        public struct Releaser : IDisposable
        {
            private readonly AsyncLock m_toRelease;

            internal Releaser(AsyncLock toRelease) { m_toRelease = toRelease; }

            public void Dispose()
            {
                if (m_toRelease != null)
                    m_toRelease.m_semaphore.Release();
            }
        }
    }
}

Where this relies on AsyncSemaphore

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace SachaBarber.QuartzJobUpdate.Async
{
    /// <summary>
    /// See http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266983.aspx
    /// from the fabulous Stephen Toub
    /// </summary>
    public class AsyncSemaphore
    {
        private static readonly Task s_completed = Task.FromResult(true);
        private readonly Queue<TaskCompletionSource<bool>> _mWaiters = new Queue<TaskCompletionSource<bool>>();
        private int _mCurrentCount;

        public AsyncSemaphore(int initialCount)
        {
            if (initialCount < 0) throw new ArgumentOutOfRangeException("initialCount");
            _mCurrentCount = initialCount;
        }

        public Task WaitAsync()
        {
            lock (_mWaiters)
            {
                if (_mCurrentCount > 0)
                {
                    --_mCurrentCount;
                    return s_completed;
                }
                else
                {
                    var waiter = new TaskCompletionSource<bool>();
                    _mWaiters.Enqueue(waiter);
                    return waiter.Task;
                }
            }
        }


        public void Release()
        {
            TaskCompletionSource<bool> toRelease = null;
            lock (_mWaiters)
            {
                if (_mWaiters.Count > 0)
                    toRelease = _mWaiters.Dequeue();
                else
                    ++_mCurrentCount;
            }
            if (toRelease != null)
                toRelease.SetResult(true);
        }


    }
}

Just for completeness this is how you get an App.Config section to refresh at runtime

 ConfigurationManager.RefreshSection("schedulingConfiguration");

Anyway this works fine for me, I now have a reactive app that changes to changes in the App.Config without the need to restart the app, and it does so by allowing inflight work to be completed.

Hope it helps someone out there

 

Where Is The Code?

The code can be found here : : https://github.com/sachabarber/SachaBarber.QuartzJobUpdate

scala environment config options

This is going to be a slightly weird post in a way as it is going to go round the houses a bit, and not going to contain any actual code, but shall talk about possible techniques of how to best manage specific environment config values for a multi project scala setup

Coming from .NET

So as many of you know I came from .NET, where we have a simple config model. We have App.Config or Web.Config.

We have tools at our disposal such as the XmlTransformation MsBuild tasks which allow us to maintain a set of different App.Config values in them that will be transformed for your different environments

I wrote a post about this here

https://sachabarbs.wordpress.com/2015/07/07/app-config-transforms-outside-of-web-project/

Here is a small example of what this might look like

 

image

So when I started doing multi project Scala projects using SBT where I might have the following project requirements

image

In the above diagram the following is assumed

  • There is a Sacha.Common library that we want to use across multiple projects
  • That both Sacaha.Rest.Endpoints and Sacha.Play.FrontEnd are both apps which will need some environment specific configuration in them
  • That there is going to be more than 1 environment that we wish to use such as
    • PROD
    • QA
    • DEV

Now coming from .NET my initial instinct was to put a a bunch of folders in the 2 apps, so taking the Sacha.Rest.Endpoints app as an example we may have something like this

image

So the idea would be that we would have specific application.conf files for the different environments that we need to target (I am assuming there is some build process which takes care of putting the correct file in place for the correct build within the CI server).

This is very easy to understand, if we want QA we would end up using the QA version of the application.conf file

This is a very common way of thinking about this problem in .NET.

Why Is This Bad?

But hang on here this is only 1 app, what if we had 100 apps that made up our software in total. That means we need to maintain all these separate config files for all the environments in ALL the separate apps.

Wow that doesn’t sound so cool anymore.

Another Approach!

A colleague and I were talking about this in some scala code that was being written for a new project, and this is kind of what was being discussed.

I should point out that this idea was not in fact mine, but my colleagues Andy Sprague, which is not something I credited him for in the 1st draft of this post. Which is bad, sorry Andy.

Anyway how about this for another idea. How about the Sacha.Common JAR hold just the specific bits of changing config in separate config files such as

  • “Qa.conf”
  • “Prod.conf”
  • “Dev.conf”

And then the individual apps that already reference the Sacha.Common JAR just include the environment config they need.

This is entirely possible thanks to the way that the Typesafe config library works, where it is designed to include extra config files. These extra config files in this case are just inside of the a JAR that is external -> Sacha.Common

Here is what this might like look for a consumer of the Sacha.Common jar

image

Where we just include the relevant environment config from Sacha.Common in the application.conf for the current app

And this is what the Sacha.Common may look like, where it provides the separate environment config files that consumers may use

image

This diagram may help to illustrate this further

image

Why Is This Cool?

The good thing about this design over the separate config files per environment per application is that we now ONLY need to maintain one set of environment specific settings, which are those in the common Jar Sacha.Common

I wish we could do this within the .NET configuration system.

Hope this helps, I personally think that this is a fairly nice way to manage your configs for multiple applications and multiple environments

 

 

 

 

 

 

No longer an mvp

So January is here, and I have been informed I am no longer an MVP.

I held the MVP award for 9 years, so I am happy with that, would have been nice to make it a 10th, but hey ho

To be honest I am not too surprised by this announcement, as I have become more and more interested in a wider range of things, such as

  • Scala
  • Akka
  • Play
  • Cassandra
  • Kafka

I also spent most of the last year blogging about these subjects.

I was told that MVPs these days are contributing about 120 blobs per year in the UK so that is what one is up against.

I have 2 kids so that’s not going to happen for me, like ever, anyone with 2 or more kids will know what I mean here

Like I say I am not surprised, it has been great to be recognized as an MVP, even though I never did go to a single MVP summit (should have doh)

And I would like to thank Microsoft for the opportunity to preach the good word.

I would also like to personally thank Chris Maunder of codeproject.com who was the chap that originally nominated me, thanks so much Chris. Chris you rock.

As for what’s next for me, I am going to continue to blog, and I hope to get back into writing articles for codeproject.com again after a break from that.

You can expect me to keep on writing about stuff that I enjoy.

So for now over and out, thanks for all the support over the years guys/girls.

RX Over the wire

 

Introduction

Now you know I’m a RX fan boy, I think it’s the bees knees in fact. Of late I have also gotten into Akka, and Akka Streams, which is one implementation of the Reactive Streams API.

I also have a colleague who recently attend the ReactConf and came back raving about all the good work that NetFlix were doing using this uber duper specific socket that they have developed to allow RX type operators over the wire using a socket.

NetFlix call their implementation ReactiveSocket : http://reactivesocket.io/ 

Which offers these things

  • request/response (stream of 1)
  • request/stream (finite stream of many)
  • fire-and-forget (no response)
  • event subscription (infinite stream of many)
  • channel (bi-directional streams)

Mmmm, sounds pretty cool. Thing is I was sure I had seen this before, and a long time ago too (well a long time in software years).

IQbservable – The Dual of IQueryable

Back in 2010 Bart De Smet of the RX team posted a couple of intriguing resources around this VERY badly named interface.

Most informative one being this video:

https://channel9.msdn.com/shows/Going+Deep/Bart-De-Smet-Observations-on-IQbservable-The-Dual-of-IQueryable/

For those that can not be bothered to watch the video here are some of the highlights

IQbservable allows the following

  • Combines LINQ Queryable and RX Observable functionality
  • Queryable – allows you to create a query client side using LINQ, and pass that to a datasource (server, database, web service etc)
  • Observable – instead of blocking until the data comes back, will just notify you know when it gets the data

So those are the key take away points. But how about a nice diagram or 2 to really set the scene

I think these are the best 2 diagrams (at least for my money)

image

image

So that is what IQbservable is all about. So what is the rest of this post about then?

Well it just so happens that Dave Sexton one of the Reactive Extension Extensions guys (meant to be slightly tongue in cheek) has written an extremely useful and fairly lightweight library that does much of what is described above.

Dave calls it QActive. He has done a great job of it, and has written serialized expression trees and parsers, which allow us to create client client queries in LINQ which are sent across the wire.

In the rest of this post I will be showcasing a very simple demo based on QActive, and I’ll point out some more links that Dave Sexton provides, which are invaluable reading

 Dave has 2 of his own posts covering more than I do here, which you can go to here:

 

A Small Demo

So it may not come as a surprise to know that we need a server side and a client side. We will look at both of these for a simple example, and in the download at my github there is also an forms based server that allows you to push items to the client on demand

What Is Not Inlcuded

There is no form of fault tolerance, if you want that you could do worse than to read my SignalR + RX code, which shows you how to make a resilient connection using RX

https://www.codeproject.com/Articles/851437/SignalR-plus-RX-Streaming-Data-Demo-App-of:

We will now proceed to look at a simple server/client. This is the most simplistic of examples that has a server that runs on a timer, and the client provides a LINQ where (filter) to this that will be applied to the server side stream ON THE SERVER.

Simple Server

This is all we need for a simple server

using System;
using System.Net;
using System.Reactive.Linq;
using Qactive;

namespace Server
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1));
            var service = source.ServeQbservableTcp(
                new IPEndPoint(IPAddress.Loopback, 3205),
                new QbservableServiceOptions()
                {
                    SendServerErrorsToClients = true,
                    EnableDuplex = true,
                    AllowExpressionsUnrestricted = true
                }
            );
            using (service.Subscribe(
              client => Console.WriteLine(
                  "Client shutdown."),
              ex => Console.WriteLine(
                  "Fatal error: {0}", ex.Message),
              () => Console.WriteLine(
                  "This will never be printed because a service host never completes.")))
            {
                Console.ReadKey();
            }
        }
    }
}

Take away points here are:

  • We use a TCIP IPEndpoint to bind the server too
  • We can subscribe to the IObservable to see what is happening with the connected client

On Demand Server Notification

If you want to see a server that allows you to send notifications to the clients on demand have a look at the FormsServer in my GitHub repo.

 

Simple Client

This is all we need for a simple client

using System;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using Qactive;

namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            var client = new TcpQbservableClient<long>(new IPEndPoint(IPAddress.Loopback, 3205));

            //thie expression tree filtering will happen server side
            //THAT IS AWESOME
            IQbservable<string> query =
              from value in client.Query()
              where value <= 5 || value >= 8
              select string.Format("The incoming value has been doubled to {0}", value * 2);

            using (query.Subscribe(
              value => Console.WriteLine("Client observed: " + value),
              ex => Console.WriteLine("Error: {0}", ex.Message),
              () => Console.WriteLine("Completed")))
            {
                Console.ReadKey();
            }
        }
    }
}

 

Take away points here are:

  • The client is also using a TCIP IPEndpoint
  • The client IS ABLE to use LINQ expressions which WILL be serialized and sent to the server where they will be applied

Here is the output when the simple server is run, and 2 clients are started one after another.

image

 

Why Is This Cool?

This is very cool (uber cool in fact), we have just created a push based notification system that supports server side filtering (thanks to LINQ) in about 20 lines of code.

If you can’t see what is cool about that, well I can’t help you.

There may be some amongst you that go well any messaging framework that has a server and a client would/could do that in the same amount of code, what and the server side push down delegates (thanks to serializable expression trees)….mmmmm Don’t think so.

Only thing I can think of that even comes close is OData, but that requires a fair bit of infrastructure/baggage to make it work.

 

 

Where Can I Get The Code?

As usual I have posted the code  to my GitHub account :

https://github.com/sachabarber/RxOverTheWire

AKKA STREAMS

Last time we looked at Akka Http, this time we will look at Akka Streams.

Akka Streams is a vast topic, and you will definitely need to supplement this  post with the official documentation.

Akka Streams is one of the founding members of Reactive Streams, and Akka streams is one implementation (there are many) of the Reactive Streams APIs.

Reactive Streams  is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

Introduction

There may be some readers who have come from .NET such as myself who have used RX.

You may even have heard of Reactive Streams before. So what exactly makes reactive streams different from Rx?

The central thing that is the big win with reactive streams over Rx is the idea of back pressure. Here is what the Akka docs say about back pressure

The back pressure protocol is defined in terms of the number of elements a downstream Subscriber is able to receive and buffer, referred to as demand. The source of data, referred to as Publisher in Reactive Streams terminology and implemented as Source in Akka Streams, guarantees that it will never emit more elements than the received total demand for any given Subscriber.

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html#back-pressure-explained

Luckily this is all inbuilt to Akka streams, you do not have to worry about this too much as a user of Akka streams.

You can pretty much decide how you want the built in streams pipelines (which we will be diving into in more details below) in terms of backpressure using the OverflowStrategy enum value. Here is a very simple example

Source(1 to 10).buffer(10, OverflowStrategy.backpressure)

Where the following are the available OverflowStrategy values

object OverflowStrategy {
  /**
   * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
   * the new element.
   */
  def dropHead: OverflowStrategy = DropHead

  /**
   * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
   * the new element.
   */
  def dropTail: OverflowStrategy = DropTail

  /**
   * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
   */
  def dropBuffer: OverflowStrategy = DropBuffer

  /**
   * If the buffer is full when a new element arrives, drops the new element.
   */
  def dropNew: OverflowStrategy = DropNew

  /**
   * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until
   * space becomes available in the buffer.
   */
  def backpressure: OverflowStrategy = Backpressure

  /**
   * If the buffer is full when a new element is available this strategy completes the stream with failure.
   */
  def fail: OverflowStrategy = Fail
}

So that is the basic idea, Akka streams does provide a lot of stuff, such as

  • Built in stages/shapes
  • A graph API
  • Ability to create your own stages/shapes

For the rest of this post we will be looking at some examples of these 3 points.

Working With The Akka Streams APIs

As stated at the beginning of this post the Akka Streams implementation is vast. There is a lot of ground to cover, far more than I can reasonably cover in a small blog post. The official docs are still the place to go, but if you have not heard of Akka Streams this post may be enough to get you into it.

The official docs (at time of writing) are here:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/index.html

 

Working With Built In Stages/Shapes

Akka comes with loads of prebuilt stages which we can make use of. However before I mention those lets try and just spend a bit of time taking a bit about how you use the Akka Streams APIs in their most basic form.

The idea is that we have 4 different parts that make up a useable pipeline.

Source
A processing stage with exactly one output, emitting data elements whenever downstream processing stages are ready to receive them.

Sink
A processing stage with exactly one input, requesting and accepting data elements possibly slowing down the upstream producer of elements

Flow
A processing stage which has exactly one input and output, which connects its up- and downstreams by transforming the data elements flowing through it.

RunnableGraph
A Flow that has both ends “attached” to a Source and Sink respectively, and is ready to be run().

As I say Akka comes with loads of inbuilt stages to make our lives easier here. For example these are the available stages at time of writing

Source Stages

  • fromIterator
  • apply
  • single
  • repeat
  • tick
  • fromFuture
  • fromCompletionStage
  • unfold
  • unfoldAsync
  • empty
  • maybe
  • failed
  • actorPublisher
  • actorRef
  • combine
  • queue
  • asSubscriber
  • fromPublisher
  • fromFile

Sink Stages

  • head
  • headOption
  • last
  • lastOption
  • ignore
  • cancelled
  • seq
  • foreach
  • foreachParallel
  • onComplete
  • fold
  • reduce
  • combine
  • actorRef
  • actorRefWithAck
  • actorSubscriber
    asPublisher
  • fromSubscriber
  • toFile

We will now look at some example of using some of these

def simpleFlow() : Unit = {
  val source = Source(1 to 10)
  val sink = Sink.fold[Int, Int](0)(_ + _)
  // connect the Source to the Sink, obtaining a RunnableGraph
  val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
  // materialize the flow and get the value of the FoldSink
  implicit val timeout = Timeout(5 seconds)
  val sumFuture: Future[Int] = runnable.run()
  val sum = Await.result(sumFuture, timeout.duration)
  println(s"source.toMat(sink)(Keep.right) Sum = $sum")

  // Use the shorthand source.runWith(sink)
  val sumFuture2: Future[Int] = source.runWith(sink)
  val sum2 = Await.result(sumFuture2, timeout.duration)
  println(s"source.runWith(sink) Sum = $sum")
}

In this simple example we have s Source(1 to 10) which we then wire up to a Sink which adds the numbers coming in.

This block demonstrates various different Source(s) and Sink(s)

def differentSourcesAndSinks() : Unit = {
  //various sources
  Source(List(1, 2, 3)).runWith(Sink.foreach(println))
  Source.single("only one element").runWith(Sink.foreach(println))
  //actor sink
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
  Source(List("hello", "hello"))
    .runWith(Sink.actorRef(helloActor,DoneMessage))
  //future source
  val futureString = Source.fromFuture(Future.successful("Hello Streams!"))
    .toMat(Sink.head)(Keep.right).run()
  implicit val timeout = Timeout(5 seconds)
  val theString = Await.result(futureString, timeout.duration)
  println(s"theString = $theString")
}

And this block demos using a simple Map on a Source

def mapFlow() : Unit = {
  val source = Source(11 to 16)
  val doublerSource = source.map(x => x * 2)
  val sink = Sink.foreach(println)
  implicit val timeout = Timeout(5 seconds)

  // Use the shorthand source.runWith(sink)
  val printSinkFuture: Future[Done] = doublerSource.runWith(sink)
  Await.result(printSinkFuture, timeout.duration)
}

Working With The Graph API

Akka streams also comes with a pretty funky graph building DSL. You would use this when you want to create quite elaborate flows.

The other very interesting thing about the graph builder DSL is that you can use custom shapes inside it, and you can also leave it partially connected. Such that you could potentially use it as a Source/Sink.

Lets say you had an output from the graph you built using the graph DSL, you could then use that partially constructed graph as a Source in its own right.

The same goes if you had an unconnected input in the graph you created you could use that as a Sink.

You can read more about this here :

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-graphs.html#constructing-sources-sinks-and-flows-from-partial-graphs

I urge you all to have a read of that as its quite cool what can be done with the graph DSL

Ok so time for an example, this example comes directly from the TypeSafe activator code

http://www.lightbend.com/activator/template/akka-stream-scala

package com.sas.graphs

import java.io.File

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Failure, Success }

class WritePrimesDemo {

  def run(): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    // generate random numbers
    val maxRandomNumberSize = 1000000
    val primeSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize))).
        // filter prime numbers
        filter(rnd => isPrime(rnd)).
        // and neighbor +2 is also prime
        filter(prime => isPrime(prime + 2))

    // write to file sink
    val fileSink = FileIO.toPath(new File("target/primes.txt").toPath)
    val slowSink = Flow[Int]
      // act as if processing is really slow
      .map(i => { Thread.sleep(1000); ByteString(i.toString) })
      .toMat(fileSink)((_, bytesWritten) => bytesWritten)

    // console output sink
    val consoleSink = Sink.foreach[Int](println)

    // send primes to both slow file sink and console sink using graph API
    val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
      (slow, console) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
        primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
        broadcast ~> console // connect other side of splitter to console
        ClosedShape
    }
    val materialized = RunnableGraph.fromGraph(graph).run()

    // ensure the output file is closed and the system shutdown upon completion
    materialized.onComplete {
      case Success(_) =>
        system.terminate()
      case Failure(e) =>
        println(s"Failure: ${e.getMessage}")
        system.terminate()
    }

  }

  def isPrime(n: Int): Boolean = {
    if (n <= 1) false
    else if (n == 2) true
    else !(2 to (n - 1)).exists(x => n % x == 0)
  }
}

The most important part of this code is this part

// send primes to both slow file sink and console sink using graph API
val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
  (slow, console) =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
    primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
    broadcast ~> console // connect other side of splitter to console
    ClosedShape
}
val materialized = RunnableGraph.fromGraph(graph).run()

There is 2 sinks defined before we use the Graph

  • A file Sink
  • A console Sink

There is also a Source that generates random primes

So the Graph DSL allows you to um well create graphs. It allows you to take in inputs and create other shapes using the implicit builder that is provided.

The DSL then allows you to connect inputs/other builder creates stages/shapes to the inputs and even expose the connected stages to an output.

This is done using the ~> syntax than simply means connect

As previously stated you can create partially connected graphs, but if you have all inputs and outputs connected it is considered a ClosedShape, that can be used as an isolated component

Here is an example of the output of running this graph example

image

Create Custom Shapes/Stages

It doesn’t stop there, we can also create out own shapes that can be used in flows. This is a pretty complex subject and you will definitely benefit from reading this page

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html

There is no way this little post will cover enough, but here are some highlights of the official documentation

This is the basic pattern you would use to create a custom stage

import akka.stream.SourceShape
import akka.stream.stage.GraphStage
 
class NumbersSource extends GraphStage[SourceShape[Int]] {
  // Define the (sole) output port of this stage
  val out: Outlet[Int] = Outlet("NumbersSource")
  // Define the shape of this stage, which is SourceShape with the port we defined above
  override val shape: SourceShape[Int] = SourceShape(out)
 
  // This is where the actual (possibly stateful) logic will live
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
}

Most of the actual logic will be inside the createLogic method. But in order to do anything useful in there you will need to use handlers. Handlers are what you use to handle input/output. There are InHandler and OutHandler.

Each of which has its own state machine flow. For example this is the state machine for an OutHandler

image

Whilst this is the one for InHandler

image

This is the best page to read to learn more about these handlers

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html#Port_states__InHandler_and_OutHandler

The one and ONLY place that state should be maintained is within the createLogic method.

Lets consider a small example. Lets say we have some objects like this

case class Element(id: Int, value: Int)

And we want to build a custom stage that will allow us to select a value from this type, and should only emit an output value for unique values as provided by the property selector.

We could call this DistinctUntilChanged. Lets see what an example for this could look like

package com.sas.customshapes

import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler, GraphStage}
import akka.stream.{Outlet, Attributes, Inlet, FlowShape}

import scala.collection.immutable

final class DistinctUntilChanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, E]] {

  val in = Inlet[E]("DistinctUntilChanged.in")
  val out = Outlet[E]("DistinctUntilChanged.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {

    private var savedState : Option[E] = None

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)

        if (savedState.isEmpty  || propertyExtractor(savedState.get) != nextState) {
          savedState = Some(nextElement)
          push(out, savedState.get)
        }
        else {
          pull(in)
        }
        savedState = Some(nextElement)
      }

      override def onPull(): Unit = {
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        completeStage()
      }
    })

    override def postStop(): Unit = {
      savedState = None
    }
  }
}



The highlights of this are

  • We have a single Inlet
  • We have a single Outlet
  • We expose a FlowShape (in/out only) there are many shapes but FlowShape is what we want for one in/out out
  • We use createLogic to do the work
  • We use an InHandler to handle input
  • We use an OutHandler to handle output

One other important thing (at least for this single in/out example) is that we DO NOT call pull/push more than once in the createLogic

Lets assume we have these elements

package com.sas.customshapes

import scala.collection.immutable

object SampleElements {

  val E11 = Element(1, 1)
  val E21 = Element(2, 1)
  val E31 = Element(3, 1)
  val E42 = Element(4, 2)
  val E52 = Element(5, 2)
  val E63 = Element(6, 3)

  val Ones = immutable.Seq(E11, E21, E31)
  val Twos = immutable.Seq(E42, E52)
  val Threes = immutable.Seq(E63)

  val All = Ones ++ Twos ++ Threes
}

And this demo code

def runDistinctUntilChanged() : Unit = {
  Source(SampleElements.All)
    .via(new DistinctUntilChanged(_.value))
    .runWith(Sink.foreach(println))
}

We would get this output to the Sink

image

This example does owe a lot to a nice blog post I found here :

https://www.softwaremill.com/implementing-a-custom-akka-streams-graph-stage/
 

That’s It

Anyway that is the end of the series I hope you have enjoyed it, and have learnt you some Akka along the way

I am going to have a small break now and then start looking into some Azure/Web stuff I think

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples