Distributed Systems, kaf, Kafka

Apache Kafka Specific Avro Producer/Consumer + Kafka Schema Registry

This is the 2nd post in a small mini series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point is that I am mainly a Windows user, as such the instructions, scripts will have a Windows bias to them. So if you are not a Windows user you will need to find the instructions for your OS of choice.

 

Prerequisites

 

So go and grab that lot if you want to follow along.

 

Last time we talked about how to create a Kafka Producer/Consumer which did use Avro, but used the GenericRecord approach, which kind of works more like a dictionary of key value pairs. This time we will be talking about how to use the KafkaAvroSerializer to send specific Avro types using Kafka and the Kafka Schema Registry.

 

Kafka Schema Registry

So just what is the Kafka Schema Registry?

 

Schema Registry is part of the Confluent Open Source and Confluent Enterprise distributions. The Schema Registry stores a versioned history of all schemas and allows for the evolution of schemas according to the configured compatibility settings and expanded Avro support.

https://docs.confluent.io/current/schema-registry/docs/index.html up on date 26/06/18

 

I don’t want to dwell too much on the internals of the Kafka Schema Registry as we will be looking at in fine detail in the next post. But for now, it is important to understand that both Kafka/KafkaStreams suport using the Schema Registry when sending/receiving Avro content. They will also raise Exceptions should you try to send incompatible Avro content through a topic which uses the Schema Registry that breaks the Schemas compatibility rules within the Schema Registry.

If all of that sounds a bit whack, don’t worry we will be getting into it more in the next post

 

I just wanted to give a small overview of how the Schema Registry fits with the Kafka eco system. The rest of this article will be more about how we can make use of the KafkaAvroSerializer  to send Avro data of our own types rather than use the previously demonstrated GenericRecord approach.

 

Where is the code for this post?

You can grab the code that goes with this post from here : https://github.com/sachabarber/KafkaAvroExamples/tree/master/KafkaSpecificAvro

 

SBT

I decided that I would take the hit and craft a multi-project SBT setup, which looks like this

import Deps._
import sbt.Keys.scalaVersion

lazy val root = (project in file(".")).
  aggregate(publisher, subscriber).
  settings(
    inThisBuild(List(
      organization := "com.barber",
      scalaVersion := "2.12.1",
      resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
      resolvers += "io.confluent" at "http://packages.confluent.io/maven/"
    )),
    name := "scala_kafka_specific_avro_example"
  )

lazy val publisher = (project in file ("publisher")).
  settings(
    name := "publisher",
    /* sourceGenerators in Compile += (avroScalaGenerateSpecific in Compile).taskValue, */
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  ).dependsOn(common)

lazy val subscriber = (project in file ("subscriber")).
  settings(
    name := "subscriber",
    /* sourceGenerators in Compile += (avroScalaGenerateSpecific in Compile).taskValue, */
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  ).dependsOn(publisher, common)
  
  lazy val common = (project in file ("common")).
  settings(
    name := "common",
    libraryDependencies ++= Seq(
      kafka,
      avro,
      avroSerializer,
      logBack)
  )

 

Where the Deps.scala file looks like this

import sbt._

object Deps {
  lazy val kafka = "org.apache.kafka" % "kafka_2.11" % "1.1.0"
  lazy val avro =  "org.apache.avro" % "avro" % "1.8.2"
  lazy val avroSerializer = "io.confluent" % "kafka-avro-serializer" % "3.2.1"
  lazy val logBack = "ch.qos.logback" %  "logback-classic" % "1.1.7"
}

 

This essentially creates the following 3 projects

  • Common (this is where the shared Avro schema/object live)
  • Publisher
  • Subscriber

Once you have done a SBT clean/compile and opened it inside IntelliJ it should look something like this:

image

 

So now that we know roughly what the build file looks like, lets turn our attention to the 3 projects. I will walk through them below in turn

 

Common Project

As stated above the common project holds the Avro schema file (which is slightly different from the last post). The new Avro schema is as follows:

 

{
    "namespace": "com.barber.kafka.avro",
     "type": "record",
     "name": "User",
     "fields":[
         {
            "name": "id", "type": "int"
         },
         {
            "name": "name",  "type": "string"
         }
     ]
}

Unlike last time where we used a GenericRecord this time we would like to use a case class to send down the wire. Something like this

case class User(var id: Int, var name: String) 

 

Unfortunately this does not work, if you were to try and send this down the wire using the KafkaProducer whilst the Schema Registry is running, you will most likely get an Exception stating that it can not be serialized.

 

