AWS

AWS Using Serverless Framework To Create A Lambda Function

What are we talking about this time?

This time we are going to talk about how to expose our AWS Lambda function over HTTP, which is exactly what I did in my last AWS article; the difference is that last time I did it all by hand. This time we are going to be talking about the “Serverless Framework”.

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

Where is the code

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Compute/ServerlessFrameworkLambda

What Is The Serverless Framework?

A picture says a thousand words and all that, so here is a quick intro picture for the Serverless Framework.

image

At its heart it is an abstraction layer between your code and the cloud. The framework itself is cloud agnostic (your code may not be, but Serverless itself is), and can be used quite happily against Azure, AWS, etc.

You should be asking yourself just how it does this abstraction over these cloud vendors. That’s a pretty neat trick, isn’t it? Why yes, let’s see how it works.

It’s all mainly down to a rather clever abstraction file called “serverless.yml”, which tells the framework what it should provision for you, how things communicate, and how things should be set up.

Here is the example one for this demo app, which is a simple AWS Lambda exposed as a GET REST API using AWS API Gateway:

# Welcome to Serverless!
#
# This file is the main config file for your service.
# It's very minimal at this point and uses default values.
# You can always add more config options for more control.
# We've included some commented out config examples here.
# Just uncomment any of them to get that config option.
#
# For full config options, check the docs:
#    docs.serverless.com
#
# Happy Coding!

service: ServerlessFrameworkLambda # NOTE: update this with your service name

# You can pin your service to only deploy with a specific Serverless version
# Check out our docs for more details
# frameworkVersion: "=X.X.X"

provider:
  name: aws
  runtime: dotnetcore2.1

# you can overwrite defaults here
#  stage: dev
  region: eu-west-2

# you can add statements to the Lambda function's IAM Role here
#  iamRoleStatements:
#    - Effect: "Allow"
#      Action:
#        - "s3:ListBucket"
#      Resource: { "Fn::Join" : ["", ["arn:aws:s3:::", { "Ref" : "ServerlessDeploymentBucket" } ] ]  }
#    - Effect: "Allow"
#      Action:
#        - "s3:PutObject"
#      Resource:
#        Fn::Join:
#          - ""
#          - - "arn:aws:s3:::"
#            - "Ref" : "ServerlessDeploymentBucket"
#            - "/*"

# you can define service wide environment variables here
#  environment:
#    variable1: value1

# you can add packaging information here
package:
  artifact: bin/release/netcoreapp2.1/deploy-package.zip
#  exclude:
#    - exclude-me.js
#    - exclude-me-dir/**

functions:
  hello:
    handler: CsharpHandlers::AwsDotnetCsharp.Handler::Hello

#    The following are a few example events you can configure
#    NOTE: Please make sure to change your handler code to work with those events
#    Check the event documentation for details
    events:
      - http:
          path: gettime
          method: get
          cors: true
#      - s3: ${env:BUCKET}
#      - schedule: rate(10 minutes)
#      - sns: greeter-topic
#      - stream: arn:aws:dynamodb:region:XXXXXX:table/foo/stream/1970-01-01T00:00:00.000
#      - alexaSkill: amzn1.ask.skill.xx-xx-xx-xx
#      - alexaSmartHome: amzn1.ask.skill.xx-xx-xx-xx
#      - iot:
#          sql: "SELECT * FROM 'some_topic'"
#      - cloudwatchEvent:
#          event:
#            source:
#              - "aws.ec2"
#            detail-type:
#              - "EC2 Instance State-change Notification"
#            detail:
#              state:
#                - pending
#      - cloudwatchLog: '/aws/lambda/hello'
#      - cognitoUserPool:
#          pool: MyUserPool
#          trigger: PreSignUp

#    Define function environment variables here
#    environment:
#      variable2: value2

# you can add CloudFormation resource templates here
#resources:
#  Resources:
#    NewResource:
#      Type: AWS::S3::Bucket
#      Properties:
#        BucketName: my-new-bucket
#  Outputs:
#     NewOutput:
#       Description: "Description for the output"
#       Value: "Some output value"

You can see from the commented lines how you might configure some of the other functionality you may need, and the docs are pretty decent. You can see more examples here : https://github.com/serverless/examples

But isn’t this quite limited? No, not really. For example, this is what the AWS offering looks like for serverless functions, and it is pretty much exactly what AWS offers without the use of the Serverless Framework.

image

And just to contrast, here is the Azure offering.

image

Now, as I say, all you really care about is the code and the serverless.yml file, which governs the deployment/update/rollback; the Serverless Framework will deal with the cloud provider for you. But just how do you get started with this framework?

The rest of this post will talk you through that.

Installation

The Serverless Framework is a Node-based command line tool; as such you will need to install Node if you don’t already have it, which you can download from https://nodejs.org/en/download/. Once you have done that, simply open a Node command line window and install the Serverless Framework as follows

npm install -g serverless

Credentials

The next thing you will need to do is associate your cloud provider credentials with the Serverless Framework command line, which you can read about here : https://serverless.com/framework/docs/providers/aws/guide/credentials/

For AWS this would look something like this

serverless config credentials --provider aws --key ANN7EXAMPLE --secret wJalrXUtnFYEXAMPLEKEY

Create A Project

Ok, now that you are that far in, you can create a project. This is easily done as follows, where this one uses an AWS C# template.

serverless create --template aws-csharp --path myService

NOTE : I found that I could not include “.” in the name of my service

Changes I Made At This Point

At this point I updated the serverless.yml file to the one I showed a minute ago, and I then updated the C# function to match almost exactly with the code from my last AWS article

using Amazon.Lambda.APIGatewayEvents;
using Amazon.Lambda.Core;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Net;

[assembly:LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace AwsDotnetCsharp
{
    public class Handler
    {
        ITimeProcessor processor = new TimeProcessor();

        public APIGatewayProxyResponse Hello(
           APIGatewayProxyRequest apigProxyEvent, ILambdaContext context)
        {
            LogMessage(context, "Processing request started");
            APIGatewayProxyResponse response;
            try
            {
                var result = processor.CurrentTimeUTC();
                response = CreateResponse(result);

                LogMessage(context, "Processing request succeeded.");
            }
            catch (Exception ex)
            {
                LogMessage(context,
                    string.Format("Processing request failed - {0}", ex.Message));
                response = CreateResponse(null);
            }

            return response;
        }

        APIGatewayProxyResponse CreateResponse(DateTime? result)
        {
            int statusCode = (result != null) ?
                (int)HttpStatusCode.OK :
                (int)HttpStatusCode.InternalServerError;

            string body = (result != null) ?
                JsonConvert.SerializeObject(result) : string.Empty;

            var response = new APIGatewayProxyResponse
            {
                StatusCode = statusCode,
                Body = body,
                Headers = new Dictionary<string, string>
                {
                    { "Content-Type", "application/json" },
                    { "Access-Control-Allow-Origin", "*" }
                }
            };
            return response;
        }

        /// <summary>
        /// Logs messages to cloud watch
        /// </summary>
        void LogMessage(ILambdaContext ctx, string msg)
        {
            ctx.Logger.LogLine(
                string.Format("{0}:{1} - {2}",
                    ctx.AwsRequestId,
                    ctx.FunctionName,
                    msg));
        }
    }

  
}

Package A Project

Once you have edited your code, and potentially the serverless.yml file, you need to package it, which for a C# project means running the build.cmd command in PowerShell. Internally this runs the AWS .NET command line extensions for Lambda (https://github.com/aws/aws-extensions-for-dotnet-cli), so you may find you also need to install those too. See my last AWS article for details about how to do that.

Deploy The Function

Open the node command prompt in the folder where you have your serverless.yml file, and run serverless deploy. You should see something like this, where it is provisioning the various cloud provider items needed by your serverless.yml file.

image

Let’s go and have a look at what it created.

image

Ok, so we see 1 matching AWS function. Cool.

image

Let’s drill in a bit further, on the matching function.

image

We see the function does indeed have a matching API Gateway (just like in the previous AWS post I did).

image

Ok, so now let’s drill into the API Gateway.

image

So far so good, we can see the resource created matches what we specified in our serverless.yml file.

image

So let’s test the endpoint. Woohoo, we see the time, it’s working, and this was significantly less hassle than the last article, where I had to jump into IAM settings, API Gateway configuration, Lambda configuration, and publish from Visual Studio/command line wizards.

We have only really scratched the surface of using the Serverless Framework for Lambdas/functions in the cloud. As I say, the documentation is pretty good; it really is worth a try if you have not played with it before.


Beware

See ya later, not goodbye

Ok that’s it for now until the next post

 

 



Lambdas / Anonymous delegates

Nice little trick when working with Expression API

So I like writing Expression tree API magic, but I am mortal and don’t find it that easy to do, and it normally takes me a while. So I am up for any help that I can get.

I just learnt a neat little trick, which is that you can apply TypeDescriptor attributes at runtime, which is cool. This one thing allows us to use a fairly standard PropertyGrid to visualize Expression trees.

For example, here is a simple WinForms UI I crafted with a few Expressions to visualize, where each one just gets set to the PropertyGrid’s SelectedObject when selected from the list.

image

Ok, it’s not perfect, but it is good enough to help you out.

Here is the relevant code from the simple WinForms UI

using System;
using System.ComponentModel;
using System.Linq.Expressions;

// A simple expression to visualize
Expression<Func<string, string, int>> fun1 = (s1, s2) => s1.Length - s2.Length;

// Attach an ExpandableObjectConverter to the Expression type at runtime, so the
// PropertyGrid can drill into the expression tree's properties
TypeDescriptor.AddAttributes(typeof(Expression),
    new TypeConverterAttribute(typeof(ExpandableObjectConverter)));

// Show the expression in the PropertyGrid
propertyGrid1.SelectedObject = fun1;
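
For completeness, here is a minimal sketch of how the list selection might be wired up, assuming a ListBox named listBox1 sitting alongside propertyGrid1 on the form. These names and the extra sample expressions are just assumptions for illustration, not the exact code from my UI.

// Hypothetical wiring: a few sample expressions shown in a ListBox,
// with whichever one is selected pushed into the PropertyGrid
Expression<Func<int, int>> square = x => x * x;
Expression<Func<string, bool>> isEmpty = s => s.Length == 0;

listBox1.DataSource = new Expression[] { fun1, square, isEmpty };
listBox1.SelectedIndexChanged += (s, e) =>
    propertyGrid1.SelectedObject = listBox1.SelectedItem;
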
AWS

AWS Lambda exposed via ApiGateway

 

What are we talking about this time?

This time we are going to talk about how to expose our AWS lambda function over HTTP.  This is actually fairly simple to do so this will not be a big post, and will certainly build on what we saw in the last post where I introduced how to create and publish a new AWS lambda function.

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

Where is the code

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Compute/Lambda.ApiGateway.DemoApp

What is AWS API Gateway?

Amazon API Gateway is an AWS service that enables developers to create, publish, maintain, monitor, and secure APIs at any scale. You can create APIs that access AWS or other web services, as well as data stored in the AWS Cloud.

image

In practical terms, API Gateway lets you create, configure, and host a RESTful API to enable applications to access the AWS Cloud. For example, an application can call an API in API Gateway to upload a user’s annual income and expense data to Amazon Simple Storage Service or Amazon DynamoDB, process the data in AWS Lambda to compute tax owed, and file a tax return via the IRS website.

As shown in the diagram, an app (or client application) gains programmatic access to AWS services, or a website on the internet, through one or more APIs, which are hosted in API Gateway. The app is at the API’s frontend. The integrated AWS services and websites are located at the API’s backend. In API Gateway, the frontend is encapsulated by method requests and method responses, and the backend is encapsulated by integration requests and integration responses.

With Amazon API Gateway, you can build an API to provide your users with an integrated and consistent developer experience to build AWS cloud-based applications.

Taken from https://docs.aws.amazon.com/apigateway/latest/developerguide/welcome.html as of 07/10/18

Writing a Lambda that is exposed via API Gateway

Ok, so now that we know what an API Gateway is, how do we write an AWS Lambda to use it? Well, as before, there are APIGatewayEvents that can be used inside of a Lambda function. Let’s see the relevant code, shall we:

using System;
using System.Collections.Generic;
using System.Net;
using Amazon.Lambda.APIGatewayEvents;
using Amazon.Lambda.Core;
using Newtonsoft.Json;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace Lambda.ApiGateway.DemoApp
{
    public class Function
    {

        ITimeProcessor processor = new TimeProcessor();

        /// <summary>
        /// Default constructor. This constructor is used by Lambda to construct
        /// the instance. When invoked in a Lambda environment
        /// the AWS credentials will come from the IAM role associated with the
        /// function and the AWS region will be set to the
        /// region the Lambda function is executed in.
        /// </summary>
        public Function()
        {

        }

        public APIGatewayProxyResponse FunctionHandler(
            APIGatewayProxyRequest apigProxyEvent, ILambdaContext context)
        {
            LogMessage(context, "Processing request started");
            APIGatewayProxyResponse response;
            try
            {
                var result = processor.CurrentTimeUTC();
                response = CreateResponse(result);

                LogMessage(context, "Processing request succeeded.");
            }
            catch (Exception ex)
            {
                LogMessage(context, 
                    string.Format("Processing request failed - {0}", ex.Message));
                response = CreateResponse(null);
            }

            return response;
        }

        APIGatewayProxyResponse CreateResponse(DateTime? result)
        {
            int statusCode = (result != null) ?
                (int)HttpStatusCode.OK :
                (int)HttpStatusCode.InternalServerError;

            string body = (result != null) ?
                JsonConvert.SerializeObject(result) : string.Empty;

            var response = new APIGatewayProxyResponse
            {
                StatusCode = statusCode,
                Body = body,
                Headers = new Dictionary<string, string>
                {
                    { "Content-Type", "application/json" },
                    { "Access-Control-Allow-Origin", "*" }
                }
            };
            return response;
        }

        /// <summary>
        /// Logs messages to cloud watch
        /// </summary>
        void LogMessage(ILambdaContext ctx, string msg)
        {
            ctx.Logger.LogLine(
                string.Format("{0}:{1} - {2}",
                    ctx.AwsRequestId,
                    ctx.FunctionName,
                    msg));
        }

    }
}

It can be seen that we need to use the specialized APIGatewayProxyRequest/APIGatewayProxyResponse pair.

In this example we are exposing the AWS Lambda as a GET-only operation. If you wanted to accept POST/DELETE/PUT data, you could use APIGatewayProxyRequest.Body to get the data representing the request.
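
Just to illustrate, a POST-style handler might pull the payload out of the body something like the sketch below. The PersonRequest type and the Save method name are made up for illustration; they are not part of the demo code.

// Hypothetical payload type, purely for illustration
public class PersonRequest
{
    public string Name { get; set; }
}

public APIGatewayProxyResponse Save(
    APIGatewayProxyRequest apigProxyEvent, ILambdaContext context)
{
    // The raw request body arrives as a string; deserialize it into our type
    var person = JsonConvert.DeserializeObject<PersonRequest>(apigProxyEvent.Body);

    // ... do something useful with the deserialized data here ...

    return new APIGatewayProxyResponse
    {
        StatusCode = (int)HttpStatusCode.OK,
        Body = JsonConvert.SerializeObject(person),
        Headers = new Dictionary<string, string> { { "Content-Type", "application/json" } }
    };
}
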

Ok, so now that we have the code, let’s assume that the Lambda has been published to AWS and that we have it available in the AWS console (see the last article for a detailed explanation of how to do that). We would now need to configure the API Gateway part of it.

This starts with just telling the ApiGateway trigger which stage to run in. This is shown below

image

Once we have configured the ApiGateway trigger for the published lambda, we should see it shown something like the screen shot below. We then need to move on to setting up the actual ApiGateway resources themselves and how they relate to the Lambda call.

image

We can do this either by following the link shown within the Api Gateway section of our lambda as shown above, or via the AWS console where we just search for the Api Gateway. Both paths are valid, and should lead you to a screen something like the one below. It is from this screen that we will add new resources.

image

So we wish (at least for this demo) to create a GET resource that will call our Lambda. We can do this by using the Actions menu, and creating a new GET from the drop down options. We then set up the GET resource to call the lambda (the one for this demo). This is all shown in the screen shot below

image

Once we have added the resource we should be able to test it out using the Test button (the one shown below with the lightning bolt on it). This will test the Api resource. So for this demo this should call the Lambda and GET a new time returned to the Api Gateway call, and we should see a status code of 200 (Ok)

image

So if that tests out just fine, we are almost there. All we need to do now is ensure that the Api Gateway is deployed. This can be done using the “Deploy API” menu option from the Api Gateway portal, as shown below.

image

With all that done, we should be able to test our deployed Api Gateway pointing to our Lambda, so let’s grab the public endpoint for the Api Gateway, which we can do by examining the Stages menu, then finding our resource (GET in this case) and getting hold of the Invoke Url.

image

So for me this looks like this

image

Cool, looks like it’s working.

See ya later, not goodbye

Ok that’s it for now until the next post

AWS

AWS : Lambda

What are we talking about this time?

This time we are going to talk about AWS Lambda. Believe it or not, this is a fairly big subject for such a simple topic. This will probably require a few posts, so this is what I would like to cover, I think:

  • SQS source Lambda writing to S3 bucket
  • Using Serverless framework
  • Kinesis Firehose Lambda writing to S3 bucket
  • Step functions using Lambda functions

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

Where is the code

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Compute/Lambda.SQS.DemoApp

What is AWS Lambda?

AWS Lambda is an event-driven, serverless computing platform provided by Amazon as part of Amazon Web Services. It is a computing service that runs code in response to events and automatically manages the computing resources required by that code. There are of course servers behind the scenes, but these are provisioned when needed, and you only pay for the compute time you actually use.

AWS Lambda Limits

There are some limitations when using Lambda that one should be aware of before getting started. These are shown below

image

image

AWS Lambda will dynamically scale capacity in response to increased traffic, subject to the concurrent executions limit noted previously. To handle any burst in traffic, AWS Lambda will immediately increase your concurrently executing functions by a predetermined amount, dependent on which region it’s executed, as noted below:

image

Getting started with AWS Lambda

In this post we are going to build this pipeline in AWS

image

It is worth noting that there are MANY event sources for Lambda; you can find out more here : https://github.com/aws/aws-lambda-dotnet, but for those of you that just want to know now, here is the current list

image

Don’t try and click this it’s a screenshot

AWS Toolkit

So the easiest way to get started with your own Lambda function is using the AWS Toolkit, which, if you followed part 1, you will have installed. So let’s use the wizard to create an SQS-triggered Lambda.

image

Once you have run through this wizard you will be left with the shell of a Lambda project that is triggered via an SQS event, and has unit tests for that. I, however, wanted my Lambda to write to S3, and I also wanted an SQS publisher that I could use to push messages to my SQS queue to test my Lambda for real later on.

So the final solution for me looks like this

image

SQS Publisher

Let’s start with the simple bit, the SQS Publisher, which is as follows

using System;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Nito.AsyncEx;

namespace SQSSPublisher
{
    class Program
    {
        private static bool _receieverShouldDeleteMessage = false;
        private static AmazonSQSClient _sqs = new AmazonSQSClient();
        private static string _myQueueUrl;
        private static string _queueName = "lamda-sqs-demo-app";

        static void Main(string[] args)
        {
            AsyncContext.Run(() => MainAsync(args));
        }

        static async Task MainAsync(string[] args)
        {
            try
            {

                var listQueuesRequest = new ListQueuesRequest();
                var listQueuesResponse = await _sqs.ListQueuesAsync(listQueuesRequest);

                try
                {
                    Console.WriteLine($"Checking for a queue called {_queueName}.\n");
                    var resp = await _sqs.GetQueueUrlAsync(_queueName);
                    _myQueueUrl = resp.QueueUrl;

                }
                catch(QueueDoesNotExistException quex)
                {
                    //Creating a queue
                    Console.WriteLine($"Create a queue called {_queueName}.\n");
                    var sqsRequest = new CreateQueueRequest { QueueName = _queueName };
                    var createQueueResponse = await _sqs.CreateQueueAsync(sqsRequest);
                    _myQueueUrl = createQueueResponse.QueueUrl;
                }

                //Sending a message
                for (int i = 0; i < 2; i++)
                {
                    var message = $"This is my message text-Id-{Guid.NewGuid().ToString("N")}";
                    //var message = $"This is my message text";
                    Console.WriteLine($"Sending a message to MyQueue : {message}");
                    var sendMessageRequest = new SendMessageRequest
                    {
                        QueueUrl = _myQueueUrl, //URL from initial queue creation
                        MessageBody = message
                    };
                    await _sqs.SendMessageAsync(sendMessageRequest);
                }
            }
            catch (AmazonSQSException ex)
            {
                Console.WriteLine("Caught Exception: " + ex.Message);
                Console.WriteLine("Response Status Code: " + ex.StatusCode);
                Console.WriteLine("Error Code: " + ex.ErrorCode);
                Console.WriteLine("Error Type: " + ex.ErrorType);
                Console.WriteLine("Request ID: " + ex.RequestId);
            }

            Console.WriteLine("Press Enter to continue...");
            Console.Read();
        }


        
    }
}

Lambda

Now let’s see the Lambda itself. Remember, it will get triggered to run when an SQS event arrives in the queue it is listening to, and it will write to an S3 bucket.

using System;
using System.Linq;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Amazon.S3;
using Amazon.S3.Model;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace Lambda.SQS.DemoApp
{
    public class Function
    {
        static IAmazonS3 client;
        private static string bucketName = "lamda-sqs-demo-app-out-bucket";

        /// <summary>
        /// Default constructor. This constructor is used by Lambda to construct the instance. When invoked in a Lambda environment
        /// the AWS credentials will come from the IAM role associated with the function and the AWS region will be set to the
        /// region the Lambda function is executed in.
        /// </summary>
        public Function()
        {

        }


        /// <summary>
        /// This method is called for every Lambda invocation. This method takes in an SQS event object and can be used 
        /// to respond to SQS messages.
        /// </summary>
        /// <param name="evnt"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public async Task FunctionHandler(SQSEvent evnt, ILambdaContext context)
        {
            foreach(var message in evnt.Records)
            {
                await ProcessMessageAsync(message, context);
            }
        }

        private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
        {
            context.Logger.LogLine($"Processed message {message.Body}");

            using (client = new AmazonS3Client(Amazon.RegionEndpoint.EUWest2))
            {
                Console.WriteLine("Creating a bucket");
                await CreateABucketAsync(bucketName, false);
                Console.WriteLine("Writing message from SQS to bucket");
                await WritingAnObjectAsync(message.Body.ToUpper(), Guid.NewGuid().ToString("N").ToLower());
            }


            // TODO: Do interesting work based on the new message
            await Task.CompletedTask;
        }


        async Task WritingAnObjectAsync(string messageBody, string keyName)
        {
            await CarryOutAWSTask<Unit>(async () =>
            {
                // simple object put
                PutObjectRequest request = new PutObjectRequest()
                {
                    ContentBody = messageBody,
                    BucketName = bucketName,
                    Key = keyName
                };

                PutObjectResponse response = await client.PutObjectAsync(request);
                return Unit.Empty;
            }, "Writing object");
        }


        async Task CreateABucketAsync(string bucketToCreate, bool isPublic = true)
        {
            await CarryOutAWSTask<Unit>(async () =>
            {
                if (await BucketExists(bucketToCreate))
                {
                    Console.WriteLine($"{bucketToCreate} already exists, skipping this step");
                    return Unit.Empty;
                }

                PutBucketRequest putBucketRequest = new PutBucketRequest()
                {
                    BucketName = bucketToCreate,
                    BucketRegion = S3Region.EUW2,
                    CannedACL = isPublic ? S3CannedACL.PublicRead : S3CannedACL.Private
                };
                var response = await client.PutBucketAsync(putBucketRequest);
                return Unit.Empty;
            }, "Create a bucket");
        }


        async Task<bool> BucketExists(string bucketName)
        {
            return await CarryOutAWSTask<bool>(async () =>
            {
                ListBucketsResponse response = await client.ListBucketsAsync();
                return  response.Buckets.Select(x => x.BucketName).Contains(bucketName);
            }, "Listing buckets");
        }

        async Task<T> CarryOutAWSTask<T>(Func<Task<T>> taskToPerform, string op)
        {
            try
            {
                return await taskToPerform();
            }
            catch (AmazonS3Exception amazonS3Exception)
            {
                if (amazonS3Exception.ErrorCode != null &&
                    (amazonS3Exception.ErrorCode.Equals("InvalidAccessKeyId") ||
                    amazonS3Exception.ErrorCode.Equals("InvalidSecurity")))
                {
                    Console.WriteLine("Please check the provided AWS Credentials.");
                    Console.WriteLine("If you haven't signed up for Amazon S3, please visit http://aws.amazon.com/s3");
                }
                else
                {
                    Console.WriteLine($"An Error, number '{amazonS3Exception.ErrorCode}', " +
                                      $"occurred when '{op}' with the message '{amazonS3Exception.Message}'");
                }

                return default(T);
            }
        }


    }



    public class Unit
    {
        public static Unit Empty => new Unit();
    }
}

Tests

And this is what the tests look like, where we test the Lambda input, and check the output is stored in S3

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using Xunit;
using Amazon.Lambda.TestUtilities;
using Amazon.Lambda.SQSEvents;

using Lambda.SQS.DemoApp;
using Amazon.S3;
using Amazon.S3.Model;

namespace Lambda.SQS.DemoApp.Tests
{
    public class FunctionTest
    {
        private static string bucketName = "lamda-sqs-demo-app-out-bucket";


        [Fact]
        public async Task TestSQSEventLambdaFunction()
        {
            var sqsEvent = new SQSEvent
            {
                Records = new List<SQSEvent.SQSMessage>
                {
                    new SQSEvent.SQSMessage
                    {
                        Body = "foobar"
                    }
                }
            };

            var logger = new TestLambdaLogger();
            var context = new TestLambdaContext
            {
                Logger = logger
            };

            var countBefore = await CountOfItemsInBucketAsync(bucketName);

            var function = new Function();
            await function.FunctionHandler(sqsEvent, context);

            var countAfter = await CountOfItemsInBucketAsync(bucketName);

            Assert.Contains("Processed message foobar", logger.Buffer.ToString());

            Assert.Equal(1, countAfter - countBefore);
        }


        private async Task<int> CountOfItemsInBucketAsync(string bucketName)
        {
            using (var client = new AmazonS3Client(Amazon.RegionEndpoint.EUWest2))
            {
                ListObjectsRequest request = new ListObjectsRequest();
                request.BucketName = bucketName;
                ListObjectsResponse response = await client.ListObjectsAsync(request);
                return response.S3Objects.Count;
            }
        }
    }
}

Deploying the Lambda to AWS

We have a few options available to us here: the dotnet command line or VS2017.

VS2017

So for now let’s just right-click the Lambda project, and choose “Publish to AWS Lambda”. Following the wizard will show something like this

image

image

There are several inbuilt roles to choose from to use to run your Lambda. I started with AWSLambdaFullAccess; however, I still needed to add SQS and S3 permissions to that. We will see how to do that below

Dot net tool

Run dotnet tool install -g Amazon.Lambda.Tools (you need .NET Core 2.1.3 or above) to grab the .NET Core command line CLI. Once you have that installed you can create/deploy Lambdas straight from the command line

Adjusting the deployed Lambda for extra policy requirements

So we started out using the AWSLambdaFullAccess role for our Lambda, but now we need to create the execution policy. This is described here : https://docs.aws.amazon.com/lambda/latest/dg/with-sqs-create-execution-role.html

But it is easier to locate the role for the Lambda you just created and give it the extra permissions using the IAM console, where you grab the ARNs for the SQS queue and S3 buckets etc.

image

Testing it out

In AWS Console using test events

We can use the AWS console and use test events (not SQS events), and we can see that it all looks good, we have a nice green panel for the test execution

image

And we can check out the s3 bucket to ensure we got the expected output.

image

Cool, this looks good.

Use the real SQS source

So how about the real input from SQS? Well, we can go back to the AWS Lambda console, and configure the SQS trigger

image

Where we need to fill in the SQS event configuration for the lambda, so it knows what queue to use

image

Once we have done that, and we have made sure the role our Lambda is using is ok, the AWS lambda console should show something like this where we have a nicely configured SQS event trigger

image

Ok, now we just run the SQSPublisher in the solution, check the S3 bucket, and wham, we see 2 new messages. Hooray

image

 

image

This corresponds nicely to the code in the SQSPublisher. It’s working, YAY

image

See ya later, not goodbye

Ok that’s it for now until the next post

JS

ParcelJs vs Webpack

Now those of you that follow my blog will know I normally don’t blog about JavaScript. That is not to say I don’t try and keep an eye on it.

Not being big headed here, I am in fact fairly ok with talking Webpack/React/Redux/Babel with seasoned WebDevs, and have done a few projects using the following bits and bobs

  • Babel
  • SCSS (compass CSS)
  • Typescript
  • React
  • Bootstrap
  • ES6 modules

My de facto setup has always been to use these tools:

  • NPM for module installation
  • Webpack
    • Modules
    • Source maps
    • Minification
    • Hashing of file names
    • Running one transpiler through another (Typescript –> Babel –> JS)
  • Babel for polyfilling features

I have been fairly happy with this in the past. Ok from time to time you have to figure out why LibraryX doesn’t play nice with LibraryY, which is a right PITA, but eventually you get there (if you ignore major updates to Webpack which seemed to break a lot of stuff)

Typical Webpack.config

This has been a pretty typical webpack.config file for DEV (I have a different one for PROD, where I minify, have no source maps, etc.)

let path = require('path');

let ExtractTextPlugin = require('extract-text-webpack-plugin');

let babelOptions = {
    "presets": ["es2015", "react"]
};

let entries = {
    index: './src/index.tsx'
};


module.exports = {
    entry: entries,
    context: __dirname,
    resolve: {
        extensions: [".tsx", ".ts", ".js", ".jsx"],
        modules: [path.resolve(__dirname, "src"), "node_modules"]
    },
    mode: 'development',
    devtool: "eval-source-map",
    output: {
        filename: 'bundle.js',
        path: path.resolve(__dirname, 'dist')
    },
    plugins: [

        //scss/sass files extracted to common css bundle
        new ExtractTextPlugin({
            filename: '[name].bundle.[hash].css',
            allChunks: true,
            disable: true
        }),
    ],
    module: {
        rules: [
            // All files with a '.ts' or '.tsx' extension will be handled by 'awesome-typescript-loader' 1st 
            // then 'babel-loader'
            // NOTE : loaders run right to left (think of them as a cmd line pipe)
            {
                test: /\.ts(x?)$/,
                exclude: /node_modules/,
                use: [
                    {
                        loader: 'babel-loader',
                        options: babelOptions
                    },
                    {
                        loader: 'awesome-typescript-loader'
                    }
                ]
            },


            // All files with a .scss|.sass extenson will be handled by 'sass-loader'
            {
                test: /\.(sass|scss)$/,
                exclude: /node_modules/,
                //loader: ExtractTextPlugin.extract(['css-loader', 'sass-loader'])
                loader: ['style-loader','css-loader', 'sass-loader']
            },

            // All files with a '.js' extension will be handled by 'babel-loader'.
            {
                test: /\.js$/,
                exclude: /node_modules/,
                use: [
                    {
                        loader: 'babel-loader',
                        options: babelOptions
                    }
                ]
            },

            {
                test: /\.png$/,
                include: /node_modules/,
                loader: "url-loader?limit=100000"
            },

            {
                test: /\.jpg$/,
                include: /node_modules/,
                loader: "file-loader"
            },

            {
                test: /\.woff(\?.*)?$/,
                include: /node_modules/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/font-woff'
            },

            {
                test: /\.woff2(\?.*)?$/,
                include: /node_modules/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/font-woff2'
            },

            {
                test: /\.ttf(\?.*)?$/,
                include: /node_modules/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/octet-stream'
            },

            {
                test: /\.eot(\?.*)?$/, loader: 'file-loader?prefix=fonts/&name=fonts/[name].[ext]'
            },

            {
                test: /\.svg(\?.*)?$/,
                include: /node_modules/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=image/svg+xml'
            },

            // All output '.js' files will have any sourcemaps re-processed by 'source-map-loader'.
            {
                enforce: "pre",
                test: /\.js$/,
                loader: "source-map-loader"
            }
        ]
    }
};

 

So I was working with our in-house web expert, who said “hey you should have a look at ParcelJS, it’s much easier than WebPack”.

ParcelJS

My initial reaction was ok, I’ll take a look……This is the first thing you see when you go to its website

image

ZERO? Did I just read that right? ZERO it said, yep ZERO. Ok, let’s put that to the test.

I have this small github repo which I used to test it : https://github.com/sachabarber/ParcelJS-demo

image

This repo contains the following

  • Simple HTML page
  • NPM packages.json
    • React
    • Bootstrap
    • ….
  • Some custom React TSX (Typescript react files)
  • Some custom SCSS files

Let’s look at some of these, in reverse order

Custom SCSS files

Here is one of my SCSS files; note how it uses an @import

@import './variables.scss';

body {
    background-color: $body-background;
    font-size:12px;
}

 

And here is an example TypeScript React file; again note the imports, the fact that it is actually TypeScript here, and that this file also imports a Bootstrap CSS file from node_modules



import * as React from "react";
import * as ReactDOM from "react-dom";
import "/node_modules/bootstrap/dist/css/bootstrap.min.css";


import {
    Nav,
    Navbar,
    NavItem,
    NavDropdown,
    MenuItem,
    Button
} from "react-bootstrap";
import { Router, Route, hashHistory } from 'react-router';
import { Details } from "./Details";
import { Orders } from "./Orders";
import "../scss/index.scss";


class MainNav extends React.Component<undefined, undefined> {

    constructor(props: any) {
        super(props);
    }

    render() {
        return (
			<Navbar collapseOnSelect>
				<Navbar.Header>
					<Navbar.Brand>
						<span>Simple React-Redux example</span>
					</Navbar.Brand>
					<Navbar.Toggle />
				</Navbar.Header>
				<Navbar.Collapse>
					<Nav pullRight>
						<NavItem eventKey={2} href='#/detailsPage'>Details</NavItem>
						<NavItem eventKey={2} href='#/ordersPage'>Orders</NavItem>
					</Nav>
				</Navbar.Collapse>
			</Navbar> 
        )
    }
}

class App extends React.Component<undefined, undefined> {
    render() {
        return (
            <div>
                <div>
                    <MainNav/>
                    {this.props.children}
                </div>
            </div>
        )
    }
}


ReactDOM.render((
    <Router history={hashHistory}>
        <Route component={App}>
            <Route path="/" component={Details} />
            <Route path="/detailsPage" component={Details} />
            <Route path="/ordersPage" component={Orders} />
        </Route>
    </Router>
), document.getElementById('root'));

HTML file

This is the HTML file I use for the app. Notice how there is NO CSS / JS in the head section at all. All there is, is a single reference to the main entry point file index.tsx. That is it.

 

<html>

<head>
	
</head>
<body>
  <div id="root"></div>
  <script src="./src/index.tsx"></script>
</body>
</html>

 

Running it all

So that’s it, you will not find ANY configuration whatsoever. ParcelJS just deals with it. In order to test this out let’s do the following

Ensure you have done the following

  • npm install
  • npm install -g parcel-bundler
  • open the wwwroot folder in a Node.js command prompt and issue the command parcel index.html
  • open browser and you should see the following

alt text

WOW that’s pretty nuts. What does this prove? Well quite a lot actually:

  • ES6 modules are working with ZERO config
  • Our entry point Js file is working with ZERO config
  • Typescripts –> Babel –> standard Js transpiling is working with ZERO config
  • SCSS transpiling is working with ZERO config

 

That’s not all. See how source maps are on by default

 

alt text

What Does It Actually Produce With This Magic?

There are some sensible defaults at play with Parcel; we just saw one of them, auto source maps unless stated otherwise. Where stuff gets generated is another sensible default: the “dist” folder.

Let’s have a look at the default output and see what we get:

image

  • Bootstrap glyphs
  • Some hash named CSS/JS files (good as we will see cache misses when these change names)
  • Source map
  • And we get this HTML file created all without the use of the HtmlWebpackPlugin

 

<html>

<head>

<link rel="stylesheet" href="/src.aeb21ac6.css"></head>
<body>
<div id="root"></div>
<script src="/src.aeb21ac6.js"></script>
</body>
</html>

 

Let’s just take a breather there. I did not write one single line of config (go check out the Webpack.config file again), yet I seem to have the same functionality as that, without writing one line of stuff. That’s pretty A-MAZING if you ask me.

PROD Release

For prod you would likely want to use the following command: parcel index.html --no-source-maps. You can read more in the ParcelJS CLI section : https://parceljs.org/cli.html

Conclusion

I have to say it does what it says on the tin. I guess if you want really fine-grained control over EVERYTHING, WebPack will do you proud, but if you are happy with some BLOODY SENSIBLE DEFAULTS being applied, I have to say I was up and running with Parcel in minutes.

It’s awesome

AWS

AWS Deploying ASP.NET Core app to Elastic Beanstalk

What are we talking about this time?

This time we are going to talk about AWS Elastic Beanstalk and how we can use it to deploy a scalable load balanced web site

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

Where is the code

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Compute/ElasticBeanstalk/WebApiForElasticBeanstalk

What is Elastic Beanstalk?

Elastic Beanstalk is one of the AWS compute services. It comes with support for several platform languages. In other languages such as Java there is a web kind of role and a worker kind of role; however, for .NET there is ONLY an IIS web kind of role. Don’t let that put you off though: the whole planet seems to like web sites of late, and it just so happens Elastic Beanstalk is a perfect fit for these types of apps.

Elastic Beanstalk has the following architecture

Image result for elastic beanstalk

It can be seen that the EC2 instances are part of an Auto Scaling group (scalability), and we also get a load balancer out of the box with a single URI endpoint for our Elastic Beanstalk app, which will load balance amongst the running web apps hosted on the EC2 instances. This is quite a lot of good stuff to get for free; this sort of thing is quite hard to configure by yourself, so this is quite cool.

Deploying from Visual Studio

So when you create a new .NET Core Web App (the demo uses a standard .NET Core WebApi project) you can publish it to Elastic Beanstalk straight from Visual Studio.

image

This is obviously thanks to the AWS Toolkit (which I have installed and talk about in the 1st article in this series). This will launch a wizard, which looks like this

image

You can choose to create a new application environment or use one that you previously created

image

We can then pick a name for the application and its URI that will be publicly available

image

You then pick your hardware requirements (EC2 instance types)

image

You then pick your application permissions / Service permissions

image

We then pick our application options

When you Finish the wizard you should see something like this screen

image

The entire deployment takes about 10 minutes, so be patient. One thing to note is that the Visual Studio deployment also creates the aws-beanstalk-tools-defaults.json file to aid in the final application deployment to AWS. Here are its contents for this demo app

{
    "comment" : "This file is used to help set default values when using the dotnet CLI extension Amazon.ElasticBeanstalk.Tools. For more information run \"dotnet eb --help\" from the project root.",
    "profile" : "default",
    "region"  : "eu-west-2",
    "application" : "WebApiForElasticBeanstalk",
    "environment" : "WebApiForElasticBeanstalk-dev",
    "cname"       : "webapiforelasticbeanstalk-dev",
    "solution-stack" : "64bit Windows Server 2016 v1.2.0 running IIS 10.0",
    "environment-type" : "SingleInstance",
    "instance-profile" : "aws-elasticbeanstalk-ec2-role",
    "service-role"     : "aws-elasticbeanstalk-service-role",
    "health-check-url" : "/",
    "instance-type"    : "t2.micro",
    "key-pair"         : "",
    "iis-website"      : "Default Web Site",
    "app-path"         : "/",
    "enable-xray"      : false
}

This is what is happening behind the scenes

Launching an environment creates the following resources:

  • EC2 instance – An Amazon Elastic Compute Cloud (Amazon EC2) virtual machine configured to run web apps on the platform that you choose.

    Each platform runs a specific set of software, configuration files, and scripts to support a specific language version, framework, web container, or combination thereof. Most platforms use either Apache or nginx as a reverse proxy that sits in front of your web app, forwards requests to it, serves static assets, and generates access and error logs.

  • Instance security group – An Amazon EC2 security group configured to allow ingress on port 80. This resource lets HTTP traffic from the load balancer reach the EC2 instance running your web app. By default, traffic isn’t allowed on other ports.
  • Load balancer – An Elastic Load Balancing load balancer configured to distribute requests to the instances running your application. A load balancer also eliminates the need to expose your instances directly to the internet.
  • Load balancer security group – An Amazon EC2 security group configured to allow ingress on port 80. This resource lets HTTP traffic from the internet reach the load balancer. By default, traffic isn’t allowed on other ports.
  • Auto Scaling group – An Auto Scaling group configured to replace an instance if it is terminated or becomes unavailable.
  • Amazon S3 bucket – A storage location for your source code, logs, and other artifacts that are created when you use Elastic Beanstalk.
  • Amazon CloudWatch alarms – Two CloudWatch alarms that monitor the load on the instances in your environment and are triggered if the load is too high or too low. When an alarm is triggered, your Auto Scaling group scales up or down in response.
  • AWS CloudFormation stack – Elastic Beanstalk uses AWS CloudFormation to launch the resources in your environment and propagate configuration changes. The resources are defined in a template that you can view in the AWS CloudFormation console.
  • Domain name – A domain name that routes to your web app in the form subdomain.region.elasticbeanstalk.com.

All of these resources are managed by Elastic Beanstalk. When you terminate your environment, Elastic Beanstalk terminates all the resources that it contains.

 

Taken from https://docs.aws.amazon.com/elasticbeanstalk/latest/dg/dotnet-core-tutorial.html#dotnet-core-tutorial-deploy as of 19/09/18

Deploying from the command line

Ok so we can deploy from Visual Studio, this is cool, but it certainly won’t cut the mustard for CI purposes. Luckily AWS exposes some new .NET Core commands that we can use to deploy to Elastic Beanstalk. We need to grab the CLI tools to do this

Get the AWS Dot Net Core CLI tools

You will need .NET Core SDK 2.1.300 or later (dotnet tool is only available in 2.1.3 or later). Once you have that installed you should be able to run the command dotnet tool install -g Amazon.ElasticBeanstalk.Tools to install the AWS Elastic Beanstalk dotnet commands. You can read more about this here : https://github.com/aws/aws-extensions-for-dotnet-cli/blob/master/README.md

So once you have these, you should be able to change to the directory that contains the app you want to deploy, use the command dotnet eb deploy-environment, and follow the command line prompts. You may see an error message about an S3 bucket not existing, which is easy enough to fix: just look at what bucket it was trying to create and go and create it; it should be a non-public/private bucket.

Checking the deployment

So as we saw above a deployment to Elastic Beanstalk did quite a few things, so we should be able to check out the following

That the binaries are in S3 bucket

When we deploy an app to Elastic Beanstalk the binaries are placed into a S3 bucket, so we should see that has been created for us

image

Where we can drill into the bucket and see these files

image

The Elastic Beanstalk Env

We can use the Elastic Beankstalk console https://console.aws.amazon.com/elasticbeanstalk to see if our environment is deployed ok. We should see something like this

image

One thing that is interesting is if we go into the environment (green box)

image

Then use the Actions button at the top right, we can save this environment, so we can use this as a blueprint for a new environment.

Other cool stuff is that we can look at the Configuration, request logs, look at monitoring, setup alarms etc etc

Checking the deployed application works

Using the URL for our app, we should be able to test out our standard WebApi project. Let’s give it a go
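
For reference, assuming the project is the unmodified WebApi template, the endpoint we are hitting is just the default ValuesController, which looks roughly like this (this is the stock template code, not anything specific to Elastic Beanstalk):

using System.Collections.Generic;
using Microsoft.AspNetCore.Mvc;

[Route("api/[controller]")]
[ApiController]
public class ValuesController : ControllerBase
{
    // GET api/values - the endpoint we hit via the Elastic Beanstalk URL
    [HttpGet]
    public ActionResult<IEnumerable<string>> Get()
    {
        return new string[] { "value1", "value2" };
    }
}
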

image

All looking good. So there you go. That’s all pretty cool I think

See ya later, not goodbye

Ok that’s it for now until the next post

AWS

AWS : Kinesis

What are we talking about this time?

This time we are going to talk about AWS Kinesis

 

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

 

Where is the code

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Messaging/Kinesis

 

What is AWS Kinesis?

So last time we talked about SQS, and we saw that if you had multiple consumers, whoever managed to read a message from the queue first would be the one and only possible consumer of that message. Kinesis is very different to this.

 

Let’s consider this text from the official AWS docs

Amazon Kinesis is a fully managed, cloud-based service for real-time data processing over large, distributed data streams. With Amazon Kinesis, data can be collected from many sources such as website clickstreams, IT logs, social media feeds, billing related transactions, and sensor readings from IoT devices.

 

After data has been stored in Amazon Kinesis, you can consume and process data with the KCL for data analysis, archival, real-time dashboards, and much more. While you can use Amazon Kinesis API functions to process stream data directly, the KCL takes care of many complex tasks associated with distributed processing and allows you to focus on the record processing logic. For example, the KCL can automatically load balance record processing across many instances, allow the user to checkpoint records that are already processed, and handle instance failures.

 

C# support for the KCL is implemented using the MultiLangDaemon.  The core of the KCL logic (communicating with Amazon Kinesis, load balancing, handling instance failure, etc.) resides in Java, and Amazon KCL for C# uses the multi-language daemon protocol to communicate with the Java daemon.

 

It can be seen above that there are 2 possible ways of reading Kinesis data using .NET

  • Kinesis APIs
  • KCL

We will be looking at both of these when we get to looking at consumers
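
Just to give a flavour of the first option (the raw Kinesis API), a minimal consumer sketch might look something like the code below. This assumes the single-shard "myTestStream" created by the producer example later in this post, and is only an illustration, not the consumer code we will look at properly later.

using System;
using System.Text;
using System.Threading.Tasks;
using Amazon;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;

class SimpleApiConsumer
{
    // Minimal sketch: read one shard of "myTestStream" using the raw Kinesis API
    public static async Task ReadAsync()
    {
        var client = new AmazonKinesisClient(RegionEndpoint.EUWest2);

        // Grab the first shard of the stream (the demo stream only has one)
        var description = await client.DescribeStreamAsync(
            new DescribeStreamRequest { StreamName = "myTestStream" });
        var shardId = description.StreamDescription.Shards[0].ShardId;

        // Start reading from the oldest available record in the shard
        var iteratorResponse = await client.GetShardIteratorAsync(new GetShardIteratorRequest
        {
            StreamName = "myTestStream",
            ShardId = shardId,
            ShardIteratorType = ShardIteratorType.TRIM_HORIZON
        });

        var iterator = iteratorResponse.ShardIterator;
        while (iterator != null)
        {
            // Poll for a batch of records, print each payload, then move on
            var records = await client.GetRecordsAsync(
                new GetRecordsRequest { ShardIterator = iterator, Limit = 100 });

            foreach (var record in records.Records)
            {
                Console.WriteLine(Encoding.UTF8.GetString(record.Data.ToArray()));
            }

            // Keep polling; for an open shard this loop runs until you stop it
            iterator = records.NextShardIterator;
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    }
}
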

Kinesis is more comparable with something like Kafka (except Kafka is miles better, in so many ways, but let’s not go there). This diagram should help to show you where Kinesis fits into the big picture

image

 

From this diagram we can see that a stream has multiple shards, and there may be multiple consumers of the stream data. This is different from SQS, where once a message was read by one consumer, it was just gone. We can also see that KCL consumers are supposed to run on EC2 instances, where the KCL can do load balancing, allow consumer checkpointing, etc.

This is all stuff you would expect to find in a decent stream-based API. Like I say, if you want a top-of-the-line streaming service, Kafka is your guy. But we are talking about AWS Kinesis, so let’s carry on with the post for now.

 

IAM user privileges needed for Kinesis

For the demo code I added the following permissions to my IAM user to allow them to use AWS Kinesis

  • AmazonDynamoDBFullAccess
  • AmazonKinesisFullAccess
  • AmazonDynamoDBFullAccesswithDataPipeline
  • AmazonKinesisAnalyticsFullAccess

 

This is the official recommendation:

image

 

image

 

As this is just demo code for me, I was ok with using full access, in practice it may be good to have a dedicated producer IAM user/consumer IAM user.

 

Producer

Ok, so as I stated above there are 2 consumer libraries that can be used with Kinesis; however, the producer code is the same for both. So let’s see an example of what a typical producer looks like.

 

using Amazon.Kinesis.Model;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Amazon.Kinesis.DataStreamproducer
{
    /// <summary>
    /// A sample producer of Kinesis records.
    /// </summary>
    class ProducerApp
    {


        private static readonly AmazonKinesisClient kinesisClient = 
            new AmazonKinesisClient(RegionEndpoint.EUWest2);
        const string myStreamName = "myTestStream";

        public static void Main(string[] args)
        {
            new ProducerApp().WriteToStream().GetAwaiter().GetResult();
        }

        private async Task WriteToStream()
        {

            const string myStreamName = "myTestStream";
            const int myStreamSize = 1;



            try
            {
                var createStreamRequest = new CreateStreamRequest
                {
                    StreamName = myStreamName,
                    ShardCount = myStreamSize
                };

                var existingStreams = await kinesisClient.ListStreamsAsync();

                if (!existingStreams.StreamNames.Contains(myStreamName))
                {

                    await kinesisClient.CreateStreamAsync(createStreamRequest);
                    Console.WriteLine("Created Stream : " + myStreamName);
                }
            }
            catch (ResourceInUseException)
            {
                Console.Error.WriteLine("Producer is quitting without creating stream " + myStreamName +
                    " to put records into as a stream of the same name already exists.");
                Environment.Exit(1);
            }

            await WaitForStreamToBecomeAvailableAsync(myStreamName);

            Console.Error.WriteLine("Putting records in stream : " + myStreamName);
            // Write 10 UTF-8 encoded records to the stream.
            for (int j = 0; j < 10; ++j)
            {
                byte[] dataAsBytes = Encoding.UTF8.GetBytes("testdata-" + j);
                using (MemoryStream memoryStream = new MemoryStream(dataAsBytes))
                {
                    try
                    {
                        PutRecordRequest requestRecord = new PutRecordRequest();
                        requestRecord.StreamName = myStreamName;
                        requestRecord.PartitionKey = "url-response-times";
                        requestRecord.Data = memoryStream;

                        PutRecordResponse responseRecord = 
                            await kinesisClient.PutRecordAsync(requestRecord);
                        Console.WriteLine("Successfully sent record to Kinesis. Sequence number: {0}", 
                            responseRecord.SequenceNumber);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Failed to send record to Kinesis. Exception: {0}", ex.Message);
                    }
                }
            }

            Console.ReadLine();

        }

        /// <summary>
        /// This method waits a maximum of 10 minutes for the specified stream to become active.
        /// </summary>
        /// <param name="myStreamName">Name of the stream whose active status is waited upon.</param>
        private static async Task WaitForStreamToBecomeAvailableAsync(string myStreamName)
        {
            var deadline = DateTime.UtcNow + TimeSpan.FromMinutes(10);
            while (DateTime.UtcNow < deadline)
            {
                DescribeStreamRequest describeStreamReq = new DescribeStreamRequest();
                describeStreamReq.StreamName = myStreamName;
                var describeResult = await kinesisClient.DescribeStreamAsync(describeStreamReq);
                string streamStatus = describeResult.StreamDescription.StreamStatus;
                Console.Error.WriteLine("  - current state: " + streamStatus);
                if (streamStatus == StreamStatus.ACTIVE)
                {
                    return;
                }
                await Task.Delay(TimeSpan.FromSeconds(20));
            }

            throw new Exception("Stream " + myStreamName + " never went active.");
        }


    }
}

 

The main points above are that we use a stream, which has a fixed number of shards, and we are able to put records to it using PutRecordAsync. There really isn't too much more to talk about in the producer. The consumers, however, are more fun.
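One thing worth adding: if you have a lot of records to push, calling PutRecordAsync one record at a time is wasteful, and the same SDK also offers PutRecordsAsync, which sends a batch (up to 500 entries) in a single call. A rough sketch, reusing the client and stream name from the producer above (this would sit inside a method like WriteToStream):

// Sketch only: batch several records into a single PutRecords call.
var entries = new List<PutRecordsRequestEntry>();
for (int j = 0; j < 10; ++j)
{
    entries.Add(new PutRecordsRequestEntry
    {
        PartitionKey = "url-response-times",
        Data = new MemoryStream(Encoding.UTF8.GetBytes("testdata-" + j))
    });
}

var putRecordsResponse = await kinesisClient.PutRecordsAsync(new PutRecordsRequest
{
    StreamName = myStreamName,
    Records = entries
});

// Individual entries can fail even when the overall call succeeds,
// so FailedRecordCount should always be checked.
Console.WriteLine("Failed records: " + putRecordsResponse.FailedRecordCount);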

Consumer Data Streams API

So there is a lower-level API (which is actually the easier of the two to get going with), which you can use simply by adding a NuGet reference to AWSSDK.Kinesis; you then write code to iterate the shards and read all the messages from the shards contained within the Kinesis stream. The code below is a complete consumer using the Kinesis APIs.

using Amazon.Kinesis.Model;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Amazon.Kinesis.DataStreamConsumer
{
    /// <summary>
    /// A sample consumer of Kinesis records.
    /// </summary>
    class ConsumerApp
    {
        private static readonly AmazonKinesisClient kinesisClient = 
            new AmazonKinesisClient(RegionEndpoint.EUWest2);
        const string myStreamName = "myTestStream";

        public static void Main(string[] args)
        {
            new ConsumerApp().ReadFromStream().GetAwaiter().GetResult();
        }

        private async Task ReadFromStream()
        {
            DescribeStreamRequest describeRequest = new DescribeStreamRequest();
            describeRequest.StreamName = myStreamName;

            DescribeStreamResponse describeResponse = 
                await kinesisClient.DescribeStreamAsync(describeRequest);
            List<Shard> shards = describeResponse.StreamDescription.Shards;

            foreach (Shard shard in shards)
            {
                GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
                iteratorRequest.StreamName = myStreamName;
                iteratorRequest.ShardId = shard.ShardId;
                iteratorRequest.ShardIteratorType = ShardIteratorType.TRIM_HORIZON;

                GetShardIteratorResponse iteratorResponse = await kinesisClient.GetShardIteratorAsync(iteratorRequest);
                string iteratorId = iteratorResponse.ShardIterator;

                while (!string.IsNullOrEmpty(iteratorId))
                {
                    GetRecordsRequest getRequest = new GetRecordsRequest();
                    getRequest.Limit = 1000;
                    getRequest.ShardIterator = iteratorId;

                    GetRecordsResponse getResponse = await kinesisClient.GetRecordsAsync(getRequest);
                    string nextIterator = getResponse.NextShardIterator;
                    List<Record> records = getResponse.Records;

                    if (records.Count > 0)
                    {
                        Console.WriteLine("Received {0} records. ", records.Count);
                        foreach (Record record in records)
                        {
                            string theMessage = Encoding.UTF8.GetString(record.Data.ToArray());
                            Console.WriteLine("message string: " + theMessage);
                        }
                    }
                    iteratorId = nextIterator;
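                    // NOTE: in a real consumer you would want to pause briefly here
                    // (e.g. await Task.Delay(1000)) so that repeated GetRecords calls
                    // stay within the per-shard read limits, rather than spinning flat out.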
                }
            }
        }

    }
}

If we ran the DataStreamProducer and DataStreamConsumer several times we would see output something like this, which shows the messages are persisted. There is a limit to this though: records are only kept for the stream's retention period, which defaults to 24 hours and can be increased after the stream has been created (see the short sketch after the screenshot below). Again, I prefer Kafka, but that's just me.

 

image
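As an aside, if the default 24 hour retention is not enough, something along these lines should extend it (a sketch only, using the same client and stream name as the producer above):

// Sketch: extend the retention period of the demo stream to 48 hours.
var retentionResponse = await kinesisClient.IncreaseStreamRetentionPeriodAsync(
    new IncreaseStreamRetentionPeriodRequest
    {
        StreamName = myStreamName,
        RetentionPeriodHours = 48
    });
Console.WriteLine("Retention updated, HTTP status: " + retentionResponse.HttpStatusCode);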

Consumer KCL

This is a lot harder to get started with than the Data Streams API (NuGet AWSSDK.Kinesis). So much so that Amazon have published a starter kit for this, which you can find here: https://github.com/awslabs/amazon-kinesis-client-net/tree/bbe8abf6abb811f8b4c7469df7c1dd281dbb9caa

A typical C# record processor using the KCL will look like this:

using System;
using System.Collections.Generic;
using Amazon.Kinesis.ClientLibrary;

namespace Sample
{
    class SampleRecordProcessor : IRecordProcessor
    {
        public void Initialize(InitializationInput input)
        {
            // initialize
        }

        public void ProcessRecords(ProcessRecordsInput input)
        {
            // process batch of records (input.Records) and
            // checkpoint (using input.Checkpointer)
        }

        public void Shutdown(ShutdownInput input)
        {
            // cleanup
        }
    }

    class MainClass
    {
        public static void Main(string[] args)
        {
            KclProcess.Create(new SampleRecordProcessor()).Run();
        }
    }
}

Which seems easy enough. However, it's not quite that easy: the bulk of the KCL is written in Java, and the .NET KCL communicates with the Java daemon over StdIn/StdOut/StdErr. As such there are quite a few steps you must follow to get a working C# KCL app up and running.

So what we really end up with is a C# consumer more like this:

/*
 * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Amazon Software License (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Threading;
using Amazon.Kinesis.ClientLibrary;

namespace Amazon.Kinesis.ClientLibrary.SampleConsumer
{
    /// <summary>
    /// A sample processor of Kinesis records.
    /// </summary>
    class SampleRecordProcessor : IRecordProcessor {

        /// <value>The time to wait before this record processor
        /// reattempts either a checkpoint, or the processing of a record.</value>
        private static readonly TimeSpan Backoff = TimeSpan.FromSeconds(3);

        /// <value>The interval this record processor waits between
        /// doing two successive checkpoints.</value>
        private static readonly TimeSpan CheckpointInterval = TimeSpan.FromMinutes(1);

        /// <value>The maximum number of times this record processor retries either
        /// a failed checkpoint, or the processing of a record that previously failed.</value>
        private static readonly int NumRetries = 10;

        /// <value>The shard ID on which this record processor is working.</value>
        private string _kinesisShardId;

        /// <value>The next checkpoint time expressed in milliseconds.</value>
        private DateTime _nextCheckpointTime = DateTime.UtcNow;

        /// <summary>
        /// This method is invoked by the Amazon Kinesis Client Library before records from the specified shard
        /// are delivered to this SampleRecordProcessor.
        /// </summary>
        /// <param name="input">
        /// InitializationInput containing information such as the name of the shard whose records this
        /// SampleRecordProcessor will process.
        /// </param>
        public void Initialize(InitializationInput input)
        {
            Console.Error.WriteLine("Initializing record processor for shard: " + input.ShardId);
            this._kinesisShardId = input.ShardId;
        }

        /// <summary>
        /// This method processes the given records and checkpoints using the given checkpointer.
        /// </summary>
        /// <param name="input">
        /// ProcessRecordsInput that contains records, a Checkpointer and contextual information.
        /// </param>
        public void ProcessRecords(ProcessRecordsInput input)
        {
            Console.Error.WriteLine("Processing " + input.Records.Count + " records from " + _kinesisShardId);

            // Process records and perform all exception handling.
            ProcessRecordsWithRetries(input.Records);

            // Checkpoint once every checkpoint interval.
            if (DateTime.UtcNow >= _nextCheckpointTime)
            {
                Checkpoint(input.Checkpointer);
                _nextCheckpointTime = DateTime.UtcNow + CheckpointInterval;
            }
        }

        /// <summary>
        /// This shuts down the record processor and checkpoints the specified checkpointer.
        /// </summary>
        /// <param name="input">
        /// ShutdownContext containing information such as the reason for shutting down the record processor,
        /// as well as a Checkpointer.
        /// </param>
        public void Shutdown(ShutdownInput input)
        {
            Console.Error.WriteLine("Shutting down record processor for shard: " + _kinesisShardId);
            // Checkpoint after reaching end of shard, so we can start processing data from child shards.
            if (input.Reason == ShutdownReason.TERMINATE)
            {
                Checkpoint(input.Checkpointer);
            }
        }

        /// <summary>
        /// This method processes records, performing retries as needed.
        /// </summary>
        /// <param name="records">The records to be processed.</param>
        private void ProcessRecordsWithRetries(List<Record> records)
        {
            foreach (Record rec in records)
            {
                bool processedSuccessfully = false;
                string data = null;
                for (int i = 0; i < NumRetries; ++i) {
                    try {
                        // As per the accompanying AmazonKinesisSampleProducer.cs, the payload
                        // is interpreted as UTF-8 characters.
                        data = System.Text.Encoding.UTF8.GetString(rec.Data);

                        // Uncomment the following if you wish to see the retrieved record data.
                        //Console.WriteLine(
                        //    String.Format("=========== > Retrieved record:\n\tpartition key = {0},\n\tsequence number = {1},\n\tdata = {2}",
                        //    rec.PartitionKey, rec.SequenceNumber, data));

                        // Your own logic to process a record goes here.
                        Console.WriteLine("===================================================");
                        Console.WriteLine($"===========> Saw message : {data}");
                        Console.WriteLine("===================================================");

                        processedSuccessfully = true;
                        break;
                    } catch (Exception e) {
                        Console.Error.WriteLine("Exception processing record data: " + data, e);
                    }

                    //Back off before retrying upon an exception.
                    Thread.Sleep(Backoff);
                }

                if (!processedSuccessfully)
                {
                    Console.Error.WriteLine("Couldn't process record " + rec + ". Skipping the record.");
                }
            }
        }

        /// <summary>
        /// This checkpoints the specified checkpointer with retries.
        /// </summary>
        /// <param name="checkpointer">The checkpointer used to do checkpoints.</param>
        private void Checkpoint(Checkpointer checkpointer)
        {
            Console.Error.WriteLine("Checkpointing shard " + _kinesisShardId);

            // You can optionally provide an error handling delegate to be invoked when checkpointing fails.
            // The library comes with a default implementation that retries for a number of times with a fixed
            // delay between each attempt. If you do not provide an error handler, the checkpointing operation
            // will not be retried, but processing will continue.
            checkpointer.Checkpoint(RetryingCheckpointErrorHandler.Create(NumRetries, Backoff));
        }
    }

    class MainClass
    {
        /// <summary>
        /// This method creates a KclProcess and starts running a SampleRecordProcessor instance.
        /// </summary>
        public static void Main(string[] args)
        {
            try
            {
                KclProcess.Create(new SampleRecordProcessor()).Run();
            }
            catch (Exception e) {
                Console.Error.WriteLine("ERROR: " + e);
            }
        }
    }
}

 

To actually run a C# KCL example you should do this

1. Download the sample from here : https://github.com/awslabs/amazon-kinesis-client-net

2. Follow the getting started guide here : https://github.com/awslabs/amazon-kinesis-client-net/tree/bbe8abf6abb811f8b4c7469df7c1dd281dbb9caa#getting-started

3. Follow this checklist

  • Run the bootstrapper project to obtain the relevant KCL MultiLanguage JAR files
  • Make sure you have environment variables for AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY for the bootstrap app
  • Make sure you have Java installed (and its in your path)
  • Ensure the bootstrap project's command line args include “--properties kcl.properties --execute”
  • Make sure the kcl.properties file has the correct streamName/executableName/regionName (the regionName should match the region your producer/AWS account uses, which in my case was eu-west-2); see the example properties snippet just after this list
  • Make sure the executableName includes the FULL path to the consumer library, mine looks like this in kcl.properties : executableName = dotnet C:\\Users\\sacha\\Desktop\\AWS-Playground\\AWS\\Messaging\\Kinesis\\KCL\\SampleConsumer\\bin\\Debug\\netcoreapp2.0\\SampleConsumer.dll
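To make those last two bullets a bit more concrete, here is roughly the sort of thing the relevant parts of kcl.properties end up looking like. The path and names are illustrative placeholders for my setup, so treat this as a sketch and check the sample repo for the full set of settings:

# The .NET consumer the MultiLangDaemon should launch (full path to the dll)
executableName = dotnet C:\\path\\to\\SampleConsumer\\bin\\Debug\\netcoreapp2.0\\SampleConsumer.dll

# The Kinesis stream and region used throughout this post
streamName = myTestStream
regionName = eu-west-2

# Used to name the DynamoDB table the KCL creates for lease/checkpoint tracking
applicationName = DotNetKinesisSample

# Where in the stream to start reading from the first time the app runs
initialPositionInStream = TRIM_HORIZON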

Only once you have done all of that will you be able to run the example SampleProducer and Bootstrap projects together, and you should see output like this:

 

image

 


When you start a KCL application, it calls the KCL to instantiate a worker. This call provides the KCL with configuration information for the application, such as the stream name and AWS credentials.

The KCL performs the following tasks:

  • Connects to the stream
  • Enumerates the shards
  • Coordinates shard associations with other workers (if any)
  • Instantiates a record processor for every shard it manages
  • Pulls data records from the stream
  • Pushes the records to the corresponding record processor
  • Checkpoints processed records
  • Balances shard-worker associations when the worker instance count changes
  • Balances shard-worker associations when shards are split or merged

Behind the scenes the .NET KCL library uses the Stateless .NET state machine library to ensure the correctness of its implementation.

How do we deploy a KCL .NET Core App to AWS?

The recommended advice seems to be to use AWS Elastic Beanstalk, and although there are not really any specific AWS KCL C# guides, these should help you out. I have to say, though, that if you want to run something like a .NET Core console app in Elastic Beanstalk (rather than the classic ASP.NET Core example that everyone shows) you may have to dig quite deep; the advice there seems to be to run it on a dedicated EC2 instance rather than use Elastic Beanstalk.

 

dotnet eb deploy-environment --publish-options "--runtime win-x64"

 

Here is an example of an Elastic Beanstalk environment that I created to host .NET apps:

 

image

 

From here you can use the “Upload and Deploy” button

 

See ya later, not goodbye

Ok that’s it for now until the next post