Kafka Streams Using Avro/Schema Registry

This is the 4th and final post in a small mini-series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point is that I am mainly a Windows user, so the instructions and scripts will have a Windows bias to them. If you are not a Windows user you will need to find the instructions for your OS of choice.

Prerequisites

So go and grab that lot if you want to follow along.

Last time we talked about the Schema Registry and how to use it. This time we will be looking at how we can use Avro and the Schema Registry inside a Kafka Streams app.

 

Where is the code for this post?

You can grab the code that goes with this post from here : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaStreamsSpecificAvro

 

What will be included in this post?

We will be dealing with these 3 elements to make this post work

 

  • Publisher (using User Avro object)
  • Streams (Taking User Avro objects and transforming them into UserWithUUID objects and sending them to output topic)
  • Consumer (using UserWithUUID Avro object)

 

Before we get into the nitty gritty, let's just have a look at the Avro files

 

User

This is the User schema / Case class

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "User",
     "fields":[
         {
            "name": "id", "type": "int"
         },
         {
            "name": "name",  "type": "string"
         }
     ]
}

 

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.barber.kafka.avro

import scala.annotation.switch
import scala.io.Source

case class User(var id: Int, var name: String) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(0, "")
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case pos if pos == 0 => {
        id
      }.asInstanceOf[AnyRef]
      case pos if pos == 1 => {
        name
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case pos if pos == 0 => this.id = {
        value
      }.asInstanceOf[Int]
      case pos if pos == 1 => this.name = {
        value.toString
      }.asInstanceOf[String]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = User.SCHEMA$
}

object User {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
    Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString)
}

 

UserWithUUID

This is the UserWithUUID schema / Case class

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "UserWithUUID",
     "fields":[
         {
            "name": "id", "type": "int"
         },
         {
            "name": "name",  "type": "string"
         },
         {
            "name": "uuid",  "type": "string"
         }
     ]
}

 

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.barber.kafka.avro

import scala.annotation.switch
import scala.io.Source

case class UserWithUUID(var id: Int, var name: String, var uuid: String) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(0, "","")
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case pos if pos == 0 => {
        id
      }.asInstanceOf[AnyRef]
      case pos if pos == 1 => {
        name
      }.asInstanceOf[AnyRef]
      case pos if pos == 2 => {
        uuid
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case pos if pos == 0 => this.id = {
        value
      }.asInstanceOf[Int]
      case pos if pos == 1 => this.name = {
        value.toString
      }.asInstanceOf[String]
      case pos if pos == 2 => this.uuid = {
        value.toString
      }.asInstanceOf[String]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = UserWithUUID.SCHEMA$
}

object UserWithUUID {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
    Source.fromURL(getClass.getResource("/userWithUUIDSchema.avsc")).mkString)
}

 

Producer

The producer is much the same as the previous examples, so I won't dwell on this too much. Here is the most relevant code

package com.barber.kafka.avro

import java.util.{Properties, UUID}

import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.clients.producer.ProducerConfig


class KafkaDemoAvroPublisher(val topic:String) {

  private val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[KafkaAvroSerializer].getCanonicalName)
  props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString())
  props.put(ProducerConfig.ACKS_CONFIG,  "all")
  props.put(ProducerConfig.RETRIES_CONFIG, "0")
  private val producer =   new KafkaProducer[String,User](props)

  def send(): Unit = {
    try {
      val rand =  new scala.util.Random(44343)

      for(i <- 1 to 10) {
        val id = rand.nextInt()
        val itemToSend = User(id , "ishot.com")
        println(s"Producer sending data ${itemToSend.toString}")
        producer.send(new ProducerRecord[String, User](topic, itemToSend))
        producer.flush()
      }
    } catch {
      case ex: Exception =>
        println(ex.printStackTrace().toString)
        ex.printStackTrace()
    }
  }
}

 

Streams App

This is the new element for this post. So what does it do?

Well quite simply it hooks up some stream processing to the producer's topic (where User Avro objects are being used). It then does a simple map transform on the stream items to create a new KStream[String,UserWithUUID], which is then sent to the output topic the consumer will use. In a nutshell that is all there is to it.

Here is the code for the streams app

 

package com.barber.kafka.avro


object StreamsApp extends App {
  private val inputTopic = "avro-streams-input-topic"
  private val outputTopic = "avro-streams-useruuid-output-topic"

  val consumer = new KafkaDemoAvroStreams(inputTopic, outputTopic)
  consumer.start()

}

 

package com.barber.kafka.avro


import java.util.{Collections, Properties}

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes.StringSerde
import org.apache.kafka.common.serialization.{Serde, Serdes, StringDeserializer}
import org.apache.kafka.streams.kstream.{KStream, Produced}

import scala.concurrent.TimeoutException
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig}

class KafkaDemoAvroStreams(val inputTopic:String, val outputTopic:String) {


  val builder: StreamsBuilder = new StreamsBuilder()
  var streams: Option[KafkaStreams] = None

  val streamsConfiguration: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-stream-demo-topic-streams")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[SpecificAvroSerde[_ <: SpecificRecord]])
    p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,  "http://localhost:8081")
    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    p
  }


  def start() = {

    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

      //https://github.com/confluentinc/kafka-streams-examples/blob/4.1.x/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala

      // Write the input data as-is to the output topic.
      //
      // If 
      // 
      // a) we have already configured the correct default serdes for keys and
      // values
      // 
      // b) the types for keys and values are the same for both the input topic and the
      // output topic
      // 
      // We would only need to define:
      //
      //   builder.stream(inputTopic).to(outputTopic);
      //
      // However, in the code below we intentionally override the default serdes in `to()` to
      // demonstrate how you can construct and configure a specific Avro serde manually.
      val stringSerde: Serde[String] = Serdes.String
      val specificAvroUserSerde: Serde[User] = new SpecificAvroSerde[User]
      val specificAvroUserWithUUIDSerde: Serde[UserWithUUID] = new SpecificAvroSerde[UserWithUUID]

      // Note how we must manually call `configure()` on this serde to configure the schema registry
      // url.  This is different from the case of setting default serdes (see `streamsConfiguration`
      // above), which will be auto-configured based on the `StreamsConfiguration` instance.
      val isKeySerde: Boolean = false
      specificAvroUserSerde.configure(
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081"), isKeySerde)
      specificAvroUserWithUUIDSerde.configure(
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
          "http://localhost:8081"), isKeySerde)


      val stream: KStream[String, User] = builder.stream(inputTopic)

      val mappedStream  =
        stream.map[String, UserWithUUID]((k,v) => {
            println("Streams saw messsage ============ ")
            println(s"Saw User ${v}")
            new KeyValue(k, UserWithUUID(v.id,v.name, java.util.UUID.randomUUID().toString()))
        })

      //send UserWithUUID out on output topic
      mappedStream.to(outputTopic, Produced.`with`(stringSerde, specificAvroUserWithUUIDSerde))
      streams = Some(new KafkaStreams(builder.build(), streamsConfiguration))
      streams.map(_.start())

    }
    catch {
      case timeOutEx: TimeoutException =>
        println("Timeout ")
        false
      case ex: Exception => ex.printStackTrace()
        println("Got error when reading message ")
        false
    }
  }

  def close(): Unit = streams.map(_.close())

}

Notice how this time we are not working with a Serializer/Deserializer directly, but rather with things called “Serdes”, which are combined serializer/deserializer objects. Notice also how we need to tell each Serde about the Schema Registry, and how we explicitly set the serdes to use in the final “to” operation that sends the transformed data to the output topic.
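
As an aside, the input side of the topology can also be given explicit serdes, rather than falling back on the default serdes from the streams configuration. Here is a minimal sketch of my own (it is not in the repo) that assumes the builder, inputTopic, stringSerde and specificAvroUserSerde values from the class above, and uses the Consumed helper that ships with Kafka Streams:

import org.apache.kafka.streams.{Consumed, StreamsBuilder}
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.common.serialization.Serde
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

// Sketch only: build the source KStream with explicit serdes instead of the
// default serdes configured in `streamsConfiguration`
def buildSource(builder: StreamsBuilder,
                inputTopic: String,
                stringSerde: Serde[String],
                userSerde: SpecificAvroSerde[User]): KStream[String, User] =
  builder.stream(inputTopic, Consumed.`with`(stringSerde, userSerde))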

 

 

ConsumerApp

This too is not that different from the previous examples. The only thing to note here is that this one consumes from the streams app's output topic, so it will be seeing UserWithUUID objects. Here is the relevant code

package com.barber.kafka.avro

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import java.util.Collections

import org.apache.kafka.common.errors.TimeoutException
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
import org.apache.kafka.common.serialization.StringDeserializer

class KafkaDemoAvroSubscriber(val topic:String) {

  private val props = new Properties()
  val groupId = "avro-stream-demo-topic-useruuid-consumer"
  var shouldRun : Boolean = true

  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000")
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,classOf[KafkaAvroDeserializer].getCanonicalName)
  //Use Specific Record or else you get Avro GenericRecord.
  props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")

  private val consumer = new KafkaConsumer[String, com.barber.kafka.avro.UserWithUUID](props)

  def start() = {

    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

      consumer.subscribe(Collections.singletonList(topic))

      while (shouldRun) {
        val records: ConsumerRecords[String,  com.barber.kafka.avro.UserWithUUID] = consumer.poll(1000)
        val it = records.iterator()
        while(it.hasNext()) {
          println("Getting message from queue.............")
          val record: ConsumerRecord[String,  com.barber.kafka.avro.UserWithUUID] = it.next()
          val receivedItem = record.value()
          println(s"Saw UserWithUUID ${receivedItem}")
          consumer.commitSync
        }
      }
    }
    catch {
      case timeOutEx: TimeoutException =>
        println("Timeout ")
        false
      case ex: Exception => ex.printStackTrace()
        println("Got error when reading message ")
        false
    }
  }

  def close(): Unit = shouldRun = false
}
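
For completeness, the subscriber is kicked off by its own little App object (com.barber.kafka.avro.SubscriberApp in the repo). I have not copied the repo version here, but a minimal sketch of what it might look like, reusing the output topic name from the StreamsApp above, would be:

package com.barber.kafka.avro

// Hypothetical sketch of the SubscriberApp entry point (see the repo for the
// real one): it simply points the subscriber at the streams app output topic
object SubscriberApp extends App {
  private val outputTopic = "avro-streams-useruuid-output-topic"

  val consumer = new KafkaDemoAvroSubscriber(outputTopic)
  consumer.start()
}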

 

 

 

So how do I run this stuff?

As I stated above you will need to download a few things, but once you have those in place you may find the small PowerShell script called “RunThePipeline.ps1” that is inside the projects useful. This script does a few things, such as cleaning the Kafka/ZooKeeper logs, stopping any previous instances, starting new instances, and creating the Kafka topic (which you must have before you can use the code).

 

IMPORTANT NOTE : I have altered the Kafka log paths and where ZooKeeper logs to. This can be done using the relevant properties files from the Confluent installation.

 


 

server.properties

This line was changed

# A comma separated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs

 

zookeeper.properties

This line was changed

# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper

 

The PowerShell script makes assumptions based on where you changed these values to, so if you have different settings for these please edit the PowerShell script too.

Essentially you need to have the following running before you can run the code example. This is what the PowerShell script automates for you, though you may prefer to use the Confluent CLI; I just had this script from a previous project, it also cleans out old data (which is useful when you are experimenting), and it creates the required topic. A Scala alternative to the topic creation step is sketched just after the list below.

  • Zookeeper
  • Kafka broker
  • Schema Registry
  • Kafka topic created
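
If you would rather not use the PowerShell script for the topic creation step, here is a rough sketch of my own (it is not part of the repo) that does the same thing from Scala using the Kafka AdminClient, with the two topic names from the streams demo above:

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import scala.collection.JavaConverters._

// Sketch only: create the demo topics programmatically instead of via the
// PowerShell script
object CreateTopics extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val admin = AdminClient.create(props)
  val topics = List(
    new NewTopic("avro-streams-input-topic", 1, 1.toShort),
    new NewTopic("avro-streams-useruuid-output-topic", 1, 1.toShort))

  admin.createTopics(topics.asJava).all().get() // blocks until the topics exist
  admin.close()
}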

 

Once you have run the PowerShell script you can just run these projects

 

  • com.barber.kafka.avro.PublisherApp
  • com.barber.kafka.avro.StreamsApp
  • com.barber.kafka.avro.SubscriberApp

Kafka Schema Registry

This is the 3rd post in a small mini-series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point is that I am mainly a Windows user, so the instructions and scripts will have a Windows bias to them. If you are not a Windows user you will need to find the instructions for your OS of choice.

 

Prerequisites

So go and grab that lot if you want to follow along.

 

Last time we talked about how to create a Kafka Producer/Consumer which uses the KafkaAvroSerializer when using Avro data of a specific type. We also used the Kafka Schema Registry and had a tiny introduction as to what the Kafka Schema Registry was all about. This time we will be looking at the Kafka Schema Registry in a lot more detail.

 

 

Where is the code for this post?

You can grab the code that goes with this post from here : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSpecificAvroBrokenProducer and here too : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSchemaRegistryTests

 

Kafka Schema Registry Deep Dive

The idea behind the Schema Registry is that Confluent provide it as a service that exposes a REST API and integrates closely with the rest of the Kafka stack. The Schema Registry acts as a single store for schema metadata, in a versioned historical manner. It also provides compatibility settings that allow the evolution of schemas according to the configured compatibility setting. As we have already seen in the last post, there are also serializers provided that work with the Registry and the Avro format. Thus far we have only seen a Producer/Consumer, both of which use the KafkaAvroSerializer, but there is also a Serde (serializer/deserializer) available when working with Kafka Streams, which we will talk about in the next post.

 

You may be wondering just why you might want to use some sort of schema registry. Well, have you ever found yourself in a situation where you store a JSON blob in a row in a database, and then later you make changes to the format of the original object that was used to store those historic rows? You then find yourself in the unenviable position of no longer being able to easily deserialize your old database JSON data. You need to either write some mapping code or evolve the data to the latest schema format. This is not an easy task; I have done it by hand once or twice and it is hard to get right. This, essentially, is the job of the Kafka Schema Registry. It’s a helping/guiding light to help you traverse this issue.

 

I have taken the next section from the official docs, as there is good detail in here that is important to the rest of this post, and it really is a case of the official docs saying it best, so let’s just borrow this bit of text; the rest of the post will put it into action.

 

The Schema Registry is a distributed storage layer for Avro Schemas which uses Kafka as its underlying storage mechanism. Some key design decisions:

  • Assigns globally unique id to each registered schema. Allocated ids are guaranteed to be monotonically increasing but not necessarily consecutive.
  • Kafka provides the durable backend, and functions as a write-ahead changelog for the state of the Schema Registry and the schemas it contains.
  • The Schema Registry is designed to be distributed, with single-master architecture, and ZooKeeper/Kafka coordinates master election (based on the configuration).

 

Kafka Backend

Kafka is used as the Schema Registry storage backend. The special Kafka topic <kafkastore.topic> (default _schemas), with a single partition, is used as a highly available write ahead log. All schemas, subject/version and id metadata, and compatibility settings are appended as messages to this log. A Schema Registry instance therefore both produces and consumes messages under the _schemas topic. It produces messages to the log when, for example, new schemas are registered under a subject, or when updates to compatibility settings are registered. The Schema Registry consumes from the _schemas log in a background thread, and updates its local caches on consumption of each new _schemas message to reflect the newly added schema or compatibility setting. Updating local state from the Kafka log in this manner ensures durability, ordering, and easy recoverability.
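
Just to make the “write-ahead log” idea a little more concrete, here is a small sketch of my own (it is not part of the demo code) that points a plain Kafka consumer at the _schemas topic; every schema registration or compatibility change shows up as a JSON message:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

// Sketch only: the _schemas messages are plain JSON, so a StringDeserializer
// is enough to have a look at them
object SchemasTopicPeek extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "schemas-topic-peek")
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("_schemas"))
  consumer.poll(5000).asScala.foreach(r => println(s"${r.key} -> ${r.value}"))
  consumer.close()
}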

 

Compatibility

The Schema Registry server can enforce certain compatibility rules when new schemas are registered in a subject. Currently, we support the following compatibility rules.

  • Backward compatibility (default): A new schema is backwards compatible if it can be used to read the data written in the latest (older) registered schema.
  • Transitive backward compatibility: A new schema is transitively backwards compatible if it can be used to read the data written in all previously registered schemas. Backward compatibility is useful for loading data into systems like Hadoop since one can always query data of all versions using the latest schema.
  • Forward compatibility: A new schema is forward compatible if the latest (old) registered schema can read data written in this schema, or to put this another way “data encoded with a newer schema can be read with an older schema.”
  • Transitive forward compatibility: A new schema is transitively forward compatible if all previous schemas can read data written in this schema. Forward compatibility is useful for consumer applications that can only deal with data in a particular version that may not always be the latest version.
  • Full compatibility: A new schema is fully compatible if it’s both backward and forward compatible with the latest registered schema.
  • Transitive full compatibility: A new schema is transitively full compatible if it’s both backward and forward compatible with all previously registered schemas.
  • No compatibility: A new schema can be any schema as long as it’s a valid Avro.
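
As a side note, these rules line up pretty much one to one with the schema validator strategies that Avro itself ships. The sketch below is my own illustration of that mapping (it is not taken from the Registry source), using org.apache.avro.SchemaValidatorBuilder:

import org.apache.avro.{SchemaValidator, SchemaValidatorBuilder}

// Sketch only: how the compatibility levels line up with Avro's validators
//   canReadStrategy    -> the new schema must be able to READ old data (backward)
//   canBeReadStrategy  -> old schemas must be able to read the new data (forward)
//   mutualReadStrategy -> both directions (full)
//   validateLatest     -> check against the latest schema only
//   validateAll        -> check against all previous schemas (the transitive variants)
object CompatibilityValidators {
  val backward: SchemaValidator           = new SchemaValidatorBuilder().canReadStrategy().validateLatest()
  val backwardTransitive: SchemaValidator = new SchemaValidatorBuilder().canReadStrategy().validateAll()
  val forward: SchemaValidator            = new SchemaValidatorBuilder().canBeReadStrategy().validateLatest()
  val forwardTransitive: SchemaValidator  = new SchemaValidatorBuilder().canBeReadStrategy().validateAll()
  val full: SchemaValidator               = new SchemaValidatorBuilder().mutualReadStrategy().validateLatest()
  val fullTransitive: SchemaValidator     = new SchemaValidatorBuilder().mutualReadStrategy().validateAll()

  // each validator throws SchemaValidationException if the candidate schema
  // breaks the rule: validator.validate(newSchema, previouslyRegisteredSchemas)
}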

 

An Evolution Example

Say we had this initial schema

{"namespace": "barbers.avro",
 "type": "record",
 "name": "person",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "age",  "type": "int"},
 ]
}

We can evolve this schema into this one, where we supply a default location.

{"namespace": "barbers.avro",
 "type": "record",
 "name": "person",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "age",  "type": "int"},
	 {"name": "location", "type": "string", "default": "uk"}
 ]
}

 

This allows data encoded with the old schema to be read using this new schema. This is backward compatible. If we had not supplied this default for “location”, this would no longer be backward compatible.

 

Forward compatibility means that data encoded with a newer schema can be read with an older schema. This new schema is also forward compatible with the original schema, since we can just drop the defaulted new field “location” when projecting new data into the old. Had the new schema chosen to no longer include the “age” field, it would not be forward compatible, as we would not know how to fill in this field for the old schema.
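
If you want to sanity check this kind of reasoning locally, without a Schema Registry in the loop, Avro has a helper for exactly this question. The following is a rough sketch of my own (it is not from the posts repo) that runs the two person schemas above through it:

import org.apache.avro.{Schema, SchemaCompatibility}

// Sketch only: "can the reader schema read data written with the writer schema?"
object EvolutionCheck extends App {
  val oldSchema = new Schema.Parser().parse(
    """{"namespace": "barbers.avro", "type": "record", "name": "person",
      | "fields": [
      |   {"name": "name", "type": "string"},
      |   {"name": "age",  "type": "int"}
      | ]}""".stripMargin)

  val newSchema = new Schema.Parser().parse(
    """{"namespace": "barbers.avro", "type": "record", "name": "person",
      | "fields": [
      |   {"name": "name", "type": "string"},
      |   {"name": "age",  "type": "int"},
      |   {"name": "location", "type": "string", "default": "uk"}
      | ]}""".stripMargin)

  // backward: the new schema reads data written with the old schema
  val backward = SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema)
  // forward: the old schema reads data written with the new schema
  val forward  = SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, newSchema)

  println(s"backward: ${backward.getType}") // COMPATIBLE thanks to the default on location
  println(s"forward:  ${forward.getType}")  // COMPATIBLE as the extra field is just dropped
}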

For more information on this you should have a bit more of a read here : https://docs.confluent.io/current/avro.html

 

Ok so I think that gives us a flavor of what the Schema Registry is all about, let’s carry on and see some code examples.

 

So how do I run this stuff?

 

As I stated above you will need to download a few things, but once you have those in place you may find the small PowerShell script called “RunThePipeline.ps1” that is inside the projects useful. This script does a few things, such as cleaning the Kafka/ZooKeeper logs, stopping any previous instances, starting new instances, and creating the Kafka topic (which you must have before you can use the code).

IMPORTANT NOTE : I have altered the Kafka log paths and where ZooKeeper logs to. This can be done using the relevant properties files from the Confluent installation.

 


 

server.properties

This line was changed

# A comma separated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs

 

zookeeper.properties

This line was changed

# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper

 

The PowerShell script makes assumptions based on where you changed these values to, so if you have different settings for these please edit the PowerShell script too.

 

Essentially you need to have the following running before you can run the code example. This is what the PowerShell script automates for you, though you may prefer to use the Confluent CLI; I just had this script from a previous project, it also cleans out old data (which is useful when you are experimenting), and it creates the required topic.

 

  • Zookeeper
  • Kafka broker
  • Schema Registry
  • Kafka topic created

 

 

Using the producer to test out the Registry

This corresponds to the following project in the GitHub demo code : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSpecificAvroBrokenProducer

 

Since we worked with the Schema Registry and a Kafka Producer that made use of the Registry in the last post, I thought it might be a good idea to make some changes to the publisher from the last post, to see if we can create some compatibility tests against the Schema Registry that the Producer is using.

 

So before we see the running code, let's examine some schemas that are used by the PublisherApp, in this exact order

 

User schema

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "User",
     "fields":[
         {
            "name": "id", "type": "int"
         },
         {
            "name": "name",  "type": "string"
         }
     ]
}

 

User without name schema

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "User",
     "fields":[
         {
            "name": "id", "type": "int"
         }
     ]
}

 

User with boolean “Id” field schema

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "User",
     "fields":[
         {
            "name": "id", "type": "boolean"
         }
     ]
}

 

Another completely new type with string “Id” field schema

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "AnotherExampleWithStringId",
     "fields":[
         {
            "name": "id", "type": "string"
         }
     ]
}

 

So since the Kafka Producer is set up to use the Kafka Schema Registry and is sending its values as Avro using the KafkaAvroSerializer, we start with the 1st schema (User schema) shown above being the one that is registered against the Schema Registry subject “<topic name>-value” (we will see more of the Registry API below; for now just understand that when using the Schema Registry, auto registration is done against the topic’s value subject by the producer, with the 1st producer to use the topic being the one that sets the schema for the Registry).
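
To make that auto registration a little more tangible, here is a small sketch of my own (it is not part of the repo) that uses Confluent's CachedSchemaRegistryClient to do by hand what the KafkaAvroSerializer does under the covers, using the subject name you will see in the output below:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema

// Sketch only: register the User schema under the "<topic>-value" subject,
// then ask the Registry whether the boolean id variant would be accepted
object ManualRegistration extends App {
  val client  = new CachedSchemaRegistryClient("http://localhost:8081", 100)
  val subject = "avro-specific-demo-topic-value"

  val userSchema = new Schema.Parser().parse(
    """{"namespace": "com.barber.kafka.avro", "type": "record", "name": "User",
      | "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"} ]}""".stripMargin)

  val booleanIdSchema = new Schema.Parser().parse(
    """{"namespace": "com.barber.kafka.avro", "type": "record", "name": "User",
      | "fields": [ {"name": "id", "type": "boolean"} ]}""".stripMargin)

  println(s"registered id: ${client.register(subject, userSchema)}")
  // expected to be false under the default BACKWARD compatibility setting
  println(s"boolean id variant compatible: ${client.testCompatibility(subject, booleanIdSchema)}")
}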

 

Ok so we have stated that we start out with the 1st schema (User schema) shown above, and then we progress through each of the other schemas shown above and try to send the Avro serialized data across the Kafka topic. Here is the code from the Producer App

package com.barber.kafka.avro

import java.util.{Properties, UUID}

import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

class KafkaDemoAvroPublisher(val topic:String) {

  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put("key.serializer", classOf[StringSerializer].getCanonicalName)
  props.put("value.serializer",classOf[KafkaAvroSerializer].getCanonicalName)
  props.put("client.id", UUID.randomUUID().toString())

  private val producer =   new KafkaProducer[String,User](props)
  private val producerUserWithoutName =   new KafkaProducer[String,UserWithoutName](props)
  private val producerUserWithBooleanIdBad =   new KafkaProducer[String,UserWithBooleanId](props)
  private val producerAnotherExampleWithStringIdBad = new KafkaProducer[String,AnotherExampleWithStringId](props)

  def send(): Unit = {
    try {
      val rand =  new scala.util.Random(44343)

      //we expect this to work, as it's the one that is going to define the Avro format of the topic
      //since it's the 1st published message on the topic (assuming you have not preregistered the topic's
      //value subject + Avro schema with the Schema Registry already)
      for(i <- 1 to 10) {
        val id = rand.nextInt()
        val itemToSend = User(id , "ishot.com")
        println(s"Producer sending data ${itemToSend.toString}")
        producer.send(new ProducerRecord[String, User](topic, itemToSend))
        producer.flush()
      }


      //we expect this to work, as removing the name field still leaves the schema backward compatible with the registered User schema
      for(i <- 1 to 10) {
        val id = rand.nextInt()
        val itemToSend = UserWithoutName(id)
        println(s"Producer sending data ${itemToSend.toString}")
        producerUserWithoutName.send(new ProducerRecord[String, UserWithoutName](topic, itemToSend))
        producerUserWithoutName.flush()
      }


      //we expect this to fail as it's trying to send a different incompatible Avro object on the topic
      //which is currently using the "User" (Avro object / Schema)
      sendBadProducerValue("UserWithBooleanId", () => {
        val itemToSend = UserWithBooleanId(true)
        println(s"Producer sending data ${itemToSend.toString}")
        producerUserWithBooleanIdBad.send(new ProducerRecord[String, UserWithBooleanId](topic, itemToSend))
        producerUserWithBooleanIdBad.flush()
      })

      //we expect this to fail as it's trying to send a different incompatible Avro object on the topic
      //which is currently using the "User" (Avro object / Schema)
      sendBadProducerValue("AnotherExampleWithStringId", () => {
        val itemToSend = AnotherExampleWithStringId("fdfdfdsdsfs")
        println(s"Producer sending data ${itemToSend.toString}")
        producerAnotherExampleWithStringIdBad.send(new ProducerRecord[String, AnotherExampleWithStringId](topic, itemToSend))
        producerAnotherExampleWithStringIdBad.flush()
      })

    } catch {
      case ex: Exception =>
        println(ex.printStackTrace().toString)
        ex.printStackTrace()
    }
  }

  def sendBadProducerValue(itemType: String, produceCallback: () => Unit) : Unit = {
    try {
      //we expect this to fail as it's trying to send a different incompatible
      // Avro object on the topic which is currently using the "User" (Avro object / Schema)
      println(s"Sending $itemType")
      produceCallback()
    } catch {
      case ex: Exception => {
        println("==============================================================================\r\n")
        println(s" We were expecting this due to incompatble '$itemType' item being sent\r\n")
        println("==============================================================================\r\n")
        println(ex.printStackTrace().toString)
        println()
        ex.printStackTrace()
      }
    }
  }
}

 

Now lets see the relevant output

 

NOTE : This assumes a clean run with a new empty topic

 

 

We start by sending some User Schema data, which is fine as it’s the first data on the topic

 

Producer sending data {“id”: -777306865, “name”: “ishot.com”}
06:47:05.816 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=88]} with correlation id 8 to node 0
Producer sending data {“id”: 1227013473, “name”: “ishot.com”}
06:47:05.818 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=88]} with correlation id 9 to node 0
Producer sending data {“id”: 1899269599, “name”: “ishot.com”}
06:47:05.821 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=88]} with correlation id 10 to node 0
Producer sending data {“id”: -1764893671, “name”: “ishot.com”}

 

We then try to send some User Without Name Schema data, which is fine as it's compatible with the User Schema already registered in the Kafka Registry

 

Producer sending data {“id”: -1005363706}
06:47:05.895 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=78]} with correlation id 4 to node 0
Producer sending data {“id”: -528430870}
06:47:05.900 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=78]} with correlation id 5 to node 0
Producer sending data {“id”: -877322591}
06:47:05.905 [kafka-producer-network-thread | 6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] DEBUG org.apache.kafka.clients.NetworkClient – [Producer clientId=6c9d7365-1216-4c4e-9cfe-da3e3d5654d1] Using older server API v3 to send PRODUCE {acks=1,timeout=30000,partitionSizes=[avro-specific-demo-topic-0=78]} with correlation id 6 to node 0
Producer sending data {“id”: 2048308094}

 

We then try to send some User With Boolean Id Field Schema data, which we expect to be NOT acceptable/incompatible with the User Schema already registered in the Kafka Registry. This is due to the fact we have changed the Id field into a boolean, where the already registered User Schema expects this field to be an “int”. So does it fail?

 

In a word yes, here is the relevant output

 

Sending UserWithBooleanId
Producer sending data {“id”: true}

06:47:05.954 [main] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService – Sending POST with input {“schema”:”{\”type\”:\”record\”,\”name\”:\”User\”,\”namespace\”:\”com.barber.kafka.avro\”,\”fields\”:[{\”name\”:\”id\”,\”type\”:\”boolean\”}]}”} to http://localhost:8081/subjects/avro-specific-demo-topic-value/versions
==============================================================================

We were expecting this due to incompatble ‘UserWithBooleanId’ item being sent

==============================================================================

org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {“type”:”record”,”name”:”User”,”namespace”:”com.barber.kafka.avro”,”fields”:[{“name”:”id”,”type”:”boolean”}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:245)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:237)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:807)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.$anonfun$send$3(KafkaDemoAvroPublisher.scala:54)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher$$Lambda$17/746074699.apply$mcV$sp(Unknown Source)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.sendBadProducerValue(KafkaDemoAvroPublisher.scala:79)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.send(KafkaDemoAvroPublisher.scala:51)
    at com.barber.kafka.avro.PublisherApp$.delayedEndpoint$com$barber$kafka$avro$PublisherApp$1(PublisherApp.scala:6)
    at com.barber.kafka.avro.PublisherApp$delayedInit$body.apply(PublisherApp.scala:3)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.App$$Lambda$5/511833308.apply(Unknown Source)
    at scala.collection.immutable.List.foreach(List.scala:378)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at com.barber.kafka.avro.PublisherApp$.main(PublisherApp.scala:3)
    at com.barber.kafka.avro.PublisherApp.main(PublisherApp.scala)

 

 

We then try to send some Completely Different Object With String Id Field Schema data, which we expect to be NOT acceptable/incompatible with the User Schema already registered in the Kafka Registry. This is due to the fact we have changed the Id field into a string, where the already registered User Schema expects this field to be an “int”, and we have also omitted the required “name” field in this new type.

So does it fail?

 

In a word yes, here is the relevant output

 

Sending AnotherExampleWithStringId
Producer sending data {“id”: “fdfdfdsdsfs”}

06:47:06.059 [main] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService – Sending POST with input {“schema”:”{\”type\”:\”record\”,\”name\”:\”AnotherExampleWithStringId\”,\”namespace\”:\”com.barber.kafka.avro\”,\”fields\”:[{\”name\”:\”id\”,\”type\”:\”string\”}]}”} to http://localhost:8081/subjects/avro-specific-demo-topic-value/versions
==============================================================================

We were expecting this due to incompatble ‘AnotherExampleWithStringId’ item being sent

==============================================================================

()

org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {“type”:”record”,”name”:”AnotherExampleWithStringId”,”namespace”:”com.barber.kafka.avro”,”fields”:[{“name”:”id”,”type”:”string”}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:245)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:237)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:807)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.$anonfun$send$4(KafkaDemoAvroPublisher.scala:63)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher$$Lambda$18/871790326.apply$mcV$sp(Unknown Source)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.sendBadProducerValue(KafkaDemoAvroPublisher.scala:79)
    at com.barber.kafka.avro.KafkaDemoAvroPublisher.send(KafkaDemoAvroPublisher.scala:60)
    at com.barber.kafka.avro.PublisherApp$.delayedEndpoint$com$barber$kafka$avro$PublisherApp$1(PublisherApp.scala:6)
    at com.barber.kafka.avro.PublisherApp$delayedInit$body.apply(PublisherApp.scala:3)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.App$$Lambda$5/511833308.apply(Unknown Source)
    at scala.collection.immutable.List.foreach(List.scala:378)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at com.barber.kafka.avro.PublisherApp$.main(PublisherApp.scala:3)
    at com.barber.kafka.avro.PublisherApp.main(PublisherApp.scala)

Ok so that gives us a glimpse of what it’s like to work with the Kafka Producer and the Schema Registry. But surely there is more we can do with the Schema Registry REST API that is mentioned above?

 

Well yeah there is, we will now look at a second example in the codebase which will try a few more Schema Registry REST API calls.

 

 

 

 

Understanding Registry API

 

This corresponds to the following project in the GitHub demo code : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSchemaRegistryTests

 

I have chosen to use Akka Http for the REST API calls.

 

These are the 3 resources used by the code

 

compatibilityBACKWARD.json

{"compatibility": "BACKWARD"}

 

compatibilityNONE.json

{"compatibility": "NONE"}

 

simpleStringSchema.avsc

{"schema": "{\"type\": \"string\"}"}

 

 

Here is the entire codebase for various operations against the Schema Registry

import scala.io.Source
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import AkkaHttpImplicits.{executionContext, materializer, system}

object RegistryApp extends App {

  var payload = ""
  var result = ""
  val schemaRegistryMediaType = MediaType.custom("application/vnd.schemaregistry.v1+json",false)
  implicit  val c1 = ContentType(schemaRegistryMediaType, () => HttpCharsets.`UTF-8`)

  //These queries are the same ones found here, but instead of using curl I am using Akka Http
  //https://docs.confluent.io/current/schema-registry/docs/intro.html


  //  # Register a new version of a schema under the subject "Kafka-key"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/subjects/Kafka-key/versions
  //  {"id":1}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/subjects/Kafka-key/versions")
  println("Register a new version of a schema under the subject \"Kafka-key\"")
  println("EXPECTING {\"id\":1}")
  println(s"GOT ${result}\r\n")


  //  # Register a new version of a schema under the subject "Kafka-value"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/subjects/Kafka-value/versions
  //  {"id":1}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/subjects/Kafka-value/versions")
  println("Register a new version of a schema under the subject \"Kafka-value\"")
  println("EXPECTING {\"id\":1}")
  println(s"GOT ${result}\r\n")


  //  # List all subjects
  //  $ curl -X GET http://localhost:8081/subjects
  //    ["Kafka-value","Kafka-key"]
  result = get("http://localhost:8081/subjects")
  println("List all subjects")
  println("EXPECTING [\"Kafka-value\",\"Kafka-key\"]")
  println(s"GOT ${result}\r\n")


  //  # Fetch a schema by globally unique id 1
  //  $ curl -X GET http://localhost:8081/schemas/ids/1
  //  {"schema":"\"string\""}
  result = get("http://localhost:8081/schemas/ids/1")
  println("Fetch a schema by globally unique id 1")
  println("EXPECTING {\"schema\":\"\\\"string\\\"\"}")
  println(s"GOT ${result}\r\n")


  //  # List all schema versions registered under the subject "Kafka-value"
  //  $ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
  //    [1]
  result = get("http://localhost:8081/subjects/Kafka-value/versions")
  println("List all schema versions registered under the subject \"Kafka-value\"")
  println("EXPECTING [1]")
  println(s"GOT ${result}\r\n")


  //  # Fetch version 1 of the schema registered under subject "Kafka-value"
  //  $ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
  //  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}
  result = get("http://localhost:8081/subjects/Kafka-value/versions/1")
  println("Fetch version 1 of the schema registered under subject \"Kafka-value\"")
  println("EXPECTING {\"subject\":\"Kafka-value\",\"version\":1,\"id\":1,\"schema\":\"\\\"string\\\"\"}")
  println(s"GOT ${result}\r\n")


  //  # Deletes version 1 of the schema registered under subject "Kafka-value"
  //  $ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/1
  //  1
  result = delete("http://localhost:8081/subjects/Kafka-value/versions/1")
  println("Deletes version 1 of the schema registered under subject \"Kafka-value\"")
  println("EXPECTING 1")
  println(s"GOT ${result}\r\n")


  //  # Register the same schema under the subject "Kafka-value"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/subjects/Kafka-value/versions
  //  {"id":1}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/subjects/Kafka-value/versions")
  println("Register the same schema under the subject \"Kafka-value\"")
  println("EXPECTING {\"id\":1}")
  println(s"GOT ${result}\r\n")


  //  # Deletes the most recently registered schema under subject "Kafka-value"
  //  $ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/latest
  //  2
  result = delete("http://localhost:8081/subjects/Kafka-value/versions/latest")
  println("Deletes the most recently registered schema under subject \"Kafka-value\"")
  println("EXPECTING 2")
  println(s"GOT ${result}\r\n")


  //  # Register the same schema under the subject "Kafka-value"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/subjects/Kafka-value/versions
  //  {"id":1}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/subjects/Kafka-value/versions")
  println("Register the same schema under the subject \"Kafka-value\"")
  println("EXPECTING {\"id\":1}")
  println(s"GOT ${result}\r\n")


  //  # Fetch the schema again by globally unique id 1
  //  $ curl -X GET http://localhost:8081/schemas/ids/1
  //  {"schema":"\"string\""}
  result = get("http://localhost:8081/schemas/ids/1")
  println("Fetch the schema again by globally unique id 1")
  println("EXPECTING {\"schema\":\"\\\"string\\\"\"}")
  println(s"GOT ${result}\r\n")


  //  # Check whether a schema has been registered under subject "Kafka-key"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/subjects/Kafka-key
  //  {"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/subjects/Kafka-key")
  println("Check whether a schema has been registered under subject \"Kafka-key\"")
  println("EXPECTING {\"subject\":\"Kafka-key\",\"version\":1,\"id\":1,\"schema\":\"\\\"string\\\"\"}")
  println(s"GOT ${result}\r\n")


  //  # Test compatibility of a schema with the latest schema under subject "Kafka-value"
  //  $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"schema": "{\"type\": \"string\"}"}' \
  //  http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
  //  {"is_compatible":true}
  payload = Source.fromURL(getClass.getResource("/simpleStringSchema.avsc")).mkString
  result = post(payload,"http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest")
  println("Test compatibility of a schema with the latest schema under subject \"Kafka-value\"")
  println("EXPECTING {\"is_compatible\":true}")
  println(s"GOT ${result}\r\n")


  //  # Get top level config
  //    $ curl -X GET http://localhost:8081/config
  //    {"compatibilityLevel":"BACKWARD"}
  result = get("http://localhost:8081/config")
  println("Get top level config")
  println("EXPECTING {\"compatibilityLevel\":\"BACKWARD\"}")
  println(s"GOT ${result}\r\n")


  //  # Update compatibility requirements globally
  //    $ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"compatibility": "NONE"}' \
  //  http://localhost:8081/config
  //    {"compatibility":"NONE"}
  payload = Source.fromURL(getClass.getResource("/compatibilityNONE.json")).mkString
  result = put(payload,"http://localhost:8081/config")
  println("Update compatibility requirements globally")
  println("EXPECTING {\"compatibility\":\"NONE\"}")
  println(s"GOT ${result}\r\n")


  //  # Update compatibility requirements under the subject "Kafka-value"
  //  $ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  //  --data '{"compatibility": "BACKWARD"}' \
  //  http://localhost:8081/config/Kafka-value
  //  {"compatibility":"BACKWARD"}
  payload = Source.fromURL(getClass.getResource("/compatibilityBACKWARD.json")).mkString
  result = put(payload,"http://localhost:8081/config/Kafka-value")
  println("Update compatibility requirements under the subject \"Kafka-value\"")
  println("EXPECTING {\"compatibility\":\"BACKWARD\"}")
  println(s"GOT ${result}\r\n")


  //  # Deletes all schema versions registered under the subject "Kafka-value"
  //  $ curl -X DELETE http://localhost:8081/subjects/Kafka-value
  //    [3]
  result = delete("http://localhost:8081/subjects/Kafka-value")
  println("Deletes all schema versions registered under the subject \"Kafka-value\"")
  println("EXPECTING [3]")
  println(s"GOT ${result}\r\n")

  //  # List all subjects
  //  $ curl -X GET http://localhost:8081/subjects
  //    ["Kafka-key"]
  result = get("http://localhost:8081/subjects")
  println("List all subjects")
  println("EXPECTING [\"Kafka-key\"]")
  println(s"GOT ${result}\r\n")


  private[RegistryApp] def post(data: String, url:String)(implicit contentType:ContentType): String = {
    sendData(data, url, HttpMethods.POST, contentType)
  }

  private[RegistryApp] def put(data: String, url:String)(implicit contentType:ContentType): String = {
    sendData(data, url, HttpMethods.PUT, contentType)
  }

  private[RegistryApp] def sendData(data: String, url:String, method:HttpMethod, contentType:ContentType): String = {
    val responseFuture: Future[HttpResponse] =
      Http(system).singleRequest(
        HttpRequest(
          method,
          url,
          entity = HttpEntity(contentType, data.getBytes())
        )
      )
    val html = Await.result(responseFuture.flatMap(x => Unmarshal(x.entity).to[String]), 5 seconds)
    html
  }

  private[RegistryApp] def get(url:String)(implicit contentType:ContentType): String = {
    noBodiedRequest(url, HttpMethods.GET, contentType)
  }

  private[RegistryApp] def delete(url:String)(implicit contentType:ContentType): String = {
    noBodiedRequest(url, HttpMethods.DELETE, contentType)
  }

  private[RegistryApp] def noBodiedRequest(url:String,method:HttpMethod, contentType:ContentType): String = {
    val responseFuture: Future[HttpResponse] = Http(system).singleRequest(HttpRequest(method,url))
    val html = Await.result(responseFuture.flatMap(x => Unmarshal(x.entity).to[String]), 5 seconds)
    html
  }
}

 

 

And this is the output, which I hope is pretty self-explanatory now that we have spent some time with the previous explanations/tests. You can read more about the Kafka Schema Registry REST API here : https://docs.confluent.io/current/schema-registry/docs/api.html

 

Register a new version of a schema under the subject “Kafka-key”
EXPECTING {“id”:1}
GOT {“id”:1}

 

Register a new version of a schema under the subject “Kafka-value”
EXPECTING {“id”:1}
GOT {“id”:1}

 

List all subjects
EXPECTING [“Kafka-value”,”Kafka-key”]
GOT [“Kafka-value”,”Kafka-key”]

 

Fetch a schema by globally unique id 1
EXPECTING {“schema”:”\”string\””}
GOT {“schema”:”\”string\””}

 

List all schema versions registered under the subject “Kafka-value”
EXPECTING [1]
GOT [1]

 

Fetch version 1 of the schema registered under subject “Kafka-value”
EXPECTING {“subject”:”Kafka-value”,”version”:1,”id”:1,”schema”:”\”string\””}
GOT {“subject”:”Kafka-value”,”version”:1,”id”:1,”schema”:”\”string\””}

 

Deletes version 1 of the schema registered under subject “Kafka-value”
EXPECTING 1
GOT 1

 

Register the same schema under the subject “Kafka-value”
EXPECTING {“id”:1}
GOT {“id”:1}

 

Deletes the most recently registered schema under subject “Kafka-value”
EXPECTING 2
GOT 2

 

Register the same schema under the subject “Kafka-value”
EXPECTING {“id”:1}
GOT {“id”:1}

 

Fetch the schema again by globally unique id 1
EXPECTING {“schema”:”\”string\””}
GOT {“schema”:”\”string\””}

 

Check whether a schema has been registered under subject “Kafka-key”
EXPECTING {“subject”:”Kafka-key”,”version”:1,”id”:1,”schema”:”\”string\””}
GOT {“subject”:”Kafka-key”,”version”:1,”id”:1,”schema”:”\”string\””}

 

Test compatibility of a schema with the latest schema under subject “Kafka-value”
EXPECTING {“is_compatible”:true}
GOT {“is_compatible”:true}

 

Get top level config
EXPECTING {“compatibilityLevel”:”BACKWARD”}
GOT {“compatibilityLevel”:”BACKWARD”}

 

Update compatibility requirements globally
EXPECTING {“compatibility”:”NONE”}
GOT {“compatibility”:”NONE”}

 

Update compatibility requirements under the subject “Kafka-value”
EXPECTING {“compatibility”:”BACKWARD”}
GOT {“compatibility”:”BACKWARD”}

 

Deletes all schema versions registered under the subject “Kafka-value”
EXPECTING [3]
GOT [3]

 

List all subjects
EXPECTING [“Kafka-key”]
GOT [“Kafka-key”]

 

 

 

 

Conclusion

As with most things in the Confluent platform, the Schema Registry is a great bit of kit and easy to use. I urge you all to give it a spin.

Apache Kafka Specific Avro Producer/Consumer + Kafka Schema Registry

This is the 2nd post in a small mini-series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point is that I am mainly a Windows user, so the instructions and scripts will have a Windows bias to them. If you are not a Windows user you will need to find the instructions for your OS of choice.

 

Prerequisites

 

So go and grab that lot if you want to follow along.

 

Last time we talked about how to create a Kafka Producer/Consumer which did use Avro, but used the GenericRecord approach, which kind of works more like a dictionary of key value pairs. This time we will be talking about how to use the KafkaAvroSerializer to send specific Avro types using Kafka and the Kafka Schema Registry.

 

Kafka Schema Registry

So just what is the Kafka Schema Registry?

 

Schema Registry is part of the Confluent Open Source and Confluent Enterprise distributions. The Schema Registry stores a versioned history of all schemas and allows for the evolution of schemas according to the configured compatibility settings and expanded Avro support.

Taken from https://docs.confluent.io/current/schema-registry/docs/index.html on 26/06/18

 

I don’t want to dwell too much on the internals of the Kafka Schema Registry, as we will be looking at it in fine detail in the next post. But for now, it is important to understand that both Kafka and Kafka Streams support using the Schema Registry when sending/receiving Avro content. They will also raise exceptions should you try to send Avro content through a topic that uses the Schema Registry, where that content breaks the schema compatibility rules held in the registry.
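
To make that concrete, here is a minimal sketch (not taken from the sample project; the object and helper names are made up) of guarding a send against a registry rejection, which the Confluent serializer surfaces as a SerializationException at serialize/send time:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors.SerializationException

object SafeSend {
  // Sketch only: an incompatible (or unregistrable) schema fails at serialize time,
  // so the error shows up when we call send
  def sendGuarded[V](producer: KafkaProducer[String, V], topic: String, value: V): Unit =
    try {
      producer.send(new ProducerRecord[String, V](topic, value))
    } catch {
      case ex: SerializationException =>
        println(s"Schema Registry rejected the record: ${ex.getMessage}")
    }
}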

If all of that sounds a bit whack, don’t worry, we will be getting into it more in the next post.

 

I just wanted to give a small overview of how the Schema Registry fits into the Kafka ecosystem. The rest of this article will be more about how we can make use of the KafkaAvroSerializer to send Avro data of our own types, rather than the previously demonstrated GenericRecord approach.

 

Where is the code for this post?

You can grab the code that goes with this post from here : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSpecificAvro

 

SBT

I decided that I would take the hit and craft a multi-project SBT setup, which looks like this

import Deps._
import sbt.Keys.scalaVersion

lazy val root = (project in file(".")).
  aggregate(publisher, subscriber).
  settings(
    inThisBuild(List(
      organization := "com.barber",
      scalaVersion := "2.12.1",
      resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
      resolvers += "io.confluent" at "http://packages.confluent.io/maven/"
    )),
    name := "scala_kafka_specific_avro_example"
  )

lazy val publisher = (project in file ("publisher")).
  settings(
    name := "publisher",
    /* sourceGenerators in Compile += (avroScalaGenerateSpecific in Compile).taskValue, */
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  ).dependsOn(common)

lazy val subscriber = (project in file ("subscriber")).
  settings(
    name := "subscriber",
    /* sourceGenerators in Compile += (avroScalaGenerateSpecific in Compile).taskValue, */
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  ).dependsOn(publisher, common)
  
  lazy val common = (project in file ("common")).
  settings(
    name := "common",
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  )

 

Where the Deps.scala file looks like this

import sbt._

object Deps {
  lazy val kafka = "org.apache.kafka" % "kafka_2.11" % "1.1.0"
  lazy val avro =  "org.apache.avro" % "avro" % "1.8.2"
  lazy val avroSerializer = "io.confluent" % "kafka-avro-serializer" % "3.2.1"
  lazy val logBack = "ch.qos.logback" %  "logback-classic" % "1.1.7"
}

 

This essentially creates the following 3 projects

  • Common (this is where the shared Avro schema/object live)
  • Publisher
  • Subscriber

Once you have done an SBT clean/compile and opened it inside IntelliJ, it should look something like this:

image

 

So now that we know roughly what the build file looks like, let’s turn our attention to the 3 projects. I will walk through them below in turn.

 

Common Project

As stated above the common project holds the Avro schema file (which is slightly different from the last post). The new Avro schema is as follows:

 

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "User",
     "fields":[
         {
            "name": "id", "type": "int"
         },
         {
            "name": "name",  "type": "string"
         }
     ]
}

Unlike last time, where we used a GenericRecord, this time we would like to use a case class to send down the wire. Something like this:

case class User(var id: Int, var name: String) 

 

Unfortunately this does not work. If you were to try to send this down the wire using the KafkaProducer whilst the Schema Registry is running, you would most likely get an exception stating that it cannot be serialized.

 

What we actually need to do is something more like the code below, where we extend the SpecificRecordBase trait and also supply the schema and the ability to get/set field values.

 

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.barber.kafka.avro

import scala.annotation.switch
import scala.io.Source

case class User(var id: Int, var name: String) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(0, "")
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case pos if pos == 0 => {
        id
      }.asInstanceOf[AnyRef]
      case pos if pos == 1 => {
        name
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case pos if pos == 0 => this.id = {
        value
      }.asInstanceOf[Int]
      case pos if pos == 1 => this.name = {
        value.toString
      }.asInstanceOf[String]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = User.SCHEMA$
}

object User {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
    Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString)
}

 

Now that is quite a bit of code to write. Surely there are tools out there for this job, aren’t there?

 

Why yes there are.
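
The commented-out avroScalaGenerateSpecific line in the build file above hints at one of them: the sbt-avrohugger plugin, which generates SpecificRecordBase case classes like the one above straight from the .avsc schemas. A rough sketch of wiring it in (the plugin version shown is an assumption, so check for the latest release):

// project/plugins.sbt (version is an assumption - check for the latest release)
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC22")

// build.sbt, in each project that owns .avsc files (this is the line commented out above)
sourceGenerators in Compile += (avroScalaGenerateSpecific in Compile).taskValue

With something like that in place, the case class is (re)generated at compile time from the schema rather than written by hand (if I remember the plugin’s conventions correctly, it picks up schemas under src/main/avro by default).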

 

 

image

 

So that is really all the Common project is: it’s just a place for the shared Avro objects to live, which the Publisher/Subscriber projects can use.

 

Publisher Project

This is the entire code for the Publisher

package com.barber.kafka.avro

import java.util.{Properties, UUID}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

class KafkaDemoAvroPublisher(val topic:String) {

  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put("key.serializer", classOf[StringSerializer].getCanonicalName)
  props.put("value.serializer",classOf[KafkaAvroSerializer].getCanonicalName)
  props.put("client.id", UUID.randomUUID().toString())

  private val producer =   new KafkaProducer[String,User](props)

  def send(): Unit = {
    try {
      val rand =  new scala.util.Random(44343)

      for(i <- 1 to 10) {
        val id = rand.nextInt()
        val itemToSend = User(id , "ishot.com")
        println(s"Producer sending data ${itemToSend.toString}")
        producer.send(new ProducerRecord[String, User](topic, itemToSend))
        producer.flush()
      }
    } catch {
      case ex: Exception =>
        // Log the failure (printStackTrace returns Unit, so there is nothing useful to println)
        println(s"Producer failed to send: ${ex.getMessage}")
        ex.printStackTrace()
    }
  }
}

The main points are these:

  • We now include a property for the schema.registry.url
  • We now use the KafkaAvroSerializer for the topic value serializer
  • We use the User class that we talked about in the common project, that was generated from the Avro schema talked about above. It is this User Avro object that we push out on the Kafka topic
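
One extra detail worth knowing about that serializer (assuming the default subject naming strategy): the first successful send registers the value schema in the registry under the subject “<topic name>-value”, which you can verify with a quick sketch like this after a publish run:

// Quick check (default registry URL/port assumed): list the registry's subjects
import scala.io.Source

object ListSubjects extends App {
  println(Source.fromURL("http://localhost:8081/subjects").mkString)
  // expect something like ["<your topic name>-value"]
}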

 

Subscriber Project

Here is the subscriber code

package com.barber.kafka.avro

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Collections
import org.apache.kafka.common.errors.TimeoutException
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.serialization.StringDeserializer

class KafkaDemoAvroSubscriber(val topic:String) {

  private val props = new Properties()
  val groupId = "avro-demo-topic-consumer"
  var shouldRun : Boolean = true

  props.put("bootstrap.servers", "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put("group.id", groupId)
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "10000")
  props.put("session.timeout.ms", "30000")
  props.put("consumer.timeout.ms", "120000")
  props.put("key.deserializer", classOf[StringDeserializer].getCanonicalName)
  props.put("value.deserializer",classOf[KafkaAvroDeserializer].getCanonicalName)
  //Use Specific Record or else you get Avro GenericRecord.
  props.put("specific.avro.reader", "true")

  private val consumer = new KafkaConsumer[String, com.barber.kafka.avro.User](props)

  def start() = {

    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

      consumer.subscribe(Collections.singletonList(topic))

      while (shouldRun) {
        val records: ConsumerRecords[String,  com.barber.kafka.avro.User] = consumer.poll(1000)
        val it = records.iterator()
        while(it.hasNext()) {
          println("Getting message from queue.............")
          val record: ConsumerRecord[String,  com.barber.kafka.avro.User] = it.next()
          val recievedItem =record.value()
          println(s"Saw User ${recievedItem}")
          consumer.commitSync
        }
      }
    }
    catch {
      case timeOutEx: TimeoutException =>
        println("Timeout ")
        false
      case ex: Exception => ex.printStackTrace()
        println("Got error when reading message ")
        false
    }
  }

  def close(): Unit = shouldRun = false
}

 

This is not that different from last time; the main points are:

  • We now include a property for the schema.registry.url
  • We now use the KafkaAvroDeserializer for the topic value deserializer
  • We must set the property specific.avro.reader to have a value of true, which tells Kafka / Schema Registry that we will be using a specific Avro type (the User type in this case), otherwise Kafka will expect GenericRecord to be used on the topic
  • We use the User class that we talked about in the common project, that was generated from the Avro schema talked about above. It is this User Avro object that we receive on the Kafka topic

 

So how do I run this stuff?

As I stated above, you will need to download a few things, but once you have those in place you may find the small PowerShell script inside the project, called “RunThePipeline.ps1”, useful. This script does a few things: it cleans the Kafka/Zookeeper logs, stops any previous instances, starts new instances and also creates the Kafka topic (which you must have before you can use the code).

 

IMPORTANT NOTE: I have altered the Kafka log paths, and where Zookeeper logs to. This can be done using the relevant properties files from the Confluent installation.

 

image

 

server.properties

This line was changed

 

# A comma seperated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs

 

zookeeper.properties

This line was changed

 

# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper

 

The PowerShell script makes assumptions based on where you changed these values to, so if you have different settings for these please edit the PowerShell script too.

 

Essentially you need to have the following in place before you can run the code example (this is what the PowerShell script automates for you, though you may prefer to use the Confluent CLI; I just had this script from a previous project, and it also cleans out old data, which is useful when you are experimenting, and creates the required topic):

  • Zookeeper
  • Kafka broker
  • Schema Registry
  • Kafka topic created

Once you have that, you can simply open the code in IntelliJ and right click and run the 2 apps (or set up new configurations)

 

Run the Publisher

 

image

 

Run the Subscriber

 

image

 

Once you have run the PowerShell “RunThePipeline.ps1” script associated with this code (the one in the demo folder) and run the Publisher/Subscriber as shown above, you should see some output like this:

 

Publisher output

image

 

Subscriber output

image

 

 

Conclusion

Using Avro with Kafka / the Schema Registry is fairly straightforward, and is not that different from working with a regular array of bytes. I hope this post has whetted your appetite a bit for the Kafka Schema Registry, which we will look into in a lot more detail in the next post.

 

 

 

 

 

 

Apache Kafka Generic Avro Producer/Consumer

This is the 1st post in a small mini series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point is that I am mainly a Windows user, as such the instructions, scripts will have a Windows bias to them. So if you are not a Windows user you will need to find the instructions for your OS of choice.

 

Prerequisites

 

So go and grab that lot if you want to follow along.

 

Avro Introduction

Since this mini series will be using Avro throughout, I think it is a good idea that we include an introductory section on Avro, which I have shamelessly stolen from Wikipedia.

 

Avro is a remote procedure call and data serialization framework developed within Apache’s Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. Its primary use is in Apache Hadoop, where it can provide both a serialization format for persistent data, and a wire format for communication between Hadoop nodes, and from client programs to the Hadoop services.

It is similar to Thrift and Protocol Buffers, but does not require running a code-generation program when a schema changes (unless desired for statically-typed languages).

 

Avro Object Container File

An Avro Object Container File consists of:

  • A file header, followed by
  • one or more file data blocks.

A file header consists of:

  • Four bytes, ASCII ‘O’, ‘b’, ‘j’, followed by the Avro version number, 1
  • File metadata, including the schema definition
  • The 16-byte, randomly-generated sync marker for this file.

For data blocks Avro specifies two serialization encodings: binary and JSON. Most applications will use the binary encoding, as it is smaller and faster. For debugging and web-based applications, the JSON encoding may sometimes be appropriate.

 

Schema definition

Avro schemas are defined using JSON. Schemas are composed of primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed).

Simple schema example:

{
   "namespace": "example.avro",
   "type": "record",
   "name": "User",
   "fields": [
      {"name": "name", "type": "string"},
      {"name": "favorite_number",  "type": ["int", "null"]},
      {"name": "favorite_color", "type": ["string", "null"]}
   ] 
 }

 

Serializing and deserializing

Data in Avro might be stored with its corresponding schema, meaning a serialized item can be read without knowing the schema ahead of time.

 

Example serialization and deserialization code in Python

Serialization

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

schema = avro.schema.parse(open("user.avsc").read())  # need to know the schema to write

writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
writer.close()

 

File “users.avro” will contain the schema in JSON and a compact binary representation of the data:

image

 

Deserialization

 

reader = DataFileReader(open("users.avro", "rb"), DatumReader())  # no need to know the schema to read
for user in reader:
    print user
reader.close()

 

This outputs:

 

{u'favorite_color': None, u'favorite_number': 256, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'favorite_number': 7, u'name': u'Ben'}

 

The above section was taken from https://en.wikipedia.org/wiki/Apache_Avro on 21/06/18.

 

Kafka Producer/Consumer using Generic Avro Record

The code for this post can be found here : https://github.com/sachabarber/KafkaAvroExamples

 

Ok, so now that we know a little bit more about Apache Avro, let’s turn our attention back to the real heart of this mini series, which is: how do I use Kafka with Avro?

Kafka has deep support for Avro, and as such there are a few ways that we could proceed. For example, we can use generic Avro messages (an array of bytes), or we can use a specific type of object on the wire; we can use the Schema Registry or not; and we can also use Avro when working with Kafka Streams.

This is all good stuff, and we will be covering all of this in this mini series.

 

However we must learn to crawl before we can walk, so let’s start at the beginning.

 

What is the beginning?

Well the beginning is just what I thought was the simplest set of things I could get to work together that demonstrated how to produce/consume Avro data against a shared Avro schema.

As I say, Kafka supports this in 2 different ways, either of these:

  • GenericRecord, which serializes/deserializes as a byte array
  • A specific Avro type

We will be looking at both of these approaches. This post will be about how to use the GenericRecord approach within Kafka.

 

Using GenericRecord

GenericRecord is actually an Apache Avro class that allows you to add field values by name or index, and get the values out by name or index. For this demo we will be using Scala, and as such we need a small SBT file to get us up and running (I have chosen to create a single IntelliJ IDEA project for this post, but in later posts I may create a project for the producer and another for the consumer).
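
Before the build file, here is a tiny illustration (with a throwaway schema inlined purely for this snippet) of that name/index based access, which the full producer further down relies on:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

object GenericRecordDemo extends App {
  // A throwaway schema, inlined just to demonstrate the API
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"user","fields":[
      |  {"name":"id","type":"int"},
      |  {"name":"name","type":"string"}
      |]}""".stripMargin)

  val record: GenericRecord = new GenericData.Record(schema)
  record.put("id", 42)          // set a field by name
  record.put(1, "sacha")        // ...or by index (1 = name)
  println(record.get("name"))   // get by name  -> sacha
  println(record.get(0))        // get by index -> 42
}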

 

Here is the SBT file for this post

name := "KafkaGenericAvro"

version := "1.0"

scalaVersion := "2.12.1"


resolvers ++= Seq(
  Classpaths.typesafeReleases,
  "confluent" at "http://packages.confluent.io/maven/",
  Resolver.mavenLocal
)

libraryDependencies ++= Seq(

  "org.apache.kafka" % "kafka_2.11" % "1.1.0",
  "org.apache.avro" % "avro" % "1.8.2",
  "io.confluent" % "kafka-avro-serializer" % "3.2.1",
  "ch.qos.logback" %  "logback-classic" % "1.1.7"
)

 

This will bring in all the required dependencies. So now that we have that, let’s see how we can use a simple Avro message, which of course must start with an Avro schema, which for this post is as follows (userSchema.avsc in the code for this post):

 

{
    "namespace": "kakfa-avro.test",
     "type": "record",
     "name": "user",
     "fields":[
         {
            "name": "id", "type": "int"
         },
         {
            "name": "name",  "type": "string"
         },
         {
            "name": "email", "type": ["string", "null"]
         }
     ]
}

 

So now that we have that, just how do we use Kafka to send out some data that uses this schema?

 

The code below is a complete Kafka Producer that creates some Avro GenericRecord data objects that adhere to the schema above.

 

package com.barber.kafka.avro

import java.util.{Properties, UUID}
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificDatumWriter
import java.io.ByteArrayOutputStream
import com.barber.kafka.avro.models.User
import org.apache.avro.io._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.io.Source

class KafkaDemoAvroProducer(val topic:String) {

  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", classOf[StringSerializer].getCanonicalName)
  props.put("value.serializer",classOf[ByteArraySerializer].getCanonicalName)
  props.put("client.id", UUID.randomUUID().toString())
  private val producer =   new KafkaProducer[String,Array[Byte]](props)

  //Read avro schema file and send out
  val schema: Schema = new Parser().parse(
    Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString)

  def send(): Unit = {
    try {
      while(true) {

          val id = scala.util.Random.nextInt()
          val user = User(id, "eva mendes", Some("eva@ishot.com"))
          val genericUser: GenericRecord = new GenericData.Record(schema)

          //Put data in that generic record object
          genericUser.put("id", user.id)
          genericUser.put("name", user.name)
          genericUser.put("email", user.email.orNull)

          // Serialize generic record object into byte array
          val writer = new SpecificDatumWriter[GenericRecord](schema)
          val out = new ByteArrayOutputStream()
          val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
          writer.write(genericUser, encoder)
          encoder.flush()
          out.close()

          val serializedBytes: Array[Byte] = out.toByteArray()
          println(s"Producer sending data ${serializedBytes.toString}")
          producer.send(new ProducerRecord[String, Array[Byte]](topic, serializedBytes))
      }
    } catch {
      case ex: Exception =>
        // Log the failure (printStackTrace returns Unit, so there is nothing useful to println)
        println(s"Producer failed to send: ${ex.getMessage}")
        ex.printStackTrace()
    }
  }
}

The main takeaways from the code above are

  • We use a StringSerializer for the Kafka key
  • We use a ByteArraySerializer for the Kafka value
  • We use Avro GenericRecord class which we add our field information to
  • We use a BinaryEncoder and a specialized writer that knows about the Avro schema which we read in
  • We use a KafkaProducer[String, Array[Byte]]

 

So that’s all there is to the Producer. So what about the Consumer, what does that look like? Well, it looks like this:

package com.barber.kafka.avro

import java.util.Properties
import com.barber.kafka.avro.models.User
import org.apache.avro.Schema
import org.apache.avro.io.DatumReader
import org.apache.avro.io.Decoder
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.TimeoutException
import java.util.Collections
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.io.Source
import scala.util.{Failure, Success, Try}


class KafkaDemoAvroConsumer(val topic:String) {

  private val props = new Properties()
  val groupId = "avro-demo-topic-consumer"
  val schemaString = Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  var shouldRun : Boolean = true

  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", groupId)
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "10000")
  props.put("session.timeout.ms", "30000")
  props.put("consumer.timeout.ms", "120000")
  props.put("key.deserializer", classOf[StringDeserializer].getCanonicalName)
  props.put("value.deserializer",classOf[ByteArrayDeserializer].getCanonicalName)

  private val consumer = new KafkaConsumer[String, Array[Byte]](props)

  def start() = {

    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

      consumer.subscribe(Collections.singletonList(topic))

      while (shouldRun) {
        val records: ConsumerRecords[String,  Array[Byte]] = consumer.poll(1000)
        val it = records.iterator()
        while(it.hasNext()) {
          println("Getting message from queue.............")
          val record: ConsumerRecord[String,  Array[Byte]] = it.next()

          //val bytes = record.value()
          //val text = (bytes.map(_.toChar)).mkString
          //println(s"Saw Text ${text}")
          val user = parseUser(record.value())
          println(s"Saw User ${user}")
          consumer.commitSync
        }
      }
    }
    catch {
      case timeOutEx: TimeoutException =>
        println("Timeout ")
        false
      case ex: Exception => ex.printStackTrace()
        println("Got error when reading message ")
        false
    }
  }


  private def parseUser(message: Array[Byte]): Option[User] = {

    // Deserialize and create generic record
    val reader: DatumReader[GenericRecord] =
      new SpecificDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(message, null)
    val userData: GenericRecord = reader.read(null, decoder)

    // Make user object
    val finalUser = Try[User](
      User(userData.get("id").toString.toInt, userData.get("name").toString, try {
        Some(userData.get("email").toString)
      } catch {
        case _ => None
      })
    )

    finalUser match {
      case Success(u) =>
        Some(u)
      case Failure(e) =>
        None
    }
  }

  def close(): Unit = shouldRun = false

}

The most relevant parts of this are

  • We use a KafkaConsumer[String, Array[Byte]]
  • We use a StringDeserializer for the Kafka key
  • We use a ByteArrayDeserializer for the Kafka value
  • We are able to parse the incoming byte array into a GenericRecord and then into our specific Avro object again

 

So in terms of actual code that is about it.
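
For completeness, wiring the two classes together for a quick local run is just a couple of tiny apps. A sketch (the object names and the topic name are assumptions; use whatever topic the PowerShell script creates):

// Hypothetical entry points for running the producer and consumer above
object ProducerApp extends App {
  new KafkaDemoAvroProducer("avro-demo-topic").send()
}

object ConsumerApp extends App {
  new KafkaDemoAvroConsumer("avro-demo-topic").start()
}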

 

So how do I run this stuff?

As I stated above, you will need to download a few things, but once you have those in place you may find the small PowerShell script inside the project, called “RunThePipeline.ps1”, useful. This script does a few things: it cleans the Kafka/Zookeeper logs, stops any previous instances, starts new instances and also creates the Kafka topic (which you must have before you can use the code).

 

IMPORTANT NOTE: I have altered the Kafka log paths, and where Zookeeper logs to. This can be done using the relevant properties files from the Confluent installation.

 

image

 

server.properties

This line was changed

 

# A comma seperated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs

 

zookeeper.properties

This line was changed

 

# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper

 

The PowerShell script makes assumptions based on where you changed these values to, so if you have different settings for these please edit the PowerShell script too.

 

Essentially you need to have the following in place before you can run the code example (this is what the PowerShell script automates for you, though you may prefer to use the Confluent CLI; I just had this script from a previous project, and it also cleans out old data, which is useful when you are experimenting, and creates the required topic):

  • Zookeeper
  • Kafka broker
  • Kafka topic created

Once you have that, you can simply open the code in IntelliJ and right click and run the 2 apps (or set up new configurations)

 

Producer

image

 

Consumer

image

 

After you have done this you should be able to see producer/consumer output like this

 

Producer output

image

 

Consumer output

image

 

Conclusion

And that is it for this post. In the next post we will look at how we can use specific Avro objects instead of GenericRecord.

Apache Kafka + Avro Mini Series

I am a big fan of Apache Kafka + Kafka Streams, but one element of the Confluent Platform (the people behind Kafka) that I always glossed over was the Schema Registry. I have a bit of time at the moment, so I thought I would do a set of mini posts on how to use Apache Kafka’s Schema Registry. The Schema Registry uses Avro, so I also thought it may make sense to do a couple of posts on that too.

 

As such, this is the rough road map of what I will be doing:

 

 

MicroService Service Discovery Using Consul

 

I have just published a new article on microservice Service Discovery using Consul here: https://www.codeproject.com/Articles/1248381/Microservices-Service-Discovery

 

If you are currently doing (or have done) microservices, you will know that one of the trickier elements to this is “Service Discovery”. This article will briefly talk about some existing solutions to this, and will spend the rest of the article talking about one particular framework for aiding in this area called “Consul”.

Problem We Are Trying To Solve

So what is the problem, exactly? Let’s consider this image:

 

Say we have a bunch of services out in the wild (SystemB/SystemC in this example), and we have other services (or UIs, SystemA/SystemD in this example) that want to make use of the existing services. However, in order to do that we need to know where these existing services live, or make some assumptions about their DNS names, which should be long lived. This is the act of “discovery”, and is essentially what the article talks about in some detail. So if this floats your boat, head on over to the article for further information.

Crossbar.io: a quick look at it

A while ago someone posted another SignalR article on www.codeproject.com, and I stated: hey, you should take a look at this stuff: crossbar.io. Not liking to be someone that is not willing to die by the sword (after living by it, of course), I decided to put my money where my mouth was/is, and write a small article on crossbar.io.

So what is crossbar.io? Well, quite simply, it is a message broker that has many language bindings, which should all be able to communicate together seamlessly.

Here is what the people behind crossbar.io have to say about their own product:

Crossbar.io is an open source networking platform for distributed and microservice applications. It implements the open Web Application Messaging Protocol (WAMP), is feature rich, scalable, robust and secure. Let Crossbar.io take care of the hard parts of messaging so you can focus on your app’s features.

I decided to take a look at this a bit more, which I have written up here: https://www.codeproject.com/Articles/1183744/Crossbar-io-a-quick-look-at-it