Kafka

KafkaStreams : Custom Serdes

Last time we look at Joining. This time we will continue to look at the streams DSL, and how we can supply our own Serdes (serializer / deserializer).

 

Where is the code?

The code for this post is all contained here

And the tests are all contained here

 

Serdes

Just to remind ourselves how Kafka Streams makes use of Serdes, from https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html up on date 13/03/19

Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String) to materialize the data when necessary. Operations that require such SerDes information include: stream(), table(), to(), through(), groupByKey(), groupBy().

You can provide SerDes by using either of these methods:

By setting default SerDes via a StreamsConfig instance.
By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.

 

InBuilt Serdes

So Kafka comes with a whole bunch of pre canned Serdes which you can access using the following namespace

org.apache.kafka.common.serialization.Serdes

But what happens when  you want to send more than just primitives/byte[]

Imagine we want to send these types

  • Rating
  • List[Rating]

Where Rating may look like this

package entities

case class Rating(fromEmail: String, toEmail: String, score: Float)

Then the inbuilt Serdes may not cut the mustard, so we need to implement our own

Custom Serdes

So the first step is to implement the custom Serde. For this post we will assume we are using JSON and will use the Jackson library to deal with the JSON, so our Serde looks like this.

import java.lang.reflect.{ParameterizedType, Type}
import java.util
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.exc.{UnrecognizedPropertyException => UPE}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}


package serialization {

  object Json {

    type ParseException = JsonParseException
    type UnrecognizedPropertyException = UPE

    private val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)

    private def typeReference[T: Manifest] = new TypeReference[T] {
      override def getType = typeFromManifest(manifest[T])
    }

    private def typeFromManifest(m: Manifest[_]): Type = {
      if (m.typeArguments.isEmpty) {
        m.runtimeClass
      }
      else new ParameterizedType {
        def getRawType = m.runtimeClass

        def getActualTypeArguments = m.typeArguments.map(typeFromManifest).toArray

        def getOwnerType = null
      }
    }

    object ByteArray {
      def encode(value: Any): Array[Byte] = mapper.writeValueAsBytes(value)

      def decode[T: Manifest](value: Array[Byte]): T =
        mapper.readValue(value, typeReference[T])
    }

  }

  /**
    * JSON serializer for JSON serde
    *
    * @tparam T
    */
  class JSONSerializer[T] extends Serializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def serialize(topic: String, data: T): Array[Byte] =
      Json.ByteArray.encode(data)

    override def close(): Unit = ()
  }

  /**
    * JSON deserializer for JSON serde
    *
    * @tparam T
    */
  class JSONDeserializer[T >: Null <: Any : Manifest] extends Deserializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def deserialize(topic: String, data: Array[Byte]): T = {
      if (data == null) {
        return null
      } else {
        Json.ByteArray.decode[T](data)
      }
    }
  }

  /**
    * JSON serde for local state serialization
    *
    * @tparam T
    */
  class JSONSerde[T >: Null <: Any : Manifest] extends Serde[T] {
    override def deserializer(): Deserializer[T] = new JSONDeserializer[T]

    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def serializer(): Serializer[T] = new JSONSerializer[T]
  }

}

 

Notice how this Serde is generic and can work with any T, but in order to do this with we need to deal with the insanity of the JVM and type erasure (OMG they got that wrong, .NET absolutely nailed generics). So in this example we use the Manifest to store Type information such that the Type information is not “erased” and we know how to deserialize the JSON string back into a T

 

Ok so now that we have this, lets see an example of how it is used

package serialization

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import entities.Rating
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}


class CustomSerdesTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "custom-serdes-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val ratingSerde = new JSONSerde[Rating]
    implicit val listRatingSerde = new JSONSerde[List[Rating]]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, ratingSerde)
    implicit val materializer = Materialized.`with`(stringSerde, listRatingSerde)
    implicit val grouped = Grouped.`with`(stringSerde, ratingSerde)

    val builder: StreamsBuilder = new StreamsBuilder
    val ratings: KStream[String, Rating] =
          builder.stream[String, Rating]("CustomSerdesInputTopic")

    //When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0)
    //and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table,
    //you must provide a “subtractor” aggregator (think: aggValue - oldValue).
    val groupedBy = ratings.groupByKey
    val aggregatedTable =
      groupedBy
        .aggregate[List[Rating]](List[Rating]())((aggKey, newValue, aggValue) => newValue :: aggValue)

    var finalStream = aggregatedTable.toStream
    finalStream.peek((key, values) => {

      val theKey = key
      val theValues = values

    })


    finalStream.to("CustomSerdesOutputTopic")(Produced.`with`(stringSerde, listRatingSerde))

    builder.build()
  }
}

Pretty easy stuff, only call out points here are that we use the implicits at then start of the file to specify all the serdes that the code will need to look up implicitly

Just for completeness this is the test suite that goes with this example

package serialization

import java.io._
import java.lang
import java.util.Properties

import common.PropsHelper
import entities.Rating
import org.apache.kafka.common.serialization.{LongDeserializer, _}
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.{ConsumerRecordFactory, OutputVerifier}
import org.scalatest._
import org.apache.kafka.common.serialization.Serdes


class CustomSerdesTopologyTests
  extends FunSuite
  with BeforeAndAfter
  with Matchers {

  val props = PropsHelper.createBasicStreamProperties("custom-serdes-application", "localhost:9092")
  val stringDeserializer: StringDeserializer = new StringDeserializer
  val ratingLIstDeserializer: JSONDeserializer[List[Rating]] = new JSONDeserializer[List[Rating]]

  before {
  }

  after {
  }


  test("Should produce correct output") {

    //arrange
    val recordFactory: ConsumerRecordFactory[java.lang.String, Array[Byte]] =
        new ConsumerRecordFactory[java.lang.String, Array[Byte]](new StringSerializer, Serdes.ByteArray().serializer())
    val customSerdesTopology = new CustomSerdesTopology()


    val jsonSerde = new JSONSerde[Rating]

    val rating = Rating("jarden@here.com","sacha@here.com", 1.5f)
    val ratingBytes = jsonSerde.serializer().serialize("", rating)


    //NOTE : You may find you need to play with these Config values in order
    //to get the stateful operation to work correctly/how you want it to
    //    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
    //    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760.asInstanceOf[Object])
    //    props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 50000.asInstanceOf[Object])
    //By playing around with these values you should be able to find the values that work for you
    //WARNING : Chaning these settings may have impact on the tests, as less frequent commits/state store
    //cache flushing may occur
    val testDriver = new TopologyTestDriver(customSerdesTopology.createTopolgy(), props)

    //Use the custom JSONSerde[Rating]
    testDriver.pipeInput(recordFactory.create("CustomSerdesInputTopic", rating.toEmail, ratingBytes, 9995L))

    val result = testDriver.readOutput("CustomSerdesOutputTopic", stringDeserializer, ratingLIstDeserializer)

    OutputVerifier.compareKeyValue(result, "sacha@here.com",List(Rating("jarden@here.com","sacha@here.com", 1.5f)))
    val result1 = testDriver.readOutput("CustomSerdesOutputTopic", stringDeserializer, ratingLIstDeserializer)
    assert(result1 == null)

    cleanup(props, testDriver)
  }


  def cleanup(props:Properties, testDriver: TopologyTestDriver) = {

    try {
      //there is a bug on windows which causes this line to throw exception
      testDriver.close
    } catch {
      case e: Exception => {
        delete(new File("C:\\data\\kafka-streams"))
      }
    }
  }

  def delete(file: File) {
    if (file.isDirectory)
      Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete(_))
    file.delete
  }
}

 

That’s it for now

So that’s all I wanted to say this time, so until the next time, hope this has shown you just how easy it is to hand roll your own object that you can serialize with ease

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s