Azure

Getting reacquainted with Azure Devops

Its been a while since I blogged about anything of note. I plan on changing that soon. I am going through some personal stuff, but am determined to get my mojo back.

 

So hopefully this is the start of me getting my zing back

 

Anyway at the job before this one we used to make quite heavy use of VSTS, which is now Azure Devops. I used it a bit, but I am was never the “build” guy. At my current role, I am becoming more and more architecture/devops focussed. I am what one would call a hands on Architect.

Anyway long story short we use Gitlab/Rancher/AWS/Docker where I am now, but I just wanted to keep my toe in with what is going on in Azure, so I plan on getting to know the lay of the land of some of the new tooling

 

I thought what better way to start with something dead simple, like lets create a new WebApi .NET Core app, and get it deployed to a Azure App Service. This has to be pretty simple to start with right?

 

So before we start, what I did was sign up for a free tier in Azure, and added all my stuff to a single ResourceGroup in Azure.That way I can just trash it all to keep my costs low. For those that don’t know Azure resource groups are like the uber top level container that all your other services live in.

 

Ok so lets get started

 

Step 1 : Create the app

Creating the app for me was as simple as creating a new Web project in Visual Studio 2019, Where I stuck with the standard WebApi project. I literally did not change a thing, I stuck completely with the standard ValuesController, and targeted .NET Core 2.1

 

image

 

Step 2 : Created A GitHub Repo

So now that I have created a pretty useless demo app, I created a GitHub repo, and pushed this code to it

 

Step 3 : Azure AppService

So I have this small .NET Core 2.1 demo app, so I should be able to run it on Linux. So I sign up the free tier of Azure, then created a new AppService, which looked like these where I was careful to keep to the Dev/Test tier of the size plan

 

image

Ok so once that was created I was ready to head over to devops.azure.com

Step 3 : Azure DevOps

So the 1st thing I did was create a new project, lets call it “XXX” for now

image

From there I picked “GitHub” on the Connect part of the wizard. Next I simply picked my repository from the list, and then it was time to get down with the actual Build side of the Azure DevOps pipeline.

 

When I last used VSTS it was all UI based, like everything it has all turned to YAML now. So you need to look up the various build tasks to get the available task syntax that you can use. Luckily the basic skeleton was clever enough to give a very hand link to this page : https://docs.microsoft.com/en-gb/azure/devops/pipelines/ecosystems/dotnet-core?view=azure-devops

 

From there you can grab several of the tasks you might need such as

 

– Build

– Publish

 

If you want to see all the available actions for the DotNetCoreCLI@2 task you can look here : https://docs.microsoft.com/en-us/azure/devops/pipelines/tasks/build/dotnet-core-cli?view=azure-devops, in fact from that link you can use the treeview on the left to find all sorts of tasks to help you live the DevOps pipeline dream

 

So once I read the docs a bit I ended up with this DevOps Build pipeline configuration

 

# ASP.NET Core
# Build and test ASP.NET Core projects targeting .NET Core.
# Add steps that run tests, create a NuGet package, deploy, and more:
# https://docs.microsoft.com/azure/devops/pipelines/languages/dotnet-core

trigger:
- master

pool:
  vmImage: 'ubuntu-latest'

variables:
  buildConfiguration: 'Release'

steps:
- task: DotNetCoreCLI@2
  displayName: Build
  inputs:
    command: build
    projects: '**/*.csproj'
    arguments: '--configuration Release' # Update this to match your need

- task: DotNetCoreCLI@2
  inputs:
    command: publish
    projects: '**/*.csproj'
    publishWebProjects: True
    arguments: '--configuration $(BuildConfiguration) --output $(Build.ArtifactStagingDirectory)'
    zipAfterPublish: True

# this code takes all the files in $(Build.ArtifactStagingDirectory) and uploads them as an artifact of your build.
- task: PublishBuildArtifacts@1
  inputs:
    pathtoPublish: '$(Build.ArtifactStagingDirectory)' 
    artifactName: 'AzureDevOpsWebApiTestApp'
  

I think this is fairly self explanatory.So with this configured, it was time to test the Build. Which worked 1st time YAY.

 

image

 

So with the Build side of the pipeline working, I turned my hand towards the Release side of the pipeline. For me this was as simple as picking this type of release

 

image

I filled in the required parameters and ran the Release from the pipeline. It also passed. YAY

 

I then headed over to my previously created App Service in Azure, grabbed the public url, and gave it a try, and it worked

 

image

 

I have to say I was pleasantly surprised at just how smoothly all of that went.

 

I will be digging into Azure DevOps a bit more, I would like to see it run some tests, and I would like to setup some private nuget feeds and publish to those and consume from them. This will be the subjects of some future posts no doubt

powershell, Uncategorized

Powershell : WebRequest Server Not Found

So at work we use a bit of PowerShell here and there to do various things. We also use PowerShell remoting, where we run the same bit of code on a potential list of remote targets.

One of the things we use a lot is a notification system where it will attempt to use this code or code

Invoke-RestMethod -Uri "$env:mattermost_uri" -Method Post -Body $json -ContentType "application/json"

But occasionally this would fail on some of our target VMs, where I would see a “Unable to connect to remote server“.

Turns out there is a default proxy at play which can be disabled like this

[System.Net.HttpWebRequest]::DefaultWebProxy = New-Object System.Net.WebProxy

After that is was all good for me

Kafka

KafkaStreams : Windowing

SO last time we look at interactive queries. This time (the last one in this series) we will look at Windowing operations. This is a fairly dry subject, and I don’t have too much to add to this one over the official docs, so this is a nice short one to end the series.

Where is the code?

The code for this post is all contained here

What is Windowing anyway?

Windowing lets you control how to group records that have the same key for stateful operations such as aggregations or joins into so-called windows. Windows are tracked per record key. For example, in join operations, a windowing state store is used to store all the records received so far within the defined window boundary. In aggregating operations, a windowing state store is used to store the latest aggregation results per window. Old records in the state store are purged after the specified window retention period. Kafka Streams guarantees to keep a window for at least this specified time; the default value is one day and can be changed via Materialized#withRetention().

This is what Kafka Streams supports for windowing

image

 

 

Tumbling Windows

Tumbling time windows are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.

../../_images/streams-time-windows-tumbling.png

Here is an example of this using the simple word count example we have used before

package windowing

import java.time.Duration
import java.util
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}
import utils.Settings


