Apache Kafka Generic Avro Producer/Consumer

This is the 1st post in a small mini series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such, the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point is that I am mainly a Windows user, so the instructions and scripts will have a Windows bias to them; if you are not a Windows user you will need to find the equivalent instructions for your OS of choice.

 

Prerequisites

 

So go and grab that lot if you want to follow along.

 

Avro Introduction

Since this mini series will be using Avro throughout I think it is a good idea that we include an introductory section on Avro, which I have shamelessly stolen from Wikipedia

 

Avro is a remote procedure call and data serialization framework developed within Apache’s Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. Its primary use is in Apache Hadoop, where it can provide both a serialization format for persistent data, and a wire format for communication between Hadoop nodes, and from client programs to the Hadoop services.

It is similar to Thrift and Protocol Buffers, but does not require running a code-generation program when a schema changes (unless desired for statically-typed languages).

 

Avro Object Container File

An Avro Object Container File consists of:

  • A file header, followed by
  • one or more file data blocks.

A file header consists of:

  • Four bytes, ASCII ‘O’, ‘b’, ‘j’, followed by the byte 1 (the version number of the container file format)
  • file metadata, including the schema definition
  • The 16-byte, randomly-generated sync marker for this file.

For data blocks Avro specifies two serialization encodings: binary and JSON. Most applications will use the binary encoding, as it is smaller and faster. For debugging and web-based applications, the JSON encoding may sometimes be appropriate.

 

Schema definition

Avro schemas are defined using JSON. Schemas are composed of primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed).

Simple schema example:

{
   "namespace": "example.avro",
   "type": "record",
   "name": "User",
   "fields": [
      {"name": "name", "type": "string"},
      {"name": "favorite_number",  "type": ["int", "null"]},
      {"name": "favorite_color", "type": ["string", "null"]}
   ] 
 }

 

Serializing and deserializing

Data in Avro might be stored with its corresponding schema, meaning a serialized item can be read without knowing the schema ahead of time.

 

Example serialization and deserialization code in Python

Serialization

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

schema = avro.schema.parse(open("user.avsc").read())  # need to know the schema to write

writer = DataFileWriter(open("users.avro", "w"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
writer.close()

 

File “users.avro” will contain the schema in JSON and a compact binary representation of the data:

[image: contents of the users.avro file]

 

Deserialization

 

reader = DataFileReader(open("users.avro", "r"), DatumReader())  # no need to know the schema to read
for user in reader:
    print user
reader.close()

 

This outputs:

 

{u'favorite_color': None, u'favorite_number': 256, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'favorite_number': 7, u'name': u'Ben'}

 

The above section was taken from https://en.wikipedia.org/wiki/Apache_Avro on 21/06/18
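
The Wikipedia example above is in Python. Since the rest of this mini series is in Scala, here is roughly the same round trip written from Scala against the Avro Java API. This is just a sketch of mine (not part of the quoted article), and it assumes the user.avsc schema from the quote sits in the working directory:

import java.io.File

import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}

object AvroContainerFileDemo extends App {

  // The writer needs to know the schema up front
  val schema: Schema = new Schema.Parser().parse(new File("user.avsc"))

  // Write an object container file: header (magic bytes + metadata + schema) followed by data blocks
  val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
  writer.create(schema, new File("users.avro"))

  val alyssa = new GenericData.Record(schema)
  alyssa.put("name", "Alyssa")
  alyssa.put("favorite_number", 256)
  writer.append(alyssa)
  writer.close()

  // The reader does not need the schema ahead of time: it is embedded in the file header
  val reader = new DataFileReader[GenericRecord](new File("users.avro"), new GenericDatumReader[GenericRecord]())
  println(s"Embedded schema: ${reader.getSchema}")
  while (reader.hasNext) {
    println(reader.next())
  }
  reader.close()
}

Because the schema travels inside the container file header, the users.avro written here can be read by the Python snippet above, and vice versa.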

 

Kafka Producer/Consumer using Generic Avro Record

The code for this post can be found here : https://github.com/sachabarber/KafkaAvroExamples

 

Ok so now that we know a little bit more about Apache Avro, let’s turn our attention back to the real heart of this mini series, which is: how do I use Kafka with Avro?

Kafka has deep support for Avro, and as such there are a few ways that we could proceed. For example we can use generic Avro messages (sent as an array of bytes), or we could use a specific type of object on the wire; we can use the Schema Registry or not; and we can also use Avro when working with Kafka Streams.

This is all good stuff, and we will be covering all of this in this mini series.

 

However we must learn to crawl before we can walk, so let’s start at the beginning.

 

What is the beginning?

Well the beginning is just what I thought was the simplest set of things I could get to work together that demonstrated how to produce/consume Avro data against a shared Avro schema.

As I say, Kafka supports this in 2 different ways:

  • GenericRecord, which serializes/deserializes as a byte array
  • Specific Avro type

We will be looking at both of these approaches. This post will be about how to use GenericRecord within Kafka.

 

Using GenericRecord

GenericRecord is actually an Apache Avro class that allows you to add field values by name or index, and get the values back out by name or index (there is a small standalone example of this just after the schema below). For this demo we will be using Scala, and as such we need a small SBT file to get us up and running (I have chosen to create a single IntelliJ IDEA project for this post, but in later posts I may create one project for the producer and another for the consumer).

 

Here is the SBT file for this post

name := "KafkaGenericAvro"

version := "1.0"

scalaVersion := "2.12.1"


resolvers ++= Seq(
  Classpaths.typesafeReleases,
  "confluent" at "http://packages.confluent.io/maven/",
  Resolver.mavenLocal
)

libraryDependencies ++= Seq(

  "org.apache.kafka" % "kafka_2.11" % "1.1.0",
  "org.apache.avro" % "avro" % "1.8.2",
  "io.confluent" % "kafka-avro-serializer" % "3.2.1",
  "ch.qos.logback" %  "logback-classic" % "1.1.7"
)

 

This will bring in all the required dependencies. So now that we have that, let’s see how we can use a simple Avro message, which of course must start with an Avro schema, which for this post is as follows (userSchema.avsc in the code for this post)

 

{
    "namespace": "kakfa-avro.test",
     "type": "record",
     "name": "user",
     "fields":[
         {
            "name": "id", "type": "int"
         },
         {
            "name": "name",  "type": "string"
         },
         {
            "name": "email", "type": ["string", "null"]
         }
     ]
}
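
As mentioned earlier, GenericRecord lets you set and get fields either by name or by positional index (the index being the field’s position in the schema above). Here is a tiny standalone sketch of that, reading the same userSchema.avsc off the classpath; the values themselves are just made up for illustration:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

import scala.io.Source

object GenericRecordPlayground extends App {

  val schema: Schema = new Schema.Parser().parse(
    Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString)

  val user: GenericRecord = new GenericData.Record(schema)

  // set fields by name ...
  user.put("id", 1)
  user.put("name", "sacha")

  // ... or by positional index (0 = id, 1 = name, 2 = email for this schema)
  user.put(2, "someone@somewhere.com")

  // and read them back either way
  println(user.get("name"))   // sacha
  println(user.get(2))        // someone@somewhere.com
}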

 

So now that we have that, just how do we use Kafka to send out some data that uses this schema?

 

The code below is a complete Kafka producer that creates some Avro GenericRecord data objects that adhere to the schema above

 

package com.barber.kafka.avro

import java.util.{Properties, UUID}
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificDatumWriter
import java.io.ByteArrayOutputStream
import com.barber.kafka.avro.models.User
import org.apache.avro.io._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.io.Source

class KafkaDemoAvroProducer(val topic:String) {

  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", classOf[StringSerializer].getCanonicalName)
  props.put("value.serializer",classOf[ByteArraySerializer].getCanonicalName)
  props.put("client.id", UUID.randomUUID().toString())
  private val producer =   new KafkaProducer[String,Array[Byte]](props)

  //Read avro schema file and send out
  val schema: Schema = new Parser().parse(
    Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString)

  def send(): Unit = {
    try {
      while(true) {

          val id = scala.util.Random.nextInt()
          val user = User(id, "eva mendes", Some("eva@ishot.com"))
          val genericUser: GenericRecord = new GenericData.Record(schema)

          //Put data in that generic record object
          genericUser.put("id", user.id)
          genericUser.put("name", user.name)
          genericUser.put("email", user.email.orNull)

          // Serialize the generic record object into a byte array.
          // Note: SpecificDatumWriter extends GenericDatumWriter, so it can write a GenericRecord too,
          // although GenericDatumWriter[GenericRecord] would be the more usual choice here
          val writer = new SpecificDatumWriter[GenericRecord](schema)
          val out = new ByteArrayOutputStream()
          val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
          writer.write(genericUser, encoder)
          encoder.flush()
          out.close()

          val serializedBytes: Array[Byte] = out.toByteArray()
          println(s"Producer sending ${serializedBytes.length} bytes")
          producer.send(new ProducerRecord[String, Array[Byte]](topic, serializedBytes))
      }
    } catch {
      case ex: Exception =>
        println(s"Got error when sending message: ${ex.getMessage}")
        ex.printStackTrace()
    }
  }
}

The main takeaways from the code above are

  • We use a StringSerializer for the Kafka key
  • We use a ByteArraySerializer for the Kafka value
  • We use the Avro GenericRecord class, which we add our field information to
  • We use a BinaryEncoder and a specialized writer that knows about the Avro schema which we read in
  • We use a KafkaProducer[String, Array[Byte]]
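
For completeness, here is a minimal sketch of an entry point that drives the producer class above. The topic name is an assumption of mine; it just needs to match whatever topic you create and whatever the consumer listens on:

package com.barber.kafka.avro

object ProducerApp extends App {
  // assumed topic name - use whatever topic your setup actually creates
  val producer = new KafkaDemoAvroProducer("avro-generic-demo-topic")
  producer.send()
}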

 

So that’s all there is to the producer. So what about the consumer, what does that look like? Well, it looks like this

package com.barber.kafka.avro

import java.util.Properties
import com.barber.kafka.avro.models.User
import org.apache.avro.Schema
import org.apache.avro.io.DatumReader
import org.apache.avro.io.Decoder
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.TimeoutException
import java.util.Collections
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.io.Source
import scala.util.{Failure, Success, Try}


class KafkaDemoAvroConsumer(val topic:String) {

  private val props = new Properties()
  val groupId = "avro-demo-topic-consumer"
  val schemaString = Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  var shouldRun : Boolean = true

  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", groupId)
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "10000")
  props.put("session.timeout.ms", "30000")
  props.put("consumer.timeout.ms", "120000")
  props.put("key.deserializer", classOf[StringDeserializer].getCanonicalName)
  props.put("value.deserializer",classOf[ByteArrayDeserializer].getCanonicalName)

  private val consumer = new KafkaConsumer[String, Array[Byte]](props)

  def start() = {

    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

      consumer.subscribe(Collections.singletonList(topic))

      while (shouldRun) {
        val records: ConsumerRecords[String,  Array[Byte]] = consumer.poll(1000)
        val it = records.iterator()
        while(it.hasNext()) {
          println("Getting message from queue.............")
          val record: ConsumerRecord[String,  Array[Byte]] = it.next()

          //val bytes = record.value()
          //val text = (bytes.map(_.toChar)).mkString
          //println(s"Saw Text ${text}")
          val user = parseUser(record.value())
          println(s"Saw User ${user}")
          consumer.commitSync
        }
      }
    }
    catch {
      case timeOutEx: TimeoutException =>
        println("Timeout ")
      case ex: Exception =>
        ex.printStackTrace()
        println("Got error when reading message ")
    }
  }


  private def parseUser(message: Array[Byte]): Option[User] = {

    // Deserialize and create generic record
    val reader: DatumReader[GenericRecord] =
      new SpecificDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(message, null)
    val userData: GenericRecord = reader.read(null, decoder)

    // Make user object
    val finalUser = Try[User](
      User(
        userData.get("id").toString.toInt,
        userData.get("name").toString,
        // email is a nullable union in the schema, so guard against null
        Option(userData.get("email")).map(_.toString)
      )
    )

    finalUser match {
      case Success(u) =>
        Some(u)
      case Failure(e) =>
        None
    }
  }

  def close(): Unit = shouldRun = false

}

The most relevant parts of this are

  • We use a KafkaConsumer[String, Array[Byte]]
  • We use a StringDeserializer for the Kafka key
  • We use a ByteArrayDeserializer for the Kafka value
  • We are able to parse the incoming byte array into a GenericRecord and then into our own User object again
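
And here is the matching sketch for the consumer side; again the topic name is an assumption and must match the one the producer writes to:

package com.barber.kafka.avro

object ConsumerApp extends App {
  // assumed topic name - must be the same topic the producer sends to
  val consumer = new KafkaDemoAvroConsumer("avro-generic-demo-topic")
  consumer.start()
}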

 

So in terms of actual code that is about it.

 

So how do I run this stuff?

As I stated above you will need to download a few things, but once you have those in place you may find the small PowerShell script that is inside the project, called “RunThePipeline.ps1”, useful. This script does a few things, such as cleaning the Kafka/Zookeeper logs, stopping any previous instances, starting new instances, and creating the Kafka topic (which you must have before you can use the code).

 

IMPORTANT NOTE : I have altered the Kafka log paths, and where Zookeeper logs to. This can be done using the relevant properties files from the Confluent installation

 

[image: the properties files inside the Confluent installation]

 

server.properties

This line was changed

 

# A comma separated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs

 

zookeeper.properties

This line was changed

 

# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper

 

The PowerShell script will be making assumptions based on where you changed these values to, so if you have different settings for these please edit the PowerShell script too.

 

Essentially you need to have the following running before you can run the code example. This is what the PowerShell script automates for you, though you may prefer to use the Confluent CLI; I just had this script from a previous project, it cleans out old data (which is useful when you are experimenting), and it also creates the required topic (there is also a programmatic alternative sketched just after this list).

  • Zookeeper
  • Kafka broker
  • Kafka topic created
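
If you would rather not rely on the script for the last step, the Kafka client libraries that the SBT file already pulls in include an AdminClient that can create the topic programmatically. A rough sketch; the topic name, partition count and replication factor here are my own assumptions, so pick whatever matches your setup:

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopicApp extends App {

  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val admin = AdminClient.create(props)

  // a single partition with replication factor 1 is fine for a local, single broker setup
  val topic = new NewTopic("avro-generic-demo-topic", 1, 1.toShort)
  admin.createTopics(Collections.singleton(topic)).all().get()

  admin.close()
}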

Once you have that, you can simply open the code in IntelliJ and right click and run the 2 apps (or set up new configurations)

 

Producer

[image: running the producer from IntelliJ IDEA]

 

Consumer

[image: running the consumer from IntelliJ IDEA]

 

After you have done this you should be able to see producer/consumer output like this

 

Producer output

[image: producer console output]

 

Consumer output

[image: consumer console output]

 

Conclusion

And that is it for this post. In the next post we will look at how we can use specific Avro objects instead of GenericRecord.


Apache Kafka + Avro Mini Series

I am a big fan of Apache Kafka + Kafka Streams, but one element of the Confluent Platform (from the people behind Kafka) that I always glossed over was the Schema Registry. I have a bit of time at the moment, so I thought I would do a set of mini posts on how to use Apache Kafka’s Schema Registry. The Schema Registry uses Avro, so I also thought it may make sense to do a couple of posts on that too.

 

As such this is the rough road map of what I will be doing :

 

  • Apache Kafka Generic Avro Producer/Consumer
  • Apache Kafka Specific Avro Producer/Consumer
  • Apache Kafka Schema Registry playground
  • Apache Kafka Specific Avro Producer/Consumer + Schema Registry integration
  • Apache Kafka Specific Avro Kafka Streams + Schema Registry integration

 

MicroService Service Discovery Using Consul

 

I have just published a new article on micro service Service Discovery using Consul here : https://www.codeproject.com/Articles/1248381/Microservices-Service-Discovery

 

If you are currently doing/have done microservices you will know that one of the trickier elements of this is “Service Discovery”. This article will briefly talk about some existing solutions to this, and will spend the rest of the article talking about one particular framework for aiding in this area called “Consul”.

Problem We Are Trying To Solve

So what is the problem exactly? Let’s consider this image

 

Say we have a bunch of services out in the wild (SystemB/SystemC in this example), and we have other services (or UIs; SystemA/SystemD in this example) that want to make use of the existing services. However in order to do that we need to know where these existing services live, or make some assumptions about their DNS names, which should be long lived. This is the act of “discovery”, and is essentially what the article talks about in some detail. So if this floats your boat, head on over to the article for further information.

Crossbar.io quick look at it

A while ago someone posted another SignalR article on www.codeproject.com, and I stated hey, you should take a look at this stuff: crossbar.io. Not liking to be someone that is not willing to die by the sword (after living by it of course), I decided to put my money where my mouth was/is, and write a small article on crossbar.io.

So what is crossbar.io? Well quite simply it is a message broker that has many language bindings, which should all be able to communicate together seamlessly.

Here is what the people behind crossbar.io have to say about their own product

Crossbar.io is an open source networking platform for distributed and microservice applications. It implements the open Web Application Messaging Protocol (WAMP), is feature rich, scalable, robust and secure. Let Crossbar.io take care of the hard parts of messaging so you can focus on your app’s features.

I decided to take a look at this a bit more which I have written up here : https://www.codeproject.com/Articles/1183744/Crossbar-io-a-quick-look-at-it

A Look At Akka .NET

A while back I wrote an Actor model for NetMQ (the .NET port of ZeroMQ), which is now part of the live codebase. I was happy with this.

 

I do like the idea of Actor Models, where you spin up and talk to an actor, rather than worry about locks/semaphores etc etc.

 

It just gels with me rather well. To this end I have been experimenting with Akka.NET, which is a pretty complete port of the original Akka. It is a lot of fun, and a really nice way to write distributed multithreaded code if you ask me.

 

To this end I have written a small article, which should be viewed as an introductory article on Akka.NET. If you like the sound of this you can read the full article over here at Codeproject :

 

http://www.codeproject.com/Articles/1007161/A-Look-saAt-Akka-NET

 

Enjoy

NetMQ : Documentation

I don’t know how many of you managed to read my series of articles on NetMQ

https://sachabarbs.wordpress.com/distributed-systems/

Or followed my last codeproject article, which made use of NetMQ to create a streaming server/client(s).

http://www.codeproject.com/Articles/853476/NetMQplus-RX-Streaming-Data-Demo-App-of

I did this one with the help of the author of NetMQ, Doron. After we did that article Doron and I had a small chat about me possibly becoming more active on NetMQ in general. I suggested that one way in which I might be able to help was to write some documentation for NetMQ, as this is one area where sadly this great library is lacking a bit.

Doron liked this idea (guess documentation aint for everyone huh)

 

Doron (NetMQ author) and I will be working on this together, and I will be running any documentation I write by him first, but that is one area where I will be helping out, so you can expect to see me post a few more posts here from time to time, saying that some new documentation is available for NetMQ.

I hope that makes at least a few folks happy out there


NetMQ + RX Streaming Data Demo App

So last time I showed you a slightly cut down demo app (well, cut down based on the great work done by our friends at Adaptive, where they created the awesome demo: Reactive Trader). The demo app we looked at last time used SignalR + RX. I also promised that there would be a part 2 of that article, where I showed how to do the same sort of thing using NetMQ (the pure C# port of ZeroMQ, the extremely popular socket library) + RX.

I am pleased to say that Doron (author of NetMQ) and I have now published this article and it is available to read at the following Url:

http://www.codeproject.com/Articles/853476/NetMQplus-RX-Streaming-Data-Demo-App-of

If you looked at the 1st part in this mini series of articles you will know that we cover things like

  • Creating a streaming API of TickerDto objects
  • We make a resilient RX stream, that may be repeated
  • We expect the client(s) to know when the server is unavailable
  • We expect the client(s) to know when the server is available again

We show all of this, just like we did with the 1st demo app (SignalR + RX), but you will also learn a few things along the way such as:

  • How to create a generic NetMQ heartbeat
  • How to create a NetMQ Pub/Sub topology
  • The NetMQ Actor model
  • The NetMQ Poller
  • How to use NetMQ, which will actually put you in very good stead to use ZeroMQ in any of the other language bindings (and it has loads)
  • A few of the different types of sockets within NetMQ
  • How to actually use NetMQ in a real life project

We are hoping you will actually get quite a lot from this article. So if you do enjoy it, please leave a vote over at the codeproject web site