
Apache Kafka Specific Avro Producer/Consumer + Kafka Schema Registry

This is the 2nd post in a small mini series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such, the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point to note is that I am mainly a Windows user, so the instructions and scripts will have a Windows bias to them. If you are not a Windows user, you will need to find the equivalent instructions for your OS of choice.

 

Prerequisites

 

In short, you will need the Confluent Open Source distribution (which bundles Kafka, ZooKeeper and the Schema Registry), a JDK, Scala and SBT, and an IDE such as IntelliJ.

So go and grab that lot if you want to follow along.

 

Last time we talked about how to create a Kafka Producer/Consumer which did use Avro, but via the GenericRecord approach, which works more like a dictionary of key/value pairs. This time we will be talking about how to use the KafkaAvroSerializer to send specific Avro types using Kafka and the Kafka Schema Registry.

 

Kafka Schema Registry

So just what is the Kafka Schema Registry?

 

Schema Registry is part of the Confluent Open Source and Confluent Enterprise distributions. The Schema Registry stores a versioned history of all schemas and allows for the evolution of schemas according to the configured compatibility settings and expanded Avro support.

https://docs.confluent.io/current/schema-registry/docs/index.html (as of 26/06/18)

 

I don’t want to dwell too much on the internals of the Kafka Schema Registry, as we will be looking at it in fine detail in the next post. For now, it is important to understand that both Kafka and Kafka Streams support using the Schema Registry when sending/receiving Avro content. They will also raise exceptions should you try to send incompatible Avro content through a topic that uses the Schema Registry, i.e. content that breaks the schema compatibility rules configured in the Schema Registry.

If all of that sounds a bit whack, don’t worry, we will be getting into it more in the next post.

 

I just wanted to give a small overview of how the Schema Registry fits into the Kafka ecosystem. The rest of this article will be more about how we can make use of the KafkaAvroSerializer to send Avro data of our own types rather than use the previously demonstrated GenericRecord approach.

 

Where is the code for this post?

You can grab the code that goes with this post from here : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSpecificAvro

 

SBT

I decided that I would take the hit and craft a multi-project SBT setup, which looks like this

import Deps._
import sbt.Keys.scalaVersion

lazy val root = (project in file(".")).
  aggregate(publisher, subscriber).
  settings(
    inThisBuild(List(
      organization := "com.barber",
      scalaVersion := "2.12.1",
      resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
      resolvers += "io.confluent" at "http://packages.confluent.io/maven/"
    )),
    name := "scala_kafka_specific_avro_example"
  )

lazy val publisher = (project in file ("publisher")).
  settings(
    name := "publisher",
    /* sourceGenerators in Compile += (avroScalaGenerateSpecific in Compile).taskValue, */
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  ).dependsOn(common)

lazy val subscriber = (project in file ("subscriber")).
  settings(
    name := "subscriber",
    /* sourceGenerators in Compile += (avroScalaGenerateSpecific in Compile).taskValue, */
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  ).dependsOn(publisher, common)
  
lazy val common = (project in file ("common")).
  settings(
    name := "common",
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  )

 

Where the Deps.scala file looks like this

import sbt._

object Deps {
  lazy val kafka = "org.apache.kafka" % "kafka_2.11" % "1.1.0"
  lazy val avro =  "org.apache.avro" % "avro" % "1.8.2"
  lazy val avroSerializer = "io.confluent" % "kafka-avro-serializer" % "3.2.1"
  lazy val logBack = "ch.qos.logback" %  "logback-classic" % "1.1.7"
}
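
One small thing to note: the root project sets scalaVersion to 2.12.1, yet the kafka dependency is pinned to the kafka_2.11 artifact. That works because it is declared with a single % (a plain, non-cross-built dependency), but if you would rather have SBT pick the artifact that matches the project’s Scala version you could use %% instead. This is just an alternative sketch, not what the repository actually uses:

// Alternative (assumption, not what the repo uses): let SBT append the Scala
// suffix, which would resolve kafka_2.12 for this build
lazy val kafka = "org.apache.kafka" %% "kafka" % "1.1.0"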

 

This essentially creates the following 3 projects

  • Common (this is where the shared Avro schema/objects live)
  • Publisher
  • Subscriber

Once you have done an SBT clean/compile and opened it inside IntelliJ, it should look something like this:

[image: the multi-project layout in IntelliJ]

 

So now that we know roughly what the build file looks like, let’s turn our attention to the 3 projects. I will walk through them below in turn.

 

Common Project

As stated above, the common project holds the Avro schema file (which is slightly different from the one in the last post). The new Avro schema is as follows:

 

{
    "namespace": "com.barber.kafka.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {
            "name": "id", "type": "int"
        },
        {
            "name": "name", "type": "string"
        }
    ]
}

Unlike last time, where we used a GenericRecord, this time we would like to use a case class to send down the wire. Something like this:

case class User(var id: Int, var name: String) 

 

Unfortunately this does not work. If you were to try and send this down the wire using the KafkaProducer whilst the Schema Registry is running, you would most likely get an exception stating that it cannot be serialized.

 

What we actually need to do is something more like this, where we extend the SpecificRecordBase abstract class and also supply the schema and the ability to get/set field values by index.

 

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.barber.kafka.avro

import scala.annotation.switch
import scala.io.Source

case class User(var id: Int, var name: String) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(0, "")
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case pos if pos == 0 => {
        id
      }.asInstanceOf[AnyRef]
      case pos if pos == 1 => {
        name
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case pos if pos == 0 => this.id = {
        value
      }.asInstanceOf[Int]
      case pos if pos == 1 => this.name = {
        value.toString
      }.asInstanceOf[String]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = User.SCHEMA$
}

object User {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
    Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString)
}
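
As a quick sanity check that the schema resource is actually on the classpath, you can load the companion object and print the parsed schema. This is a throwaway sketch; the SchemaCheck object below is hypothetical and not part of the repository:

package com.barber.kafka.avro

// Hypothetical scratch object: prints the Avro schema parsed from /userSchema.avsc
object SchemaCheck extends App {
  println(User.SCHEMA$.toString(true))
}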

 

Now that is quite a bit of code to write. Surely there are tools out there for this job aren’t there?

 

Why yes there are.

 

 

[image]
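
The (commented out) avroScalaGenerateSpecific task that you may have spotted in the build file above comes from the sbt-avrohugger plugin, which can generate exactly this kind of SpecificRecordBase case class from an .avsc file at compile time. A minimal sketch of wiring it in might look like the following; the plugin version and the default src/main/avro schema location are assumptions on my part, so check them against the plugin’s documentation:

// project/plugins.sbt -- the version shown here is an assumption, use the latest release
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC16")

// build.sbt -- uncomment the generator so .avsc files (by default under src/main/avro)
// are turned into Scala case classes on compile
sourceGenerators in Compile += (avroScalaGenerateSpecific in Compile).taskValue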

 

So that is all the Common project is really; it’s just a place for the shared Avro objects to live that the Publisher/Subscriber projects can use.

 

Publisher Project

This is the entire code for the Publisher

package com.barber.kafka.avro

import java.util.{Properties, UUID}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

class KafkaDemoAvroPublisher(val topic:String) {

  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put("key.serializer", classOf[StringSerializer].getCanonicalName)
  props.put("value.serializer",classOf[KafkaAvroSerializer].getCanonicalName)
  props.put("client.id", UUID.randomUUID().toString())

  private val producer = new KafkaProducer[String, User](props)

  def send(): Unit = {
    try {
      val rand =  new scala.util.Random(44343)

      for(i <- 1 to 10) {
        val id = rand.nextInt()
        val itemToSend = User(id , "ishot.com")
        println(s"Producer sending data ${itemToSend.toString}")
        producer.send(new ProducerRecord[String, User](topic, itemToSend))
        producer.flush()
      }
    } catch {
      case ex: Exception =>
        println(s"Got error when sending message: ${ex.getMessage}")
        ex.printStackTrace()
    }
  }
}

The main points are these:

  • We now include a property for the schema.registry.url
  • We now use the KafkaAvroSerializer for the topic value serializer
  • We use the User class that we talked about in the common project, that was generated from the Avro schema talked about above. It is this User Avro object that we push out on the Kafka topic
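
For reference, a minimal driver to exercise this class might look like the sketch below. The object name and the topic name are assumptions (they are not taken from the repository), so use whatever topic your setup script actually creates:

package com.barber.kafka.avro

// Hypothetical entry point -- the topic name is an assumption and must match
// the topic created by the setup script
object PublisherApp extends App {
  val publisher = new KafkaDemoAvroPublisher("avro-specific-demo-topic")
  publisher.send()
}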

 

Subscriber Project

Here is the subscriber code

package com.barber.kafka.avro

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Collections
import org.apache.kafka.common.errors.TimeoutException
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.serialization.StringDeserializer

class KafkaDemoAvroSubscriber(val topic:String) {

  private val props = new Properties()
  val groupId = "avro-demo-topic-consumer"
  var shouldRun : Boolean = true

  props.put("bootstrap.servers", "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put("group.id", groupId)
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "10000")
  props.put("session.timeout.ms", "30000")
  props.put("consumer.timeout.ms", "120000")
  props.put("key.deserializer", classOf[StringDeserializer].getCanonicalName)
  props.put("value.deserializer",classOf[KafkaAvroDeserializer].getCanonicalName)
  //Use Specific Record or else you get Avro GenericRecord.
  props.put("specific.avro.reader", "true")

  private val consumer = new KafkaConsumer[String, com.barber.kafka.avro.User](props)

  def start() = {

    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

      consumer.subscribe(Collections.singletonList(topic))

      while (shouldRun) {
        val records: ConsumerRecords[String,  com.barber.kafka.avro.User] = consumer.poll(1000)
        val it = records.iterator()
        while(it.hasNext()) {
          println("Getting message from queue.............")
          val record: ConsumerRecord[String,  com.barber.kafka.avro.User] = it.next()
          val receivedItem = record.value()
          println(s"Saw User ${receivedItem}")
          consumer.commitSync
        }
      }
    }
    catch {
      case timeOutEx: TimeoutException =>
        println("Timeout ")
      case ex: Exception =>
        println("Got error when reading message ")
        ex.printStackTrace()
    }
  }

  def close(): Unit = shouldRun = false
}

 

This is not that different from last time; the main points are:

  • We now include a property for the schema.registry.url
  • We now use the KafkaAvroDeserializer for the topic value deserializer
  • We must set the property specific.avro.reader to true, which tells the deserializer that we want our specific Avro type back (the User type in this case); otherwise it will hand us Avro GenericRecord instances from the topic
  • We use the User class that we talked about in the common project, that was generated from the Avro schema talked about above. It is this User Avro object that we receive on the Kafka topic
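
And again for reference, a matching driver for the subscriber could be as simple as the sketch below; as before, the object name and topic name are assumptions and must line up with whatever the publisher sends to:

package com.barber.kafka.avro

// Hypothetical entry point -- the topic name is an assumption and must match
// the one the publisher writes to
object SubscriberApp extends App {
  val subscriber = new KafkaDemoAvroSubscriber("avro-specific-demo-topic")
  subscriber.start()
}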

 

So how do I run this stuff?

As I stated above, you will need to download a few things, but once you have those in place you may find the small PowerShell script inside the project, called “RunThePipeline.ps1”, useful. This script does a few things, such as cleaning the Kafka/ZooKeeper logs, stopping any previous instances, starting new instances, and creating the Kafka topic (which must exist before you can use the code).

 

IMPORTANT NOTE: I have altered the Kafka log paths and where ZooKeeper logs to. This can be done using the relevant properties files from the Confluent installation.

 

[image]

 

server.properties

This line was changed

 

# A comma seperated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs

 

zookeeper.properties

This line was changed

 

# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper

 

The PowerShell script makes assumptions based on where you changed these values to, so if you have different settings for these, please edit the PowerShell script too.

 

Essentially you need to have the following in place before you can run the code example. This is what the PowerShell script automates for you, though you may prefer to use the Confluent CLI; I just had this script from a previous project, and it also cleans out old data (which is useful when you are experimenting) and creates the required topic.

  • ZooKeeper started
  • Kafka broker started
  • Schema Registry started
  • Kafka topic created

Once you have that, you can simply open the code in IntelliJ, right-click and run the 2 apps (or set up new run configurations).

 

Run the Publisher

 

[image: running the Publisher]

 

Run the Subscriber

 

[image: running the Subscriber]

 

Once you have run the PowerShell “RunThePipeline.ps1” script associated with this code (the one in the demo folder) and run the Publisher/Subscriber as shown above, you should see output something like this:

 

Publisher output

[image: Publisher console output]

 

Subscriber output

[image: Subscriber console output]

 

 

Conclusion

Using Avro with Kafka and the Schema Registry is fairly straightforward, and is not that different from working with a regular array of bytes. I hope this post has whetted your appetite a bit for the Kafka Schema Registry, which we will look into in a lot more detail in the next post.

3 thoughts on “Apache Kafka Specific Avro Producer/Consumer + Kafka Schema Registry”

  1. Hello Sacha,
    I enjoyed reading your posts on Kafka. I tried recreating the Kafka Producer/Consumer with Schema Registry project (on Mac + IntelliJ + Gradle). But I was unable to send messages to the Consumer. I’m not sure what is going on. This is the exact same code snippet as in this post. Can you please help me debug this issue?

    1. I don’t know Gradle at all; my stuff was all Scala and SBT. But where I would start is making sure you have the same JARs downloaded that I show in my SBT file.

      Perhaps start without Avro and make sure your Producer and Consumer can just send/receive simple strings (ProducerRecord[String, String] say)
