What are we talking about this time?
This time we are going to talk about AWS Lambda. Believe it or not, this is a fairly big subject for what looks like a simple topic. It will probably require a few posts, so this is what I would like to cover:
- SQS source Lambda writing to S3 bucket
- Using Serverless framework
- Kinesis Firehose Lambda writing to S3 bucket
- Step functions using Lambda functions
Initial setup
If you did not read the very first part of this series of posts, I urge you to go and read that one now, as it shows you how to get started with AWS and create an IAM user: https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/
Where is the code
The code for this post can be found on GitHub: https://github.com/sachabarber/AWS/tree/master/Compute/Lambda.SQS.DemoApp
What is AWS Lambda?
AWS Lambda is an event-driven, serverless computing platform provided by Amazon as part of Amazon Web Services. It is a computing service that runs code in response to events and automatically manages the computing resources required by that code. There are of course servers behind the scenes, but these are provisioned when needed, and you only pay for the compute time you actually use.
AWS Lambda Limits
There are some limitations when using Lambda that one should be aware of before getting started. These are shown below
AWS Lambda will dynamically scale capacity in response to increased traffic, subject to the concurrent executions limit noted previously. To handle any burst in traffic, AWS Lambda will immediately increase your concurrently executing functions by a predetermined amount, dependent on which region it is executed in, as noted below:
Getting started with AWS Lambda
In this post we are going to build this pipeline in AWS: an SQS queue triggers a Lambda function, which writes each message to an S3 bucket.
It is worth noting that there are MANY event sources for Lambda. You can find out more here: https://github.com/aws/aws-lambda-dotnet, but for those of you that just want to know now, here is the current list
Don’t try to click this, it’s a screenshot
AWS Toolkit
The easiest way to get started with your own Lambda function is to use the AWS Toolkit, which you will have installed if you followed part 1. So let's use the wizard to create an SQS-triggered Lambda.
Once you have run through this wizard you will be left with the shell of a Lambda project that is triggered by an SQS event, along with unit tests for it. However, I wanted my Lambda to write to S3, and I also wanted an SQS publisher that I could use to push messages to my SQS queue, so I could test my Lambda for real later on.
So the final solution for me looks like this
SQS Publisher
Let's start with the simple bit, the SQS Publisher. It looks up the queue URL (creating the queue if it does not exist yet) and then sends 2 messages:

using System;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Nito.AsyncEx;

namespace SQSSPublisher
{
    class Program
    {
        private static AmazonSQSClient _sqs = new AmazonSQSClient();
        private static string _myQueueUrl;
        private static string _queueName = "lamda-sqs-demo-app";

        static void Main(string[] args)
        {
            AsyncContext.Run(() => MainAsync(args));
        }

        // Returns Task (not void) so that failures surface through the awaited task
        static async Task MainAsync(string[] args)
        {
            try
            {
                try
                {
                    Console.WriteLine($"Checking for a queue called {_queueName}.\n");
                    var resp = await _sqs.GetQueueUrlAsync(_queueName);
                    _myQueueUrl = resp.QueueUrl;
                }
                catch (QueueDoesNotExistException)
                {
                    // Creating a queue
                    Console.WriteLine($"Create a queue called {_queueName}.\n");
                    var sqsRequest = new CreateQueueRequest { QueueName = _queueName };
                    var createQueueResponse = await _sqs.CreateQueueAsync(sqsRequest);
                    _myQueueUrl = createQueueResponse.QueueUrl;
                }

                // Sending a message (twice, each with a unique id in the body)
                for (int i = 0; i < 2; i++)
                {
                    var message = $"This is my message text-Id-{Guid.NewGuid().ToString("N")}";
                    Console.WriteLine($"Sending a message to MyQueue : {message}");
                    var sendMessageRequest = new SendMessageRequest
                    {
                        QueueUrl = _myQueueUrl, // URL from initial queue lookup/creation
                        MessageBody = message
                    };
                    await _sqs.SendMessageAsync(sendMessageRequest);
                }
            }
            catch (AmazonSQSException ex)
            {
                Console.WriteLine("Caught Exception: " + ex.Message);
                Console.WriteLine("Response Status Code: " + ex.StatusCode);
                Console.WriteLine("Error Code: " + ex.ErrorCode);
                Console.WriteLine("Error Type: " + ex.ErrorType);
                Console.WriteLine("Request ID: " + ex.RequestId);
            }

            Console.WriteLine("Press Enter to continue...");
            Console.Read();
        }
    }
}
Lambda
Now let's see the Lambda itself. Remember, it gets triggered when an SQS event arrives in the queue it is listening to, and it writes the message to an S3 bucket.

using System;
using System.Linq;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Amazon.S3;
using Amazon.S3.Model;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace Lambda.SQS.DemoApp
{
    public class Function
    {
        static IAmazonS3 client;
        private static string bucketName = "lamda-sqs-demo-app-out-bucket";

        /// <summary>
        /// Default constructor. This constructor is used by Lambda to construct the instance. When invoked in a Lambda environment
        /// the AWS credentials will come from the IAM role associated with the function and the AWS region will be set to the
        /// region the Lambda function is executed in.
        /// </summary>
        public Function()
        {
        }

        /// <summary>
        /// This method is called for every Lambda invocation. This method takes in an SQS event object and can be used
        /// to respond to SQS messages.
        /// </summary>
        /// <param name="evnt"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public async Task FunctionHandler(SQSEvent evnt, ILambdaContext context)
        {
            foreach (var message in evnt.Records)
            {
                await ProcessMessageAsync(message, context);
            }
        }

        private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
        {
            context.Logger.LogLine($"Processed message {message.Body}");

            using (client = new AmazonS3Client(Amazon.RegionEndpoint.EUWest2))
            {
                Console.WriteLine("Creating a bucket");
                await CreateABucketAsync(bucketName, false);

                // The upper-cased message body is stored under a random key
                Console.WriteLine("Writing message from SQS to bucket");
                await WritingAnObjectAsync(message.Body.ToUpper(), Guid.NewGuid().ToString("N").ToLower());
            }
        }

        async Task WritingAnObjectAsync(string messageBody, string keyName)
        {
            await CarryOutAWSTask<Unit>(async () =>
            {
                // simple object put
                PutObjectRequest request = new PutObjectRequest()
                {
                    ContentBody = messageBody,
                    BucketName = bucketName,
                    Key = keyName
                };
                PutObjectResponse response = await client.PutObjectAsync(request);
                return Unit.Empty;
            }, "Writing object");
        }

        async Task CreateABucketAsync(string bucketToCreate, bool isPublic = true)
        {
            await CarryOutAWSTask<Unit>(async () =>
            {
                if (await BucketExists(bucketToCreate))
                {
                    Console.WriteLine($"{bucketToCreate} already exists, skipping this step");
                    // Actually skip the create call when the bucket is already there
                    return Unit.Empty;
                }

                PutBucketRequest putBucketRequest = new PutBucketRequest()
                {
                    BucketName = bucketToCreate,
                    BucketRegion = S3Region.EUW2,
                    CannedACL = isPublic ? S3CannedACL.PublicRead : S3CannedACL.Private
                };
                var response = await client.PutBucketAsync(putBucketRequest);
                return Unit.Empty;
            }, "Create a bucket");
        }

        async Task<bool> BucketExists(string bucketName)
        {
            return await CarryOutAWSTask<bool>(async () =>
            {
                ListBucketsResponse response = await client.ListBucketsAsync();
                return response.Buckets.Select(x => x.BucketName).Contains(bucketName);
            }, "Listing buckets");
        }

        // Runs an S3 operation, logging (rather than rethrowing) any AmazonS3Exception
        async Task<T> CarryOutAWSTask<T>(Func<Task<T>> taskToPerform, string op)
        {
            try
            {
                return await taskToPerform();
            }
            catch (AmazonS3Exception amazonS3Exception)
            {
                if (amazonS3Exception.ErrorCode != null &&
                    (amazonS3Exception.ErrorCode.Equals("InvalidAccessKeyId") ||
                     amazonS3Exception.ErrorCode.Equals("InvalidSecurity")))
                {
                    Console.WriteLine("Please check the provided AWS Credentials.");
                    Console.WriteLine("If you haven't signed up for Amazon S3, please visit http://aws.amazon.com/s3");
                }
                else
                {
                    Console.WriteLine($"An Error, number '{amazonS3Exception.ErrorCode}', " +
                        $"occurred when '{op}' with the message '{amazonS3Exception.Message}'");
                }
                return default(T);
            }
        }
    }

    public class Unit
    {
        public static Unit Empty => new Unit();
    }
}
Tests
And this is what the tests look like. We feed the Lambda an SQS event and check that the output ends up in S3. Note that this test talks to the real S3 bucket, so it needs valid AWS credentials to run.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
using Amazon.Lambda.TestUtilities;
using Amazon.Lambda.SQSEvents;
using Lambda.SQS.DemoApp;
using Amazon.S3;
using Amazon.S3.Model;

namespace Lambda.SQS.DemoApp.Tests
{
    public class FunctionTest
    {
        private static string bucketName = "lamda-sqs-demo-app-out-bucket";

        [Fact]
        public async Task TestSQSEventLambdaFunction()
        {
            var sqsEvent = new SQSEvent
            {
                Records = new List<SQSEvent.SQSMessage>
                {
                    new SQSEvent.SQSMessage
                    {
                        Body = "foobar"
                    }
                }
            };

            var logger = new TestLambdaLogger();
            var context = new TestLambdaContext
            {
                Logger = logger
            };

            // Count the objects before and after to prove exactly one was written
            var countBefore = await CountOfItemsInBucketAsync(bucketName);
            var function = new Function();
            await function.FunctionHandler(sqsEvent, context);
            var countAfter = await CountOfItemsInBucketAsync(bucketName);

            Assert.Contains("Processed message foobar", logger.Buffer.ToString());
            Assert.Equal(1, countAfter - countBefore);
        }

        private async Task<int> CountOfItemsInBucketAsync(string bucketName)
        {
            using (var client = new AmazonS3Client(Amazon.RegionEndpoint.EUWest2))
            {
                ListObjectsRequest request = new ListObjectsRequest();
                request.BucketName = bucketName;
                ListObjectsResponse response = await client.ListObjectsAsync(request);
                return response.S3Objects.Count;
            }
        }
    }
}
Deploying the Lambda to AWS
We have a couple of options available to us here: Visual Studio 2017 or the dotnet command line.
VS2017
So for now let's just right-click the Lambda project and choose “Publish to AWS Lambda”. Following this wizard will show something like this
There are several built-in roles to choose from to run your Lambda. I started with AWSLambdaFullAccess. However, I still needed to add SQS and S3 permissions to that. We will see how to do that below.
Dot net tool
Run dotnet tool install -g Amazon.Lambda.Tools (you need .NET Core 2.1.3 or above) to grab the .NET Core Lambda command line tools. Once you have that installed you can create/deploy Lambdas straight from the command line.
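As a rough sketch of what that command line workflow can look like: the commands below install the tooling and the project templates, create an SQS-triggered project and deploy it. The function name LambdaSQSDemoApp and the eu-west-2 region are assumptions made for illustration (the region matches the one used in the code above), and the template install syntax may differ on newer SDKs, so treat this as a starting point rather than the exact steps used for this post.

```shell
# Install the Lambda command line tools as a .NET Core global tool
dotnet tool install -g Amazon.Lambda.Tools

# Install the AWS Lambda project templates (newer SDKs use "dotnet new install")
dotnet new -i Amazon.Lambda.Templates

# Create a project from the SQS template; the name is just an example
dotnet new lambda.SQS --name Lambda.SQS.DemoApp

# Deploy from the function project folder.
# "LambdaSQSDemoApp" is a hypothetical function name; you will be prompted
# for anything not supplied on the command line or in the defaults file.
cd Lambda.SQS.DemoApp/src/Lambda.SQS.DemoApp
dotnet lambda deploy-function LambdaSQSDemoApp --region eu-west-2
```

The deploy command prompts for an IAM role if one is not configured, which is the same choice the Visual Studio wizard asks you to make.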
Adjusting the deployed Lambda for extra policy requirements
So we started out using the AWSLambdaFullAccess role for our Lambda, but now we need to create the execution policy. This is described here: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs-create-execution-role.html
However, it is easier to locate the role for the Lambda you just created and give it the extra permissions using the IAM console, where you grab the ARNs for the SQS queue, the S3 bucket and so on.
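To give an idea of what those extra permissions might look like, here is a minimal sketch of a policy document covering the calls the Lambda code makes (reading from the queue, listing buckets, creating the bucket and putting objects). The account id 123456789012 is a placeholder, the eu-west-2 region is taken from the code, and the exact set of actions is my assumption based on that code rather than the policy used for this post. If the bucket is created with an ACL, as the code does, s3:PutBucketAcl may also be required.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:eu-west-2:123456789012:lamda-sqs-demo-app"
    },
    {
      "Effect": "Allow",
      "Action": "s3:ListAllMyBuckets",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:CreateBucket",
      "Resource": "arn:aws:s3:::lamda-sqs-demo-app-out-bucket"
    },
    {
      "Effect": "Allow",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::lamda-sqs-demo-app-out-bucket/*"
    }
  ]
}
```

Scoping each statement to the specific queue and bucket ARNs keeps the role tighter than a blanket full-access policy.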
Testing it out
In AWS Console using test events
We can use the AWS console with test events (not real SQS events), and we can see that it all looks good: we get a nice green panel for the test execution.
And we can check the S3 bucket to ensure we got the expected output.
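Since the handler takes an SQSEvent, the test event you paste into the console needs to have the same shape as a real SQS event. A trimmed-down sketch is shown below; all of the values are placeholders made up for illustration, and the only field the Lambda in this post actually reads is body.

```json
{
  "Records": [
    {
      "messageId": "00000000-0000-0000-0000-000000000000",
      "receiptHandle": "placeholder-receipt-handle",
      "body": "foobar",
      "attributes": {},
      "messageAttributes": {},
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:eu-west-2:123456789012:lamda-sqs-demo-app",
      "awsRegion": "eu-west-2"
    }
  ]
}
```

With this event, the Lambda shown earlier should log "Processed message foobar" and write FOOBAR to the bucket under a random key.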
Cool, this looks good.
Use the real SQS source
So how about the real input from SQS? Well, we can go back to the AWS Lambda console and configure the SQS trigger
Here we need to fill in the SQS event configuration for the Lambda, so it knows which queue to use
Once we have done that, and we have made sure the role our Lambda is using is OK, the AWS Lambda console should show something like this, where we have a nicely configured SQS event trigger
OK, now we just run the SQSPublisher in the solution, check the S3 bucket, and wham, we see 2 new messages. Hooray!
This corresponds nicely to the code in the SQSPublisher, which sends 2 messages. It's only working, YAY
See ya later, not goodbye
OK, that’s it for now, until the next post.