Last time we looked at Joining. This time we will continue looking at the Streams DSL, and see how we can supply our own Serdes (serializer/deserializer).
Where is the code?
The code for this post is all contained here
And the tests are all contained here
Serdes
Just to remind ourselves how Kafka Streams makes use of Serdes, here is the relevant passage from https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html (as of 13/03/19):
Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String) to materialize the data when necessary. Operations that require such SerDes information include: stream(), table(), to(), through(), groupByKey(), groupBy().
You can provide SerDes by using either of these methods:
By setting default SerDes via a StreamsConfig instance.
By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.
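As a quick illustration of those two approaches, here is a minimal sketch (the topic name SomeTopic is just a placeholder):

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.Consumed

val props = new Properties()
// 1. Default SerDes, set once via config, used wherever nothing explicit is given
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass)

// 2. Explicit SerDes on the individual API call, overriding the defaults above
val builder = new StreamsBuilder()
val stream = builder.stream("SomeTopic",
  Consumed.`with`(Serdes.String(), Serdes.Long()))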
Inbuilt Serdes
So Kafka comes with a whole bunch of pre-canned Serdes, which you can access via the following package:
org.apache.kafka.common.serialization.Serdes
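Grabbing a few of the stock ones looks like this:

import org.apache.kafka.common.serialization.{Serde, Serdes}

// a few of the pre-canned Serdes that ship with Kafka
val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[java.lang.Long] = Serdes.Long()
val bytesSerde: Serde[Array[Byte]] = Serdes.ByteArray()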
But what happens when you want to send more than just primitives or byte[]?
Imagine we want to send these types:
- Rating
- List[Rating]
Where Rating may look like this
package entities

case class Rating(fromEmail: String, toEmail: String, score: Float)
Then the inbuilt Serdes may not cut the mustard, so we need to implement our own
Custom Serdes
So the first step is to implement the custom Serde. For this post we will assume we are working with JSON, and will use the Jackson library to handle it, so our Serde looks like this.
import java.lang.reflect.{ParameterizedType, Type}
import java.util

import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.exc.{UnrecognizedPropertyException => UPE}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

package serialization {

  object Json {

    type ParseException = JsonParseException
    type UnrecognizedPropertyException = UPE

    private val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)

    private def typeReference[T: Manifest] = new TypeReference[T] {
      override def getType = typeFromManifest(manifest[T])
    }

    private def typeFromManifest(m: Manifest[_]): Type = {
      if (m.typeArguments.isEmpty) {
        m.runtimeClass
      } else new ParameterizedType {
        def getRawType = m.runtimeClass
        def getActualTypeArguments = m.typeArguments.map(typeFromManifest).toArray
        def getOwnerType = null
      }
    }

    object ByteArray {
      def encode(value: Any): Array[Byte] = mapper.writeValueAsBytes(value)

      def decode[T: Manifest](value: Array[Byte]): T =
        mapper.readValue(value, typeReference[T])
    }
  }

  /**
    * JSON serializer for JSON serde
    *
    * @tparam T
    */
  class JSONSerializer[T] extends Serializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def serialize(topic: String, data: T): Array[Byte] =
      Json.ByteArray.encode(data)

    override def close(): Unit = ()
  }

  /**
    * JSON deserializer for JSON serde
    *
    * @tparam T
    */
  class JSONDeserializer[T >: Null <: Any : Manifest] extends Deserializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def deserialize(topic: String, data: Array[Byte]): T = {
      if (data == null) null
      else Json.ByteArray.decode[T](data)
    }
  }

  /**
    * JSON serde for local state serialization
    *
    * @tparam T
    */
  class JSONSerde[T >: Null <: Any : Manifest] extends Serde[T] {
    override def deserializer(): Deserializer[T] = new JSONDeserializer[T]

    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def serializer(): Serializer[T] = new JSONSerializer[T]
  }
}
Notice how this Serde is generic and can work with any T. In order to pull that off, though, we need to deal with the insanity of JVM type erasure (OMG they got that wrong, .NET absolutely nailed generics). So in this example we use a Manifest to capture the type information, such that it is not "erased" and we know how to deserialize the JSON back into a T.
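As a quick sanity check (and a handy way to exercise the Serde in isolation, outside of the Streams DSL), we can round-trip an object through it directly:

import entities.Rating
import serialization.JSONSerde

// Round-trip a nested generic type through the JSONSerde - no Kafka required.
// Thanks to the captured Manifest, List[Rating] survives type erasure and
// comes back as proper Rating instances rather than untyped maps.
val serde = new JSONSerde[List[Rating]]
val original = List(Rating("a@here.com", "b@here.com", 3.5f))

// the topic argument is unused by our serializer, so any string will do
val bytes = serde.serializer().serialize("ignored-topic", original)
val roundTripped = serde.deserializer().deserialize("ignored-topic", bytes)
assert(roundTripped == original)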
Ok, so now that we have this, let's see an example of how it is used in an actual topology.
package serialization

import java.time.Duration
import java.util.Properties

import common.PropsHelper
import entities.Rating
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, Topology}

class CustomSerdesTopology extends App {

  import Serdes._

  val props: Properties = PropsHelper.createBasicStreamProperties(
    "custom-serdes-application", "localhost:9092")

  run()

  private def run(): Unit = {
    val topology = createTopolgy()
    val streams: KafkaStreams = new KafkaStreams(topology, props)
    streams.start()
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }

  def createTopolgy(): Topology = {

    implicit val stringSerde = Serdes.String
    implicit val ratingSerde = new JSONSerde[Rating]
    implicit val listRatingSerde = new JSONSerde[List[Rating]]
    implicit val consumed = kstream.Consumed.`with`(stringSerde, ratingSerde)
    implicit val materializer = Materialized.`with`(stringSerde, listRatingSerde)
    implicit val grouped = Grouped.`with`(stringSerde, ratingSerde)

    val builder: StreamsBuilder = new StreamsBuilder
    val ratings: KStream[String, Rating] =
      builder.stream[String, Rating]("CustomSerdesInputTopic")

    //When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0)
    //and an "adder" aggregator (e.g., aggValue + curValue). When aggregating a grouped table,
    //you must provide a "subtractor" aggregator (think: aggValue - oldValue).
    val groupedBy = ratings.groupByKey
    val aggregatedTable =
      groupedBy.aggregate[List[Rating]](List[Rating]())(
        (aggKey, newValue, aggValue) => newValue :: aggValue)

    val finalStream = aggregatedTable.toStream
    finalStream.peek((key, values) => {
      val theKey = key
      val theValues = values
    })
    finalStream.to("CustomSerdesOutputTopic")(Produced.`with`(stringSerde, listRatingSerde))

    builder.build()
  }
}
Pretty easy stuff. The only real call-out point here is that we declare all the Serdes the code will need as implicits at the start of the createTopolgy() method, so the DSL can look them up implicitly.
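It is worth noting that, because we import org.apache.kafka.streams.scala.ImplicitConversions._, the Scala DSL can also derive the Consumed / Produced / Grouped values it needs directly from a pair of implicit Serdes. Here is a small sketch of what the compiler resolves for us (assuming the kafka-streams-scala 2.x artifact used throughout this series):

import entities.Rating
import serialization.JSONSerde
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Consumed, Produced}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes

implicit val stringSerde: Serde[String] = Serdes.String
implicit val ratingSerde: Serde[Rating] = new JSONSerde[Rating]

// ImplicitConversions builds these from the two implicit Serdes above, which
// is why the topology mostly just declares Serdes and lets the DSL look up
// the Consumed/Produced/Grouped instances it needs:
val consumed: Consumed[String, Rating] = implicitly[Consumed[String, Rating]]
val produced: Produced[String, Rating] = implicitly[Produced[String, Rating]]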
Just for completeness, here is the test suite that goes with this example.
package serialization

import java.io._
import java.util.Properties

import common.PropsHelper
import entities.Rating
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.{ConsumerRecordFactory, OutputVerifier}
import org.scalatest._

class CustomSerdesTopologyTests
  extends FunSuite
  with BeforeAndAfter
  with Matchers {

  val props = PropsHelper.createBasicStreamProperties(
    "custom-serdes-application", "localhost:9092")
  val stringDeserializer: StringDeserializer = new StringDeserializer
  val ratingListDeserializer: JSONDeserializer[List[Rating]] =
    new JSONDeserializer[List[Rating]]

  before {
  }

  after {
  }

  test("Should produce correct output") {

    //arrange
    val recordFactory: ConsumerRecordFactory[java.lang.String, Array[Byte]] =
      new ConsumerRecordFactory[java.lang.String, Array[Byte]](
        new StringSerializer, Serdes.ByteArray().serializer())
    val customSerdesTopology = new CustomSerdesTopology()

    val jsonSerde = new JSONSerde[Rating]
    val rating = Rating("jarden@here.com", "sacha@here.com", 1.5f)
    val ratingBytes = jsonSerde.serializer().serialize("", rating)

    //NOTE : You may find you need to play with these config values in order
    //to get the stateful operation to work correctly/how you want it to
    // props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
    // props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760.asInstanceOf[Object])
    // props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 50000.asInstanceOf[Object])
    //By playing around with these values you should be able to find the values that work for you
    //WARNING : Changing these settings may have an impact on the tests, as less frequent
    //commits/state store cache flushing may occur
    val testDriver = new TopologyTestDriver(customSerdesTopology.createTopolgy(), props)

    //act : pipe a record in, using the custom JSONSerde[Rating]
    testDriver.pipeInput(recordFactory.create(
      "CustomSerdesInputTopic", rating.toEmail, ratingBytes, 9995L))

    //assert
    val result = testDriver.readOutput(
      "CustomSerdesOutputTopic", stringDeserializer, ratingListDeserializer)
    OutputVerifier.compareKeyValue(result, "sacha@here.com",
      List(Rating("jarden@here.com", "sacha@here.com", 1.5f)))
    val result1 = testDriver.readOutput(
      "CustomSerdesOutputTopic", stringDeserializer, ratingListDeserializer)
    assert(result1 == null)

    cleanup(props, testDriver)
  }

  def cleanup(props: Properties, testDriver: TopologyTestDriver) = {
    try {
      //there is a bug on Windows which causes this line to throw an exception
      testDriver.close
    } catch {
      case e: Exception =>
        delete(new File("C:\\data\\kafka-streams"))
    }
  }

  def delete(file: File): Unit = {
    if (file.isDirectory)
      Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete)
    file.delete
  }
}
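The TopologyTestDriver is the quickest way to exercise the topology, but if you would rather drive it against a real broker, a minimal (illustrative) producer that reuses the JSONSerializer from earlier might look like this, assuming a broker on localhost:9092:

import java.util.Properties

import entities.Rating
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import serialization.JSONSerializer

val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")

// key is a plain string, value goes through our custom JSON serializer
val producer = new KafkaProducer[String, Rating](
  producerProps, new StringSerializer, new JSONSerializer[Rating])

producer.send(new ProducerRecord("CustomSerdesInputTopic",
  "sacha@here.com", Rating("jarden@here.com", "sacha@here.com", 1.5f)))
producer.close()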
That’s it for now
So that’s all I wanted to say this time, so until the next time, hope this has shown you just how easy it is to hand roll your own object that you can serialize with ease