
Elasticsearch

Most people have probably heard of Elasticsearch by now. So what exactly is Elasticsearch?

Elasticsearch is a distributed, open source search and analytics engine for all types of data, including textual, numerical, geospatial, structured, and unstructured. Elasticsearch is built on Apache Lucene and was first released in 2010 by Elasticsearch N.V. (now known as Elastic). Known for its simple REST APIs, distributed nature, speed, and scalability, Elasticsearch is the central component of the Elastic Stack, a set of open source tools for data ingestion, enrichment, storage, analysis, and visualization. Commonly referred to as the ELK Stack (after Elasticsearch, Logstash, and Kibana), the Elastic Stack now includes a rich collection of lightweight shipping agents known as Beats for sending data to Elasticsearch.

https://www.elastic.co/what-is/elasticsearch

Essentially it is a great tool for analysing data stored within indexes in a clustered/sharded, fault-tolerant, NoSQL-style store. As the blurb above states, it is built on top of Lucene. For those that are interested in that, I wrote a small article in the past on using Lucene.Net : https://www.codeproject.com/Articles/609980/Small-Lucene-NET-Demo-App

 

Anyway, this post will talk you through downloading Elasticsearch for Windows, and will show you how to use the high-level C# client called NEST.

 

We will be learning how to do the following things:

  • Create and index new documents
  • Search for documents
  • Update documents
  • Delete documents

So let’s carry on and learn how we can download Elasticsearch.

Download

You can download it from here : https://www.elastic.co/downloads/elasticsearch. For my setup (Windows), once downloaded we can simply open the bin folder from the download and use the BAT file shown in the image below to start it.

 

image
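For reference, starting it from a console boils down to something like the following (the extraction folder below is just an assumed example; adjust it to wherever you unzipped the download):

REM hypothetical extraction folder
cd C:\elasticsearch-7.5.1\bin

REM starts a single node, which listens on http://localhost:9200 by default
elasticsearch.bat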

Once you run that BAT file and wait a while, you should see something like this appear:

image

Demo

For this set of demos I am using Visual Studio 2019 (Community), and have installed the following NuGet packages for Elasticsearch:

<PackageReference Include="Elasticsearch.Net" Version="7.5.1" />
<PackageReference Include="NEST" Version="7.5.1" />

So with those in place, let's proceed to the meat of this post: how we do the things listed at the start. As I say, this demo will use the high-level Elasticsearch .NET client NEST, which you can read more about here : https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/nest.html

Indexing documents

The 1st step is to get some data into Elasticsearch, so to do that we need to craft some data and also index it. Elasticsearch is clever enough to infer most of the data/field types when it indexes, but you can override this should you want to. Let's see an example.

We will use this class (ONLY) during this demo to do all our operations with; a small sketch of overriding the inferred mapping follows it.

namespace ElasticDemoApp_CSharp.Models
{
    public class Person
    {
        public string Id { get; set; }
        public string FirstName { get; set; }
        public string LastName { get; set; }
        public bool IsManager { get; set; }
        public DateTime StartedOn { get; set; }
    }
}
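As mentioned above, Elasticsearch will infer the mapping for a class like this when documents are first indexed, but you can also create the index with an explicit mapping up front. The snippet below is only a sketch of that idea (it is not part of the demo project, and it assumes the same ElasticClient that is created in the next snippet); it shows one hypothetical override where LastName is mapped as a keyword rather than analysed text:

var createIndexResponse = client.Indices.Create("people", c => c
    .Map<Person>(m => m
        .AutoMap()
        .Properties(p => p
            //override the inferred text mapping for LastName with a non-analysed keyword
            .Keyword(k => k.Name(n => n.LastName)))));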

It can be seen that there is an Id field in that POCO object. This field is fairly important, and we will see why later. Let's see how we can get some data in.

var settings = new ConnectionSettings(new Uri("http://localhost:9200"))
.DefaultIndex("people");

var client = new ElasticClient(settings);

//CREATE
var person = new Person
{
    Id = "1",
    FirstName = "Tom",
    LastName = "Laarman",
    StartedOn = new DateTime(2016, 1, 1)
};

var people = new[]
{
    new Person
    {
        Id = "2",
        FirstName = "Tom",
        LastName = "Pand",
        StartedOn = new DateTime(2017, 1, 1)
    },
    new Person
    {
        Id = "3",
        FirstName = "Tom",
        LastName = "grand",
        StartedOn = new DateTime(2017, 5, 4)
    }
};

client.IndexDocument(person);
client.IndexMany(people);

var manager1 = new Person
{
    Id = "4",
    FirstName = "Tom",
    LastName = "Foo",
    StartedOn = new DateTime(2017, 1, 1)
};

client.Index(manager1, i => i.Index("managerpeople"));

The code above shows you how to create the initial client, how to insert a single document, and how to insert many documents. Elasticsearch has a few ways of doing the same thing, so it's up to you which API syntax you prefer, but the examples above largely do the same thing: they get data into Elasticsearch at certain indexes. You can read more about indexing here : https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/indexing-documents.html

Query

So now that we have some data in, we may want to search for it. Elasticsearch comes with a rich query API, which you can read about here : https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/search.html

So here is an example that queries the data we just stored. Note the use of "&&" to form compound queries, which you can read about here : https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/bool-queries.html#binary-and-operator. It's worth getting to know these operators, as they will make your queries more readable.

//SEARCH
var searchResponse = client.Search<Person>(s => s
    .From(0)
    .Size(10)
    .AllIndices()
    .Query(q =>
            q.Match(m => m
            .Field(f => f.FirstName)
            .Query("Tom")
            ) &&
            q.DateRange(r => r
            .Field(f => f.StartedOn)
            .GreaterThanOrEquals(new DateTime(2017, 1, 1))
            .LessThan(new DateTime(2018, 1, 1))
            )
    )
);

var matches = searchResponse.Documents;

Update

So now that we have some data and we can search it, let's turn our hand to updating it. Here are a few examples where I mix in some queries to check the updated data.

//UPDATE 

//update the person with Id 1 in the "people" index
person.FirstName = "Tim";
client.UpdateAsync(new DocumentPath<Person>(person.Id),
    u => u.Index("people")
    .DocAsUpsert(true)
    .Doc(person)
    .Refresh(Elasticsearch.Net.Refresh.True))
    .ConfigureAwait(false).GetAwaiter().GetResult();

searchResponse = client.Search<Person>(s => s
    .From(0)
    .Size(10)
    .AllIndices()
    .Query(q =>
            q.Match(m => m
            .Field(f => f.FirstName)
            .Query("Tim")
            )
    )
);

matches = searchResponse.Documents;

//update "Tim" to "Samantha" using different update method
client.UpdateAsync<Person, object>(new DocumentPath<Person>(1),
    u => u.Index("people")
        .DocAsUpsert(true)
        .RetryOnConflict(3)
        .Doc(new { FirstName = "Samantha" })
        .Refresh(Elasticsearch.Net.Refresh.True))
        .ConfigureAwait(false).GetAwaiter().GetResult();


searchResponse = client.Search<Person>(s => s
    .From(0)
    .Size(10)
    .AllIndices()
    .Query(q =>
        q.Match(m => m
            .Field(f => f.FirstName)
            .Query("Samantha")
        )
    )
);

matches = searchResponse.Documents;

There is not much more to say here, apart from perhaps pay special attention to how we use the fluent DSL Doc(…) to apply partial updates, and how we use Refresh(..), which ensures the shards that hold this data are refreshed, making the change visible to new searches.

Deleting data

So now we have put data in, queried it, and updated it, I guess we should talk about deletes. This is done as follows:

//DELETE
client.DeleteAsync<Person>(1,
    d => d.Index("people")
        .Refresh(Elasticsearch.Net.Refresh.True))
        .ConfigureAwait(false).GetAwaiter().GetResult();

searchResponse = client.Search<Person>(s => s
    .From(0)
    .Size(10)
    .AllIndices()
    .Query(q =>
        q.Match(m => m
            .Field(f => f.Id)
            .Query("1")
        )
    )
);


matches = searchResponse.Documents;

//delete using a query
client.DeleteByQueryAsync<Person>(
    d => d.AllIndices()
        //NOTE: term queries are not analysed, so the field name and value must match exactly what is stored in the index
        .Query(qry => qry.Term(p => p.Name("FirstName").Value("Tom")))
        .Refresh(true)
        .WaitForCompletion())
        .ConfigureAwait(false).GetAwaiter().GetResult();

var response = client.DeleteByQueryAsync<Person>(
    q => q
        .AllIndices()
        .Query(rq => rq
            .Match(m => m
            .Field(f => f.FirstName)
            .Query("Tom")))
        .Refresh(true)
        .WaitForCompletion())
        .ConfigureAwait(false).GetAwaiter().GetResult();

searchResponse = client.Search<Person>(s => s
.From(0)
.Size(10)
.AllIndices()
.Query(q =>
        q.Match(m => m
        .Field(f => f.FirstName)
        .Query("Tom")
        )
    )
);


matches = searchResponse.Documents;

As before, I have included queries in here to check the deletes. Hopefully you get the idea: above we can delete by an Id, or by using a query where we look to match N-many records.

Demo Project

Anyway that is all I wanted to show this time, hopefully it gives you a small taste of using the .NET Elastic client. You can download a demo project from here : https://github.com/sachabarber/Elasticdemo

React

React Router

I plan on getting a lot better at a few things this year; my current list is:

  • Really getting to know React
  • Really getting to know AWS
  • Really getting to know Azure

 

As such you can expect some posts on all of these subjects over time. But this is the here and now, so in this post I wanted to start by looking at React Router.

 

What is React Router?

React Router is a set of React components that help with all your navigation concerns when it comes to working with React.

 

Main Components

It comes with the following main components

 

BrowserRouter

A <Router> that uses the HTML5 history API (pushState, replaceState and the popstate event) to keep your UI in sync with the URL

 

HashRouter

A <Router> that uses the hash portion of the URL (i.e. window.location.hash) to keep your UI in sync with the URL.

 

Link

Provides declarative, accessible navigation around your application.

 

NavLink

A special version of the <Link> that will add styling attributes to the rendered element when it matches the current URL.

 

MemoryRouter

A <Router> that keeps the history of your “URL” in memory (does not read or write to the address bar). Useful in tests and non-browser environments like React Native.

 

Redirect

Rendering a <Redirect> will navigate to a new location. The new location will override the current location in the history stack, like server-side redirects (HTTP 3xx) do.
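A minimal sketch of that (with made-up paths) would be a <Redirect> rendered inside a <Route>:

<Route exact path="/old-about">
  <Redirect to="/about" />
</Route>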

 

Route

The Route component is perhaps the most important component in React Router to understand and learn to use well. Its most basic responsibility is to render some UI when its path matches the current URL.

 

Router

The common low-level interface for all router components. Typically apps will use one of the high-level routers instead:

The most common use-case for using the low-level <Router> is to synchronize a custom history with a state management lib like Redux or Mobx. Note that this is not required to use state management libs alongside React Router, it’s only for deep integration.

 

StaticRouter

A <Router> that never changes location.

This can be useful in server-side rendering scenarios when the user isn’t actually clicking around, so the location never actually changes. Hence, the name: static. It’s also useful in simple tests when you just need to plug in a location and make assertions on the render output.

 

Switch

Renders the first child <Route> or <Redirect> that matches the location.

 

How is this different than just using a bunch of <Route>s?

<Switch> is unique in that it renders a route exclusively. In contrast, every <Route> that matches the location renders inclusively.
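A small made-up sketch of that difference: with the two routes below wrapped in a <Switch>, visiting /about renders only the first match, because the first matching <Route> wins; without the <Switch>, both would render, since /:anything also matches /about (About and CatchAll are hypothetical components here).

<Switch>
  <Route path="/about">
    <About />
  </Route>
  <Route path="/:anything">
    <CatchAll />
  </Route>
</Switch>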

 

Hooks

The current version of React Router also comes with these hooks:

useHistory

The useHistory hook gives you access to the history instance that you may use to navigate.

import { useHistory } from "react-router-dom";

function HomeButton() {
  let history = useHistory();

  function handleClick() {
    history.push("/home");
  }

  return (
    <button type="button" onClick={handleClick}>
      Go home
    </button>
  );
}

useLocation

The useLocation hook returns the location object that represents the current URL. You can think about it like a useState that returns a new location whenever the URL changes.

import React from "react";
import { useLocation } from "react-router-dom";

function usePageViews() {
  let location = useLocation();

  React.useEffect(() => {
    // purely illustrative: react to the new location whenever the URL changes
    console.log(`Page view: ${location.pathname}`);
  }, [location]);
}

useParams

useParams returns an object of key/value pairs of URL parameters. Use it to access match.params of the current <Route>.

import React from "react";
import ReactDOM from "react-dom";
import {
  BrowserRouter as Router,
  Switch,
  Route,
  useParams
} from "react-router-dom";

function BlogPost() {
  let { slug } = useParams();
  return <div>Now showing post {slug}</div>;
}

useRouteMatch

The useRouteMatch hook attempts to match the current URL in the same way that a <Route> would. It’s mostly useful for getting access to the match data without actually rendering a <Route>

import { useRouteMatch } from "react-router-dom";

function BlogPost() {
  let match = useRouteMatch("/blog/:slug");

  // Do whatever you want with the match...
  return <div />;
}

 

So those are the main components and hooks that are available; let's proceed and see how we can use them.

 

How do we use it?

This largely boils down to a few steps.

 

We need an actual React App

Firstly we need an actual app. There are many ways you could do this, but by far the easiest is to use Create React App. I prefer to work with TypeScript where possible, so we can use something like this : https://create-react-app.dev/docs/adding-typescript/, which will create a simple skeleton React app that allows the use of TypeScript, for example with the command shown below.
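For example, the command from that page boils down to something like this (the app name here is just a placeholder):

npx create-react-app my-router-demo --template typescript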

 

Let's create some routes

Once we have created an app, we need to create some actual routes and components to render for those routes. I think the best way to do this is to show a full sample here, then discuss the various parts as we go. So here is a full example of how to use React Router (it should be noted that you will need to have installed this via NPM as react-router-dom).

import React, {Component } from "react";
import { RouteComponentProps, useHistory } from 'react-router';
import {
    BrowserRouter as Router,
    Switch,
    Route,
    Link,
    useParams,
    HashRouter,
    BrowserRouter,
    NavLink
} from "react-router-dom";


//Non bootstrap version
export default function AppRouter() {

    return (
        <BrowserRouter >
            <div>
                <nav>
                    <ul>
                        <li>
                            <NavLink to={{ pathname: "/" }} activeStyle={{
                                fontWeight: "bold",
                                color: "red"
                            }}>Home</NavLink>
                        </li>
                        <li>
                            <NavLink to="/about" activeStyle={{
                                fontWeight: "bold",
                                color: "red"
                            }}>About</NavLink>
                        </li>
                        <li>
                            <NavLink to="/aboutComponentUsingFunction" activeStyle={{
                                fontWeight: "bold",
                                color: "red"
                            }}>AboutComponentUsingFunction</NavLink>
                        </li>
                        <li>
                            <NavLink to="/aboutComponentUsingRenderFunction" activeStyle={{
                                fontWeight: "bold",
                                color: "red"
                            }}>AboutComponentUsingRenderFunction</NavLink>
                        </li>
                        <li>
                            <NavLink to="/users/1" activeStyle={{
                                fontWeight: "bold",
                                color: "red"
                            }}>Users1</NavLink>
                        </li>
                        <li>
                            <NavLink to="/users/2" activeStyle={{
                                fontWeight: "bold",
                                color: "red"
                            }}>Users2</NavLink>
                        </li>
                        <li>
                            <NavLink to="/users2/1" activeStyle={{
                                fontWeight: "bold",
                                color: "red"
                            }}>Users As Class With History link</NavLink>
                        </li>
                    </ul>
                </nav>

                {/* A <Switch> looks through its children <Route>s and
                    renders the first one that matches the current URL. */}
                <Switch>
                    <Route path="/about">
                        <About />
                    </Route>
                    <Route path="/aboutComponentUsingFunction"
                        //This is bad though due to this statement from the docs
                        //When you use the component props, the router uses React.createElement 
                        //to create a new React element from the given component. 
                        //That means if you provide an inline function to the component attribute, 
                        //you would create a new component every render. 
                        //This results in the existing component unmounting and the new component 
                        //mounting instead of just updating the existing component

                        component={(props: any) => <About {...props} isAuthed={true} />}>
                    </Route>
                    <Route path="/aboutComponentUsingRenderFunction"
                        //This is better as are using render rather than component, which does not 
                        //suffer from the issue mentioned above
                        render={(props: any) => <About {...props} isAuthed={true} />}>
                    </Route>
                    <Route path="/users/:id" children={<Users />} />
                    <Route path="/users2/:id" component={Users2} />
                    <Route path="/">
                        <Home />
                    </Route>
                </Switch>
            </div>
        </BrowserRouter >
    );
}


function Home() {
    return <h2>Home</h2>;
}

function About(props: any) {

    console.log(`In render method of About`);
    console.log(props);
    return <h2>About</h2>;
}

function Users() {
    // We can use the `useParams` hook here to access
    // the dynamic pieces of the URL.
    let { id } = useParams();
    let history = useHistory();

    const handleClick = () => {
        history.push("/home");
    };

    return (
        <div>
            <h3>ID: {id}</h3>
            <button type="button" onClick={handleClick}>Go home</button>
        </div>
    );
}

class Users2 extends React.Component<RouteComponentProps, any> {

    constructor(props: any) {
        super(props);
    }

    render() {
        return (
            <div>
                <h1>Hello {(this.props.match.params as any).id}!</h1 >
                <button
                    type='button'
                    onClick={() => { this.props.history.push('/users/1') }} >
                    Go to users/1
                </button>
            </div>
        );
    }
}

When run this should look like this

image 

 

So from the above code there are a couple of points that deserve special callouts, so let's go through them.

 

NavLink usage

We make use of NavLink to declare our actual routes. This includes the to prop, which is the actual route we wish to have rendered. Some examples would be:

  • /
  • /about
  • /aboutComponentUsingFunction
  • /users/1

 

Here is one such example of this

<NavLink to="/about" activeStyle={{
    fontWeight: "bold",
    color: "red"
}}>About</NavLink>

Switch usage

The next thing we need, to make sure React Router works correctly, is to include a Switch block, where we declare all the routes. A Route should have, as a minimum, a path and some way of actually rendering the component, such as render/component/children, each of which works slightly differently.

 

The path is where you would be able to pick up the parameters for the matched route.  An example for one of the routes that expects some route parameters may look like this

<Route path="/users/:id" children={<Users />} />
<Route path="/users2/:id" component={Users2} />

function Users() {
    // We can use the `useParams` hook here to access
    // the dynamic pieces of the URL.
    let { id } = useParams();
    let history = useHistory();

    const handleClick = () => {
        history.push("/home");
    };

    return (
        <div>
            <h3>ID: {id}</h3>
            <button type="button" onClick={handleClick}>Go home</button>
        </div>
    );
}


class Users2 extends React.Component<RouteComponentProps, any> {

    constructor(props: any) {
        super(props);
    }

    render() {
        return (
            <div>
                <h1>Hello {(this.props.match.params as any).id}!</h1 >
                <button
                    type='button'
                    onClick={() => { this.props.history.push('/users/1') }} >
                    Go to users/1
                </button>
            </div>
        );
    }
}

It can be seen that when we use react-router, the router provides a prop called "match", which we can either expect as props when using class-based components, or extract using the useParams hook when using a functional React component.

 

It can also be seen from these 2 examples above how we can navigate  to different routes from within our components. This is done using the history object which you can read more about here : https://reacttraining.com/react-router/web/api/history

 

A match object contains information about how a <Route path> matched the URL. match objects contain the following properties:

  • params – (object) Key/value pairs parsed from the URL corresponding to the dynamic segments of the path
  • isExact – (boolean) true if the entire URL was matched (no trailing characters)
  • path – (string) The path pattern used to match. Useful for building nested <Route>s
  • url – (string) The matched portion of the URL. Useful for building nested <Link>s

 

In the example provided here I have tried to use a mixture of render/component/children, each of which works differently. Let's go through how these work.

 

render

This allows for convenient inline rendering and wrapping without the undesired remounting explained above.

Instead of having a new React element created for you using the component prop, you can pass in a function to be called when the location matches. The render prop function has access to all the same route props (match, location and history) as the component render prop.

component

A React component to render only when the location matches. It will be rendered with route props.

When you use component (instead of render or children) the router uses React.createElement to create a new React element from the given component. That means if you provide an inline function to the component prop, you would create a new component every render. This results in the existing component unmounting and the new component mounting instead of just updating the existing component. When using an inline function for inline rendering, use the render or the children prop.

children

Sometimes you need to render whether the path matches the location or not. In these cases, you can use the function children prop. It works exactly like render except that it gets called whether there is a match or not. The children render prop receives all the same route props as the component and render methods, except when a route fails to match the URL, in which case match is null.

 

What about passing extra props to our component?

While the methods described above will give you access to the standard react-router props, what do we do if we want extra props passed to our component? Is this possible? Well yes it is. Let's see one example. One such route is this one, where we use the standard react-router props but also supply a further "isAuthed" prop to the component being rendered for the route.

 

<Route path="/aboutComponentUsingRenderFunction"
    render={(props: any) => <About {...props} isAuthed={true} />}>
</Route>

function About(props: any) {

    console.log(`In render method of About`);
    console.log(props);
    return <h2>About</h2>;
}

Which when clicked on in the browser will look like this in the console

image

 

 

Let's mount our route-based navigation component

We then just need to import this top level react component AppRouter and use it to mount to the DOM, which can be done as follows:

import React from 'react';
import ReactDOM from 'react-dom';
import './index.css';
import AppRouter from './AppRouter'


ReactDOM.render(<AppRouter />, document.getElementById('root'));

 

What about bootstrap Navigation?

There are probably quite a lot of people out there that like to use Bootstrap to aid their website design. There is a React version of Bootstrap called react-bootstrap, which has been built specifically for React, so we can use this; it is as simple as installing it via NPM with npm install react-bootstrap.

 

With that installed, we can also craft a more traditional Bootstrap-based navigation component using something like this full example. Note how we use the react-bootstrap Navbar and other components to do the actual navigation now, rather than just nav and list elements like we did in the 1st full example above.

import React, { Component } from "react";
import { RouteComponentProps, useHistory } from 'react-router';
import {
    Switch,
    Route,
    useParams,
    BrowserRouter,
    Link
} from "react-router-dom";


import 'bootstrap/dist/css/bootstrap.min.css';
import {
    Nav,
    Navbar
} from "react-bootstrap";


function Navigation() {
    return (
        <BrowserRouter >
            <div>
                <Navbar bg="light" expand="lg">
                    <Navbar.Brand href="#home">React-Bootstrap</Navbar.Brand>
                    <Navbar.Toggle aria-controls="basic-navbar-nav" />
                    <Navbar.Collapse id="basic-navbar-nav">
                        <Nav className="mr-auto">
                            <Nav.Link as={Link} to="/">Home</Nav.Link>
                            <Nav.Link as={Link} to="/about">About</Nav.Link>
                            <Nav.Link as={Link} to="/aboutComponentUsingFunction">AboutComponentUsingFunction</Nav.Link>
                            <Nav.Link as={Link} to="/aboutComponentUsingRenderFunction">AboutComponentUsingRenderFunction</Nav.Link>
                            <Nav.Link as={Link} to="/users/1">Users1</Nav.Link>
                            <Nav.Link as={Link} to="/users/2">Users2</Nav.Link>
                            <Nav.Link as={Link} to="/users2/1">Users As Class With History link</Nav.Link>

                        </Nav>
                    </Navbar.Collapse>
                </Navbar>

                {/* A <Switch> looks through its children <Route>s and
                    renders the first one that matches the current URL. */}
                <Switch>
                    <Route path="/about">
                        <About />
                    </Route>
                    <Route path="/users/:id" render={() => <Users />}/>
                    <Route path="/users2/:id" component={Users2} />
                    <Route path="/">
                        <Home />
                    </Route>
                </Switch>
            </div>
        </BrowserRouter >
    );
}


class AppRouterBootstrap extends Component {
    render() {
        return (
            <div id="App">
                <Navigation />
            </div>
        );
    }
}

export default AppRouterBootstrap;

function Home() {
    return <h2>Home</h2>;
}

function About() {
    return <h2>About</h2>;
}

function Users() {
    // We can use the `useParams` hook here to access
    // the dynamic pieces of the URL.
    let { id } = useParams();
    let history = useHistory();

    const handleClick = () => {
        history.push("/home");
    };

    return (
        <div>
            <h3>ID: {id}</h3>
            <button type="button" onClick={handleClick}>Go home</button>
        </div>
    );
}


class Users2 extends React.Component<RouteComponentProps, any> {

    render() {
        return (
            <div>
                <h1>Hello {(this.props.match.params as any).id}!</h1 >
                <button
                    type='button'
                    onClick={() => { this.props.history.push('/users/1') }} >
                    Go to users/1
                </button>
            </div>
        );
    }
}

The main thing to note here is that we make use of the react-bootstrap components such as Navbar/Nav and Nav.Link.

 

One particular callout should be how we use Nav.Link, where it can be seen that we use it like this:

import {
    Switch,
    Route,
    useParams,
    BrowserRouter,
    Link
} from "react-router-dom";


import 'bootstrap/dist/css/bootstrap.min.css';
import {
    Nav,
    Navbar
} from "react-bootstrap";

<Nav.Link as={Link} to="/">Home</Nav.Link>

 

See how we use "as={Link}"; we do this to ensure that a full server round trip doesn't occur. If you use the typical approach of a href with the react-bootstrap Nav.Link, which would typically be as follows

<Nav.Link href="/home">Active</Nav.Link>

 

You would find that a FULL server round trip is done, which is not what we want. So we instead use the Link component from react-router together with the react-bootstrap Nav.Link. This ensures that NO full server round trip is done and the redirect/render occurs purely client side.

 

Since the rest of the example is the same as the main example above, I won’t go through it all again.

 

To use this version we would just need to use this instead when we mount the main component to the DOM

ReactDOM.render(<AppRouterBootstrap />, document.getElementById('root'));

 

When run this should look like this

image

 

That’s it for now

Anyways that is all I wanted to say for now. In future react posts, I want to explore React-Redux (which obviously there are many resources for on the internet already, but this is my own journey, so perhaps I will have something useful to say who knows) and how you can properly test React-Redux apps

Azure

Azure DevOps : Setting up and pushing a NuGet package

So it's been a while since I posted, but I thought it would be good to finally add the 2nd part of this, where this time we will look at how to create and push to Azure DevOps hosted NuGet feeds.

 

Creating an Azure DevOps project

The 1st step is to create a new Azure DevOps project, and from there you will need to go into the newly created project and turn on the following:

  • Repos : which allows you to host repos in Azure DevOps
  • Artifacts : which allows you to create new NuGet feeds in Azure DevOps

image

 

Once you have done that you can grab the repo's remote clone details. For a new project this may look something like this:

 

image

 

So for me I then cloned the repo, and created a simple .NET Standard 2.0 library, that simply adds numbers

image

From there I simply pushed up the code to the remote repo.

 

So all good so far: we should have a new project which supports feeds and has our new code in it. Let's carry on.

 

Creating a Nuget feed

So now that we have some code pushed up, how do we make it available on our own hosted NuGet feed? There are a couple of steps to perform here.

 

Firstly we need to create a new feed, which is done by going into the Artifacts menu and clicking the "Create Feed" button.

 

image

 

You need to give the feed a name; let's suppose I chose "foofeed2". You should then see something like this, from where you will need to go into the feed settings.

 

image

 

If you click on the drop down you should see that the feed is created as "project scoped", which means that it belongs to the project. Until very recently this was not the case and all new feeds used to be scoped at the organizational level, which affects how the build definition works. This was a bug in Azure DevOps, which you can read more about in this StackOverflow question that I created: https://stackoverflow.com/questions/58856604/azure-devops-publish-nuget-to-hosted-feed

 

This was quite a weird bug to have, as it meant that, all of a sudden, any NuGet feed you created and tried to publish to would not work. This is now fixed, and I will explain the difference between pushing to a project scoped NuGet feed and an organizational one later, when we discuss the build definition.

 

For now you should make sure that the permissions for your feed look something like this. Please forgive me, but I am using a screenshot here from my actual project, whereas above I am just showing you what a new project will look like.

 

image

 

I really do urge you to read the StackOverflow page, as there are some really valuable discussions in there about scoping and what extra permissions you need to ensure are in place.

 

Build Pipeline

So once we have the feed set up with the correct permissions, we can focus our attention on the build side of things.

 

This is the complete source code for a build pipeline that does the following:

  • NuGet restore
  • Build
  • Pack
  • Push to the Azure DevOps feed

 

# ASP.NET Core
# Build and test ASP.NET Core projects targeting .NET Core.
# Add steps that run tests, create a NuGet package, deploy, and more:
# https://docs.microsoft.com/azure/devops/pipelines/languages/dotnet-core

trigger:
- master

pool:
  vmImage: 'ubuntu-latest'

variables:
  buildConfiguration: 'Release'
  Major: '1'
  Minor: '0'
  Patch: '0'

steps:

- task: DotNetCoreCLI@2
  displayName: 'Restore'
  inputs:
    command: restore
    projects: '**/MathsLib.csproj'


- task: DotNetCoreCLI@2
  displayName: Build
  inputs:
    command: build
    projects: '**/MathsLib.csproj'
    arguments: '--configuration Release' # Update this to match your need


- task: DotNetCoreCLI@2
  inputs: 
    command: 'pack'
    projects: '**/MathsLib.csproj'
    outputDir: '$(Build.ArtifactStagingDirectory)'
    versioningScheme: 'byPrereleaseNumber'
    majorVersion: '1'
    minorVersion: '0'
    patchVersion: '0'


- task: NuGetCommand@2
  displayName: 'nuget push'
  inputs:
    command: 'push'
    feedsToUse: 'select'
    packagesToPush: '$(Build.ArtifactStagingDirectory)/**/*.nupkg;!$(Build.ArtifactStagingDirectory)/**/*.symbols.nupkg'
    nuGetFeedType: 'internal'
    vstsFeed: 'nugetprojects/anotherfeed'
    publishVstsFeed: 'nugetprojects/anotherfeed'
    versioningScheme: 'off'
    allowPackageConflicts: true

 

Project Scoped Feed

image

It should be noted that this is using a "project scoped" NuGet feed. See the 'nugetprojects/anotherfeed' entries: 'ProjectName/FeedName' is the syntax you need to use for project scoped NuGet feeds.

 

Organizational Scoped Feed

image

 

In contrast, if you use an organizational feed, you will need to ensure the feed settings in the YAML above just use the name of your feed, so the push step would be this instead:

- task: NuGetCommand@2
  displayName: 'nuget push'
  inputs:
    command: 'push'
    feedsToUse: 'select'
    packagesToPush: '$(Build.ArtifactStagingDirectory)/**/*.nupkg;!$(Build.ArtifactStagingDirectory)/**/*.symbols.nupkg'
    nuGetFeedType: 'internal'
    vstsFeed: 'testfeed'
    publishVstsFeed: 'testfeed'
    versioningScheme: 'off'
    allowPackageConflicts: true

As I say, this was a VERY recent change introduced in Azure DevOps, and you can read more in the StackOverflow post that I refer to above.

 

For now let's test this pipeline using the project scoped feed "anotherfeed" above.

image

 

Consuming the feed from a new project

With all this in place we should now be able to consume the feed from a new project, so let's see how we can do that. We can go to the feed to grab the details of how to connect to it.

 

image

Then pick the Visual Studio settings (or NuGet if you want to use nuget.config to configure your sources).
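If you prefer the nuget.config route, a minimal sketch looks something like this, where YOUR_ORG is a placeholder for your organisation name and the project/feed parts follow the project scoped feed used above:

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <packageSources>
    <!-- project scoped feed URL shape: https://pkgs.dev.azure.com/{organisation}/{project}/_packaging/{feed}/nuget/v3/index.json -->
    <add key="anotherfeed" value="https://pkgs.dev.azure.com/YOUR_ORG/nugetprojects/_packaging/anotherfeed/nuget/v3/index.json" />
  </packageSources>
</configuration>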

 

I'll use the Visual Studio one.

 

image

 

I can then set this up in Visual Studio as a new NuGet package source.

image

And search for my NuGet package (I did not create a proper release, so you need to tick "include pre-releases").

image

 

Cool all working. So that’s it for this time

Azure

Getting reacquainted with Azure DevOps

At the job before this one we used to make quite heavy use of VSTS, which is now Azure DevOps. I used it a bit, but I was never the "build" guy. In my current role, I am becoming more and more architecture/devops focused. I am what one would call a hands-on architect.

Anyway long story short we use Gitlab/Rancher/AWS/Docker where I am now, but I just wanted to keep my toe in with what is going on in Azure, so I plan on getting to know the lay of the land of some of the new tooling

 

I thought what better way to start than with something dead simple: let's create a new WebApi .NET Core app, and get it deployed to an Azure App Service. This has to be pretty simple to start with, right?

 

So before we start, what I did was sign up for a free tier in Azure, and added all my stuff to a single resource group. That way I can just trash it all to keep my costs low. For those that don't know, Azure resource groups are like the uber top-level container that all your other services live in.

 

Ok so lets get started

 

Step 1 : Create the app

Creating the app for me was as simple as creating a new Web project in Visual Studio 2019, where I stuck with the standard WebApi project. I literally did not change a thing; I stuck completely with the standard ValuesController, and targeted .NET Core 2.1.

 

image

 

Step 2 : Create a GitHub Repo

So now that I have created a pretty useless demo app, I created a GitHub repo, and pushed this code to it

 

Step 3 : Azure AppService

So I have this small .NET Core 2.1 demo app, which I should be able to run on Linux. So I signed up for the free tier of Azure, then created a new App Service, which looked like this, where I was careful to keep to the Dev/Test tier of the size plan.

 

image

Ok so once that was created I was ready to head over to devops.azure.com

Step 4 : Azure DevOps

So the 1st thing I did was create a new project, let's call it "XXX" for now.

image

From there I picked “GitHub” on the Connect part of the wizard. Next I simply picked my repository from the list, and then it was time to get down with the actual Build side of the Azure DevOps pipeline.

 

When I last used VSTS it was all UI based; like everything else, it has all turned to YAML now. So you need to look up the various build tasks to get the available task syntax that you can use. Luckily the basic skeleton was clever enough to give a very handy link to this page : https://docs.microsoft.com/en-gb/azure/devops/pipelines/ecosystems/dotnet-core?view=azure-devops

 

From there you can grab several of the tasks you might need such as

 

– Build

– Publish

 

If you want to see all the available actions for the DotNetCoreCLI@2 task you can look here : https://docs.microsoft.com/en-us/azure/devops/pipelines/tasks/build/dotnet-core-cli?view=azure-devops, in fact from that link you can use the treeview on the left to find all sorts of tasks to help you live the DevOps pipeline dream

 

So once I read the docs a bit I ended up with this DevOps Build pipeline configuration

 

# ASP.NET Core
# Build and test ASP.NET Core projects targeting .NET Core.
# Add steps that run tests, create a NuGet package, deploy, and more:
# https://docs.microsoft.com/azure/devops/pipelines/languages/dotnet-core

trigger:
- master

pool:
  vmImage: 'ubuntu-latest'

variables:
  buildConfiguration: 'Release'

steps:
- task: DotNetCoreCLI@2
  displayName: Build
  inputs:
    command: build
    projects: '**/*.csproj'
    arguments: '--configuration Release' # Update this to match your need

- task: DotNetCoreCLI@2
  inputs:
    command: publish
    projects: '**/*.csproj'
    publishWebProjects: True
    arguments: '--configuration $(BuildConfiguration) --output $(Build.ArtifactStagingDirectory)'
    zipAfterPublish: True

# this code takes all the files in $(Build.ArtifactStagingDirectory) and uploads them as an artifact of your build.
- task: PublishBuildArtifacts@1
  inputs:
    pathtoPublish: '$(Build.ArtifactStagingDirectory)' 
    artifactName: 'AzureDevOpsWebApiTestApp'
  

I think this is fairly self explanatory. So with this configured, it was time to test the build, which worked 1st time. YAY.

 

image

 

So with the Build side of the pipeline working, I turned my hand towards the Release side of the pipeline. For me this was as simple as picking this type of release

 

image

I filled in the required parameters and ran the Release from the pipeline. It also passed. YAY

 

I then headed over to my previously created App Service in Azure, grabbed the public url, and gave it a try, and it worked

 

image

 

I have to say I was pleasantly surprised at just how smoothly all of that went.

 

I will be digging into Azure DevOps a bit more; I would like to see it run some tests, and I would like to set up some private NuGet feeds, publish to those, and consume from them. These will be the subjects of some future posts no doubt.

PowerShell

PowerShell : WebRequest Server Not Found

So at work we use a bit of PowerShell here and there to do various things. We also use PowerShell remoting, where we run the same bit of code on a potential list of remote targets.

One of the things we use a lot is a notification system, where it will attempt to use code like this:

Invoke-RestMethod -Uri "$env:mattermost_uri" -Method Post -Body $json -ContentType "application/json"

But occasionally this would fail on some of our target VMs, where I would see an "Unable to connect to remote server" error.

Turns out there is a default proxy at play which can be disabled like this

[System.Net.HttpWebRequest]::DefaultWebProxy = New-Object System.Net.WebProxy

After that it was all good for me.

Kafka

KafkaStreams : Windowing

So last time we looked at interactive queries. This time (the last one in this series) we will look at windowing operations. This is a fairly dry subject, and I don't have too much to add over the official docs, so this is a nice short one to end the series.

Where is the code?

The code for this post is all contained here

What is Windowing anyway?

Windowing lets you control how to group records that have the same key for stateful operations such as aggregations or joins into so-called windows. Windows are tracked per record key. For example, in join operations, a windowing state store is used to store all the records received so far within the defined window boundary. In aggregating operations, a windowing state store is used to store the latest aggregation results per window. Old records in the state store are purged after the specified window retention period. Kafka Streams guarantees to keep a window for at least this specified time; the default value is one day and can be changed via Materialized#withRetention().

This is what Kafka Streams supports for windowing

image

 

 

Tumbling Windows

Tumbling time windows are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.

image

Here is an example of this using the simple word count example we have used before

package windowing

import java.time.Duration
import java.util
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}
import utils.Settings


class TumblingWordCountTopology extends App {

  import Serdes._

  val props: Properties = Settings.createBasicStreamProperties(
    "tumbling-window-wordcount-application","localhost:9092")

  run()

  private def run(): Unit = {
    val topology = wordCountToplogy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

   def wordCountToplogy() : Topology = {

    import org.apache.kafka.streams.state.Stores
    val wordCountStoreName = "wordCountStore"
    val wordCountStoreSupplied = Stores.inMemoryKeyValueStore(wordCountStoreName)

    val builder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
    val wordCounts = textLines.flatMapValues(x => x.toLowerCase.split("\\W+"))
                    .groupBy((key, word) => word)
      .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
      .count()
    wordCounts.toStream.to("WordsWithCountsTopic")
    builder.build()
  }
}

 

Hopping Time Window

Hopping time windows are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap — and in general they do — a data record may belong to more than one such window.

image

and here is an example of this using the word count example

package windowing

import java.time.Duration
import java.util
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}
import utils.Settings


class HoppingTimeWordCountTopology extends App {

  val props: Properties = Settings.createBasicStreamProperties(
    "hopping-time-window-wordcount-application","localhost:9092")

  run()

  private def run(): Unit = {
    val topology = wordCountToplogy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

   def wordCountToplogy() : Topology = {

    import org.apache.kafka.streams.state.Stores
    val wordCountStoreName = "wordCountStore"
    val wordCountStoreSupplied = Stores.inMemoryKeyValueStore(wordCountStoreName)

    val builder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
    val wordCounts = textLines.flatMapValues(x => x.toLowerCase.split("\\W+"))
                    .groupBy((key, word) => word)
      //a window size of 5 minutes, advancing by 1 minute, so consecutive windows overlap (hopping)
      .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
      .count()
    wordCounts.toStream.to("WordsWithCountsTopic")
    builder.build()
  }
}

That’s It

And that brings us to the end of this series. I will be diving back into .NET land next to write an implementation of the SWIM algorithm. After that I think I will get myself an AWS Solution Architect exam under my belt.

 

I hope you have enjoyed the series. I know this one was a bit naff, but I kind of ran out of steam

Kafka

KafkaStreams : Interactive Queries

So last time we looked at how to make use of the Processor API in the DSL. This time we are going to look at interactive queries.

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

Walking through a Kafka Streams processing node, and the duality of streams

Before we get started, I just wanted to include several excerpts taken from the official Kafka docs : http://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables which talk about KStream and KTable objects (the stream and table objects inside Kafka Streams).

 

When implementing stream processing use cases in practice, you typically need both streams and also databases. An example use case that is very common in practice is an e-commerce application that enriches an incoming stream of customer transactions with the latest customer information from a database table. In other words, streams are everywhere, but databases are everywhere, too.

Any stream processing technology must therefore provide first-class support for streams and tables. Kafka’s Streams API provides such functionality through its core abstractions for streams and tables, which we will talk about in a minute. Now, an interesting observation is that there is actually a close relationship between streams and tables, the so-called stream-table duality. And Kafka exploits this duality in many ways: for example, to make your applications elastic, to support fault-tolerant stateful processing, or to run interactive queries against your application’s latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications.

A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:

image

The stream-table duality describes the close relationship between streams and tables.

  • Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a “real” table by replaying the changelog from beginning to end to reconstruct the table. Similarly, aggregating data records in a stream will return a table. For example, we could compute the total number of pageviews by user from an input stream of pageview events, and the result would be a table, with the table key being the user and the value being the corresponding pageview count.
  • Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream’s data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a “real” stream by iterating over each key-value entry in the table.

Let’s illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time – and different revisions of the table – can be represented as a changelog stream (second column).

image

Because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):

image

The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault tolerance. The stream-table duality is such an important concept for stream processing applications in practice that Kafka Streams models it explicitly via the KStream and KTable abstractions, which we describe in the next sections.

I would STRONGLY urge you all to read the section of the official docs above, as it will really help you should you want to get into Kafka Streams.

 

Interactive Queries

Up until now we have been using KStream, and on occasion we have also used KTable to store state into a state store. A KTable is also able to be turned back into a KStream and pushed back out on an “output” topic, which is what most of my examples have done so far.

 

However this is kind of annoying and wasteful: we just had the data in the format we wanted in Kafka in a KTable, and then we need to transform it back into a KStream and send it to a new topic. Surely there is a better way.

 

Luckily there is: "Interactive queries". Kafka Streams supports the notion of queries across state stores. Most typical state stores are key-value stores, where a given key will live on a certain partition. As such, the state store is distributed across the partitions that make up the topic.

Mmm, so how do we get the data from all the stores? Well, luckily Kafka Streams exposes metadata that allows us to obtain the hostname of the hosts that have a certain state store. From there we can either use a local call or perform an RPC call (most typically REST) to fetch the remote host's data.

 

This is illustrated in the following diagram

 

image

The full state of your application is typically split across many distributed instances of your application, and across many state stores that are managed locally by these application instances.

That is the basic idea anyway. So we need to create some plumbing to help us either call into a local state store, or query a remote host that owns some keys for a state store.

Is it that simple?

Well no, actually. KafkaStreams has the following state transition diagram that one needs to be aware of. If you try to query a state store for metadata or actual data before the instance has reached the RUNNING state (the 2nd RUNNING in the diagram), you won't see anything, as Kafka is considered NOT READY.

 

image

 

There is a lot of chat on the internet (it's even in the Confluent FAQs) about state store metadata not being available, and it is due to this state machine.

So how do we deal with this?

  • Well one thing to do is to implement some kind of retry policy (this post covers this)
  • Another thing to do is to use a stream state listener (this post also covers this; a tiny sketch of the idea follows)
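As a rough sketch of the state listener idea (assuming streams is the KafkaStreams instance built from the topology), you can register a callback and only start serving interactive queries once the instance has transitioned into RUNNING:

streams.setStateListener((newState: KafkaStreams.State, oldState: KafkaStreams.State) => {
  if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
    //state store metadata should now be safe to query
    println("Streams instance is RUNNING, interactive queries can be served")
  }
})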

Ok, so are there any other pitfalls? No, not really; you just need to make sure you don't start using state stores unless your KafkaStreams instance is in a good state.

So now that we have the basics covered, let's see some code.

The Topology

This is the topology that we will be using to construct our state store

package interactive.queries.ratings


import java.util

import entities.Rating
import org.apache.kafka.streams.scala.{Serdes, _}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.state.KeyValueStore
import serialization.JSONSerde
import utils.StateStores


class RatingStreamProcessingTopology  {

  def createTopolgy(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val ratingSerde = new JSONSerde[Rating]
    implicit val listRatingSerde = new JSONSerde[List[Rating]]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, ratingSerde)
    implicit val materializer = Materialized.`with`(stringSerde, listRatingSerde)
    implicit val grouped = Grouped.`with`(stringSerde, ratingSerde)

    val builder: StreamsBuilder = new StreamsBuilder
    val ratings: KStream[String, Rating] =
      builder.stream[String, Rating]("rating-submit-topic")


    import org.apache.kafka.streams.state.Stores

    val logConfig = new util.HashMap[String, String]
    logConfig.put("retention.ms", "172800000")
    logConfig.put("retention.bytes", "10000000000")
    logConfig.put("cleanup.policy", "compact,delete")
    val ratingByEmailStoreName = StateStores.RATINGS_BY_EMAIL_STORE
    val ratingByEmailStoreSupplied = Stores.inMemoryKeyValueStore(ratingByEmailStoreName)
    val ratingByEmailStoreBuilder = Stores.keyValueStoreBuilder(ratingByEmailStoreSupplied,
      Serdes.String, listRatingSerde)
      .withLoggingEnabled(logConfig)
      .withCachingEnabled()

    val builtStore = ratingByEmailStoreBuilder.build()

    //When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0)
    //and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table,
    //you must provide a “subtractor” aggregator (think: aggValue - oldValue).
    val groupedBy = ratings.groupByKey

    //aggregate by (user email -> their ratings)
    val ratingTable : KTable[String, List[Rating]] = groupedBy
      .aggregate[List[Rating]](List[Rating]())((aggKey, newValue, aggValue) => {
      newValue :: aggValue
    })(Materialized.as[String, List[Rating]](ratingByEmailStoreSupplied))

    ratingTable.mapValues((k,v) => {
      val theKey = k
      val theValue = v
      v
    })


    builder.build()
  }
}
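The topology above relies on a JSONSerde[T] (from the serialization package) which I have not reproduced here; the real one is in the GitHub repo. Just to give a rough idea of the shape of such a serde, here is a minimal sketch that assumes json4s for the JSON handling (the actual implementation may well differ):

package serialization

import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{read, write}

// Rough sketch only - the real JSONSerde lives in the repo; json4s is an assumption here
class JSONSerde[T <: AnyRef : Manifest] extends Serde[T] {

  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  override def serializer(): Serializer[T] = new Serializer[T] {
    override def serialize(topic: String, data: T): Array[Byte] =
      if (data == null) null else write(data).getBytes("UTF-8")
  }

  override def deserializer(): Deserializer[T] = new Deserializer[T] {
    override def deserialize(topic: String, bytes: Array[Byte]): T =
      if (bytes == null) null.asInstanceOf[T] else read[T](new String(bytes, "UTF-8"))
  }
}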

Entities stored

And this is the entity type that we will be storing in the state store

package entities

case class Rating(fromEmail: String, toEmail: String, score: Float)
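The metadata and REST code that follows also passes around a HostStoreInfo entity, which I have not shown either. Based on how it is used (a host, a port, and the list of state store names on that host), a minimal sketch would be something like the following; the field names are my assumption:

package entities

// Simple value object describing where a state store lives (sketch based on its usage below)
case class HostStoreInfo(host: String, port: Int, storeNames: List[String])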

Metadata

As previously mentioned, we need to query the metadata to find out which host owns the store (and the key) we are interested in. In order to do this I have come up with this class

package interactive.queries

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._


/**
  * Looks up StreamsMetadata from KafkaStreams
  */
class MetadataService(val streams: KafkaStreams) {


  /**
    * Get the metadata for all of the instances of this Kafka Streams application
    *
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadata() : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Get the metadata for all instances of this Kafka Streams application that currently
    * has the provided store.
    *
    * @param store The store to locate
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Find the metadata for the instance of this Kafka Streams Application that has the given
    * store and would have the given key if it exists.
    *
    * @param store Store to find
    * @param key   The key to find
    * @return { @link HostStoreInfo}
    */
  def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
      throw new NotFoundException(
        s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")

    HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
  }


  def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
    metadatas.stream.map[HostStoreInfo](metadata =>
      HostStoreInfo(
        metadata.host(),
        metadata.port,
        metadata.stateStoreNames.asScala.toList))
      .collect(Collectors.toList())
      .asScala.toList
  }
}
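Before we wire this into the REST layer, here is a quick, hypothetical usage sketch of the MetadataService, just to make the intent concrete (the wrapper object and method name are mine):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import entities.HostStoreInfo
import interactive.queries.MetadataService
import utils.StateStores

// Hypothetical helper showing how MetadataService would be called
object MetadataUsageSketch {
  def ownerOfRatingsFor(streams: KafkaStreams, email: String): HostStoreInfo = {
    val metadataService = new MetadataService(streams)
    val host = metadataService.streamsMetadataForStoreAndKey[String](
      StateStores.RATINGS_BY_EMAIL_STORE,
      email,
      Serdes.String().serializer())
    println(s"ratings for $email are held on ${host.host}:${host.port}")
    host
  }
}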

Exposing the state store over RPC

As I say, the most popular way of doing this is via REST, so I am using Akka HTTP for this. Here is the Rating HTTP server class

package interactive.queries.ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import entities.AkkaHttpEntitiesJsonFormats._
import entities._
import utils.StateStores
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import org.apache.kafka.common.serialization.Serdes
import scala.concurrent.{Await, ExecutionContext, Future}
import akka.http.scaladsl.unmarshalling.Unmarshal
import interactive.queries.MetadataService
import spray.json._
import scala.util.{Failure, Success}
import org.apache.kafka.streams.state.QueryableStoreTypes
import scala.concurrent.duration._


object RestService {
  val DEFAULT_REST_ENDPOINT_HOSTNAME  = "localhost"
}

class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

  val metadataService = new MetadataService(streams)
  var bindingFuture: Future[Http.ServerBinding] = null

  implicit val system = ActorSystem("rating-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  var isStateStoredReady: Boolean = false


  def setReady(isReady : Boolean): Unit = {
    isStateStoredReady = isReady
  }


  def start() : Unit = {
    val emailRegexPattern =  """\w+""".r
    val storeNameRegexPattern =  """\w+""".r

    val route =
      path("ratingByEmail") {
        get {
          parameters('email.as[String]) { (email) =>

            if(!isStateStoredReady) {
              //without the else below, this complete(...) would be discarded and the route would fall through to the try block
              complete(HttpResponse(StatusCodes.InternalServerError, entity = "state store not queryable, possibly due to re-balancing"))
            }
            else {
              try {

                val host = metadataService.streamsMetadataForStoreAndKey[String](
                  StateStores.RATINGS_BY_EMAIL_STORE,
                  email,
                  Serdes.String().serializer()
                )

                //store is hosted on another process, REST Call
                if(!thisHost(host)) {
                  onComplete(fetchRemoteRatingByEmail(host, email)) {
                    case Success(value) => complete(value)
                    case Failure(ex)    => complete(HttpResponse(StatusCodes.InternalServerError, entity = ex.getMessage))
                  }
                }
                else {
                  onComplete(fetchLocalRatingByEmail(email)) {
                    case Success(value) => complete(value)
                    case Failure(ex)    => complete(HttpResponse(StatusCodes.InternalServerError, entity = ex.getMessage))
                  }
                }
              }
              catch {
                case (ex: Exception) => {
                  complete(HttpResponse(StatusCodes.InternalServerError, entity = ex.getMessage))
                }
              }
            }
          }
        }
      } ~
      path("instances") {
        get {
          if(!isStateStoredReady) {
            complete(HttpResponse(StatusCodes.InternalServerError, entity = "state store not queryable, possibly due to re-balancing"))
          }
          else {
            complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
          }
        }
      }~
      path("instances" / storeNameRegexPattern) { storeName =>
        get {
          if(!isStateStoredReady) {
            complete(HttpResponse(StatusCodes.InternalServerError, entity = "state store not queryable, possibly due to re-balancing"))
          }
          else {
            complete(ToResponseMarshallable.apply(metadataService.streamsMetadataForStore(storeName)))
          }
        }
      }

    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))
  }


  def fetchRemoteRatingByEmail(host: HostStoreInfo, email: String) : Future[List[Rating]] = {

    //use the remote host that owns the key, not this instance's own hostInfo
    val requestPath = s"http://${host.host}:${host.port}/ratingByEmail?email=${email}"
    println(s"Client attempting to fetch from remote host at ${requestPath}")

    val responseFuture: Future[List[Rating]] = {
      Http().singleRequest(HttpRequest(uri = requestPath))
        .flatMap(response => Unmarshal(response.entity).to[List[Rating]])
    }

    responseFuture
  }

  def fetchLocalRatingByEmail(email: String) : Future[List[Rating]] = {

    val ec = ExecutionContext.global

    println(s"client fetchLocalRatingByEmail email=${email}")

    val host = metadataService.streamsMetadataForStoreAndKey[String](
      StateStores.RATINGS_BY_EMAIL_STORE,
      email,
      Serdes.String().serializer()
    )

    val f = StateStores.waitUntilStoreIsQueryable(
      StateStores.RATINGS_BY_EMAIL_STORE,
      QueryableStoreTypes.keyValueStore[String,List[Rating]](),
      streams
    ).map(_.get(email))(ec)

    val mapped = f.map(rating => {
      if (rating == null)
        List[Rating]()
      else
        rating
    })

    mapped
  }

  def stop() : Unit = {
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }

  def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
      hostStoreInfo.port == hostInfo.port
  }
}
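One thing the listing above leans on is the entities.AkkaHttpEntitiesJsonFormats import, which provides the spray-json formats used to (un)marshal the entities over HTTP; it is not shown in this post. A minimal sketch of what it might contain (the real one is in the repo, and the HostStoreInfo format assumes the 3-field sketch given earlier):

package entities

import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

// Sketch of the spray-json formats object used by the REST layer; field counts match the case classes
object AkkaHttpEntitiesJsonFormats {
  implicit val ratingFormat: RootJsonFormat[Rating] = jsonFormat3(Rating)
  implicit val hostStoreInfoFormat: RootJsonFormat[HostStoreInfo] = jsonFormat3(HostStoreInfo)
}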

You can see that the RatingRestService class does the following

  • Defines the routes
  • Uses the metadata service to work out whether it should be a local host query or an RPC call
  • Uses a waitUntilStoreIsQueryable helper method to work out when the state store is queryable. The helper class is shown here
package utils

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.InvalidStateStoreException
import org.apache.kafka.streams.state.{QueryableStoreType, QueryableStoreTypes}

import scala.concurrent.{ExecutionContext, Future}

object StateStores {
  val RATINGS_BY_EMAIL_STORE = "ratings-by-email-store"

  def waitUntilStoreIsQueryable[T]
  (
    storeName: String,
    queryableStoreType: QueryableStoreType[T],
    streams: KafkaStreams
  ) (implicit ec: ExecutionContext): Future[T] = {

    Retry.retry(5) {
      Thread.sleep(500)
      streams.store(storeName, queryableStoreType)
    }(ec)
  }


  private def printStoreMetaData[K, V](streams:KafkaStreams, storeName:String) : Unit = {

    val md = streams.allMetadata()
    val mdStore = streams.allMetadataForStore(storeName)

    val maybeStore = StateStores.waitUntilStoreIsQueryableSync(
      storeName,
      QueryableStoreTypes.keyValueStore[K,V](),
      streams)

    maybeStore match {
      case Some(store) => {
        val range = store.all
        val HASNEXT = range.hasNext
        while (range.hasNext) {
          val next = range.next
          System.out.print(s"key: ${next.key} value: ${next.value}")
        }
      }
      case None => {
        System.out.print(s"store not ready")
        throw new Exception("not ready")
      }
    }
  }

  @throws[InterruptedException]
  def waitUntilStoreIsQueryableSync[T](
        storeName: String,
        queryableStoreType: QueryableStoreType[T],
        streams: KafkaStreams): Option[T] = {
    while (true) {
      try {
        return Some(streams.store(storeName, queryableStoreType))
      }
      catch {
        case ignored: InvalidStateStoreException =>
          val state = streams.state
          // store not yet ready for querying
          Thread.sleep(100)
      }
    }
    None
  }
}
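The waitUntilStoreIsQueryable method above uses a Retry helper (utils.Retry) that I have not included in the listing. A rough sketch of such a helper, assuming it simply retries a block a fixed number of times and hands back a Future, might look like this (the real one is in the repo and may differ):

package utils

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

// Rough sketch of a retry helper - retries the block up to 'times' times before failing the Future
object Retry {
  def retry[T](times: Int)(block: => T)(implicit ec: ExecutionContext): Future[T] =
    Future {
      def attempt(remaining: Int): T =
        Try(block) match {
          case Success(value)              => value
          case Failure(_) if remaining > 1 => attempt(remaining - 1)
          case Failure(ex)                 => throw ex
        }
      attempt(times)
    }
}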

Existing Framework

If you looked at the code above and saw things in it that you thought could be made generic, such as the GET URIs, fetching data from the state store, retries, JSON serialization etc etc, you are correct. I could have done this. But as is quite common for me, I get to the end of something and do everything that needs to be done, and then discover better people than me have already thought about this, and actually offer a framework for this type of thing. Lightbend, the people behind Akka, have one such library for KafkaStreams. It is available here : https://github.com/lightbend/kafka-streams-query

What they have done is make all the glue bits in the middle nice and modular and generic, so you can get on with writing your application code and leave the robust querying, retries etc etc to a framework. It is well written and looks fairly easy to use, but if you have not used KafkaStreams interactive queries before it is best to dive in yourself first.

The Streams App

So we are almost done actually. All you need to do now is wire all this together, which for me is done by the following code, which starts the HTTP service, creates the topology, and also MOST importantly sets a KafkaStreams state listener.

package interactive.queries.ratings

import java.util.Properties

import entities.Rating
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.errors.BrokerNotFoundException
import org.apache.kafka.streams.state.{HostInfo, QueryableStoreTypes}
import org.apache.kafka.streams.KafkaStreams
import utils.{Retry, Settings, StateStores}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.util.Success
import java.util.concurrent.CountDownLatch

object RatingStreamProcessingTopologyApp extends App {

  import Serdes._


  implicit val ec = ExecutionContext.global
  val doneSignal = new CountDownLatch(1)

  run()

  private def run(): Unit = {

    val restEndpoint: HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
    System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
    System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

    val props: Properties = Settings.createRatingStreamsProperties()
    val topology = new RatingStreamProcessingTopology().createTopology()
    val streams: KafkaStreams = new KafkaStreams(topology,props)

    val restService = new RatingRestService(streams, restEndpoint)

    //Can only add this in State == CREATED
    streams.setUncaughtExceptionHandler(( thread :Thread, throwable : Throwable) => {
      println(s"============> ${throwable.getMessage}")
      shutDown(streams,restService)

    })

    streams.setStateListener((newState, oldState) => {
      if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
        restService.setReady(true)
      } else if (newState != KafkaStreams.State.RUNNING) {
        restService.setReady(false)
      }
    })

    restService.start()

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      shutDown(streams,restService)
    }))

    println("Starting KafkaStream")

    // Always (and unconditionally) clean local state prior to starting the processing topology.
    // We opt for this unconditional call here because this will make it easier for you to
    // play around with the example when resetting the application for doing a re-run
    // (via the Application Reset Tool,
    // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
    //
    // The drawback of cleaning up local state prior is that your app must rebuild its local
    // state from scratch, which will take time and will require reading all the state-relevant
    // data from the Kafka cluster over the network.
    // Thus in a production scenario you typically do not want to clean up always as we do
    // here but rather only when it is truly needed, i.e., only under certain conditions
    // (e.g., the presence of a command line flag for your app).
    // See `ApplicationResetExample.java` for a production-like example.
    streams.cleanUp
    streams.start

    doneSignal.await
   ()
  }


  private def shutDown(streams: KafkaStreams, restService: RatingRestService): Unit = {
    doneSignal.countDown
    streams.close()
    restService.stop
  }
}
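The utils.Settings object used above isn't shown in this post either. The one thing worth calling out for interactive queries is that the streams properties must include application.server (StreamsConfig.APPLICATION_SERVER_CONFIG) set to this instance's host:port, otherwise metadataForKey has no way of routing requests to the right instance. A hedged sketch (the names and values here are assumptions; the real Settings is in the repo):

package utils

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

// Sketch of the Settings object used by the app - values are assumptions
object Settings {
  val bootStrapServers = "localhost:9092"
  val restApiDefaultHostName = "localhost"
  val restApiDefaultPort = 8080

  def createRatingStreamsProperties(): Properties = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-interactive-queries-app")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
    // application.server is what allows other instances (via the metadata API) to find this one
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"$restApiDefaultHostName:$restApiDefaultPort")
    props
  }
}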

Running it all

That is a lot of code, but you probably want to know how to run it all.

Well you will need to do a few things for this one.

How to install Kafka/Zookeeper and get them running on Windows

This section will talk you through how to get Kafka and get it working on Windows

Step 1 : Download Kafka

Grab the Confluent Platform X.X.X Open Source download : https://www.confluent.io/download/

Step 2 : Update Dodgy BAT Files

The official Kafka Windows BAT files don’t seem to work in the Confluent Platform X.X.X Open Source download. So replace the official [YOUR INSTALL PATH]\confluent-x.x.x\bin\windows BAT files with the ones found here : https://github.com/renukaradhya/confluentplatform/tree/master/bin/windows

Step 3 : Make Some Minor Changes To Log Locations etc etc

Kafka/Zookeeper as installed are set up for Linux, and as such these paths won’t work on Windows. So we need to adjust that a bit. Let’s do that now

  • Modify the [YOUR INSTALL PATH]\confluent-x.x.x\etc\kafka\zookeeper.properties file to change the dataDir to something like dataDir=c:/temp/zookeeper
  • Modify the [YOUR INSTALL PATH]\confluent-x.x.x.\etc\kafka\server.properties file to uncomment the line delete.topic.enable=true

Step 4 : Running Zookeeper + Kafka + Creating Topics

Now that we have installed everything, it’s just a matter of running stuff up. Sadly, before we can run Kafka we need to run Zookeeper, and before we can send messages to Kafka we need to ensure that the Kafka topics are created; topics must exist before messages can be sent to them.

Mmm that sounds like a fair bit of work. Well it is, so I decided to script this into a little PowerShell script, which you can adjust to your needs

$global:kafkaWindowsBatFolder = "C:\Apache\confluent-5.2.1-2.12\bin\windows\"
$global:kafkaAndZooLoggingFolder = "C:\temp\"
$global:kafkaAndZooTmpFolder = "C:\tmp\"


$global:kafkaTopics = 
	"rating-submit-topic"
	
$global:ProcessesToKill = @()



function RunPipeLine() 
{
	WriteHeader "STOPPING PREVIOUS SERVICES"
	StopZookeeper
	StopKafka
	
	$path = $kafkaAndZooLoggingFolder + "kafka-logs"
	Remove-Item -Recurse -Force $path
	
	$path = $kafkaAndZooLoggingFolder + "zookeeper"
	Remove-Item -Recurse -Force $path
	
    $path = $kafkaAndZooLoggingFolder
	Remove-Item -Recurse -Force $path


	Start-Sleep -s 20
	
	WriteHeader "STARTING NEW SERVICE INSTANCES"
	StartZookeeper
	
	Start-Sleep -s 20
	StartKafka
	
	Start-Sleep -s 20

	CreateKafkaTopics
    
	Start-Sleep -s 20
	
	
	WaitForKeyPress

	WriteHeader "KILLING PROCESSES CREATED BY SCRIPT"
	KillProcesses
}

function WriteHeader($text) 
{
	Write-Host "========================================`r`n"
	Write-Host "$text`r`n"
	Write-Host "========================================`r`n"
}


function StopZookeeper() {
    # C:\Apache\confluent-5.2.1-2.12\bin\windows\zookeeper-server-stop.bat
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-stop.bat"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine`r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine -WindowStyle Normal -PassThru
}

function StopKafka() {
	# C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-server-stop.bat
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-stop.bat" 
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine`r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine  -WindowStyle Normal -PassThru
}

function StartZookeeper() {
    # C:\Apache\confluent-5.2.1-2.12\bin\windows\zookeeper-server-start.bat C:\Apache\confluent-5.2.1-2.12\bin\windows\..\..\etc\kafka\zookeeper.properties
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-start.bat"
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\zookeeper.properties"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine $arguments -WindowStyle Normal -PassThru
}

function StartKafka() {
    # C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-server-start.bat C:\Apache\confluent-5.2.1-2.12\bin\windows\..\..\etc\kafka\server.properties
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-start.bat" 
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\server.properties"
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine $arguments -WindowStyle Normal -PassThru
}

function CreateKafkaTopics() 
{
   # C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic rating-submit-topic
	  
   # C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-submit-topic
	Foreach ($topic in $global:kafkaTopics )
	{
		$kafkaCommandLine = $global:kafkaWindowsBatFolder + "kafka-topics.bat"
		$arguments = "--zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic $topic"
		Write-Host "> Create Kafka Topic Command Line : $kafkaCommandLine args: $arguments `r`n"
		$global:ProcessesToKill += start-process $kafkaCommandLine $arguments -WindowStyle Normal -PassThru
	}
}

function WaitForKeyPress
{
	Write-Host -NoNewLine "Press any key to continue....`r`n"
	[Console]::ReadKey()
}


function KillProcesses() 
{
	Foreach ($processToKill in $global:ProcessesToKill )
	{
		$name = $processToKill | Get-ChildItem -Name
		Write-Host "Killing Process : $name `r`n" 
		$processToKill | Stop-Process -Force
	}
}


# Kick off the entire pipeline
RunPipeLine

 

So once you have done all of that, you should be able to run the RatingsProducerApp from the GitHub repo I linked at the beginning. After that it is simply a question of running the RatingStreamProcessingTopologyApp.

 

If all has gone ok you should be able to fire up Postman (an excellent REST tool) and use these example endpoints
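Assuming the REST service is listening on localhost:8080 (the actual host and port come from the Settings values; the email key here is made up), the endpoints look like this:

http://localhost:8080/instances
http://localhost:8080/instances/ratings-by-email-store
http://localhost:8080/ratingByEmail?email=sacha@here.com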

image

Wooohoo we got some data interactively

 

image

 

Conclusion

And that is all I wanted to say this time. Next time we will be looking at the final post in this series, which will cover time windowed operations.