This is the 2nd post in a small mini series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such, the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point is that I am mainly a Windows user, so the instructions and scripts will have a Windows bias to them. If you are not a Windows user, you will need to find the instructions for your OS of choice.
Prerequisites
- Download the open source Confluent Platform: https://www.confluent.io/download/?utm_medium=ppc&utm_source=adwords&utm_campaign=Branded&utm_content=https://www.confluent.io/download/&utm_term=%2Bconfluent%20%2Bio%20%2Bdownload&b&gclid=CjwKCAjw9qfZBRA5EiwAiq0AbR4M47Cvwr5bXA8z5LbGdsvz7eQYhAs0CovCqiuHNHtF1EE4xhNf8RoCCnQQAvD_BwE#popup_form_1905
- IntelliJ IDEA Community Edition (Scala IDE); you should enable the SBT plugin in it
- Java 1.8 SDK
- SBT (Scala build tool)
So go and grab that lot if you want to follow along.
Last time we talked about how to create a Kafka Producer/Consumer that did use Avro, but via the GenericRecord approach, which works more like a dictionary of key/value pairs. This time we will be talking about how to use the KafkaAvroSerializer to send specific Avro types using Kafka and the Kafka Schema Registry.
Kafka Schema Registry
So just what is the Kafka Schema Registry?
Schema Registry is part of the Confluent Open Source and Confluent Enterprise distributions. The Schema Registry stores a versioned history of all schemas and allows for the evolution of schemas according to the configured compatibility settings and expanded Avro support.
Source: https://docs.confluent.io/current/schema-registry/docs/index.html (as of 26/06/18)
I don’t want to dwell too much on the internals of the Kafka Schema Registry, as we will be looking at it in fine detail in the next post. For now, it is important to understand that both Kafka and Kafka Streams support using the Schema Registry when sending/receiving Avro content. They will also raise exceptions should you try to send Avro content through a topic that breaks the compatibility rules configured for its schema in the Schema Registry.
If all of that sounds a bit whack, don’t worry, we will be getting into it more in the next post.
I just wanted to give a small overview of how the Schema Registry fits into the Kafka ecosystem. The rest of this article will be more about how we can make use of the KafkaAvroSerializer to send Avro data of our own types, rather than use the previously demonstrated GenericRecord approach.
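One concrete way to see how the Schema Registry fits in: the KafkaAvroSerializer does not embed the full schema in every message. Per Confluent's documented wire format, each value starts with a magic byte (0), then the 4-byte big-endian ID of the schema registered in the Schema Registry, then the Avro binary data; the deserializer uses that ID to fetch the matching schema from the registry. Below is a minimal, stdlib-only sketch of that framing. WireFormat and its methods are hypothetical helpers for illustration, not part of the Confluent libraries.

```scala
import java.nio.ByteBuffer

// Hypothetical helper illustrating the Confluent wire format:
// [magic byte 0][4-byte big-endian schema ID][Avro binary payload]
object WireFormat {
  final case class Framed(schemaId: Int, avroPayload: Array[Byte])

  // Splits a serialized record value into its schema ID and Avro bytes.
  def parse(bytes: Array[Byte]): Either[String, Framed] =
    if (bytes.length < 5) Left(s"too short: ${bytes.length} bytes")
    else {
      val buf = ByteBuffer.wrap(bytes) // ByteBuffer is big-endian by default
      val magic = buf.get()
      if (magic != 0) Left(s"unknown magic byte: $magic")
      else {
        val schemaId = buf.getInt()
        val payload = new Array[Byte](buf.remaining())
        buf.get(payload)
        Right(Framed(schemaId, payload))
      }
    }

  // Builds a framed value (the inverse of parse); handy for tests.
  def frame(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] =
    ByteBuffer.allocate(5 + avroPayload.length)
      .put(0.toByte)
      .putInt(schemaId)
      .put(avroPayload)
      .array()
}
```

This can be handy when debugging, for example to check which schema ID a raw message on the topic was written with.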
Where is the code for this post?
You can grab the code that goes with this post from here : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSpecificAvro
SBT
I decided that I would take the hit and craft a multi-project SBT setup, which looks like this
import Deps._
import sbt.Keys.scalaVersion

lazy val root = (project in file(".")).
  aggregate(publisher, subscriber).
  settings(
    inThisBuild(List(
      organization := "com.barber",
      scalaVersion := "2.12.1",
      resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
      resolvers += "io.confluent" at "http://packages.confluent.io/maven/"
    )),
    name := "scala_kafka_specific_avro_example"
  )

lazy val publisher = (project in file ("publisher")).
  settings(
    name := "publisher",
    /* sourceGenerators in Compile += (avroScalaGenerateSpecific in Compile).taskValue, */
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  ).dependsOn(common)

lazy val subscriber = (project in file ("subscriber")).
  settings(
    name := "subscriber",
    /* sourceGenerators in Compile += (avroScalaGenerateSpecific in Compile).taskValue, */
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  ).dependsOn(publisher, common)

lazy val common = (project in file ("common")).
  settings(
    name := "common",
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  )
Where the Deps.scala file looks like this:
import sbt._

object Deps {
  lazy val kafka = "org.apache.kafka" % "kafka_2.11" % "1.1.0"
  lazy val avro = "org.apache.avro" % "avro" % "1.8.2"
  lazy val avroSerializer = "io.confluent" % "kafka-avro-serializer" % "3.2.1"
  lazy val logBack = "ch.qos.logback" % "logback-classic" % "1.1.7"
}
This essentially creates the following 3 projects
- Common (this is where the shared Avro schema/object lives)
- Publisher
- Subscriber
Once you have done an SBT clean/compile and opened it inside IntelliJ, it should look something like this:
So now that we know roughly what the build file looks like, let's turn our attention to the 3 projects. I will walk through each of them in turn below.
Common Project
As stated above, the Common project holds the Avro schema file (which is slightly different from the last post). The new Avro schema is as follows:
{
  "namespace": "com.barber.kafka.avro",
  "type": "record",
  "name": "User",
  "fields":[
    {
      "name": "id", "type": "int"
    },
    {
      "name": "name", "type": "string"
    }
  ]
}
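To get a feel for what actually goes down the wire for this schema, here is a hand-rolled sketch of the Avro binary encoding (as described in the Avro specification) for a User record: fields are written in schema order with no field names, the int is zig-zag/varint encoded, and the string is a zig-zag/varint length followed by its UTF-8 bytes. This is purely illustrative; the KafkaAvroSerializer uses Avro's own encoder, and UserBinaryEncoding is a hypothetical name.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

// Illustrative encoder for the User schema only (id: int, name: string).
object UserBinaryEncoding {
  // Zig-zag maps signed to unsigned so small negative numbers stay small.
  private def zigZag(n: Long): Long = (n << 1) ^ (n >> 63)

  // Variable-length encoding: 7 bits per byte, high bit set while more bytes follow.
  private def writeVarLong(out: ByteArrayOutputStream, n: Long): Unit = {
    var v = zigZag(n)
    while ((v & ~0x7FL) != 0L) {
      out.write(((v & 0x7F) | 0x80).toInt)
      v >>>= 7
    }
    out.write(v.toInt)
  }

  def encode(id: Int, name: String): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    writeVarLong(out, id.toLong)          // "id": int, zig-zag varint
    val utf8 = name.getBytes(StandardCharsets.UTF_8)
    writeVarLong(out, utf8.length.toLong) // "name": length as a zig-zag varint long...
    out.write(utf8)                       // ...followed by the UTF-8 bytes
    out.toByteArray
  }
}
```

For example, `encode(1, "a")` yields the three bytes 2, 2, 97, which shows how compact Avro payloads are once the schema lives elsewhere.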
Unlike last time, where we used a GenericRecord, this time we would like to send a case class down the wire. Something like this:
case class User(var id: Int, var name: String)
Unfortunately this does not work. If you were to try to send this down the wire using the KafkaProducer whilst the Schema Registry is running, you will most likely get an exception stating that it cannot be serialized.
What we actually need is something more like the following, where we extend the SpecificRecordBase abstract class and also supply the schema and the ability to get/set field values by position.
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.barber.kafka.avro

import scala.annotation.switch
import scala.io.Source

case class User(var id: Int, var name: String) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(0, "")

  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case pos if pos == 0 => {
        id
      }.asInstanceOf[AnyRef]
      case pos if pos == 1 => {
        name
      }.asInstanceOf[AnyRef]
      case _ => throw new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }

  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case pos if pos == 0 => this.id = {
        value
      }.asInstanceOf[Int]
      case pos if pos == 1 => this.name = {
        value.toString
      }.asInstanceOf[String]
      case _ => throw new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }

  def getSchema: org.apache.avro.Schema = User.SCHEMA$
}

object User {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
    Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString)
}
Now that is quite a bit of code to write. Surely there are tools out there for this job aren’t there?
Why yes there are.
- There is this excellent SBT plugin available: https://github.com/julianpeeters/sbt-avrohugger It will generate the Avro case class with SpecificRecordBase implemented as shown above, but it does require you to integrate the SBT tasks from that plugin into your own build. You may also get conflicts if you are already using another code generator. But it is very useful and worth a look.
- There is also this excellent website hosted on Heroku: https://avro2caseclass.herokuapp.com/generate?schema=%7B%0D%0A++++%22namespace%22%3A+%22com.barber.kafka.avro%22%2C%0D%0A+++++%22type%22%3A+%22record%22%2C%0D%0A+++++%22name%22%3A+%22user%22%2C%0D%0A+++++%22fields%22%3A%5B%0D%0A+++++++++%7B%0D%0A++++++++++++%22name%22%3A+%22id%22%2C+%22type%22%3A+%22int%22%0D%0A+++++++++%7D%2C%0D%0A+++++++++%7B%0D%0A++++++++++++%22name%22%3A+%22name%22%2C++%22type%22%3A+%22string%22%0D%0A+++++++++%7D%0D%0A+++++%5D%0D%0A%7D&format=specific It simply lets you paste your Avro schema and get the code generated for you, which you can then use in your solution. Here is an example of using this site:
So that is really all the Common project is: just a place for the shared Avro objects to live, which the Publisher/Subscriber projects can use.
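It is worth spelling out why the generated User class has a no-arg constructor and positional get/put. When a specific Avro reader deserializes a record, it roughly creates an empty instance through the no-arg constructor and then calls put(index, value) for each field in schema order (strings typically arrive as org.apache.avro.util.Utf8, which is why put calls value.toString). The sketch below mimics that flow in plain Scala; PositionalRecord, DemoUser and RecordReader are hypothetical stand-ins for Avro's IndexedRecord, the generated class and the datum reader, not the real API.

```scala
// Hypothetical stand-in for Avro's IndexedRecord: fields addressed by schema position.
trait PositionalRecord {
  def get(i: Int): AnyRef
  def put(i: Int, value: Any): Unit
}

// Same shape as the generated User: mutable fields, no-arg constructor, positional get/put.
case class DemoUser(var id: Int, var name: String) extends PositionalRecord {
  def this() = this(0, "")

  def get(i: Int): AnyRef = i match {
    case 0 => Int.box(id)
    case 1 => name
    case _ => throw new IndexOutOfBoundsException(s"Bad index $i")
  }

  def put(i: Int, value: Any): Unit = i match {
    case 0 => id = value.asInstanceOf[Int]
    case 1 => name = value.toString // a real reader may hand over a Utf8 rather than a String
    case _ => throw new IndexOutOfBoundsException(s"Bad index $i")
  }
}

object RecordReader {
  // Simplified reader flow: create an empty instance, then fill it field by field.
  def readInto[T <: PositionalRecord](newInstance: () => T, fieldValues: Seq[Any]): T = {
    val record = newInstance()
    fieldValues.zipWithIndex.foreach { case (v, i) => record.put(i, v) }
    record
  }
}
```

Without the no-arg constructor the reader would have no generic way to create the empty instance, and without positional put it would have no way to fill it in.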
Publisher Project
This is the entire code for the Publisher
package com.barber.kafka.avro

import java.util.{Properties, UUID}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

class KafkaDemoAvroPublisher(val topic:String) {
  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put("key.serializer", classOf[StringSerializer].getCanonicalName)
  props.put("value.serializer",classOf[KafkaAvroSerializer].getCanonicalName)
  props.put("client.id", UUID.randomUUID().toString())

  private val producer = new KafkaProducer[String,User](props)

  def send(): Unit = {
    try {
      val rand = new scala.util.Random(44343)
      for(i <- 1 to 10) {
        val id = rand.nextInt()
        val itemToSend = User(id , "ishot.com")
        println(s"Producer sending data ${itemToSend.toString}")
        producer.send(new ProducerRecord[String, User](topic, itemToSend))
        producer.flush()
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
    }
  }
}
The main points are these:
- We now include a property for the schema.registry.url
- We now use the KafkaAvroSerializer for the topic value serializer
- We use the User class that we talked about in the common project, that was generated from the Avro schema talked about above. It is this User Avro object that we push out on the Kafka topic
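If you want to confirm that the publisher really did register the User schema, you can ask the Schema Registry over its REST API. With the default subject naming, the value schema for a topic is registered under the subject `<topic>-value`, and `GET /subjects/{subject}/versions/latest` returns its latest version. A small sketch, assuming the registry is at http://localhost:8081 as in the code above; RegistryCheck is a hypothetical helper, and the topic name you pass in should be whatever the PowerShell script created.

```scala
import scala.io.Source
import scala.util.Try

object RegistryCheck {
  // Default subject naming: the value schema for a topic lives under "<topic>-value".
  def valueSubject(topic: String): String = s"$topic-value"

  // Schema Registry REST endpoint for the latest registered version of that subject.
  def latestSchemaUrl(registryUrl: String, topic: String): String =
    s"${registryUrl.stripSuffix("/")}/subjects/${valueSubject(topic)}/versions/latest"

  // Fetches the JSON response; only succeeds while the Schema Registry is running.
  def fetchLatest(registryUrl: String, topic: String): Try[String] = Try {
    val src = Source.fromURL(latestSchemaUrl(registryUrl, topic), "UTF-8")
    try src.mkString finally src.close()
  }
}
```

For example, `RegistryCheck.fetchLatest("http://localhost:8081", "your-topic").foreach(println)` (with "your-topic" replaced by your real topic name) should print a JSON document with the subject, version, id and schema.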
Subscriber Project
Here is the subscriber code
package com.barber.kafka.avro

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Collections
import org.apache.kafka.common.errors.TimeoutException
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.serialization.StringDeserializer

class KafkaDemoAvroSubscriber(val topic:String) {
  private val props = new Properties()
  val groupId = "avro-demo-topic-consumer"
  // @volatile: written by the shutdown hook thread, read by the polling loop
  @volatile var shouldRun : Boolean = true

  props.put("bootstrap.servers", "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put("group.id", groupId)
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "10000")
  props.put("session.timeout.ms", "30000")
  props.put("consumer.timeout.ms", "120000")
  props.put("key.deserializer", classOf[StringDeserializer].getCanonicalName)
  props.put("value.deserializer",classOf[KafkaAvroDeserializer].getCanonicalName)
  //Use Specific Record or else you get Avro GenericRecord.
  props.put("specific.avro.reader", "true")

  private val consumer = new KafkaConsumer[String, com.barber.kafka.avro.User](props)

  def start() = {
    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))
      consumer.subscribe(Collections.singletonList(topic))
      while (shouldRun) {
        val records: ConsumerRecords[String, com.barber.kafka.avro.User] = consumer.poll(1000)
        val it = records.iterator()
        while(it.hasNext()) {
          println("Getting message from queue.............")
          val record: ConsumerRecord[String, com.barber.kafka.avro.User] = it.next()
          val receivedItem = record.value()
          println(s"Saw User ${receivedItem}")
          consumer.commitSync
        }
      }
    }
    catch {
      case timeOutEx: TimeoutException =>
        println("Timeout ")
        false
      case ex: Exception =>
        ex.printStackTrace()
        println("Got error when reading message ")
        false
    }
    finally {
      // Release the consumer's network resources once the loop has stopped
      consumer.close()
    }
  }

  def close(): Unit = shouldRun = false
}
This is not that different from last time; the main points are:
- We now include a property for the schema.registry.url
- We now use the KafkaAvroDeserializer for the topic value deserializer
- We must set the property specific.avro.reader to true, which tells the KafkaAvroDeserializer that we will be using a specific Avro type (the User type in this case); otherwise it will hand us a GenericRecord for each message on the topic
- We use the User class that we talked about in the common project, that was generated from the Avro schema talked about above. It is this User Avro object that we receive on the Kafka topic
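A common stumbling block with specific.avro.reader is the link between the schema and your class. As I understand Avro's SpecificData, the reader looks for a class whose fully qualified name matches the schema's full name (namespace plus name, here com.barber.kafka.avro.User); if no such class is on the classpath it falls back to a generic record, and you typically end up with a ClassCastException when you treat the value as a User. The sketch below is a simplified, stdlib-only illustration of that lookup; SpecificClassLookup is a hypothetical name, not Avro's API.

```scala
import scala.util.Try

object SpecificClassLookup {
  // Avro full name: "namespace.name", or just "name" when there is no namespace.
  def fullName(namespace: Option[String], name: String): String =
    namespace.filter(_.nonEmpty).fold(name)(ns => s"$ns.$name")

  // Simplified lookup: a class whose fully qualified name equals the schema's full name.
  // None is the case where a reader would fall back to a generic record.
  def lookup(namespace: Option[String], name: String): Option[Class[_]] =
    Try(Class.forName(fullName(namespace, name))).toOption
}
```

So if the class is generated into a different package than the schema's namespace, the lookup fails; keeping the namespace and the Scala package in sync avoids the problem.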
So how do I run this stuff?
As I stated above, you will need to download a few things, but once you have those in place you may find the small PowerShell script inside the project, “RunThePipeline.ps1”, useful. This script does a few things: it cleans the Kafka/Zookeeper logs, stops any previous instances, starts new instances and also creates the Kafka topic (which you must have before you can use the code).
IMPORTANT NOTE: I have altered the Kafka log path and the Zookeeper data directory. This can be done using the relevant properties files from the Confluent installation:
server.properties
This line was changed
# A comma seperated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs
zookeeper.properties
This line was changed
# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper
The PowerShell script makes assumptions based on where you changed these values to, so if you have different settings for these, please edit the PowerShell script too.
Essentially you need to have the following running before you can run the code example. This is what the PowerShell script automates for you. You may prefer to use the Confluent CLI, but I already had this script from a previous project, and it also cleans out old data (useful when you are experimenting) and creates the required topic.
- Zookeeper
- Kafka broker
- Schema Registry
- Kafka topic created
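If you would rather check that those services are up before launching the apps, a quick TCP connect against the usual local ports will do. This sketch assumes the default ports: 2181 for Zookeeper (from the stock zookeeper.properties), and 9092 and 8081 for the Kafka broker and Schema Registry as used in the code above. PreflightCheck is a hypothetical helper; it only checks that something is listening and does not confirm that the topic exists.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

object PreflightCheck {
  // Assumed default local ports for each service.
  val defaultServices: Seq[(String, Int)] =
    Seq("Zookeeper" -> 2181, "Kafka broker" -> 9092, "Schema Registry" -> 8081)

  // True if a TCP connection to host:port succeeds within timeoutMs.
  def isListening(host: String, port: Int, timeoutMs: Int = 1000): Boolean =
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess

  def main(args: Array[String]): Unit =
    defaultServices.foreach { case (service, port) =>
      val status = if (isListening("localhost", port)) "up" else "NOT reachable"
      println(s"$service (localhost:$port): $status")
    }
}
```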
Once you have that, you can simply open the code in IntelliJ, right click and run the 2 apps (or set up new run configurations).
Run the Publisher
Run the Subscriber
Once you have run the PowerShell “RunThePipeline.ps1” script associated with this code (the one in the demo folder) and run the Publisher/Subscriber as shown above, you should see output something like this:
Publisher output
Subscriber output
Conclusion
Using Avro with Kafka / Schema Registry is fairly straightforward, and is not that different from working with regular byte arrays. I hope this post has whetted your appetite for the Kafka Schema Registry, which we will look into in a lot more detail in the next post.
Hello Sacha,
I enjoyed reading your posts on Kafka. I tried recreating the Kafka Producer/Consumer with Schema Registry project (on Mac + IntelliJ + Gradle), but I was unable to send messages to the Consumer. I’m not sure what is going on. This is the exact same code snippet as in this post. Can you please help me debug this issue?
I don’t know Gradle at all; my stuff was all Scala and SBT. But where I would start is making sure you have the same JARs downloaded as I show in my SBT file.
Perhaps start without Avro and make sure your Producer and Consumer can just send/receive simple strings (ProducerRecord[String, String], say).
Also if you like this series you should check out my Kafka Streams series : https://sachabarbs.wordpress.com/kafka-streams-series/