
KafkaStreams : Aggregating

So last time we looked at a whole bunch of stateless operations. This time we will continue our journey to look at how we can start to do aggregations, and make use of state stores.

Where is the code?

The code for this post is all contained here

And the tests are all contained here

Introduction to Stateful operations

Stateful operations (unlike stateless operations) require a state store for processing purposes. We briefly touched on state stores last time, but today I wanted to drill into them a bit more.

 

Kafka Streams comes with some inbuilt state stores:

  • PersistentKeyValueStore<K, V>
    • Storage Engine : RocksDB
    • Fault Tolerant : Yes by default
      • The recommended store type for most use cases.
      • Stores its data on local disk.
      • Storage capacity: managed local state can be larger than the memory (heap space) of an application instance, but must fit into the available local disk space.
      • RocksDB settings can be fine-tuned, see RocksDB configuration.
      • Available store variants: time window key-value store, session window key-value store.
  • InMemoryKeyValueStore<K,V>
    • Storage Engine: None
    • Fault Tolerant : Yes by default
      • Stores its data in memory.
      • Storage capacity: managed local state must fit into memory (heap space) of an application instance.
      • Useful when application instances run in an environment where local disk space is either not available or local disk space is wiped in-between app instance restarts.

 

So how do we create these stores? Well, we will be seeing some examples of these, but using the high level DSL (which is what we have been using until now) it is as easy as this:

//The "persistentKeyValueStore" is one of the pre-canned state store types
//and as such logging is enabled, so the ChangeLog topic to persist state
import org.apache.kafka.streams.state.Stores
val wordCountStoreName = "wordCountStore"
val wordCountStoreSupplied = Stores.persistentKeyValueStore(wordCountStoreName)

Now some of you may have noticed the bullet point above where it mentions “Fault Tolerant”. How exactly is that done? Well the inbuilt stores actually write to a specialized topic behind the scenes. This topic is sometimes called the changelog topic, and can be used to restore state. By default, persistent key-value stores are fault-tolerant. They are backed by a compacted changelog topic. The purpose of compacting this topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster, and to minimize recovery time if a state store needs to be restored from its changelog topic.
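
As a concrete example, these changelog topics follow the <application.id>-<store name>-changelog naming convention, so the wordCountStore used later in this post would be backed by a topic named something like stateless-count-application-wordCountStore-changelog.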

 

Similarly, persistent window stores are fault-tolerant. They are backed by a topic that uses both compaction and deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are composite keys that include the “normal” key and window timestamps. For these types of composite keys it would not be sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion enabled, old windows that have expired will be cleaned up by Kafka’s log cleaner as the log segments expire. The default retention setting is Materialized#withRetention() + 1 day. You can override this setting by specifying StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG in the StreamsConfig.
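
As a rough sketch, overriding that additional retention might look like this (the value shown is just illustrative):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
//add an extra day (in milliseconds) of retention to the window store changelog topics
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, "86400000")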

 

So how can we turn this feature on/off?

You can enable or disable fault tolerance for a state store by enabling or disabling the change logging of the store through withLoggingEnabled() and withLoggingDisabled(). You can also fine-tune the associated topic’s configuration if needed.

Example of enabling fault tolerance, and passing through some other useful values:

  import java.util
  import org.apache.kafka.streams.scala.Serdes
  import org.apache.kafka.streams.state.Stores

  val logConfig = new util.HashMap[String, String]
  logConfig.put("retention.ms", "172800000")
  logConfig.put("retention.bytes", "10000000000")
  logConfig.put("cleanup.policy", "compact,delete")
  val wordCountStoreSupplier = Stores.inMemoryKeyValueStore(wordCountStoreName)
  val wordCountStoreBuilder = Stores.keyValueStoreBuilder(wordCountStoreSupplier, Serdes.String, Serdes.Long)
    .withLoggingEnabled(logConfig)
    .withCachingEnabled()
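
On its own the store builder above does nothing; it has to be attached to a topology. As a rough sketch (and this is an assumption about how you might go on to use it, for example with a transformer/processor rather than letting the DSL materialize the store for you), it could be registered like so:

import org.apache.kafka.streams.scala.StreamsBuilder

//register the store builder with the topology, so a transformer/processor can
//later look the store up by its name ("wordCountStore" in this sketch)
val builder = new StreamsBuilder
builder.addStateStore(wordCountStoreBuilder)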

As I mentioned in the previous post, there are a number of config values that will affect how your stateful apps emit values downstream. Just to remind ourselves of those:

  • COMMIT_INTERVAL_MS_CONFIG : the number of milliseconds before a commit occurs. When a commit occurs, state store values are emitted downstream
  • CACHE_MAX_BYTES_BUFFERING_CONFIG : The max number of bytes to cache before state store values are emitted downstream

So you may find you need to fiddle with those to get what you want in terms of max cache size and commit interval.
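
For example, here is a rough sketch of setting those two values (the exact numbers are just illustrative):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
//commit (and therefore flush/forward cached state store values) every second
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000")
//a cache of 0 bytes effectively disables caching, so every update is emitted downstream
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")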

 

Aggregating

After records are grouped by key via groupByKey or groupBy (becoming either a KGroupedStream or a KGroupedTable), they can be aggregated. Aggregations are key-based operations, which means that they always operate over records where the key is the same. You can perform aggregations on windowed or non-windowed data.
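
To make the grouping step concrete, here is a rough sketch (the topic name is a placeholder, and the same implicit Serdes / ImplicitConversions imports as the full examples below are assumed):

//groupBy re-keys the stream (here by the record value), which may trigger a repartition,
//and yields a KGroupedStream that aggregate/count/reduce can then operate on
val grouped: KGroupedStream[String, String] =
  builder.stream[String, String]("SomeInputTopic")
    .groupBy((_, value) => value)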

Aggregate

Rolling aggregation. Aggregates the values of (non-windowed) records by the grouped key. Aggregating is a generalization of reduce and allows, for example, the aggregate value to have a different type than the input values. (KGroupedStream details, KGroupedTable details)

When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0) and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table, you must provide a “subtractor” aggregator (think: aggValue - oldValue).

Detailed behavior of KGroupedStream:

  • Input records with null keys are ignored.
  • When a record key is received for the first time, the initializer is called (and called before the adder).
  • Whenever a record with a non-null value is received, the adder is called.

Detailed behavior of KGroupedTable:

  • Input records with null keys are ignored.
  • When a record key is received for the first time, the initializer is called (and called before the adder and subtractor). Note that, in contrast to KGroupedStream, over time the initializer may be called more than once for a key as a result of having received input tombstone records for that key.
  • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
  • When subsequent non-null values are received for a key (e.g., UPDATE), then the subtractor is called with the old value as stored in the table and the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.
  • When a tombstone record – i.e. a record with a null value – is received for a key (e.g., DELETE), then only the subtractor is called. Note that, whenever the subtractor returns a null value itself, then the corresponding key is removed from the resulting KTable. If that happens, any next input record for that key will trigger the initializer again.
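
Here is a small example topology that aggregates the Int values from 'AggregateKeyInputTopic' by key into a rolling Long total:
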
package stateful.transformations.aggregating

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream.{Materialized, _}
import org.apache.kafka.streams.{KafkaStreams, Topology}


/**
  * This example aggregates the Int values from 'AggregateKeyInputTopic' by key
  * into a rolling Long total, and writes the results to 'aggregateOutputTopic'
  */
class AggregateTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-aggregate-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[Int, Int] =
      builder.stream[Int, Int]("AggregateKeyInputTopic")

    implicit val matererlized: Materialized[Int, Long, ByteArrayKeyValueStore]
      = Materialized.as("aggregated-stream-store")

    //Rolling aggregation. Aggregates the values of (non-windowed) records by the grouped key.
    //Aggregating is a generalization of reduce and allows, for example, the aggregate value to
    //have a different type than the input values. (KGroupedStream details, KGroupedTable details)
    //
    //When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0)
    //and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table,
    //you must provide a “subtractor” aggregator (think: aggValue - oldValue).
    val groupedBy = textLines.groupByKey
    val aggregatedTable =
      groupedBy
        .aggregate[Long](0L)((aggKey, newValue, aggValue) => aggValue + newValue)(matererlized)
    aggregatedTable
        .toStream
          .peek((k,v) =>
          {
            val theKey = k
            val theValue =v
          })
        .to("aggregateOutputTopic")
    builder.build()
  }
}


Count

Rolling aggregation. Counts the number of records by the grouped key.

Detailed behavior for KGroupedStream:

  • Input records with null keys or values are ignored.

Detailed behavior for KGroupedTable:

  • Input records with null keys are ignored. Records with null values are not ignored but interpreted as “tombstones” for the corresponding key, which indicate the deletion of the key from the table.
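
Here is a word count example, which counts words using a named persistent state store: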

package stateful.transformations.aggregating

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream.{Materialized, _}
import org.apache.kafka.streams.{KafkaStreams, Topology}


/**
  * This example splits lines from 'CountInputTopic' into words, counts the
  * occurrences of each word, and writes the counts to 'WordsWithCountsOutputTopic'
  */
class CountTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-count-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[String, String] =
      builder.stream[String, String]("CountInputTopic")


    //let's create a named wordCountStore state store
    //The "persistentKeyValueStore" is one of the pre-canned state store types,
    //and as such logging is enabled, so a changelog topic is used to persist the state
    import org.apache.kafka.streams.state.Stores
    val wordCountStoreName = "wordCountStore"
    val wordCountStoreSupplied = Stores.persistentKeyValueStore(wordCountStoreName)

    val wordCounts = textLines
      .flatMapValues(x => x.toLowerCase.split("\\W+"))
      .groupBy((key, word) => word)
      .count()(Materialized.as(wordCountStoreSupplied))
    wordCounts
      .toStream
      .peek((k,v) =>
      {
        val theKey = k
        val theValue =v
      })
      .to("WordsWithCountsOutputTopic")

    builder.build()
  }
}


Reduce

Combines the values of (non-windowed) records by the grouped key. The current record value is combined with the last reduced value, and a new reduced value is returned. The result value type cannot be changed, unlike aggregate.

When reducing a grouped stream, you must provide an “adder” reducer (e.g., aggValue + curValue). When reducing a grouped table, you must additionally provide a “subtractor” reducer (e.g., aggValue - oldValue).

Detailed behavior for KGroupedStream:

  • Input records with null keys are ignored in general.
  • When a record key is received for the first time, then the value of that record is used as the initial aggregate value.
  • Whenever a record with a non-null value is received, the adder is called.

Detailed behavior for KGroupedTable:

  • Input records with null keys are ignored in general.
  • When a record key is received for the first time, then the value of that record is used as the initial aggregate value. Note that, in contrast to KGroupedStream, over time this initialization step may happen more than once for a key as a result of having received input tombstone records for that key.
  • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
  • When subsequent non-null values are received for a key (e.g., UPDATE), then the subtractor is called with the old value as stored in the table and the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.
  • When a tombstone record – i.e. a record with a null value – is received for a key (e.g., DELETE), then only the subtractor is called. Note that, whenever the subtractor returns a null value itself, then the corresponding key is removed from the resulting KTable. If that happens, any next input record for that key will re-initialize its aggregate value.

 

In this example we build a KTable from a KStream by using a little trick of going through an intermediate topic:

package stateful.transformations.aggregating

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream.{Materialized, _}
import org.apache.kafka.streams.{KafkaStreams,  Topology}


/**
  * This example pushes 'ReduceInputTopic' through an intermediate topic to build
  * a KTable, then groups and reduces it, writing the results to 'ReduceOutputTopic'
  */
class ReduceTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-reduce-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[String, String] =
      builder.stream[String, String]("ReduceInputTopic")


    //let's create a named reduceStore state store
    //The "persistentKeyValueStore" is one of the pre-canned state store types,
    //and as such logging is enabled, so a changelog topic is used to persist the state
    import org.apache.kafka.streams.state.Stores
    val reduceStoreName = "reduceStore"
    val reduceStoreSupplied = Stores.persistentKeyValueStore(reduceStoreName)

    //by using this dummy topic we can go straight from a KStream to a KTable
    textLines.to("Dummy-ReduceInputTopic")
    val userProfiles : KTable[String, String] = builder.table("Dummy-ReduceInputTopic")


    val groupedTable = userProfiles.groupBy((user, region) => (region, user.length()))
    val reduced: KTable[String, Integer] =
      groupedTable.reduce(
        (aggValue : Int, newValue: Int) => aggValue + newValue,
        (aggValue: Int, oldValue: Int) => aggValue - oldValue)
      .mapValues(v => new Integer(v))

    val finalStream = reduced.toStream
    finalStream.to("ReduceOutputTopic")

    builder.build()
  }
}


Custom State Stores

You should not really need to create custom state stores, but if you find you do want/need to do this, the starting place is to implement the following interfaces

  • org.apache.kafka.streams.state.KeyValueBytesStoreSupplier which expects you to implement the following
    • /**
       * Return the name of this state store supplier.
       * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'.
       *
       * @return the name of this state store supplier
       */
      String name();
      
      /**
       * Return a new {@link StateStore} instance.
       *
       * @return a new {@link StateStore} instance of type T
       */
      T get();
      
      /**
       * Return a String that is used as the scope for metrics recorded by Metered stores.
       * @return metricsScope
       */
      String metricsScope();
      
  • org.apache.kafka.streams.state.KeyValueStore<K, V>
    • /**
       * Update the value associated with this key.
       *
       * @param key The key to associate the value to
       * @param value The value to update, it can be {@code null};
       *              if the serialized bytes are also {@code null} it is interpreted as deletes
       * @throws NullPointerException If {@code null} is used for key.
       */
      void put(K key, V value);
      
      /**
       * Update the value associated with this key, unless a value is already associated with the key.
       *
       * @param key The key to associate the value to
       * @param value The value to update, it can be {@code null};
       *              if the serialized bytes are also {@code null} it is interpreted as deletes
       * @return The old value or {@code null} if there is no such key.
       * @throws NullPointerException If {@code null} is used for key.
       */
      V putIfAbsent(K key, V value);
      
      /**
       * Update all the given key/value pairs.
       *
       * @param entries A list of entries to put into the store;
       *                if the serialized bytes are also {@code null} it is interpreted as deletes
       * @throws NullPointerException If {@code null} is used for key.
       */
      void putAll(List<KeyValue<K, V>> entries);
      
      /**
       * Delete the value from the store (if there is one).
       *
       * @param key The key
       * @return The old value or {@code null} if there is no such key.
       * @throws NullPointerException If {@code null} is used for key.
       */
      V delete(K key);
      
      /**
       * The name of this store.
       * @return the storage name
       */
      String name();
      
      /**
       * Initializes this state store.
       * <p>
       * The implementation of this function must register the root store in the context via the
       * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the
       * first {@link StateStore} parameter should always be the passed-in {@code root} object, and
       * the second parameter should be an object of user's implementation
       * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
       * <p>
       * Note that if the state store engine itself supports bulk writes, users can implement another
       * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
       * let users implement bulk-load restoration logic instead of restoring one record at a time.
       *
       * @throws IllegalStateException If store gets registered after initialized is already finished
       * @throws StreamsException if the store's change log does not contain the partition
       */
      void init(ProcessorContext context, StateStore root);
      
      /**
       * Flush any cached data
       */
      void flush();
      
      /**
       * Close the storage engine.
       * Note that this function needs to be idempotent since it may be called
       * several times on the same state store.
       * <p>
       * Users only need to implement this function but should NEVER need to call this api explicitly
       * as it will be called by the library automatically when necessary
       */
      void close();
      
      /**
       * Return if the storage is persistent or not.
       *
       * @return  {@code true} if the storage is persistent&mdash;{@code false} otherwise
       */
      boolean persistent();
      
      /**
       * Is this store open for reading and writing
       * @return {@code true} if the store is open
       */
      boolean isOpen();
      
      /**
       * Get the value corresponding to this key.
       *
       * @param key The key to fetch
       * @return The value or null if no value is found.
       * @throws NullPointerException If null is used for key.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      V get(K key);
      
      /**
       * Get an iterator over a given range of keys. This iterator must be closed after use.
       * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
       * and must not return null values. No ordering guarantees are provided.
       * @param from The first key that could be in the range
       * @param to The last key that could be in the range
       * @return The iterator for this range.
       * @throws NullPointerException If null is used for from or to.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      KeyValueIterator<K, V> range(K from, K to);
      
      /**
       * Return an iterator over all keys in this store. This iterator must be closed after use.
       * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
       * and must not return null values. No ordering guarantees are provided.
       * @return An iterator of all key/value pairs in the store.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      KeyValueIterator<K, V> all();
      
      /**
       * Return an approximate count of key-value mappings in this store.
       *
       * The count is not guaranteed to be exact in order to accommodate stores
       * where an exact count is expensive to calculate.
       *
       * @return an approximate count of key-value mappings in the store.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      long approximateNumEntries();
      

 

The best place to see how to implement these is to look at the existing classes, for example:

  • RocksDbKeyValueBytesStoreSupplier
  • RocksDBStore
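
For illustration only, here is a minimal, hypothetical sketch of a KeyValueBytesStoreSupplier that simply delegates to the in-built in-memory store (a real custom store would construct and return your own KeyValueStore implementation instead):

import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.state.{KeyValueBytesStoreSupplier, KeyValueStore, Stores}

//NOTE : hypothetical example, it just delegates to the pre-canned in-memory store
class DelegatingStoreSupplier(storeName: String) extends KeyValueBytesStoreSupplier {

  //must be a valid Kafka topic name, as it is used to derive the changelog topic name
  override def name(): String = storeName

  //a real custom store would new up its own KeyValueStore[Bytes, Array[Byte]] here
  override def get(): KeyValueStore[Bytes, Array[Byte]] =
    Stores.inMemoryKeyValueStore(storeName).get()

  //used as the scope for metrics recorded by the metered store wrappers
  override def metricsScope(): String = "delegating-in-memory"
}

Such a supplier could then be plugged in the same way as the pre-canned ones above, e.g. .count()(Materialized.as(new DelegatingStoreSupplier("wordCountStore"))).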

That’s it for now

So that is all I wanted to talk about this time. Next time we will continue our journey and look at joining operations.