class TumblingWordCountTopology extends App {

  import Serdes._

  val props: Properties = Settings.createBasicStreamProperties(
    "tumbling-window-wordcount-application","localhost:9092")

  run()

  private def run(): Unit = {
    val topology = wordCountToplogy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

   def wordCountToplogy() : Topology = {

    import org.apache.kafka.streams.state.Stores
    val wordCountStoreName = "wordCountStore"
    val wordCountStoreSupplied = Stores.inMemoryKeyValueStore(wordCountStoreName)

    val builder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
    val wordCounts = textLines.flatMapValues(x => x.toLowerCase.split("\\W+"))
                    .groupBy((key, word) => word)
      .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
      .count()
    wordCounts.toStream.to("WordsWithCountsTopic")
    builder.build()
  }
}

 

Hopping Time Window

Hopping time windows are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap — and in general they do — a data record may belong to more than one such window.

image

and here is an example of this using the word count example

package windowing

import java.time.Duration
import java.util
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}
import utils.Settings


class HoppingTimeWordCountTopology extends App {

  val props: Properties = Settings.createBasicStreamProperties(
    "hopping-time-window-wordcount-application","localhost:9092")

  run()

  private def run(): Unit = {
    val topology = wordCountToplogy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

   def wordCountToplogy() : Topology = {

    import org.apache.kafka.streams.state.Stores
    val wordCountStoreName = "wordCountStore"
    val wordCountStoreSupplied = Stores.inMemoryKeyValueStore(wordCountStoreName)

    val builder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
    val wordCounts = textLines.flatMapValues(x => x.toLowerCase.split("\\W+"))
                    .groupBy((key, word) => word)
      .windowedBy(TimeWindows.of(Duration.ofSeconds(5).plus(Duration.ofMinutes(1))))
      .count()
    wordCounts.toStream.to("WordsWithCountsTopic")
    builder.build()
  }
}

That’s It

And that brings us to the end of this series, I will be diving back into .NET land next to write an implementation of the SWIM algorithm. After that I think I will get myself a AWS Solution Architect exam under my belt

 

I hope you have enjoyed the series. I know this one was a bit naff, but I kind of ran out of steam

Kafka

KafkaStreams : Interactive Queries

So last time we looked at how to make use of the Processor API in the DSL. This time we are going to look at interactive queries.

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

Walking through a Kafka Streams processing node, and the duality of streams

Before we get started I just wanted to include a several excerpts taken from the official Kafka docs : http://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables which talks about KStream and KTable objects (which are the stream and table objects inside Kafka streams)

 

When implementing stream processing use cases in practice, you typically need both streams and also databases. An example use case that is very common in practice is an e-commerce application that enriches an incoming stream of customer transactions with the latest customer information from a database table. In other words, streams are everywhere, but databases are everywhere, too.

Any stream processing technology must therefore provide first-class support for streams and tables. Kafka’s Streams API provides such functionality through its core abstractions for streams and tables, which we will talk about in a minute. Now, an interesting observation is that there is actually a close relationship between streams and tables, the so-called stream-table duality. And Kafka exploits this duality in many ways: for example, to make your applications elastic, to support fault-tolerant stateful processing, or to run interactive queries against your application’s latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications.

A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:

../_images/streams-table-duality-01.jpg

The stream-table duality describes the close relationship between streams and tables.

  • Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a “real” table by replaying the changelog from beginning to end to reconstruct the table. Similarly, aggregating data records in a stream will return a table. For example, we could compute the total number of pageviews by user from an input stream of pageview events, and the result would be a table, with the table key being the user and the value being the corresponding pageview count.
  • Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream’s data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a “real” stream by iterating over each key-value entry in the table.

Let’s illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time – and different revisions of the table – can be represented as a changelog stream (second column).

image

Because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):

../_images/streams-table-duality-03.jpg

The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault tolerance. The stream-table duality is such an important concept for stream processing applications in practice that Kafka Streams models it explicitly via the KStream and KTable abstractions, which we describe in the next sections.

I would STRONLY urge you all to read the section of the official docs above, as it will really help you should you want to get into Kafka Streams.

 

Interactive Queries

Up until now we have been using KStream, and on occasion we have also used KTable to store state into a state store. A KTable is also able to be turned back into a KStream and pushed back out on an “output” topic, which is what most of my examples have done so far.

 

However this is kind of annoying and wasteful, we just had the data in the format we wanted in Kafka in a KTable, and then we need to transform it back into a KStream and send it to a new topic. Surely there is a better way.

 

Luckily there is “Interactive queries”.KafkaStreams supports the notion of queries across state stores. Most typical state stores are key value stores, where the key will be on a certain partition. As such the state store is distributed across the partitions that make up the topic.

Mmm so how do we get the data from all the stores? Well luckily KafkaStreams exposes metadata that allow us to obtain the hostname of the hosts that have a certain state store. From there we can either use a local call or perform a RPC (most typically REST call) call to fetch the remote hosts data.

 

This is illustrated in the following diagram

 

../../_images/streams-interactive-queries-03.png

The full state of your application is typically split across many distributed instances of your application, and across many state stores that are managed locally by these application instances.

That is the basic idea anyway. So we need to create some plumbing to help us either call into a local state store, or query a remote host that owns some keys for a state store.

Is it that simple?

Well no actually. KafkaStreams has the following state state transition diagram that one needs to be aware of. If you try and query a state store for metadata/or actual data before the 2nd running you won’t see anything as Kafka is considered NOT READY.

 

image

 

There is a lot of chat on the internet (its even in the Confluent FAQs) about state store metadata not being available, and it is due to this state machine.

So how do we deal with this?

  • Well one thing to do is implement some kind of retry policy (this post covers this)
  • Another thing to do is stream state listener (this post also covers this)

Ok so are there any other pit falls? No not really you just need to make sure you don’t start using state stores unless your KafkaStreams is in a good state.

So now that we have the basics covered lets see some code

The Tolology

This is the topology that we will be using to construct our state store

package interactive.queries.ratings


import java.util

import entities.Rating
import org.apache.kafka.streams.scala.{Serdes, _}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.state.KeyValueStore
import serialization.JSONSerde
import utils.StateStores


class RatingStreamProcessingTopology  {

