Kafka

KafkaStreams : Windowing

So last time we looked at interactive queries. This time (the last one in this series) we will look at windowing operations. This is a fairly dry subject, and I don’t have too much to add over the official docs, so this is a nice short one to end the series.

Where is the code?

The code for this post is all contained here

What is Windowing anyway?

Windowing lets you control how to group records that have the same key for stateful operations such as aggregations or joins into so-called windows. Windows are tracked per record key. For example, in join operations, a windowing state store is used to store all the records received so far within the defined window boundary. In aggregating operations, a windowing state store is used to store the latest aggregation results per window. Old records in the state store are purged after the specified window retention period. Kafka Streams guarantees to keep a window for at least this specified time; the default value is one day and can be changed via Materialized#withRetention().
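
For example, here is a minimal sketch (topic and store names are made up) of bumping that retention to two days on a windowed count via the Scala DSL:

import java.time.Duration
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Materialized
import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}

object RetentionSketch {

  def buildIt(): Unit = {
    val builder = new StreamsBuilder()
    builder.stream[String, String]("SomeInputTopic")
      .groupByKey
      .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
      // keep the windowed counts for 2 days rather than the default 1 day
      .count()(Materialized.as[String, Long, ByteArrayWindowStore]("windowed-count-store")
        .withRetention(Duration.ofDays(2)))
  }
}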

This is what Kafka Streams supports for windowing

image

 

 

Tumbling Windows

Tumbling time windows are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.

image (streams-time-windows-tumbling.png)
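
In API terms the equivalence reads like this (a made-up 5 minute size, just for illustration):

import java.time.Duration
import org.apache.kafka.streams.kstream.TimeWindows

object WindowEquivalenceSketch {
  // a 5 minute tumbling window...
  val tumbling = TimeWindows.of(Duration.ofMinutes(5))
  // ...is just a hopping window whose advance interval equals its size
  val hoppingWithFullAdvance = TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(5))
}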

Here is an example of this using the simple word count example we have used before

package windowing

import java.time.Duration
import java.util
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}
import utils.Settings


class TumblingWordCountTopology extends App {

  import Serdes._

  val props: Properties = Settings.createBasicStreamProperties(
    "tumbling-window-wordcount-application","localhost:9092")

  run()

  private def run(): Unit = {
    val topology = wordCountToplogy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

   def wordCountToplogy() : Topology = {

    import org.apache.kafka.streams.state.Stores
    val wordCountStoreName = "wordCountStore"
    val wordCountStoreSupplied = Stores.inMemoryKeyValueStore(wordCountStoreName)

    val builder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
    val wordCounts = textLines.flatMapValues(x => x.toLowerCase.split("\\W+"))
                    .groupBy((key, word) => word)
      .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
      .count()
    wordCounts.toStream.to("WordsWithCountsTopic")
    builder.build()
  }
}

 

Hopping Time Window

Hopping time windows are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap — and in general they do — a data record may belong to more than one such window.

image

And here is an example of this using the word count example

package windowing

import java.time.Duration
import java.util
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}
import utils.Settings


class HoppingTimeWordCountTopology extends App {

  import Serdes._

  val props: Properties = Settings.createBasicStreamProperties(
    "hopping-time-window-wordcount-application","localhost:9092")

  run()

  private def run(): Unit = {
    val topology = wordCountToplogy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

   def wordCountToplogy() : Topology = {

    import org.apache.kafka.streams.state.Stores
    val wordCountStoreName = "wordCountStore"
    val wordCountStoreSupplied = Stores.inMemoryKeyValueStore(wordCountStoreName)

    val builder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
    val wordCounts = textLines.flatMapValues(x => x.toLowerCase.split("\\W+"))
                    .groupBy((key, word) => word)
      .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1))) // 5 minute windows, advancing every 1 minute
      .count()
    wordCounts.toStream.to("WordsWithCountsTopic")
    builder.build()
  }
}

That’s It

And that brings us to the end of this series. I will be diving back into .NET land next to write an implementation of the SWIM algorithm. After that I think I will get an AWS Solutions Architect exam under my belt.

 

I hope you have enjoyed the series. I know this one was a bit naff, but I kind of ran out of steam.

Kafka

KafkaStreams : Interactive Queries

So last time we looked at how to make use of the Processor API in the DSL. This time we are going to look at interactive queries.

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

Walking through a Kafka Streams processing node, and the duality of streams

Before we get started I just wanted to include several excerpts taken from the official Kafka docs : http://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables which talk about KStream and KTable objects (which are the stream and table objects inside Kafka Streams)

 

When implementing stream processing use cases in practice, you typically need both streams and also databases. An example use case that is very common in practice is an e-commerce application that enriches an incoming stream of customer transactions with the latest customer information from a database table. In other words, streams are everywhere, but databases are everywhere, too.

Any stream processing technology must therefore provide first-class support for streams and tables. Kafka’s Streams API provides such functionality through its core abstractions for streams and tables, which we will talk about in a minute. Now, an interesting observation is that there is actually a close relationship between streams and tables, the so-called stream-table duality. And Kafka exploits this duality in many ways: for example, to make your applications elastic, to support fault-tolerant stateful processing, or to run interactive queries against your application’s latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications.

A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:

image (streams-table-duality-01.jpg)

The stream-table duality describes the close relationship between streams and tables.

  • Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a “real” table by replaying the changelog from beginning to end to reconstruct the table. Similarly, aggregating data records in a stream will return a table. For example, we could compute the total number of pageviews by user from an input stream of pageview events, and the result would be a table, with the table key being the user and the value being the corresponding pageview count.
  • Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream’s data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a “real” stream by iterating over each key-value entry in the table.

Let’s illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time – and different revisions of the table – can be represented as a changelog stream (second column).

image

Because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):

image (streams-table-duality-03.jpg)

The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault tolerance. The stream-table duality is such an important concept for stream processing applications in practice that Kafka Streams models it explicitly via the KStream and KTable abstractions, which we describe in the next sections.
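
To make the duality concrete in code, here is a minimal sketch (topic names are made up) showing both directions with the Scala DSL:

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

object DualitySketch {

  def buildIt(): Unit = {
    val builder = new StreamsBuilder()

    // Stream as Table : aggregating a stream of pageview events yields a table of counts per user
    val pageViews: KStream[String, String] = builder.stream[String, String]("pageview-events")
    val viewsPerUser: KTable[String, Long] = pageViews.groupByKey.count()

    // Table as Stream : every update to that table can be emitted as a changelog record
    viewsPerUser.toStream.to("pageviews-per-user-changelog")
  }
}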

I would STRONGLY urge you all to read the section of the official docs above, as it will really help you should you want to get into Kafka Streams.

 

Interactive Queries

Up until now we have been using KStream, and on occasion we have also used KTable to store state into a state store. A KTable is also able to be turned back into a KStream and pushed back out on an “output” topic, which is what most of my examples have done so far.

 

However this is kind of annoying and wasteful: we just had the data in the format we wanted, in Kafka, in a KTable, and then we need to transform it back into a KStream and send it to a new topic. Surely there is a better way.

 

Luckily there is: “Interactive Queries”. KafkaStreams supports the notion of queries across state stores. Most typical state stores are key-value stores, where a given key will live on a certain partition. As such the state store is distributed across the partitions that make up the topic.

Mmm so how do we get the data from all the stores? Well luckily KafkaStreams exposes metadata that allows us to obtain the hostname of the hosts that have a certain state store. From there we can either use a local call or perform an RPC (most typically a REST call) to fetch the remote host’s data.

 

This is illustrated in the following diagram

 

image (streams-interactive-queries-03.png)

The full state of your application is typically split across many distributed instances of your application, and across many state stores that are managed locally by these application instances.

That is the basic idea anyway. So we need to create some plumbing to help us either call into a local state store, or query a remote host that owns some keys for a state store.
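
In code terms, the core of that plumbing boils down to two KafkaStreams calls, roughly like this (the store name and key are made up; the full MetadataService class is shown later):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams

object MetadataSketch {

  // given a running KafkaStreams instance, who hosts the store at all, and who owns this particular key?
  def whereIsMyKey(streams: KafkaStreams, store: String, key: String): Unit = {
    // every instance that currently hosts (some partitions of) the store
    val hosts = streams.allMetadataForStore(store)
    // the single instance whose partition would contain this key
    val owner = streams.metadataForKey(store, key, Serdes.String().serializer())
    println(s"store '${store}' is hosted on ${hosts}, key '${key}' is owned by ${owner.host}:${owner.port}")
  }
}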

Is it that simple?

Well no, actually. KafkaStreams has the following state transition diagram that one needs to be aware of. If you try to query a state store for metadata or actual data before the application has transitioned into the RUNNING state, you won’t see anything, as Kafka Streams is considered NOT READY.

 

image

 

There is a lot of chat on the internet (it’s even in the Confluent FAQs) about state store metadata not being available, and it is due to this state machine.

So how do we deal with this?

  • One thing to do is implement some kind of retry policy (this post covers this)
  • Another thing to do is use a stream state listener (a small sketch follows, and this post covers it in full later)
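
As a taster of the state listener approach, here is a minimal sketch; it mirrors the wiring shown in full in the RatingStreamProcessingTopologyApp later in this post:

import org.apache.kafka.streams.KafkaStreams

object ReadyListenerSketch {

  // flip a 'ready' flag as the KafkaStreams state machine moves between states
  def addReadyListener(streams: KafkaStreams, setReady: Boolean => Unit): Unit = {
    streams.setStateListener((newState, oldState) => {
      if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
        setReady(true)
      } else if (newState != KafkaStreams.State.RUNNING) {
        setReady(false)
      }
    })
  }
}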

Ok so are there any other pitfalls? No, not really; you just need to make sure you don’t start using state stores unless your KafkaStreams instance is in a good state.

So now that we have the basics covered, let’s see some code.

The Topology

This is the topology that we will be using to construct our state store

package interactive.queries.ratings


import java.util

import entities.Rating
import org.apache.kafka.streams.scala.{Serdes, _}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.state.KeyValueStore
import serialization.JSONSerde
import utils.StateStores


class RatingStreamProcessingTopology  {

  def createTopolgy(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val ratingSerde = new JSONSerde[Rating]
    implicit val listRatingSerde = new JSONSerde[List[Rating]]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, ratingSerde)
    implicit val materializer = Materialized.`with`(stringSerde, listRatingSerde)
    implicit val grouped = Grouped.`with`(stringSerde, ratingSerde)

    val builder: StreamsBuilder = new StreamsBuilder
    val ratings: KStream[String, Rating] =
      builder.stream[String, Rating]("rating-submit-topic")


    import org.apache.kafka.streams.state.Stores

    val logConfig = new util.HashMap[String, String]
    logConfig.put("retention.ms", "172800000")
    logConfig.put("retention.bytes", "10000000000")
    logConfig.put("cleanup.policy", "compact,delete")
    val ratingByEmailStoreName = StateStores.RATINGS_BY_EMAIL_STORE
    val ratingByEmailStoreSupplied = Stores.inMemoryKeyValueStore(ratingByEmailStoreName)
    val ratingByEmailStoreBuilder = Stores.keyValueStoreBuilder(ratingByEmailStoreSupplied,
      Serdes.String, listRatingSerde)
      .withLoggingEnabled(logConfig)
      .withCachingEnabled()

    val builtStore = ratingByEmailStoreBuilder.build()

    //When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0)
    //and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table,
    //you must provide a “subtractor” aggregator (think: aggValue - oldValue).
    val groupedBy = ratings.groupByKey

    //aggregate by (user email -> their ratings)
    val ratingTable : KTable[String, List[Rating]] = groupedBy
      .aggregate[List[Rating]](List[Rating]())((aggKey, newValue, aggValue) => {
      newValue :: aggValue
    })(Materialized.as[String, List[Rating]](ratingByEmailStoreSupplied))

    ratingTable.mapValues((k,v) => {
      val theKey = k
      val theValue = v
      v
    })


    builder.build()
  }
}

Entities stored

And this is the entity type that we will be storing in the state store

package entities

case class Rating(fromEmail: String, toEmail: String, score: Float)

Metadata

As previously mentioned we need to query the metadata to find out which host has the store that contains a particular key. In order to do this I have come up with this class

package interactive.queries

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._


/**
  * Looks up StreamsMetadata from KafkaStreams
  */
class MetadataService(val streams: KafkaStreams) {


  /**
    * Get the metadata for all of the instances of this Kafka Streams application
    *
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadata() : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Get the metadata for all instances of this Kafka Streams application that currently
    * has the provided store.
    *
    * @param store The store to locate
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Find the metadata for the instance of this Kafka Streams Application that has the given
    * store and would have the given key if it exists.
    *
    * @param store Store to find
    * @param key   The key to find
    * @return { @link HostStoreInfo}
    */
  def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
      throw new NotFoundException(
        s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")

    HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
  }


  def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
    metadatas.stream.map[HostStoreInfo](metadata =>
      HostStoreInfo(
        metadata.host(),
        metadata.port,
        metadata.stateStoreNames.asScala.toList))
      .collect(Collectors.toList())
      .asScala.toList
  }
}

Exposing the state store over RPC

As I say, the most popular way of doing this is via REST. As such I am using Akka HTTP to do this. Here is the Rating HTTP server class

package interactive.queries.ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import entities.AkkaHttpEntitiesJsonFormats._
import entities._
import utils.StateStores
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import org.apache.kafka.common.serialization.Serdes
import scala.concurrent.{Await, ExecutionContext, Future}
import akka.http.scaladsl.unmarshalling.Unmarshal
import interactive.queries.MetadataService
import spray.json._
import scala.util.{Failure, Success}
import org.apache.kafka.streams.state.QueryableStoreTypes
import scala.concurrent.duration._


object RestService {
  val DEFAULT_REST_ENDPOINT_HOSTNAME  = "localhost"
}

class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

  val metadataService = new MetadataService(streams)
  var bindingFuture: Future[Http.ServerBinding] = null

  implicit val system = ActorSystem("rating-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  var isStateStoredReady: Boolean = false


  def setReady(isReady : Boolean): Unit = {
    isStateStoredReady = isReady
  }


  def start() : Unit = {
    val emailRegexPattern =  """\w+""".r
    val storeNameRegexPattern =  """\w+""".r

    val route =
      path("ratingByEmail") {
        get {
          parameters('email.as[String]) { (email) =>

            if(!isStateStoredReady) {
              complete(HttpResponse(StatusCodes.InternalServerError, entity = "state store not queryable, possibly due to re-balancing"))
            } else

            try {

              val host = metadataService.streamsMetadataForStoreAndKey[String](
                StateStores.RATINGS_BY_EMAIL_STORE,
                email,
                Serdes.String().serializer()
              )

              //store is hosted on another process, REST Call
              if(!thisHost(host)) {
                onComplete(fetchRemoteRatingByEmail(host, email)) {
                  case Success(value) => complete(value)
                  case Failure(ex)    => complete(HttpResponse(StatusCodes.InternalServerError, entity = ex.getMessage))
                }
              }
              else {
                onComplete(fetchLocalRatingByEmail(email)) {
                  case Success(value) => complete(value)
                  case Failure(ex)    => complete(HttpResponse(StatusCodes.InternalServerError, entity = ex.getMessage))
                }
              }
            }
            catch {
              case (ex: Exception) => {
                complete(HttpResponse(StatusCodes.InternalServerError, entity = ex.getMessage))
              }
            }
          }
        }
      } ~
      path("instances") {
        get {
          if(!isStateStoredReady) {
            complete(HttpResponse(StatusCodes.InternalServerError, entity = "state store not queryable, possibly due to re-balancing"))
          } else
            complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
        }
      }~
      path("instances" / storeNameRegexPattern) { storeName =>
        get {
          if(!isStateStoredReady) {
            complete(HttpResponse(StatusCodes.InternalServerError, entity = "state store not queryable, possibly due to re-balancing"))
          } else
            complete(ToResponseMarshallable.apply(metadataService.streamsMetadataForStore(storeName)))
        }
      }

    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))
  }


  def fetchRemoteRatingByEmail(host:HostStoreInfo, email: String) : Future[List[Rating]] = {

    val requestPath = s"http://${host.host}:${host.port}/ratingByEmail?email=${email}"
    println(s"Client attempting to fetch from remote host at ${requestPath}")

    val responseFuture: Future[List[Rating]] = {
      Http().singleRequest(HttpRequest(uri = requestPath))
        .flatMap(response => Unmarshal(response.entity).to[List[Rating]])
    }

    responseFuture
  }

  def fetchLocalRatingByEmail(email: String) : Future[List[Rating]] = {

    val ec = ExecutionContext.global

    println(s"client fetchLocalRatingByEmail email=${email}")

    val host = metadataService.streamsMetadataForStoreAndKey[String](
      StateStores.RATINGS_BY_EMAIL_STORE,
      email,
      Serdes.String().serializer()
    )

    val f = StateStores.waitUntilStoreIsQueryable(
      StateStores.RATINGS_BY_EMAIL_STORE,
      QueryableStoreTypes.keyValueStore[String,List[Rating]](),
      streams
    ).map(_.get(email))(ec)

    val mapped = f.map(rating => {
      if (rating == null)
        List[Rating]()
      else
        rating
    })

    mapped
  }

  def stop() : Unit = {
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }

  def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
      hostStoreInfo.port == hostInfo.port
  }
}

You can see that this class does the following

  • Defines the routes
  • Uses the metadata service to work out whether it should be a local host query or an RPC call
  • Uses a waitUntilStoreIsQueryable helper method to work out when the state store is queryable. The helper class is shown here
package utils

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.InvalidStateStoreException
import org.apache.kafka.streams.state.{QueryableStoreType, QueryableStoreTypes}

import scala.concurrent.{ExecutionContext, Future}

object StateStores {
  val RATINGS_BY_EMAIL_STORE = "ratings-by-email-store"

  def waitUntilStoreIsQueryable[T]
  (
    storeName: String,
    queryableStoreType: QueryableStoreType[T],
    streams: KafkaStreams
  ) (implicit ec: ExecutionContext): Future[T] = {

    Retry.retry(5) {
      Thread.sleep(500)
      streams.store(storeName, queryableStoreType)
    }(ec)
  }


  private def printStoreMetaData[K, V](streams:KafkaStreams, storeName:String) : Unit = {

    val md = streams.allMetadata()
    val mdStore = streams.allMetadataForStore(storeName)

    val maybeStore = StateStores.waitUntilStoreIsQueryableSync(
      storeName,
      QueryableStoreTypes.keyValueStore[K,V](),
      streams)

    maybeStore match {
      case Some(store) => {
        val range = store.all
        val HASNEXT = range.hasNext
        while (range.hasNext) {
          val next = range.next
          System.out.print(s"key: ${next.key} value: ${next.value}")
        }
      }
      case None => {
        System.out.print(s"store not ready")
        throw new Exception("not ready")
      }
    }
  }

  @throws[InterruptedException]
  def waitUntilStoreIsQueryableSync[T](
        storeName: String,
        queryableStoreType: QueryableStoreType[T],
        streams: KafkaStreams): Option[T] = {
    while (true) {
      try {
        return Some(streams.store(storeName, queryableStoreType))
      }
      catch {
        case ignored: InvalidStateStoreException =>
          val state = streams.state
          // store not yet ready for querying
          Thread.sleep(100)
      }
    }
    None
  }
}
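
The Retry helper used by waitUntilStoreIsQueryable above lives in the repo and is not shown in this post. A minimal sketch of the shape it would need (my assumption, not the repo’s exact code) is:

package utils

import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// hypothetical sketch of the Retry helper referenced above; it retries the block up to
// 'times' times inside a Future, failing the Future if every attempt throws
object Retry {
  def retry[T](times: Int)(block: => T)(implicit ec: ExecutionContext): Future[T] =
    Future(block).recoverWith {
      case NonFatal(_) if times > 1 => retry(times - 1)(block)
    }
}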

Existing Framework

If you looked at the code above and saw things in it that you thought could be made generic, such as the GET URIs, fetching data from a state store, retries, JSON serialization etc, you are correct. I could have done this. But as is quite common for me, I get to the end of something, do everything that needs to be done, and then discover that better people than me have thought about this and actually offer a framework for this type of thing. Lightbend, the people behind Akka, have one such library for KafkaStreams. It is available here : https://github.com/lightbend/kafka-streams-query

What they have done is make all the glue bits in the middle nice and modular and generic so you can get on with writing your application code, and leave the robust querying, retries etc to a framework. It is well written and looks fairly easy to use, but if you have not used KafkaStreams interactive queries before it is best to dive in yourself first.

The Streams App

So we are almost done actually. All you need to do now is wire all this together, which for me is done by the following code, which starts the HTTP service, creates the topology, and also MOST importantly sets a KafkaStreams state listener.

package interactive.queries.ratings

import java.util.Properties

import entities.Rating
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.errors.BrokerNotFoundException
import org.apache.kafka.streams.state.{HostInfo, QueryableStoreTypes}
import org.apache.kafka.streams.KafkaStreams
import utils.{Retry, Settings, StateStores}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.util.Success
import java.util.concurrent.CountDownLatch

object RatingStreamProcessingTopologyApp extends App {

  import Serdes._


  implicit val ec = ExecutionContext.global
  val doneSignal = new CountDownLatch(1)

  run()

  private def run(): Unit = {

    val restEndpoint: HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
    System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
    System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

    val props: Properties = Settings.createRatingStreamsProperties()
    val topology = new RatingStreamProcessingTopology().createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology,props)

    val restService = new RatingRestService(streams, restEndpoint)

    //Can only add this in State == CREATED
    streams.setUncaughtExceptionHandler(( thread :Thread, throwable : Throwable) => {
      println(s"============> ${throwable.getMessage}")
      shutDown(streams,restService)

    })

    streams.setStateListener((newState, oldState) => {
      if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
        restService.setReady(true)
      } else if (newState != KafkaStreams.State.RUNNING) {
        restService.setReady(false)
      }
    })

    restService.start()

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      shutDown(streams,restService)
    }))

    println("Starting KafkaStream")

    // Always (and unconditionally) clean local state prior to starting the processing topology.
    // We opt for this unconditional call here because this will make it easier for you to
    // play around with the example when resetting the application for doing a re-run
    // (via the Application Reset Tool,
    // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
    //
    // The drawback of cleaning up local state prior is that your app must rebuild its local
    // state from scratch, which will take time and will require reading all the state-relevant
    // data from the Kafka cluster over the network.
    // Thus in a production scenario you typically do not want to clean up always as we do
    // here but rather only when it is truly needed, i.e., only under certain conditions
    // (e.g., the presence of a command line flag for your app).
    // See `ApplicationResetExample.java` for a production-like example.
    streams.cleanUp
    streams.start

    doneSignal.await
   ()
  }


  private def shutDown(streams: KafkaStreams, restService: RatingRestService): Unit = {
    doneSignal.countDown
    streams.close()
    restService.stop
  }
}

Running it all

That is a lot of code, but you probably want to know how to run it all.

Well you will need to do a few things for this one.

How to install Kafka/Zookeeper and get them running on Windows

This section will talk you through how to get Kafka and get it working on Windows

Step 1 : Download Kafka

Grab the Confluent Platform X.X.X Open Source download : https://www.confluent.io/download/

Step 2 : Update Dodgy BAT Files

The official Kafka Windows BAT files don’t seem to work in the Confluent Platform X.X.X Open Source download. So replace the official [YOUR INSTALL PATH]\confluent-x.x.x\bin\windows BAT files with the ones found here : https://github.com/renukaradhya/confluentplatform/tree/master/bin/windows

Step 3 : Make Some Minor Changes To Log Locations etc etc

Kafka/Zookeeper as installed are set up for Linux, and as such the default paths won’t work on Windows. So we need to adjust that a bit. Let’s do that now

  • Modify the [YOUR INSTALL PATH]\confluent-x.x.x\etc\kafka\zookeeper.properties file to change the dataDir to something like dataDir=c:/temp/zookeeper
  • Modify the [YOUR INSTALL PATH]\confluent-x.x.x.\etc\kafka\server.properties file to uncomment the line delete.topic.enable=true

Step 4 : Running Zookeeper + Kafka + Creating Topics

Now that we have installed everything, it’s just a matter of running stuff up. Sadly before we can run Kafka we need to run Zookeeper, and before Kafka can receive messages we need to ensure that the Kafka topics are created. Topics must exist before messages can be sent to them.

Mmm that sounds like a fair bit of work. Well it is, so I decided to script this into a little PowerShell script, which you can adjust to your needs

$global:kafkaWindowsBatFolder = "C:\Apache\confluent-5.2.1-2.12\bin\windows\"
$global:kafkaAndZooLoggingFolder = "C:\temp\"
$global:kafkaAndZooTmpFolder = "C:\tmp\"


$global:kafkaTopics = 
	"rating-submit-topic"
	
$global:ProcessesToKill = @()



function RunPipeLine() 
{
	WriteHeader "STOPPING PREVIOUS SERVICES"
	StopZookeeper
	StopKafka
	
	$path = $kafkaAndZooLoggingFolder + "kafka-logs"
	Remove-Item -Recurse -Force $path
	
	$path = $kafkaAndZooLoggingFolder + "zookeeper"
	Remove-Item -Recurse -Force $path
	
    $path = $kafkaAndZooLoggingFolder
	Remove-Item -Recurse -Force $path


	Start-Sleep -s 20
	
	WriteHeader "STARTING NEW SERVICE INSTANCES"
	StartZookeeper
	
	Start-Sleep -s 20
	StartKafka
	
	Start-Sleep -s 20

	CreateKafkaTopics
    
	Start-Sleep -s 20
	
	
	WaitForKeyPress

	WriteHeader "KILLING PROCESSES CREATED BY SCRIPT"
	KillProcesses
}

function WriteHeader($text) 
{
	Write-Host "========================================`r`n"
	Write-Host "$text`r`n"
	Write-Host "========================================`r`n"
}


function StopZookeeper() {
    # C:\Apache\confluent-5.2.1-2.12\bin\windows\zookeeper-server-stop.bat
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-stop.bat"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine`r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine -WindowStyle Normal -PassThru
}

function StopKafka() {
	# C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-server-stop.bat
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-stop.bat" 
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine`r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine  -WindowStyle Normal -PassThru
}

function StartZookeeper() {
    # C:\Apache\confluent-5.2.1-2.12\bin\windows\zookeeper-server-start.bat C:\Apache\confluent-5.2.1-2.12\bin\windows\..\..\etc\kafka\zookeeper.properties
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-start.bat"
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\zookeeper.properties"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine $arguments -WindowStyle Normal -PassThru
}

function StartKafka() {
    # C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-server-start.bat C:\Apache\confluent-5.2.1-2.12\bin\windows\..\..\etc\kafka\server.properties
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-start.bat" 
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\server.properties"
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine $arguments -WindowStyle Normal -PassThru
}

function CreateKafkaTopics() 
{
   # C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic rating-submit-topic
	  
   # C:\Apache\confluent-5.2.1-2.12\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-submit-topic
	Foreach ($topic in $global:kafkaTopics )
	{
		$kafkaCommandLine = $global:kafkaWindowsBatFolder + "kafka-topics.bat"
		$arguments = "--zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic $topic"
		Write-Host "> Create Kafka Topic Command Line : $kafkaCommandLine args: $arguments `r`n"
		$global:ProcessesToKill += start-process $kafkaCommandLine $arguments -WindowStyle Normal -PassThru
	}
}

function WaitForKeyPress
{
	Write-Host -NoNewLine "Press any key to continue....`r`n"
	[Console]::ReadKey()
}


function KillProcesses() 
{
	Foreach ($processToKill in $global:ProcessesToKill )
	{
		$name = $processToKill | Get-ChildItem -Name
		Write-Host "Killing Process : $name `r`n" 
		$processToKill | Stop-Process -Force
	}
}


# Kick off the entire pipeline
RunPipeLine

 

So once you have done all of that, you should be able to run the RatingsProducerApp from the GitHub repo link I posted at the beginning. Then after that it is simply a question of running the RatingStreamProcessingTopologyApp.

 

If all has gone ok you should be able to fire up Postman (an excellent REST tool) and use these example endpoints
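
For reference, based on the routes defined in the RatingRestService above, the endpoints look something like this (the port comes from your Settings and is just assumed here; the email value is an example):

  • http://localhost:8080/ratingByEmail?email=sacha@here.com
  • http://localhost:8080/instances
  • http://localhost:8080/instances/ratings-by-email-store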

image

Wooohoo we got some data interactively

 

image

 

Conclusion

And that is all I wanted to say this time. Next time we will be looking at the final post in this series, time windowed operations.

Kafka

KafkaStreams : Using Processor API in DSL

Last time we looked at how we can supply our own Serdes (serializer / deserializer) to the DSL. This time we will look at how we can make use of the lower level “Processor API” from within the DSL, which is what we have been using up until now.

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

DSL vs Processor API

So far we have been using the DSL, but it would be good to have a recap of not only the Streams DSL but also the “Processor API”

 

Streams DSL

The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is recommended for most users, especially beginners. Most data processing operations can be expressed in just a few lines of DSL code.

In comparison to the Processor API, only the DSL supports:

  • Built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable. Having first-class support for streams and tables is crucial because, in practice, most use cases require not just either streams or databases/tables, but a combination of both. For example, if your use case is to create a customer 360-degree view that is updated in real-time, what your application will be doing is transforming many input streams of customer-related events into an output table that contains a continuously updated 360-degree view of your customers.
  • Declarative, functional programming style with stateless transformations (e.g. map and filter) as well as stateful transformations such as aggregations (e.g. count and reduce), joins (e.g. leftJoin), and windowing (e.g. session windows).

With the DSL, you can define processor topologies (i.e., the logical processing plan) in your application. The steps to accomplish this are:

  1. Specify one or more input streams that are read from Kafka topics.
  2. Compose transformations on these streams.
  3. Write the resulting output streams back to Kafka topics, or expose the processing results of your application directly to other applications through interactive queries (e.g., via a REST API).
    After the application is run, the defined processor topologies are continuously executed (i.e., the processing plan is put into action). A step-by-step guide for writing a stream processing application using the DSL is provided below.

Once you have built your Kafka Streams application using the DSL you can view the underlying Topology by first executing StreamsBuilder#build() which returns the Topology object. Then to view the Topology you call Topology#describe().
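
For example, a minimal sketch (topic names made up) of printing a topology description:

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

object DescribeSketch extends App {
  val builder = new StreamsBuilder()
  builder.stream[String, String]("SomeInputTopic").to("SomeOutputTopic")
  val topology: Topology = builder.build()
  // prints the processor nodes, their state stores and how they are connected into sub-topologies
  println(topology.describe())
}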

 

Processor API

The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents a customized processing logic.

The Processor API can be used to implement both stateless as well as stateful operations, where the latter is achieved through the use of state stores.

 

Here is a Java 8 example of what a Processor API app may look like

Topology builder = new Topology();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")

    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")

    // add the count store associated with the WordCountProcessor processor
    .addStateStore(countStoreSupplier, "Process")

    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

 

So that is a rough outline from Confluent themselves about their two APIs. So just what are we going to talk about for this post?

 

Using the Processor API in the DSL

We are going to talk about how you can make use of the Processor API in the DSL. Now you may be asking yourself why you would want to do that, if the DSL can do things in a few lines of code and the Processor API is very low level. Meh, WHY?

Here are a few reasons

 

  • Customization: You need to implement special, customized logic that is not or not yet available in the DSL.
  • Combining ease-of-use with full flexibility where it’s needed: Even though you generally prefer to use the expressiveness of the DSL, there are certain steps in your processing that require more flexibility and tinkering than the DSL provides. For example, only the Processor API provides access to a record’s metadata such as its topic, partition, and offset information. However, you don’t want to switch completely to the Processor API just because of that.
  • Migrating from other tools: You are migrating from other stream processing technologies that provide an imperative API, and migrating some of your legacy code to the Processor API was faster and/or easier than to migrate completely to the DSL right away.

Ok so now that we know why we might do it, what things can we use from the Processor API with the DSL?

 

We can use these things

  • Processor
  • Transformer
  • ValueTransformer

 

We will look at Processor and ValueTransformer for this post

 

Adding A Processor to the DSL

Ok so as before we will use the DSL to create some simple functionality, where we will do the following

  • GroupByKey
  • Count
  • Pipe to Processor API Processor class

It should be noted that adding a Processor to the DSL is a terminal step.

So here is the DSL part of this scenario, where it can be noted that we use .process() to add the custom Processor to the pipeline

package processorapi.interop

import java.io.PrintWriter
import java.time.Duration
import java.util
import java.util.Properties

import common.PropsHelper
import entities.Contributor
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder, kstream}
import org.apache.kafka.streams.{KafkaStreams, Topology}
import serialization.JSONSerde


class ProcessorApiProcessSupplierTopology(val pw: PrintWriter) extends App {

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "processor-api-process-supplier-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def stop() : Unit = {
    pw.close()
  }

  def createTopolgy(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val contributorSerde = new JSONSerde[Contributor]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, contributorSerde)
    implicit val grouped = Grouped.`with`(stringSerde, contributorSerde)

    import org.apache.kafka.streams.state.Stores
    val contributorStoreName = "processContributorStore"

    val logConfig = new util.HashMap[String, String]
    logConfig.put("retention.ms", "172800000")
    logConfig.put("retention.bytes", "10000000000")
    logConfig.put("cleanup.policy", "compact,delete")
    val contributorStoreSupplier = Stores.inMemoryKeyValueStore(contributorStoreName)
    val contributorStoreBuilder = Stores.keyValueStoreBuilder(contributorStoreSupplier,
        Serdes.String, Serdes.Long())
      .withLoggingEnabled(logConfig)
      .withCachingEnabled()

    implicit val materializer =
      Materialized.as(contributorStoreSupplier)(Serdes.String, Serdes.Long())
      .asInstanceOf[Materialized[String, Long,ByteArrayKeyValueStore ]]

    val builder: StreamsBuilder = new StreamsBuilder
    val contribs: KStream[String, Contributor] =
          builder.stream[String, Contributor]("ProcessorApiProcessorSupplierInputTopic")

    contribs
      .groupByKey
      .count()(materializer)
      .toStream
      .process(() => new ContributorPrintingSupplier(pw).get(), contributorStoreName)



    builder.build()
  }
}

Nothing too scary there. So let’s continue to examine the inner workings of the ContributorPrintingSupplier class, which looks like this

package processorapi.interop

import java.io.PrintWriter

import entities.Contributor
import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier}

class ContributorPrintingSupplier(val pw: PrintWriter) extends ProcessorSupplier[String, Long] {
  override def get(): Processor[String, Long] = new Processor[String,Long] {

    import org.apache.kafka.streams.processor.ProcessorContext
    import org.apache.kafka.streams.state.KeyValueStore

    private var context:ProcessorContext  = null
    private var contributorStore:KeyValueStore[String, Long]  = null

    override def init(context: ProcessorContext): Unit = {
      import org.apache.kafka.streams.state.KeyValueStore
      this.context = context
      this.contributorStore = context.getStateStore("processContributorStore")
        .asInstanceOf[KeyValueStore[String, Long]]
    }

    override def process(key: String, value: Long): Unit = {
      pw.write(s"key ${key} has been seen ${value} times\r\n")
      pw.flush()
    }

    override def close(): Unit = {
      if(contributorStore != null) {
        contributorStore.close()
      }
    }
  }
}

There are several takeaway points here, namely:

  • We create an anonymous Processor and override the following methods
    • init
    • process
    • close
  • We are able to gain access to state stores via the supplied ProcessorContext

 

As I say this is a terminal operation, and as such when using .process() in the DSL you will not see any following operations.

Adding A ValueTransformer to the DSL

  • Applies a ValueTransformer to each record, while retaining the key of the original record. transformValues() allows you to leverage the Processor API from the DSL. (details)
  • Each input record is transformed into exactly one output record (zero output records or multiple output records are not possible). The ValueTransformer may return null as the new value for a record.
  • transformValues is preferable to transform because it will not cause data re-partitioning. It is also possible to get read-only access to the input record key if you use ValueTransformerWithKey (provided via ValueTransformerWithKeySupplier) instead.

Ok so as before we will use the DSL to create a simple KStream and then transform the values using the Processor API. This example is a little contrived as the transformed value is the same as the original value, but you should get the idea

package processorapi.interop

import java.time.Duration
import java.util
import java.util.Properties

import common.PropsHelper
import entities.Contributor
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.scala.{StreamsBuilder, kstream}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}
import serialization.JSONSerde


class ProcessorApiTransformValuesTopology extends App {

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "processor-api-transform-values-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val contributorSerde = new JSONSerde[Contributor]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, contributorSerde)
    implicit val materializer = Materialized.`with`(stringSerde, contributorSerde)

    import org.apache.kafka.streams.state.Stores
    val contributorStoreName = "contributorStore"

    val logConfig = new util.HashMap[String, String]
    logConfig.put("retention.ms", "172800000")
    logConfig.put("retention.bytes", "10000000000")
    logConfig.put("cleanup.policy", "compact,delete")
    val contributorStoreSupplier = Stores.inMemoryKeyValueStore(contributorStoreName)
    val contributorStoreBuilder = Stores.keyValueStoreBuilder(contributorStoreSupplier, Serdes.String, contributorSerde)
      .withLoggingEnabled(logConfig)
      .withCachingEnabled()


    val builder: StreamsBuilder = new StreamsBuilder
    val contribs: KStream[String, Contributor] =
          builder.stream[String, Contributor]("ProcessorApiTransformValuesInputTopic")

    builder.addStateStore(contributorStoreBuilder)

    contribs
      .transformValues(new ContributorTranformSupplier, contributorStoreName)
      .to("ProcessorApiTransformValuesOutputTopic")(Produced.`with`(stringSerde, contributorSerde))

    builder.build()
  }
}

Nice and simple: we just use the transformValues() method to apply our custom ValueTransformer.

package processorapi.interop

import java.time.Duration

import entities.Contributor
import org.apache.kafka.streams.kstream.{ValueTransformer, ValueTransformerSupplier}
import org.apache.kafka.streams.processor.{PunctuationType, Punctuator}

class ContributorTranformSupplier extends ValueTransformerSupplier[Contributor, Contributor] {
  override def get(): ValueTransformer[Contributor, Contributor] = new ValueTransformer[Contributor, Contributor] {

    import org.apache.kafka.streams.processor.ProcessorContext
    import org.apache.kafka.streams.state.KeyValueStore

    private var context:ProcessorContext  = null
    private var contributorStore:KeyValueStore[String, Contributor]  = null

    override def init(context: ProcessorContext): Unit = {
      import org.apache.kafka.streams.state.KeyValueStore
      this.context = context
      this.contributorStore = context.getStateStore("contributorStore")
        .asInstanceOf[KeyValueStore[String, Contributor]]

      //to punctuate you would do something like this
      //      context.schedule(Duration.ofSeconds(1),
      //            PunctuationType.WALL_CLOCK_TIME, new Punctuator {
      //        override def punctuate(timestamp: Long): Unit = {
      //
      //          val it = contributorStore.all
      //          val currentTime = System.currentTimeMillis
      //          while (it.hasNext) {
      //            val contributorValue = it.next.value
      //            if (contributorValue.updatedWithinLastMillis(currentTime, 11000))
      //              context.forward(contributorValue.email,contributorValue)
      //          }
      //        }
      //      })
    }

    override def transform(value: Contributor): Contributor = {

      var finalContributor:Contributor = null
      try {
        val contributor = contributorStore.get(value.email)
        if(contributor == null) {
          contributorStore.putIfAbsent(value.email, value)
          finalContributor = value
        }
        else {
          val newContributor = contributor.copy(
            ranking = contributor.ranking + 1,
            lastUpdatedTime = System.currentTimeMillis
          )
          contributorStore.put(value.email,newContributor)
          finalContributor = newContributor
        }

        finalContributor
      }
      catch {
        case e:NullPointerException => {
          contributorStore.putIfAbsent(value.email, value)
          value
        }
        case e:Exception => {
          value
        }
      }
    }

    override def close(): Unit = {
      if(contributorStore != null) {
        contributorStore.close()
      }
    }
  }
}

As before we have access to the state store via the ProcessorContext. I also show (commented out) how you could punctuate, which essentially forwards on / commits records on some schedule, in the code above.

 

And I think that is all I have to say this time. You can of course check out the tests to play with it more.

 

Next time we will look at the interactive queries API.

Kafka

KafkaStreams : Custom Serdes

Last time we looked at joining. This time we will continue to look at the streams DSL, and how we can supply our own Serdes (serializer / deserializer).

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

Serdes

Just to remind ourselves how Kafka Streams makes use of Serdes, here is an excerpt from https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html (as of 13/03/19)

Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String) to materialize the data when necessary. Operations that require such SerDes information include: stream(), table(), to(), through(), groupByKey(), groupBy().

You can provide SerDes by using either of these methods:

  • By setting default SerDes via a StreamsConfig instance.
  • By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.
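
As a quick sketch of those two options (the StreamsConfig constants are the real ones; the commented Produced line mirrors what we do later in this post):

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

object SerdeConfigSketch {
  // 1) default SerDes, set once via configuration
  val props = new Properties()
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

  // 2) explicit SerDes on a single operation, overriding the defaults, e.g.
  //    finalStream.to("CustomSerdesOutputTopic")(Produced.`with`(stringSerde, listRatingSerde))
}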

 

InBuilt Serdes

So Kafka comes with a whole bunch of pre-canned Serdes which you can access using the following namespace

org.apache.kafka.common.serialization.Serdes

But what happens when you want to send more than just primitives/byte[]?

Imagine we want to send these types

  • Rating
  • List[Rating]

Where Rating may look like this

package entities

case class Rating(fromEmail: String, toEmail: String, score: Float)

Then the inbuilt Serdes may not cut the mustard, so we need to implement our own

Custom Serdes

So the first step is to implement the custom Serde. For this post we will assume we are using JSON and will use the Jackson library to deal with the JSON, so our Serde looks like this.

import java.lang.reflect.{ParameterizedType, Type}
import java.util
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.exc.{UnrecognizedPropertyException => UPE}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}


package serialization {

  object Json {

    type ParseException = JsonParseException
    type UnrecognizedPropertyException = UPE

    private val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)

    private def typeReference[T: Manifest] = new TypeReference[T] {
      override def getType = typeFromManifest(manifest[T])
    }

    private def typeFromManifest(m: Manifest[_]): Type = {
      if (m.typeArguments.isEmpty) {
        m.runtimeClass
      }
      else new ParameterizedType {
        def getRawType = m.runtimeClass

        def getActualTypeArguments = m.typeArguments.map(typeFromManifest).toArray

        def getOwnerType = null
      }
    }

    object ByteArray {
      def encode(value: Any): Array[Byte] = mapper.writeValueAsBytes(value)

      def decode[T: Manifest](value: Array[Byte]): T =
        mapper.readValue(value, typeReference[T])
    }

  }

  /**
    * JSON serializer for JSON serde
    *
    * @tparam T
    */
  class JSONSerializer[T] extends Serializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def serialize(topic: String, data: T): Array[Byte] =
      Json.ByteArray.encode(data)

    override def close(): Unit = ()
  }

  /**
    * JSON deserializer for JSON serde
    *
    * @tparam T
    */
  class JSONDeserializer[T >: Null <: Any : Manifest] extends Deserializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def deserialize(topic: String, data: Array[Byte]): T = {
      if (data == null) {
        return null
      } else {
        Json.ByteArray.decode[T](data)
      }
    }
  }

  /**
    * JSON serde for local state serialization
    *
    * @tparam T
    */
  class JSONSerde[T >: Null <: Any : Manifest] extends Serde[T] {
    override def deserializer(): Deserializer[T] = new JSONDeserializer[T]

    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def serializer(): Serializer[T] = new JSONSerializer[T]
  }

}

 

Notice how this Serde is generic and can work with any T, but in order to do this we need to deal with the insanity of the JVM and type erasure (OMG they got that wrong, .NET absolutely nailed generics). So in this example we use a Manifest to capture the type information such that it is not “erased” and we know how to deserialize the JSON back into a T.
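
Before wiring this into a topology, here is a tiny round-trip sketch (the values are made up) showing the serde working in isolation:

import entities.Rating
import serialization.JSONSerde

object JsonSerdeRoundTripSketch extends App {
  val ratingSerde = new JSONSerde[Rating]
  val original = Rating("sacha@here.com", "mary@there.com", 4.5f)
  // serialize to JSON bytes and back again
  val bytes: Array[Byte] = ratingSerde.serializer().serialize("some-topic", original)
  val roundTripped: Rating = ratingSerde.deserializer().deserialize("some-topic", bytes)
  assert(roundTripped == original)
}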

 

Ok so now that we have this, let’s see an example of how it is used within an actual topology

package serialization

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import entities.Rating
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}


class CustomSerdesTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "custom-serdes-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val ratingSerde = new JSONSerde[Rating]
    implicit val listRatingSerde = new JSONSerde[List[Rating]]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, ratingSerde)
    implicit val materializer = Materialized.`with`(stringSerde, listRatingSerde)
    implicit val grouped = Grouped.`with`(stringSerde, ratingSerde)

    val builder: StreamsBuilder = new StreamsBuilder
    val ratings: KStream[String, Rating] =
          builder.stream[String, Rating]("CustomSerdesInputTopic")

    //When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0)
    //and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table,
    //you must provide a “subtractor” aggregator (think: aggValue - oldValue).
    val groupedBy = ratings.groupByKey
    val aggregatedTable =
      groupedBy
        .aggregate[List[Rating]](List[Rating]())((aggKey, newValue, aggValue) => newValue :: aggValue)

    val finalStream = aggregatedTable.toStream
    finalStream.peek((key, values) => {

      val theKey = key
      val theValues = values

    })


    finalStream.to("CustomSerdesOutputTopic")(Produced.`with`(stringSerde, listRatingSerde))

    builder.build()
  }
}

Pretty easy stuff; the only call out point here is that we declare the implicits at the start of the topology method so that all the serdes the code needs can be looked up implicitly. The sketch below spells out what that lookup resolves to.
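To see what that implicit lookup is actually doing, this is roughly the explicit equivalent of the groupByKey/aggregate pair (a sketch only, reusing the stringSerde, ratingSerde and listRatingSerde values declared above):

//the explicit equivalent of the implicit lookups in createTopolgy (sketch)
val groupedByExplicit: KGroupedStream[String, Rating] =
  ratings.groupByKey(Grouped.`with`(stringSerde, ratingSerde))

val aggregatedExplicit: KTable[String, List[Rating]] =
  groupedByExplicit.aggregate(List.empty[Rating])(
    (_, newValue, aggValue) => newValue :: aggValue
  )(Materialized.`with`(stringSerde, listRatingSerde))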

Just for completeness this is the test suite that goes with this example

package serialization

import java.io._
import java.lang
import java.util.Properties

import common.PropsHelper
import entities.Rating
import org.apache.kafka.common.serialization.{LongDeserializer, _}
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.{ConsumerRecordFactory, OutputVerifier}
import org.scalatest._
import org.apache.kafka.common.serialization.Serdes


class CustomSerdesTopologyTests
  extends FunSuite
  with BeforeAndAfter
  with Matchers {

  val props = PropsHelper.createBasicStreamProperties("custom-serdes-application", "localhost:9092")
  val stringDeserializer: StringDeserializer = new StringDeserializer
  val ratingListDeserializer: JSONDeserializer[List[Rating]] = new JSONDeserializer[List[Rating]]

  before {
  }

  after {
  }


  test("Should produce correct output") {

    //arrange
    val recordFactory: ConsumerRecordFactory[java.lang.String, Array[Byte]] =
        new ConsumerRecordFactory[java.lang.String, Array[Byte]](new StringSerializer, Serdes.ByteArray().serializer())
    val customSerdesTopology = new CustomSerdesTopology()


    val jsonSerde = new JSONSerde[Rating]

    val rating = Rating("jarden@here.com","sacha@here.com", 1.5f)
    val ratingBytes = jsonSerde.serializer().serialize("", rating)


    //NOTE : You may find you need to play with these Config values in order
    //to get the stateful operation to work correctly/how you want it to
    //    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
    //    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760.asInstanceOf[Object])
    //    props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 50000.asInstanceOf[Object])
    //By playing around with these values you should be able to find the values that work for you
    //WARNING : Changing these settings may have an impact on the tests, as less frequent commits/state store
    //cache flushing may occur
    val testDriver = new TopologyTestDriver(customSerdesTopology.createTopolgy(), props)

    //Use the custom JSONSerde[Rating]
    testDriver.pipeInput(recordFactory.create("CustomSerdesInputTopic", rating.toEmail, ratingBytes, 9995L))

    val result = testDriver.readOutput("CustomSerdesOutputTopic", stringDeserializer, ratingListDeserializer)

    OutputVerifier.compareKeyValue(result, "sacha@here.com",List(Rating("jarden@here.com","sacha@here.com", 1.5f)))
    val result1 = testDriver.readOutput("CustomSerdesOutputTopic", stringDeserializer, ratingListDeserializer)
    assert(result1 == null)

    cleanup(props, testDriver)
  }


  def cleanup(props:Properties, testDriver: TopologyTestDriver) = {

    try {
      //there is a bug on windows which causes this line to throw exception
      testDriver.close
    } catch {
      case e: Exception => {
        delete(new File("C:\\data\\kafka-streams"))
      }
    }
  }

  def delete(file: File) {
    if (file.isDirectory)
      Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete(_))
    file.delete
  }
}

 

That’s it for now

So that’s all I wanted to say this time, so until the next time, hope this has shown you just how easy it is to hand roll your own Serdes for the objects you want to serialize

Uncategorized

OpenFin demo

It has been quite a while since I wrote a full article for codeproject.com, but I just needed a break from all the server side stuff for a while. So I decided to take something for a spin which has been on my TODO radar for ages. That thing is OpenFin

 

They market themselves as the OS for finance, where the apps are built using HTML5, but financial apps still benefit from a desktop feel: windows are used, things can be shown in a notification tray, things can be docked etc etc

 

This is where OpenFin see themselves

 

This is what I have built and demonstrate in the fuller article at codeproject.com. If you want to know more, the full article is here: https://www.codeproject.com/Articles/1279126/OpenFin-A-small-example-app-using-it

 

image

 

This is a simple React/Redux/OpenFin application

Kafka

KafkaStreams : Joining

Last time we looked at Aggregating. This time we will continue to look at the streams DSL, and will look at joining. If you have ever done any standard SQL, this post will feel very familiar

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

Joins

Types of join

KafkaStreams supports the following types of joins, most of which we will look at using KStream-KStream joins as our demo code, as it’s just easier to write tests for them

image

 

  • KStream-KStream joins are always windowed joins, because otherwise the size of the internal state store used to perform the join – e.g., a sliding window or “buffer” – would grow indefinitely
  • KTable-KTable joins are always non-windowed joins. They are designed to be consistent with their counterparts in relational databases. The changelog streams of both KTables are materialized into local state stores to represent the latest snapshot of their table duals. The join result is a new KTable that represents the changelog stream of the join operation.
  • KStream-KTable joins are always non-windowed joins. They allow you to perform table lookups against a KTable (changelog stream) upon receiving a new record from the KStream (record stream).
  • KStream-GlobalKTable joins are always non-windowed joins. They allow you to perform table lookups against a GlobalKTable (entire changelog stream) upon receiving a new record from the KStream (record stream)

 

Join co-partitioning requirements

Before we look at the types of join, and the syntax for them, Kafka Streams does have the following requirements that need to be met in order to carry out the various Join operators

Input data must be co-partitioned when joining. This ensures that input records that have the same key on both sides of the join are delivered to the same stream task during processing. It is the responsibility of the user to ensure data co-partitioning before applying join operators

The requirements for data co-partitioning are:

  • The input topics on both sides of the join must have the same number of partitions.
  • All applications that write to the input topics must have the same partitioning strategy, so that the keyspace of the input data is distributed across partitions in the same manner. The partitioner may be set in various places, for example on the Producer API it can be set via the ProducerConfig.PARTITIONER_CLASS_CONFIG config value, and for the Streams API it can be set in #to or #through. The good news is that, if you happen to use the default partitioner-related settings across all applications, you do not need to worry about the partitioning strategy. If one of your inputs is not co-partitioned, see the sketch after this list for one way to fix that.
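If your topics are not already co-partitioned (for example one side was keyed differently upstream), a common approach with the API version used in this post is to re-key that stream and push it through an intermediate topic so the default partitioner redistributes the records. The sketch below is not from the repo; the topic names and the choice of key are made up purely for illustration.

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

val builder = new StreamsBuilder
val left: KStream[String, String] = builder.stream[String, String]("LeftTopic")

//re-key on whatever the join key should be (here, hypothetically, the value itself),
//then write to and re-read from an intermediate topic so that the records are
//re-partitioned by the default partitioner, matching the other side of the join
val repartitionedLeft: KStream[String, String] =
  left
    .selectKey((_, value) => value)
    .through("LeftTopic-Repartitioned")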

Inner Join

As with lots of other types of tools, an inner join will only join inputs where the same key exists on both sides.

It should be noted that as I have chosen to use KStream-KStream joins for this demo code, the following rules hold. However these rules may vary depending on whether your inputs are KStream/KTable

 

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
  • Input records with a null key or a null value are ignored and do not trigger the join.

Anyway, as is the way so far, let’s see the topology followed by the tests

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val left: KStream[Int, String] =
    builder.stream[Int, String]("LeftTopic")
  val right: KStream[Int, String] =
    builder.stream[Int, String]("RightTopic")

  def tryToInt( s: String ) = Try(s.toInt).toOption

  //do the join
  val joinedStream = left.join(right)( (l: String, r: String) => {
    val result = (tryToInt(l), tryToInt(r))
    result match {
      case (Some(a), Some(b)) => a + b
      case (None, Some(b)) => b
      case (Some(a), None) => a
      case (None, None) => 0
    }
  } , JoinWindows.of(Duration.ofSeconds(2)))


  val peeked = joinedStream.peek((k,v)=> {

    val theK = k
    val theV = v
  })

  peeked.to("innerJoinOutput")

  builder.build()
}

In this example, since we are not guaranteed to get a numeric value from both sides, I am using a simple helper method to get an Option[Int] for each side, and then simple pattern matching to form the final Integer value, which the tests will verify against

We can also see the JoinWindows of 2 seconds, again we will try and use that in our tests. I have used this pattern for all 3 of the KStream join examples for this post. Here are the relevant tests

//arrange
val recordFactory: ConsumerRecordFactory[java.lang.Integer, java.lang.String] =
  new ConsumerRecordFactory[java.lang.Integer, java.lang.String](new IntegerSerializer, new StringSerializer)
val innerJoinTopology = new InnerJoinTopology()
val testDriver = new TopologyTestDriver(innerJoinTopology.createTopolgy(), props)

//act
testDriver.pipeInput(recordFactory.create("LeftTopic", 1, null, 1900L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, null, 1901L))

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "1", 2902L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, "2",2903L ))

OutputVerifier.compareValue(testDriver.readOutput("innerJoinOutput", integerDeserializer, integerDeserializer),
  3.asInstanceOf[Integer])


//push these out past 2 seconds (which is what Topology Join Window duration is)
Thread.sleep(2500)

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "2", 5916L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, "4", 5917L))
OutputVerifier.compareValue(testDriver.readOutput("innerJoinOutput", integerDeserializer, integerDeserializer),
  6.asInstanceOf[Integer])

val result1 = testDriver.readOutput("innerJoinOutput", integerDeserializer, integerDeserializer)
assert(result1 == null)

It can be seen that we expect the following to hold true

  • Both inputs being null within the time window doesn’t yield anything
  • A matched key within the time window yields a combined value of both stream values converted to an Int and added together (so 3 in this case)
  • If we wait 2 seconds, to exceed the join window, and then push 2 new values through, a newly matched key in the new time window yields a combined value of both stream values converted to an Int and added together (so 6 in this case)

LeftJoin

As with lots of other types of tools, a left join will include the original left input even when the right input has no value for the matching key.

It should be noted that as I have chosen to use KStream-KStream joins for this demo code, the following rules hold. However these rules may vary depending on whether your inputs are KStream/KTable

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
  • Input records with a null key or a null value are ignored and do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null); this is what produces entries like [A, null] in the LEFT JOIN column of the summary table at the end of this post.

As before we start with the topology which looks like this

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val left: KStream[Int, String] =
    builder.stream[Int, String]("LeftTopic")
  val right: KStream[Int, String] =
    builder.stream[Int, String]("RightTopic")

  //do the join
  val joinedStream = left.leftJoin(right)( (l: String, r: String) => {
    val result = s"Left='${l}', Right='${r}'"
    result
  } , JoinWindows.of(Duration.ofSeconds(2)))

  val peeked = joinedStream.peek((k,v)=> {

    val theK = k
    val theV = v
  })

  peeked.to("leftJoinOutput")

  builder.build()
}

Here we know we may not have a value on the right input, but we may on the left input. So for this example I build a String result, where “null” will appear in the output should there be no value for one of the inputs

Here are the relevant tests

//arrange
val recordFactory: ConsumerRecordFactory[java.lang.Integer, java.lang.String] =
  new ConsumerRecordFactory[java.lang.Integer, java.lang.String](new IntegerSerializer, new StringSerializer)
val leftJoinTopology = new LeftJoinTopology()
val testDriver = new TopologyTestDriver(leftJoinTopology.createTopolgy(), props)

//act
testDriver.pipeInput(recordFactory.create("LeftTopic", 1, null, 1900L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, null, 1901L))

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "1", 2902L))


OutputVerifier.compareValue(testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer),
  "Left='1', Right='null'")


//push these out past 2 seconds (which is what Topology Join Window duration is)
Thread.sleep(2500)

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "2", 5916L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, "4", 5916L))
OutputVerifier.compareValue(testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer),
  "Left='2', Right='4'")
OutputVerifier.compareValue(testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer),
  "Left='2', Right='4'")
val result1 = testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer)
assert(result1 == null)

It can be seen that we expect the following to hold true

  • Both inputs being null within the time window doesn’t yield anything
  • A left input with no matching right input in the time window yields a single output with the left value, but null for the right
  • If we wait 2 seconds, to exceed the join window, and then push 2 new values through, we see outputs for the newly matched keys in the new time window

OuterJoin

As with lots of other types of tools, an outer join will include the original left input or the original right input, providing at least one of them is not null for the matching key.

It should be noted that as I have chosen to use KStream-KStream joins for this demo code, the following rules hold. However these rules may vary depending on whether your inputs are KStream/KTable

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
  • Input records with a null key or a null value are ignored and do not trigger the join.
  • For each input record on one side that does not have any match on the other side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null) or ValueJoiner#apply(null, rightRecord.value), respectively.

As before we start with the topology which looks like this

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val left: KStream[Int, String] =
    builder.stream[Int, String]("LeftTopic")
  val right: KStream[Int, String] =
    builder.stream[Int, String]("RightTopic")

  //do the join
  val joinedStream = left.outerJoin(right)( (l: String, r: String) => {
    val result = s"Left='${l}', Right='${r}'"
    result
  } , JoinWindows.of(Duration.ofSeconds(2)))

  val peeked = joinedStream.peek((k,v)=> {

    val theK = k
    val theV = v
  })

  peeked.to("outerJoinOutput")

  builder.build()
}

Here are the relevant tests

test("Should produce correct output") {

  //arrange
  val recordFactory: ConsumerRecordFactory[java.lang.Integer, java.lang.String] =
    new ConsumerRecordFactory[java.lang.Integer, java.lang.String](new IntegerSerializer, new StringSerializer)
  val outerJoinTopology = new OuterJoinTopology()
  val testDriver = new TopologyTestDriver(outerJoinTopology.createTopolgy(), props)

  //act
  testDriver.pipeInput(recordFactory.create("LeftTopic", 1, null, 1900L))
  testDriver.pipeInput(recordFactory.create("RightTopic", 1, null, 1901L))

  testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "1", 2902L))


  OutputVerifier.compareValue(testDriver.readOutput("outerJoinOutput", integerDeserializer, stringDeserializer),
    "Left='1', Right='null'")


  //push these out past 2 seconds (which is what Topology Join Window duration is)
  Thread.sleep(2500)

  testDriver.pipeInput(recordFactory.create("RightTopic", 1, "4", 5916L))
  OutputVerifier.compareValue(testDriver.readOutput("outerJoinOutput", integerDeserializer, stringDeserializer),
    "Left='null', Right='4'")
  val result1 = testDriver.readOutput("outerJoinOutput", integerDeserializer, stringDeserializer)
  assert(result1 == null)

  cleanup(props, testDriver)
}

It can be seen that we expect the following to hold true

  • Both inputs being null within the time window doesn’t yield anything
  • A single input from either side within the time window yields a single output with that input’s value, but null for the other side

 

As I say, this post is all about KStream-KStream joins, but this table may help to consolidate what this post has shown so far

image

 

For other types of joins, such as KStream-KTable, you should consult the official documentation, though a rough sketch of one is included below
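Purely as a taster (this is not part of the repo code, and the topic names are made up), a KStream-KTable join looks like the sketch below. Unlike the KStream-KStream joins above there is no JoinWindows involved: each incoming stream record simply does a lookup against the latest table value for its key.

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

val builder = new StreamsBuilder
val orders: KStream[String, String] = builder.stream[String, String]("OrdersTopic")
val customers: KTable[String, String] = builder.table[String, String]("CustomersTopic")

//non-windowed join : every order record is joined against the latest
//customer value stored for the same key
val enriched: KStream[String, String] =
  orders.join(customers)((order, customer) => s"Order='$order', Customer='$customer'")

enriched.to("EnrichedOrdersOutputTopic")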

 

That’s it for now

So that’s all I wanted to say this time, so until the next time, hope this has shown you just how cool KafkaStreams is

Kafka

KafkaStreams : Aggregating

So last time we looked at a whole bunch of stateless operations. This time we will continue our journey to look at how we can start to do aggregations, and make use of state stores.

Where is the code?

The code for this post is all contained here

And the tests are all contained here

Introduction to Stateful operations

Stateful operations (unlike stateless operations) require a state store for processing purposes. We briefly touched on state stores last time, but today I wanted to drill in a bit more into this.

 

Kafka comes with some inbuilt state stores

  • PersistentKeyValueStore<K, V>
    • Storage Engine : Rocks DB
    • Fault Tolerant : Yes by default
    • The recommended store type for most use cases.
    • Stores its data on local disk.
    • Storage capacity: managed local state can be larger than the memory (heap space) of an application instance, but must fit into the available local disk space.
    • RocksDB settings can be fine-tuned, see RocksDB configuration.
    • Available store variants: time window key-value store, session window key-value store.
  • InMemoryKeyValueStore<K,V>
    • Storage Engine: None
    • Fault Tolerant : Yes by default
    • Stores its data in memory.
    • Storage capacity: managed local state must fit into memory (heap space) of an application instance.
    • Useful when application instances run in an environment where local disk space is either not available or local disk space is wiped in-between app instance restarts.

 

So how do we create these stores? Well we will be seeing some examples of these, but it is as easy as this using the high level DSL (which is what we have been using until now)

//The "persistentKeyValueStore" is one of the pre-canned state store types
//and as such logging is enabled, so a changelog topic is used to persist state
import org.apache.kafka.streams.state.Stores
val wordCountStoreName = "wordCountStore"
val wordCountStoreSupplied = Stores.persistentKeyValueStore(wordCountStoreName)

Now some of you may have noticed the bullet point above where it mentions “Fault Tolerant”. How exactly is that done? Well the inbuilt stores actually write to a specialized topic behind the scenes. This topic is sometimes called the changelog topic, and can be used to restore state. By default, persistent key-value stores are fault-tolerant. They are backed by a compacted changelog topic. The purpose of compacting this topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster, and to minimize recovery time if a state store needs to be restored from its changelog topic.

 

Similarly, persistent window stores are fault-tolerant. They are backed by a topic that uses both compaction and deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are composite keys that include the “normal” key and window timestamps. For these types of composite keys it would not be sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion enabled, old windows that have expired will be cleaned up by Kafka’s log cleaner as the log segments expire. The default retention setting is Materialized#withRetention() + 1 day. You can override this setting by specifying StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG in the StreamsConfig.

 

So how can we turn this feature on/off?

You can enable or disable fault tolerance for a state store by enabling or disabling the change logging of the store through withLoggingEnabled() and withLoggingDisabled(). You can also fine-tune the associated topic’s configuration if needed.

Example of enabling fault tolerance, and passing through some other useful values

  val logConfig = new util.HashMap[String, String]
  logConfig.put("retention.ms", "172800000")
  logConfig.put("retention.bytes", "10000000000")
  logConfig.put("cleanup.policy", "compact,delete")
  val wordCountStoreSupplier = Stores.inMemoryKeyValueStore(wordCountStoreName)
  val wordCountStoreBuilder = Stores.keyValueStoreBuilder(wordCountStoreSupplier, Serdes.String, Serdes.Long)
    .withLoggingEnabled(logConfig)
    .withCachingEnabled()

As I mentioned in the previous post, there are a number of config values that will affect how your stateful operations emit values downstream; just to remind ourselves of those

  • COMMIT_INTERVAL_MS_CONFIG : the number of milliseconds before a commit occurs. When a commit occurs, state store values are emitted downstream
  • CACHE_MAX_BYTES_BUFFERING_CONFIG : The max number of bytes to cache before state store values are emitted downstream

So you may find you need to fiddle with those to get what you want in terms of max cache size and commit interval; the sketch below shows roughly where they get set
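For reference, here is a minimal sketch of where those two settings live, in the same style as the Properties the other examples build (the application id and the actual numbers are purely illustrative):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-demo-application")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

//commit (and therefore flush cached state store values downstream) roughly every second
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000.asInstanceOf[Object])

//cap the record cache at ~1MB; once it fills, records are flushed downstream
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (1024 * 1024).asInstanceOf[Object])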

 

Aggregating

After records are grouped by key via groupByKey or groupBy (at which point they become either a KGroupedStream or a KGroupedTable), they can be aggregated. Aggregations are key-based operations, which means that they always operate over records where the key is the same. You can perform aggregations on windowed or non-windowed data.

Aggregate

Rolling aggregation. Aggregates the values of (non-windowed) records by the grouped key. Aggregating is a generalization of reduce and allows, for example, the aggregate value to have a different type than the input values. (KGroupedStream details, KGroupedTable details)

When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0) and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table, you must provide a “subtractor” aggregator (think: aggValue – oldValue).

Detailed behavior of KGroupedStream:

  • Input records with null keys are ignored.
  • When a record key is received for the first time, the initializer is called (and called before the adder).
  • Whenever a record with a non-null value is received, the adder is called.

Detailed behavior of KGroupedTable:

  • Input records with null keys are ignored.
  • When a record key is received for the first time, the initializer is called (and called before the adder and subtractor). Note that, in contrast to KGroupedStream, over time the initializer may be called more than once for a key as a result of having received input tombstone records for that key
  • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
  • When subsequent non-null values are received for a key (e.g., UPDATE), then the subtractor is called with the old value as stored in the table and the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined
  • When a tombstone record – i.e. a record with a null value – is received for a key (e.g., DELETE), then only the subtractor is called. Note that, whenever the subtractor returns a null value itself, then the corresponding key is removed from the resulting KTable. If that happens, any next input record for that key will trigger the initializer again.
Here is an example topology using aggregate:

package stateful.transformations.aggregating

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream.{Materialized, _}
import org.apache.kafka.streams.{KafkaStreams, Topology}


/**
  * This example aggregates the values for each key from 'AggregateKeyInputTopic'
  * into a rolling sum which is written to 'aggregateOutputTopic'
  */
class AggregateTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-aggregate-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[Int, Int] =
      builder.stream[Int, Int]("AggregateKeyInputTopic")

    implicit val materialized: Materialized[Int, Long, ByteArrayKeyValueStore]
      = Materialized.as("aggregated-stream-store")

    //Rolling aggregation. Aggregates the values of (non-windowed) records by the grouped key.
    //Aggregating is a generalization of reduce and allows, for example, the aggregate value to
    //have a different type than the input values. (KGroupedStream details, KGroupedTable details)
    //
    //When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0)
    //and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table,
    //you must provide a “subtractor” aggregator (think: aggValue - oldValue).
    val groupedBy = textLines.groupByKey
    val aggregatedTable =
      groupedBy
        .aggregate[Long](0L)((aggKey, newValue, aggValue) => aggValue + newValue)(materialized)
    aggregatedTable
        .toStream
          .peek((k,v) =>
          {
            val theKey = k
            val theValue =v
          })
        .to("aggregateOutputTopic")
    builder.build()
  }
}


Count

Rolling aggregation. Counts the number of records by the grouped key.

Detailed behavior for KGroupedStream:

  • Input records with null keys or values are ignored.

Detailed behavior for KGroupedTable:

  • Input records with null keys are ignored. Records with null values are not ignored but interpreted as “tombstones” for the corresponding key, which indicate the deletion of the key from the table.

package stateful.transformations.aggregating

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream.{Materialized, _}
import org.apache.kafka.streams.{KafkaStreams, Topology}


/**
  * This example counts words from 'CountInputTopic'
  * and writes the counts to 'WordsWithCountsOutputTopic'
  */
class CountTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-count-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[String, String] =
      builder.stream[String, String]("CountInputTopic")


    //lets create a named wordCountStore state store
    //The "persistentKeyValueStore" is one of the pre-canned state store types
    //and as such logging is enabled, so a changelog topic is used to persist state
    import org.apache.kafka.streams.state.Stores
    val wordCountStoreName = "wordCountStore"
    val wordCountStoreSupplied = Stores.persistentKeyValueStore(wordCountStoreName)

    val wordCounts = textLines
      .flatMapValues(x => x.toLowerCase.split("\\W+"))
      .groupBy((key, word) => word)
      .count()(Materialized.as(wordCountStoreSupplied))
    wordCounts
      .toStream
      .peek((k,v) =>
      {
        val theKey = k
        val theValue =v
      })
      .to("WordsWithCountsOutputTopic")

    builder.build()
  }
}


Reduce

Combines the values of (non-windowed) records by the grouped key. The current record value is combined with the last reduced value, and a new reduced value is returned. The result value type cannot be changed, unlike aggregate.

When reducing a grouped stream, you must provide an “adder” reducer (e.g., aggValue + curValue). When reducing a grouped table, you must additionally provide a “subtractor” reducer (e.g., aggValue – oldValue).

Detailed behavior for KGroupedStream:

  • Input records with null keys are ignored in general.
  • When a record key is received for the first time, then the value of that record is used as the initial aggregate value.
  • Whenever a record with a non-null value is received, the adder is called.

Detailed behavior for KGroupedTable:

  • Input records with null keys are ignored in general.
  • When a record key is received for the first time, then the value of that record is used as the initial aggregate value. Note that, in contrast to KGroupedStream, over time this initialization step may happen more than once for a key as a result of having received input tombstone records for that key.
  • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
  • When subsequent non-null values are received for a key (e.g., UPDATE), then the subtractor is called with the old value as stored in the table and the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.
  • When a tombstone record – i.e. a record with a null value – is received for a key (e.g., DELETE), then only the subtractor is called. Note that, whenever the subtractor returns a null value itself, then the corresponding key is removed from the resulting KTable. If that happens, any next input record for that key will re-initialize its aggregate value.

 

In this example we build a KTable from a KStream by using a little trick of going through an intermediate topic

package stateful.transformations.aggregating

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream.{Materialized, _}
import org.apache.kafka.streams.{KafkaStreams,  Topology}


/**
  * This example reduces grouped values from 'ReduceInputTopic' (via an intermediate KTable)
  * and writes the results to 'ReduceOutputTopic'
  */
class ReduceTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-reduce-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[String, String] =
      builder.stream[String, String]("ReduceInputTopic")


    //lets create a named reduceStore state store
    //The "persistentKeyValueStore" is one of the pre-canned state store types
    //and as such logging is enabled, so a changelog topic is used to persist state
    import org.apache.kafka.streams.state.Stores
    val reduceStoreName = "reduceStore"
    val reduceStoreSupplied = Stores.persistentKeyValueStore(reduceStoreName)

    //by using this dummy topic we can go straight from a KStream to a KTable
    textLines.to("Dummy-ReduceInputTopic")
    val userProfiles : KTable[String, String] = builder.table("Dummy-ReduceInputTopic")


    val groupedTable = userProfiles.groupBy((user, region) => (region, user.length()))
    val reduced: KTable[String, Integer] =
      groupedTable.reduce(
        (aggValue : Int, newValue: Int) => aggValue + newValue,
        (aggValue: Int, oldValue: Int) => aggValue - oldValue)
      .mapValues(v => new Integer(v))

    val finalStream = reduced.toStream
    finalStream.to("ReduceOutputTopic")

    builder.build()
  }
}


Custom State Stores

You should not really need to create custom state stores, but if you find you do want/need to do this, the starting place is to implement the following interfaces (a minimal supplier sketch is shown after the listings below)

  • org.apache.kafka.streams.state.KeyValueBytesStoreSupplier which expects you to implement the following
    • /**
       * Return the name of this state store supplier.
       * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'.
       *
       * @return the name of this state store supplier
       */
      String name();
      
      /**
       * Return a new {@link StateStore} instance.
       *
       * @return a new {@link StateStore} instance of type T
       */
      T get();
      
      /**
       * Return a String that is used as the scope for metrics recorded by Metered stores.
       * @return metricsScope
       */
      String metricsScope();
      
  • org.apache.kafka.streams.state.KeyValueStore<K, V>
    • /**
       * Update the value associated with this key.
       *
       * @param key The key to associate the value to
       * @param value The value to update, it can be {@code null};
       *              if the serialized bytes are also {@code null} it is interpreted as deletes
       * @throws NullPointerException If {@code null} is used for key.
       */
      void put(K key, V value);
      
      /**
       * Update the value associated with this key, unless a value is already associated with the key.
       *
       * @param key The key to associate the value to
       * @param value The value to update, it can be {@code null};
       *              if the serialized bytes are also {@code null} it is interpreted as deletes
       * @return The old value or {@code null} if there is no such key.
       * @throws NullPointerException If {@code null} is used for key.
       */
      V putIfAbsent(K key, V value);
      
      /**
       * Update all the given key/value pairs.
       *
       * @param entries A list of entries to put into the store;
       *                if the serialized bytes are also {@code null} it is interpreted as deletes
       * @throws NullPointerException If {@code null} is used for key.
       */
      void putAll(List<KeyValue<K, V>> entries);
      
      /**
       * Delete the value from the store (if there is one).
       *
       * @param key The key
       * @return The old value or {@code null} if there is no such key.
       * @throws NullPointerException If {@code null} is used for key.
       */
      V delete(K key);
      
      /**
       * The name of this store.
       * @return the storage name
       */
      String name();
      
      /**
       * Initializes this state store.
       * <p>
       * The implementation of this function must register the root store in the context via the
       * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the
       * first {@link StateStore} parameter should always be the passed-in {@code root} object, and
       * the second parameter should be an object of user's implementation
       * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
       * <p>
       * Note that if the state store engine itself supports bulk writes, users can implement another
       * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
       * let users implement bulk-load restoration logic instead of restoring one record at a time.
       *
       * @throws IllegalStateException If store gets registered after initialized is already finished
       * @throws StreamsException if the store's change log does not contain the partition
       */
      void init(ProcessorContext context, StateStore root);
      
      /**
       * Flush any cached data
       */
      void flush();
      
      /**
       * Close the storage engine.
       * Note that this function needs to be idempotent since it may be called
       * several times on the same state store.
       * <p>
       * Users only need to implement this function but should NEVER need to call this api explicitly
       * as it will be called by the library automatically when necessary
       */
      void close();
      
      /**
       * Return if the storage is persistent or not.
       *
       * @return  {@code true} if the storage is persistent&mdash;{@code false} otherwise
       */
      boolean persistent();
      
      /**
       * Is this store open for reading and writing
       * @return {@code true} if the store is open
       */
      boolean isOpen();
      
      /**
       * Get the value corresponding to this key.
       *
       * @param key The key to fetch
       * @return The value or null if no value is found.
       * @throws NullPointerException If null is used for key.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      V get(K key);
      
      /**
       * Get an iterator over a given range of keys. This iterator must be closed after use.
       * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
       * and must not return null values. No ordering guarantees are provided.
       * @param from The first key that could be in the range
       * @param to The last key that could be in the range
       * @return The iterator for this range.
       * @throws NullPointerException If null is used for from or to.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      KeyValueIterator<K, V> range(K from, K to);
      
      /**
       * Return an iterator over all keys in this store. This iterator must be closed after use.
       * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
       * and must not return null values. No ordering guarantees are provided.
       * @return An iterator of all key/value pairs in the store.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      KeyValueIterator<K, V> all();
      
      /**
       * Return an approximate count of key-value mappings in this store.
       *
       * The count is not guaranteed to be exact in order to accommodate stores
       * where an exact count is expensive to calculate.
       *
       * @return an approximate count of key-value mappings in the store.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      long approximateNumEntries();
      

 

The best place to see how to implement these is to look at the existing classes, for example

  • RocksDbKeyValueBytesStoreSupplier
  • RocksDBStore
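If you do decide to experiment, a minimal (entirely hypothetical) supplier can simply delegate to one of the built-in stores while you flesh out a real KeyValueStore implementation covering the methods listed above. Something along these lines:

import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.state.{KeyValueBytesStoreSupplier, KeyValueStore, Stores}

//hypothetical sketch only : a supplier that delegates to the pre-canned in-memory store
class DelegatingInMemoryStoreSupplier(storeName: String) extends KeyValueBytesStoreSupplier {

  //must be a valid Kafka topic name, as it ends up in the changelog topic name
  override def name(): String = storeName

  //must return a NEW StateStore instance on every call
  override def get(): KeyValueStore[Bytes, Array[Byte]] =
    Stores.inMemoryKeyValueStore(storeName).get()

  //the scope used for metrics recorded by the metered store wrappers
  override def metricsScope(): String = "delegating-in-memory"
}

Such a supplier could then be handed to Stores.keyValueStoreBuilder or Materialized.as in exactly the same way as the built-in suppliers we used earlier.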

That’s it for now

So that is all I wanted to talk about this time. Next time we will continue our journey and look at joining operations