
Azure Event Hubs With Kafka

So it’s been a while since I wrote a post about Kafka (and about Azure too actually, since at work we use AWS).

 

But anyway, someone mentioned to me the other day that Azure Event Hubs comes with the ability to interop with Kafka producer/consumer code with very little change. Naturally I could not skip trying that for myself, as I am a big fan of Kafka and am actually using Amazon MSK (Managed Streaming for Apache Kafka) at work right now.

 

There are a couple of good videos on this.

And there is also this really good starter project, which is what I use below.

 

So probably the best place to start is to create yourself a new Event Hub in the Azure portal. Once you have done that, you will need to grab the following two bits of information:

  • You will need the connection string from the portal, as well as the FQDN that points to your Event Hubs namespace. The FQDN can be found within your connection string as follows (see the small sketch after this list for pulling it out programmatically):

Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX

  • You will need the namespace portion of the connection string
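
Since the FQDN is just the host portion of the Endpoint value with the Kafka port 9093 appended (you will see that port in the App.config later), you could pull it out with a small helper like this. This is just a sketch of mine; the FqdnFromConnectionString name is made up and is not part of the quickstart:

//A small sketch of mine (FqdnFromConnectionString is a made-up helper, not
//part of the quickstart) that extracts the Kafka-style FQDN from a connection string
public static string FqdnFromConnectionString(string connStr)
{
	//connStr looks like: Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=...
	var endpoint = connStr.Split(';')[0].Substring("Endpoint=".Length);
	var host = new Uri(endpoint).Host;   //mynamespace.servicebus.windows.net
	return host + ":9093";               //the Event Hubs Kafka endpoint listens on port 9093
}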

 

Other considerations when creating the Event Hub would be:

  • Partition count: this indicates the number of parallel consumers that you can have processing the events.
  • Message retention: this is how long you wish the messages to be retained. Event Hubs doesn’t actually delete messages as they are consumed, but rather expires them once the retention period has passed.

 

Getting the starter project

So once you have done that, you can simply grab the code from this repo: https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/dotnet

Update the App.config

Update the values of the EH_FQDN and EH_CONNECTION_STRING in App.config to direct the application to the Event Hubs Kafka endpoint with the correct authentication. Default values for the Event Hub/topic name (test) and consumer group ($Default) have been filled in, but feel free to change those as needed.

So, for example, for my hub this looks like the following:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <add key="EH_FQDN" value="sbkafkaeventhubtest.servicebus.windows.net:9093"/>
    <add key="EH_CONNECTION_STRING" value="Endpoint=sb://sbkafkaeventhubtest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=DY96clYXl6MJINCbS0yBDN91h7EMDIsV4/a7FHz7ENY="/>
    <add key="EH_NAME" value="test"/>
    <add key="CONSUMER_GROUP" value="$Default"/>
    <add key="CA_CERT_LOCATION" value=".\cacert.pem"/>
  </appSettings>
</configuration>

One important note in there is the CA_CERT_LOCATION setting, which points to a cacert.pem file. If you open that file you will see it contains all the PUBLIC CA certificates. It can be obtained from the Mozilla project’s curated “cache” of CA certificates here.

With all this in place, what does the code look like?

Coding A Producer

Making sure that you have the Confluent.Kafka NuGet package installed (along with the usual using System;, using System.Threading.Tasks; and using Confluent.Kafka; directives at the top of the file), a producer is as simple as this:

public static async Task Producer(string brokerList, string connStr, string topic, string cacertlocation)
{
	try
	{
		var config = new ProducerConfig
		{
			BootstrapServers = brokerList,
			SecurityProtocol = SecurityProtocol.SaslSsl,
			SaslMechanism = SaslMechanism.Plain,
			SaslUsername = "$ConnectionString",
			SaslPassword = connStr,
			SslCaLocation = cacertlocation,
			//Debug = "security,broker,protocol"        //Uncomment for librdkafka debugging information
		};
		using (var producer = new ProducerBuilder<long, string>(config).SetKeySerializer(Serializers.Int64).SetValueSerializer(Serializers.Utf8).Build())
		{
			Console.WriteLine("Sending 10 messages to topic: " + topic + ", broker(s): " + brokerList);
			for (int x = 0; x < 10; x++)
			{
				var msg = string.Format("Sample message #{0} sent at {1}", x, DateTime.Now.ToString("yyyy-MM-dd_HH:mm:ss.ffff"));
				var deliveryReport = await producer.ProduceAsync(topic, new Message<long, string> { Key = DateTime.UtcNow.Ticks, Value = msg });
				Console.WriteLine(string.Format("Message {0} sent (value: '{1}')", x, msg));
			}
		}
	}
	catch (Exception e)
	{
		Console.WriteLine(string.Format("Exception Occurred - {0}", e.Message));
	}
}

The only changes to the code above, versus connecting to a native Kafka broker list, are these small settings, which are mandatory when using Event Hubs but optional (though still recommended) when using native Kafka:

  • SecurityProtocol = SecurityProtocol.SaslSsl
  • SaslMechanism = SaslMechanism.Plain
  • SaslUsername = "$ConnectionString"
  • SaslPassword = connStr
  • SslCaLocation = cacertlocation
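
For completeness, here is how those App.config values might be fed into the Producer method shown above. This wiring is a sketch of my own (the quickstart does something similar) using the standard ConfigurationManager from System.Configuration:

//A sketch of my own wiring (not verbatim from the quickstart) that reads the
//App.config values and passes them to the Producer method above.
//Requires a reference to System.Configuration.
using System.Configuration;
using System.Threading.Tasks;

class Program
{
	static async Task Main(string[] args)
	{
		string brokerList = ConfigurationManager.AppSettings["EH_FQDN"];
		string connStr = ConfigurationManager.AppSettings["EH_CONNECTION_STRING"];
		string topic = ConfigurationManager.AppSettings["EH_NAME"];
		string caCertLocation = ConfigurationManager.AppSettings["CA_CERT_LOCATION"];

		await Producer(brokerList, connStr, topic, caCertLocation);
	}
}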

 

Coding A Consumer

And here is a consumer, where the same small set of changes is required; otherwise the code is exactly as it would be against native Kafka brokers (the CancellationTokenSource below additionally needs using System.Threading):

public static void Consumer(string brokerList, string connStr, string consumergroup, string topic, string cacertlocation)
{
	var config = new ConsumerConfig
	{
		BootstrapServers = brokerList,
		SecurityProtocol = SecurityProtocol.SaslSsl,
		SocketTimeoutMs = 60000,                //this corresponds to the Consumer config `request.timeout.ms`
		SessionTimeoutMs = 30000,
		SaslMechanism = SaslMechanism.Plain,
		SaslUsername = "$ConnectionString",
		SaslPassword = connStr,
		SslCaLocation = cacertlocation,
		GroupId = consumergroup,
		AutoOffsetReset = AutoOffsetReset.Earliest,
		BrokerVersionFallback = "1.0.0",        //Event Hubs for Kafka Ecosystems supports Kafka v1.0+, a fallback to an older API will fail
		//Debug = "security,broker,protocol"    //Uncomment for librdkafka debugging information
	};

	using (var consumer = new ConsumerBuilder<long, string>(config).SetKeyDeserializer(Deserializers.Int64).SetValueDeserializer(Deserializers.Utf8).Build())
	{
		CancellationTokenSource cts = new CancellationTokenSource();
		Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

		consumer.Subscribe(topic);

		Console.WriteLine("Consuming messages from topic: " + topic + ", broker(s): " + brokerList);

		while (true)
		{
			try
			{
				var msg = consumer.Consume(cts.Token);
				Console.WriteLine($"Received: '{msg.Message.Value}'");
			}
			catch (OperationCanceledException)
			{
				//Ctrl+C was pressed, exit the loop so the consumer gets disposed cleanly
				break;
			}
			catch (ConsumeException e)
			{
				Console.WriteLine($"Consume error: {e.Error.Reason}");
			}
			catch (Exception e)
			{
				Console.WriteLine($"Error: {e.Message}");
			}
		}
	}
}
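
Wiring the consumer up uses the same App.config values in the same way (again a sketch of mine, with ConfigurationManager from System.Configuration as in the producer wiring above):

//A sketch of invoking the Consumer method with the App.config values (assumed wiring)
Consumer(
	ConfigurationManager.AppSettings["EH_FQDN"],
	ConfigurationManager.AppSettings["EH_CONNECTION_STRING"],
	ConfigurationManager.AppSettings["CONSUMER_GROUP"],
	ConfigurationManager.AppSettings["EH_NAME"],
	ConfigurationManager.AppSettings["CA_CERT_LOCATION"]);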

So to prove this all works, here is a small screenshot taken after a few runs, where we can see some monitoring in the Azure portal for the Event Hub showing the messages sent.

[Screenshot: Azure portal monitoring for the Event Hub, showing the messages sent]

 

Final points

One final point I wanted to touch on: just because the Event Hub is able to accept messages from a Kafka producer doesn’t mean you have to read them with a Kafka consumer. You can still have the Event Hub act like a regular Event Hub for downstream services, but just get its inputs from a Kafka producer. This is really nice I think.
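
To illustrate, a downstream service could read the very same events with the native Event Hubs SDK rather than a Kafka client. Here is a minimal sketch of my own (not part of the quickstart), assuming the Azure.Messaging.EventHubs NuGet package:

//A minimal sketch (not from the quickstart) of a downstream reader that uses
//the native Event Hubs SDK instead of a Kafka consumer.
//Assumes the Azure.Messaging.EventHubs NuGet package.
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;

class NativeEventHubReader
{
	static async Task Main()
	{
		string connStr = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX";
		string eventHubName = "test";

		await using var consumer = new EventHubConsumerClient(
			EventHubConsumerClient.DefaultConsumerGroupName, connStr, eventHubName);

		//Reads across all partitions; these events were produced by the Kafka producer above
		await foreach (PartitionEvent pe in consumer.ReadEventsAsync())
		{
			Console.WriteLine(pe.Data.EventBody.ToString());
		}
	}
}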

 

And that’s it. I was happy to see how easy this was to do, and I hope you enjoyed it.