  def createTopolgy(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val ratingSerde = new JSONSerde[Rating]
    implicit val listRatingSerde = new JSONSerde[List[Rating]]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, ratingSerde)
    implicit val materializer = Materialized.`with`(stringSerde, listRatingSerde)
    implicit val grouped = Grouped.`with`(stringSerde, ratingSerde)

    val builder: StreamsBuilder = new StreamsBuilder
    val ratings: KStream[String, Rating] =
      builder.stream[String, Rating]("rating-submit-topic")


    import org.apache.kafka.streams.state.Stores

    val logConfig = new util.HashMap[String, String]
    logConfig.put("retention.ms", "172800000")
    logConfig.put("retention.bytes", "10000000000")
    logConfig.put("cleanup.policy", "compact,delete")
    val ratingByEmailStoreName = StateStores.RATINGS_BY_EMAIL_STORE
    val ratingByEmailStoreSupplied = Stores.inMemoryKeyValueStore(ratingByEmailStoreName)
    val ratingByEmailStoreBuilder = Stores.keyValueStoreBuilder(ratingByEmailStoreSupplied,
      Serdes.String, listRatingSerde)
      .withLoggingEnabled(logConfig)
      .withCachingEnabled()

    val builtStore = ratingByEmailStoreBuilder.build()

    //When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0)
    //and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table,
    //you must provide a “subtractor” aggregator (think: aggValue - oldValue).
    val groupedBy = ratings.groupByKey

    //aggrgate by (user email -> their ratings)
    val ratingTable : KTable[String, List[Rating]] = groupedBy
      .aggregate[List[Rating]](List[Rating]())((aggKey, newValue, aggValue) => {
      newValue :: aggValue
    })(Materialized.as[String, List[Rating]](ratingByEmailStoreSupplied))

    ratingTable.mapValues((k,v) => {
      val theKey = k
      val theValue = v
      v
    })


    builder.build()
  }
}

Entities stored

And this is the entity type that we will be storing in the state store

package entities

case class Rating(fromEmail: String, toEmail: String, score: Float)

Metadata

As previously mentioned we need to query metadata to find out what host the store is that has a particular key. In order to do this I have come up with this class

package interactive.queries

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._


/**
  * Looks up StreamsMetadata from KafkaStreams
  */
class MetadataService(val streams: KafkaStreams) {


  /**
    * Get the metadata for all of the instances of this Kafka Streams application
    *
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadata() : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Get the metadata for all instances of this Kafka Streams application that currently
    * has the provided store.
    *
    * @param store The store to locate
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Find the metadata for the instance of this Kafka Streams Application that has the given
    * store and would have the given key if it exists.
    *
    * @param store Store to find
    * @param key   The key to find
    * @return { @link HostStoreInfo}
    */
  def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
      throw new NotFoundException(
        s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")

    HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
  }


  def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
    metadatas.stream.map[HostStoreInfo](metadata =>
      HostStoreInfo(
        metadata.host(),
        metadata.port,
        metadata.stateStoreNames.asScala.toList))
      .collect(Collectors.toList())
      .asScala.toList
  }
}

Exposing the state store over RPC

As I say the most popular way of doing this is via REST. As such I am using Akka.Http to do this. Here the Rating http server class

package interactive.queries.ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import entities.AkkaHttpEntitiesJsonFormats._
import entities._
import utils.StateStores
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import org.apache.kafka.common.serialization.Serdes
import scala.concurrent.{Await, ExecutionContext, Future}
import akka.http.scaladsl.unmarshalling.Unmarshal
import interactive.queries.MetadataService
import spray.json._
import scala.util.{Failure, Success}
import org.apache.kafka.streams.state.QueryableStoreTypes
import scala.concurrent.duration._


object RestService {
  val DEFAULT_REST_ENDPOINT_HOSTNAME  = "localhost"
}

class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

  val metadataService = new MetadataService(streams)
  var bindingFuture: Future[Http.ServerBinding] = null

  implicit val system = ActorSystem("rating-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  var isStateStoredReady: Boolean = false


  def setReady(isReady : Boolean): Unit = {
    isStateStoredReady = isReady
  }


  def start() : Unit = {
    val emailRegexPattern =  """\w+""".r
    val storeNameRegexPattern =  """\w+""".r

    val route =
      path("ratingByEmail") {
        get {
          parameters('email.as[String]) { (email) =>

            if(!isStateStoredReady) {
              complete(HttpResponse(StatusCodes.InternalServerError, entity = "state stored not queryable, possible due to re-balancing"))
            }

            try {

              val host = metadataService.streamsMetadataForStoreAndKey[String](
                StateStores.RATINGS_BY_EMAIL_STORE,
                email,
                Serdes.String().serializer()
              )

              //store is hosted on another process, REST Call
              if(!thisHost(host)) {
                onComplete(fetchRemoteRatingByEmail(host, email)) {
                  case Success(value) => complete(value)
                  case Failure(ex)    => complete(HttpResponse(StatusCodes.InternalServerError, entity = ex.getMessage))
                }
              }
              else {
                onComplete(fetchLocalRatingByEmail(email)) {
                  case Success(value) => complete(value)
                  case Failure(ex)    => complete(HttpResponse(StatusCodes.InternalServerError, entity = ex.getMessage))
                }
              }
            }
            catch {
              case (ex: Exception) => {
                complete(HttpResponse(StatusCodes.InternalServerError, entity = ex.getMessage))
              }
            }
          }
        }
      } ~
      path("instances") {
        get {
          if(!isStateStoredReady) {
            complete(HttpResponse(StatusCodes.InternalServerError, entity = "state stored not queryable, possible due to re-balancing"))
          }
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
        }
      }~
      path("instances" / storeNameRegexPattern) { storeName =>
        get {
          if(!isStateStoredReady) {
            complete(HttpResponse(StatusCodes.InternalServerError, entity = "state stored not queryable, possible due to re-balancing"))
          }
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadataForStore(storeName)))
        }
      }

    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))
  }


  def fetchRemoteRatingByEmail(host:HostStoreInfo, email: String) : Future[List[Rating]] = {

    val requestPath = s"http://${hostInfo.host}:${hostInfo.port}/ratingByEmail?email=${email}"
    println(s"Client attempting to fetch from online at ${requestPath}")

    val responseFuture: Future[List[Rating]] = {
      Http().singleRequest(HttpRequest(uri = requestPath))
        .flatMap(response => Unmarshal(response.entity).to[List[Rating]])
    }

    responseFuture
  }

  def fetchLocalRatingByEmail(email: String) : Future[List[Rating]] = {

    val ec = ExecutionContext.global

    println(s"client fetchLocalRatingByEmail email=${email}")

    val host = metadataService.streamsMetadataForStoreAndKey[String](
      StateStores.RATINGS_BY_EMAIL_STORE,
      email,
      Serdes.String().serializer()
    )

    val f = StateStores.waitUntilStoreIsQueryable(
      StateStores.RATINGS_BY_EMAIL_STORE,
      QueryableStoreTypes.keyValueStore[String,List[Rating]](),
      streams
    ).map(_.get(email))(ec)

    val mapped = f.map(rating => {
      if (rating == null)
        List[Rating]()
      else
        rating
    })

    mapped
  }

  def stop() : Unit = {
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }

  def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
      hostStoreInfo.port == hostInfo.port
  }
}

