Kafka

KafkaStreams : Using Processor API in DSL

Last time we looked at how we can supply our own Serdes (serializer / deserializer) to the DSL. This time we will look at how we can make use of the lower level “Processor API” from within the DSL, which is what we have been using up until now.

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

DSL vs Processor API

So far we have been using the DSL, but it would be good to have a recap of not only the Streams DSL but also the “Processor API”.

 

Streams DSL

The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is recommended for most users, especially beginners. Most data processing operations can be expressed in just a few lines of DSL code.

In comparison to the Processor API, only the DSL supports:

  • Built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable. Having first-class support for streams and tables is crucial because, in practice, most use cases require not just either streams or databases/tables, but a combination of both. For example, if your use case is to create a customer 360-degree view that is updated in real-time, what your application will be doing is transforming many input streams of customer-related events into an output table that contains a continuously updated 360-degree view of your customers.
  • Declarative, functional programming style with stateless transformations (e.g. map and filter) as well as stateful transformations such as aggregations (e.g. count and reduce), joins (e.g. leftJoin), and windowing (e.g. session windows).

With the DSL, you can define processor topologies (i.e., the logical processing plan) in your application. The steps to accomplish this are:

  1. Specify one or more input streams that are read from Kafka topics.
  2. Compose transformations on these streams.
  3. Write the resulting output streams back to Kafka topics, or expose the processing results of your application directly to other applications through interactive queries (e.g., via a REST API).
After the application is run, the defined processor topologies are continuously executed (i.e., the processing plan is put into action). A step-by-step guide for writing a stream processing application using the DSL is provided below.

Once you have built your Kafka Streams application using the DSL, you can view the underlying Topology by first calling StreamsBuilder#build(), which returns the Topology object. To view the Topology you then call Topology#describe().
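
To make that concrete, here is a minimal sketch (the topic names and the word-count logic are made up for illustration, and it assumes the Scala DSL's ImplicitConversions and Serdes helpers) that reads an input topic, composes a couple of transformations, writes the result back out, and then prints the described Topology

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

object DescribeTopologySketch extends App {

  val builder = new StreamsBuilder

  // 1. specify an input stream read from a Kafka topic
  val lines: KStream[String, String] = builder.stream[String, String]("text-lines")

  // 2. compose transformations on the stream (stateless flatMapValues, then a stateful count)
  val wordCounts: KTable[String, Long] =
    lines
      .flatMapValues(line => line.toLowerCase.split("\\W+"))
      .groupBy((_, word) => word)
      .count()

  // 3. write the resulting output stream back to a Kafka topic
  wordCounts.toStream.to("word-counts")

  // build the Topology and print its description
  val topology: Topology = builder.build()
  println(topology.describe())
}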

 

Processor API

The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents customized processing logic.

The Processor API can be used to implement both stateless as well as stateful operations, where the latter is achieved through the use of state stores.

 

Here is a Java 8 example of what a Processor API app may look like

Topology builder = new Topology();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")

    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")

    // add the count store associated with the WordCountProcessor processor
    .addStateStore(countStoreSupplier, "Process")

    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

 

So that is a rough outline from Confluent themselves about their two APIs. So just what are we going to talk about in this post?

 

Using the Processor API in the DSL

We are going to talk about how you can make use of the Processor API in the DSL. Now you may be asking yourself: why would I want to do that, if the DSL can do things in a few lines of code and the Processor API is very low level?

Here are a few reasons

 

  • Customization: You need to implement special, customized logic that is not or not yet available in the DSL.
  • Combining ease-of-use with full flexibility where it’s needed: Even though you generally prefer to use the expressiveness of the DSL, there are certain steps in your processing that require more flexibility and tinkering than the DSL provides. For example, only the Processor API provides access to a record’s metadata such as its topic, partition, and offset information. However, you don’t want to switch completely to the Processor API just because of that.
  • Migrating from other tools: You are migrating from other stream processing technologies that provide an imperative API, and migrating some of your legacy code to the Processor API was faster and/or easier than to migrate completely to the DSL right away.

Ok so now that we know why we might do it, what things can we use from the Processor API with the DSL?

 

We can use these things

  • Processor
  • Transformer
  • ValueTransformer

 

We will look at Processor and ValueTransformer in this post

 

Adding A Processor to the DSL

Ok so as before we will use the DSL to create some simple functionality, where we will do the following

  • GroupByKey
  • Count
  • Pipe to Processor API Processor class

It should be noted that adding a Processor to the DSL is a terminal step.

So here is the DSL part of this scenario, where it can be noted that we use .process() to add the custom Processor to the pipeline

package processorapi.interop

import java.io.PrintWriter
import java.time.Duration
import java.util
import java.util.Properties

import common.PropsHelper
import entities.Contributor
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder, kstream}
import org.apache.kafka.streams.{KafkaStreams, Topology}
import serialization.JSONSerde


class ProcessorApiProcessSupplierTopology(val pw: PrintWriter) extends App {

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "processor-api-process-supplier-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopology()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def stop() : Unit = {
    pw.close()
  }

  def createTopology(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val contributorSerde = new JSONSerde[Contributor]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, contributorSerde)
    implicit val grouped = Grouped.`with`(stringSerde, contributorSerde)

    import org.apache.kafka.streams.state.Stores
    val contributorStoreName = "processContributorStore"

    val logConfig = new util.HashMap[String, String]
    logConfig.put("retention.ms", "172800000")
    logConfig.put("retention.bytes", "10000000000")
    logConfig.put("cleanup.policy", "compact,delete")
    val contributorStoreSupplier = Stores.inMemoryKeyValueStore(contributorStoreName)
    val contributorStoreBuilder = Stores.keyValueStoreBuilder(contributorStoreSupplier,
        Serdes.String, Serdes.Long())
      .withLoggingEnabled(logConfig)
      .withCachingEnabled()

    implicit val materializer =
      Materialized.as(contributorStoreSupplier)(Serdes.String, Serdes.Long())
      .asInstanceOf[Materialized[String, Long,ByteArrayKeyValueStore ]]

    val builder: StreamsBuilder = new StreamsBuilder
    val contribs: KStream[String, Contributor] =
          builder.stream[String, Contributor]("ProcessorApiProcessorSupplierInputTopic")

    contribs
      .groupByKey
      .count()(materializer)
      .toStream
      .process(() => new ContributorPrintingSupplier(pw).get(), contributorStoreName)



    builder.build()
  }
}

Nothing too scary there. So let's continue to examine the inner workings of the ContributorPrintingSupplier class, which looks like this

package processorapi.interop

import java.io.PrintWriter

import entities.Contributor
import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier}

class ContributorPrintingSupplier(val pw: PrintWriter) extends ProcessorSupplier[String, Long] {
  override def get(): Processor[String, Long] = new Processor[String,Long] {

    import org.apache.kafka.streams.processor.ProcessorContext
    import org.apache.kafka.streams.state.KeyValueStore

    private var context:ProcessorContext  = null
    private var contributorStore:KeyValueStore[String, Long]  = null

    override def init(context: ProcessorContext): Unit = {
      import org.apache.kafka.streams.state.KeyValueStore
      this.context = context
      this.contributorStore = context.getStateStore("processContributorStore")
        .asInstanceOf[KeyValueStore[String, Long]]
    }

    override def process(key: String, value: Long): Unit = {
      pw.write(s"key ${key} has been seen ${value} times\r\n")
      pw.flush()
    }

    override def close(): Unit = {
      if(contributorStore != null) {
        contributorStore.close()
      }
    }
  }
}

There are several takeaway points here, namely:

  • We create an anonymous Processor, and override the following methods
    • init
    • process
    • close
  • We are able to gain access to state stores via the supplied ProcessorContext

 

As I say this is a terminal operation, and as such when using .process() in the DSL you will not see any subsequent operations.

Adding A ValueTransformer to the DSL

  • Applies a ValueTransformer to each record, while retaining the key of the original record. transformValues() allows you to leverage the Processor API from the DSL.
  • Each input record is transformed into exactly one output record (zero output records or multiple output records are not possible). The ValueTransformer may return null as the new value for a record.
  • transformValues is preferable to transform because it will not cause data re-partitioning. It is also possible to get read-only access to the input record key if you use ValueTransformerWithKey (provided via ValueTransformerWithKeySupplier) instead, as sketched just below.
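
Purely as a hedged illustration (this class is made up, it is not part of this post's repo), a ValueTransformerWithKey based supplier might look like this, with the key available read-only inside transform

import org.apache.kafka.streams.kstream.{ValueTransformerWithKey, ValueTransformerWithKeySupplier}
import org.apache.kafka.streams.processor.ProcessorContext

// hypothetical example: prefix every value with its (read-only) key
class KeyPrefixingTransformerSupplier extends ValueTransformerWithKeySupplier[String, String, String] {
  override def get(): ValueTransformerWithKey[String, String, String] =
    new ValueTransformerWithKey[String, String, String] {

      override def init(context: ProcessorContext): Unit = ()

      // readOnlyKey may be inspected but must never be modified
      override def transform(readOnlyKey: String, value: String): String =
        s"$readOnlyKey -> ${value.toUpperCase}"

      override def close(): Unit = ()
    }
}

A KStream[String, String] could then call .transformValues(new KeyPrefixingTransformerSupplier), and because the key is left untouched no re-partitioning is triggered.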

Ok so as before we will use the DSL to create a simple KStream and then transform the values using the Processor API. This example is a little contrived, as the transformed value is of the same type as the original value (just a Contributor with its ranking bumped), but you should get the idea

package processorapi.interop

import java.time.Duration
import java.util
import java.util.Properties

import common.PropsHelper
import entities.Contributor
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.scala.{StreamsBuilder, kstream}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}
import serialization.JSONSerde


class ProcessorApiTransformValuesTopology extends App {

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "processor-api-transform-values-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopology()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopology(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val contributorSerde = new JSONSerde[Contributor]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, contributorSerde)
    implicit val materializer = Materialized.`with`(stringSerde, contributorSerde)

    import org.apache.kafka.streams.state.Stores
    val contributorStoreName = "contributorStore"

    val logConfig = new util.HashMap[String, String]
    logConfig.put("retention.ms", "172800000")
    logConfig.put("retention.bytes", "10000000000")
    logConfig.put("cleanup.policy", "compact,delete")
    val contributorStoreSupplier = Stores.inMemoryKeyValueStore(contributorStoreName)
    val contributorStoreBuilder = Stores.keyValueStoreBuilder(contributorStoreSupplier, Serdes.String, contributorSerde)
      .withLoggingEnabled(logConfig)
      .withCachingEnabled()


    val builder: StreamsBuilder = new StreamsBuilder
    val contribs: KStream[String, Contributor] =
          builder.stream[String, Contributor]("ProcessorApiTransformValuesInputTopic")

    builder.addStateStore(contributorStoreBuilder)

    contribs
      .transformValues(new ContributorTransformSupplier, contributorStoreName)
      .to("ProcessorApiTransformValuesOutputTopic")(Produced.`with`(stringSerde, contributorSerde))

    builder.build()
  }
}

Nice and simple, we just use the transformValues() method to apply our custom ValueTransformer

package processorapi.interop

import java.time.Duration

import entities.Contributor
import org.apache.kafka.streams.kstream.{ValueTransformer, ValueTransformerSupplier}
import org.apache.kafka.streams.processor.{PunctuationType, Punctuator}

class ContributorTransformSupplier extends ValueTransformerSupplier[Contributor, Contributor] {
  override def get(): ValueTransformer[Contributor, Contributor] = new ValueTransformer[Contributor, Contributor] {

    import org.apache.kafka.streams.processor.ProcessorContext
    import org.apache.kafka.streams.state.KeyValueStore

    private var context:ProcessorContext  = null
    private var contributorStore:KeyValueStore[String, Contributor]  = null

    override def init(context: ProcessorContext): Unit = {
      import org.apache.kafka.streams.state.KeyValueStore
      this.context = context
      this.contributorStore = context.getStateStore("contributorStore")
        .asInstanceOf[KeyValueStore[String, Contributor]]

      //to punctuate you would do something like this
      //      context.schedule(Duration.ofSeconds(1),
      //            PunctuationType.WALL_CLOCK_TIME, new Punctuator {
      //        override def punctuate(timestamp: Long): Unit = {
      //
      //          val it = contributorStore.all
      //          val currentTime = System.currentTimeMillis
      //          while (it.hasNext) {
      //            val contributorValue = it.next.value
      //            if (contributorValue.updatedWithinLastMillis(currentTime, 11000))
      //              context.forward(contributorValue.email,contributorValue)
      //          }
      //        }
      //      })
    }

    override def transform(value: Contributor): Contributor = {

      var finalContributor:Contributor = null
      try {
        val contributor = contributorStore.get(value.email)
        if(contributor == null) {
          contributorStore.putIfAbsent(value.email, value)
          finalContributor = value
        }
        else {
          val newContributor = contributor.copy(
            ranking = contributor.ranking + 1,
            lastUpdatedTime = System.currentTimeMillis
          )
          contributorStore.put(value.email,newContributor)
          finalContributor = newContributor
        }

        finalContributor
      }
      catch {
        case e:NullPointerException => {
          contributorStore.putIfAbsent(value.email, value)
          value
        }
        case e:Exception => {
          value
        }
      }
    }

    override def close(): Unit = {
      if(contributorStore != null) {
        contributorStore.close()
      }
    }
  }
}

As before we have access to the state store via the ProcessorContext. I also show (in the commented-out block above) how you could use punctuation, which essentially forwards records on downstream on some schedule.
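
For reference, here is a hedged, standalone sketch of that punctuation idea (the helper name and the String store types are made up): a wall-clock punctuator, scheduled from init(), that periodically forwards everything currently in a store downstream

import java.time.Duration

import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType, Punctuator}
import org.apache.kafka.streams.state.KeyValueStore

object PunctuationSketch {

  // call this from init(), once the ProcessorContext and the store have been obtained
  def schedulePeriodicForward(context: ProcessorContext,
                              store: KeyValueStore[String, String]): Unit = {
    context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator {
      override def punctuate(timestamp: Long): Unit = {
        val it = store.all()
        try {
          while (it.hasNext) {
            val entry = it.next()
            // push the current state downstream, just like forwarding from process()/transform()
            context.forward(entry.key, entry.value)
          }
        } finally {
          it.close() // always close store iterators to avoid leaking resources
        }
      }
    })
  }
}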

 

And I think that is all I have to say this time. You can of course check out the tests to play with it more.

 

Next time we will look at the interactive queries API.