What we actually need to do is something more like this, where we need to extend the SpecificRecordBase trait and also supply the Schema and the ability to set/get values.

 

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.barber.kafka.avro

import scala.annotation.switch
import scala.io.Source

case class User(var id: Int, var name: String) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(0, "")
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case pos if pos == 0 => {
        id
      }.asInstanceOf[AnyRef]
      case pos if pos == 1 => {
        name
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case pos if pos == 0 => this.id = {
        value
      }.asInstanceOf[Int]
      case pos if pos == 1 => this.name = {
        value.toString
      }.asInstanceOf[String]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = User.SCHEMA$
}

object User {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
    Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString)
}

 

Now that is quite a bit of code to write. Surely there are tools out there for this job aren’t there?

 

Why yes there are.

 

 

image

 

So that is all the Common project is really, its just a place for the shared Avro objects to live that the Publisher/Subscriber projects can use.

 

Publisher Project

This is the entire code for the Publisher

package com.barber.kafka.avro

import java.util.{Properties, UUID}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

class KafkaDemoAvroPublisher(val topic:String) {

  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put("key.serializer", classOf[StringSerializer].getCanonicalName)
  props.put("value.serializer",classOf[KafkaAvroSerializer].getCanonicalName)
  props.put("client.id", UUID.randomUUID().toString())

  private val producer =   new KafkaProducer[String,User](props)

  def send(): Unit = {
    try {
      val rand =  new scala.util.Random(44343)

      for(i <- 1 to 10) {
        val id = rand.nextInt()
        val itemToSend = User(id , "ishot.com")
        println(s"Producer sending data ${itemToSend.toString}")
        producer.send(new ProducerRecord[String, User](topic, itemToSend))
        producer.flush()
      }
    } catch {
      case ex: Exception =>
        println(ex.printStackTrace().toString)
        ex.printStackTrace()
    }
  }
}

The main points are these:

  • We now include a property for the schema.registry.url
  • We now use the KafkaAvroSerializer for the topic value serializer
  • We use the User class that we talked about in the common project, that was generated from the Avro schema talked about above. It is this User Avro object that we push out on the Kafka topic

 

Subscriber Project

Here is the subscriber code

package com.barber.kafka.avro

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Collections
import org.apache.kafka.common.errors.TimeoutException
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.serialization.StringDeserializer

class KafkaDemoAvroSubscriber(val topic:String) {

  private val props = new Properties()
  val groupId = "avro-demo-topic-consumer"
  var shouldRun : Boolean = true

  props.put("bootstrap.servers", "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  props.put("group.id", groupId)
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "10000")
  props.put("session.timeout.ms", "30000")
  props.put("consumer.timeout.ms", "120000")
  props.put("key.deserializer", classOf[StringDeserializer].getCanonicalName)
  props.put("value.deserializer",classOf[KafkaAvroDeserializer].getCanonicalName)
  //Use Specific Record or else you get Avro GenericRecord.
  props.put("specific.avro.reader", "true")

  private val consumer = new KafkaConsumer[String, com.barber.kafka.avro.User](props)

  def start() = {

    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

      consumer.subscribe(Collections.singletonList(topic))

      while (shouldRun) {
        val records: ConsumerRecords[String,  com.barber.kafka.avro.User] = consumer.poll(1000)
        val it = records.iterator()
        while(it.hasNext()) {
          println("Getting message from queue.............")
          val record: ConsumerRecord[String,  com.barber.kafka.avro.User] = it.next()
          val recievedItem =record.value()
          println(s"Saw User ${recievedItem}")
          consumer.commitSync
        }
      }
    }
    catch {
      case timeOutEx: TimeoutException =>
        println("Timeout ")
        false
      case ex: Exception => ex.printStackTrace()
        println("Got error when reading message ")
        false
    }
  }

  def close(): Unit = shouldRun = false
}

 

This is not that different from last time, the main points are :

  • We now include a property for the schema.registry.url
  • We now use the KafkaAvroDeSerializer for the topic value deserializer
  • We must set the property specific.avro.reader to have a value of true, which tells Kafka / Schema Registry that we will be using a specific Avro type (User type in this case) otherwise Kafka will expect GenericRecord to be used on the topic
  • We use the User class that we talked about in the common project, that was generated from the Avro schema talked about above. It is this User Avro object that we receive on the Kafka topic

 

So how do I run this stuff?

As I stated above you will need to download a few things, but once you have those in place you may find the small PowerShell script useful that is inside the project called “RunThePipeline.ps1”. This script does a few things, such as cleans the Kafka/Zookeeper logs, stops any previous instances, starts new instances and also creates the Kafka topic (which you must have before you can use the code).

 