You can see that this class does the following

  • Defines the routes
  • Uses the metadata service to work out whether it should be a local host query or an RPC call
  • Uses a waitUntilStoreIsQueryable helper method to work out when the state store is queryable. The helper class is shown here
package utils

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.InvalidStateStoreException
import org.apache.kafka.streams.state.{QueryableStoreType, QueryableStoreTypes}

import scala.concurrent.{ExecutionContext, Future}

object StateStores {
  val RATINGS_BY_EMAIL_STORE = "ratings-by-email-store"

  def waitUntilStoreIsQueryable[T]
  (
    storeName: String,
    queryableStoreType: QueryableStoreType[T],
    streams: KafkaStreams
  ) (implicit ec: ExecutionContext): Future[T] = {

    Retry.retry(5) {
      Thread.sleep(500)
      streams.store(storeName, queryableStoreType)
    }(ec)
  }


  private def printStoreMetaData[K, V](streams:KafkaStreams, storeName:String) : Unit = {

    val md = streams.allMetadata()
    val mdStore = streams.allMetadataForStore(storeName)

    val maybeStore = StateStores.waitUntilStoreIsQueryableSync(
      storeName,
      QueryableStoreTypes.keyValueStore[K,V](),
      streams)

    maybeStore match {
      case Some(store) => {
        val range = store.all
        val HASNEXT = range.hasNext
        while (range.hasNext) {
          val next = range.next
          System.out.print(s"key: ${next.key} value: ${next.value}")
        }
      }
      case None => {
        System.out.print(s"store not ready")
        throw new Exception("not ready")
      }
    }
  }

  @throws[InterruptedException]
  def waitUntilStoreIsQueryableSync[T](
        storeName: String,
        queryableStoreType: QueryableStoreType[T],
        streams: KafkaStreams): Option[T] = {
    while (true) {
      try {
        return Some(streams.store(storeName, queryableStoreType))
      }
      catch {
        case ignored: InvalidStateStoreException =>
          val state = streams.state
          // store not yet ready for querying
          Thread.sleep(100)
      }
    }
    None
  }
}

Existing Framework

If you looked at the code above, and saw things in it that you thought could be made generic, such as GET uri, fetching data from state store, retries, JSON serialization etc etc. You are correct. I could have done this. But as is quite common for me, I get to the end of something and I do everything that needs to be done, and I then discover better people than me have thought about this, and actually offer a framework for this type of thing. Lightbend the people behind Akka, have one such library for KafkaStreams. It available here : https://github.com/lightbend/kafka-streams-query

What they have done is make all the glue bits in the middle nice a modular and generic so you can get on with writing your application code, and leave the robust querying, retries etc etc to a framework. It is well written and looks fairly easy to use, but if you have not used KafkaStreams interactive queries before it is best to dive in yourself first.

The Streams App

So we almost done actually. All you need to do now is wire all this together, which for me is done by the following code, which starts the http service, creates the topology, and also MOST importantly sets a KafkaStreams state listener.

package interactive.queries.ratings

import java.util.Properties

import entities.Rating
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.errors.BrokerNotFoundException
import org.apache.kafka.streams.state.{HostInfo, QueryableStoreTypes}
import org.apache.kafka.streams.KafkaStreams
import utils.{Retry, Settings, StateStores}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.util.Success
import java.util.concurrent.CountDownLatch

object RatingStreamProcessingTopologyApp extends App {

  import Serdes._


  implicit val ec = ExecutionContext.global
  val doneSignal = new CountDownLatch(1)

  run()

  private def run(): Unit = {

    val restEndpoint: HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
    System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
    System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

    val props: Properties = Settings.createRatingStreamsProperties()
    val topology = new RatingStreamProcessingTopology().createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology,props)

    val restService = new RatingRestService(streams, restEndpoint)

    //Can only add this in State == CREATED
    streams.setUncaughtExceptionHandler(( thread :Thread, throwable : Throwable) => {
      println(s"============> ${throwable.getMessage}")
      shutDown(streams,restService)

    })

    streams.setStateListener((newState, oldState) => {
      if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
        restService.setReady(true)
      } else if (newState != KafkaStreams.State.RUNNING) {
        restService.setReady(false)
      }
    })

    restService.start()

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      shutDown(streams,restService)
    }))

    println("Starting KafkaStream")

    // Always (and unconditionally) clean local state prior to starting the processing topology.
    // We opt for this unconditional call here because this will make it easier for you to
    // play around with the example when resetting the application for doing a re-run
    // (via the Application Reset Tool,
    // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
    //
    // The drawback of cleaning up local state prior is that your app must rebuilt its local
    // state from scratch, which will take time and will require reading all the state-relevant
    // data from the Kafka cluster over the network.
    // Thus in a production scenario you typically do not want to clean up always as we do
    // here but rather only when it is truly needed, i.e., only under certain conditions
    // (e.g., the presence of a command line flag for your app).
    // See `ApplicationResetExample.java` for a production-like example.
    streams.cleanUp
    streams.start

    doneSignal.await
   ()
  }


  private def shutDown(streams: KafkaStreams, restService: RatingRestService): Unit = {
    doneSignal.countDown
    streams.close()
    restService.stop
  }
}

Running it all

That is a lot of code, but you probably want to know how to run it all.

Well you will need to do a few things for this one.

How to install Kafka/Zookeeper and get them running on Windows

This section will talk you through how to get Kafka and get it working on Windows

Step 1 : Download Kafka

Grab Confluence Platform  X.X.X Open Source : https://www.confluent.io/download/

