Kafka

KafkaStreams : Joining

Last time we looked at Aggregating. This time we will continue to look at the Streams DSL, turning our attention to joining. If you have ever done any standard SQL, this post will feel very familiar

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

Joins

Types of join

KafkaStreams supports the following types of joins. We will look at most of them using KStream-KStream joins as our demo code, as it's just easier to write tests for

(image: overview of the supported join types)

 

  • KStream-KStream joins are always windowed joins, because otherwise the size of the internal state store used to perform the join – e.g., a sliding window or “buffer” – would grow indefinitely
  • KTable-KTable joins are always non-windowed joins. They are designed to be consistent with their counterparts in relational databases. The changelog streams of both KTables are materialized into local state stores to represent the latest snapshot of their table duals. The join result is a new KTable that represents the changelog stream of the join operation.
  • KStream-KTable joins are always non-windowed joins. They allow you to perform table lookups against a KTable (changelog stream) upon receiving a new record from the KStream (record stream). A small sketch of this type of join follows this list.
  • KStream-GlobalKTable joins are always non-windowed joins. They allow you to perform table lookups against a GlobalKTable (entire changelog stream) upon receiving a new record from the KStream (record stream)
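
Although the rest of this post sticks to KStream-KStream joins, here is a minimal sketch of what a KStream-KTable join looks like with the Scala DSL. The topic names and the enrichment logic are purely hypothetical, and the serdes come from the standard implicits.

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

val builder = new StreamsBuilder
//hypothetical topics: a stream of orders keyed by userId, and a table of user names
val orders: KStream[Int, String] = builder.stream[Int, String]("OrdersTopic")
val users: KTable[Int, String] = builder.table[Int, String]("UsersTopic")

//non-windowed join: each incoming order record does a lookup against the latest table value for its key
val enriched: KStream[Int, String] =
  orders.join(users)((order, user) => s"order=${order}, user=${user}")

enriched.to("EnrichedOrdersTopic")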

 

Join co-partitioning requirements

Before we look at the types of join, and the syntax for them, it is worth noting that Kafka Streams has the following requirements that need to be met in order to carry out the various join operators

Input data must be co-partitioned when joining. This ensures that input records with the same key on both sides of the join are delivered to the same stream task during processing. It is the responsibility of the user to ensure data co-partitioning before applying join operators

The requirements for data co-partitioning are:

  • The input topics on both sides of the join must have the same number of partitions.
  • All applications that write to the input topics must have the same partitioning strategy, so that the keyspace of the input data is distributed across partitions in the same manner. The partitioner may be set in various places: for example, on the Producer API it can be set via the ProducerConfig.PARTITIONER_CLASS_CONFIG config value, and for the Streams API it can be supplied to #to or #through. The good news is that, if you happen to use the default partitioner-related settings across all applications, you do not need to worry about the partitioning strategy. If one of your inputs is not keyed the way you need, a small sketch of re-keying it follows this list.
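
A minimal sketch of that re-keying approach, with hypothetical topic names and key derivation; the intermediate topic would need to be created with the same partition count as the other join input:

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

val builder = new StreamsBuilder
val unkeyedRight: KStream[String, String] = builder.stream[String, String]("UnkeyedRightTopic")

//derive the join key from the value, then write through an intermediate topic so the data
//is physically re-partitioned before the join
val rekeyedRight: KStream[Int, String] =
  unkeyedRight
    .selectKey((_, v) => v.hashCode)
    .through("RightTopic-Repartitioned")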

Inner Join

As with many other tools, an inner join will only produce output where a matching key exists on both inputs.

It should be noted that, as I have chosen to use KStream-KStream joins for this demo code, the following rules hold. These rules may vary depending on whether your inputs are KStreams or KTables

 

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
  • Input records with a null key or a null value are ignored and do not trigger the join.

Anyway, as has been the way so far, let's see the topology followed by the tests

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val left: KStream[Int, String] =
    builder.stream[Int, String]("LeftTopic")
  val right: KStream[Int, String] =
    builder.stream[Int, String]("RightTopic")

  def tryToInt( s: String ) = Try(s.toInt).toOption

  //do the join
  val joinedStream = left.join(right)( (l: String, r: String) => {
    val result = (tryToInt(l), tryToInt(r))
    result match {
      case (Some(a), Some(b)) => a + b
      case (None, Some(b)) => b
      case (Some(a), None) => a
      case (None, None) => 0
    }
  } , JoinWindows.of(Duration.ofSeconds(2)))


  val peeked = joinedStream.peek((k,v)=> {

    val theK = k
    val theV = v
  })

  peeked.to("innerJoinOutput")

  builder.build()
}

In this example, since we are not guaranteed to get a value for both sides, I am using a simple helper method to get an Option[Int] for each side, and finally using simple pattern matching to form a final integer value, which the tests will verify against

We can also see the JoinWindows of 2 seconds; we will make use of that in our tests. I have used this pattern for all 3 of the KStream join examples in this post. Here are the relevant tests

//arrange
val recordFactory: ConsumerRecordFactory[java.lang.Integer, java.lang.String] =
  new ConsumerRecordFactory[java.lang.Integer, java.lang.String](new IntegerSerializer, new StringSerializer)
val innerJoinTopology = new InnerJoinTopology()
val testDriver = new TopologyTestDriver(innerJoinTopology.createTopolgy(), props)

//act
testDriver.pipeInput(recordFactory.create("LeftTopic", 1, null, 1900L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, null, 1901L))

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "1", 2902L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, "2",2903L ))

OutputVerifier.compareValue(testDriver.readOutput("innerJoinOutput", integerDeserializer, integerDeserializer),
  3.asInstanceOf[Integer])


//push these out past 2 seconds (which is what Topology Join Window duration is)
Thread.sleep(2500)

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "2", 5916L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, "4", 5917L))
OutputVerifier.compareValue(testDriver.readOutput("innerJoinOutput", integerDeserializer, integerDeserializer),
  6.asInstanceOf[Integer])

val result1 = testDriver.readOutput("innerJoinOutput", integerDeserializer, integerDeserializer)
assert(result1 == null)

It can be seen that we expect the following to hold true

  • Both inputs being null within the time window doesn't yield anything
  • A matched key in the time window yields a combined value of both stream values converted to an Int and added together (so 3 in this case)
  • If we wait 2 seconds, to exceed the join window, and then push 2 new values through, a newly matched key in the new time window yields a combined value of both stream values converted to an Int and added together (so 6 in this case)

LeftJoin

As with many other tools, a left join will include the left input even when the right input has no value for the same key.

It should be noted that, as I have chosen to use KStream-KStream joins for this demo code, the following rules hold. These rules may vary depending on whether your inputs are KStreams or KTables

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
  • Input records with a null key or a null value are ignored and do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null).

As before we start with the topology which looks like this

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val left: KStream[Int, String] =
    builder.stream[Int, String]("LeftTopic")
  val right: KStream[Int, String] =
    builder.stream[Int, String]("RightTopic")

  //do the join
  val joinedStream = left.leftJoin(right)( (l: String, r: String) => {
    val result = s"Left='${l}', Right='${r}'"
    result
  } , JoinWindows.of(Duration.ofSeconds(2)))

  val peeked = joinedStream.peek((k,v)=> {

    val theK = k
    val theV = v
  })

  peeked.to("leftJoinOutput")

  builder.build()
}

Here we know we may not have a value on the second input, but we may on the original input. So for this example I use a String, where Kafka Streams will emit “null” should there be no value on one of the inputs

Here are the relevant tests

//arrange
val recordFactory: ConsumerRecordFactory[java.lang.Integer, java.lang.String] =
  new ConsumerRecordFactory[java.lang.Integer, java.lang.String](new IntegerSerializer, new StringSerializer)
val leftJoinTopology = new LeftJoinTopology()
val testDriver = new TopologyTestDriver(leftJoinTopology.createTopolgy(), props)

//act
testDriver.pipeInput(recordFactory.create("LeftTopic", 1, null, 1900L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, null, 1901L))

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "1", 2902L))


OutputVerifier.compareValue(testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer),
  "Left='1', Right='null'")


//push these out past 2 seconds (which is what Topology Join Window duration is)
Thread.sleep(2500)

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "2", 5916L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, "4", 5916L))
OutputVerifier.compareValue(testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer),
  "Left='2', Right='4'")
OutputVerifier.compareValue(testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer),
  "Left='2', Right='4'")
val result1 = testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer)
assert(result1 == null)

It can be seen that we expect the following to hold true

  • Both inputs being null within the time window doesn't yield anything
  • A single left input with no matching right input in the time window yields a single value for the left input, but null for the right
  • If we wait 2 seconds, to exceed the join window, and then push 2 new values through, we see outputs for the newly matched keys in the new time window

OuterJoin

As with many other tools, an outer join will include the original left value or the original right value for a given key, providing at least one of them is not null.

It should be noted that, as I have chosen to use KStream-KStream joins for this demo code, the following rules hold. These rules may vary depending on whether your inputs are KStreams or KTables

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
  • Input records with a null key or a null value are ignored and do not trigger the join.
  • For each input record on one side that does not have any match on the other side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null) or ValueJoiner#apply(null, rightRecord.value), respectively

As before we start with the topology which looks like this

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val left: KStream[Int, String] =
    builder.stream[Int, String]("LeftTopic")
  val right: KStream[Int, String] =
    builder.stream[Int, String]("RightTopic")

  //do the join
  val joinedStream = left.outerJoin(right)( (l: String, r: String) => {
    val result = s"Left='${l}', Right='${r}'"
    result
  } , JoinWindows.of(Duration.ofSeconds(2)))

  val peeked = joinedStream.peek((k,v)=> {

    val theK = k
    val theV = v
  })

  peeked.to("outerJoinOutput")

  builder.build()
}

Here are the relevant tests

test("Should produce correct output") {

  //arrange
  val recordFactory: ConsumerRecordFactory[java.lang.Integer, java.lang.String] =
    new ConsumerRecordFactory[java.lang.Integer, java.lang.String](new IntegerSerializer, new StringSerializer)
  val outerJoinTopology = new OuterJoinTopology()
  val testDriver = new TopologyTestDriver(outerJoinTopology.createTopolgy(), props)

  //act
  testDriver.pipeInput(recordFactory.create("LeftTopic", 1, null, 1900L))
  testDriver.pipeInput(recordFactory.create("RightTopic", 1, null, 1901L))

  testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "1", 2902L))


  OutputVerifier.compareValue(testDriver.readOutput("outerJoinOutput", integerDeserializer, stringDeserializer),
    "Left='1', Right='null'")


  //push these out past 2 seconds (which is what Topology Join Window duration is)
  Thread.sleep(2500)

  testDriver.pipeInput(recordFactory.create("RightTopic", 1, "4", 5916L))
  OutputVerifier.compareValue(testDriver.readOutput("outerJoinOutput", integerDeserializer, stringDeserializer),
    "Left='null', Right='4'")
  val result1 = testDriver.readOutput("outerJoinOutput", integerDeserializer, stringDeserializer)
  assert(result1 == null)

  cleanup(props, testDriver)
}

It can be seen that we expect the following to hold true

  • Both inputs being null within the time window doesn't yield anything
  • A single input from either side within the time window yields a single value for that input, but null for the other

 

As I say, this post is all about KStream-KStream joins, but this table may help to consolidate what this post has shown so far

(image: join semantics table)

 

For other types of joins, such as KStream-KTable, you should consult the official documentation

 

That’s it for now

So that's all I wanted to say this time. Until the next time, I hope this has shown you just how cool KafkaStreams is

Kafka

KafkaStreams : Aggregating

So last time we looked at a whole bunch of stateless operations. This time we will continue our journey to look at how we can start to do aggregations, and make use of state stores.

Where is the code?

The code for this post is all contained here

And the tests are all contained here

Introduction to Stateful operations

Stateful operations (unlike stateless operations) require a state store for processing purposes. We briefly touched on state stores last time, but today I wanted to drill in a bit more into this.

 

Kafka comes with some inbuilt state stores

  • PersistentKeyValueStore<K, V>
    • Storage Engine : Rocks DB
    • Fault Tolerant : Yes by default
      • The recommended store type for most use cases.
      • Stores its data on local disk.
      • Storage capacity: managed local state can be larger than the memory (heap space) of an application instance, but must fit into the available local disk space.
      • RocksDB settings can be fine-tuned, see RocksDB configuration.
      • Available store variants: time window key-value store, session window key-value store.
  • InMemoryKeyValueStore<K,V>
    • Storage Engine: None
    • Fault Tolerant : Yes by default
      • Stores its data in memory.
      • Storage capacity: managed local state must fit into memory (heap space) of an application instance.
      • Useful when application instances run in an environment where local disk space is either not available or local disk space is wiped in-between app instance restarts.

 

So how do we create these stores? Well we will be seeing some examples of these, but it is as easy as this using the high level DSL (which is what we have been using until now)

//The "persistentKeyValueStore" is one of the pre-canned state store types
//and as such logging is enabled, so the ChangeLog topic to persist state
import org.apache.kafka.streams.state.Stores
val wordCountStoreName = "wordCountStore"
val wordCountStoreSupplied = Stores.persistentKeyValueStore(wordCountStoreName)

Now some of you may have noticed the bullet point above where it mentions “Fault Tolerant”. How exactly is that done? Well the inbuilt stores actually write to a specialized topic behind the scenes. This topic is sometimes called the changelog topic, and can be used to restore state. By default, persistent key-value stores are fault-tolerant. They are backed by a compacted changelog topic. The purpose of compacting this topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster, and to minimize recovery time if a state store needs to be restored from its changelog topic.

 

Similarly, persistent window stores are fault-tolerant. They are backed by a topic that uses both compaction and deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are composite keys that include the “normal” key and window timestamps. For these types of composite keys it would not be sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion enabled, old windows that have expired will be cleaned up by Kafka’s log cleaner as the log segments expire. The default retention setting is Materialized#withRetention() + 1 day. You can override this setting by specifying StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG in the StreamsConfig.
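
As a minimal sketch (the value here is purely illustrative), overriding that additional changelog retention looks like this:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
//add an extra day of retention on top of the window store's own retention
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
  (24 * 60 * 60 * 1000L).asInstanceOf[Object])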

 

So how can we turn this feature on/off?

You can enable or disable fault tolerance for a state store by enabling or disabling the change logging of the store through withLoggingEnabled() and withLoggingDisabled(). You can also fine-tune the associated topic’s configuration if needed.

Example of enabling fault tolerance, and passing through some other useful values

  import java.util
  import org.apache.kafka.streams.scala.Serdes
  import org.apache.kafka.streams.state.Stores

  //the store name comes from the earlier snippet
  val wordCountStoreName = "wordCountStore"

  val logConfig = new util.HashMap[String, String]
  logConfig.put("retention.ms", "172800000")
  logConfig.put("retention.bytes", "10000000000")
  logConfig.put("cleanup.policy", "compact,delete")
  val wordCountStoreSupplier = Stores.inMemoryKeyValueStore(wordCountStoreName)
  val wordCountStoreBuilder = Stores.keyValueStoreBuilder(wordCountStoreSupplier, Serdes.String, Serdes.Long)
    .withLoggingEnabled(logConfig)
    .withCachingEnabled()

As I mentioned in the previous post, there are a number of config values that will affect how your stateful operations emit values downstream. Just to remind ourselves of those:

  • COMMIT_INTERVAL_MS_CONFIG : the number of milliseconds before a commit occurs. When a commit occurs, state store values are emitted downstream
  • CACHE_MAX_BYTES_BUFFERING_CONFIG : The max number of bytes to cache before state store values are emitted downstream

So you may find you need to fiddle with those to get what you want in terms of max cache size and commit interval.
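
For example, a minimal sketch of tuning those two values on the streams Properties (the numbers are illustrative, not recommendations):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
//commit (and therefore emit cached state store values) every second
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000.asInstanceOf[Object])
//set the cache to 0 bytes to effectively emit every single update downstream
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])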

 

Aggregating

After records are grouped by key via groupByKey or groupBy, they become either a KGroupedStream or a KGroupedTable and can then be aggregated. Aggregations are key-based operations, which means that they always operate over records where the key is the same. You can perform aggregations on windowed or non-windowed data.

Aggregate

Rolling aggregation. Aggregates the values of (non-windowed) records by the grouped key. Aggregating is a generalization of reduce and allows, for example, the aggregate value to have a different type than the input values. (KGroupedStream details, KGroupedTable details)

When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0) and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table, you must provide a “subtractor” aggregator (think: aggValue – oldValue).

Detailed behavior of KGroupedStream:

  • Input records with null keys are ignored.
  • When a record key is received for the first time, the initializer is called (and called before the adder).
  • Whenever a record with a non-null value is received, the adder is called.

Detailed behavior of KGroupedTable:

  • Input records with null keys are ignored.
  • When a record key is received for the first time, the initializer is called (and called before the adder and subtractor). Note that, in contrast to KGroupedStream, over time the initializer may be called more than once for a key as a result of having received input tombstone records for that key
  • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
  • When subsequent non-null values are received for a key (e.g., UPDATE), then the subtractor is called with the old value as stored in the table and the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined
  • When a tombstone record – i.e. a record with a null value – is received for a key (e.g., DELETE), then only the subtractor is called. Note that, whenever the subtractor returns a null value itself, then the corresponding key is removed from the resulting KTable. If that happens, any next input record for that key will trigger the initializer again.
package stateful.transformations.aggregating

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream.{Materialized, _}
import org.apache.kafka.streams.{KafkaStreams, Topology}


/**
  * This example reads values from 'AggregateKeyInputTopic', groups them by key
  * and aggregates (sums) the values into a running total per key, which is
  * written to 'aggregateOutputTopic'
  */
class AggregateTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-aggregate-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[Int, Int] =
      builder.stream[Int, Int]("AggregateKeyInputTopic")

    implicit val matererlized: Materialized[Int, Long, ByteArrayKeyValueStore]
      = Materialized.as("aggregated-stream-store")

    //Rolling aggregation. Aggregates the values of (non-windowed) records by the grouped key.
    //Aggregating is a generalization of reduce and allows, for example, the aggregate value to
    //have a different type than the input values. (KGroupedStream details, KGroupedTable details)
    //
    //When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0)
    //and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table,
    //you must provide a “subtractor” aggregator (think: aggValue - oldValue).
    val groupedBy = textLines.groupByKey
    val aggregatedTable =
      groupedBy
        .aggregate[Long](0L)((aggKey, newValue, aggValue) => aggValue + newValue)(matererlized)
    aggregatedTable
        .toStream
          .peek((k,v) =>
          {
            val theKey = k
            val theValue =v
          })
        .to("aggregateOutputTopic")
    builder.build()
  }
}
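
The topology above covers the KGroupedStream case. For completeness, here is a minimal sketch of the KGroupedTable variant, where you supply an initializer, an adder and a subtractor as described above; the topic name and the counting logic are hypothetical.

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KTable

val builder = new StreamsBuilder
//hypothetical changelog topic of userId -> region
val userRegions: KTable[String, String] = builder.table[String, String]("UserRegionsTopic")

//count users per region: the adder handles new/updated rows, the subtractor removed/old rows
val usersPerRegion: KTable[String, Long] =
  userRegions
    .groupBy((user, region) => (region, user))
    .aggregate(0L)(
      (region, user, total) => total + 1,  //adder
      (region, user, total) => total - 1)  //subtractor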


Count

Rolling aggregation. Counts the number of records by the grouped key.

Detailed behavior for KGroupedStream:

  • Input records with null keys or values are ignored.

Detailed behavior for KGroupedTable:

  • Input records with null keys are ignored. Records with null values are not ignored but interpreted as “tombstones” for the corresponding key, which indicate the deletion of the key from the table.

package stateful.transformations.aggregating

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream.{Materialized, _}
import org.apache.kafka.streams.{KafkaStreams, Topology}


/**
  * This example reads lines from 'CountInputTopic', splits them into words
  * and counts the occurrences of each word, writing the counts to
  * 'WordsWithCountsOutputTopic'
  */
class CountTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-count-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[String, String] =
      builder.stream[String, String]("CountInputTopic")


    //lets create a named wordCountStore state store
    //The "persistentKeyValueStore" is one of the pre-canned state store types
    //and as such logging is enabled, so the ChangeLog topic to persist state
    import org.apache.kafka.streams.state.Stores
    val wordCountStoreName = "wordCountStore"
    val wordCountStoreSupplied = Stores.persistentKeyValueStore(wordCountStoreName)

    val wordCounts = textLines
      .flatMapValues(x => x.toLowerCase.split("\\W+"))
      .groupBy((key, word) => word)
      .count()(Materialized.as(wordCountStoreSupplied))
    wordCounts
      .toStream
      .peek((k,v) =>
      {
        val theKey = k
        val theValue =v
      })
      .to("WordsWithCountsOutputTopic")

    builder.build()
  }
}


Reduce

Combines the values of (non-windowed) records by the grouped key. The current record value is combined with the last reduced value, and a new reduced value is returned. The result value type cannot be changed, unlike aggregate.

When reducing a grouped stream, you must provide an “adder” reducer (e.g., aggValue + curValue). When reducing a grouped table, you must additionally provide a “subtractor” reducer (e.g., aggValue – oldValue).

Detailed behavior for KGroupedStream:

  • Input records with null keys are ignored in general.
  • When a record key is received for the first time, then the value of that record is used as the initial aggregate value.
  • Whenever a record with a non-null value is received, the adder is called.

Detailed behavior for KGroupedTable:

  • Input records with null keys are ignored in general.
  • When a record key is received for the first time, then the value of that record is used as the initial aggregate value. Note that, in contrast to KGroupedStream, over time this initialization step may happen more than once for a key as a result of having received input tombstone records for that key.
  • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
  • When subsequent non-null values are received for a key (e.g., UPDATE), then the subtractor is called with the old value as stored in the table and the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.
  • When a tombstone record – i.e. a record with a null value – is received for a key (e.g., DELETE), then only the subtractor is called. Note that, whenever the subtractor returns a null value itself, then the corresponding key is removed from the resulting KTable. If that happens, any next input record for that key will re-initialize its aggregate value.

 

In this example we build a KTable from a KStream directly by using a little trick of going through an intermediate topic

package stateful.transformations.aggregating

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream.{Materialized, _}
import org.apache.kafka.streams.{KafkaStreams,  Topology}


/**
  * This example pushes 'ReduceInputTopic' through a dummy topic to obtain a KTable,
  * groups it and reduces the grouped values, writing the results to 'ReduceOutputTopic'
  */
class ReduceTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-reduce-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[String, String] =
      builder.stream[String, String]("ReduceInputTopic")


    //lets create a named reduceStore state store
    //The "persistentKeyValueStore" is one of the pre-canned state store types
    //and as such logging is enabled, so the ChangeLog topic to persist state
    import org.apache.kafka.streams.state.Stores
    val reduceStoreName = "reduceStore"
    val reduceStoreSupplied = Stores.persistentKeyValueStore(reduceStoreName)

    //by using this dummy topic we can go straight from a KStream to a KTable
    textLines.to("Dummy-ReduceInputTopic")
    val userProfiles : KTable[String, String] = builder.table("Dummy-ReduceInputTopic")


    val groupedTable = userProfiles.groupBy((user, region) => (region, user.length()))
    val reduced: KTable[String, Integer] =
      groupedTable.reduce(
        (aggValue : Int, newValue: Int) => aggValue + newValue,
        (aggValue: Int, oldValue: Int) => aggValue - oldValue)
      .mapValues(v => new Integer(v))

    val finalStream = reduced.toStream
    finalStream.to("ReduceOutputTopic")

    builder.build()
  }
}


Custom State Stores

You should not normally need to create custom state stores, but if you find you do want/need to do this, the starting place is to implement the following interfaces

  • org.apache.kafka.streams.state.KeyValueBytesStoreSupplier which expects you to implement the following
    • /**
       * Return the name of this state store supplier.
       * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'.
       *
       * @return the name of this state store supplier
       */
      String name();
      
      /**
       * Return a new {@link StateStore} instance.
       *
       * @return a new {@link StateStore} instance of type T
       */
      T get();
      
      /**
       * Return a String that is used as the scope for metrics recorded by Metered stores.
       * @return metricsScope
       */
      String metricsScope();
      
  • org.apache.kafka.streams.state.KeyValueStore<K, V>
    • /**
       * Update the value associated with this key.
       *
       * @param key The key to associate the value to
       * @param value The value to update, it can be {@code null};
       *              if the serialized bytes are also {@code null} it is interpreted as deletes
       * @throws NullPointerException If {@code null} is used for key.
       */
      void put(K key, V value);
      
      /**
       * Update the value associated with this key, unless a value is already associated with the key.
       *
       * @param key The key to associate the value to
       * @param value The value to update, it can be {@code null};
       *              if the serialized bytes are also {@code null} it is interpreted as deletes
       * @return The old value or {@code null} if there is no such key.
       * @throws NullPointerException If {@code null} is used for key.
       */
      V putIfAbsent(K key, V value);
      
      /**
       * Update all the given key/value pairs.
       *
       * @param entries A list of entries to put into the store;
       *                if the serialized bytes are also {@code null} it is interpreted as deletes
       * @throws NullPointerException If {@code null} is used for key.
       */
      void putAll(List<KeyValue<K, V>> entries);
      
      /**
       * Delete the value from the store (if there is one).
       *
       * @param key The key
       * @return The old value or {@code null} if there is no such key.
       * @throws NullPointerException If {@code null} is used for key.
       */
      V delete(K key);
      
      /**
       * The name of this store.
       * @return the storage name
       */
      String name();
      
      /**
       * Initializes this state store.
       * <p>
       * The implementation of this function must register the root store in the context via the
       * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the
       * first {@link StateStore} parameter should always be the passed-in {@code root} object, and
       * the second parameter should be an object of user's implementation
       * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
       * <p>
       * Note that if the state store engine itself supports bulk writes, users can implement another
       * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
       * let users implement bulk-load restoration logic instead of restoring one record at a time.
       *
       * @throws IllegalStateException If store gets registered after initialized is already finished
       * @throws StreamsException if the store's change log does not contain the partition
       */
      void init(ProcessorContext context, StateStore root);
      
      /**
       * Flush any cached data
       */
      void flush();
      
      /**
       * Close the storage engine.
       * Note that this function needs to be idempotent since it may be called
       * several times on the same state store.
       * <p>
       * Users only need to implement this function but should NEVER need to call this api explicitly
       * as it will be called by the library automatically when necessary
       */
      void close();
      
      /**
       * Return if the storage is persistent or not.
       *
       * @return  {@code true} if the storage is persistent&mdash;{@code false} otherwise
       */
      boolean persistent();
      
      /**
       * Is this store open for reading and writing
       * @return {@code true} if the store is open
       */
      boolean isOpen();
      
      /**
       * Get the value corresponding to this key.
       *
       * @param key The key to fetch
       * @return The value or null if no value is found.
       * @throws NullPointerException If null is used for key.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      V get(K key);
      
      /**
       * Get an iterator over a given range of keys. This iterator must be closed after use.
       * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
       * and must not return null values. No ordering guarantees are provided.
       * @param from The first key that could be in the range
       * @param to The last key that could be in the range
       * @return The iterator for this range.
       * @throws NullPointerException If null is used for from or to.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      KeyValueIterator<K, V> range(K from, K to);
      
      /**
       * Return an iterator over all keys in this store. This iterator must be closed after use.
       * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
       * and must not return null values. No ordering guarantees are provided.
       * @return An iterator of all key/value pairs in the store.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      KeyValueIterator<K, V> all();
      
      /**
       * Return an approximate count of key-value mappings in this store.
       *
       * The count is not guaranteed to be exact in order to accommodate stores
       * where an exact count is expensive to calculate.
       *
       * @return an approximate count of key-value mappings in the store.
       * @throws InvalidStateStoreException if the store is not initialized
       */
      long approximateNumEntries();
      

 

The best place to look at how to implement these is the existing classes, for example (a minimal sketch of a supplier follows this list)

  • RocksDbKeyValueBytesStoreSupplier
  • RocksDBStore
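
As a minimal sketch of the supplier side only (the class name is hypothetical), the example below just delegates to the built-in RocksDB store; a real custom store would return its own KeyValueStore[Bytes, Array[Byte]] implementation from get(). Such a supplier could then be handed to Materialized.as(...) just like the pre-canned ones.

import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.state.{KeyValueBytesStoreSupplier, KeyValueStore, Stores}

class DelegatingStoreSupplier(storeName: String) extends KeyValueBytesStoreSupplier {

  //must be a valid Kafka topic name, as it is used for the backing changelog topic
  override def name(): String = storeName

  //a real custom store would construct and return its own KeyValueStore implementation here
  override def get(): KeyValueStore[Bytes, Array[Byte]] =
    Stores.persistentKeyValueStore(storeName).get()

  //scope used for the metrics recorded by the metered store wrappers
  override def metricsScope(): String = "custom-delegating"
}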

That’s it for now

So that is all I wanted to talk about this time. Next time we will continue our journey and look at joining operations

Kafka

KafkaStreams : Stateless operations

Ok so our journey now continues with Kafka Streams. Last time we introduced a few key concepts, such as

  • Props
  • Topologies
  • How to test streams apps

This time we will continue by looking at stateless operations. If you have ever used C# LINQ operators, this post will probably look quite familiar, or at least the concepts will. The main difference is that we are using these operators on streams of data coming from Kafka topics.

 

We will be using the same approach from the 1st article, where we will build a topology and then proceed to test it using the previously introduced TopologyTestDriver / OutputVerifier test tools from Confluent. As before the code is written in Scala.

 

Where is the code?

The code for this post is all contained here

And the tests are here

 

I will not bother to walk through every test, but you will find a set of tests for each new topology that is introduced in this post. As this post is mainly about stateless transforms, I will not be spending too much time talking about state; it is however not completely avoidable, as some of the operations do require us to create state in order to demonstrate the stateless side of things

 

Branch

Allows us to apply one (or more) predicates to an initial KStream, to create one or more KStream objects for the matched predicates. If no match is found the input record is dropped.

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] =
    builder.stream[String, String]("InputTopic")

  val predicates : List[(String, String) => Boolean] = List(
    (k,v) => v.startsWith("Odd"),
    (k,v) => v.startsWith("Even")
  )

  val branches : Array[KStream[String, String]] = textLines.branch(predicates:_*)
  branches(0).to("OddTopic")
  branches(1).to("EvenTopic")
  builder.build()
}

So for this example we could potentially end up with values written to the 2 output topics OddTopic/EvenTopic.

Filter/Inverse Filter

Filter keeps the records for which the predicate is satisfied, while filterNot (inverse filter) drops the records for which the predicate returns true.

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, Long] =
    builder.stream[String, Long]("InputTopic")

  //Evaluates a boolean function for each element and retains those
  //for which the function returns true
  val above5 = textLines.filter((k,v) => v > 5)
  above5.to("Above5OutputTopic")

  //Evaluates a boolean function for each element and drops those
  //for which the function returns true.
  val belowOrEqualTo5 = textLines.filterNot((k,v) => v > 5)
  belowOrEqualTo5.to("BelowOrEqualTo5OutputTopic")

  builder.build()
}

FlatMap / FlatMap (Values only)

FlatMap allows you to produce 0-* items from an original KeyValue record tuple. You can create a new Key or Value or both, and alter the types of both. As this operation is capable of changing the key, it will cause re-partitioning to keep the data in the correct partition. So you should be wary of using this operation, and prefer flatMapValues

 

FlatMapValues allows you to produce 0-* items from an original KeyValue record tuple where you can create a new Value and alter its type, but maintain the original key. As the original key is maintained, this operation does not cause a re-partition to occur

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[Int, Int] =
    builder.stream[Int, Int]("InputTopic")

  //Takes one record and produces zero, one, or more records.
  //You can modify the record keys and values, including their types
  val flatMapped = textLines.flatMap((k,v) => {
    List(
      (k + 1, v + 2),
      (k + 3, v + 4)
    )
  })
  flatMapped.to("flatMappedOutputTopic")


  //Takes one record and produces zero, one, or more records, while retaining the key of the original record.
  //You can modify the record values and the value type.
  //flatMapValues is preferable to flatMap because it will not cause data re-partitioning.
  //However, you cannot modify the key or key type like flatMap does
  val flatMappedValues = textLines.flatMapValues((k,v) => {
    List(
      v + 10,
      v + 20
    )
  })
  flatMappedValues.to("flatMappedValuesOutputTopic")

  builder.build()
}

Foreach

Foreach allows us to run some side effect on the input records. This is however a terminal operation. These “side effects” can't be tracked by Kafka, and as this is a terminal operation, nothing else is possible after a forEach has been performed. For this one I am showing the entire app, as it's easier to read the flow. But essentially what happens is that I am using forEach to write the input records to a file, which the tests will read after running the topology

package stateless.transformations

import java.io.{File, PrintWriter}
import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}


/**
  * This example uses a terminal foreach operation to write every value read
  * from 'InputTopic' to a file via the supplied PrintWriter
  */
class ForEachTopology(val pw: PrintWriter) extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-foreach-application", "localhost:9092")

  run()

  def stop() : Unit = {
    pw.close()
  }

  private def run(): Unit = {


    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[Int, Int] =
      builder.stream[Int, Int]("InputTopic")

    //Terminal operation. Performs a stateless action on each record.

    //You would use foreach to cause side effects based on the input data (similar to peek)
    //and then stop further processing of the input data (unlike peek, which is not a terminal operation).
    //Note on processing guarantees: Any side effects of an action (such as writing to
    //external systems) are not trackable by Kafka, which means they will typically not
    //benefit from Kafka’s processing guarantees.
    val flatMapped = textLines.foreach((k,v)=> {
      pw.write(s"Saw input value line '$v'\r\n")
      pw.flush()
    })

    builder.build()
  }
}


Map/Map Values Only

Map works much the same as flatMap above, apart from the fact that it emits a single new KeyValue tuple. As before this causes re-partitioning, so you should prefer mapValues instead.

MapValues works much the same as flatMapValues above apart from the fact that it emits a single new KeyValue tuple, where the original Key is maintained.

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[Int, Int] =
    builder.stream[Int, Int]("InputTopic")

  //Takes one record and produces one record. You can modify the record key and value, including their types.
  //Marks the stream for data re-partitioning: Applying a grouping or a join after map will result in re-partitioning
  //of the records. If possible use mapValues instead, which will not cause data re-partitioning.
  val mapped = textLines.map((k,v) => {
      (k + 1, v + 2)
  })
  mapped.to("mappedOutputTopic")


  //Takes one record and produces one record, while retaining the key of the original record.
  //You can modify the record value and the value type. (KStream details, KTable details)
  //mapValues is preferable to map because it will not cause data re-partitioning. However, it
  //does not allow you to modify the key or key type like map does.
  val mappedValues = textLines.mapValues((k,v) => {
      v + 10
  })
  mappedValues.to("mappedValuesOutputTopic")

  builder.build()
}

 

Peek

Peek is an extremely useful operation that allows you to wire-tap the stream and cause side effects, whilst maintaining the stream values. It is like a better forEach that still emits values into the output stream processing. As with the forEach, I have decided to show you the whole application here; it's the same idea, we write to a text file in the application code, and the tests can do assertions on the values in the text file at the end of the application run

package stateless.transformations

import java.io.PrintWriter
import java.time.Duration
import java.util.Properties

import common.PropsHelper
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}


/**
  * This example uses peek to write every value read from 'InputTopic' to a file
  * via the supplied PrintWriter, and then forwards the unchanged records
  * to 'peekedOutputTopic'
  */
class PeekTopology(val pw: PrintWriter) extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-peek-application", "localhost:9092")

  run()

  def stop() : Unit = {
    pw.close()
  }

  private def run(): Unit = {


    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[Int, Int] =
      builder.stream[Int, Int]("InputTopic")

    // Performs a stateless action on each record, and returns an unchanged stream.
    //
    // You would use peek to cause side effects based on the input data (similar to foreach)
    // and continue processing the input data (unlike foreach, which is a terminal operation).
    // Peek returns the input stream as-is; if you need to modify the input stream, use map or mapValues instead.
    // peek is helpful for use cases such as logging or tracking metrics or for debugging and troubleshooting.
    //
    // Note on processing guarantees: Any side effects of an action (such as writing to external systems)
    // are not trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.
    val peeked = textLines.peek((k,v)=> {
      pw.write(s"Saw input value line '$v'\r\n")
      pw.flush()
    })
    peeked.to("peekedOutputTopic")


    builder.build()
  }
}


SelectKey

Allows you to create a new key, and possibly a new type, for the key of the input records. As this changes the key, it will cause re-partitioning

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[Int, Int] =
    builder.stream[Int, Int]("InputTopic")

  // Assigns a new key - possibly of a new key type - to each record.
  // Calling selectKey(mapper) is the same as calling map((key, value) -> mapper(key, value), value).
  // Marks the stream for data re-partitioning: Applying a grouping or a join after selectKey will
  // result in re-partitioning of the records.
  val mapped = textLines.selectKey((k,v) => {
      k.toString
  })
  mapped.to("selectKeyOutputTopic")

  builder.build()
}

 

GroupBy/GroupByKey

Before we get into this one, I just wanted to talk a little bit about stores and how they affect/are affected by some of the possible settings you could apply.

Stores

We will cover this in more detail in the next post, but for now it's worth mentioning that state stores, as seen in these GroupBy/GroupByKey count() examples (which will create a KTable), will by default start out with logging enabled, which means that a special changelog Kafka topic will be used to store the state of the state stores.

In the next post we will see how we can setup our own state stores, and how we can provide more fine grained details over their creation such as

  • retention.ms
  • retention.bytes
  • cleanup.policy

But for now it's just good to be aware that when you do find yourself performing some operation that takes you from a KStream to a KTable, the Kafka default is to use changelog logging, which persists your state store values and should protect you should your Kafka Streams application need to be restarted. It should pick up again from its last committed offset using the previously saved changelog values.

As I say we will be seeing this in much more detail in the next posts.

Ok, let's see an example

GroupByKey

Groups the records by the existing key. To fully illustrate this, we need to perform some sort of aggregation, which takes us into stateful operations. We end up with a KTable. But let's see the example, which should help us in the next posts

def createTopolgy(): Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val groupByKeyTextLines: KStream[Int, String] =
      builder.stream[Int, String]("GroupByKeyInputTopic")

	//NOTE : You may find you need to play with these Config values in order
    //to get the stateful operation to work correctly/how you want it to
    //    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
    //    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760.asInstanceOf[Object])
    //    props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 50000.asInstanceOf[Object])
    //By playing around with these values you should be able to find the values that work for you
    groupByKeyTextLines.groupByKey
        .count()
        .mapValues(x => x.toString)
        .toStream
        .to("groupedByKeyOutputTopic")


    builder.build()
  }

It can be seen that we start with a KStream[Int, String], and then when we use count() we end up with a KTable[Int, Long]. This will use a state store behind the scenes. We then convert that into a KTable[Int, String] using mapValues(…) before using toStream to turn it back into a KStream[Int, String] and write it to the output topic.

Causes data re-partitioning if and only if the stream was marked for re-partitioning. groupByKey is preferable to groupBy because it re-partitions data only if the stream was already marked for
re-partitioning. However, groupByKey does not allow you to modify the key or key type like groupBy does.

 

As you can see in the comments above in the code, there are a number of values that affect how the state store values are emitted to downstream topologies

  • COMMIT_INTERVAL_MS_CONFIG : the number of milliseconds before a commit occurs. When a commit occurs, state store values are emitted downstream
  • CACHE_MAX_BYTES_BUFFERING_CONFIG : The max number of bytes to cache before state store values are emitted downstream

So for a given input of (1, “one”)..(1, “one”)..(2, “two”) we may see something like (1, “2”)…(2, “1”) in the output topic (depending on the settings I talked about above)

GroupBy

Groups the records by a new key, which may be of a different key type. When grouping a table, you may also specify a new value and value type. groupBy always causes data re-partitioning; if possible use groupByKey instead, which will re-partition data only if required.

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val groupByTextLines: KStream[Int, String] =
    builder.stream[Int, String]("GroupByInputTopic")


  //Groups the records by a new key, which may be of a different key type.
  // 
  //NOTE : You may find you need to play with these Config values in order
  //to get the stateful operation to work correctly/how you want it to
  //    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
  //    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760.asInstanceOf[Object])
  //    props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 50000.asInstanceOf[Object])
  //By playing around with these values you should be able to find the values that work for you
  groupByTextLines.flatMapValues(x => x.toLowerCase.split("\\W+"))
    .groupBy((key, word) => word)
    .count()
    .mapValues(x => x.toString)
    .toStream
    .to("groupedByOutputTopic")

  builder.build()
}

Much the same as above, however this time we take an input string, split it on words, use each word as the key, and then count the words.

So for a given input record of (1, “one two three one”) we may see something like (“one”, “2”)…(“two”,”1”)..(“three”, “1”) in the output topic (depending on the settings I talked about above)

Custom Partitioner

There may be times where you do not have a key, and you MUST come up with a way to partition your data on the fly. KafkaStreams allows this by using an intermediate topic, and a custom partitioner which you may write. In this trivial example I will always return partition 1, but you can imagine your own code here

I will show the complete example of how to do this, as I think it’s a lot clearer to see what’s going on. The full code is shown below. The important parts are

  • Using through with Produced, where we pass in the custom StreamPartitioner
  • Custom StreamPartitioner class that decides what partition (1 in my case) to use
package stateless.transformations

import java.time.Duration
import java.util.Properties
import common.PropsHelper
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.processor.StreamPartitioner
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}


class ThroughCustomPartitionerTopology() extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "stateless-custompartitioner-application", "localhost:9092")

  run()

  private def run(): Unit = {


    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    implicit val keySerde =  Serdes.Integer
    implicit val valueSerde =  Serdes.String

    val builder: StreamsBuilder = new StreamsBuilder
    implicit val consumed = kstream.Consumed.`with`(keySerde, valueSerde)

    val textLines: KStream[java.lang.Integer, String] =
      builder.stream[java.lang.Integer, String]("InputTopic")

    //should use implicit Produced with the through
    val repartitoned = textLines.through("InputTopic-RePartitioned")
      (Produced.`with`(new AlwaysOneCustomPartitioner()))

    val finalStream = repartitoned
      .mapValues((x) => s"${x} Through")

    finalStream.to("OutputTopic")
    builder.build()
  }
}

//Custom StreamPartitioner that ALWAYS just returns 1 for Partition
class AlwaysOneCustomPartitioner extends StreamPartitioner[java.lang.Integer, String] {
  override def partition(topic: String, key: java.lang.Integer, value: String, numPartitions: Int): Integer = {
    //just default it to 1
    1
  }
}

That’s It

And that is all I have for this one. In the next post we will be looking at stateful operations, namely Aggregating.

Kafka

Kafka Streams : Example 1 straight through processing / How to test Kafka Streams

Introduction

In this post we will look at some of the key objects we looked at last time, and we will also see what a typical Scala (though Kafka's libraries are mainly Java, I just prefer Scala) app looks like. It is not a complicated application, which is exactly what you want when you are starting to learn something. I have deliberately started simple, and we will expand our knowledge as we go

Where is the code?

The code will all be in this single repo : https://github.com/sachabarber/KafkaStreamsDemo

What tools will I need to follow along

As I am using Scala, if you really want to download and try the examples that will go with this series, you will need the following

  • Scala 2.12
  • SBT
  • Java 1.8 SDK
  • A Scala IDE (I like the community edition of IntelliJ IDEA myself)

A typical Kafka Streams application

So last time we talked about a few common objects you might expect to see when working with KafkaStreams such as

  • KStream
  • KTable
  • Global KTable
  • State Store

But we did not talk about what a typical app would look like. So let's do that now, shall we?

So the good news is that KafkaStreams is not a massive bloated thing that just takes over. It is a simple library that you reference and run inside your own application code. There are obviously a few things that it needs in order to run, so we will have a brief talk about those

Props

KafkaStreams, just like Kafka, works by being given a set of key/value properties that it needs to configure itself. These are standard, well-known properties, and you can read all about them here

You can configure Kafka Streams by specifying parameters in a java.util.Properties instance.

Create a java.util.Properties instance.

Set the parameters. For example (this is Java, Scala syntax is a little different)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Set a few key parameters
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Any further settings
props.put(... , ...);

 

These MUST be set up before you attempt to start a Streams application
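For completeness, a rough Scala equivalent of the Java snippet above might look like this (the PropsHelper class shown later in this post does essentially the same job):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
// Set a few key parameters (same keys as the Java snippet above)
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
// Any further settings would follow the same pattern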

Topology

The next thing you need is to actually build how your streams topology should be wired up, and how you want processing to occur. To do this there is a DSL, which looks like this (this is a Scala example)

val builder: StreamsBuilder = new StreamsBuilder
val theInputStream = builder.stream[String, String]("InputTopic")
//stream DSL operations you want here
builder.build()

Start the streams

The final thing you would want to do is to actually start the streams processing, which can easily be done as follows:

val topology = createTopolgy()
val streams: KafkaStreams = new KafkaStreams(topology, props)
streams.start()
sys.ShutdownHookThread {
  streams.close(Duration.ofSeconds(10))
}

It’s a good idea to also add a shutdownHook to close the streams, which is shown above

Walkthrough of the example topology

Ok so believe it or not, that small set of 3 points, plus the knowledge gained in the 1st post, is enough for us to write a fully working KafkaStreams Scala application.

The code for this example is this one, and the basic idea works as follows

 

As you can see this is about as simple as I could make it. So now lets have a look at the code for this trivial example

SBT file

As I am using Scala there is a SBT file to manage the build, this is it in full

name := "ScalaKafkaStreamsDemo"

version := "1.0"

scalaVersion := "2.12.1"

libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1.1" artifacts(Artifact("javax.ws.rs-api", "jar", "jar"))
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.1.0"
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.1.0"
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.1.0"

//TEST
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test
libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % "2.1.0" % Test

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3"

 

As you can see there is a specific JAR for Scala; this is a fairly new thing, and the current recommendation is to make sure you have at least Kafka 2.1.0 to work with the kafka-streams-scala JAR. There are also some test-scoped JARs there; we will see how they get used in the next section. You may be asking yourself what the specific Scala JAR brings to the table: well, it makes working with Serdes/lambdas and implicits more Scala friendly.
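To give a rough flavour of what that means in practice (this is just an illustrative sketch, not code from the repo): with the plain Java DSL you have to hand the serdes over explicitly, whereas the Scala wrapper resolves them from implicits.

// Java DSL: serdes passed explicitly
//   builder.stream("InputTopic", Consumed.`with`(Serdes.String(), Serdes.String()))

// Scala DSL: the Consumed is derived from implicit serdes
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder = new StreamsBuilder
val lines: KStream[String, String] = builder.stream[String, String]("InputTopic")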

One thing that is extremely weird is the javax.ws.rs-api dependency. There was an issue with a transitive dependency in Kafka itself, brought in by a Kafka Connect client, which doesn't resolve correctly unless you use a line something like the one shown here. I have raised a support ticket for this, and they were all over it, so I expect this will get fixed in subsequent releases

Anyway, moving on to the Props, let's see what that looks like for this simple demo app. Here is my class (I will probably reuse this for all posts in this series)

import java.util.Properties
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.StreamsConfig


object PropsHelper  {

  def createBasicStreamProperties(applicationId: String, bootStrapServers: String) : Properties = {

    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    // Records should be flushed every 10 seconds. This is less than the default
    // in order to keep this example interactive.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
    // For illustrative purposes we disable record caches
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
    props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 50000.asInstanceOf[Object])
    props.put(StreamsConfig.STATE_DIR_CONFIG, s"C:\\data\\kafka-streams".asInstanceOf[Object])
    props
  }
}

image

You can change these to suit your own requirements.

Ok so now on to the code. This is it, in full

import java.time.Duration
import org.apache.kafka.streams.Topology
import java.util.Properties
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.KafkaStreams

/**
  * This example simply maps values from 'InputTopic' to 'OutputTopic'
  * with no changes
  */
class StraightThroughTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "straight-through-application","localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy() : Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[String, String] = 
      builder.stream[String, String]("InputTopic")
    textLines.mapValues(v => v).to("OutputTopic")
    builder.build()
  }

}

 

So this is in line with what we just discussed. The only addition is the line that does the mapValues(..). Now you may be wondering why I chose to use mapValues(..) and not map(..). The reason is that map(..) allows the key to change, and a changed key means the stream has to be re-partitioned before any downstream key-based operation; re-partitioning is an expensive operation and should be avoided if possible.
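To make that concrete, here is a small illustrative sketch (not from the repo) of the two shapes; only the map(..) variant can change the key, so only it would force a repartition:

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder = new StreamsBuilder
val textLines: KStream[String, String] = builder.stream[String, String]("InputTopic")

// mapValues: the key cannot change, so no repartitioning is required downstream
val shouted: KStream[String, String] = textLines.mapValues(v => v.toUpperCase)

// map: the key may change, so a later join/groupByKey would trigger a repartition
val rekeyed: KStream[String, String] = textLines.map((k, v) => (v, k))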

And believe it or not that is enough to get your 1st streams app up and running.

Testing the example topology

Now if you did read the 1st post in this series you would have seen a diagram that looks like this

image (Kafka partitions)

And you are probably thinking how on earth am I going to test all that, I need

  • At least 1 Kafka broker
  • At least 1 Kafka producer
  • At least 1 Kafka consumer
  • Zookeeper
  • And my own Stream app in the middle somehow

This is indeed a daunting task. Luckily help is at hand by way of the VERY handy kafka-streams-test-utils JAR. This handy JAR comes with a very cool class called TopologyTestDriver which allows you to test your Streams code without needing the full setup shown above. I won't repeat all the official docs here, but you can think of this class as a handy little helper that can simulate ConsumerRecords being sent to a topic using the pipeInput(..) method, and that can verify values written to output topics using the readOutput(..) method

The official docs do a rather fine job of covering this in detail, but for now here is the code that tests the simple demo topology (the code is here should you want to know where it is)

import org.scalatest._
import Matchers._
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import java.io._
import java.util.Properties
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.kafka.streams.test.OutputVerifier

class StaightThoughStreamsTests
  extends FunSuite
  with BeforeAndAfter
  with Matchers {

  val props = PropsHelper.createBasicStreamProperties("straight-through-application-tests","localhost:9092")
  val stringDeserializer: StringDeserializer = new StringDeserializer

  before {
  }

  after {
  }


  test("Should produce correct output") {

    //arrange
    val recordFactory: ConsumerRecordFactory[String, String] =
      new ConsumerRecordFactory[String, String](new StringSerializer, new StringSerializer)
    val straightThroughTopology = new StraightThroughTopology()
    val testDriver = new TopologyTestDriver(straightThroughTopology.createTopolgy(), props)

    //act
    val consumerRecord = recordFactory.create("InputTopic", "key", "this is the input", 9999L)
    testDriver.pipeInput(consumerRecord)

    //assert
    OutputVerifier.compareKeyValue(testDriver.readOutput("OutputTopic", stringDeserializer, stringDeserializer), "key", "this is the input")
    val result = testDriver.readOutput("OutputTopic", stringDeserializer, stringDeserializer)
    assert(result == null)
    cleanup(props, testDriver)
  }


  def cleanup(props:Properties, testDriver: TopologyTestDriver) = {

    try {
	  //there is a bug on windows which causes this line to throw exception
      testDriver.close
    } catch {
      case e: Exception => {
        delete(new File("C:\\data\\kafka-streams"))
      }
    }
  }

  def delete(file: File) {
    if (file.isDirectory)
      Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete(_))
    file.delete
  }
}

And just to prove it all works, let's see a screenshot of this test

image

The eagle-eyed amongst you will see what looks like an exception stack trace in the right-hand panel. This is due to a bug on Windows where the data directory can't be deleted correctly because of a file handle issue. This seems to be quite well documented

See you next time.

So hopefully I have whetted your appetite for what Kafka Streams could do for you. This is an introductory article, so we saw hardly anything of the DSL that Kafka Streams offers, but in subsequent posts we will see lots more of it

Kafka

Kafka Streams Introduction

So this post will be an introductory one on Kafka Streams. It is not intended to be one on Apache Kafka itself. For that there are many interesting books/posts/documents available which cover this topic.

 

That said there are still a few key concepts that we will need to introduce before we can start talking about Kafka Streams, such as

 

  • Kafka overall architecture
  • Topics
  • Commit Log
  • Consumer group

 

After we have talked about these (and I will only be skimming the surface of these, you really would be better off reading about Kafka if this is brand new territory for you), I will then move on to talk about some of the basic building blocks of Kafka Streams

 

Kafka overall architecture

image (Kafka overall architecture)

 

As shown in the diagram above, Kafka is a broker and is intended to run in a cluster (which might be a single broker, but that is obviously not what’s recommended). It also makes use of Zookeeper for various state storage/metadata.

 

Topics

Kafka uses topics as a way of organizing what data is produced/read. Producers push data to topics, whilst consumers read data from topics.

 

Topics are divided into a number of partitions (there is a default partitioning strategy at play, but you can write your own). Partitions allow you to parallelize the data by potentially storing it across multiple machines. They may also be read in parallel

 

Another key part of a topic is the offset, which is maintained by the consumer, and is an indicator as to where the consumer should continue reading data from within the topic. As one could imagine, for this to be possible the messages must be ordered, which is something Kafka guarantees for you within a partition.

 

image (Kafka topic)

Commit Log

Kafka retains messages for a configurable amount of time. This means a consumer may go down and restart again, and provided it is within the log retention period, it will just carry on reading messages from where it left off.

 

Consumer Groups

Kafka has Consumers, each of which reads from one or more partitions. Consumers can also be organized into Consumer groups for a Topic. Each Consumer will read from a subset of the partitions, and the Consumer group as a whole will read the entire Topic. If you organize your consumers in such a way that you have more consumers than partitions, some will go idle. However if you have more partitions than consumers, some consumers will get messages from multiple partitions

 

image (Kafka consumer group)

 

Further Reading

 

As I say I did not want to get too bogged down with the Kafka fundamentals, as this series is really about Kafka Streams, but the few points above as well as the “further reading” section should get you to a pretty decent position to understand the rest of this post.

 

Kafka Streams Introduction

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management and real-time querying of application state.

Kafka Streams has a low barrier to entry: You can quickly write and run a small-scale proof-of-concept on a single machine; and you only need to run additional instances of your application on multiple machines to scale up to high-volume production workloads. Kafka Streams transparently handles the load balancing of multiple instances of the same application by leveraging Kafka’s parallelism model.

 

Stream Processing Topology

  • A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
  • A stream processing application is any program that makes use of the Kafka Streams library. It defines its computational logic through one or more processor topologies, where a processor topology is a graph of stream processors (nodes) that are connected by streams (edges).
  • A stream processor is a node in the processor topology; it represents a processing step to transform data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and may subsequently produce one or more output records to its downstream processors.
    There are two special processors in the topology:
    Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.
    Sink Processor: A sink processor is a special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.
    Note that in normal processor nodes other remote systems can also be accessed while processing the current record. Therefore the processed results can either be streamed back into Kafka or written to an external system.

Although this post is about concepts rather than a full code walkthrough, I want to introduce a couple of things that will be coming up in the next posts, namely

  • KStream
  • KTable
  • Global KTable
  • State Stores

 

KStream

A KStream is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded data set. Using the table analogy, data records in a record stream are always interpreted as an “INSERT” — think: adding more entries to an append-only ledger — because no record replaces an existing row with the same key. Examples are a credit card transaction, a page view event, or a server log entry.

To illustrate, let’s imagine the following two data records are being sent to the stream:

(“alice”, 1) –> (“alice”, 3)

If your stream processing application were to sum the values per user, it would return 4 for alice. Why? Because the second data record would not be considered an update of the previous record.

https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#streams_concepts_kstream

 

KTable

A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE” or tombstone for the record’s key.

To illustrate, let’s imagine the following two data records are being sent to the stream:

(“alice”, 1) –> (“alice”, 3)

If your stream processing application were to sum the values per user, it would return 3 for alice. Why? Because the second data record would be considered an update of the previous record.

https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
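To make the two interpretations concrete, here is a tiny illustrative sketch (assuming the Scala DSL and a hypothetical "user-clicks" topic with Long values; note that you would read the topic one way or the other, not both in the same topology):

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

// Reading 1: record stream - every record is an INSERT, so ("alice", 1) and ("alice", 3) both count
val streamBuilder = new StreamsBuilder
val clickStream: KStream[String, Long] = streamBuilder.stream[String, Long]("user-clicks")
val summed: KTable[String, Long] = clickStream.groupByKey.reduce(_ + _)   // alice -> 4

// Reading 2: changelog stream - the later record is an UPDATE, so only the latest value survives
val tableBuilder = new StreamsBuilder
val clickTable: KTable[String, Long] = tableBuilder.table[String, Long]("user-clicks")  // alice -> 3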

 

GlobalKTable

Like a KTable, a GlobalKTable is an abstraction of a changelog stream, where each data record represents an update.

A GlobalKTable differs from a KTable in the data that they are being populated with, i.e. which data from the underlying Kafka topic is being read into the respective table. Slightly simplified, imagine you have an input topic with 5 partitions. In your application, you want to read this topic into a table. Also, you want to run your application across 5 application instances for maximum parallelism.

  • If you read the input topic into a KTable, then the “local” KTable instance of each application instance will be populated with data from only 1 partition of the topic’s 5 partitions.
  • If you read the input topic into a GlobalKTable, then the local GlobalKTable instance of each application instance will be populated with data from all partitions of the topic.

GlobalKTable provides the ability to look up current values of data records by keys.

https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#streams_concepts_globalktable

 

State Stores

Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data. This is an important capability when implementing stateful operations. Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query data required for processing. These state stores can either be a persistent key-value store, an in-memory hashmap, or another convenient data structure. Kafka Streams offers fault-tolerance and automatic recovery for local state stores.

Kafka Streams allows direct read-only queries of the state stores by methods, threads, processes or applications external to the stream processing application that created the state stores. This is provided through a feature called Interactive Queries. All stores are named and Interactive Queries exposes only the read operations of the underlying implementation.

https://kafka.apache.org/21/documentation/streams/core-concepts#streams_state
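As a flavour of what an Interactive Query looks like in code, a minimal sketch (assuming a running KafkaStreams instance and a hypothetical key-value store named "counts-store") might be:

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore}

// Read-only lookup against a local state store; "counts-store" is a made-up store name
def lookupCount(streams: KafkaStreams, key: String): Option[java.lang.Long] = {
  val store: ReadOnlyKeyValueStore[String, java.lang.Long] =
    streams.store("counts-store", QueryableStoreTypes.keyValueStore[String, java.lang.Long]())
  Option(store.get(key))   // None if the key is not present locally
}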

 

That’s It

So that’s it for now. I have borrowed a lot of material from the official docs for this post, but as we move through the series we will start to form our own code/topologies, and as such our own tests/descriptions. So please forgive me this one.

Distributed Systems, Kafka

Kafka Streams Using Avro/Schema Registry

This is the 4th and final post in a small mini series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point is that I am mainly a Windows user, as such the instructions, scripts will have a Windows bias to them. So if you are not a Windows user you will need to find the instructions for your OS of choice.

Prerequisites

So go and grab that lot if you want to follow along.

Last time we talked about the Schema registry and how to use it. This time we will be looking at how we can use Avro/Schema Registry inside of a Kafka Streams App.

 

Where is the code for this post?

You can grab the code that goes with this post from here : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaStreamsSpecificAvro

 

What will be included in this post?

We will be dealing with these 3 elements to make this post work

 

  • Publisher (using User Avro object)
  • Streams (Taking User Avro objects and transforming them into UserWithUUID objects and sending them to output topic)
  • Consumer (using UserWithUUID Avro object)

 

Before we get into the nitty-gritty, let's just have a look at the Avro files

 

User

This is the User schema / Case class

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "User",
     "fields":[
         {
            "name": "id", "type": "int"
         },
         {
            "name": "name",  "type": "string"
         }
     ]
}

 

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.barber.kafka.avro

import scala.annotation.switch
import scala.io.Source

case class User(var id: Int, var name: String) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(0, "")
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case pos if pos == 0 => {
        id
      }.asInstanceOf[AnyRef]
      case pos if pos == 1 => {
        name
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case pos if pos == 0 => this.id = {
        value
      }.asInstanceOf[Int]
      case pos if pos == 1 => this.name = {
        value.toString
      }.asInstanceOf[String]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = User.SCHEMA$
}

object User {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
    Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString)
}

 

UserWithUUID

This is the UserWithUUID schema / Case class

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "UserWithUUID",
     "fields":[
         {
            "name": "id", "type": "int"
         },
         {
            "name": "name",  "type": "string"
         },
         {
            "name": "uuid",  "type": "string"
         }
     ]
}

 

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.barber.kafka.avro

import scala.annotation.switch
import scala.io.Source

case class UserWithUUID(var id: Int, var name: String, var uuid: String) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(0, "","")
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case pos if pos == 0 => {
        id
      }.asInstanceOf[AnyRef]
      case pos if pos == 1 => {
        name
      }.asInstanceOf[AnyRef]
      case pos if pos == 2 => {
        uuid
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case pos if pos == 0 => this.id = {
        value
      }.asInstanceOf[Int]
      case pos if pos == 1 => this.name = {
        value.toString
      }.asInstanceOf[String]
      case pos if pos == 2 => this.uuid = {
        value.toString
      }.asInstanceOf[String]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = UserWithUUID.SCHEMA$
}

object UserWithUUID {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
    Source.fromURL(getClass.getResource("/userWithUUIDSchema.avsc")).mkString)
}

 

Producer

The producer is much the same as in the previous examples, so I won't dwell on it too much. Here is the most relevant code

package com.barber.kafka.avro

import java.util.{Properties, UUID}

import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.clients.producer.ProducerConfig


class KafkaDemoAvroPublisher(val topic:String) {

  private val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[KafkaAvroSerializer].getCanonicalName)
  props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString())
  props.put(ProducerConfig.ACKS_CONFIG,  "all")
  props.put(ProducerConfig.RETRIES_CONFIG, "0")
  private val producer =   new KafkaProducer[String,User](props)

  def send(): Unit = {
    try {
      val rand =  new scala.util.Random(44343)

      for(i <- 1 to 10) {
        val id = rand.nextInt()
        val itemToSend = User(id , "ishot.com")
        println(s"Producer sending data ${itemToSend.toString}")
        producer.send(new ProducerRecord[String, User](topic, itemToSend))
        producer.flush()
      }
    } catch {
      case ex: Exception =>
        println(ex.printStackTrace().toString)
        ex.printStackTrace()
    }
  }
}

 

Streams App

This is the new element for this post. So what does it do?

Well quite simply it hooks up some stream processing to the producer's topic (where User Avro objects are being used). It then does a simple map transform on the stream items to create a new KStream[String, UserWithUUID], which is then sent to the output topic the consumer will use. In a nutshell that is all there is to it.

Here is the code to do the streams app

 

package com.barber.kafka.avro


object StreamsApp extends App {
  private val inputTopic = "avro-streams-input-topic"
  private val outputTopic = "avro-streams-useruuid-output-topic"

  val consumer = new KafkaDemoAvroStreams(inputTopic, outputTopic)
  consumer.start()

}

 

package com.barber.kafka.avro


import java.util.{Collections, Properties}

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes.StringSerde
import org.apache.kafka.common.serialization.{Serde, Serdes, StringDeserializer}
import org.apache.kafka.streams.kstream.{KStream, Produced}

import scala.concurrent.TimeoutException
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig}

class KafkaDemoAvroStreams(val inputTopic:String, val outputTopic:String) {


  val builder: StreamsBuilder = new StreamsBuilder()
  var streams: Option[KafkaStreams] = None

  val streamsConfiguration: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-stream-demo-topic-streams")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[SpecificAvroSerde[_ <: SpecificRecord]])
    p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,  "http://localhost:8081")
    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    p
  }


  def start() = {

    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

      //https://github.com/confluentinc/kafka-streams-examples/blob/4.1.x/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala

      // Write the input data as-is to the output topic.
      //
      // If 
      // 
      // a) we have already configured the correct default serdes for keys and
      // values
      // 
      // b) the types for keys and values are the same for both the input topic and the
      // output topic
      // 
      // We would only need to define:
      //
      //   builder.stream(inputTopic).to(outputTopic);
      //
      // However, in the code below we intentionally override the default serdes in `to()` to
      // demonstrate how you can construct and configure a specific Avro serde manually.
      val stringSerde: Serde[String] = Serdes.String
      val specificAvroUserSerde: Serde[User] = new SpecificAvroSerde[User]
      val specificAvroUserWithUUIDSerde: Serde[UserWithUUID] = new SpecificAvroSerde[UserWithUUID]

      // Note how we must manually call `configure()` on this serde to configure the schema registry
      // url.  This is different from the case of setting default serdes (see `streamsConfiguration`
      // above), which will be auto-configured based on the `StreamsConfiguration` instance.
      val isKeySerde: Boolean = false
      specificAvroUserSerde.configure(
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081"), isKeySerde)
      specificAvroUserWithUUIDSerde.configure(
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
          "http://localhost:8081"), isKeySerde)


      val stream: KStream[String, User] = builder.stream(inputTopic)

      val mappedStream  =
        stream.map[String, UserWithUUID]((k,v) => {
            println("Streams saw messsage ============ ")
            println(s"Saw User ${v}")
            new KeyValue(k, UserWithUUID(v.id,v.name, java.util.UUID.randomUUID().toString()))
        })

      //send UserWithUUID out on output topic
      mappedStream.to(outputTopic, Produced.`with`(stringSerde, specificAvroUserWithUUIDSerde))
      streams = Some(new KafkaStreams(builder.build(), streamsConfiguration))
      streams.map(_.start())

    }
    catch {
      case timeOutEx: TimeoutException =>
        println("Timeout ")
        false
      case ex: Exception => ex.printStackTrace()
        println("Got error when reading message ")
        false
    }
  }

  def close(): Unit = streams.map(_.close())

}

See how this time we are not working with a Serializer/Deserializer directly, but rather with things called “Serdes”, which are combined serializer/deserializer objects. See how we also need to tell each Serde about the Schema Registry, and how we set the serdes to use in the final “to” operation that sends the transformed data to the output topic.

 

 

ConsumerApp

This too is not that different from the previous examples. The only thing to note here is that this one consumes the output topic of the streams app, so it will be using UserWithUUID objects. Here is the relevant code

package com.barber.kafka.avro

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import java.util.Collections

import org.apache.kafka.common.errors.TimeoutException
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
import org.apache.kafka.common.serialization.StringDeserializer

class KafkaDemoAvroSubscriber(val topic:String) {

  private val props = new Properties()
  val groupId = "avro-stream-demo-topic-useruuid-consumer"
  var shouldRun : Boolean = true

  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000")
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,classOf[KafkaAvroDeserializer].getCanonicalName)
  //Use Specific Record or else you get Avro GenericRecord.
  props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")

  private val consumer = new KafkaConsumer[String, com.barber.kafka.avro.UserWithUUID](props)

  def start() = {

    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

      consumer.subscribe(Collections.singletonList(topic))

      while (shouldRun) {
        val records: ConsumerRecords[String,  com.barber.kafka.avro.UserWithUUID] = consumer.poll(1000)
        val it = records.iterator()
        while(it.hasNext()) {
          println("Getting message from queue.............")
          val record: ConsumerRecord[String,  com.barber.kafka.avro.UserWithUUID] = it.next()
          val recievedItem =record.value()
          println(s"Saw UserWithUUID ${recievedItem}")
          consumer.commitSync
        }
      }
    }
    catch {
      case timeOutEx: TimeoutException =>
        println("Timeout ")
        false
      case ex: Exception => ex.printStackTrace()
        println("Got error when reading message ")
        false
    }
  }

  def close(): Unit = shouldRun = false
}

 

 

 

So how do I run this stuff?

As I stated above you will need to download a few things, but once you have those in place you may find the small PowerShell script called “RunThePipeline.ps1” that is inside the projects useful. This script does a few things, such as cleaning the Kafka/Zookeeper logs, stopping any previous instances, starting new instances and also creating the Kafka topic (which you must have before you can use the code).

 

IMPORTANT NOTE : I have altered the Kafka log paths, and where Zookeeper logs to. This can be done using the relevant properties files from the Confluent installation

 

image

 

server.properties

This line was changed

# A comma seperated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs

 

zookeeper.properties

This line was changed

# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper

 

The PowerShell script makes assumptions based on where you changed these values to, so if you have different settings for these please edit the PowerShell script too.

Essentially you need to have the following running before you can run the code example (this is what the PowerShell script automates for you; you may prefer to use the Confluent CLI, but I already had this script from a previous project, it cleans out old data which is useful when you are experimenting, and it also creates the required topic)

  • Zookeeper
  • Kafka broker
  • Schema Registry
  • The required Kafka topic

 

Once you have run the PowerShell command line you can just run these projects

 

  • com.barber.kafka.avro.PublisherApp
  • com.barber.kafka.avro.StreamsApp
  • com.barber.kafka.avro.SubscriberApp
Distributed Systems, Kafka, Scala

Kafka Schema Registry

This is the 3rd post in a small mini series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point is that I am mainly a Windows user, as such the instructions, scripts will have a Windows bias to them. So if you are not a Windows user you will need to find the instructions for your OS of choice.

 

Prerequisites

So go and grab that lot if you want to follow along.

 

Last time we talked about how to create a Kafka Producer/Consumer which uses the KafkaAvroSerializer when using Avro data of a specific type. We also used the Kafka Schema Registry and had a tiny introduction as to what the Kafka Schema Registry was all about. This time we will be looking at the Kafka Schema Registry in a lot more detail.

 

 

Where is the code for this post?

You can grab the code that goes with this post from here : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSpecificAvroBrokenProducer and here too : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSchemaRegistryTests

 

Kafka Schema Registry Deep Dive

The idea behind the Schema Registry is that Confluent provide it as a service that exposes a REST API and integrates closely with the rest of the Kafka stack. The Schema Registry acts as a single store for schema metadata, in a versioned historical manner. It also provides compatibility settings that allow the evolution of schemas according to the configured compatibility setting. As we have already seen in the last post, there are also serializers provided that work with the Registry and the Avro format. Thus far we have only seen a Producer/Consumer, both of which use the KafkaAvroSerializer, but there is also a Serde (serializer/deserializer) available when working with Kafka Streams, which we will talk about in the next post.

 

You may be wondering just why you might want to use some sort of schema registry. Well, have you ever found yourself in a situation where you store a JSON blob in a row in a database, and then later you make changes to the format of the original object that was used to store these historic rows? Then you find yourself in the unenviable position of no longer being able to easily deserialize your old database JSON data. You need to either write some mapping code or evolve the data to the latest schema format. This is not an easy task; I have done it by hand once or twice and it's hard to get right. This is essentially the job of the Kafka Schema Registry. It's a helping/guiding light to help you traverse this issue.

 

I have taken the next section from the official docs, as there is good detail in here that is important to the rest of this post, and it really is a case of the official docs saying it best, so let’s just borrow this bit of text, the rest of the post will put this into action

 

The Schema Registry is a distributed storage layer for Avro Schemas which uses Kafka as its underlying storage mechanism. Some key design decisions:

  • Assigns globally unique id to each registered schema. Allocated ids are guaranteed to be monotonically increasing but not necessarily consecutive.
  • Kafka provides the durable backend, and functions as a write-ahead changelog for the state of the Schema Registry and the schemas it contains.
  • The Schema Registry is designed to be distributed, with single-master architecture, and ZooKeeper/Kafka coordinates master election (based on the configuration).

 

Kafka Backend

Kafka is used as the Schema Registry storage backend. The special Kafka topic <kafkastore.topic> (default _schemas), with a single partition, is used as a highly available write ahead log. All schemas, subject/version and id metadata, and compatibility settings are appended as messages to this log. A Schema Registry instance therefore both produces and consumes messages under the _schemas topic. It produces messages to the log when, for example, new schemas are registered under a subject, or when updates to compatibility settings are registered. The Schema Registry consumes from the _schemas log in a background thread, and updates its local caches on consumption of each new _schemas message to reflect the newly added schema or compatibility setting. Updating local state from the Kafka log in this manner ensures durability, ordering, and easy recoverability.

 

Compatibility

The Schema Registry server can enforce certain compatibility rules when new schemas are registered in a subject. Currently, we support the following compatibility rules.

  • Backward compatibility (default): A new schema is backwards compatible if it can be used to read the data written in the latest (older) registered schema.
  • Transitive backward compatibility: A new schema is transitively backwards compatible if it can be used to read the data written in all previously registered schemas. Backward compatibility is useful for loading data into systems like Hadoop since one can always query data of all versions using the latest schema.
  • Forward compatibility: A new schema is forward compatible if the latest (old) registered schema can read data written in this schema, or to put this another way “data encoded with a newer schema can be read with an older schema.”
  • Transitive forward compatibility: A new schema is transitively forward compatible if all previous schemas can read data written in this schema. Forward compatibility is useful for consumer applications that can only deal with data in a particular version that may not always be the latest version.
  • Full compatibility: A new schema is fully compatible if it’s both backward and forward compatible with the latest registered schema.
  • Transitive full compatibility: A new schema is transitively full compatible if it’s both backward and forward compatible with all previously registered schemas.
  • No compatibility: A new schema can be any schema as long as it’s a valid Avro.

 

An Evolution Example

Say we had this initial schema

{"namespace": "barbers.avro",
 "type": "record",
 "name": "person",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "age",  "type": "int"},
 ]
}

We can evolve this schema into this one, where we supply a default location.

{"namespace": "barbers.avro",
 "type": "record",
 "name": "person",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "age",  "type": "int"},
	 {"name": "location", "type": "string", "default": "uk"}
 ]
}

 

This allows data encoded with the old schema to be read using this new schema. This is backward compatible. If we had not supplied this default for ”location”, this would no longer be backward compatible.

 

Forward compatibility means that data encoded with a newer schema can be read with an older schema. This new schema is also forward compatible with the original schema, since we can just drop the defaulted new field “location” when projecting new data into the old. Had the new schema chosen to no longer include the “age” field, it would not be forward compatible, as we would not know how to fill in this field for the old schema.

For more information on this you should have a bit more of a read here : https://docs.confluent.io/current/avro.html
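If you want to try a compatibility check like the one above programmatically (rather than by firing a producer at a topic, which is what the demo code below does), the Confluent client that is already on the classpath can be used. Here is a rough sketch, assuming a locally running Registry and a made-up subject name following the <topic>-value convention used later in this post:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema

// Hypothetical check: is the evolved "person" schema (with the defaulted location field)
// compatible with whatever is currently registered under the subject?
val client = new CachedSchemaRegistryClient("http://localhost:8081", 100)

val evolvedPerson: Schema = new Schema.Parser().parse(
  """{"namespace": "barbers.avro", "type": "record", "name": "person",
    |  "fields": [
    |    {"name": "name", "type": "string"},
    |    {"name": "age",  "type": "int"},
    |    {"name": "location", "type": "string", "default": "uk"}
    |  ]}""".stripMargin)

// "some-topic-value" is a placeholder; subjects follow the <topic>-value naming convention
val isCompatible = client.testCompatibility("some-topic-value", evolvedPerson)
println(s"Evolved schema compatible: $isCompatible")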

 

Ok so I think that gives us a flavor of what the Schema Registry is all about, let’s carry on and see some code examples.

 

So how do I run this stuff?

 

As I stated above you will need to download a few things, but once you have those in place you may find the small PowerShell script called “RunThePipeline.ps1” that is inside the projects useful. This script does a few things, such as cleaning the Kafka/Zookeeper logs, stopping any previous instances, starting new instances and also creating the Kafka topic (which you must have before you can use the code).

IMPORTANT NOTE : I have altered the Kafka log paths, and where Zookeeper logs to. This can be done using the relevant properties files from the Confluent installation

 

image

 

server.properties

This line was changed

# A comma seperated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs

 

zookeeper.properties

This line was changed

# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper

 

The PowerShell script makes assumptions based on where you changed these values to, so if you have different settings for these please edit the PowerShell script too.

 

Essentially you need to have the following running before you can run the code example (this is what the PowerShell script automates for you; you may prefer to use the Confluent CLI, but I already had this script from a previous project, it cleans out old data which is useful when you are experimenting, and it also creates the required topic)

 

  • Zookeeper
  • Kafka broker
  • Schema Registry
  • The required Kafka topic

 

 

Using the producer to test out the Registry

This is this project in the GitHub demo code : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSpecificAvroBrokenProducer

 

Since we worked with the Schema Registry and a Kafka Producer that made use of the Registry in the last post, I thought it might be a good idea to make some changes to the publisher from the last post, to see if we can create some compatibility tests against the Schema Registry that the Producer is using.

 

So before we see the running code, let's examine the schemas that are used in the PublisherApp, in this exact order

 

User schema

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "User",
     "fields":[
         {
            "name": "id", "type": "int"
         },
         {
            "name": "name",  "type": "string"
         }
     ]
}

 

User without name schema

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "User",
     "fields":[
         {
            "name": "id", "type": "int"
         }
     ]
}

 

User with boolean “Id” field schema

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "User",
     "fields":[
         {
            "name": "id", "type": "boolean"
         }
     ]
}

 

Another completely new type with string “Id” field schema

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "AnotherExampleWithStringId",
     "fields":[
         {
            "name": "id", "type": "string"
         }
     ]
}

 

So since the Kafka Producer is set up to use the Kafka Schema Registry and sends its Avro values using the KafkaAvroSerializer, we start with the 1st schema (User Schema) shown above being the one that is registered against the Schema Registry subject <topic>-value (we will see more of the Registry API below; for now just understand that when using the Schema Registry an auto registration is done against the <topic>-value subject for the given producer, with the 1st producer to use the topic being the one that sets the schema for that subject).

 

Ok, so we have stated that we start out with the 1st schema (User Schema) shown above, and that we then progress through each of the other schemas shown above and try to send the Avro serialized data across the Kafka topic. Here is the code from the Producer App

package com.barber.kafka.avro

import java.util.{Properties, UUID}

import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

class KafkaDemoAvroPublisher(val topic:String) {

  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put("key.serializer", classOf[StringSerializer].getCanonicalName)
  props.put("value.serializer",classOf[KafkaAvroSerializer].getCanonicalName)
  props.put("client.id", UUID.randomUUID().toString())

  private val producer =   new KafkaProducer[String,User](props)
  private val producerUserWithoutName =   new KafkaProducer[String,UserWithoutName](props)
  private val producerUserWithBooleanIdBad =   new KafkaProducer[String,UserWithBooleanId](props)
  private val producerAnotherExampleWithStringIdBad = new KafkaProducer[String,AnotherExampleWithStringId](props)

  def send(): Unit = {
    try {
      val rand =  new scala.util.Random(44343)

      //we expect this to work, as its the one that is going to define the Avro format of then topic
      //since its the 1st published message on the topic (Assuming you have not preregistered the topic key + Avro schemea
      //with the schema registry already)
      for(i <- 1 to 10) {
        val id = rand.nextInt()
        val itemToSend = User(id , "ishot.com")
        println(s"Producer sending data ${itemToSend.toString}")
        producer.send(new ProducerRecord[String, User](topic, itemToSend))
        producer.flush()
      }


      //we expect this to work as having a User without a name is ok, as Name is a "string" so can be empty
      for(i <- 1 to 10) {
        val id = rand.nextInt()
        val itemToSend = UserWithoutName(id)
        println(s"Producer sending data ${itemToSend.toString}")
        producerUserWithoutName.send(new ProducerRecord[String, UserWithoutName](topic, itemToSend))
        producerUserWithoutName.flush()
      }


      //we expect this to fail as its trying to send a different incompatible Avro object on the topic
      //which is currently using the "User" (Avro object / Schema)
      sendBadProducerValue("UserWithBooleanId", () => {
        val itemToSend = UserWithBooleanId(true)
        println(s"Producer sending data ${itemToSend.toString}")
        producerUserWithBooleanIdBad.send(new ProducerRecord[String, UserWithBooleanId](topic, itemToSend))
        producerUserWithBooleanIdBad.flush()
      })

      //we expect this to fail as its trying to send a different incompatible Avro object on the topic
      //which is currently using the "User" (Avro object / Schema)
      sendBadProducerValue("AnotherExampleWithStringId", () => {
        val itemToSend = AnotherExampleWithStringId("fdfdfdsdsfs")
        println(s"Producer sending data ${itemToSend.toString}")
        producerAnotherExampleWithStringIdBad.send(new ProducerRecord[String, AnotherExampleWithStringId](topic, itemToSend))
        producerAnotherExampleWithStringIdBad.flush()
      })

    } catch {
      case ex: Exception =>
        println(ex.printStackTrace().toString)
        ex.printStackTrace()
    }
  }

  def sendBadProducerValue(itemType: String, produceCallback: () => Unit) : Unit = {
    try {
      //we expect this to fail as its trying to send a different incompatible
      // Avro object on the topic which is currently using the "User" (Avro object / Schema)
      println(s"Sending $itemType")
      produceCallback()
    } catch {
      case ex: Exception => {
        println("==============================================================================\r\n")
        println(s" We were expecting this due to incompatble '$itemType' item being sent\r\n")
        println("==============================================================================\r\n")
        println(ex.printStackTrace().toString)
        println()
        ex.printStackTrace()
      }
    }
  }
}

 

Now lets see the relevant output

 

NOTE : This assumes a clean run with a new empty topic

 

 

We start by sending some User Schema data, which is fine as it’s the first data on the topic

 

Producer sending data {“id”: -777306865, “name”: “ishot.com”}
06:47:05.816 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=88]} with correlation id 8 to node 0
Producer sending data {“id”: 1227013473, “name”: “ishot.com”}
06:47:05.818 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=88]} with correlation id 9 to node 0
Producer sending data {“id”: 1899269599, “name”: “ishot.com”}
06:47:05.821 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=88]} with correlation id 10 to node 0
Producer sending data {“id”: -1764893671, “name”: “ishot.com”}

 

We then try to send some User Without Name Schema data, which is fine as it's compatible with the User Schema already registered in the Kafka Registry

 

Producer sending data {“id”: -1005363706}
06:47:05.895 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=78]} with correlation id 4 to node 0
Producer sending data {“id”: -528430870}
06:47:05.900 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=78]} with correlation id 5 to node 0
Producer sending data {“id”: -877322591}
06:47:05.905 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=78]} with correlation id 6 to node 0
Producer sending data {“id”: 2048308094}

 

We then try to send some User With Boolean Id Field Schema data, which we expect to be NOT acceptable/incompatible with the User Schema already registered in the Kafka Registry. This is due to the fact that we have changed the Id field into a boolean, where the already registered User Schema expects this field to be an “int”. So does it fail?

 

In a word yes, here is the relevant output

 

Sending UserWithBooleanId
Producer sending data {“id”: true}

06:47:05.954 [main] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService – Sending POST with input {“schema”:”{\”type\”:\”record\”,\”name\”:\”User\”,\”namespace\”:\”com.barber.kafka.avro\”,\”fields\”:[{\”name\”:\”id\”,\”type\”:\”boolean\”}]}”} to http://localhost:8081/subjects/avro-specific-demo-topic-value/versions
==============================================================================

We were expecting this due to incompatble ‘UserWithBooleanId’ item being sent

==============================================================================

org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {“type”:”record”,”name”:”User”,”namespace”:”com.barber.kafka.avro”,”fields”:[{“name”:”id”,”type”:”boolean”}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:245)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:237)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:807)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.$anonfun$send$3(KafkaDemoAvroPublisher.scala:54)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher$$Lambda$17/746074699.apply$mcV$sp(Unknown Source)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.sendBadProducerValue(KafkaDemoAvroPublisher.scala:79)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.send(KafkaDemoAvroPublisher.scala:51)
    at com.barber.kafka.avro.PublisherApp$.delayedEndpoint$com$barber$kafka$avro$PublisherApp$1(PublisherApp.scala:6)
    at com.barber.kafka.avro.PublisherApp$delayedInit$body.apply(PublisherApp.scala:3)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.App$$Lambda$5/511833308.apply(Unknown Source)
    at scala.collection.immutable.List.foreach(List.scala:378)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at com.barber.kafka.avro.PublisherApp$.main(PublisherApp.scala:3)
    at com.barber.kafka.avro.PublisherApp.main(PublisherApp.scala)

 

 

We then try to send some Completely Different Object With String Id Field Schema data, which we expect to be rejected as incompatible with the User Schema already registered in the Schema Registry. This is because we have changed the id field to a string, where the already registered User Schema expects this field to be an "int", and we have also omitted the required "name" field in this new type.

So does it fail?

 

In a word, yes. Here is the relevant output:

 

Sending AnotherExampleWithStringId
Producer sending data {"id": "fdfdfdsdsfs"}

06:47:06.059 [main] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"AnotherExampleWithStringId\",\"namespace\":\"com.barber.kafka.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"} to http://localhost:8081/subjects/avro-specific-demo-topic-value/versions
==============================================================================

We were expecting this due to incompatible 'AnotherExampleWithStringId' item being sent

==============================================================================

org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"AnotherExampleWithStringId","namespace":"com.barber.kafka.avro","fields":[{"name":"id","type":"string"}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:245)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:237)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:807)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.$anonfun$send$4(KafkaDemoAvroPublisher.scala:63)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher$$Lambda$18/871790326.apply$mcV$sp(Unknown Source)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.sendBadProducerValue(KafkaDemoAvroPublisher.scala:79)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.send(KafkaDemoAvroPublisher.scala:60)
    at com.barber.kafka.avro.PublisherApp$.delayedEndpoint$com$barber$kafka$avro$PublisherApp$1(PublisherApp.scala:6)
    at com.barber.kafka.avro.PublisherApp$delayedInit$body.apply(PublisherApp.scala:3)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.App$$Lambda$5/511833308.apply(Unknown Source)
    at scala.collection.immutable.List.foreach(List.scala:378)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at com.barber.kafka.avro.PublisherApp$.main(PublisherApp.scala:3)
    at com.barber.kafka.avro.PublisherApp.main(PublisherApp.scala)
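
For reference, the "We were expecting this" banner in the output above comes from the demo's own error handling around the send. A rough sketch of that idea (the method and parameter names here are my own, not the repo's exact code) looks like this; the Confluent Avro serializer tries to register the schema inside send(), so an incompatible schema surfaces as a SerializationException right there:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch only: send a record we expect the registry to reject, and report the failure
def sendExpectingRejection(producer: KafkaProducer[String, AnyRef],
                           topic: String,
                           value: AnyRef,
                           label: String): Unit = {
  try {
    producer.send(new ProducerRecord[String, AnyRef](topic, value)).get()
    println(s"Unexpected: '$label' was accepted by the registry")
  } catch {
    case ex: Exception =>
      println("=" * 78)
      println(s"We were expecting this due to incompatible '$label' item being sent")
      println("=" * 78)
      println(ex)
  }
}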

OK, so that gives us a glimpse of what it's like to work with the Kafka producer and the Schema Registry. But surely there is more we can do with the Schema Registry REST API mentioned above?

 

Well, yes there is. We will now look at a second example in the codebase, which tries a few more Schema Registry REST API calls.

 

 

 

 

Understanding the Schema Registry API

 

This is the KafkaSchemaRegistryTests project in the GitHub demo code: https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSchemaRegistryTests

 

I have chosen to use Akka Http for the REST API calls.
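
For completeness, a minimal sketch of the sbt dependencies such a sample would need (the version numbers are my assumption; adjust them to whatever your project is already on):

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"  % "2.5.12",
  "com.typesafe.akka" %% "akka-stream" % "2.5.12",
  "com.typesafe.akka" %% "akka-http"   % "10.1.1"
)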

 

These are the three resources used by the code:

 

compatibilityBACKWARD.json

{"compatibility": "BACKWARD"}

 

compatibilityNONE.json

{"compatibility": "NONE"}

 

simpleStringSchema.avsc

{"schema": "{\"type\": \"string\"}"}

 

 

Here is the entire codebase, which exercises various operations against the Schema Registry:

import scala.io.Source
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import AkkaHttpImplicits.{executionContext, materializer, system}

object RegistryApp extends App {

  var payload = ""
  var result = ""
  val schemaRegistryMediaType = MediaType.custom("application/vnd.schemaregistry.v1+json", false)
  implicit val c1 = ContentType(schemaRegistryMediaType, () => HttpCharsets.`UTF-8`)

  //These queries are the same ones found here, but instead of using curl I am using Akka Http
  //https://docs.confluent.io/current/schema-registry/docs/intro.html


  //  # Register a new version of a schema under the subject "Kafka-key"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/subjects/Kafka-key/versions
  //  {"id":1}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/subjects/Kafka-key/versions")
  println("Register a new version of a schema under the subject \"Kafka-key\"")
  println("EXPECTING {\"id\":1}")
  println(s"GOT ${result}\r\n")


  //  # Register a new version of a schema under the subject "Kafka-value"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/subjects/Kafka-value/versions
  //  {"id":1}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/subjects/Kafka-value/versions")
  println("Register a new version of a schema under the subject \"Kafka-value\"")
  println("EXPECTING {\"id\":1}")
  println(s"GOT ${result}\r\n")


  //  # List all subjects
  //  $ curl -X GET http://localhost:8081/subjects
  //    ["Kafka-value","Kafka-key"]
  result = get("http://localhost:8081/subjects")
  println("List all subjects")
  println("EXPECTING [\"Kafka-value\",\"Kafka-key\"]")
  println(s"GOT ${result}\r\n")


  //  # Fetch a schema by globally unique id 1
  //  $ curl -X GET http://localhost:8081/schemas/ids/1
  //  {"schema":"\"string\""}
  result = get("http://localhost:8081/schemas/ids/1")
  println("Fetch a schema by globally unique id 1")
  println("EXPECTING {\"schema\":\"\\\"string\\\"\"}")
  println(s"GOT ${result}\r\n")


  //  # List all schema versions registered under the subject "Kafka-value"
  //  $ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
  //    [1]
  result = get("http://localhost:8081/subjects/Kafka-value/versions")
  println("List all schema versions registered under the subject \"Kafka-value\"")
  println("EXPECTING [1]")
  println(s"GOT ${result}\r\n")


  //  # Fetch version 1 of the schema registered under subject "Kafka-value"
  //  $ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
  //  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}
  result = get("http://localhost:8081/subjects/Kafka-value/versions/1")
  println("Fetch version 1 of the schema registered under subject \"Kafka-value\"")
  println("EXPECTING {\"subject\":\"Kafka-value\",\"version\":1,\"id\":1,\"schema\":\"\\\"string\\\"\"}")
  println(s"GOT ${result}\r\n")


  //  # Deletes version 1 of the schema registered under subject "Kafka-value"
  //  $ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/1
  //  1
  result = delete("http://localhost:8081/subjects/Kafka-value/versions/1")
  println("Deletes version 1 of the schema registered under subject \"Kafka-value\"")
  println("EXPECTING 1")
  println(s"GOT ${result}\r\n")


  //  # Register the same schema under the subject "Kafka-value"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/subjects/Kafka-value/versions
  //  {"id":1}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/subjects/Kafka-value/versions")
  println("Register the same schema under the subject \"Kafka-value\"")
  println("EXPECTING {\"id\":1}")
  println(s"GOT ${result}\r\n")


  //  # Deletes the most recently registered schema under subject "Kafka-value"
  //  $ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/latest
  //  2
  result = delete("http://localhost:8081/subjects/Kafka-value/versions/latest")
  println("Deletes the most recently registered schema under subject \"Kafka-value\"")
  println("EXPECTING 2")
  println(s"GOT ${result}\r\n")


  //  # Register the same schema under the subject "Kafka-value"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/subjects/Kafka-value/versions
  //  {"id":1}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/subjects/Kafka-value/versions")
  println("Register the same schema under the subject \"Kafka-value\"")
  println("EXPECTING {\"id\":1}")
  println(s"GOT ${result}\r\n")


  //  # Fetch the schema again by globally unique id 1
  //  $ curl -X GET http://localhost:8081/schemas/ids/1
  //  {"schema":"\"string\""}
  result = get("http://localhost:8081/schemas/ids/1")
  println("Fetch the schema again by globally unique id 1")
  println("EXPECTING {\"schema\":\"\\\"string\\\"\"}")
  println(s"GOT ${result}\r\n")


  //  # Check whether a schema has been registered under subject "Kafka-key"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/subjects/Kafka-key
  //  {"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/subjects/Kafka-key")
  println("Check whether a schema has been registered under subject \"Kafka-key\"")
  println("EXPECTING {\"subject\":\"Kafka-key\",\"version\":1,\"id\":1,\"schema\":\"\\\"string\\\"\"}")
  println(s"GOT ${result}\r\n")


  //  # Test compatibility of a schema with the latest schema under subject "Kafka-value"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
  //  {"is_compatible":true}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest")
  println("Test compatibility of a schema with the latest schema under subject \"Kafka-value\"")
  println("EXPECTING {\"is_compatible\":true}")
  println(s"GOT ${result}\r\n")


  //  # Get top level config
  //    $ curl -X GET http://localhost:8081/config
  //    {"compatibilityLevel":"BACKWARD"}
  result = get("http://localhost:8081/config")
  println("Get top level config")
  println("EXPECTING {\"compatibilityLevel\":\"BACKWARD\"}")
  println(s"GOT ${result}\r\n")


  //  # Update compatibility requirements globally
  //    $ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"compatibility": "NONE"}' \
  //  http://localhost:8081/config
  //    {"compatibility":"NONE"}
  payload = Source.fromURL(getClass.getResource("/compatibilityNONE.json")).mkString
  result = put(payload,"http://localhost:8081/config")
  println("Update compatibility requirements globally")
  println("EXPECTING {\"compatibility\":\"NONE\"}")
  println(s"GOT ${result}\r\n")


  //  # Update compatibility requirements under the subject "Kafka-value"
  //  $ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"compatibility": "BACKWARD"}' \
  //  http://localhost:8081/config/Kafka-value
  //  {"compatibility":"BACKWARD"}
  payload = Source.fromURL(getClass.getResource("/compatibilityBACKWARD.json")).mkString
  result = put(payload,"http://localhost:8081/config/Kafka-value")
  println("Update compatibility requirements under the subject \"Kafka-value\"")
  println("EXPECTING {\"compatibility\":\"BACKWARD\"}")
  println(s"GOT ${result}\r\n")


  //  # Deletes all schema versions registered under the subject "Kafka-value"
  //  $ curl -X DELETE http://localhost:8081/subjects/Kafka-value
  //    [3]
  result = delete("http://localhost:8081/subjects/Kafka-value")
  println("Deletes all schema versions registered under the subject \"Kafka-value\"")
  println("EXPECTING [3]")
  println(s"GOT ${result}\r\n")

  //  # List all subjects
  //  $ curl -X GET http://localhost:8081/subjects
  //    ["Kafka-key"]
  result = get("http://localhost:8081/subjects")
  println("List all subjects")
  println("EXPECTING [\"Kafka-key\"]")
  println(s"GOT ${result}\r\n")


  private[RegistryApp] def post(data: String, url:String)(implicit contentType:ContentType): String = {
    sendData(data, url, HttpMethods.POST, contentType)
  }

  private[RegistryApp] def put(data: String, url:String)(implicit contentType:ContentType): String = {
    sendData(data, url, HttpMethods.PUT, contentType)
  }

  private[RegistryApp] def sendData(data: String, url:String, method:HttpMethod, contentType:ContentType): String = {
    val responseFuture: Future[HttpResponse] =
      Http(system).singleRequest(
        HttpRequest(
          method,
          url,
          entity = HttpEntity(contentType, data.getBytes())
        )
      )
    val html = Await.result(responseFuture.flatMap(x => Unmarshal(x.entity).to[String]), 5 seconds)
    html
  }

  private[RegistryApp] def get(url:String)(implicit contentType:ContentType): String = {
    noBodiedRequest(url, HttpMethods.GET, contentType)
  }

  private[RegistryApp] def delete(url:String)(implicit contentType:ContentType): String = {
    noBodiedRequest(url, HttpMethods.DELETE, contentType)
  }

  private[RegistryApp] def noBodiedRequest(url:String,method:HttpMethod, contentType:ContentType): String = {
    val responseFuture: Future[HttpResponse] = Http(system).singleRequest(HttpRequest(method,url))
    val html = Await.result(responseFuture.flatMap(x => Unmarshal(x.entity).to[String]), 5 seconds)
    html
  }
}
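
The AkkaHttpImplicits object imported at the top of the listing is not shown; here is a minimal sketch of what it would need to provide (the names match the import statement, the actor system name is my own invention) so that Http(system).singleRequest and Unmarshal have the implicits they need:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import scala.concurrent.ExecutionContext

object AkkaHttpImplicits {
  // A single ActorSystem, materializer and dispatcher shared by all the requests above
  implicit val system: ActorSystem = ActorSystem("schema-registry-demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContext = system.dispatcher
}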

 

 

And this is the output, which I hope is pretty self-explanatory now that we have spent some time on the previous explanations/tests. You can read more about the Kafka Schema Registry REST API here: https://docs.confluent.io/current/schema-registry/docs/api.html

 

Register a new version of a schema under the subject "Kafka-key"
EXPECTING {"id":1}
GOT {"id":1}

Register a new version of a schema under the subject "Kafka-value"
EXPECTING {"id":1}
GOT {"id":1}

List all subjects
EXPECTING ["Kafka-value","Kafka-key"]
GOT ["Kafka-value","Kafka-key"]

Fetch a schema by globally unique id 1
EXPECTING {"schema":"\"string\""}
GOT {"schema":"\"string\""}

List all schema versions registered under the subject "Kafka-value"
EXPECTING [1]
GOT [1]

Fetch version 1 of the schema registered under subject "Kafka-value"
EXPECTING {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}
GOT {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

Deletes version 1 of the schema registered under subject "Kafka-value"
EXPECTING 1
GOT 1

Register the same schema under the subject "Kafka-value"
EXPECTING {"id":1}
GOT {"id":1}

Deletes the most recently registered schema under subject "Kafka-value"
EXPECTING 2
GOT 2

Register the same schema under the subject "Kafka-value"
EXPECTING {"id":1}
GOT {"id":1}

Fetch the schema again by globally unique id 1
EXPECTING {"schema":"\"string\""}
GOT {"schema":"\"string\""}

Check whether a schema has been registered under subject "Kafka-key"
EXPECTING {"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}
GOT {"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}

Test compatibility of a schema with the latest schema under subject "Kafka-value"
EXPECTING {"is_compatible":true}
GOT {"is_compatible":true}

Get top level config
EXPECTING {"compatibilityLevel":"BACKWARD"}
GOT {"compatibilityLevel":"BACKWARD"}

Update compatibility requirements globally
EXPECTING {"compatibility":"NONE"}
GOT {"compatibility":"NONE"}

Update compatibility requirements under the subject "Kafka-value"
EXPECTING {"compatibility":"BACKWARD"}
GOT {"compatibility":"BACKWARD"}

Deletes all schema versions registered under the subject "Kafka-value"
EXPECTING [3]
GOT [3]

List all subjects
EXPECTING ["Kafka-key"]
GOT ["Kafka-key"]

 

 

 

 

Conclusion

As with most things in the Confluent platform, the Schema Registry is a great bit of kit and easy to use. I urge you all to give it a spin.