Kafka

KafkaStreams : Joining

Last time we looked at aggregating. This time we will continue to look at the Streams DSL, this time at joining. If you have ever written any standard SQL, this post will feel very familiar.

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

Joins

Types of join

KafkaStreams supports the following types of join. For most of this post we will use KStream-KStream joins as the demo code, as they are just easier to write tests for.


 

  • KStream-KStream joins are always windowed joins, because otherwise the size of the internal state store used to perform the join (e.g., a sliding window or “buffer”) would grow indefinitely.
  • KTable-KTable joins are always non-windowed joins. They are designed to be consistent with their counterparts in relational databases. The changelog streams of both KTables are materialized into local state stores to represent the latest snapshot of their table duals. The join result is a new KTable that represents the changelog stream of the join operation.
  • KStream-KTable joins are always non-windowed joins. They allow you to perform table lookups against a KTable (changelog stream) upon receiving a new record from the KStream (record stream).
  • KStream-GlobalKTable joins are always non-windowed joins. They allow you to perform table lookups against a GlobalKTable (entire changelog stream) upon receiving a new record from the KStream (record stream).
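To make the KStream-KTable bullet a little more concrete, here is a minimal plain-Scala model of its lookup semantics, under the simplifying assumptions of a single partition and records processed in the order given. It is not the Kafka Streams API: StreamTableJoinModel, TableUpdate and StreamRecord are hypothetical names used only for illustration.

```scala
import scala.collection.mutable

// A plain-Scala model of KStream-KTable INNER join semantics.
// NOT the Kafka Streams API; all names here are hypothetical.
object StreamTableJoinModel {
  sealed trait Event
  // A changelog update for the table side; a null value acts as a delete (tombstone).
  final case class TableUpdate(key: Int, value: String) extends Event
  // A new record arriving on the stream side.
  final case class StreamRecord(key: Int, value: String) extends Event

  def innerJoin(events: Seq[Event])(joiner: (String, String) => String): List[(Int, String)] = {
    val table = mutable.Map.empty[Int, String] // latest value per key
    val out = mutable.ListBuffer.empty[(Int, String)]
    events.foreach {
      case TableUpdate(k, v) =>
        // Table updates change the lookup state but never emit join output themselves.
        if (v == null) table.remove(k) else table.update(k, v)
      case StreamRecord(k, v) if v != null =>
        // Each stream record looks up the table's current value for the same key.
        table.get(k).foreach(t => out += (k -> joiner(v, t)))
      case StreamRecord(_, _) =>
        // Stream records with a null value are dropped.
    }
    out.toList
  }
}
```

In this model a stream record that arrives before the table has a value for its key produces nothing, and a later table update does not go back and join it; only new stream records trigger output.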

 

Join co-partitioning requirements

Before we look at the individual types of join and their syntax, note that Kafka Streams has the following requirements that must be met in order to use the various join operators.

Input data must be co-partitioned when joining. This ensures that input records with the same key on both sides of the join are delivered to the same stream task during processing. It is the responsibility of the user to ensure data co-partitioning before applying join operators.

The requirements for data co-partitioning are:

  • The input topics on both sides of the join must have the same number of partitions.
  • All applications that write to the input topics must have the same partitioning strategy, so that the keyspace of the input data is distributed across partitions in the same manner. The partitioner may be set in various places: for example, with the Producer API it can be set via the ProducerConfig.PARTITIONER_CLASS_CONFIG config value, and with the Streams API a StreamPartitioner can be supplied (via Produced) to #to or #through. The good news is that if you use the default partitioner-related settings across all applications, you do not need to worry about the partitioning strategy.
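Both rules matter because a key's partition is computed as a hash of the key modulo the number of partitions. The sketch below assumes a simplified partitioner: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas here the key's hashCode is a hypothetical stand-in, so only the shape of the calculation should be taken from it. CoPartitioningSketch and its methods are illustrative names.

```scala
// Illustration only: Kafka's default partitioner hashes the serialized key bytes
// with murmur2. Here key.hashCode is a hypothetical stand-in so the sketch has
// no Kafka dependency; the point is the "hash mod partition count" shape.
object CoPartitioningSketch {
  def partitionFor(key: Int, numPartitions: Int): Int =
    // Clear the sign bit so the result is non-negative (Kafka's Utils.toPositive does the same).
    (key.hashCode & 0x7fffffff) % numPartitions

  // Two topics are co-partitioned for these keys if every key lands on
  // the same partition number in both topics.
  def coPartitioned(keys: Seq[Int], leftPartitions: Int, rightPartitions: Int): Boolean =
    keys.forall(k => partitionFor(k, leftPartitions) == partitionFor(k, rightPartitions))
}
```

With 4 partitions on both sides, key 7 lands on partition 3 of each topic. If one topic had 3 partitions instead, key 7 would land on partition 1 there, so its records from the two topics would end up in different stream tasks.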

Inner Join

As with many other tools, an inner join only produces output for keys that are present on both sides of the join.

Note that because I have chosen to use KStream-KStream joins for this demo code, the following rules hold. These rules may vary depending on whether your inputs are KStreams or KTables.

 

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
  • Input records with a null key or a null value are ignored and do not trigger the join.
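The key-based and window-based rules above can be modelled in a few lines of plain Scala. This is a simplified sketch, not how Kafka Streams implements the join (it buffers records in window state stores): it assumes a single partition, records processed in the order given, and a symmetric window like JoinWindows.of(Duration.ofSeconds(2)). WindowedInnerJoinModel and Rec are hypothetical names.

```scala
import scala.collection.mutable

// A plain-Scala, single-partition model of a KStream-KStream INNER join.
// Hypothetical names; this sketch never evicts old records, unlike Kafka Streams.
object WindowedInnerJoinModel {
  // isLeft says which input topic the record came from
  final case class Rec(isLeft: Boolean, key: Int, value: String, ts: Long)

  def innerJoin(input: Seq[Rec], windowMs: Long)(joiner: (String, String) => Int): List[Int] = {
    val seen = mutable.ListBuffer.empty[Rec] // every non-null record seen so far, from both sides
    val out = mutable.ListBuffer.empty[Int]
    // keys are Scala Ints here, so only null values need filtering; they never trigger the join
    input.filter(_.value != null).foreach { r =>
      // key-based AND window-based predicate: same key, timestamps at most windowMs apart
      val matches = seen.filter(o =>
        o.isLeft != r.isLeft && o.key == r.key && math.abs(o.ts - r.ts) <= windowMs)
      // always call the joiner as (left, right), whichever side arrived last
      matches.foreach(o => out += (if (r.isLeft) joiner(r.value, o.value) else joiner(o.value, r.value)))
      seen += r
    }
    out.toList
  }
}
```

Replaying the same inputs as the inner join test below through this model produces 3 and then 6: the two null-valued records are ignored, and the records at 5916 and 5917 are too far from those at 2902 and 2903 to join with them.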

As has been the pattern so far, let's look at the topology, followed by the tests.

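//imports for this topology (the left and outer join topologies use the same ones, minus scala.util.Try)
import java.time.Duration

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._ //on Kafka 2.7+ use org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

import scala.util.Try

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val left: KStream[Int, String] =
    builder.stream[Int, String]("LeftTopic")
  val right: KStream[Int, String] =
    builder.stream[Int, String]("RightTopic")

  //parse a value to an Int, or None if it is not a number
  def tryToInt( s: String ) = Try(s.toInt).toOption

  //do the join: records with the same key whose timestamps are within 2 seconds of each other are joined
  val joinedStream = left.join(right)( (l: String, r: String) => {
    val result = (tryToInt(l), tryToInt(r))
    result match {
      case (Some(a), Some(b)) => a + b
      case (None, Some(b)) => b
      case (Some(a), None) => a
      case (None, None) => 0
    }
  } , JoinWindows.of(Duration.ofSeconds(2)))

  //peek does not change the stream; the local vals just make it easy to inspect joined records in a debugger
  val peeked = joinedStream.peek((k,v)=> {
    val theK = k
    val theV = v
  })

  peeked.to("innerJoinOutput")

  builder.build()
}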

In this example I am defensive about the values: a small helper turns each side into an Option[Int] (a value that does not parse as an integer becomes None), and simple pattern matching then forms a final Int value, which the tests verify against.
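One benefit of writing the joiner this way is that it can be unit tested without a TopologyTestDriver at all. The sketch below simply lifts the same logic out of the topology into a plain function; InnerJoinValueJoiner and combine are hypothetical names, not part of the original code.

```scala
import scala.util.Try

// The inner join's ValueJoiner logic as a standalone, testable function (hypothetical names).
object InnerJoinValueJoiner {
  // Some(n) if the string parses as an Int, otherwise None (including for null)
  private def tryToInt(s: String): Option[Int] = Try(s.toInt).toOption

  def combine(l: String, r: String): Int = (tryToInt(l), tryToInt(r)) match {
    case (Some(a), Some(b)) => a + b
    case (None, Some(b))    => b
    case (Some(a), None)    => a
    case (None, None)       => 0
  }
}
```

Keeping the joiner as a plain function means the topology tests only need to check the wiring and windowing, while the arithmetic can be checked directly.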

We can also see the JoinWindows of 2 seconds, which the tests will make use of. I have used this pattern for all three of the KStream join examples in this post. Here are the relevant tests:

//arrange
val recordFactory: ConsumerRecordFactory[java.lang.Integer, java.lang.String] =
  new ConsumerRecordFactory[java.lang.Integer, java.lang.String](new IntegerSerializer, new StringSerializer)
val innerJoinTopology = new InnerJoinTopology()
val testDriver = new TopologyTestDriver(innerJoinTopology.createTopolgy(), props)

//act
testDriver.pipeInput(recordFactory.create("LeftTopic", 1, null, 1900L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, null, 1901L))

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "1", 2902L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, "2",2903L ))

OutputVerifier.compareValue(testDriver.readOutput("innerJoinOutput", integerDeserializer, integerDeserializer),
  3.asInstanceOf[Integer])


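//move the record timestamps more than 2 seconds (the topology's join window) past the earlier records.
//join windows are driven by record timestamps, not wall-clock time, so no Thread.sleep is needed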

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "2", 5916L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, "4", 5917L))
OutputVerifier.compareValue(testDriver.readOutput("innerJoinOutput", integerDeserializer, integerDeserializer),
  6.asInstanceOf[Integer])

val result1 = testDriver.readOutput("innerJoinOutput", integerDeserializer, integerDeserializer)
assert(result1 == null)

It can be seen that we expect the following to hold true:

  • Both inputs being null within the time window doesn’t yield anything, as null values are ignored
  • A matching key within the join window yields the two stream values converted to Ints and added together (so 3 in this case)
  • Once the new records’ timestamps are more than 2 seconds past the earlier ones (outside the join window), a newly matched key yields the sum of just the two new values (so 6 in this case)

LeftJoin

As with many other tools, a left join always includes the left input, even when there is no right input with the same key; in that case the right value is null.

Note that because I have chosen to use KStream-KStream joins for this demo code, the following rules hold. These rules may vary depending on whether your inputs are KStreams or KTables.

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
  • Input records with a null key or a null value are ignored and do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null); this explains the row with timestamp=3 in the table below, which lists [A, null] in the LEFT JOIN column.
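The same style of plain-Scala model can be extended to the left join rules above. It models the eager behaviour that this post's Kafka 2.x era code relies on, where an unmatched left record is emitted straight away with a null right value. Be aware that Kafka 3.1 changed stream-stream left and outer joins so that unmatched records are only emitted once the join window (plus grace period) has closed. As before, this is a single-partition sketch with hypothetical names (WindowedLeftJoinModel, Rec), not the library's implementation.

```scala
import scala.collection.mutable

// A plain-Scala, single-partition model of an *eager* KStream-KStream LEFT join
// (pre-3.1 behaviour). Hypothetical names; not the Kafka Streams implementation.
object WindowedLeftJoinModel {
  final case class Rec(isLeft: Boolean, key: Int, value: String, ts: Long)

  def leftJoin(input: Seq[Rec], windowMs: Long)(joiner: (String, String) => String): List[String] = {
    val seen = mutable.ListBuffer.empty[Rec]
    val out = mutable.ListBuffer.empty[String]
    // null-valued records are ignored and never trigger the join
    input.filter(_.value != null).foreach { r =>
      val matches = seen.filter(o =>
        o.isLeft != r.isLeft && o.key == r.key && math.abs(o.ts - r.ts) <= windowMs)
      if (r.isLeft) {
        // a left record with no match is still emitted, with null for the right value
        if (matches.isEmpty) out += joiner(r.value, null)
        else matches.foreach(o => out += joiner(r.value, o.value))
      } else {
        // a right record only produces output when it matches a left record
        matches.foreach(o => out += joiner(o.value, r.value))
      }
      seen += r
    }
    out.toList
  }
}
```

Replaying the inputs of the left join test below through this model gives Left='1', Right='null', then Left='2', Right='null' (the left record at 5916 arrives before any right record in its window), and finally Left='2', Right='4'.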

As before, we start with the topology, which looks like this:

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val left: KStream[Int, String] =
    builder.stream[Int, String]("LeftTopic")
  val right: KStream[Int, String] =
    builder.stream[Int, String]("RightTopic")

  //do the join
  val joinedStream = left.leftJoin(right)( (l: String, r: String) => {
    val result = s"Left='${l}', Right='${r}'"
    result
  } , JoinWindows.of(Duration.ofSeconds(2)))

  val peeked = joinedStream.peek((k,v)=> {

    val theK = k
    val theV = v
  })

  peeked.to("leftJoinOutput")

  builder.build()
}

Here we may not have a value on the right input, but we will always have one on the left input. So for this example the joiner produces a String: when there is no right value, the ValueJoiner is called with null, which Scala’s string interpolation renders as “null”.
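A quick way to see where the “null” text comes from is to call the same interpolation directly: the joiner receives a Scala null for the missing side, and s-interpolation renders it as the string "null". JoinerFormatting and describe are hypothetical names for this sketch of the joiner used in the left and outer join topologies.

```scala
// The left/outer join ValueJoiner from the topologies, as a plain function (hypothetical names).
object JoinerFormatting {
  // a null argument is rendered as the text "null" by string interpolation
  def describe(l: String, r: String): String = s"Left='$l', Right='$r'"
}
```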

Here are the relevant tests

//arrange
val recordFactory: ConsumerRecordFactory[java.lang.Integer, java.lang.String] =
  new ConsumerRecordFactory[java.lang.Integer, java.lang.String](new IntegerSerializer, new StringSerializer)
val leftJoinTopology = new LeftJoinTopology()
val testDriver = new TopologyTestDriver(leftJoinTopology.createTopolgy(), props)

//act
testDriver.pipeInput(recordFactory.create("LeftTopic", 1, null, 1900L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, null, 1901L))

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "1", 2902L))


OutputVerifier.compareValue(testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer),
  "Left='1', Right='null'")


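//move the record timestamps more than 2 seconds (the topology's join window) past the earlier records.
//join windows are driven by record timestamps, not wall-clock time, so no Thread.sleep is needed

testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "2", 5916L))
testDriver.pipeInput(recordFactory.create("RightTopic", 1, "4", 5916L))
//the left record arrives before any right record in its window, so (as with "1" above) it is emitted with a null right value...
OutputVerifier.compareValue(testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer),
  "Left='2', Right='null'")
//...and then the right record arrives and joins with it
OutputVerifier.compareValue(testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer),
  "Left='2', Right='4'")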
val result1 = testDriver.readOutput("leftJoinOutput", integerDeserializer, stringDeserializer)
assert(result1 == null)

It can be seen that we expect the following to hold true:

  • Both inputs being null within the time window doesn’t yield anything
  • A single left input with no matching right input in the time window yields a single value for the left input, with null for the right
  • Once the new records’ timestamps are more than 2 seconds past the earlier ones, the new left record is first emitted on its own (with null for the right) and is then joined with the matching right record

OuterJoin

As with many other tools, an outer join will include the left value or the right value for a key even when the other side has no match, provided at least one of them is not null.

Note that because I have chosen to use KStream-KStream joins for this demo code, the following rules hold. These rules may vary depending on whether your inputs are KStreams or KTables.

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
  • Input records with a null key or a null value are ignored and do not trigger the join.
  • For each input record on one side that does not have any match on the other side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null) or ValueJoiner#apply(null, rightRecord.value), respectively.
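The outer join rules fit the same plain-Scala model: an unmatched record from either side is emitted with null for the missing value. Like the left join model, this assumes the eager pre-3.1 behaviour used by this post's code, a single partition and in-order processing; WindowedOuterJoinModel and Rec are hypothetical names.

```scala
import scala.collection.mutable

// A plain-Scala, single-partition model of an *eager* KStream-KStream OUTER join
// (pre-3.1 behaviour). Hypothetical names; not the Kafka Streams implementation.
object WindowedOuterJoinModel {
  final case class Rec(isLeft: Boolean, key: Int, value: String, ts: Long)

  def outerJoin(input: Seq[Rec], windowMs: Long)(joiner: (String, String) => String): List[String] = {
    val seen = mutable.ListBuffer.empty[Rec]
    val out = mutable.ListBuffer.empty[String]
    // null-valued records are ignored and never trigger the join
    input.filter(_.value != null).foreach { r =>
      val matches = seen.filter(o =>
        o.isLeft != r.isLeft && o.key == r.key && math.abs(o.ts - r.ts) <= windowMs)
      if (matches.isEmpty) {
        // no match on the other side: emit now, passing null for the missing value
        out += (if (r.isLeft) joiner(r.value, null) else joiner(null, r.value))
      } else {
        // always call the joiner as (left, right), whichever side arrived last
        matches.foreach(o => out += (if (r.isLeft) joiner(r.value, o.value) else joiner(o.value, r.value)))
      }
      seen += r
    }
    out.toList
  }
}
```

Replaying the outer join test's inputs gives Left='1', Right='null' followed by Left='null', Right='4', as the right record at 5916 has no left record within 2 seconds of it.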

As before, we start with the topology, which looks like this:

def createTopolgy(): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder
  val left: KStream[Int, String] =
    builder.stream[Int, String]("LeftTopic")
  val right: KStream[Int, String] =
    builder.stream[Int, String]("RightTopic")

  //do the join
  val joinedStream = left.outerJoin(right)( (l: String, r: String) => {
    val result = s"Left='${l}', Right='${r}'"
    result
  } , JoinWindows.of(Duration.ofSeconds(2)))

  val peeked = joinedStream.peek((k,v)=> {

    val theK = k
    val theV = v
  })

  peeked.to("outerJoinOutput")

  builder.build()
}

Here are the relevant tests

test("Should produce correct output") {

  //arrange
  val recordFactory: ConsumerRecordFactory[java.lang.Integer, java.lang.String] =
    new ConsumerRecordFactory[java.lang.Integer, java.lang.String](new IntegerSerializer, new StringSerializer)
  val outerJoinTopology = new OuterJoinTopology()
  val testDriver = new TopologyTestDriver(outerJoinTopology.createTopolgy(), props)

  //act
  testDriver.pipeInput(recordFactory.create("LeftTopic", 1, null, 1900L))
  testDriver.pipeInput(recordFactory.create("RightTopic", 1, null, 1901L))

  testDriver.pipeInput(recordFactory.create("LeftTopic", 1, "1", 2902L))


  OutputVerifier.compareValue(testDriver.readOutput("outerJoinOutput", integerDeserializer, stringDeserializer),
    "Left='1', Right='null'")


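  //move the record timestamps more than 2 seconds (the topology's join window) past the earlier records.
  //join windows are driven by record timestamps, not wall-clock time, so no Thread.sleep is needed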

  testDriver.pipeInput(recordFactory.create("RightTopic", 1, "4", 5916L))
  OutputVerifier.compareValue(testDriver.readOutput("outerJoinOutput", integerDeserializer, stringDeserializer),
    "Left='null', Right='4'")
  val result1 = testDriver.readOutput("outerJoinOutput", integerDeserializer, stringDeserializer)
  assert(result1 == null)

  cleanup(props, testDriver)
}

It can be seen that we expect the following to hold true:

  • Both inputs being null within the time window doesn’t yield anything
  • A single input from either side with no match within the time window yields a single value for that input, with null for the other side

 

As I say, this post is all about KStream-KStream joins, but this table may help to consolidate what this post has shown so far:

[Image: table of INNER, LEFT and OUTER join results for a sequence of input records, by timestamp]

 

For other types of join, such as KStream-KTable, you should consult the official documentation.

 

That’s it for now

So that’s all I wanted to say this time. Until next time, I hope this has shown you just how cool KafkaStreams is.