Step 2 : Update Dodgy BAT Files

The official Kafka windows BAT files don’t seem to work in the Confluence Platform X.X.X. Open Source download. So replace the official [YOUR INSTALL PATH]\confluent-x.x.x\bin\windows BAT files with the ones found here : https://github.com/renukaradhya/confluentplatform/tree/master/bin/windows

Step 3 : Make Some Minor Changes To Log Locations etc etc

Kafka/Zookeeper as installed are setup for Linux, as such these paths won’t work on Windows. So we need to adjust that a bit. So lets do that now

  • Modify the [YOUR INSTALL PATH]\confluent-x.x.x\etc\kafka\zookeeper.properties file to change the dataDir to something like dataDir=c:/temp/zookeeper
  • Modify the [YOUR INSTALL PATH]\confluent-x.x.x.\etc\kafka\server.properties file to uncomment the line delete.topic.enable=true

Step 4 : Running Zookeeper + Kafka + Creating Topics

Now that we have installed everything, it’s just a matter of running stuff up. Sadly before we can run Kafka we need to run Zookeeper, and before Kafka can send messages we need to ensure that the Kafka topics are created. Topics must exist before messages

Mmm that sounds like a fair bit of work. Well it is, so I decided to script this into a little PowerShell script, which you can adjust to your needs

$global:kafkaWindowsBatFolder = "C:\Apache\confluent-5.2.1-2.12\bin\windows\"
$global:kafkaAndZooLoggingFolder = "C:\temp\"
$global:kafkaAndZooTmpFolder = "C:\tmp\"


$global:kafkaTopics = 
	"rating-submit-topic"
	
$global:ProcessesToKill = @()



function RunPipeLine() 
{
	WriteHeader "STOPPING PREVIOUS SERVICES"
	StopZookeeper
	StopKafka
	
	$path = $kafkaAndZooLoggingFolder + "kafka-logs"
	Remove-Item -Recurse -Force $path
	
	$path = $kafkaAndZooLoggingFolder + "zookeeper"
	Remove-Item -Recurse -Force $path
	
    $path = $kafkaAndZooLoggingFolder
	Remove-Item -Recurse -Force $path


	Start-Sleep -s 20
	
	WriteHeader "STARTING NEW SERVICE INSTANCES"
	StartZookeeper
	
	Start-Sleep -s 20
	StartKafka
	
	Start-Sleep -s 20

	CreateKafkaTopics
    
	Start-Sleep -s 20
	
	
	WaitForKeyPress

	WriteHeader "KILLING PROCESSES CREATED BY SCRIPT"
	KillProcesses
}

function WriteHeader($text) 
{
	Write-Host "========================================`r`n"
	Write-Host "$text`r`n"
	Write-Host "========================================`r`n"
}


function StopZookeeper() {
    # C:\Apache\confluent-5.2.1-2.12\bin\windows\zookeeper-server-stop.bat
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-stop.bat"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine`r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine -WindowStyle Normal -PassThru
}

