AWS

AWS : Kinesis

What are we talking about this time?

This time we are going to talk about AWS Kinesis.

 

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now, as it shows you how to get started with AWS and how to create an IAM user: https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

 

Where is the code

The code for this post can be found on GitHub: https://github.com/sachabarber/AWS/tree/master/Messaging/Kinesis

 

What is AWS Kinesis?

So last time we talked about SQS, and we saw that with multiple consumers, whichever one managed to read a message from the queue first would be the one and only consumer of that message. Kinesis is very different to this.

 

Let's consider this text from the official AWS docs:

Amazon Kinesis is a fully managed, cloud-based service for real-time data processing over large, distributed data streams. With Amazon Kinesis, data can be collected from many sources such as website clickstreams, IT logs, social media feeds, billing related transactions, and sensor readings from IoT devices.

 

After data has been stored in Amazon Kinesis, you can consume and process data with the KCL for data analysis, archival, real-time dashboards, and much more. While you can use Amazon Kinesis API functions to process stream data directly, the KCL takes care of many complex tasks associated with distributed processing and allows you to focus on the record processing logic. For example, the KCL can automatically load balance record processing across many instances, allow the user to checkpoint records that are already processed, and handle instance failures.

 

C# support for the KCL is implemented using the MultiLangDaemon.  The core of the KCL logic (communicating with Amazon Kinesis, load balancing, handling instance failure, etc.) resides in Java, and Amazon KCL for C# uses the multi-language daemon protocol to communicate with the Java daemon.

 

It can be seen above that there are two possible ways of reading Kinesis data using .NET:

  • Kinesis APIs
  • KCL

We will look at both of these when we get to the consumers.

Kinesis is more comparable with something like Kafka (except Kafka is miles better, in so many ways, but let's not go there). This diagram should help to show you where Kinesis fits into the big picture:

image

 

From this diagram we can see that a stream has multiple shards, and there may be multiple consumers of the stream data. This is different from SQS, where once a message was read by one consumer it was just gone. We can also see that KCL consumers are supposed to run on EC2 instances, where the KCL can load balance across workers, allow consumer checkpointing, and so on.

This is all stuff you would expect to find in a decent stream based API. Like I say, if you want a top-of-the-line streaming service, Kafka is your guy. But we are talking about AWS Kinesis, so let's carry on with the post for now.

 

IAM user privileges needed for Kinesis

For the demo code I added the following permissions to my IAM user to allow them to use AWS Kinesis

  • AmazonDynamoDBFullAccess
  • AmazonKinesisFullAccess
  • AmazonDynamoDBFullAccesswithDataPipeline
  • AmazonKinesisAnalyticsFullAccess

 

This is the official recommendation:

image

 

image

 

As this is just demo code for me, I was OK with using full access; in practice it would be better to have dedicated producer and consumer IAM users with narrower permissions.
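If you did want to lock things down, a producer-only policy might look something like the following sketch (the action list and the account/stream ARN here are illustrative, not an official recommendation; tailor them to your own account):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:CreateStream",
        "kinesis:DescribeStream",
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:eu-west-2:123456789012:stream/myTestStream"
    }
  ]
}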

 

Producer

Ok, so as I stated above there are two consumer libraries that can be used with Kinesis; however, the producer code is the same for both. So let's see an example of what a typical producer looks like.

 

using Amazon.Kinesis.Model;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Amazon.Kinesis.DataStreamProducer
{
    /// <summary>
    /// A sample producer of Kinesis records.
    /// </summary>
    class ProducerApp
    {


        private static readonly AmazonKinesisClient kinesisClient = 
            new AmazonKinesisClient(RegionEndpoint.EUWest2);
        const string myStreamName = "myTestStream";

        public static void Main(string[] args)
        {
            new ProducerApp().WriteToStream().GetAwaiter().GetResult();
        }

        private async Task WriteToStream()
        {
            const int myStreamSize = 1; // number of shards

            try
            {
                var existingStreams = await kinesisClient.ListStreamsAsync();

                if (!existingStreams.StreamNames.Contains(myStreamName))
                {
                    // The stream does not exist yet, so create it with the desired shard count
                    var createStreamRequest = new CreateStreamRequest
                    {
                        StreamName = myStreamName,
                        ShardCount = myStreamSize
                    };

                    await kinesisClient.CreateStreamAsync(createStreamRequest);
                    Console.WriteLine("Created Stream : " + myStreamName);
                }
            }
            catch (ResourceInUseException)
            {
                Console.Error.WriteLine("Producer is quitting without creating stream " + myStreamName +
                    " to put records into as a stream of the same name already exists.");
                Environment.Exit(1);
            }

            await WaitForStreamToBecomeAvailableAsync(myStreamName);

            Console.Error.WriteLine("Putting records in stream : " + myStreamName);
            // Write 10 UTF-8 encoded records to the stream.
            for (int j = 0; j < 10; ++j)
            {
                byte[] dataAsBytes = Encoding.UTF8.GetBytes("testdata-" + j);
                using (MemoryStream memoryStream = new MemoryStream(dataAsBytes))
                {
                    try
                    {
                        PutRecordRequest requestRecord = new PutRecordRequest();
                        requestRecord.StreamName = myStreamName;
                        requestRecord.PartitionKey = "url-response-times";
                        requestRecord.Data = memoryStream;

                        PutRecordResponse responseRecord = 
                            await kinesisClient.PutRecordAsync(requestRecord);
                        Console.WriteLine("Successfully sent record to Kinesis. Sequence number: {0}", 
                            responseRecord.SequenceNumber);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Failed to send record to Kinesis. Exception: {0}", ex.Message);
                    }
                }
            }

            Console.ReadLine();

        }

        /// <summary>
        /// This method waits a maximum of 10 minutes for the specified stream to become active.
        /// <param name="myStreamName">Name of the stream whose active status is waited upon.</param>
        /// </summary>
        private static async Task WaitForStreamToBecomeAvailableAsync(string myStreamName)
        {
            var deadline = DateTime.UtcNow + TimeSpan.FromMinutes(10);
            while (DateTime.UtcNow < deadline)
            {
                DescribeStreamRequest describeStreamReq = new DescribeStreamRequest();
                describeStreamReq.StreamName = myStreamName;
                var describeResult = await kinesisClient.DescribeStreamAsync(describeStreamReq);
                string streamStatus = describeResult.StreamDescription.StreamStatus;
                Console.Error.WriteLine("  - current state: " + streamStatus);
                if (streamStatus == StreamStatus.ACTIVE)
                {
                    return;
                }
                await Task.Delay(TimeSpan.FromSeconds(20));
            }

            throw new Exception("Stream " + myStreamName + " never went active.");
        }


    }
}

 

The main points above are that we use a stream, which has a fixed number of shards, and that we put records to it using PutRecordAsync. Note that we used a single fixed partition key ("url-response-times"); the partition key is hashed to pick the shard a record lands on, so with one fixed key every record goes to the same shard. There really isn't too much more to talk about in the producer. The consumers, however, are where the fun is.
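If you have a lot of records to send, there is also a batch API, PutRecordsAsync, which accepts up to 500 records per call. A quick sketch of what using it might look like (this reuses kinesisClient and myStreamName from the producer above, and varies the partition key so records would spread across shards if the stream had more than one):

var putRecordsRequest = new PutRecordsRequest
{
    StreamName = myStreamName,
    Records = new List<PutRecordsRequestEntry>()
};

// Build a batch of 10 records; the partition key determines the target shard
for (int j = 0; j < 10; ++j)
{
    putRecordsRequest.Records.Add(new PutRecordsRequestEntry
    {
        PartitionKey = "partitionKey-" + j,
        Data = new MemoryStream(Encoding.UTF8.GetBytes("testdata-" + j))
    });
}

PutRecordsResponse putRecordsResponse =
    await kinesisClient.PutRecordsAsync(putRecordsRequest);

// Unlike PutRecord, PutRecords can partially fail, so always check this
Console.WriteLine("Failed record count: " + putRecordsResponse.FailedRecordCount);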

Consumer Data Streams API

So there is a low-level (but easier to get started with) API, which you can use simply by adding a NuGet reference to AWSSDK.Kinesis; you then write code to iterate the shards and read all the messages from the shards contained within the Kinesis stream. The code below is a complete consumer using the Kinesis Data Streams APIs.

using Amazon.Kinesis.Model;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Amazon.Kinesis.DataStreamConsumer
{
    /// <summary>
    /// A sample consumer of Kinesis records.
    /// </summary>
    class ConsumerApp
    {
        private static readonly AmazonKinesisClient kinesisClient = 
            new AmazonKinesisClient(RegionEndpoint.EUWest2);
        const string myStreamName = "myTestStream";

        public static void Main(string[] args)
        {
            new ConsumerApp().ReadFromStream().GetAwaiter().GetResult();
        }

        private async Task ReadFromStream()
        {
            DescribeStreamRequest describeRequest = new DescribeStreamRequest();
            describeRequest.StreamName = myStreamName;

            DescribeStreamResponse describeResponse = 
                await kinesisClient.DescribeStreamAsync(describeRequest);
            List<Shard> shards = describeResponse.StreamDescription.Shards;

            foreach (Shard shard in shards)
            {
                GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
                iteratorRequest.StreamName = myStreamName;
                iteratorRequest.ShardId = shard.ShardId;
                iteratorRequest.ShardIteratorType = ShardIteratorType.TRIM_HORIZON;

                GetShardIteratorResponse iteratorResponse = await kinesisClient.GetShardIteratorAsync(iteratorRequest);
                string iteratorId = iteratorResponse.ShardIterator;

                // Note: for an open shard, NextShardIterator is never null, so this
                // demo loop will poll the first shard forever; a production consumer
                // would also need to handle resharding and iterator expiry.
                while (!string.IsNullOrEmpty(iteratorId))
                {
                    GetRecordsRequest getRequest = new GetRecordsRequest();
                    getRequest.Limit = 1000;
                    getRequest.ShardIterator = iteratorId;

                    GetRecordsResponse getResponse = await kinesisClient.GetRecordsAsync(getRequest);
                    string nextIterator = getResponse.NextShardIterator;
                    List<Record> records = getResponse.Records;

                    if (records.Count > 0)
                    {
                        Console.WriteLine("Received {0} records. ", records.Count);
                        foreach (Record record in records)
                        {
                            string theMessage = Encoding.UTF8.GetString(record.Data.ToArray());
                            Console.WriteLine("message string: " + theMessage);
                        }
                    }
                    else
                    {
                        // Back off a little when the shard is empty; GetRecords is
                        // limited to 5 calls per second per shard
                        await Task.Delay(TimeSpan.FromSeconds(1));
                    }
                    iteratorId = nextIterator;
                }
            }
        }

    }
}
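The consumer above uses ShardIteratorType.TRIM_HORIZON, which starts reading from the oldest record still held in the shard. If you only care about new records, or records from a point in time, the other iterator types can be swapped in. A quick sketch, reusing the iteratorRequest from the code above:

// Only records added after the iterator was obtained
iteratorRequest.ShardIteratorType = ShardIteratorType.LATEST;

// Or everything from a given point in time
iteratorRequest.ShardIteratorType = ShardIteratorType.AT_TIMESTAMP;
iteratorRequest.Timestamp = DateTime.UtcNow.AddHours(-1);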

If we ran the DataStreamProducer and DataStreamConsumer several times we would see output something like this, which shows that the messages are persisted. There is a limit to this persistence: records are kept for the stream's retention period, which defaults to 24 hours (again I prefer Kafka, but that's just me).

 

image
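As an aside, the retention period can be raised after the stream has been created. Something like this should do it, using the same kinesisClient as before (168 hours is just an example figure; retention is capped, so check the docs for the current maximum):

await kinesisClient.IncreaseStreamRetentionPeriodAsync(
    new IncreaseStreamRetentionPeriodRequest
    {
        StreamName = myStreamName,
        RetentionPeriodHours = 168 // 7 days, up from the 24 hour default
    });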

Consumer KCL

This is a lot harder to get started with than the Data Streams API (NuGet AWSSDK.Kinesis). So much so that Amazon have published a starter kit for it, which you can find here: https://github.com/awslabs/amazon-kinesis-client-net/tree/bbe8abf6abb811f8b4c7469df7c1dd281dbb9caa

A typical C# record processor using the KCL will look like this:

using System;
using System.Collections.Generic;
using Amazon.Kinesis.ClientLibrary;

namespace Sample
{
    class SampleRecordProcessor : IRecordProcessor
    {
        public void Initialize(InitializationInput input)
        {
            // initialize
        }

        public void ProcessRecords(ProcessRecordsInput input)
        {
            // process batch of records (input.Records) and
            // checkpoint (using input.Checkpointer)
        }

        public void Shutdown(ShutdownInput input)
        {
            // cleanup
        }
    }

    class MainClass
    {
        public static void Main(string[] args)
        {
            KclProcess.Create(new SampleRecordProcessor()).Run();
        }
    }
}

Which seems easy enough. However, it's not that easy: the bulk of the KCL is written in Java, and the .NET KCL communicates with the Java daemon over StdIn/StdOut/StdError. As such, there are quite a few steps you must follow to get a working C# KCL app up and running.

So what we really end up with is a C# consumer more like this:

/*
 * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Amazon Software License (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Threading;
using Amazon.Kinesis.ClientLibrary;

namespace Amazon.Kinesis.ClientLibrary.SampleConsumer
{
    /// <summary>
    /// A sample processor of Kinesis records.
    /// </summary>
    class SampleRecordProcessor : IRecordProcessor {

        /// <value>The time to wait before this record processor
        /// reattempts either a checkpoint, or the processing of a record.</value>
        private static readonly TimeSpan Backoff = TimeSpan.FromSeconds(3);

        /// <value>The interval this record processor waits between
        /// doing two successive checkpoints.</value>
        private static readonly TimeSpan CheckpointInterval = TimeSpan.FromMinutes(1);

        /// <value>The maximum number of times this record processor retries either
        /// a failed checkpoint, or the processing of a record that previously failed.</value>
        private static readonly int NumRetries = 10;

        /// <value>The shard ID on which this record processor is working.</value>
        private string _kinesisShardId;

        /// <value>The time of the next checkpoint.</value>
        private DateTime _nextCheckpointTime = DateTime.UtcNow;

        /// <summary>
        /// This method is invoked by the Amazon Kinesis Client Library before records from the specified shard
        /// are delivered to this SampleRecordProcessor.
        /// </summary>
        /// <param name="input">
        /// InitializationInput containing information such as the name of the shard whose records this
        /// SampleRecordProcessor will process.
        /// </param>
        public void Initialize(InitializationInput input)
        {
            Console.Error.WriteLine("Initializing record processor for shard: " + input.ShardId);
            this._kinesisShardId = input.ShardId;
        }

        /// <summary>
        /// This method processes the given records and checkpoints using the given checkpointer.
        /// </summary>
        /// <param name="input">
        /// ProcessRecordsInput that contains records, a Checkpointer and contextual information.
        /// </param>
        public void ProcessRecords(ProcessRecordsInput input)
        {
            Console.Error.WriteLine("Processing " + input.Records.Count + " records from " + _kinesisShardId);

            // Process records and perform all exception handling.
            ProcessRecordsWithRetries(input.Records);

            // Checkpoint once every checkpoint interval.
            if (DateTime.UtcNow >= _nextCheckpointTime)
            {
                Checkpoint(input.Checkpointer);
                _nextCheckpointTime = DateTime.UtcNow + CheckpointInterval;
            }
        }

        /// <summary>
        /// This shuts down the record processor and checkpoints the specified checkpointer.
        /// </summary>
        /// <param name="input">
        /// ShutdownContext containing information such as the reason for shutting down the record processor,
        /// as well as a Checkpointer.
        /// </param>
        public void Shutdown(ShutdownInput input)
        {
            Console.Error.WriteLine("Shutting down record processor for shard: " + _kinesisShardId);
            // Checkpoint after reaching end of shard, so we can start processing data from child shards.
            if (input.Reason == ShutdownReason.TERMINATE)
            {
                Checkpoint(input.Checkpointer);
            }
        }

        /// <summary>
        /// This method processes records, performing retries as needed.
        /// </summary>
        /// <param name="records">The records to be processed.</param>
        private void ProcessRecordsWithRetries(List<Record> records)
        {
            foreach (Record rec in records)
            {
                bool processedSuccessfully = false;
                string data = null;
                for (int i = 0; i < NumRetries; ++i) {
                    try {
                        // As per the accompanying AmazonKinesisSampleProducer.cs, the payload
                        // is interpreted as UTF-8 characters.
                        data = System.Text.Encoding.UTF8.GetString(rec.Data);

                        // Uncomment the following if you wish to see the retrieved record data.
                        //Console.WriteLine(
                        //    String.Format("=========== > Retrieved record:\n\tpartition key = {0},\n\tsequence number = {1},\n\tdata = {2}",
                        //    rec.PartitionKey, rec.SequenceNumber, data));

                        // Your own logic to process a record goes here.
                        Console.WriteLine("===================================================");
                        Console.WriteLine($"===========> Saw message : {data}");
                        Console.WriteLine("===================================================");

                        processedSuccessfully = true;
                        break;
                    } catch (Exception e) {
                        Console.Error.WriteLine("Exception processing record data: " + data + ", exception: " + e);
                    }

                    //Back off before retrying upon an exception.
                    Thread.Sleep(Backoff);
                }

                if (!processedSuccessfully)
                {
                    Console.Error.WriteLine("Couldn't process record " + rec + ". Skipping the record.");
                }
            }
        }

        /// <summary>
        /// This checkpoints the specified checkpointer with retries.
        /// </summary>
        /// <param name="checkpointer">The checkpointer used to do checkpoints.</param>
        private void Checkpoint(Checkpointer checkpointer)
        {
            Console.Error.WriteLine("Checkpointing shard " + _kinesisShardId);

            // You can optionally provide an error handling delegate to be invoked when checkpointing fails.
            // The library comes with a default implementation that retries for a number of times with a fixed
            // delay between each attempt. If you do not provide an error handler, the checkpointing operation
            // will not be retried, but processing will continue.
            checkpointer.Checkpoint(RetryingCheckpointErrorHandler.Create(NumRetries, Backoff));
        }
    }

    class MainClass
    {
        /// <summary>
        /// This method creates a KclProcess and starts running a SampleRecordProcessor instance.
        /// </summary>
        public static void Main(string[] args)
        {
            try
            {
                KclProcess.Create(new SampleRecordProcessor()).Run();
            }
            catch (Exception e) {
                Console.Error.WriteLine("ERROR: " + e);
            }
        }
    }
}

 

To actually run a C# KCL example, do the following:

1. Download the sample from here : https://github.com/awslabs/amazon-kinesis-client-net

2. Follow the getting started guide here : https://github.com/awslabs/amazon-kinesis-client-net/tree/bbe8abf6abb811f8b4c7469df7c1dd281dbb9caa#getting-started

3. Follow this checklist

  • Run the bootstrapper project to obtain the relevant KCL MultiLangDaemon JAR files
  • Make sure you have the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables set for the bootstrap app
  • Make sure you have Java installed (and that it is in your path)
  • Make sure the bootstrap project is run with the command line args "--properties kcl.properties --execute"
  • Make sure the kcl.properties file has the correct streamName/executableName/regionName (the region should match the one your producer/AWS account uses; mine was eu-west-2)
  • Make sure the executableName includes the FULL path to the consumer library; mine looks like this in kcl.properties: executableName = dotnet C:\\Users\\sacha\\Desktop\\AWS-Playground\\AWS\\Messaging\\Kinesis\\KCL\\SampleConsumer\\bin\\Debug\\netcoreapp2.0\\SampleConsumer.dll (see the sketch of a full kcl.properties just below)
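To give you an idea, here is roughly what a minimal kcl.properties for this setup looks like (the paths, names and region are from my setup and will need changing for yours, and the real sample file contains more settings than shown here):

# The command the MultiLangDaemon runs for each record processor
executableName = dotnet C:\\Users\\sacha\\Desktop\\AWS-Playground\\AWS\\Messaging\\Kinesis\\KCL\\SampleConsumer\\bin\\Debug\\netcoreapp2.0\\SampleConsumer.dll

# The Kinesis stream to read from
streamName = myTestStream

# The region used for the Kinesis stream, the DynamoDB lease/checkpoint table and CloudWatch metrics
regionName = eu-west-2

# A unique application name; this also names the DynamoDB table the KCL creates
applicationName = DotNetKinesisSample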

Only once you have done all of that will you be able to run the example SampleProducer and Bootstrap projects together, and you should see output like this:

 

image

 


When you start a KCL application, it calls the KCL to instantiate a worker. This call provides the KCL with configuration information for the application, such as the stream name and AWS credentials.

The KCL performs the following tasks:

  • Connects to the stream
  • Enumerates the shards
  • Coordinates shard associations with other workers (if any)
  • Instantiates a record processor for every shard it manages
  • Pulls data records from the stream
  • Pushes the records to the corresponding record processor
  • Checkpoints processed records
  • Balances shard-worker associations when the worker instance count changes
  • Balances shard-worker associations when shards are split or merged

Behind the scenes the .NET KCL uses the Stateless .NET state machine library to help ensure the correctness of the implementation.
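Stateless is a general purpose .NET state machine library. Purely to illustrate the idea (the states and triggers below are made up for this example, they are not the KCL's actual internal ones), modelling a record processor lifecycle with it looks roughly like this:

using System;
using Stateless;

enum ProcessorState { New, Initializing, Processing, ShuttingDown }
enum ProcessorTrigger { Initialize, Initialized, Shutdown }

class LifecycleDemo
{
    public static void Main()
    {
        var machine = new StateMachine<ProcessorState, ProcessorTrigger>(ProcessorState.New);

        machine.Configure(ProcessorState.New)
            .Permit(ProcessorTrigger.Initialize, ProcessorState.Initializing);

        machine.Configure(ProcessorState.Initializing)
            .Permit(ProcessorTrigger.Initialized, ProcessorState.Processing);

        machine.Configure(ProcessorState.Processing)
            .Permit(ProcessorTrigger.Shutdown, ProcessorState.ShuttingDown);

        // An invalid transition (e.g. firing Shutdown while still New) throws,
        // which is how the state machine helps enforce that the lifecycle is honoured.
        machine.Fire(ProcessorTrigger.Initialize);
        machine.Fire(ProcessorTrigger.Initialized);

        Console.WriteLine(machine.State); // Processing
    }
}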

How do we deploy a KCL .NET Core App to AWS?

The recommended advice seems to be to use AWS Elastic Beanstalk, and although there are not really any KCL-specific C# guides, the general Elastic Beanstalk .NET guides should help you out. I have to say though, if you want to run something like a .NET Core console app in Elastic Beanstalk (rather than the classic ASP.NET Core example everyone shows) you may have to dig quite deep; for a console app the advice seems to be to run it on a dedicated EC2 instance instead. For an ASP.NET Core style app, deployment via the Elastic Beanstalk CLI tooling looks like this:

 

dotnet eb deploy-environment --publish-options "--runtime win-x64"

 

Here is an example of an Elastic Beanstalk environment that I created to host .NET apps:

 

image

 

From here you can use the “Upload and Deploy” button

 

See ya later, not goodbye

Ok that’s it for now until the next post

4 thoughts on "AWS : Kinesis"

  1. Hey – did you find a way to debug a KCL app on the .NET side? It’s easy enough to get running, but I haven’t found a way to debug my actual consumer since it’s being run by Java. I can’t seem to find the process even when it’s running to attach to. Thanks!

    1. No, I have to say it's a frigging nightmare.

      I even toyed with using a bridge library, but have not tried it in anger.

      If you find a way, please post your answer.

      1. Ha – I will. I’d much rather do it natively using the API, but like you mention here the KCL handles so much under the hood it’s just not worth it. I’d love for someone to re-write the KCL in .NET so we can do it all natively without this silly multi lang demon.
