AWS

AWS : Lambda

What are we talking about this time?

This time we are going to talk about AWS Lambda. Believe it or not, this is a fairly big subject for such a simple topic. It will probably require a few posts, so this is what I would like to cover:

  • SQS source Lambda writing to S3 bucket
  • Using Serverless framework
  • Kinesis Firehose Lambda writing to S3 bucket
  • Step functions using Lambda functions

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read it now, as it shows you how to get started with AWS and create an IAM user: https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

Where is the code

The code for this post can be found on GitHub here: https://github.com/sachabarber/AWS/tree/master/Compute/Lambda.SQS.DemoApp

What is AWS Lambda?

AWS Lambda is an event-driven, serverless computing platform provided by Amazon as part of Amazon Web Services. It is a computing service that runs code in response to events and automatically manages the computing resources required by that code. There are of course servers behind the scenes, but these are provisioned on demand, and you pay only for the compute time you actually use.

AWS Lambda Limits

There are some limitations when using Lambda that you should be aware of before getting started. These are shown below:

image

image

AWS Lambda will dynamically scale capacity in response to increased traffic, subject to the concurrent executions limit noted previously. To handle any burst in traffic, AWS Lambda will immediately increase your concurrently executing functions by a predetermined amount, depending on the region in which the function executes, as noted below:

image

Getting started with AWS Lambda

In this post we are going to build this pipeline in AWS:

image

It is worth noting that there are MANY event sources for Lambda; you can find out more here: https://github.com/aws/aws-lambda-dotnet, but for those of you who just want to know now, here is the current list:

image

Don’t try to click this; it’s a screenshot.

AWS Toolkit

So the easiest way to get started with your own Lambda function is to use the AWS Toolkit, which, if you followed part 1, you will already have installed. So let’s use the wizard to create an SQS-triggered Lambda.

image

Once you have run through this wizard you will be left with the shell of a Lambda project that is triggered via an SQS event, and has unit tests for it. I, however, wanted my Lambda to write to S3, and I also wanted an SQS publisher that I could use to push messages to my SQS queue to test my Lambda for real later on.

So the final solution for me looks like this:

image

SQS Publisher

Let’s start with the simple bit, the SQS Publisher, which is as follows:

using System;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Nito.AsyncEx;

namespace SQSPublisher
{
    class Program
    {
        private static AmazonSQSClient _sqs = new AmazonSQSClient();
        private static string _myQueueUrl;
        private static string _queueName = "lamda-sqs-demo-app";

        static void Main(string[] args)
        {
            AsyncContext.Run(() => MainAsync(args));
        }

        // Returns Task (not void) so AsyncContext can await it and exceptions propagate
        static async Task MainAsync(string[] args)
        {
            try
            {
                try
                {
                    Console.WriteLine($"Checking for a queue called {_queueName}.\n");
                    var resp = await _sqs.GetQueueUrlAsync(_queueName);
                    _myQueueUrl = resp.QueueUrl;
                }
                catch (QueueDoesNotExistException)
                {
                    // The queue does not exist yet, so create it
                    Console.WriteLine($"Creating a queue called {_queueName}.\n");
                    var sqsRequest = new CreateQueueRequest { QueueName = _queueName };
                    var createQueueResponse = await _sqs.CreateQueueAsync(sqsRequest);
                    _myQueueUrl = createQueueResponse.QueueUrl;
                }

                // Send a couple of messages
                for (int i = 0; i < 2; i++)
                {
                    var message = $"This is my message text-Id-{Guid.NewGuid().ToString("N")}";
                    Console.WriteLine($"Sending a message to MyQueue : {message}");
                    var sendMessageRequest = new SendMessageRequest
                    {
                        QueueUrl = _myQueueUrl, // URL from queue lookup/creation above
                        MessageBody = message
                    };
                    await _sqs.SendMessageAsync(sendMessageRequest);
                }
            }
            catch (AmazonSQSException ex)
            {
                Console.WriteLine("Caught Exception: " + ex.Message);
                Console.WriteLine("Response Status Code: " + ex.StatusCode);
                Console.WriteLine("Error Code: " + ex.ErrorCode);
                Console.WriteLine("Error Type: " + ex.ErrorType);
                Console.WriteLine("Request ID: " + ex.RequestId);
            }

            Console.WriteLine("Press Enter to continue...");
            Console.Read();
        }
    }
}

Lambda

Now let’s see the Lambda itself. Remember, it will be triggered when an SQS message arrives on the queue it’s listening to, and it will write the message to an S3 bucket.

using System;
using System.Linq;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Amazon.S3;
using Amazon.S3.Model;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace Lambda.SQS.DemoApp
{
    public class Function
    {
        static IAmazonS3 client;
        private static string bucketName = "lamda-sqs-demo-app-out-bucket";

        /// <summary>
        /// Default constructor. This constructor is used by Lambda to construct the instance. When invoked in a Lambda environment
        /// the AWS credentials will come from the IAM role associated with the function and the AWS region will be set to the
        /// region the Lambda function is executed in.
        /// </summary>
        public Function()
        {
        }

        /// <summary>
        /// This method is called for every Lambda invocation. This method takes in an SQS event object and can be used
        /// to respond to SQS messages.
        /// </summary>
        /// <param name="evnt"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public async Task FunctionHandler(SQSEvent evnt, ILambdaContext context)
        {
            foreach (var message in evnt.Records)
            {
                await ProcessMessageAsync(message, context);
            }
        }

        private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
        {
            context.Logger.LogLine($"Processed message {message.Body}");

            using (client = new AmazonS3Client(Amazon.RegionEndpoint.EUWest2))
            {
                Console.WriteLine("Creating a bucket");
                await CreateABucketAsync(bucketName, false);
                Console.WriteLine("Writing message from SQS to bucket");
                await WritingAnObjectAsync(message.Body.ToUpper(), Guid.NewGuid().ToString("N").ToLower());
            }
        }

        async Task WritingAnObjectAsync(string messageBody, string keyName)
        {
            await CarryOutAWSTask<Unit>(async () =>
            {
                // simple object put
                PutObjectRequest request = new PutObjectRequest()
                {
                    ContentBody = messageBody,
                    BucketName = bucketName,
                    Key = keyName
                };

                PutObjectResponse response = await client.PutObjectAsync(request);
                return Unit.Empty;
            }, "Writing object");
        }

        async Task CreateABucketAsync(string bucketToCreate, bool isPublic = true)
        {
            await CarryOutAWSTask<Unit>(async () =>
            {
                if (await BucketExists(bucketToCreate))
                {
                    Console.WriteLine($"{bucketToCreate} already exists, skipping this step");
                    return Unit.Empty;
                }

                PutBucketRequest putBucketRequest = new PutBucketRequest()
                {
                    BucketName = bucketToCreate,
                    BucketRegion = S3Region.EUW2,
                    CannedACL = isPublic ? S3CannedACL.PublicRead : S3CannedACL.Private
                };
                var response = await client.PutBucketAsync(putBucketRequest);
                return Unit.Empty;
            }, "Create a bucket");
        }

        async Task<bool> BucketExists(string bucketToCheck)
        {
            return await CarryOutAWSTask<bool>(async () =>
            {
                ListBucketsResponse response = await client.ListBucketsAsync();
                return response.Buckets.Select(x => x.BucketName).Contains(bucketToCheck);
            }, "Listing buckets");
        }

        async Task<T> CarryOutAWSTask<T>(Func<Task<T>> taskToPerform, string op)
        {
            try
            {
                return await taskToPerform();
            }
            catch (AmazonS3Exception amazonS3Exception)
            {
                if (amazonS3Exception.ErrorCode != null &&
                    (amazonS3Exception.ErrorCode.Equals("InvalidAccessKeyId") ||
                    amazonS3Exception.ErrorCode.Equals("InvalidSecurity")))
                {
                    Console.WriteLine("Please check the provided AWS Credentials.");
                    Console.WriteLine("If you haven't signed up for Amazon S3, please visit http://aws.amazon.com/s3");
                }
                else
                {
                    Console.WriteLine($"An Error, number '{amazonS3Exception.ErrorCode}', " +
                                      $"occurred when '{op}' with the message '{amazonS3Exception.Message}'");
                }

                return default(T);
            }
        }
    }

    public class Unit
    {
        public static Unit Empty => new Unit();
    }
}

Tests

And this is what the tests look like, where we exercise the Lambda with an SQS event and check that the output is stored in S3:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using Xunit;
using Amazon.Lambda.TestUtilities;
using Amazon.Lambda.SQSEvents;

using Lambda.SQS.DemoApp;
using Amazon.S3;
using Amazon.S3.Model;

namespace Lambda.SQS.DemoApp.Tests
{
    public class FunctionTest
    {
        private static string bucketName = "lamda-sqs-demo-app-out-bucket";


        [Fact]
        public async Task TestSQSEventLambdaFunction()
        {
            var sqsEvent = new SQSEvent
            {
                Records = new List<SQSEvent.SQSMessage>
                {
                    new SQSEvent.SQSMessage
                    {
                        Body = "foobar"
                    }
                }
            };

            var logger = new TestLambdaLogger();
            var context = new TestLambdaContext
            {
                Logger = logger
            };

            var countBefore = await CountOfItemsInBucketAsync(bucketName);

            var function = new Function();
            await function.FunctionHandler(sqsEvent, context);

            var countAfter = await CountOfItemsInBucketAsync(bucketName);

            Assert.Contains("Processed message foobar", logger.Buffer.ToString());

            Assert.Equal(1, countAfter - countBefore);
        }


        private async Task<int> CountOfItemsInBucketAsync(string bucketName)
        {
            using (var client = new AmazonS3Client(Amazon.RegionEndpoint.EUWest2))
            {
                ListObjectsRequest request = new ListObjectsRequest();
                request.BucketName = bucketName;
                ListObjectsResponse response = await client.ListObjectsAsync(request);
                return response.S3Objects.Count;
            }
        }
    }
}

Deploying the Lambda to AWS

We have a few options available to us here: the .NET command line, or VS2017.

VS2017

So for now let’s just right-click the Lambda project and choose “Publish to AWS Lambda”. Following this wizard will show something like this:

image

image

There are several inbuilt roles to choose from to run your Lambda under. I started with AWSLambdaFullAccess; however, I still needed to add SQS and S3 permissions to that. We will see how to do that below.

Dot net tool

Run dotnet tool install -g Amazon.Lambda.Tools (you need .NET Core 2.1.3 or above) to grab the .NET Core Lambda CLI. Once you have that installed you can create/deploy Lambdas straight from the command line.
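As a rough sketch of what that looks like end to end (the function name and role name below are placeholders, so substitute your own):

```shell
# Install the Lambda tooling once per machine
dotnet tool install -g Amazon.Lambda.Tools

# From the Lambda project folder, package and deploy in one step.
# --function-role is the IAM role the function will execute under.
dotnet lambda deploy-function Lambda-SQS-DemoApp \
    --region eu-west-2 \
    --function-role MyLambdaRole
```

The tool picks up most settings (runtime, handler, memory) from the aws-lambda-tools-defaults.json file the project template creates, so in practice you often only need the function name.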

Adjusting the deployed Lambda for extra policy requirements

So we started out using the AWSLambdaFullAccess role for our Lambda, but now we need to create the execution policy. This is described here: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs-create-execution-role.html

But it is easier to locate the role for the Lambda you just created and give it the extra permissions using the IAM console, where you grab the ARNs for the SQS queue, the S3 bucket, etc.
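As a sketch, the extra permissions this demo needs boil down to a policy statement along these lines (the account id in the ARN is a placeholder; the S3 actions match what the Lambda code above actually calls):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:eu-west-2:123456789012:lamda-sqs-demo-app"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:CreateBucket",
        "s3:ListAllMyBuckets"
      ],
      "Resource": "*"
    }
  ]
}
```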

image

Testing it out

In AWS Console using test events

We can use the AWS console to fire test events (not real SQS events), and we can see that it all looks good; we get a nice green panel for the test execution:

image
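For reference, a minimal SQS test event you can paste into the console looks roughly like this. It is trimmed down to the fields the handler above actually reads; the console’s built-in SQS sample template has the full shape, and the ARN here is a placeholder:

```json
{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "body": "Test message from the console",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:eu-west-2:123456789012:lamda-sqs-demo-app",
      "awsRegion": "eu-west-2"
    }
  ]
}
```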

And we can check out the S3 bucket to ensure we got the expected output:

image
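If you prefer the command line to the console, the same check can be done with the AWS CLI, using the bucket name from the code above:

```shell
# List the objects the Lambda wrote to the output bucket
aws s3 ls s3://lamda-sqs-demo-app-out-bucket --region eu-west-2
```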

Cool, this looks good.

Use the real SQS source

So how about real input from SQS? Well, we can go back to the AWS Lambda console and configure the SQS trigger:

image

Here we need to fill in the SQS event configuration for the Lambda, so it knows which queue to use:

image
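The same wiring can be done from the command line with the AWS CLI; a sketch, with a placeholder account id in the queue ARN:

```shell
# Wire the queue up to the Lambda as an event source.
# Batch size is how many SQS messages one invocation may receive (max 10).
aws lambda create-event-source-mapping \
    --function-name Lambda-SQS-DemoApp \
    --event-source-arn arn:aws:sqs:eu-west-2:123456789012:lamda-sqs-demo-app \
    --batch-size 10
```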

Once we have done that, and we have made sure the role our Lambda is using is OK, the AWS Lambda console should show something like this, where we have a nicely configured SQS event trigger:

image

OK, now we just run the SQSPublisher in the solution, check the S3 bucket, and wham, we see 2 new messages. Hooray!

image

 

image

This corresponds nicely to the code in the SQSPublisher. It’s only working, YAY!

image

See ya later, not goodbye

OK, that’s it for now, until the next post.