function StopKafka() {
	# C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-server-stop.bat
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-stop.bat" 
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine`r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine  -WindowStyle Normal -PassThru
}

function StartZookeeper() {
    # C:\Apache\confluent-5.2.1-2.12\bin\windows\zookeeper-server-start.bat C:\Apache\confluent-5.2.1-2.12\bin\windows\..\..\etc\kafka\zookeeper.properties
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-start.bat"
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\zookeeper.properties"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine $arguments -WindowStyle Normal -PassThru
}

function StartKafka() {
    # C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-server-start.bat C:\Apache\confluent-5.2.1-2.12\bin\windows\..\..\etc\kafka\server.properties
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-start.bat" 
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\server.properties"
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine $arguments -WindowStyle Normal -PassThru
}

function CreateKafkaTopics() 
{
   # C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic rating-submit-topic
	  
   # C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-submit-topic
	Foreach ($topic in $global:kafkaTopics )
	{
		$kafkaCommandLine = $global:kafkaWindowsBatFolder + "kafka-topics.bat"
		$arguments = "--zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic $topic"
		Write-Host "> Create Kafka Topic Command Line : $kafkaCommandLine args: $arguments `r`n"
		$global:ProcessesToKill += start-process $kafkaCommandLine $arguments -WindowStyle Normal -PassThru
	}
}

function WaitForKeyPress
{
	Write-Host -NoNewLine "Press any key to continue....`r`n"
	[Console]::ReadKey()
}


function KillProcesses() 
{
	Foreach ($processToKill in $global:ProcessesToKill )
	{
		$name = $processToKill | Get-ChildItem -Name
		Write-Host "Killing Process : $name `r`n" 
		$processToKill | Stop-Process -Force
	}
}


# Kick of the entire pipeline
RunPipeLine

 

So once you have done all of that. You should be able to run the RatingsProducerApp from the link I post at beginning to my GitHub repo. Then after that it is simple a question of running RatingStreamProcessingTopologyApp

 

If all has gone ok you should be able to hit up postman (excellent REST tool), and use these example endpoints

image

Wooohoo we got some data interactively

 

image

 

Conclusion

And that is all I wanted to say this time. Next time we will be looking at the final post in this series, time windowed operations

Kafka

KafkaStreams : Using Processor API in DSL

Last time we look at how we can supply our own Serdes (serializer / deserializer) to the DSL. This time we will look at how we can make use of the lower level “Processor API” in the DSL, which is what we have been using until now.

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

DSL vs Processor API

So far we have been using the DSL, but it would be good to have a recap, of not only the Streams DSL but also the “Processor API”

 

Streams DSL

The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is the recommended for most users, especially beginners. Most data processing operations can be expressed in just a few lines of DSL code.

In comparison to the Processor API, only the DSL supports:

  • Built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable. Having first-class support for streams and tables is crucial because, in practice, most use cases require not just either streams or databases/tables, but a combination of both. For example, if your use case is to create a customer 360-degree view that is updated in real-time, what your application will be doing is transforming many input streams of customer-related events into an output table that contains a continuously updated 360-degree view of your customers.
  • Declarative, functional programming style with stateless transformations (e.g. map and filter) as well as stateful transformations such as aggregations (e.g. count and reduce), joins (e.g. leftJoin), and windowing (e.g. session windows).

With the DSL, you can define processor topologies (i.e., the logical processing plan) in your application. The steps to accomplish this are:

  1. Specify one or more input streams that are read from Kafka topics.
  2. Compose transformations on these streams.
  3. Write the resulting output streams back to Kafka topics, or expose the processing results of your application directly to other applications through interactive queries (e.g., via a REST API).
    After the application is run, the defined processor topologies are continuously executed (i.e., the processing plan is put into action). A step-by-step guide for writing a stream processing application using the DSL is provided below.

Once you have built your Kafka Streams application using the DSL you can view the underlying Topology by first executing StreamsBuilder#build() which returns the Topology object. Then to view the Topology you call Topology#desribe().

 

Processor API

The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents a customized processing logic

The Processor API can be used to implement both stateless as well as stateful operations, where the latter is achieved through the use of state stores.

 

Here is a Java 8 example of what a Processor API app may look like

Topology builder = new Topology();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")

    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")

    // add the count store associated with the WordCountProcessor processor
    .addStateStore(countStoreSupplier, "Process")

    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

 

So that is a rough outline from Confluent themselves about their 2 APIs. So just what are we going to talk about for this post?

 

Using the Processor API in the DSL

We are going to talk about how you can make use of the Processor API in the DSL. Now you may be asking yourself why would I want to do that, if the DSL can do things in a few lines of code, and the Processor API is very low level. Meh WHY?

Here are a few reasons

 

  • Customization: You need to implement special, customized logic that is not or not yet available in the DSL.
  • Combining ease-of-use with full flexibility where it’s needed: Even though you generally prefer to use the expressiveness of the DSL, there are certain steps in your processing that require more flexibility and tinkering than the DSL provides. For example, only the Processor API provides access to a record’s metadata such as its topic, partition, and offset information. However, you don’t want to switch completely to the Processor API just because of that.
  • Migrating from other tools: You are migrating from other stream processing technologies that provide an imperative API, and migrating some of your legacy code to the Processor API was faster and/or easier than to migrate completely to the DSL right away.

Ok so now that we know why we might do it, what things can we use from the ProcessorAPI with the DSL?

 

We can use these things

  • Processor
  • Transformer
  • ValuesTransformer

 

We will look at Processor and ValuesTransformer for this post

 

Adding A Processor to the DSL

Ok so as before we will use the DSL to create some simple functionality, where will do the following

  • GroupByKey
  • Count
  • Pipe to Processor API Processor class

It should be noted that adding a Processor to the DSL is a terminal step.

So here is the DSL part of this scenario, where it can be noted that we use the .process to add the custom Processor to the pipeline

package processorapi.interop

import java.io.PrintWriter
import java.time.Duration
import java.util
import java.util.Properties

import common.PropsHelper
import entities.Contributor
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder, kstream}
import org.apache.kafka.streams.{KafkaStreams, Topology}
import serialization.JSONSerde


class ProcessorApiProcessSupplierTopology(val pw: PrintWriter) extends App {

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "processor-api-process-supplier-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def stop() : Unit = {
    pw.close()
  }

  def createTopolgy(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val contributorSerde = new JSONSerde[Contributor]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, contributorSerde)
    implicit val grouped = Grouped.`with`(stringSerde, contributorSerde)

    import org.apache.kafka.streams.state.Stores
    val contributorStoreName = "processContributorStore"

    val logConfig = new util.HashMap[String, String]
    logConfig.put("retention.ms", "172800000")
    logConfig.put("retention.bytes", "10000000000")
    logConfig.put("cleanup.policy", "compact,delete")
    val contributorStoreSupplier = Stores.inMemoryKeyValueStore(contributorStoreName)
    val contributorStoreBuilder = Stores.keyValueStoreBuilder(contributorStoreSupplier,
        Serdes.String, Serdes.Long())
      .withLoggingEnabled(logConfig)
      .withCachingEnabled()

    implicit val materializer =
      Materialized.as(contributorStoreSupplier)(Serdes.String, Serdes.Long())
      .asInstanceOf[Materialized[String, Long,ByteArrayKeyValueStore ]]

    val builder: StreamsBuilder = new StreamsBuilder
    val contribs: KStream[String, Contributor] =
          builder.stream[String, Contributor]("ProcessorApiProcessorSupplierInputTopic")

    contribs
      .groupByKey
      .count()(materializer)
      .toStream
      .process(() => new ContributorPrintingSupplier(pw).get(), contributorStoreName)



    builder.build()
  }
}

Nothing to scary there. So lets continue to examine the inner workings of the ContributorPrintingSupplier class, which looks like this

package processorapi.interop

import java.io.PrintWriter

import entities.Contributor
import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier}

class ContributorPrintingSupplier(val pw: PrintWriter) extends ProcessorSupplier[String, Long] {
  override def get(): Processor[String, Long] = new Processor[String,Long] {

    import org.apache.kafka.streams.processor.ProcessorContext
    import org.apache.kafka.streams.state.KeyValueStore

    private var context:ProcessorContext  = null
    private var contributorStore:KeyValueStore[String, Long]  = null

    override def init(context: ProcessorContext): Unit = {
      import org.apache.kafka.streams.state.KeyValueStore
      this.context = context
      this.contributorStore = context.getStateStore("processContributorStore")
        .asInstanceOf[KeyValueStore[String, Long]]
    }

    override def process(key: String, value: Long): Unit = {
      pw.write(s"key ${key} has been seen ${value} times\r\n")
      pw.flush()
    }

    override def close(): Unit = {
      if(contributorStore != null) {
        contributorStore.close()
      }
    }
  }
}

There are several take away points here, namely :

  • We create anonymous Processor, and override the following methods
    • Init
    • Process
    • Close
  • That we are able to gain access to state stores via the supplied ProcessorContext

 

As I say this is a terminal operation, and such when using the .Process in the DSL you will not see any following operations.

Adding A ValueTransformer to the DSL

  • Applies a ValueTransformer to each record, while retaining the key of the original record. transformValues() allows you to leverage the Processor API from the DSL. (details)
  • Each input record is transformed into exactly one output record (zero output records or multiple output records are not possible). The ValueTransformer may return null as the new value for a record.
  • transformValues is preferable to transform because it will not cause data re-partitioning. It is also possible to get read-only access to the input record key if you use ValueTransformerWithKey (provided via ValueTransformerWithKeySupplier) instead.

Ok so as before we will use the DSL to create a simple KStream and then transform the values using the Processor API. This example is a little contrived as the transformed value is the same as the original value, but you should get the idea

package processorapi.interop

import java.time.Duration
import java.util
import java.util.Properties

import common.PropsHelper
import entities.Contributor
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.scala.{StreamsBuilder, kstream}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}
import serialization.JSONSerde


class ProcessorApiTransformValuesTopology extends App {

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "processor-api-transform-values-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val contributorSerde = new JSONSerde[Contributor]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, contributorSerde)
    implicit val materializer = Materialized.`with`(stringSerde, contributorSerde)

    import org.apache.kafka.streams.state.Stores
    val contributorStoreName = "contributorStore"

    val logConfig = new util.HashMap[String, String]
    logConfig.put("retention.ms", "172800000")
    logConfig.put("retention.bytes", "10000000000")
    logConfig.put("cleanup.policy", "compact,delete")
    val contributorStoreSupplier = Stores.inMemoryKeyValueStore(contributorStoreName)
    val contributorStoreBuilder = Stores.keyValueStoreBuilder(contributorStoreSupplier, Serdes.String, contributorSerde)
      .withLoggingEnabled(logConfig)
      .withCachingEnabled()


    val builder: StreamsBuilder = new StreamsBuilder
    val contribs: KStream[String, Contributor] =
          builder.stream[String, Contributor]("ProcessorApiTransformValuesInputTopic")

    builder.addStateStore(contributorStoreBuilder)

    contribs
      .transformValues(new ContributorTranformSupplier, contributorStoreName)
      .to("ProcessorApiTransformValuesOutputTopic")(Produced.`with`(stringSerde, contributorSerde))

    builder.build()
  }
}

Nice and simple we just use the transformValues() method to apply our custom ValueTransformer

package processorapi.interop

import java.time.Duration

import entities.Contributor
import org.apache.kafka.streams.kstream.{ValueTransformer, ValueTransformerSupplier}
import org.apache.kafka.streams.processor.{PunctuationType, Punctuator}

class ContributorTranformSupplier extends ValueTransformerSupplier[Contributor, Contributor] {
  override def get(): ValueTransformer[Contributor, Contributor] = new ValueTransformer[Contributor, Contributor] {

    import org.apache.kafka.streams.processor.ProcessorContext
    import org.apache.kafka.streams.state.KeyValueStore

    private var context:ProcessorContext  = null
    private var contributorStore:KeyValueStore[String, Contributor]  = null

    override def init(context: ProcessorContext): Unit = {
      import org.apache.kafka.streams.state.KeyValueStore
      this.context = context
      this.contributorStore = context.getStateStore("contributorStore")
        .asInstanceOf[KeyValueStore[String, Contributor]]

      //to punctuate you would do something like this
      //      context.schedule(Duration.ofSeconds(1),
      //            PunctuationType.WALL_CLOCK_TIME, new Punctuator {
      //        override def punctuate(timestamp: Long): Unit = {
      //
      //          val it = contributorStore.all
      //          val currentTime = System.currentTimeMillis
      //          while (it.hasNext) {
      //            val contributorValue = it.next.value
      //            if (contributorValue.updatedWithinLastMillis(currentTime, 11000))
      //              context.forward(contributorValue.email,contributorValue)
      //          }
      //        }
      //      })
    }

    override def transform(value: Contributor): Contributor = {

      var finalContributor:Contributor = null
      try {
        val contributor = contributorStore.get(value.email)
        if(contributor == null) {
          contributorStore.putIfAbsent(value.email, value)
          finalContributor = value
        }
        else {
          val newContributor = contributor.copy(
            ranking = contributor.ranking + 1,
            lastUpdatedTime = System.currentTimeMillis
          )
          contributorStore.put(value.email,newContributor)
          finalContributor = newContributor
        }

        finalContributor
      }
      catch {
        case e:NullPointerException => {
          contributorStore.putIfAbsent(value.email, value)
          value
        }
        case e:Exception => {
          value
        }
      }
    }

    override def close(): Unit = {
      if(contributorStore != null) {
        contributorStore.close()
      }
    }
  }
}

As before we have access to state store via the ProcessorContext. I also show how you could possibly punctuate (this essentially forwards on / commits records on some scheduled time) in the code above

 

And I think that is all I have to say this time. You can of course check out the tests to play with it more.

 

Next time we will look at the interactive queries API.

Kafka

KafkaStreams : Custom Serdes

Last time we look at Joining. This time we will continue to look at the streams DSL, and how we can supply our own Serdes (serializer / deserializer).

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

Serdes

Just to remind ourselves how Kafka Streams makes use of Serdes, from https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html up on date 13/03/19

Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String) to materialize the data when necessary. Operations that require such SerDes information include: stream(), table(), to(), through(), groupByKey(), groupBy().

You can provide SerDes by using either of these methods:

By setting default SerDes via a StreamsConfig instance.
By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.

 

InBuilt Serdes

So Kafka comes with a whole bunch of pre canned Serdes which you can access using the following namespace

org.apache.kafka.common.serialization.Serdes

But what happens when  you want to send more than just primitives/byte[]

Imagine we want to send these types

  • Rating
  • List[Rating]

Where Rating may look like this

package entities

case class Rating(fromEmail: String, toEmail: String, score: Float)

Then the inbuilt Serdes may not cut the mustard, so we need to implement our own

Custom Serdes

So the first step is to implement the custom Serde. For this post we will assume we are using JSON and will use the Jackson library to deal with the JSON, so our Serde looks like this.

import java.lang.reflect.{ParameterizedType, Type}
import java.util
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.exc.{UnrecognizedPropertyException => UPE}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}


package serialization {

  object Json {

    type ParseException = JsonParseException
    type UnrecognizedPropertyException = UPE

    private val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)

    private def typeReference[T: Manifest] = new TypeReference[T] {
      override def getType = typeFromManifest(manifest[T])
    }

    private def typeFromManifest(m: Manifest[_]): Type = {
      if (m.typeArguments.isEmpty) {
        m.runtimeClass
      }
      else new ParameterizedType {
        def getRawType = m.runtimeClass

        def getActualTypeArguments = m.typeArguments.map(typeFromManifest).toArray

        def getOwnerType = null
      }
    }

    object ByteArray {
      def encode(value: Any): Array[Byte] = mapper.writeValueAsBytes(value)

      def decode[T: Manifest](value: Array[Byte]): T =
        mapper.readValue(value, typeReference[T])
    }

  }

  /**
    * JSON serializer for JSON serde
    *
    * @tparam T
    */
  class JSONSerializer[T] extends Serializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def serialize(topic: String, data: T): Array[Byte] =
      Json.ByteArray.encode(data)

    override def close(): Unit = ()
  }

  /**
    * JSON deserializer for JSON serde
    *
    * @tparam T
    */
  class JSONDeserializer[T >: Null <: Any : Manifest] extends Deserializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def deserialize(topic: String, data: Array[Byte]): T = {
      if (data == null) {
        return null
      } else {
        Json.ByteArray.decode[T](data)
      }
    }
  }

  /**
    * JSON serde for local state serialization
    *
    * @tparam T
    */
  class JSONSerde[T >: Null <: Any : Manifest] extends Serde[T] {
    override def deserializer(): Deserializer[T] = new JSONDeserializer[T]

    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def serializer(): Serializer[T] = new JSONSerializer[T]
  }

}

 

Notice how this Serde is generic and can work with any T, but in order to do this with we need to deal with the insanity of the JVM and type erasure (OMG they got that wrong, .NET absolutely nailed generics). So in this example we use the Manifest to store Type information such that the Type information is not “erased” and we know how to deserialize the JSON string back into a T

 

Ok so now that we have this, lets see an example of how it is used

package serialization

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import entities.Rating
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}


class CustomSerdesTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "custom-serdes-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val ratingSerde = new JSONSerde[Rating]
    implicit val listRatingSerde = new JSONSerde[List[Rating]]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, ratingSerde)
    implicit val materializer = Materialized.`with`(stringSerde, listRatingSerde)
    implicit val grouped = Grouped.`with`(stringSerde, ratingSerde)

    val builder: StreamsBuilder = new StreamsBuilder
    val ratings: KStream[String, Rating] =
          builder.stream[String, Rating]("CustomSerdesInputTopic")

    //When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0)
    //and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table,
    //you must provide a “subtractor” aggregator (think: aggValue - oldValue).
    val groupedBy = ratings.groupByKey
    val aggregatedTable =
      groupedBy
        .aggregate[List[Rating]](List[Rating]())((aggKey, newValue, aggValue) => newValue :: aggValue)

    var finalStream = aggregatedTable.toStream
    finalStream.peek((key, values) => {

      val theKey = key
      val theValues = values

    })


    finalStream.to("CustomSerdesOutputTopic")(Produced.`with`(stringSerde, listRatingSerde))

    builder.build()
  }
}

Pretty easy stuff, only call out points here are that we use the implicits at then start of the file to specify all the serdes that the code will need to look up implicitly

Just for completeness this is the test suite that goes with this example

package serialization

import java.io._
import java.lang
import java.util.Properties

import common.PropsHelper
import entities.Rating
import org.apache.kafka.common.serialization.{LongDeserializer, _}
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.{ConsumerRecordFactory, OutputVerifier}
import org.scalatest._
import org.apache.kafka.common.serialization.Serdes


class CustomSerdesTopologyTests
  extends FunSuite
  with BeforeAndAfter
  with Matchers {

  val props = PropsHelper.createBasicStreamProperties("custom-serdes-application", "localhost:9092")
  val stringDeserializer: StringDeserializer = new StringDeserializer
  val ratingLIstDeserializer: JSONDeserializer[List[Rating]] = new JSONDeserializer[List[Rating]]

  before {
  }

  after {
  }


  test("Should produce correct output") {

    //arrange
    val recordFactory: ConsumerRecordFactory[java.lang.String, Array[Byte]] =
        new ConsumerRecordFactory[java.lang.String, Array[Byte]](new StringSerializer, Serdes.ByteArray().serializer())
    val customSerdesTopology = new CustomSerdesTopology()


    val jsonSerde = new JSONSerde[Rating]

    val rating = Rating("jarden@here.com","sacha@here.com", 1.5f)
    val ratingBytes = jsonSerde.serializer().serialize("", rating)


    //NOTE : You may find you need to play with these Config values in order
    //to get the stateful operation to work correctly/how you want it to
    //    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
    //    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760.asInstanceOf[Object])
    //    props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 50000.asInstanceOf[Object])
    //By playing around with these values you should be able to find the values that work for you
    //WARNING : Chaning these settings may have impact on the tests, as less frequent commits/state store
    //cache flushing may occur
    val testDriver = new TopologyTestDriver(customSerdesTopology.createTopolgy(), props)

    //Use the custom JSONSerde[Rating]
    testDriver.pipeInput(recordFactory.create("CustomSerdesInputTopic", rating.toEmail, ratingBytes, 9995L))

    val result = testDriver.readOutput("CustomSerdesOutputTopic", stringDeserializer, ratingLIstDeserializer)

    OutputVerifier.compareKeyValue(result, "sacha@here.com",List(Rating("jarden@here.com","sacha@here.com", 1.5f)))
    val result1 = testDriver.readOutput("CustomSerdesOutputTopic", stringDeserializer, ratingLIstDeserializer)
    assert(result1 == null)

    cleanup(props, testDriver)
  }


  def cleanup(props:Properties, testDriver: TopologyTestDriver) = {

    try {
      //there is a bug on windows which causes this line to throw exception
      testDriver.close
    } catch {
      case e: Exception => {
        delete(new File("C:\\data\\kafka-streams"))
      }
    }
  }

  def delete(file: File) {
    if (file.isDirectory)
      Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete(_))
    file.delete
  }
}

 

That’s it for now

So that’s all I wanted to say this time, so until the next time, hope this has shown you just how easy it is to hand roll your own object that you can serialize with ease

Uncategorized

OpenFin demo

It has been quite a while since I wrote a full article for codeproject.com, but I just needed a break from all the server side stuff for a while. So I decided to take something for a spin which has been on my TODO radar for ages.That thing is OpenFin

 

Where they market themselves is as the OS for finance, where the apps are built using HTML5, but financial apps still benefit from a desktop feel where Windows are used, and things can be shown in  a Notification tray, and can be docked etc etc

 

This is where OpenFin see themselves

 

This is what I have built and demonstrate in the fuller article at codeproject.com, If you want to know more the full article is here : https://www.codeproject.com/Articles/1279126/OpenFin-A-small-example-app-using-it

 

image

 

This is a simple React/Redux/OpenFin application