

Apache Kafka Generic Avro Producer/Consumer

This is the first post in a small mini series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such, the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point is that I am mainly a Windows user, so the instructions and scripts will have a Windows bias to them. If you are not a Windows user, you will need to find the equivalent instructions for your OS of choice.

 

Prerequisites

 

So go and grab that lot if you want to follow along.

 

Avro Introduction

Since this mini series will be using Avro throughout, I think it is a good idea to include an introductory section on Avro, which I have shamelessly stolen from Wikipedia:

 

Avro is a remote procedure call and data serialization framework developed within Apache’s Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. Its primary use is in Apache Hadoop, where it can provide both a serialization format for persistent data, and a wire format for communication between Hadoop nodes, and from client programs to the Hadoop services.

It is similar to Thrift and Protocol Buffers, but does not require running a code-generation program when a schema changes (unless desired for statically-typed languages).

 

Avro Object Container File

An Avro Object Container File consists of:

  • A file header, followed by
  • one or more file data blocks.

A file header consists of:

  • Four bytes, ASCII ‘O’, ‘b’, ‘j’, followed by the byte 1 (the Avro version number)
  • file metadata, including the schema definition
  • the 16-byte, randomly generated sync marker for this file

For data blocks Avro specifies two serialization encodings: binary and JSON. Most applications will use the binary encoding, as it is smaller and faster. For debugging and web-based applications, the JSON encoding may sometimes be appropriate.

 

Schema definition

Avro schemas are defined using JSON. Schemas are composed of primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed).

Simple schema example:

{
   "namespace": "example.avro",
   "type": "record",
   "name": "User",
   "fields": [
      {"name": "name", "type": "string"},
      {"name": "favorite_number",  "type": ["int", "null"]},
      {"name": "favorite_color", "type": ["string", "null"]}
   ] 
 }

 

Serializing and deserializing

Data in Avro might be stored with its corresponding schema, meaning a serialized item can be read without knowing the schema ahead of time.

 

Example serialization and deserialization code in Python

Serialization

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

schema = avro.schema.parse(open("user.avsc").read())  # need to know the schema to write

writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
writer.close()

 

File “users.avro” will contain the schema in JSON and a compact binary representation of the data.

 

Deserialization

 

reader = DataFileReader(open("users.avro", "rb"), DatumReader())  # no need to know the schema to read
for user in reader:
    print user
reader.close()

 

This outputs:

 

{u'favorite_color': None, u'favorite_number': 256, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'favorite_number': 7, u'name': u'Ben'}

 

The above section was taken from https://en.wikipedia.org/wiki/Apache_Avro on 21/06/2018.
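Since the rest of this mini series is in Scala, here is roughly the same round trip using Avro's Java API from Scala. This is just a minimal sketch (it assumes the same user.avsc schema file from the example above sits in the working directory), not part of the actual code for this series:

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}

object AvroFileRoundTrip extends App {

  // we need the schema to write, just like the Python example
  val schema: Schema = new Schema.Parser().parse(new File("user.avsc"))

  // build a record and write it to an Avro object container file
  val alyssa: GenericRecord = new GenericData.Record(schema)
  alyssa.put("name", "Alyssa")
  alyssa.put("favorite_number", 256)   // favorite_color is left as null via its ["string", "null"] union

  val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
  writer.create(schema, new File("users.avro"))
  writer.append(alyssa)
  writer.close()

  // read it back; the reader picks the writer's schema up from the file header
  val reader = new DataFileReader[GenericRecord](new File("users.avro"), new GenericDatumReader[GenericRecord]())
  while (reader.hasNext) println(reader.next())
  reader.close()
}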

 

Kafka Producer/Consumer using Generic Avro Record

The code for this post can be found here : https://github.com/sachabarber/KafkaAvroExamples

 

Ok, so now that we know a little bit more about Apache Avro, let’s turn our attention back to the real heart of this mini series, which is: how do I use Kafka with Avro?

Kafka has deep support for Avro, and as such there are a few ways we could proceed. For example, we can send generic Avro messages (as an array of bytes), or we can put a specific type of object on the wire; we can use the Schema Registry or not; and we can also use Avro when working with Kafka Streams.

This is all good stuff, and we will be covering all of it in this mini series.

 

However we must learn to crawl before we can walk, so let’s start at the beginning.

 

What is the beginning?

Well, the beginning is just what I thought was the simplest set of things I could get working together that demonstrates how to produce/consume Avro data against a shared Avro schema.

As I said, Kafka supports this in two different ways:

  • GenericRecord, which is serialized/deserialized as a byte array
  • a specific Avro type

We will be looking at both of these approaches. This post is about how to use GenericRecord with Kafka.

 

Using GenericRecord

GenericRecord is an Apache Avro class that allows you to set field values by name or index, and get them back out by name or index. For this demo we will be using Scala, and as such we need a small SBT file to get us up and running (I have chosen to create a single IntelliJ IDEA project for this post, but in later posts I may create one project for the producer and another for the consumer).
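Before we get to the Kafka part, here is a tiny standalone sketch of that GenericRecord API, using a throwaway inline schema (not the schema used later in this post) purely for illustration:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

object GenericRecordSketch extends App {

  // a tiny throwaway schema, purely for illustration
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "user", "fields": [
      |  {"name": "id", "type": "int"},
      |  {"name": "name", "type": "string"}
      |]}""".stripMargin)

  val rec = new GenericData.Record(schema)
  rec.put("id", 42)        // set a field by name
  rec.put(1, "sacha")      // or by field position (0-based)

  println(rec.get("name")) // get by name  -> sacha
  println(rec.get(0))      // get by index -> 42
}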

 

Here is the SBT file for this post

name := "KafkaGenericAvro"

version := "1.0"

scalaVersion := "2.12.1"


resolvers ++= Seq(
  Classpaths.typesafeReleases,
  "confluent" at "http://packages.confluent.io/maven/",
  Resolver.mavenLocal
)

libraryDependencies ++= Seq(

  "org.apache.kafka" % "kafka_2.11" % "1.1.0",
  "org.apache.avro" % "avro" % "1.8.2",
  "io.confluent" % "kafka-avro-serializer" % "3.2.1",
  "ch.qos.logback" %  "logback-classic" % "1.1.7"
)

 

This will bring in all the required dependencies (one small note: the kafka_2.11 artifact is the Scala 2.11 build, which does not strictly match the scalaVersion of 2.12 above; the code in this post only uses the Java client classes, so it should still work, but kafka_2.12 would be the matching artifact). So now that we have that, let's see how we can use a simple Avro message, which of course must start with an Avro schema, which for this post is as follows (userSchema.avsc in the code for this post; it needs to be on the classpath, e.g. under src/main/resources, since the code below loads it via getClass.getResource):

 

{
    "namespace": "kakfa-avro.test",
    "type": "record",
    "name": "user",
    "fields": [
        { "name": "id", "type": "int" },
        { "name": "name", "type": "string" },
        { "name": "email", "type": ["string", "null"] }
    ]
}

 

So now that we have that, just how do we use Kafka to send out some data that uses this schema?

 

The code below is a complete Kafka Producer that creates some Avro GenericRecord data objects adhering to the schema above:

 

package com.barber.kafka.avro

import java.util.{Properties, UUID}
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificDatumWriter
import java.io.ByteArrayOutputStream
import com.barber.kafka.avro.models.User
import org.apache.avro.io._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.io.Source

class KafkaDemoAvroProducer(val topic:String) {

  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", classOf[StringSerializer].getCanonicalName)
  props.put("value.serializer",classOf[ByteArraySerializer].getCanonicalName)
  props.put("client.id", UUID.randomUUID().toString())
  private val producer =   new KafkaProducer[String,Array[Byte]](props)

  //Read avro schema file and send out
  val schema: Schema = new Parser().parse(
    Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString)

  def send(): Unit = {
    try {
      while(true) {

          val id = scala.util.Random.nextInt()
          val user = User(id, "eva mendes", Some("eva@ishot.com"))
          val genericUser: GenericRecord = new GenericData.Record(schema)

          //Put data in that generic record object
          genericUser.put("id", user.id)
          genericUser.put("name", user.name)
          genericUser.put("email", user.email.orNull)

          // Serialize generic record object into byte array
          val writer = new SpecificDatumWriter[GenericRecord](schema)
          val out = new ByteArrayOutputStream()
          val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
          writer.write(genericUser, encoder)
          encoder.flush()
          out.close()

          val serializedBytes: Array[Byte] = out.toByteArray()
          println(s"Producer sending data ${serializedBytes.toString}")
          producer.send(new ProducerRecord[String, Array[Byte]](topic, serializedBytes))
      }
    } catch {
      case ex: Exception =>
        println(s"Producer got an error when sending: ${ex.getMessage}")
        ex.printStackTrace()
    }
  }
}

The main takeaways from the code above are:

  • We use a StringSerializer for the Kafka key
  • We use a ByteArraySerializer for the Kafka value
  • We use the Avro GenericRecord class, which we add our field values to
  • We use a BinaryEncoder and a specialized writer that knows about the Avro schema we read in
  • We use a KafkaProducer[String, Array[Byte]]

 

So that’s all there is to the Producer. What about the Consumer, what does that look like? Well, it looks like this:

package com.barber.kafka.avro

import java.util.Properties
import com.barber.kafka.avro.models.User
import org.apache.avro.Schema
import org.apache.avro.io.DatumReader
import org.apache.avro.io.Decoder
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.TimeoutException
import java.util.Collections
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.io.Source
import scala.util.{Failure, Success, Try}


class KafkaDemoAvroConsumer(val topic:String) {

  private val props = new Properties()
  val groupId = "avro-demo-topic-consumer"
  val schemaString = Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  var shouldRun : Boolean = true

  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", groupId)
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "10000")
  props.put("session.timeout.ms", "30000")
  props.put("consumer.timeout.ms", "120000")
  props.put("key.deserializer", classOf[StringDeserializer].getCanonicalName)
  props.put("value.deserializer",classOf[ByteArrayDeserializer].getCanonicalName)

  private val consumer = new KafkaConsumer[String, Array[Byte]](props)

  def start() = {

    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

      consumer.subscribe(Collections.singletonList(topic))

      while (shouldRun) {
        val records: ConsumerRecords[String,  Array[Byte]] = consumer.poll(1000)
        val it = records.iterator()
        while(it.hasNext()) {
          println("Getting message from queue.............")
          val record: ConsumerRecord[String,  Array[Byte]] = it.next()

          //val bytes = record.value()
          //val text = (bytes.map(_.toChar)).mkString
          //println(s"Saw Text ${text}")
          val user = parseUser(record.value())
          println(s"Saw User ${user}")
          consumer.commitSync
        }
      }
    }
    catch {
      case _: TimeoutException =>
        println("Timeout while polling")
      case ex: Exception =>
        println("Got error when reading message")
        ex.printStackTrace()
    }
  }


  private def parseUser(message: Array[Byte]): Option[User] = {

    // Deserialize and create generic record
    val reader: DatumReader[GenericRecord] =
      new SpecificDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(message, null)
    val userData: GenericRecord = reader.read(null, decoder)

    // Make user object
    val finalUser = Try[User](
      User(
        userData.get("id").toString.toInt,
        userData.get("name").toString,
        Option(userData.get("email")).map(_.toString)  // email is a ["string", "null"] union, so it may be null
      )
    )

    finalUser match {
      case Success(u) =>
        Some(u)
      case Failure(e) =>
        None
    }
  }

  def close(): Unit = shouldRun = false

}

The most relevant parts of this are:

  • We use a KafkaConsumer[String, Array[Byte]]
  • We use a StringDeserializer for the Kafka key
  • We use a ByteArrayDeserializer for the Kafka value
  • We are able to parse the incoming byte array into a GenericRecord and then into our own User object again

 

So in terms of actual code that is about it.
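
Wiring these two classes up is then just a matter of giving each its own entry point and a topic name. Here is a minimal sketch of what that might look like (the object names and the avro-demo-topic topic name are my own illustrative assumptions, so check the repo for the actual entry points and topic):

package com.barber.kafka.avro

// hypothetical entry point for the producer side
object ProducerApp extends App {
  val topic = "avro-demo-topic"   // assumed topic name; the topic must already exist (see the PowerShell script below)
  new KafkaDemoAvroProducer(topic).send()
}

// hypothetical entry point for the consumer side
object ConsumerApp extends App {
  val topic = "avro-demo-topic"   // assumed topic name
  new KafkaDemoAvroConsumer(topic).start()
}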

 

So how do I run this stuff?

As I stated above, you will need to download a few things, but once you have those in place you may find the small PowerShell script inside the project, “RunThePipeline.ps1”, useful. This script does a few things, such as cleaning the Kafka/Zookeeper logs, stopping any previous instances, starting new instances, and creating the Kafka topic (which you must have before you can use the code).

 

IMPORTANT NOTE: I have altered the Kafka log paths and where Zookeeper logs to. This can be done using the relevant properties files from the Confluent installation.

 

[screenshot: the relevant properties files in the Confluent installation]

 

server.properties

This line was changed

 

# A comma separated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs

 

zookeeper.properties

This line was changed

 

# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper

 

The PowerShell script makes assumptions based on where you changed these values to, so if you have different settings for these, please edit the PowerShell script too.

 

Essentially you need to have the following running before you can run the code example (this is what the PowerShell script automates for you, though you may prefer to use the Confluent CLI; I just had this script from a previous project, it cleans out old data, which is useful when you are experimenting, and it also creates the required topic):

  • Zookeeper
  • Kafka broker
  • Kafka topic created

Once you have that, you can simply open the code in IntelliJ and right click and run the 2 apps (or set up new configurations)

 

Producer

[screenshot: running the Producer app from IntelliJ IDEA]

 

Consumer

[screenshot: running the Consumer app from IntelliJ IDEA]

 

After you have done this you should be able to see producer/consumer output like this

 

Producer output

[screenshot: Producer console output]

 

Consumer output

[screenshot: Consumer console output]

 

Conclusion

And that is it for this post. In the next post we will look at how we can use specific Avro objects instead of GenericRecord.

Apache Kafka + Avro Mini Series

I am a big fan of Apache Kafka + Kafka Streams, but one element of the Confluent Platform (from the people behind Kafka) that I have always glossed over was the Schema Registry. I have a bit of time at the moment, so I thought I would do a set of mini posts on how to use the Schema Registry. The Schema Registry uses Avro, so I also thought it may make sense to do a couple of posts on that too.

 

As such, this is the rough road map of what I will be doing:

 

  • Apache Kafka Generic Avro Producer/Consumer
  • Apache Kafka Specific Avro Producer/Consumer
  • Apache Kafka Schema Registry playground
  • Apache Kafka Specific Avro Producer/Consumer + Schema Registry integration
  • Apache Kafka Specific Avro Kafka Streams + Schema Registry integration