Kafka

Kafka Streams: Example 1, straight through processing / How to test Kafka Streams

Introduction

In this post we will revisit some of the key objects we looked at last time, and we will also see what a typical Kafka Streams app looks like in Scala (Kafka's libraries are mainly Java, I just prefer Scala). It is not a complicated application, which is exactly what you want when you are starting to learn something. I have deliberately started simple, and we will expand our knowledge as we go.

Where is the code?

The code will all be in this single repo : https://github.com/sachabarber/KafkaStreamsDemo

What tools will I need to follow along?

As I am using Scala, if you really want to download and try the examples that will go with this series, you will need the following

  • Scala 2.12
  • SBT
  • Java 1.8 SDK
  • A Scala IDE (I like the community edition of IntelliJ IDEA myself)

A typical Kafka Streams application

So last time we talked about a few common objects you might expect to see when working with Kafka Streams, such as:

  • KStream
  • KTable
  • Global KTable
  • State Store

But we did not talk about what a typical app would look like. So let's do that now, shall we?

So the good news is that Kafka Streams is not a massive, bloated framework that just takes over. It is a simple library that you reference, and you run it inside your own application code. There are a few things it needs in order to run, so let's briefly talk about those.

Props

Kafka Streams, just like Kafka, works by being given a set of key/value properties that it uses to configure itself. These are standard, well-known properties, and you can read all about them here.

You can configure Kafka Streams by specifying parameters in a java.util.Properties instance:

  • Create a java.util.Properties instance.
  • Set the parameters. For example (this is Java; Scala syntax is a little different):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Set a few key parameters
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Any further settings
props.put(... , ...);
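Since the rest of this series uses Scala, here is a minimal sketch of the same two settings in Scala (the full Properties helper used by the demo appears further down in PropsHelper):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
// the same two mandatory settings as the Java snippet above
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")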

 

These MUST be set up before you attempt to start a Streams application.

Topology

The next thing you need is to actually build your streams topology: how it should be wired up, and how you want processing to occur. To do this there is a DSL, which looks like this (this is a Scala example):

val builder: StreamsBuilder = new StreamsBuilder
val theInputStream = builder.stream[String, String]("InputTopic")
//stream DSL operations you want here
builder.build()
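To give a flavour of what could go where the "//stream DSL operations you want here" comment sits, here is a minimal sketch; the topic names and the filter are made up for illustration, and it also shows that you can print the wired-up topology with describe():

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder: StreamsBuilder = new StreamsBuilder
val theInputStream = builder.stream[String, String]("InputTopic")
// example DSL operation: drop records with empty values, write the rest back out
theInputStream
  .filter((_, value) => value.nonEmpty)
  .to("NonEmptyTopic")
val topology = builder.build()
// handy for checking how the topology has been wired up
println(topology.describe())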

Start the streams

The final thing you would want to do is to actually start the streams processing, which can easily be done as follows:

val topology = createTopolgy()
val streams: KafkaStreams = new KafkaStreams(topology, props)
streams.start()
sys.ShutdownHookThread {
  streams.close(Duration.ofSeconds(10))
}

It’s a good idea to also add a shutdown hook to close the streams, which is shown above.

Walkthrough of the example topology

Ok, so believe it or not, that small set of 3 points, plus the knowledge gained in the 1st post, is enough for us to write a fully working Kafka Streams Scala application.

The code for this example is this one, and the basic idea is simple: values are read from an input topic and written straight through to an output topic, with no changes.

 

As you can see, this is about as simple as I could make it. So now let's have a look at the code for this trivial example.

SBT file

As I am using Scala there is an SBT file to manage the build; this is it in full:

name := "ScalaKafkaStreamsDemo"

version := "1.0"

scalaVersion := "2.12.1"

libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1.1" artifacts(Artifact("javax.ws.rs-api", "jar", "jar"))
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.1.0"
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.1.0"
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.1.0"

//TEST
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test
libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % "2.1.0" % Test

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3"

 

As you can see there is a specific JAR for Scala. This is a fairly new thing, and the current recommendation is to make sure you have at least Kafka 2.1.0 to work with the kafka-streams-scala JAR. There are also some test-scoped JARs there; we will see how they get used in the next section. You may be asking yourself what the specific Scala JAR brings to the table: it makes working with Serdes, lambdas and implicits more Scala friendly, as sketched below.
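To give a feel for that, here is a minimal sketch of the difference; the topic name is made up for illustration. With the plain Java API you pass the Serdes explicitly via Consumed, whereas the Scala wrapper resolves them from implicits:

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

// Java API equivalent (for comparison):
//   builder.stream("SomeTopic", Consumed.with(Serdes.String(), Serdes.String()))

// Scala wrapper: the key/value Serdes come from the implicits imported above
val builder = new StreamsBuilder
val stream: KStream[String, String] = builder.stream[String, String]("SomeTopic")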

One thing that is extremely weird is the javax.ws.rs-api dependency. There was an issue with a transitive dependency in Kafka itself, brought in by a Kafka Connect client, and it doesn't resolve correctly unless you use a line something like the one shown above. I have raised a support ticket for this, and they were all over it, so I expect this will get fixed in subsequent releases.

Anyway, moving on to the Props, let's see what those look like for this simple demo app. Here is my class (I will probably reuse this for all posts in this series):

import java.util.Properties
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.StreamsConfig


object PropsHelper  {

  def createBasicStreamProperties(applicationId: String, bootStrapServers: String) : Properties = {

    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    // Records should be flushed every 10 seconds. This is less than the default
    // in order to keep this example interactive.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
    // For illustrative purposes we disable record caches
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
    props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 50000.asInstanceOf[Object])
    props.put(StreamsConfig.STATE_DIR_CONFIG, s"C:\\data\\kafka-streams".asInstanceOf[Object])
    props
  }
}


You can change these to suit your own requirements.

Ok so now on to the code. This is it, in full:

import java.time.Duration
import org.apache.kafka.streams.Topology
import java.util.Properties
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.KafkaStreams

/**
  * This example simply maps values from 'InputTopic' to 'OutputTopic'
  * with no changes
  */
class StraightThroughTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "straight-through-application","localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy() : Topology = {

    val builder: StreamsBuilder = new StreamsBuilder
    val textLines: KStream[String, String] = 
      builder.stream[String, String]("InputTopic")
    textLines.mapValues(v => v).to("OutputTopic")
    builder.build()
  }

}

 

So this is in line with what we just discussed. The only addition is the line that does the mapValues(..). Now you may be wondering why I chose mapValues(..) rather than map(..). The reason is that map(..) is allowed to change the key, so Kafka Streams has to assume the stream may need re-partitioning before any later key-based operation, and re-partitioning is an expensive operation that should be avoided where possible; mapValues(..) cannot touch the key, so no re-partitioning is needed. A small sketch of the difference follows.
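To make that concrete, here is a small sketch (topic names made up for illustration); the mapValues(..) line leaves the key alone, whereas the map(..) line swaps key and value, which is the kind of change that can force a re-partition before a later key-based operation:

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder = new StreamsBuilder
val textLines: KStream[String, String] = builder.stream[String, String]("InputTopic")

// mapValues: only the value changes, the key is untouched, so no re-partitioning is needed
textLines.mapValues(v => v.toUpperCase).to("UppercasedTopic")

// map: the key may change too, so the stream is marked as possibly needing a
// re-partition before any subsequent key-based operation (joins, aggregations, ...)
textLines.map((k, v) => (v, k)).to("SwappedTopic")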

And believe it or not that is enough to get your 1st streams app up and running.

Testing the example topology

Now if you did read the 1st post in this series you would have seen a diagram that looks like this

[Diagram from the 1st post: Kafka partitions]

And you are probably thinking: how on earth am I going to test all that? I need:

  • At least 1 Kafka broker
  • At least 1 Kafka producer
  • At least 1 Kafka consumer
  • Zookeeper
  • And my own Stream app in the middle somehow

This is indeed a daunting task. Luckily help is at hand by way of the VERY handy kafka-streams-test-utils JAR. This JAR comes with a very cool class called TopologyTestDriver, which allows you to test your Streams code without needing the full setup shown above. I won't repeat all the official docs here, but you can think of this class as a little helper that lets you simulate ConsumerRecords being sent to a topic using the pipeInput(..) method, and verify values written to output topics using the readOutput(..) method.

The official docs do a rather fine job of covering this in detail, but for now, here is the code that tests the simple demo topology (the code is here should you want to know where it is):

import org.scalatest._
import Matchers._
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import java.io._
import java.util.Properties
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.kafka.streams.test.OutputVerifier

class StaightThoughStreamsTests
  extends FunSuite
  with BeforeAndAfter
  with Matchers {

  val props = PropsHelper.createBasicStreamProperties("straight-through-application-tests","localhost:9092")
  val stringDeserializer: StringDeserializer = new StringDeserializer

  before {
  }

  after {
  }


  test("Should produce correct output") {

    //arrange
    val recordFactory: ConsumerRecordFactory[String, String] =
      new ConsumerRecordFactory[String, String](new StringSerializer, new StringSerializer)
    val straightThroughTopology = new StraightThroughTopology()
    val testDriver = new TopologyTestDriver(straightThroughTopology.createTopolgy(), props)

    //act
    val consumerRecord = recordFactory.create("InputTopic", "key", "this is the input", 9999L)
    testDriver.pipeInput(consumerRecord)

    //assert
    OutputVerifier.compareKeyValue(testDriver.readOutput("OutputTopic", stringDeserializer, stringDeserializer), "key", "this is the input")
    val result = testDriver.readOutput("OutputTopic", stringDeserializer, stringDeserializer)
    assert(result == null)
    cleanup(props, testDriver)
  }


  def cleanup(props:Properties, testDriver: TopologyTestDriver) = {

    try {
	  //there is a bug on windows which causes this line to throw exception
      testDriver.close
    } catch {
      case e: Exception => {
        delete(new File("C:\\data\\kafka-streams"))
      }
    }
  }

  def delete(file: File) {
    if (file.isDirectory)
      Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete(_))
    file.delete
  }
}

And just to prove it all works, let's see a screenshot of this test:

[Screenshot: the passing test run, with an exception stack trace visible in the right hand panel]

The eagle-eyed amongst you will see what looks like an exception stack trace in the right hand panel. This is due to a bug on Windows where the data directory can't be deleted correctly, due to a file handle issue. This seems to be quite well documented.
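One way to soften this in tests (a hedged suggestion, not something the repo currently does) is to point the state directory at a fresh temporary folder for each test run, so a failed delete does not get in the way of the next run:

import java.nio.file.Files
import org.apache.kafka.streams.StreamsConfig

// hypothetical tweak: overwrite STATE_DIR_CONFIG with a throw-away temp directory
val props = PropsHelper.createBasicStreamProperties(
  "straight-through-application-tests", "localhost:9092")
props.put(StreamsConfig.STATE_DIR_CONFIG,
  Files.createTempDirectory("kafka-streams-test").toString)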

See you next time.

So hopefully I have whetted your appetite for what Kafka Streams could do for you. This is an introductory article, so we saw hardly anything of the DSL that Kafka Streams offers, but in subsequent posts we will see lots more of it.
