KafkaStreams : Windowing

So, last time we looked at interactive queries. This time (the last one in this series) we will look at windowing operations. This is a fairly dry subject, and I don’t have much to add over the official docs, so this is a nice short one to end the series.

Where is the code?

The code for this post is all contained here

What is Windowing anyway?

Windowing lets you control how to group records that have the same key for stateful operations such as aggregations or joins into so-called windows. Windows are tracked per record key. For example, in join operations, a windowing state store is used to store all the records received so far within the defined window boundary. In aggregating operations, a windowing state store is used to store the latest aggregation results per window. Old records in the state store are purged after the specified window retention period. Kafka Streams guarantees to keep a window for at least this specified time; the default value is one day and can be changed via Materialized#withRetention().
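
To make the retention setting a bit more concrete, here is a minimal sketch, assuming the Kafka Streams 2.x Scala DSL used in the rest of this post. The store name `tumbling-counts-store` and the one-hour retention and one-minute grace durations are made up for illustration. As far as I understand it, Kafka Streams rejects a window store whose retention is smaller than the window size plus the grace period, and `TimeWindows.of` defaults to a grace period of roughly a day, so the sketch sets `grace()` explicitly to let a one-hour retention through.

```scala
import java.time.Duration
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.Materialized

object WindowRetentionSketch {

  // Hypothetical store name, used only for this sketch
  val storeName = "tumbling-counts-store"

  def topology(): Topology = {
    val builder = new StreamsBuilder()
    builder
      .stream[String, String]("TextLinesTopic")
      .flatMapValues(line => line.toLowerCase.split("\\W+").toList)
      .groupBy((_, word) => word)
      // 5 second windows that still accept out-of-order records for 1 minute
      .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).grace(Duration.ofMinutes(1)))
      // Keep windows for (at least) 1 hour rather than the 1 day default.
      // Retention must be >= window size + grace period or the store is rejected.
      .count()(Materialized.as[String, Long, ByteArrayWindowStore](storeName)
        .withRetention(Duration.ofHours(1)))
    builder.build()
  }
}
```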

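This is what Kafka Streams supports for windowing:

[Image: table of the window types supported by Kafka Streams (tumbling, hopping, sliding and session windows)]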

Tumbling Windows

Tumbling time windows are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.

[Image: tumbling time windows]
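
Because tumbling windows never overlap, finding the window for a record is simple arithmetic. The sketch below is a plain Scala model, not Kafka Streams code: `TimeWindowRange` and `TumblingWindows.windowFor` are hypothetical names, and it assumes (as the official docs describe for Kafka's time windows) that windows are aligned to the epoch, so the first window starts at timestamp zero, with an inclusive start and an exclusive end.

```scala
// Hypothetical types for illustration; this models the idea, it is not the Kafka Streams API.
// A window covers the half-open interval [startMs, endMs).
final case class TimeWindowRange(startMs: Long, endMs: Long)

object TumblingWindows {
  // Returns the single window a record with this timestamp belongs to,
  // assuming epoch-aligned windows (the first window starts at 0).
  def windowFor(timestampMs: Long, sizeMs: Long): TimeWindowRange = {
    require(sizeMs > 0, "window size must be positive")
    require(timestampMs >= 0, "this sketch only handles non-negative timestamps")
    val startMs = (timestampMs / sizeMs) * sizeMs
    TimeWindowRange(startMs, startMs + sizeMs)
  }
}
```

With 5 second windows, a record at 12,345 ms lands in `[10000, 15000)`, while a record at exactly 15,000 ms starts the next window, `[15000, 20000)`.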

Here is an example of this, using the simple word count example we have used before:

package windowing

import java.time.Duration
import java.util.Properties
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}
import utils.Settings


// An object (not a class) so that App provides a runnable main method
object TumblingWordCountTopology extends App {

  val props: Properties = Settings.createBasicStreamProperties(
    "tumbling-window-wordcount-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = wordCountTopology()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    // Close the streams instance cleanly when the JVM shuts down
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def wordCountTopology(): Topology = {
    val builder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
    val wordCounts = textLines
      // Split each line into lower-cased words
      .flatMapValues(line => line.toLowerCase.split("\\W+"))
      // Re-key by word so that counts are tracked per word
      .groupBy((_, word) => word)
      // 5 second tumbling windows: each record belongs to exactly one window
      .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
      .count()
    // The counts are keyed by Windowed[String]; flatten that to a plain
    // "word@windowStartMs" String key so the output topic uses ordinary serdes
    wordCounts.toStream
      .map((windowedWord, count) => (s"${windowedWord.key}@${windowedWord.window.start}", count))
      .to("WordsWithCountsTopic")
    builder.build()
  }
}

Hopping Windows

Hopping time windows are windows based on time intervals. They model fixed-size, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap (and in general they do), a data record may belong to more than one such window.

[Image: hopping time windows]
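
Working out which hopping windows a record falls into is a little more involved, since there can be several. Again, this is a plain Scala model rather than Kafka Streams code, with hypothetical names (`TimeWindowRange`, `HoppingWindows.windowsFor`). It assumes epoch-aligned windows whose start times are multiples of the advance interval, with an inclusive start and an exclusive end:

```scala
// Hypothetical types for illustration; this models the idea, it is not the Kafka Streams API.
// A window covers the half-open interval [startMs, endMs).
final case class TimeWindowRange(startMs: Long, endMs: Long)

object HoppingWindows {
  // All epoch-aligned hopping windows (start = multiple of advanceMs) that contain timestampMs
  def windowsFor(timestampMs: Long, sizeMs: Long, advanceMs: Long): Seq[TimeWindowRange] = {
    require(advanceMs > 0 && advanceMs <= sizeMs, "advance must be in (0, size]")
    require(timestampMs >= 0, "this sketch only handles non-negative timestamps")
    // Smallest aligned start s with s + sizeMs > timestampMs (clamped so no window starts before 0)
    val firstStart = (math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs
    // Every aligned start from there up to the timestamp itself covers the record
    (firstStart to timestampMs by advanceMs).map(s => TimeWindowRange(s, s + sizeMs))
  }
}
```

For the example above (size 5 minutes, advance 1 minute), a record at minute 7 belongs to the five windows starting at minutes 3, 4, 5, 6 and 7. When the advance equals the size, the function returns a single window, which is exactly the tumbling case.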

And here is an example of this, again using the word count example:

package windowing

import java.time.Duration
import java.util.Properties
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}
import utils.Settings


// An object (not a class) so that App provides a runnable main method
object HoppingTimeWordCountTopology extends App {

  val props: Properties = Settings.createBasicStreamProperties(
    "hopping-time-window-wordcount-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = wordCountTopology()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    // Close the streams instance cleanly when the JVM shuts down
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def wordCountTopology(): Topology = {
    val builder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
    val wordCounts = textLines
      // Split each line into lower-cased words
      .flatMapValues(line => line.toLowerCase.split("\\W+"))
      // Re-key by word so that counts are tracked per word
      .groupBy((_, word) => word)
      // 5 minute windows that advance (hop) every 1 minute, so they overlap.
      // TimeWindows.of(size) on its own would give tumbling windows; advanceBy makes them hop.
      .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
      .count()
    // Flatten the Windowed[String] key to a plain "word@windowStartMs" String key
    wordCounts.toStream
      .map((windowedWord, count) => (s"${windowedWord.key}@${windowedWord.window.start}", count))
      .to("WordsWithCountsTopic")
    builder.build()
  }
}

That’s It

And that brings us to the end of this series. Next I will be diving back into .NET land to write an implementation of the SWIM algorithm, and after that I think I will get an AWS Solutions Architect exam under my belt.

I hope you have enjoyed the series. I know this one was a bit naff, but I kind of ran out of steam.