IMPORTANT NOTE : I have altered the Kafka log paths, and where Zookeeper logs to. This can be done using the relevant properties files from the Confluent installation

 

image

 

server.properties

This line was changed

 

# A comma seperated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs

 

zookeeper.properties

This line was changed

 

# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper

 

The PowerShell script will be making assumptions based on the where you changed these values to, so if you have different settings for these please edit the PowerShell script too )

 

Essentially you need to have the following running before you can run the code example (this is what the PowerShell script automates for you, though you may prefer to use the Confluent CLI, I just had this script from a previous project, and it also cleans out old data which is useful when you are experimenting, and it also creates the required topic)

  • Zookeeper
  • Kafka broker
  • Starts the Schema Registry
  • Kafka topic created

Once you have that, you can simply open the code in IntelliJ and right click and run the 2 apps (or set up new configurations)

 

Run the Publisher

 

image

 

Run the Subscriber

 

image

 

Once you have the run the PowerShell “RunThePipeline.ps1” script associated with this code (the one in the demo folder) and run the Publisher/Subscriber as shown above, you should see some output something like this

 

Publisher output

image

 

Subscriber output

image

 

 

Conclusion

Using Avro with Kafka / Schema Registry is fairly straight forward, and is not that different from working with regular Array of bytes. I hope this post has wet your appetite a bit for the Kafka Schema registry which we will look into in a lot more detail in the next post.

 

 

 

 

 

 

Advertisements
Distributed Systems, Kafka

Apache Kafka Generic Avro Producer/Consumer

This is the 1st post in a small mini series that I will be doing using Apache Kafka + Avro. The programming language will be Scala. As such the following prerequisites need to be obtained should you wish to run the code that goes along with each post. The other point is that I am mainly a Windows user, as such the instructions, scripts will have a Windows bias to them. So if you are not a Windows user you will need to find the instructions for your OS of choice.

 

Prerequisites

 

So go and grab that lot if you want to follow along.

 

Avro Introduction

Since this mini series will be using Avro throughout I think it is a good idea that we include an introductory section on Avro, which I have shamelessly stolen from Wikipedia

 

Avro is a remote procedure call and data serialization framework developed within Apache’s Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. Its primary use is in Apache Hadoop, where it can provide both a serialization format for persistent data, and a wire format for communication between Hadoop nodes, and from client programs to the Hadoop services.

It is similar to Thrift and Protocol Buffers, but does not require running a code-generation program when a schema changes (unless desired for statically-typed languages).

 

Avro Object Container File

An Avro Object Container File consists of:

  • A file header, followed by
  • one or more file data blocks.

A file header consists of:

  • Four bytes, ASCII ‘O’, ‘b’, ‘j’, followed by 1
  • file metadata, including the schema definition
  • The 16-byte, randomly-generated sync marker for this file.

For data blocks Avro specifies two serialization encodings: binary and JSON. Most applications will use the binary encoding, as it is smaller and faster. For debugging and web-based applications, the JSON encoding may sometimes be appropriate.

 

Schema definition

Avro schemas are defined using JSON. Schemas are composed of primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed).

Simple schema example:

{
   "namespace": "example.avro",
   "type": "record",
   "name": "User",
   "fields": [
      {"name": "name", "type": "string"},
      {"name": "favorite_number",  "type": ["int", "null"]},
      {"name": "favorite_color", "type": ["string", "null"]}
   ] 
 }

 

Serializing and deserializing

Data in Avro might be stored with its corresponding schema, meaning a serialized item can be read without knowing the schema ahead of time.

 

Example serialization and deserialization code in Python

Serialization

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

schema = avro.schema.parse(open("user.avsc").read())  # need to know the schema to write

