Kafka

KafkaStreams : Stateless operations

Ok so our journey now continues with Kafka Streams. Last time we introduced a few key concepts, such as

  • Props
  • Topologies
  • How to test streams apps

This time we will continue by looking at stateless operations. If you have ever used C#'s LINQ operators, this post will probably look quite familiar, or at least the concepts will. The main difference is that we are using these operators on streams of data coming from Kafka topics.

 

We will be using the same approach as in the 1st article: we will build a topology and then proceed to test it using the previously introduced TopologyTestDriver / OutputVerifier test tools from Confluent. As before, the code is written in Scala.

 

Where is the code?

The code for this post is all contained here

And the tests are here

 

I will not walk through every test, but you will find a set of tests for each new topology that is introduced in this post. As this post is mainly about stateless transforms, I will not spend too much time talking about state. It is, however, not completely avoidable, as some of the operations do require us to create state in order to demonstrate the stateless side of things.

 

Branch

Branch allows us to apply one or more predicates to an initial KStream, creating one KStream per matched predicate. If no predicate matches, the input record is dropped.

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] =
    builder.stream[String, String]("InputTopic")

  val predicates : List[(String, String) => Boolean] = List(
    (k,v) => v.startsWith("Odd"),
    (k,v) => v.startsWith("Even")
  )

  val branches : Array[KStream[String, String]] = textLines.branch(predicates:_*)
  branches(0).to("OddTopic")
  branches(1).to("EvenTopic")
  builder.build()
}

So for this example we could potentially end up with values written to the 2 output topics OddTopic/EvenTopic.
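
Just to make the testing approach concrete before moving on, here is a minimal sketch (not the actual tests from the repo) of how the branch topology above might be exercised with the TopologyTestDriver / ConsumerRecordFactory / OutputVerifier tools. The property values and record contents are illustrative assumptions only.

import java.util.Properties
import org.apache.kafka.common.serialization.{Serdes, StringDeserializer, StringSerializer}
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.{ConsumerRecordFactory, OutputVerifier}

//createTopolgy() is the branch topology shown above
val props = new Properties()
props.put("application.id", "branch-topology-test")
props.put("bootstrap.servers", "dummy:9092")
props.put("default.key.serde", Serdes.String().getClass.getName)
props.put("default.value.serde", Serdes.String().getClass.getName)

val driver = new TopologyTestDriver(createTopolgy(), props)
val factory = new ConsumerRecordFactory[String, String](
  "InputTopic", new StringSerializer, new StringSerializer)

//values starting with "Odd" should be routed to OddTopic, "Even" to EvenTopic
driver.pipeInput(factory.create("InputTopic", "key1", "Odd value"))
driver.pipeInput(factory.create("InputTopic", "key2", "Even value"))

OutputVerifier.compareKeyValue(
  driver.readOutput("OddTopic", new StringDeserializer, new StringDeserializer),
  "key1", "Odd value")
OutputVerifier.compareKeyValue(
  driver.readOutput("EvenTopic", new StringDeserializer, new StringDeserializer),
  "key2", "Even value")

driver.close()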

Filter/Inverse Filter

Filter keeps the records for which the predicate is satisfied, while filterNot (inverse filter) drops the records for which the predicate returns true.

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, Long] =
    builder.stream[String, Long]("InputTopic")

  //Evaluates a boolean function for each element and retains those
  //for which the function returns true
  val above5 = textLines.filter((k,v) => v > 5)
  above5.to("Above5OutputTopic")

  //Evaluates a boolean function for each element and drops those
  //for which the function returns true.
  val belowOrEqualTo5 = textLines.filterNot((k,v) => v > 5)
  belowOrEqualTo5.to("BelowOrEqualTo5OutputTopic")

  builder.build()
}

FlatMap / FlatMap (Values only)

FlatMap allows you to produce 0-* records from an original KeyValue record tuple. You can create a new key, a new value, or both, and alter the types of both. As this operation is capable of changing the key, it marks the stream for re-partitioning so that the data stays in the correct partition. You should therefore be wary of using this operation, and prefer flatMapValues where possible.

 

FlatMapValues allows you to produce 0-* records from an original KeyValue record tuple, where you can create a new value and alter its type while the original key is maintained. As the original key is maintained, this operation does not cause a re-partition to occur.

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[Int, Int] =
    builder.stream[Int, Int]("InputTopic")

  //Takes one record and produces zero, one, or more records.
  //You can modify the record keys and values, including their types
  val flatMapped = textLines.flatMap((k,v) => {
    List(
      (k + 1, v + 2),
      (k + 3, v + 4)
    )
  })
  flatMapped.to("flatMappedOutputTopic")


  //Takes one record and produces zero, one, or more records, while retaining the key of the original record.
  //You can modify the record values and the value type.
  //flatMapValues is preferable to flatMap because it will not cause data re-partitioning.
  //However, you cannot modify the key or key type like flatMap does
  val flatMappedValues = textLines.flatMapValues((k,v) => {
    List(
      v + 10,
      v + 20
    )
  })
  flatMappedValues.to("flatMappedValuesOutputTopic")

  builder.build()
}

Foreach

Foreach allows us to run some side effect on the input records. It is, however, a terminal operation: these “side effects” can't be tracked by Kafka, and as it is terminal, nothing else is possible after a foreach has been performed. For this one I am showing the entire app, as it's easier to read the flow. Essentially what happens is that I am using foreach to write the input records to a file, which the tests will read after running the topology.

package stateless.transformations

import java.io.{File, PrintWriter}
import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}


/**
  * This example uses foreach to write each value from 'InputTopic'
  * to a file as a side effect (foreach is a terminal operation)
  */
class ForEachTopology(val pw: PrintWriter) extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-foreach-application", "localhost:9092")

  run()

  def stop() : Unit = {
    pw.close()
  }

  private def run(): Unit = {


    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[Int, Int] =
      builder.stream[Int, Int]("InputTopic")

    //Terminal operation. Performs a stateless action on each record.

    //You would use foreach to cause side effects based on the input data (similar to peek)
    //and then stop further processing of the input data (unlike peek, which is not a terminal operation).
    //Note on processing guarantees: Any side effects of an action (such as writing to
    //external systems) are not trackable by Kafka, which means they will typically not
    //benefit from Kafka’s processing guarantees.
    textLines.foreach((k, v) => {
      pw.write(s"Saw input value line '$v'\r\n")
      pw.flush()
    })

    builder.build()
  }
}


Map / Map (Values only)

Map works much the same as flatMap above, apart from the fact that it emits a single new KeyValue tuple. As before this marks the stream for re-partitioning, so you should prefer mapValues instead.

MapValues works much the same as flatMapValues above, apart from the fact that it emits a single new value per record, while the original key is maintained.

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[Int, Int] =
    builder.stream[Int, Int]("InputTopic")

  //Takes one record and produces one record. You can modify the record key and value, including their types.
  //Marks the stream for data re-partitioning: Applying a grouping or a join after map will result in re-partitioning
  //of the records. If possible use mapValues instead, which will not cause data re-partitioning.
  val mapped = textLines.map((k,v) => {
      (k + 1, v + 2)
  })
  mapped.to("mappedOutputTopic")


  //Takes one record and produces one record, while retaining the key of the original record.
  //You can modify the record value and the value type.
  //mapValues is preferable to map because it will not cause data re-partitioning. However, it
  //does not allow you to modify the key or key type like map does.
  val mappedValues = textLines.mapValues((k,v) => {
      v + 10
  })
  mappedValues.to("mappedValuesOutputTopic")

  builder.build()
}

 

Peek

Peek is an extremely useful operation that allows you to wire tap the stream and cause side effects, whilst maintaining the stream values. It is like foreach, except that it still emits the values downstream for further processing. As with foreach, I have decided to show the whole application here. It's the same idea: we write to a text file in the application code, and the tests can do assertions on the values in the text file at the end of the application run.

package stateless.transformations

import java.io.PrintWriter
import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}


/**
  * This example uses peek to wire tap values from 'InputTopic', writing them
  * to a file before forwarding them unchanged to 'peekedOutputTopic'
  */
class PeekTopology(val pw: PrintWriter) extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-peek-application", "localhost:9092")

  run()

  def stop() : Unit = {
    pw.close()
  }

  private def run(): Unit = {


    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[Int, Int] =
      builder.stream[Int, Int]("InputTopic")

    // Performs a stateless action on each record, and returns an unchanged stream.
    //
    // You would use peek to cause side effects based on the input data (similar to foreach)
    // and continue processing the input data (unlike foreach, which is a terminal operation).
    // Peek returns the input stream as-is; if you need to modify the input stream, use map or mapValues instead.
    // peek is helpful for use cases such as logging or tracking metrics or for debugging and troubleshooting.
    //
    // Note on processing guarantees: Any side effects of an action (such as writing to external systems)
    // are not trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.
    val peeked = textLines.peek((k,v)=> {
      pw.write(s"Saw input value line '$v'\r\n")
      pw.flush()
    })
    peeked.to("peekedOutputTopic")


    builder.build()
  }
}


SelectKey

Allows you to create a new key, and possibly a new type, for the key of the input records. As this changes the key, it marks the stream for re-partitioning.

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[Int, Int] =
    builder.stream[Int, Int]("InputTopic")

  // Assigns a new key - possibly of a new key type - to each record.
  // Calling selectKey(mapper) is the same as calling map((key, value) -> mapper(key, value), value).
  // Marks the stream for data re-partitioning: Applying a grouping or a join after selectKey will
  // result in re-partitioning of the records.
  val mapped = textLines.selectKey((k,v) => {
      k.toString
  })
  mapped.to("selectKeyOutputTopic")

  builder.build()
}

 

GroupBy/GroupByKey

Before we get into this one, I just wanted to talk a little bit about stores and how they affect/are affected by some of the possible settings you could apply.

Stores

We will cover this in more detail in the next post, but for now it's worth mentioning that the state stores seen in these GroupBy/GroupByKey count() examples, which create a KTable, start out with logging enabled by default. This means that a special changelog Kafka topic will be used to store the contents of the state stores.

In the next post we will see how we can set up our own state stores, and how we can provide more fine-grained control over their creation, such as

  • retention.ms
  • retention.bytes
  • cleanup.policy

But for now it's just good to be aware that when you do find yourself performing some operation that takes you from a KStream to a KTable, the Kafka default is to use changelog logging, which persists your state store values. This should protect you should your Kafka Streams application need to be restarted: it should pick up again from its last committed offset, using the previously saved changelog values.

As I say, we will be seeing this in much more detail in the next post.

Ok, let's see an example.

GroupByKey

Groups the records by the existing key. To fully illustrate this we need to perform some sort of aggregation, which takes us into stateful operations, and we end up with a KTable. But let's see the example, which should also help us in the next post.

def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val groupByKeyTextLines: KStream[Int, String] =
      builder.stream[Int, String]("GroupByKeyInputTopic")

    //NOTE : You may find you need to play with these Config values in order
    //to get the stateful operation to work correctly/how you want it to
    //    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
    //    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760.asInstanceOf[Object])
    //    props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 50000.asInstanceOf[Object])
    //By playing around with these values you should be able to find the values that work for you
    groupByKeyTextLines.groupByKey
        .count()
        .mapValues(x => x.toString)
        .toStream
        .to("groupedByKeyOutputTopic")


    builder.build()
  }

It can be seen that we start with a KStream[Int, String], and when we use count() we end up with a KTable[Int, Long]. This will use a state store behind the scenes. I then convert that into a KTable[Int, String] using mapValues(…), before using toStream to turn it back into a KStream[Int, String] and write it to the output topic.

groupByKey causes data re-partitioning if and only if the stream was already marked for re-partitioning, which is why it is preferable to groupBy. However, groupByKey does not allow you to modify the key or key type like groupBy does.

 

As you can see in the comments in the code above, there are a number of values that affect how the state store values are emitted to downstream topologies (a minimal sketch of setting these follows the list below):

  • COMMIT_INTERVAL_MS_CONFIG : the number of milliseconds before a commit occurs. When a commit occurs, state store values are emitted downstream
  • CACHE_MAX_BYTES_BUFFERING_CONFIG : The max number of bytes to cache before state store values are emitted downstream
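
For reference, here is a minimal sketch of setting these on the streams Properties. The numbers are just the same illustrative values as the commented-out lines in the code above, not recommendations.

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
//how often a commit happens; on commit the state store cache is flushed downstream
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
//max bytes buffered in the record cache before values are emitted downstream
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760.asInstanceOf[Object])
//how long to wait before cleaning up local state that is no longer assigned to this instance
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 50000.asInstanceOf[Object])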

So for given input records of (1, “one”)..(1, “one”)..(2, “two”) we may see something like (1, “2”)..(2, “1”) in the output topic (depending on the settings I talked about above).

GroupBy

Groups the records by a new key, which may be of a different key type. When grouping a table, you may also specify a new value and value type. groupBy always causes data re-partitioning; if possible use groupByKey instead, which will re-partition data only if required.

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val groupByTextLines: KStream[Int, String] =
    builder.stream[Int, String]("GroupByInputTopic")


  //Groups the records by a new key, which may be of a different key type.
  // 
  //NOTE : You may find you need to play with these Config values in order
  //to get the stateful operation to work correctly/how you want it to
  //    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
  //    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760.asInstanceOf[Object])
  //    props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 50000.asInstanceOf[Object])
  //By playing around with these values you should be able to find the values that work for you
  groupByTextLines.flatMapValues(x => x.toLowerCase.split("\\W+"))
    .groupBy((key, word) => word)
    .count()
    .mapValues(x => x.toString)
    .toStream
    .to("groupedByOutputTopic")

  builder.build()
}

Much the same as above, however this time we take an input string, split it into words, use each word as the new key, and then count the words.

So for a given input record of (1, “one two three one”) we may see something like (“one”, “2”)..(“two”, “1”)..(“three”, “1”) in the output topic (depending on the settings I talked about above).

Custom Partitioner

There may be times where you do not have a key, and you MUST come up with a way to partition your data on the fly. Kafka Streams allows this by using an intermediate topic and a custom partitioner which you can write yourself. In this trivial example I will always return partition 1, but you can imagine your own logic here.

I will show the complete example of how to do this, as I think it's a lot clearer to see what's going on. The full code is shown below. The important parts are

  • Using through with Produced, where we pass in the custom StreamPartitioner
  • The custom StreamPartitioner class that decides which partition (1 in my case) to use

package stateless.transformations

import java.time.Duration
import java.util.Properties
import common.PropsHelper
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.processor.StreamPartitioner
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}


class ThroughCustomPartitionerTopology() extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-custompartitioner-application", "localhost:9092")

  run()

  private def run(): Unit = {


    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    implicit val keySerde =  Serdes.Integer
    implicit val valueSerde =  Serdes.String

    val builder: StreamsBuilder = new StreamsBuilder
    implicit val consumed = kstream.Consumed.`with`(keySerde, valueSerde)

    val textLines: KStream[java.lang.Integer, String] =
      builder.stream[java.lang.Integer, String]("InputTopic")

    //should use implicit Produced with the through
    val repartitioned = textLines.through("InputTopic-RePartitioned")
      (Produced.`with`(new AlwaysOneCustomPartitioner()))

    val finalStream = repartitioned
      .mapValues((x) => s"${x} Through")

    finalStream.to("OutputTopic")
    builder.build()
  }
}

//Custom StreamPartitioner that ALWAYS just returns 1 for Partition
class AlwaysOneCustomPartitioner extends StreamPartitioner[java.lang.Integer, String] {
  override def partition(topic: String, key: java.lang.Integer, value: String, numPartitions: Int): Integer = {
    //just default it to 1
    1
  }
}

That’s It

And that is all I have for this one. In the next post we will be looking at stateful operations, namely Aggregating.