writer = DataFileWriter(open("users.avro", "w"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
writer.close()

 

File “users.avro” will contain the schema in JSON and a compact binary representation of the data:

image

 

Deserialization

 

reader = DataFileReader(open("users.avro", "r"), DatumReader())  # no need to know the schema to read
for user in reader:
    print user
reader.close()

 

This outputs:

 

{u'favorite_color': None, u'favorite_number': 256, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'favorite_number': 7, u'name': u'Ben'}

 

The above section taken from https://en.wikipedia.org/wiki/Apache_Avro up on date 21/06/18

 

Kafka Producer/Consumer using Generic Avro Record

The code for this post can be found here : https://github.com/sachabarber/KafkaAvroExamples

 

Ok so now  that we know a little bit more about Apache Avro, let’s turn our attention back to the real heart of this mini series, which is how do I use Kafka with Avro?

Kafka has deep support for Avro and as such there are a few ways that we could proceed, for example we can use generic Avro messages (array of bytes) or we could use a specific type of object which would be used on the wire, we can also use the Schema Registry or not, we can can also use Avro when working with Kafka Streams.

This is all good stuff, and we will be covering all of this in this mini series.

 

However we must learn to crawl before we can walk, so lets start at the beginning.

 

What is the beginning?

Well the beginning is just what I thought was the simplest set of things I could get to work together that demonstrated how to produce/consume Avro data against a shared Avro schema.

As I say Kafka supports this in 2 different ways, either of these

  • GenericRecord, which serialize/deserialize as byte array
  • Specific Avro type

We will be looking   at both of these approaches. This post will be about how to use the GenericRecord within Kafka

 

Using GenericRecord

GenericRecord is actually an Apache Avro class that allows you to add field value by name or index, and get the values out by name and index. For this demo we will be using scala, as such we need a small SBT file to get us up and running (I have chosen to create a single IntelliJ IDEA project for this post, but in later posts I may create a project for producer and another for consumer).

 

Here is the SBT file for this post

name := "KafkaGenericAvro"

version := "1.0"

scalaVersion := "2.12.1"


resolvers ++= Seq(
  Classpaths.typesafeReleases,
  "confluent" at "http://packages.confluent.io/maven/",
  Resolver.mavenLocal
)

libraryDependencies ++= Seq(

  "org.apache.kafka" % "kafka_2.11" % "1.1.0",
  "org.apache.avro" % "avro" % "1.8.2",
  "io.confluent" % "kafka-avro-serializer" % "3.2.1",
  "ch.qos.logback" %  "logback-classic" % "1.1.7"
)

 

This will bring all the required dependencies. So now that we have that lets see how we can use a simple Avro message, which of course must start with an Avro schema which for this post is as follows (userSchema.avsc in the code for this post)

 

{
    "namespace": "kakfa-avro.test",
     "type": "record",
     "name": "user",
     "fields":[
         {
            "name": "id", "type": "int"
         },
         {
            "name": "name",  "type": "string"
         },
         {
            "name": "email", "type": ["string", "null"]
         }
     ]
}

 

So now that we have that, just how do we use Kafka to send out some data that uses this schema?

 

The code below is a complete Kafka Producer that will creates some Avro GenericRecord data objects that adhere to the schema above

 

package com.barber.kafka.avro

import java.util.{Properties, UUID}
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificDatumWriter
import java.io.ByteArrayOutputStream
import com.barber.kafka.avro.models.User
import org.apache.avro.io._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.io.Source

class KafkaDemoAvroProducer(val topic:String) {

  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", classOf[StringSerializer].getCanonicalName)
  props.put("value.serializer",classOf[ByteArraySerializer].getCanonicalName)
  props.put("client.id", UUID.randomUUID().toString())
  private val producer =   new KafkaProducer[String,Array[Byte]](props)

  //Read avro schema file and send out
  val schema: Schema = new Parser().parse(
    Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString)

  def send(): Unit = {
    try {
      while(true) {

          val id = scala.util.Random.nextInt()
          val user = User(id, "eva mendes", Some("eva@ishot.com"))
          val genericUser: GenericRecord = new GenericData.Record(schema)

          //Put data in that generic record object
          genericUser.put("id", user.id)
          genericUser.put("name", user.name)
          genericUser.put("email", user.email.orNull)

          // Serialize generic record object into byte array
          val writer = new SpecificDatumWriter[GenericRecord](schema)
          val out = new ByteArrayOutputStream()
          val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
          writer.write(genericUser, encoder)
          encoder.flush()
          out.close()

          val serializedBytes: Array[Byte] = out.toByteArray()
          println(s"Producer sending data ${serializedBytes.toString}")
          producer.send(new ProducerRecord[String, Array[Byte]](topic, serializedBytes))
      }
    } catch {
      case ex: Exception =>
        println(ex.printStackTrace().toString)
        ex.printStackTrace()
    }
  }
}

The main takeaways from the code above are

  • We use a StringSerializer for the Kafka key
  • We use a ByteArraySerializer for the Kafka key
  • We use Avro GenericRecord class which we add our field information to
  • We use a BinaryEncoder and a spcecialized writer that knows about the Avro schema which we read in
  • We use a KafkaProducer[String, Array[Byte]]

 

So that’s all there is to Producer, so what about the Consumer, what does that look like, well it looks like this

package com.barber.kafka.avro

import java.util.Properties
import com.barber.kafka.avro.models.User
import org.apache.avro.Schema
import org.apache.avro.io.DatumReader
import org.apache.avro.io.Decoder
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.TimeoutException
import java.util.Collections
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.io.Source
import scala.util.{Failure, Success, Try}


class KafkaDemoAvroConsumer(val topic:String) {

  private val props = new Properties()
  val groupId = "avro-demo-topic-consumer"
  val schemaString = Source.fromURL(getClass.getResource("/userSchema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  var shouldRun : Boolean = true

  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", groupId)
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "10000")
  props.put("session.timeout.ms", "30000")
  props.put("consumer.timeout.ms", "120000")
  props.put("key.deserializer", classOf[StringDeserializer].getCanonicalName)
  props.put("value.deserializer",classOf[ByteArrayDeserializer].getCanonicalName)

  private val consumer = new KafkaConsumer[String, Array[Byte]](props)

  def start() = {

    try {
      Runtime.getRuntime.addShutdownHook(new Thread(() => close()))

      consumer.subscribe(Collections.singletonList(topic))

      while (shouldRun) {
        val records: ConsumerRecords[String,  Array[Byte]] = consumer.poll(1000)
        val it = records.iterator()
        while(it.hasNext()) {
          println("Getting message from queue.............")
          val record: ConsumerRecord[String,  Array[Byte]] = it.next()

          //val bytes = record.value()
          //val text = (bytes.map(_.toChar)).mkString
          //println(s"Saw Text ${text}")
          val user = parseUser(record.value())
          println(s"Saw User ${user}")
          consumer.commitSync
        }
      }
    }
    catch {
      case timeOutEx: TimeoutException =>
        println("Timeout ")
        false
      case ex: Exception => ex.printStackTrace()
        println("Got error when reading message ")
        false
    }
  }


  private def parseUser(message: Array[Byte]): Option[User] = {

    // Deserialize and create generic record
    val reader: DatumReader[GenericRecord] =
      new SpecificDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(message, null)
    val userData: GenericRecord = reader.read(null, decoder)

    // Make user object
    val finalUser = Try[User](
      User(userData.get("id").toString.toInt, userData.get("name").toString, try {
        Some(userData.get("email").toString)
      } catch {
        case _ => None
      })
    )

    finalUser match {
      case Success(u) =>
        Some(u)
      case Failure(e) =>
        None
    }
  }

  def close(): Unit = shouldRun = false

}

The most relevant parts of this are

  • We use a KafkaConsumer[String, Array[Byte]]
  • We use a StringDeserializer for the Kafka key
  • We use a ByteArrayDeserializer for the Kafka key
  • We are able to parse the incoming byte array into a GenericRecord and then into our specific Avro object again

 

So in terms of actual code that is about it.

 

So how do I run this stuff?

As I stated above you will need to download a few things, but once you have those in place you may find the small PowerShell script useful that is inside the project called “RunThePipeline.ps1”. This script does a few things, such as cleans the Kafka/Zookeeper logs, stops any previous instances, starts new instances and also creates the Kafka topic (which you must have before you can use the code).

 

IMPORTANT NOTE : I have altered the Kafka log paths, and where Zookeeper logs to. This can be done using the relevant properties files from the Confluent installation

 

image

 

server.properties

This line was changed

 

# A comma seperated list of directories under which to store log files
log.dirs=c:/temp/kafka-logs

 

zookeeper.properties

This line was changed

 

# the directory where the snapshot is stored.
dataDir=c:/temp/zookeeper

 

The PowerShell script will be making assumptions based on the where you changed these values to, so if you have different settings for these please edit the PowerShell script too )

 

Essentially you need to have the following running before you can run the code example (this is what the PowerShell script automates for you, though you may prefer to use the Confluent CLI, I just had this script from a previous project, and it also cleans out old data which is useful when you are experimenting, and it also creates the required topic)

  • Zookeeper
  • Kafka broker
  • Kafka topic created

Once you have that, you can simply open the code in IntelliJ and right click and run the 2 apps (or set up new configurations)

 

Producer

image

 

Consumer

image

 

After you have done this you should be able to see producer/consumer output like this

 

Producer output

image

 

Consumer output

image

 

Conclusion

And that is it for this post, in the next post we will look at how we can use specific Avro objects instead of GenericRecord

Distributed Systems, Kafka

Apache Kafka + Avro Mini Series

I am a big fan of Apache Kafka + Kafka Streams, but one element of the Confluent Platform (the people behind Kafka) that I always glossed over was the Schema Registry. I have a bit of time at the moment, so thought I would do a set of mini posts on how to use Apache Kafkas Schema Registry. The Schema Registry uses Avro, so I also thought it may make sense to do a couple of posts on that too.

 

As such this is the rough road map of what I will be doing :