AWS

AWS : Lambda

What are we talking about this time?

This time we are going to talk about AWS Lambda. Believe it or not, this is a fairly big subject for such a simple topic. It will probably require a few posts, so this is what I would like to cover:

  • SQS source Lambda writing to S3 bucket
  • Using Serverless framework
  • Kinesis Firehose Lambda writing to S3 bucket
  • Step functions using Lambda functions

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

Where is the code

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Compute/Lambda.SQS.DemoApp

What is AWS Lambda?

AWS Lambda is an event-driven, serverless computing platform provided by Amazon as part of Amazon Web Services. It is a computing service that runs code in response to events and automatically manages the computing resources required by that code. There are of course servers behind the scenes, but these are provisioned when needed, and you only pay for the compute time you actually use.
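To make the "pay for what you use" idea a bit more concrete, here is a rough sketch of how a Lambda bill could be estimated. Lambda charges per request and per GB-second of compute (memory multiplied by run time). The class name, method names and any prices you pass in are my own assumptions for illustration; check the AWS pricing page for real numbers, and note this ignores the free tier.

```csharp
using System;

// Hypothetical helper, not part of any AWS SDK.
public static class LambdaCostSketch
{
    // Compute is metered in GB-seconds: invocations x duration (s) x memory (GB).
    public static decimal GbSeconds(long invocations, decimal avgDurationMs, int memoryMb)
    {
        decimal seconds = avgDurationMs / 1000m;
        decimal gigabytes = memoryMb / 1024m;
        return invocations * seconds * gigabytes;
    }

    // Prices are parameters on purpose: they are assumptions supplied by the caller.
    public static decimal EstimateCost(
        long invocations,
        decimal avgDurationMs,
        int memoryMb,
        decimal pricePerGbSecond,
        decimal pricePerMillionRequests)
    {
        decimal compute = GbSeconds(invocations, avgDurationMs, memoryMb) * pricePerGbSecond;
        decimal requests = invocations / 1_000_000m * pricePerMillionRequests;
        return compute + requests;
    }
}
```

The point is simply that a function which is never invoked costs nothing for compute, which is quite different to paying for an EC2 instance that sits idle.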

AWS Lambda Limits

There are some limitations when using Lambda that one should be aware of before getting started. These are shown below

image

image

AWS Lambda will dynamically scale capacity in response to increased traffic, subject to the concurrent executions limit noted previously. To handle any burst in traffic, AWS Lambda will immediately increase your concurrently executing functions by a predetermined amount, depending on the region in which it is executed, as noted below:

image

Getting started with AWS Lambda

In this post we are going to build this pipeline in AWS

image

It is worth noting that there are MANY event sources for Lambda. You can find out more here : https://github.com/aws/aws-lambda-dotnet. For those of you that just want to know now, here is the current list

image

Don’t try to click this, it’s a screenshot

AWS Toolkit

The easiest way to get started with your own Lambda function is to use the AWS Toolkit, which you will have installed if you followed part 1. So let’s use the wizard to create an SQS-triggered Lambda.

image

Once you have run through this wizard you will be left with the shell of a Lambda project that is triggered via an SQS event, along with unit tests for it. However, I wanted my Lambda to write to S3, and I also wanted an SQS publisher that I could use to push messages to my SQS queue, to test my Lambda for real later on.

So the final solution for me looks like this

image

SQS Publisher

Let’s start with the simple bit, the SQS publisher, which is as follows

using System;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Nito.AsyncEx;

namespace SQSSPublisher
{
    class Program
    {
        private static bool _receieverShouldDeleteMessage = false;
        private static AmazonSQSClient _sqs = new AmazonSQSClient();
        private static string _myQueueUrl;
        private static string _queueName = "lamda-sqs-demo-app";

        static void Main(string[] args)
        {
            // AsyncContext.Run blocks until the async work has completed
            AsyncContext.Run(() => MainAsync(args));
        }

        // async Task rather than async void, so exceptions propagate to the caller
        static async Task MainAsync(string[] args)
        {
            try
            {

                var listQueuesRequest = new ListQueuesRequest();
                var listQueuesResponse = await _sqs.ListQueuesAsync(listQueuesRequest);

                try
                {
                    Console.WriteLine($"Checking for a queue called {_queueName}.\n");
                    var resp = await _sqs.GetQueueUrlAsync(_queueName);
                    _myQueueUrl = resp.QueueUrl;

                }
                catch(QueueDoesNotExistException quex)
                {
                    //Creating a queue
                    Console.WriteLine($"Create a queue called {_queueName}.\n");
                    var sqsRequest = new CreateQueueRequest { QueueName = _queueName };
                    var createQueueResponse = await _sqs.CreateQueueAsync(sqsRequest);
                    _myQueueUrl = createQueueResponse.QueueUrl;
                }

                //Sending a message
                for (int i = 0; i < 2; i++)
                {
                    var message = $"This is my message text-Id-{Guid.NewGuid().ToString("N")}";
                    //var message = $"This is my message text";
                    Console.WriteLine($"Sending a message to MyQueue : {message}");
                    var sendMessageRequest = new SendMessageRequest
                    {
                        QueueUrl = _myQueueUrl, //URL from initial queue creation
                        MessageBody = message
                    };
                    await _sqs.SendMessageAsync(sendMessageRequest);
                }
            }
            catch (AmazonSQSException ex)
            {
                Console.WriteLine("Caught Exception: " + ex.Message);
                Console.WriteLine("Response Status Code: " + ex.StatusCode);
                Console.WriteLine("Error Code: " + ex.ErrorCode);
                Console.WriteLine("Error Type: " + ex.ErrorType);
                Console.WriteLine("Request ID: " + ex.RequestId);
            }

            Console.WriteLine("Press Enter to continue...");
            Console.Read();
        }


        
    }
}

Lambda

Now let’s see the Lambda itself. Remember it will get triggered to run when an SQS event arrives in the queue it’s listening to, and it will write to an S3 bucket.

using System;
using System.Linq;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Amazon.S3;
using Amazon.S3.Model;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace Lambda.SQS.DemoApp
{
    public class Function
    {
        static IAmazonS3 client;
        private static string bucketName = "lamda-sqs-demo-app-out-bucket";

        /// <summary>
        /// Default constructor. This constructor is used by Lambda to construct the instance. When invoked in a Lambda environment
        /// the AWS credentials will come from the IAM role associated with the function and the AWS region will be set to the
        /// region the Lambda function is executed in.
        /// </summary>
        public Function()
        {

        }


        /// <summary>
        /// This method is called for every Lambda invocation. This method takes in an SQS event object and can be used 
        /// to respond to SQS messages.
        /// </summary>
        /// <param name="evnt"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public async Task FunctionHandler(SQSEvent evnt, ILambdaContext context)
        {
            foreach(var message in evnt.Records)
            {
                await ProcessMessageAsync(message, context);
            }
        }

        private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
        {
            context.Logger.LogLine($"Processed message {message.Body}");

            using (client = new AmazonS3Client(Amazon.RegionEndpoint.EUWest2))
            {
                Console.WriteLine("Creating a bucket");
                await CreateABucketAsync(bucketName, false);
                Console.WriteLine("Writing message from SQS to bucket");
                await WritingAnObjectAsync(message.Body.ToUpper(), Guid.NewGuid().ToString("N").ToLower());
            }


            // TODO: Do interesting work based on the new message
            await Task.CompletedTask;
        }


        async Task WritingAnObjectAsync(string messageBody, string keyName)
        {
            await CarryOutAWSTask<Unit>(async () =>
            {
                // simple object put
                PutObjectRequest request = new PutObjectRequest()
                {
                    ContentBody = messageBody,
                    BucketName = bucketName,
                    Key = keyName
                };

                PutObjectResponse response = await client.PutObjectAsync(request);
                return Unit.Empty;
            }, "Writing object");
        }


        async Task CreateABucketAsync(string bucketToCreate, bool isPublic = true)
        {
            await CarryOutAWSTask<Unit>(async () =>
            {
                if(await BucketExists(bucketToCreate))
                {
                    Console.WriteLine($"{bucketToCreate} already exists, skipping this step");
                    // Return early so we do not try to create a bucket that is already there
                    return Unit.Empty;
                }

                PutBucketRequest putBucketRequest = new PutBucketRequest()
                {
                    BucketName = bucketToCreate,
                    BucketRegion = S3Region.EUW2,
                    CannedACL = isPublic ? S3CannedACL.PublicRead : S3CannedACL.Private
                };
                var response = await client.PutBucketAsync(putBucketRequest);
                return Unit.Empty;
            }, "Create a bucket");
        }


        async Task<bool> BucketExists(string bucketName)
        {
            return await CarryOutAWSTask<bool>(async () =>
            {
                ListBucketsResponse response = await client.ListBucketsAsync();
                return  response.Buckets.Select(x => x.BucketName).Contains(bucketName);
            }, "Listing buckets");
        }

        async Task<T> CarryOutAWSTask<T>(Func<Task<T>> taskToPerform, string op)
        {
            try
            {
                return await taskToPerform();
            }
            catch (AmazonS3Exception amazonS3Exception)
            {
                if (amazonS3Exception.ErrorCode != null &&
                    (amazonS3Exception.ErrorCode.Equals("InvalidAccessKeyId") ||
                    amazonS3Exception.ErrorCode.Equals("InvalidSecurity")))
                {
                    Console.WriteLine("Please check the provided AWS Credentials.");
                    Console.WriteLine("If you haven't signed up for Amazon S3, please visit http://aws.amazon.com/s3");
                }
                else
                {
                    Console.WriteLine($"An Error, number '{amazonS3Exception.ErrorCode}', " +
                                      $"occurred when '{op}' with the message '{amazonS3Exception.Message}'");
                }

                return default(T);
            }
        }


    }



    public class Unit
    {
        public static Unit Empty => new Unit();
    }
}

Tests

And this is what the tests look like, where we feed the Lambda an input event and check that the output is stored in S3

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using Xunit;
using Amazon.Lambda.TestUtilities;
using Amazon.Lambda.SQSEvents;

using Lambda.SQS.DemoApp;
using Amazon.S3;
using Amazon.S3.Model;

namespace Lambda.SQS.DemoApp.Tests
{
    public class FunctionTest
    {
        private static string bucketName = "lamda-sqs-demo-app-out-bucket";


        [Fact]
        public async Task TestSQSEventLambdaFunction()
        {
            var sqsEvent = new SQSEvent
            {
                Records = new List<SQSEvent.SQSMessage>
                {
                    new SQSEvent.SQSMessage
                    {
                        Body = "foobar"
                    }
                }
            };

            var logger = new TestLambdaLogger();
            var context = new TestLambdaContext
            {
                Logger = logger
            };

            var countBefore = await CountOfItemsInBucketAsync(bucketName);

            var function = new Function();
            await function.FunctionHandler(sqsEvent, context);

            var countAfter = await CountOfItemsInBucketAsync(bucketName);

            Assert.Contains("Processed message foobar", logger.Buffer.ToString());

            Assert.Equal(1, countAfter - countBefore);
        }


        private async Task<int> CountOfItemsInBucketAsync(string bucketName)
        {
            using (var client = new AmazonS3Client(Amazon.RegionEndpoint.EUWest2))
            {
                ListObjectsRequest request = new ListObjectsRequest();
                request.BucketName = bucketName;
                ListObjectsResponse response = await client.ListObjectsAsync(request);
                return response.S3Objects.Count;
            }
        }
    }
}
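One thing to be aware of is that the test above talks to a real S3 bucket, so it needs credentials and network access. If you would rather test the message handling offline, one option is to hide the write behind a small interface. This is only a sketch of that idea and is not in the demo repo: IObjectStore, InMemoryObjectStore and MessageProcessor are names I have made up, and a production implementation of the interface would wrap the S3 client's PutObjectAsync call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical abstraction over "somewhere to put objects"; not part of the AWS SDK.
public interface IObjectStore
{
    Task PutAsync(string key, string body);
}

// In-memory stand-in that a test can use instead of a real S3 bucket.
public class InMemoryObjectStore : IObjectStore
{
    public Dictionary<string, string> Objects { get; } = new Dictionary<string, string>();

    public Task PutAsync(string key, string body)
    {
        Objects[key] = body;
        return Task.CompletedTask;
    }
}

// Same transformation the Lambda applies: upper-case the body
// and store it under a freshly generated GUID key.
public class MessageProcessor
{
    private readonly IObjectStore _store;

    public MessageProcessor(IObjectStore store)
    {
        _store = store;
    }

    public async Task<string> ProcessAsync(string messageBody)
    {
        var key = Guid.NewGuid().ToString("N").ToLower();
        await _store.PutAsync(key, messageBody.ToUpper());
        return key;
    }
}
```

A test can then construct a MessageProcessor with an InMemoryObjectStore, process "foobar", and assert on the Objects dictionary without touching AWS at all.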

Deploying the Lambda to AWS

We have a couple of options available to us here: the dotnet command line or Visual Studio 2017

VS2017

So for now let’s just right click the Lambda project and choose “Publish to AWS Lambda”. Following this wizard will show something like this

image

image

There are several inbuilt roles to choose from to run your Lambda. I started with AWSLambdaFullAccess. However, I still needed to add SQS and S3 permissions to that. We will see how to do that below

Dot net tool

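Run dotnet tool install -g Amazon.Lambda.Tools to grab the .NET Core command line tools for Lambda (you need the .NET Core SDK 2.1.300 or above, as that is when dotnet tool became available). Once you have that installed you can create and deploy Lambdas straight from the command line, for example with dotnet lambda deploy-function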

Adjusting the deployed Lambda for extra policy requirements

So we started out using the AWSLambdaFullAccess role for our Lambda, but now we need to create the execution policy. This is described here : https://docs.aws.amazon.com/lambda/latest/dg/with-sqs-create-execution-role.html

However, it is easier to locate the role for the Lambda you just created and give it the extra permissions using the IAM console, where you can grab the ARNs for the SQS queue and the S3 bucket

image

Testing it out

In AWS Console using test events

We can use the AWS console to run test events (not real SQS events), and we can see that it all looks good: we get a nice green panel for the test execution

image

And we can check out the S3 bucket to ensure we got the expected output.

image

Cool, this looks good.

Use the real SQS source

So how about the real input from SQS? Well, we can go back to the AWS Lambda console and configure the SQS trigger

image

Where we need to fill in the SQS event configuration for the lambda, so it knows what queue to use

image

Once we have done that, and we have made sure the role our Lambda is using is ok, the AWS lambda console should show something like this where we have a nicely configured SQS event trigger

image

Ok, now we just run the SQSPublisher in the solution, check the S3 bucket, and wham, we see 2 new messages. Hooray!

image

 

image

This corresponds nicely to the code in the SQSPublisher. It’s only working, YAY

image

See ya later, not goodbye

Ok that’s it for now until the next post

JS

ParcelJs vs Webpack

Now those of you that follow my blog will know I normally don’t blog about JavaScript. That is not to say I don’t try and keep an eye on it.

Not being big headed here, I am in fact fairly ok with talking Webpack/React/Redux/Babel with seasoned WebDevs, and have done a few projects using the following bits and bobs

  • Babel
  • SCSS (compass CSS)
  • Typescript
  • React
  • Bootstrap
  • ES6 modules

My de facto setup has always been to use these tools:

  • NPM for module installation
  • Webpack
    • Modules
    • Source maps
    • Minification
    • Hashing of file names
    • Running one transpiler through another (Typescript –> Babel –> JS)
  • Babel for polyfilling features

I have been fairly happy with this in the past. Ok from time to time you have to figure out why LibraryX doesn’t play nice with LibraryY, which is a right PITA, but eventually you get there (if you ignore major updates to Webpack which seemed to break a lot of stuff)

Typical Webpack.config

This has been a pretty typical webpack.config file for DEV (I have a different one for PROD, where I minify, have no source maps etc etc)

let path = require('path');

let ExtractTextPlugin = require('extract-text-webpack-plugin');

let babelOptions = {
    "presets": ["es2015", "react"]
};

let entries = {
    index: './src/index.tsx'
};


module.exports = {
    entry: entries,
    context: __dirname,
    resolve: {
        extensions: [".tsx", ".ts", ".js", ".jsx"],
        modules: [path.resolve(__dirname, "src"), "node_modules"]
    },
    mode: 'development',
    devtool: "eval-source-map",
    output: {
        filename: 'bundle.js',
        path: path.resolve(__dirname, 'dist')
    },
    plugins: [

        //scss/sass files extracted to common css bundle
        new ExtractTextPlugin({
            filename: '[name].bundle.[hash].css',
            allChunks: true,
            disable: true
        }),
    ],
    module: {
        rules: [
            // All files with a '.ts' or '.tsx' extension will be handled by 'awesome-typescript-loader' 1st 
            // then 'babel-loader'
            // NOTE : loaders run right to left (think of them as a cmd line pipe)
            {
                test: /\.ts(x?)$/,
                exclude: /node_modules/,
                use: [
                    {
                        loader: 'babel-loader',
                        options: babelOptions
                    },
                    {
                        loader: 'awesome-typescript-loader'
                    }
                ]
            },


            // All files with a .scss|.sass extension will be handled by 'sass-loader' 1st,
            // then 'css-loader', then 'style-loader'
            {
                test: /\.(sass|scss)$/,
                exclude: /node_modules/,
                //loader: ExtractTextPlugin.extract(['css-loader', 'sass-loader'])
                use: ['style-loader', 'css-loader', 'sass-loader']
            },

            // All files with a '.js' extension will be handled by 'babel-loader'.
            {
                test: /\.js$/,
                exclude: /node_modules/,
                use: [
                    {
                        loader: 'babel-loader',
                        options: babelOptions
                    }
                ]
            },

            {
                test: /\.png$/,
                include: /node_modules/,
                loader: "url-loader?limit=100000"
            },

            {
                test: /\.jpg$/,
                include: /node_modules/,
                loader: "file-loader"
            },

            {
                test: /\.woff(\?.*)?$/,
                include: /node_modules/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/font-woff'
            },

            {
                test: /\.woff2(\?.*)?$/,
                include: /node_modules/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/font-woff2'
            },

            {
                test: /\.ttf(\?.*)?$/,
                include: /node_modules/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/octet-stream'
            },

            {
                test: /\.eot(\?.*)?$/, loader: 'file-loader?prefix=fonts/&name=fonts/[name].[ext]'
            },

            {
                test: /\.svg(\?.*)?$/,
                include: /node_modules/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=image/svg+xml'
            },

            // All output '.js' files will have any sourcemaps re-processed by 'source-map-loader'.
            {
                enforce: "pre",
                test: /\.js$/,
                loader: "source-map-loader"
            }
        ]
    }
};

 

So I was working with our in-house web expert, who said “hey you should have a look at ParcelJS, it’s much easier than WebPack”.

ParcelJS

My initial reaction was ok, I’ll take a look. This is the first thing you see when you go to its website

image

ZERO? Did I just read that right? ZERO it said, yep ZERO. Ok, let’s put that to the test

I have this small github repo which I used to test it : https://github.com/sachabarber/ParcelJS-demo

image

This repo contains the following

  • Simple HTML page
  • NPM package.json
    • React
    • Bootstrap
    • ….
  • Some custom React TSX (Typescript react files)
  • Some custom SCSS files

Let’s look at some of these, in reverse order

Custom SCSS files

Here is one of my SCSS files. See how it uses a Sass @import to pull in another file

@import './variables.scss';

body {
    background-color: $body-background;
    font-size:12px;
}

 

And here is an example TypeScript React file. Again note the imports, the fact that it really is TypeScript, and that this file also imports the Bootstrap CSS file straight from node_modules



import * as React from "react";
import * as ReactDOM from "react-dom";
import "/node_modules/bootstrap/dist/css/bootstrap.min.css";


import {
    Nav,
    Navbar,
    NavItem,
    NavDropdown,
    MenuItem,
    Button
} from "react-bootstrap";
import { Router, Route, hashHistory } from 'react-router';
import { Details } from "./Details";
import { Orders } from "./Orders";
import "../scss/index.scss";


class MainNav extends React.Component<undefined, undefined> {

    constructor(props: any) {
        super(props);
    }

    render() {
        return (
            <Navbar collapseOnSelect>
                <Navbar.Header>
                    <Navbar.Brand>
                        <span>Simple React-Redux example</span>
                    </Navbar.Brand>
                    <Navbar.Toggle />
                </Navbar.Header>
                <Navbar.Collapse>
                    <Nav pullRight>
                        <NavItem eventKey={2} href='#/detailsPage'>Details</NavItem>
                        <NavItem eventKey={2} href='#/ordersPage'>Orders</NavItem>
                    </Nav>
                </Navbar.Collapse>
            </Navbar>
        )
    }
}

class App extends React.Component<undefined, undefined> {
    render() {
        return (
            
            <div>
                <div>
                    <MainNav/>
                    {this.props.children}
                </div>
            </div>
        )
    }
}


ReactDOM.render((
    <Router history={hashHistory}>
        <Route component={App}>
            <Route path="/" component={Details} />
            <Route path="/detailsPage" component={Details} />
            <Route path="/ordersPage" component={Orders} />
        </Route>
    </Router>
), document.getElementById('root'));

HTML file

This is the HTML file I use for the app. Notice how there is NO CSS / JS in the head section at all. All there is, is a single reference to the main entry point TSX file, index.tsx. That is it.

 

<html>

<head>
	
</head>
<body>
  <div id="root"></div>
  <script src="./src/index.tsx"></script>
</body>
</html>

 

Running it all

So that’s it, you will not find ANY configuration whatsoever. ParcelJS just deals with it. In order to test this out, do the following

  • npm install
  • npm install -g parcel-bundler
  • open a Node.js command prompt in the wwwroot folder and issue this command: parcel index.html
  • open a browser and you should see the following

alt text

WOW that’s pretty nuts. What does this prove? Well quite a lot actually:

  • ES6 modules are working with ZERO config
  • Our entry point Js file is working with ZERO config
  • TypeScript –> Babel –> standard JS transpiling is working with ZERO config
  • SCSS transpiling is working with ZERO config

 

That’s not all. See how source maps are on by default

 

alt text

What Does It Actually Produce With This Magic?

There are some sensible defaults at play with Parcel. We just saw one of them: source maps are generated automatically unless stated otherwise. Where stuff gets generated is another sensible default, a “dist” folder.

Let’s have a look at the default output and see what we get:

image

  • Bootstrap glyphs
  • Some hash named CSS/JS files (good, as browsers will get a cache miss and fetch the new file whenever these names change)
  • Source map
  • And we get this HTML file created all without the use of the HtmlWebpackPlugin

 

<html>

<head>

<link rel="stylesheet" href="/src.aeb21ac6.css"></head>
<body>
<div id="root"></div>
<script src="/src.aeb21ac6.js"></script>
</body>
</html>

 

Let’s just take a breather there. I did not write one single line of config (go check out the webpack.config file again), yet I seem to have the same functionality as that. That’s pretty A-MAZING if you ask me.

PROD Release

For prod you would likely want to use the following command: parcel index.html --no-source-maps. You can read more in the ParcelJS CLI section : https://parceljs.org/cli.html

Conclusion

I have to say it does what it says on the tin. I guess if you want really fine grained control over EVERYTHING, Webpack will do you proud, but if you are happy with some BLOODY SENSIBLE DEFAULTS being applied, I have to say I was up and running with Parcel in minutes.

It’s awesome

AWS

AWS Deploying ASP .NET Core app to Elastic Beanstalk

What are we talking about this time?

This time we are going to talk about AWS Elastic Beanstalk and how we can use it to deploy a scalable load balanced web site

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

Where is the code

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Compute/ElasticBeanstalk/WebApiForElasticBeanstalk

What is Elastic Beanstalk?

Elastic Beanstalk is one of the AWS compute services, and it supports several platform languages. For other languages such as Java there is a web kind of role and a worker kind of role. For .NET, however, there is ONLY an IIS web kind of role. Don’t let that put you off though. The whole planet likes web sites of late it would seem, and it just so happens Elastic Beanstalk is a perfect fit for these types of apps.

Elastic Beanstalk has the following architecture

Image result for elastic beanstalk

It can be seen that the EC2 instances are part of an Auto Scaling group (scalability), and we also get a load balancer out of the box with a single URI endpoint for our Elastic Beanstalk app. The load balancer spreads requests across the EC2 instances, which are all running the web app. This is quite a lot of good stuff to get for free. It is quite hard to configure by yourself, so this is quite cool.

Deploying from Visual Studio

So when you create a new .NET Core Web App (the demo uses a standard .NET Core WebApi project) you can publish it to Elastic Beanstalk straight from Visual Studio.

image

This is obviously thanks to the AWS Toolkit (which I have installed and talk about in the 1st article in this series). This will launch a wizard, which looks like this

image

You can choose to create a new application environment or use one that you previously created

image

We can then pick a name for the application and its URI that will be publicly available

image

You then pick your hardware requirements (EC2 instance types)

image

You then pick your application permissions / Service permissions

image

We then pick our application options

When you Finish the wizard you should see something like this screen

image

The entire deployment takes about 10 minutes, so be patient. One thing to note is that the Visual Studio deployment also creates the aws-beanstalk-tools-defaults.json file to aid in the final application deployment to AWS. These are its contents for this demo app

{
    "comment" : "This file is used to help set default values when using the dotnet CLI extension Amazon.ElasticBeanstalk.Tools. For more information run \"dotnet eb --help\" from the project root.",
    "profile" : "default",
    "region"  : "eu-west-2",
    "application" : "WebApiForElasticBeanstalk",
    "environment" : "WebApiForElasticBeanstalk-dev",
    "cname"       : "webapiforelasticbeanstalk-dev",
    "solution-stack" : "64bit Windows Server 2016 v1.2.0 running IIS 10.0",
    "environment-type" : "SingleInstance",
    "instance-profile" : "aws-elasticbeanstalk-ec2-role",
    "service-role"     : "aws-elasticbeanstalk-service-role",
    "health-check-url" : "/",
    "instance-type"    : "t2.micro",
    "key-pair"         : "",
    "iis-website"      : "Default Web Site",
    "app-path"         : "/",
    "enable-xray"      : false
}

This is what is happening behind the scenes

Launching an environment creates the following resources:

  • EC2 instance – An Amazon Elastic Compute Cloud (Amazon EC2) virtual machine configured to run web apps on the platform that you choose.

    Each platform runs a specific set of software, configuration files, and scripts to support a specific language version, framework, web container, or combination thereof. Most platforms use either Apache or nginx as a reverse proxy that sits in front of your web app, forwards requests to it, serves static assets, and generates access and error logs.

  • Instance security group – An Amazon EC2 security group configured to allow ingress on port 80. This resource lets HTTP traffic from the load balancer reach the EC2 instance running your web app. By default, traffic isn’t allowed on other ports.
  • Load balancer – An Elastic Load Balancing load balancer configured to distribute requests to the instances running your application. A load balancer also eliminates the need to expose your instances directly to the internet.
  • Load balancer security group – An Amazon EC2 security group configured to allow ingress on port 80. This resource lets HTTP traffic from the internet reach the load balancer. By default, traffic isn’t allowed on other ports.
  • Auto Scaling group – An Auto Scaling group configured to replace an instance if it is terminated or becomes unavailable.
  • Amazon S3 bucket – A storage location for your source code, logs, and other artifacts that are created when you use Elastic Beanstalk.
  • Amazon CloudWatch alarms – Two CloudWatch alarms that monitor the load on the instances in your environment and are triggered if the load is too high or too low. When an alarm is triggered, your Auto Scaling group scales up or down in response.
  • AWS CloudFormation stack – Elastic Beanstalk uses AWS CloudFormation to launch the resources in your environment and propagate configuration changes. The resources are defined in a template that you can view in the AWS CloudFormation console.
  • Domain name – A domain name that routes to your web app in the form subdomain.region.elasticbeanstalk.com.

All of these resources are managed by Elastic Beanstalk. When you terminate your environment, Elastic Beanstalk terminates all the resources that it contains.

 

Taken from https://docs.aws.amazon.com/elasticbeanstalk/latest/dg/dotnet-core-tutorial.html#dotnet-core-tutorial-deploy as of 19/09/18
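To picture what the two CloudWatch alarms in the list above are doing for the Auto Scaling group, here is a toy sketch of the decision they drive. This is my own simplification and not how AWS implements it: the type names, the one-instance-at-a-time step, and the thresholds you pass in are all assumptions for illustration only.

```csharp
using System;

public enum ScalingAction { None, ScaleOut, ScaleIn }

// Hypothetical model of "load too high or too low" alarms; not an AWS API.
public static class ScalingSketch
{
    // Thresholds are caller-supplied assumptions. Real values live in the
    // environment's alarm configuration.
    public static ScalingAction Decide(double load, double highThreshold, double lowThreshold)
    {
        if (load > highThreshold) return ScalingAction.ScaleOut;
        if (load < lowThreshold) return ScalingAction.ScaleIn;
        return ScalingAction.None;
    }

    // Applies the decision one instance at a time, keeping the count
    // within the minimum and maximum size of the group.
    public static int NextInstanceCount(int current, ScalingAction action, int min, int max)
    {
        int next = current;
        if (action == ScalingAction.ScaleOut) next = current + 1;
        else if (action == ScalingAction.ScaleIn) next = current - 1;
        return Math.Max(min, Math.Min(max, next));
    }
}
```

The nice part is that with Elastic Beanstalk you do not have to write any of this. You just pick the limits and it handles the rest.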

Deploying from the command line

Ok so we can deploy from Visual Studio, this is cool, but it certainly won’t cut the mustard for CI purposes. Luckily AWS exposes some new .NET Core commands that we can use to deploy to Elastic Beanstalk. We need to grab the CLI tools to do this

Get the AWS Dot Net Core CLI tools

You will need the .NET Core SDK 2.1.300 or later (dotnet tool is only available from that version onwards). Once you have that installed you should be able to run this command: dotnet tool install -g Amazon.ElasticBeanstalk.Tools to install the AWS Elastic Beanstalk dotnet commands. You can read more about this here : https://github.com/aws/aws-extensions-for-dotnet-cli/blob/master/README.md

Once you have these you should be able to change to the directory that contains the app you want to deploy, run dotnet eb deploy-environment, and follow the command line prompts. You may see an error message about an S3 bucket not existing, which is easy enough to fix: just look at which bucket it was trying to use and go create it. It should be a non-public (private) bucket

Checking the deployment

So as we saw above a deployment to Elastic Beanstalk did quite a few things, so we should be able to check out the following

That the binaries are in S3 bucket

When we deploy an app to Elastic Beanstalk the binaries are placed into an S3 bucket, so we should see that one has been created for us

image

Where we can drill into the bucket and see these files

image

The Elastic Beanstalk Env

We can use the Elastic Beanstalk console https://console.aws.amazon.com/elasticbeanstalk to see if our environment is deployed ok. We should see something like this

image

One thing that is interesting is if we go into the environment (green box)

image

Then use the Actions button at the top right, we can save this environment, so we can use this as a blueprint for a new environment.

Other cool stuff is that we can look at the Configuration, request logs, look at monitoring, setup alarms etc etc

Checking the deployed application works

Using the URL for our app, we should be able to test out our standard WebApi project. Let’s give it a go

image

All looking good. So there you go. That’s all pretty cool I think

See ya later, not goodbye

Ok that’s it for now until the next post

AWS : Kinesis

What are we talking about this time?

This time we are going to talk about AWS Kinesis

 

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

 

Where is the code

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Messaging/Kinesis

 

What is AWS Kinesis?

So last time we talked about SQS, and we saw that if you had multiple consumers, whoever managed to read a message from the queue first would be the one and only consumer of that message. Kinesis is very different to this.

 

Lets consider this text from the official AWS docs

Amazon Kinesis is a fully managed, cloud-based service for real-time data processing over large, distributed data streams. With Amazon Kinesis, data can be collected from many sources such as website clickstreams, IT logs, social media feeds, billing related transactions, and sensor readings from IoT devices.

 

After data has been stored in Amazon Kinesis, you can consume and process data with the KCL for data analysis, archival, real-time dashboards, and much more. While you can use Amazon Kinesis API functions to process stream data directly, the KCL takes care of many complex tasks associated with distributed processing and allows you to focus on the record processing logic. For example, the KCL can automatically load balance record processing across many instances, allow the user to checkpoint records that are already processed, and handle instance failures.

 

C# support for the KCL is implemented using the MultiLangDaemon.  The core of the KCL logic (communicating with Amazon Kinesis, load balancing, handling instance failure, etc.) resides in Java, and Amazon KCL for C# uses the multi-language daemon protocol to communicate with the Java daemon.

 

It can be seen above that there are 2 possible ways of reading Kinesis data using .NET

  • Kinesis APIs
  • KCL

We will be looking at both of these when we get to looking at consumers

Kinesis is more comparable with something like Kafka (except Kafka is miles better, in so many ways, but lets not go there). This diagram should help to show you where Kinesis fits into the big picture

image

 

From this diagram we can see that a stream has multiple shards, and there may be multiple consumers of the stream data. This is different from SQS, where once a message was read by one consumer, it was just gone. We can also see that KCL consumers are supposed to run on EC2 instances, where the KCL can take care of load balancing, consumer checkpointing and so on
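To make the queue versus stream difference a bit more concrete, here is a minimal in-memory sketch. Nothing in it talks to AWS: QueueVersusStreamSketch, DrainQueue and ReadStream are hypothetical names I made up for illustration, and a plain list stands in for a single shard. The only point is that reading from a queue removes the message for everybody, while reading from a stream just moves that one consumer’s position.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-ins for illustration only, these are NOT AWS SDK types.
public static class QueueVersusStreamSketch
{
    // Queue semantics: a message handed to one consumer is removed for everyone.
    public static List<string> DrainQueue(Queue<string> queue, int maxMessages)
    {
        var received = new List<string>();
        while (received.Count < maxMessages && queue.Count > 0)
        {
            received.Add(queue.Dequeue());
        }
        return received;
    }

    // Stream semantics: a read never modifies the shard's log,
    // each consumer only advances its own position.
    public static List<string> ReadStream(
        IReadOnlyList<string> shardLog, ref int position, int maxRecords)
    {
        var received = new List<string>();
        while (received.Count < maxRecords && position < shardLog.Count)
        {
            received.Add(shardLog[position]);
            position++;
        }
        return received;
    }
}
```

If two readers call DrainQueue on the same queue, the second one gets nothing. If two readers call ReadStream on the same list, each with their own position starting at 0, they both see every record.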

This is all stuff you would expect to find in a decent stream based API.  Like I say if you want a top of the line streaming service, Kafka is your guy. But we are talking about AWS Kinesis so lets carry on the post for now.

 

IAM user privileges needed for Kinesis

For the demo code I added the following permissions to my IAM user to allow them to use AWS Kinesis

  • AmazonDynamoDBFullAccess
  • AmazonKinesisFullAccess
  • AmazonDynamoDBFullAccesswithDataPipeline
  • AmazonKinesisAnalyticsFullAccess

 

This is the official recommendation:

image

 

image

 

As this is just demo code for me, I was ok with using full access. In practice it may be better to have a dedicated producer IAM user and a dedicated consumer IAM user.

 

Producer

Ok so as I stated above there are 2 consumer libraries that can be used with Kinesis, however the producer code is the same for both. So lets see an example of what a typical Producer looks like.

 

using Amazon.Kinesis.Model;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Amazon.Kinesis.DataStreamproducer
{
    /// <summary>
    /// A sample producer of Kinesis records.
    /// </summary>
    class ProducerApp
    {


        private static readonly AmazonKinesisClient kinesisClient = 
            new AmazonKinesisClient(RegionEndpoint.EUWest2);
        const string myStreamName = "myTestStream";

        public static void Main(string[] args)
        {
            new ProducerApp().WriteToStream().GetAwaiter().GetResult();
        }

        private async Task WriteToStream()
        {

            // The stream name comes from the class level constant above;
            // this is the number of shards to create the stream with
            const int myStreamSize = 1;



            try
            {
                var createStreamRequest = new CreateStreamRequest();
                createStreamRequest.StreamName = myStreamName;
                createStreamRequest.ShardCount = myStreamSize;
                var createStreamReq = createStreamRequest;

                var existingStreams = await kinesisClient.ListStreamsAsync();

                if (!existingStreams.StreamNames.Contains(myStreamName))
                {

                    var CreateStreamResponse = await kinesisClient.CreateStreamAsync(createStreamReq);
                    Console.WriteLine("Created Stream : " + myStreamName);
                }
            }
            catch (ResourceInUseException)
            {
                Console.Error.WriteLine("Producer is quitting without creating stream " + myStreamName +
                    " to put records into as a stream of the same name already exists.");
                Environment.Exit(1);
            }

            await WaitForStreamToBecomeAvailableAsync(myStreamName);

            Console.Error.WriteLine("Putting records in stream : " + myStreamName);
            // Write 10 UTF-8 encoded records to the stream.
            for (int j = 0; j < 10; ++j)
            {
                byte[] dataAsBytes = Encoding.UTF8.GetBytes("testdata-" + j);
                using (MemoryStream memoryStream = new MemoryStream(dataAsBytes))
                {
                    try
                    {
                        PutRecordRequest requestRecord = new PutRecordRequest();
                        requestRecord.StreamName = myStreamName;
                        requestRecord.PartitionKey = "url-response-times";
                        requestRecord.Data = memoryStream;

                        PutRecordResponse responseRecord = 
                            await kinesisClient.PutRecordAsync(requestRecord);
                        Console.WriteLine("Successfully sent record to Kinesis. Sequence number: {0}", 
                            responseRecord.SequenceNumber);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Failed to send record to Kinesis. Exception: {0}", ex.Message);
                    }
                }
            }

            Console.ReadLine();

        }

        /// <summary>
        /// This method waits a maximum of 10 minutes for the specified stream to become active.
        /// <param name="myStreamName">Name of the stream whose active status is waited upon.</param>
        /// </summary>
        private static async Task WaitForStreamToBecomeAvailableAsync(string myStreamName)
        {
            var deadline = DateTime.UtcNow + TimeSpan.FromMinutes(10);
            while (DateTime.UtcNow < deadline)
            {
                DescribeStreamRequest describeStreamReq = new DescribeStreamRequest();
                describeStreamReq.StreamName = myStreamName;
                var describeResult = await kinesisClient.DescribeStreamAsync(describeStreamReq);
                string streamStatus = describeResult.StreamDescription.StreamStatus;
                Console.Error.WriteLine("  - current state: " + streamStatus);
                if (streamStatus == StreamStatus.ACTIVE)
                {
                    return;
                }
                // Use a non-blocking delay rather than Thread.Sleep inside an async method
                await Task.Delay(TimeSpan.FromSeconds(20));
            }

            throw new Exception("Stream " + myStreamName + " never went active.");
        }


    }
}

 

The main points above are that we use a stream, which is created with a set number of shards (the Kinesis equivalent of partitions), and we are able to put records to it using PutRecordAsync. There really isn’t too much more to talk about in the producer. The consumers however are more fun
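One thing that is worth a quick sketch is how the PartitionKey decides which shard a record lands in. Kinesis takes an MD5 hash of the partition key, treats it as a 128-bit integer, and each shard owns a range of those hash values. The code below is only a rough illustration of that idea: PartitionKeySketch is a made up name, and it ASSUMES the hash space is split evenly across the shards, which may not match the exact ranges a real stream reports (DescribeStream gives you the real hash key range for each shard).

```csharp
using System;
using System.Numerics;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper for illustration only, this is NOT part of the AWS SDK.
public static class PartitionKeySketch
{
    // MD5 produces 128 bits, so hashes fall in the range [0, 2^128).
    private static readonly BigInteger HashSpace = BigInteger.Pow(2, 128);

    // MD5 hash of the UTF-8 partition key as a non-negative 128-bit integer.
    public static BigInteger HashPartitionKey(string partitionKey)
    {
        using (var md5 = MD5.Create())
        {
            byte[] digest = md5.ComputeHash(Encoding.UTF8.GetBytes(partitionKey));

            // BigInteger expects little-endian bytes. The extra trailing zero byte
            // keeps the value unsigned.
            byte[] littleEndianUnsigned = new byte[digest.Length + 1];
            for (int i = 0; i < digest.Length; i++)
            {
                littleEndianUnsigned[i] = digest[digest.Length - 1 - i];
            }
            return new BigInteger(littleEndianUnsigned);
        }
    }

    // ASSUMPTION: the hash space is divided evenly between shardCount shards.
    public static int ShardIndexFor(string partitionKey, int shardCount)
    {
        if (shardCount < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(shardCount));
        }
        return (int)(HashPartitionKey(partitionKey) * shardCount / HashSpace);
    }
}
```

The same key always hashes to the same value, so a producer like the one above, which uses the constant key "url-response-times" for every record, would send everything to a single shard no matter how many shards the stream has. If you want records spread over several shards you need some variety in the partition key.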

Consumer Data Streams API

So there is a low level (easier to use) API which you can use simply by adding a Nuget reference to AWSSDK.Kinesis, and then you can write code to iterate the shards and read all the messages from the shards contained within the Kinesis stream.  The code below is a complete consumer using the Kinesis APIs

using Amazon.Kinesis.Model;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Amazon.Kinesis.DataStreamConsumer
{
    /// <summary>
    /// A sample consumer of Kinesis records.
    /// </summary>
    class ConsumerApp
    {
        private static readonly AmazonKinesisClient kinesisClient = 
            new AmazonKinesisClient(RegionEndpoint.EUWest2);
        const string myStreamName = "myTestStream";

        public static void Main(string[] args)
        {
            new ConsumerApp().ReadFromStream().GetAwaiter().GetResult();
        }

        private async Task ReadFromStream()
        {
            DescribeStreamRequest describeRequest = new DescribeStreamRequest();
            describeRequest.StreamName = myStreamName;

            DescribeStreamResponse describeResponse = 
                await kinesisClient.DescribeStreamAsync(describeRequest);
            List<Shard> shards = describeResponse.StreamDescription.Shards;

            foreach (Shard shard in shards)
            {
                GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
                iteratorRequest.StreamName = myStreamName;
                iteratorRequest.ShardId = shard.ShardId;
                iteratorRequest.ShardIteratorType = ShardIteratorType.TRIM_HORIZON;

                GetShardIteratorResponse iteratorResponse = await kinesisClient.GetShardIteratorAsync(iteratorRequest);
                string iteratorId = iteratorResponse.ShardIterator;

                while (!string.IsNullOrEmpty(iteratorId))
                {
                    GetRecordsRequest getRequest = new GetRecordsRequest();
                    getRequest.Limit = 1000;
                    getRequest.ShardIterator = iteratorId;

                    GetRecordsResponse getResponse = await kinesisClient.GetRecordsAsync(getRequest);
                    string nextIterator = getResponse.NextShardIterator;
                    List<Record> records = getResponse.Records;

                    if (records.Count > 0)
                    {
                        Console.WriteLine("Received {0} records. ", records.Count);
                        foreach (Record record in records)
                        {
                            string theMessage = Encoding.UTF8.GetString(record.Data.ToArray());
                            Console.WriteLine("message string: " + theMessage);
                        }
                    }
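                    // GetRecords is throttled per shard (5 calls per second), so pause between polls.
                    // Note NextShardIterator is only null once a shard has been closed, so for an
                    // open shard this loop keeps polling and never moves on to the next shard.
                    await Task.Delay(TimeSpan.FromSeconds(1));
                    iteratorId = nextIterator;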
                }
            }
        }

    }
}

If we ran the DataStreamProducer and DataStreamConsumer several times we would see output something like this, which shows the messages are persisted. There is a limit to this: records are only kept for the stream’s retention period, which is 24 hours by default (again I prefer Kafka, but that’s just me)

 

image

Consumer KCL

This is a lot harder to get started with than the datastreams API (Nuget AWSSDK.Kinesis). So much so, that Amazon have pushed a starting kit for this, which you can find here: https://github.com/awslabs/amazon-kinesis-client-net/tree/bbe8abf6abb811f8b4c7469df7c1dd281dbb9caa

A typical C# record processor using KCL will look like this

using System;
using System.Collections.Generic;
using Amazon.Kinesis.ClientLibrary;

namespace Sample
{
    class SampleRecordProcessor : IRecordProcessor
    {
        public void Initialize(InitializationInput input)
        {
            // initialize
        }

        public void ProcessRecords(ProcessRecordsInput input)
        {
            // process batch of records (input.Records) and
            // checkpoint (using input.Checkpointer)
        }

        public void Shutdown(ShutdownInput input)
        {
            // cleanup
        }
    }

    class MainClass
    {
        public static void Main(string[] args)
        {
            KclProcess.Create(new SampleRecordProcessor()).Run();
        }
    }
}

Which seems easy enough. However it’s not that easy. The bulk of the KCL is written in Java, and the .NET KCL communicates with the Java daemon over StdIn/StdOut/StdErr. As such there are quite a few steps you must follow to get a working C# KCL app up and running.

So what we really end up with is a C# consumer more like this:

/*
 * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Amazon Software License (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Threading;
using Amazon.Kinesis.ClientLibrary;

namespace Amazon.Kinesis.ClientLibrary.SampleConsumer
{
    /// <summary>
    /// A sample processor of Kinesis records.
    /// </summary>
    class SampleRecordProcessor : IRecordProcessor {

        /// <value>The time to wait before this record processor
        /// reattempts either a checkpoint, or the processing of a record.</value>
        private static readonly TimeSpan Backoff = TimeSpan.FromSeconds(3);

        /// <value>The interval this record processor waits between
        /// doing two successive checkpoints.</value>
        private static readonly TimeSpan CheckpointInterval = TimeSpan.FromMinutes(1);

        /// <value>The maximum number of times this record processor retries either
        /// a failed checkpoint, or the processing of a record that previously failed.</value>
        private static readonly int NumRetries = 10;

        /// <value>The shard ID on which this record processor is working.</value>
        private string _kinesisShardId;

        /// <value>The next time (UTC) at which a checkpoint is due.</value>
        private DateTime _nextCheckpointTime = DateTime.UtcNow;

        /// <summary>
        /// This method is invoked by the Amazon Kinesis Client Library before records from the specified shard
        /// are delivered to this SampleRecordProcessor.
        /// </summary>
        /// <param name="input">
        /// InitializationInput containing information such as the name of the shard whose records this
        /// SampleRecordProcessor will process.
        /// </param>
        public void Initialize(InitializationInput input)
        {
            Console.Error.WriteLine("Initializing record processor for shard: " + input.ShardId);
            this._kinesisShardId = input.ShardId;
        }

        /// <summary>
        /// This method processes the given records and checkpoints using the given checkpointer.
        /// </summary>
        /// <param name="input">
        /// ProcessRecordsInput that contains records, a Checkpointer and contextual information.
        /// </param>
        public void ProcessRecords(ProcessRecordsInput input)
        {
            Console.Error.WriteLine("Processing " + input.Records.Count + " records from " + _kinesisShardId);

            // Process records and perform all exception handling.
            ProcessRecordsWithRetries(input.Records);

            // Checkpoint once every checkpoint interval.
            if (DateTime.UtcNow >= _nextCheckpointTime)
            {
                Checkpoint(input.Checkpointer);
                _nextCheckpointTime = DateTime.UtcNow + CheckpointInterval;
            }
        }

        /// <summary>
        /// This shuts down the record processor and checkpoints the specified checkpointer.
        /// </summary>
        /// <param name="input">
        /// ShutdownContext containing information such as the reason for shutting down the record processor,
        /// as well as a Checkpointer.
        /// </param>
        public void Shutdown(ShutdownInput input)
        {
            Console.Error.WriteLine("Shutting down record processor for shard: " + _kinesisShardId);
            // Checkpoint after reaching end of shard, so we can start processing data from child shards.
            if (input.Reason == ShutdownReason.TERMINATE)
            {
                Checkpoint(input.Checkpointer);
            }
        }

        /// <summary>
        /// This method processes records, performing retries as needed.
        /// </summary>
        /// <param name="records">The records to be processed.</param>
        private void ProcessRecordsWithRetries(List<Record> records)
        {
            foreach (Record rec in records)
            {
                bool processedSuccessfully = false;
                string data = null;
                for (int i = 0; i < NumRetries; ++i) {
                    try {
                        // As per the accompanying AmazonKinesisSampleProducer.cs, the payload
                        // is interpreted as UTF-8 characters.
                        data = System.Text.Encoding.UTF8.GetString(rec.Data);

                        // Uncomment the following if you wish to see the retrieved record data.
                        //Console.WriteLine(
                        //    String.Format("=========== > Retrieved record:\n\tpartition key = {0},\n\tsequence number = {1},\n\tdata = {2}",
                        //    rec.PartitionKey, rec.SequenceNumber, data));

                        // Your own logic to process a record goes here.
                        Console.WriteLine("===================================================");
                        Console.WriteLine($"===========> Saw message : {data}");
                        Console.WriteLine("===================================================");

                        processedSuccessfully = true;
                        break;
                    } catch (Exception e) {
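                        // Concatenate the exception: passing it as a second argument would treat
                        // the message as a format string and the exception would never be printed.
                        Console.Error.WriteLine("Exception processing record data: " + data + ". " + e);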
                    }

                    //Back off before retrying upon an exception.
                    Thread.Sleep(Backoff);
                }

                if (!processedSuccessfully)
                {
                    Console.Error.WriteLine("Couldn't process record " + rec + ". Skipping the record.");
                }
            }
        }

        /// <summary>
        /// This checkpoints the specified checkpointer with retries.
        /// </summary>
        /// <param name="checkpointer">The checkpointer used to do checkpoints.</param>
        private void Checkpoint(Checkpointer checkpointer)
        {
            Console.Error.WriteLine("Checkpointing shard " + _kinesisShardId);

            // You can optionally provide an error handling delegate to be invoked when checkpointing fails.
            // The library comes with a default implementation that retries for a number of times with a fixed
            // delay between each attempt. If you do not provide an error handler, the checkpointing operation
            // will not be retried, but processing will continue.
            checkpointer.Checkpoint(RetryingCheckpointErrorHandler.Create(NumRetries, Backoff));
        }
    }

    class MainClass
    {
        /// <summary>
        /// This method creates a KclProcess and starts running a SampleRecordProcessor instance.
        /// </summary>
        public static void Main(string[] args)
        {
            try
            {
                KclProcess.Create(new SampleRecordProcessor()).Run();
            }
            catch (Exception e) {
                Console.Error.WriteLine("ERROR: " + e);
            }
        }
    }
}

 

To actually run a C# KCL example you should do this

1. Download the sample from here : https://github.com/awslabs/amazon-kinesis-client-net

2. Follow the getting started guide here : https://github.com/awslabs/amazon-kinesis-client-net/tree/bbe8abf6abb811f8b4c7469df7c1dd281dbb9caa#getting-started

3. Follow this checklist

  • Run the bootstrapper project to obtain the relevant KCL MultiLanguage JAR files
  • Make sure you have environment variables for AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY for the bootstrap app
  • Make sure you have Java installed (and it’s in your path)
  • Make sure the bootstrap project has these command line args: --properties kcl.properties --execute
  • Make sure the kcl.properties file has the correct streamName/executableName/regionName (which should be changed to match the region of your producer/AWS account, so eu-west-2 was mine)
  • Make sure the executableName includes the FULL path to the consumer library, mine looks like this in kcl.properties : executableName = dotnet C:\\Users\\sacha\\Desktop\\AWS-Playground\\AWS\\Messaging\\Kinesis\\KCL\\SampleConsumer\\bin\\Debug\\netcoreapp2.0\\SampleConsumer.dll

Once you have done all of that (and only then) you should be able to run the example SampleProducer and Bootstrap projects together, and you should see output like this

 

image

 

When you start a KCL application, it calls the KCL to instantiate a worker. This call provides the KCL with configuration information for the application, such as the stream name and AWS credentials.

The KCL performs the following tasks:

  • Connects to the stream
  • Enumerates the shards
  • Coordinates shard associations with other workers (if any)
  • Instantiates a record processor for every shard it manages
  • Pulls data records from the stream
  • Pushes the records to the corresponding record processor
  • Checkpoints processed records
  • Balances shard-worker associations when the worker instance count changes
  • Balances shard-worker associations when shards are split or merged
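Of the tasks above, checkpointing is the one that is easiest to get a feel for with a small example. The sketch below is an in-memory simulation, not the KCL itself: CheckpointingWorkerSketch and its members are hypothetical names, and the checkpoint is just an integer held in memory, whereas the real KCL persists its checkpoints outside the worker (in a DynamoDB table, which is why the DynamoDB permissions were needed earlier). It shows a worker that checkpoints every few records and, after a simulated crash, resumes from the last checkpoint rather than from where it actually got to.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory simulation for illustration only, this is NOT the KCL.
public sealed class CheckpointingWorkerSketch
{
    private readonly IReadOnlyList<string> _shardRecords;
    private readonly int _checkpointEvery;

    // Index of the first record that has NOT been checkpointed yet.
    public int LastCheckpoint { get; private set; }

    // Every record this worker has handled, including any repeats after a restart.
    public List<string> Processed { get; } = new List<string>();

    public CheckpointingWorkerSketch(IReadOnlyList<string> shardRecords, int checkpointEvery)
    {
        if (checkpointEvery < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(checkpointEvery));
        }
        _shardRecords = shardRecords ?? throw new ArgumentNullException(nameof(shardRecords));
        _checkpointEvery = checkpointEvery;
    }

    // Simulates a worker (re)start: always begins at the last checkpoint and handles
    // at most maxRecords before "crashing" (returning).
    public void RunFromCheckpoint(int maxRecords)
    {
        int position = LastCheckpoint;
        int handled = 0;
        while (position < _shardRecords.Count && handled < maxRecords)
        {
            Processed.Add(_shardRecords[position]);
            position++;
            handled++;

            // Only checkpoint every N records, as checkpointing has a cost.
            if (handled % _checkpointEvery == 0)
            {
                LastCheckpoint = position;
            }
        }
    }
}
```

With 10 records and a checkpoint every 3, a first run that dies after 5 records leaves the checkpoint at 3. The next run starts again from record 3, so records 3 and 4 get processed twice. That is the trade-off with checkpointing less often: less overhead, but more records to re-process after a failure, so your record processing logic should be able to cope with seeing the same record more than once.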

Behind the scenes the KCL library uses the Stateless .NET state machine library to help ensure the correctness of its implementation

How do we deploy a KCL .NET Core App to AWS?

The recommended advice seems to be to use AWS Elastic Beanstalk, and although there are not really any specific AWS KCL C# guides, the Elastic Beanstalk bits shown here should help you out. I do have to say that if you want to run something like a .NET Core console app in Elastic Beanstalk (rather than the classic ASP.NET Core example that everyone shows) you may have to dig quite deep. For that case the advice seems to be to run it on a dedicated EC2 instance, rather than use Elastic Beanstalk

 

dotnet eb deploy-environment --publish-options "--runtime win-x64"

 

Here is an example of a Elastic Beanstalk environment that I created to host .NET apps:

 

image

 

From here you can use the “Upload and Deploy” button

 

See ya later, not goodbye

Ok that’s it for now until the next post

AWS : Simple Queue Service (SQS)

 

What are we talking about this time?

This time we are going to talk about AWS SQS (Simple Queue Service)

 

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

 

Where is the code?

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Messaging/SQSSinglePublisherManyReceivers

 

Ok so how does SQS work? What is it?

So SQS is in essence a simple cloud based queue. That is, once a message has been read from the queue and deleted by the consumer that read it, it is gone. AWS also offers other messaging solutions, which compete with some of the big boys like Kafka/EventHub for your IOT type applications. SQS however is not that. It is, as I say, a simple queue based system in the cloud. If you have used Azure it’s more akin to Azure storage queues
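One subtlety worth sketching is what “read” means for SQS. Receiving a message does not remove it straight away; it hides the message from other consumers for a visibility timeout, and the message only disappears for good when the consumer deletes it. The class below is an in-memory simulation of that behaviour and nothing more: VisibilityTimeoutQueueSketch is a hypothetical name, the clock is passed in by hand so the example is deterministic, and it uses one id for everything whereas real SQS hands out a separate receipt handle for deletes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory simulation for illustration only, this is NOT the SQS client.
public sealed class VisibilityTimeoutQueueSketch
{
    private sealed class Entry
    {
        public string Body;
        public DateTime VisibleAt;
    }

    private readonly Dictionary<string, Entry> _messages = new Dictionary<string, Entry>();
    private readonly TimeSpan _visibilityTimeout;
    private int _nextId;

    public VisibilityTimeoutQueueSketch(TimeSpan visibilityTimeout)
    {
        _visibilityTimeout = visibilityTimeout;
    }

    // Adds a message that is visible immediately and returns its id.
    public string Send(string body, DateTime now)
    {
        string id = "msg-" + _nextId++;
        _messages[id] = new Entry { Body = body, VisibleAt = now };
        return id;
    }

    // Returns the id of a visible message and hides it for the visibility timeout,
    // or null if nothing is currently visible.
    public string Receive(DateTime now)
    {
        foreach (var pair in _messages)
        {
            if (pair.Value.VisibleAt <= now)
            {
                pair.Value.VisibleAt = now + _visibilityTimeout;
                return pair.Key;
            }
        }
        return null;
    }

    // Removes the message for good. Returns false if it was already deleted.
    public bool Delete(string id)
    {
        return _messages.Remove(id);
    }
}
```

So if a receiver reads a message but never deletes it, the message is hidden for a while and then becomes available again for another receiver to pick up. That is worth keeping in mind when running several receivers against the same queue.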

 

IAM user privileges needed for SQS

You will need to add these permissions to your IAM user to allow them to use SQS

 

  • AmazonSQSFullAccess

 

Obviously if you are working in a team you will not want to give out full access, but for this series of posts this is fine.

 

 

So what will we try and cover in this post?

This post will cover the basics of using SQS. In some future posts we will look at using S3 as a trigger for SQS, and also maybe look at SQS triggering Lambda functions, but the bulk of this post is about plain SQS functionality

 

Creating a SQS queue from the AWS console

You can create a new queue directly from the AWS console (as well as programmatically, which we will see later). Shown below are a number of screen shots that show you how to create a new SQS queue in the AWS console

 

 

 

image

Initial queue creation

 

image

SQS also supports encryption. This can be turned on when you first create the queue

 

image

 

 

image

Once the queue is created you will see a screen like that above

 

How about programmatically?

We are devs though right? So how do we do this stuff programmatically?

 

Create queues/send to queue

Lets see. Here is how we do that using the AmazonSQSClient

 

using System;
using System.Linq;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Nito.AsyncEx;

namespace SQSSinglePublisherManyReceivers
{
    class Program
    {
        private static bool _receieverShouldDeleteMessage = false;
        private static AmazonSQSClient _sqs = new AmazonSQSClient();
        private static string _myQueueUrl;


        static void Main(string[] args)
        {
            AsyncContext.Run(() => MainAsync(args));
        }

        static async Task MainAsync(string[] args)
        {
            try
            {
                Console.WriteLine("===========================================");
                Console.WriteLine("Getting Started with Amazon SQS");
                Console.WriteLine("===========================================\n");

                //Creating a queue
                Console.WriteLine("Create a queue called MyQueue.\n");
                var sqsRequest = new CreateQueueRequest { QueueName = "MyQueue11" };
                var createQueueResponse = await _sqs.CreateQueueAsync(sqsRequest);
                _myQueueUrl = createQueueResponse.QueueUrl;

                //Confirming the queue exists
                var listQueuesRequest = new ListQueuesRequest();
                var listQueuesResponse = await _sqs.ListQueuesAsync(listQueuesRequest);

                Console.WriteLine("Printing list of Amazon SQS queues.\n");
                if (listQueuesResponse.QueueUrls != null)
                {
                    foreach (String queueUrl in listQueuesResponse.QueueUrls)
                    {
                        Console.WriteLine("  QueueUrl: {0}", queueUrl);
                    }
                }
                Console.WriteLine();

                //Sending a message
                for (int i = 0; i < 10; i++)
                {
                    var message = $"This is my message text-Id-{Guid.NewGuid().ToString("N")}";
                    //var message = $"This is my message text";
                    Console.WriteLine($"Sending a message to MyQueue : {message}");
                    var sendMessageRequest = new SendMessageRequest
                    {
                        QueueUrl = _myQueueUrl, //URL from initial queue creation
                        MessageBody = message
                    };
                    await _sqs.SendMessageAsync(sendMessageRequest);
                }

               .....

            }
            catch (AmazonSQSException ex)
            {
                Console.WriteLine("Caught Exception: " + ex.Message);
                Console.WriteLine("Response Status Code: " + ex.StatusCode);
                Console.WriteLine("Error Code: " + ex.ErrorCode);
                Console.WriteLine("Error Type: " + ex.ErrorType);
                Console.WriteLine("Request ID: " + ex.RequestId);
            }

            Console.WriteLine("Press Enter to continue...");
            Console.Read();
        }
    }
}

 

Receiving

And to keep things interesting let’s create a number of receivers (I published 10 messages, so let’s create 5 receivers)

 

using System;
using System.Linq;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Nito.AsyncEx;

namespace SQSSinglePublisherManyReceivers
{
    class Program
    {
        private static bool _receieverShouldDeleteMessage = false;
        private static AmazonSQSClient _sqs = new AmazonSQSClient();
        private static string _myQueueUrl;


        static void Main(string[] args)
        {
            AsyncContext.Run(() => MainAsync(args));
        }

        static async Task MainAsync(string[] args)
        {
            try
            {
                //publish code (as shown in the previous listing)
                ......
				
                //start of 5 receiver tasks
                var tasks = Enumerable.Range(0, 5).Select(number =>
                    Task.Run(async () =>
                        await ReceiveMessage(number)
                    )).ToList();

                await Task.WhenAll(tasks);

            }
            catch (AmazonSQSException ex)
            {
                Console.WriteLine("Caught Exception: " + ex.Message);
                Console.WriteLine("Response Status Code: " + ex.StatusCode);
                Console.WriteLine("Error Code: " + ex.ErrorCode);
                Console.WriteLine("Error Type: " + ex.ErrorType);
                Console.WriteLine("Request ID: " + ex.RequestId);
            }

            Console.WriteLine("Press Enter to continue...");
            Console.Read();
        }


        private static async Task ReceiveMessage(int state)
        {
            //Receiving a message
            var receiveMessageRequest = new ReceiveMessageRequest { QueueUrl = _myQueueUrl };
            var receiveMessageResponse = await _sqs.ReceiveMessageAsync(receiveMessageRequest);
            if (receiveMessageResponse.Messages != null && receiveMessageResponse.Messages.Count > 0)
            {
                Console.WriteLine($"Receiever {state} Printing received message.\n");
                foreach (var message in receiveMessageResponse.Messages)
                {
                    Console.WriteLine($"Receiever {state}   Message");
                    if (!string.IsNullOrEmpty(message.MessageId))
                    {
                        Console.WriteLine($"Receiever {state}     MessageId: {message.MessageId}");
                    }
                    if (!string.IsNullOrEmpty(message.ReceiptHandle))
                    {
                        Console.WriteLine($"Receiever {state}     ReceiptHandle: {message.ReceiptHandle}");
                    }
                    if (!string.IsNullOrEmpty(message.MD5OfBody))
                    {
                        Console.WriteLine($"Receiever {state}     MD5OfBody: {message.MD5OfBody}");
                    }
                    if (!string.IsNullOrEmpty(message.Body))
                    {
                        Console.WriteLine($"Receiever {state}     Body: {message.Body}");
                    }

                    foreach (string attributeKey in message.Attributes.Keys)
                    {
                        Console.WriteLine("  Attribute");
                        Console.WriteLine("    Name: {0}", attributeKey);
                        var value = message.Attributes[attributeKey];
                        Console.WriteLine("    Value: {0}", string.IsNullOrEmpty(value) ? "(no value)" : value);
                    }
                }

                var messageRecieptHandle = receiveMessageResponse.Messages[0].ReceiptHandle;

                if (_receieverShouldDeleteMessage)
                {
                    //Deleting a message
                    Console.WriteLine("Deleting the message.\n");
                    var deleteRequest = new DeleteMessageRequest { QueueUrl = _myQueueUrl, ReceiptHandle = messageRecieptHandle };
                    await _sqs.DeleteMessageAsync(deleteRequest);
                }
            }
        }
    }
}

 

 

The demo's full output should look something like this

image

 

 

Sending output

This is the output (see how I sent 10 messages)

 

Sending a message to MyQueue : This is my message text-Id-ec6bfabda87d4347ac717680b46665dc
Sending a message to MyQueue : This is my message text-Id-13221ee1edd4451bb69171f9f3903f72
Sending a message to MyQueue : This is my message text-Id-2b3bcdcf9f6149cbbccaf992a8ce770d
Sending a message to MyQueue : This is my message text-Id-c0f29bc20c904b66a0b09931bff3d689
Sending a message to MyQueue : This is my message text-Id-57c3681bbb214b4db21598e42522635a
Sending a message to MyQueue : This is my message text-Id-0466901408014641b4cc33dd5c5a0034
Sending a message to MyQueue : This is my message text-Id-297aa21d624645fbbd10429a53361011
Sending a message to MyQueue : This is my message text-Id-a8c7430e40914e7d936116a596abf222
Sending a message to MyQueue : This is my message text-Id-481f1e058d75417a992eda1d7be35e0d
Sending a message to MyQueue : This is my message text-Id-6aa6423c182643d183c72c0f9f2ba256

 

And if I copy the output from the console and filter it in Notepad++, I will get this

 

Receiving output

See how one message is picked up by each of the 5 receiver tasks I created. This leaves the remaining 5 messages in the SQS queue

Receiever 4     Body: This is my message text-Id-ec6bfabda87d4347ac717680b46665dc
Receiever 0     Body: This is my message text-Id-2b3bcdcf9f6149cbbccaf992a8ce770d
Receiever 1     Body: This is my message text-Id-a8c7430e40914e7d936116a596abf222
Receiever 3     Body: This is my message text-Id-13221ee1edd4451bb69171f9f3903f72
Receiever 2     Body: This is my message text-Id-57c3681bbb214b4db21598e42522635a

 

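What this shows us is that once a message has been received by one receiver, it is not handed to another thread or process at the same time. Strictly speaking the message is only hidden for the queue's visibility timeout; it is gone for good only once the receiver deletes it (which this demo only does when _receieverShouldDeleteMessage is set to true), otherwise it becomes visible again. AWS also supports another type of messaging service called Kinesis, which works a bit more like Apache Kafka and Azure Event Hubs, where each consumer/receiver has to track its own offset into the message log.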

 

This is quite useful for things like Event Sourcing, but it is a little out of scope for this post. I will be talking about Kinesis in the next post, so let's leave this discussion until then.
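
To make the receive behaviour a bit more concrete, here is a minimal sketch of a queue with a visibility timeout. It is a toy in-memory model and NOT the AWS SDK: ToyQueue, Send, Receive and Delete are names I made up for illustration, the time is passed in by hand so the behaviour is easy to follow, and unlike a real standard SQS queue it always hands messages out in the order they were sent.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy in-memory model of a queue with a visibility timeout.
// Hypothetical illustration only, this is not how the AWS SDK is implemented.
public sealed class ToyQueue
{
    private sealed class Entry
    {
        public string Handle;
        public string Body;
        public DateTime VisibleAt;
    }

    private readonly List<Entry> _entries = new List<Entry>();
    private readonly TimeSpan _visibilityTimeout;
    private int _nextHandle;

    public ToyQueue(TimeSpan visibilityTimeout)
    {
        _visibilityTimeout = visibilityTimeout;
    }

    // Adds a message that is visible straight away.
    public void Send(string body, DateTime now)
    {
        _entries.Add(new Entry { Handle = "handle-" + _nextHandle++, Body = body, VisibleAt = now });
    }

    // Returns (handle, body) for the first visible message and hides it
    // for the visibility timeout, or returns null if nothing is visible.
    public Tuple<string, string> Receive(DateTime now)
    {
        var entry = _entries.FirstOrDefault(e => e.VisibleAt <= now);
        if (entry == null) return null;
        entry.VisibleAt = now + _visibilityTimeout;
        return Tuple.Create(entry.Handle, entry.Body);
    }

    // Removes the message for good; returns false if the handle is unknown.
    public bool Delete(string handle)
    {
        return _entries.RemoveAll(e => e.Handle == handle) > 0;
    }
}
```

In this model two receivers calling Receive at the same moment get two different messages, and a message that is received but never deleted shows up again once the timeout has passed.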

 

 

See ya later, not goodbye

Ok that’s it for now until the next post


AWS : TransferUtility

What are we talking about this time?

This time we are going to talk about AWS S3 TransferUtility. This is a fairly simple utility class, and as such this is not going to be a long post.

 

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

 

Where is the code?

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Storage/S3TrasferUtility

 

So what is the TransferUtility

Provides a high level utility for managing transfers to and from Amazon S3.

 

TransferUtility provides a simple API for uploading content to and downloading content from Amazon S3. It makes extensive use of Amazon S3 multipart uploads to achieve enhanced throughput, performance, and reliability.

 

When uploading large files by specifying file paths instead of a stream, TransferUtility uses multiple threads to upload multiple parts of a single upload at once. When dealing with large content sizes and high bandwidth, this can increase throughput significantly.
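
Conceptually, a multipart upload just cuts the content into byte ranges that can be sent independently of each other. The sketch below only shows that splitting step. PartPlanner and Plan are hypothetical names of my own, not part of the AWS SDK, and the real TransferUtility picks its own part size and does the actual uploading for you.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the AWS SDK): shows how a large upload
// can be cut into independent (offset, length) parts.
public static class PartPlanner
{
    // Splits totalBytes into consecutive parts of at most partSize bytes.
    // The last part may be smaller than partSize.
    public static List<Tuple<long, long>> Plan(long totalBytes, long partSize)
    {
        if (totalBytes < 0) throw new ArgumentOutOfRangeException(nameof(totalBytes));
        if (partSize <= 0) throw new ArgumentOutOfRangeException(nameof(partSize));

        var parts = new List<Tuple<long, long>>();
        for (long offset = 0; offset < totalBytes; offset += partSize)
        {
            long length = Math.Min(partSize, totalBytes - offset);
            parts.Add(Tuple.Create(offset, length));
        }
        return parts;
    }
}
```

For example PartPlanner.Plan(12, 5) gives three parts: (0, 5), (5, 5) and (10, 2). Each part could then be handed to a different worker, which is roughly the job TransferUtility takes off our hands.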

 

 

 

IAM user privileges needed for S3

You will need to add these permissions to your IAM user to allow them to use S3

 

  • AmazonS3FullAccess

 

Obviously if you are working in a team you will not want to give out full access, but for this series of posts this is fine.

 

 

So what will we try and cover in this post?

So to my mind what I am trying to cover is how to use the TransferUtility class to do the following things

 

  • Write a Stream to S3 bucket from local content
  • Read a Stream from s3 bucket
  • Download an object from s3 as a Stream to local file

 

Install the nugets

So let's start. The first thing we need to do is install the NuGet packages, which for this demo are

  • AWSSDK.S3

Ok, now that we have that in place, and we know (thanks to the 1st post) how to use the default Profile which is linked to the IAM user for this demo series, we can just go through the items on the list above one by one

 

Write a Stream to S3 bucket from local content

When you have large files, you really want to send them as Streams to S3. Even better would be to create different chunks and have different workers each upload their own chunk of the overall large file. This is totally possible to do yourself, but luckily for us, this is what the TransferUtility does for us for free. So all we have to do to upload a potentially large file as a Stream is this sort of thing

 

async Task WritingAStreamPublicAsync()
{
    await CarryOutAWSTask(async () =>
    {
        //sly inner function
        async Task<Stream> GenerateStreamFromStringAsync(string s)
        {
            var stream = new MemoryStream();
            var writer = new StreamWriter(stream);
            await writer.WriteAsync(s);
            await writer.FlushAsync();
            stream.Position = 0;
            return stream;
        }

        await CreateABucketAsync(bucketName);

        var fileTransferUtility = new TransferUtility(client);
        var contentsToUpload = "some random string contents";
        Console.WriteLine("Uploading the following contents using TransferUtility");
        Console.WriteLine(contentsToUpload);
        using (var streamToUpload = await GenerateStreamFromStringAsync(contentsToUpload))
        {
            var uploadRequest = new TransferUtilityUploadRequest()
            {
                InputStream = streamToUpload,
                Key = fileName,
                BucketName = bucketName,
                CannedACL = S3CannedACL.PublicRead
            };

            //If you are uploading large files, TransferUtility 
            //will use multipart upload to fulfill the request
            await fileTransferUtility.UploadAsync(uploadRequest);
        }
        Console.WriteLine($"Upload using stream to file '{fileName}' completed");


    }, "Writing using a Stream to public file");
}

 

 

Read a Stream from s3 bucket

So now that we have uploaded something, it is reasonable to want to read the S3 object back as a Stream, so that we do not take the hit of reading the entire object into memory in one go. Obviously, as this is just a demo and I know that what I uploaded is VERY small, we just read the entire Stream into memory.

async Task ReadingAnObjectFromS3AsAStream()
{
    await CarryOutAWSTask(async () =>
    {

        var fileTransferUtility = new TransferUtility(client);
        using (var fs = await fileTransferUtility.OpenStreamAsync(bucketName, fileName, CancellationToken.None))
        {
            using (var reader = new StreamReader(fs))
            {
                var contents = await reader.ReadToEndAsync();
                Console.WriteLine($"Content of file {fileName} is");
                Console.WriteLine(contents);
            }
        }
    }, "Reading an Object from S3 as a Stream");
}

 

 

Download an object from s3 as a Stream to local file

To my mind the last thing you may want to do is to actually download the data as a Stream to a local file. This time we use TransferUtility.DownloadAsync to download to a file of our choosing. In this contrived example we then read out the saved file contents just to print them out. As before, as it's my own code, I know the content is tiny, so we can read the entire Stream into memory.

async Task DownloadingAnObjectFromS3AsAStream()
{
    await CarryOutAWSTask(async () =>
    {
        var fileTransferUtility = new TransferUtility(client);
        string theTempFile = Path.Combine(Path.GetTempPath(), "SavedS3TextFile.txt");
        try
        {
            await fileTransferUtility.DownloadAsync(theTempFile, bucketName, fileName);
            using (var fs = new FileStream(theTempFile, FileMode.Open))
            {
                using (var reader = new StreamReader(fs))
                {
                    var contents = await reader.ReadToEndAsync();
                    Console.WriteLine($"Content of saved file {theTempFile} is");
                    Console.WriteLine(contents);
                }
            }
        }
        finally
        {
            File.Delete(theTempFile);
        }

    }, "Downloading an Object from S3 as a Stream");
}

 

 

TransferUtility Request Events

If you want to track the progress of uploads/downloads using the TransferUtility you should use the method overloads that accept the required request. The requests themselves typically expose an event that you can hook into to track progress. Here is one such example

 

// Summary:
//     Contains all the parameters that can be set when making a this request with the
//     TransferUtility method.
public class TransferUtilityDownloadRequest : BaseDownloadRequest
{
	public TransferUtilityDownloadRequest();

	//
	// Summary:
	//     Get or sets the file path location of where the downloaded Amazon S3 object will
	//     be written to.
	public string FilePath { get; set; }

	//
	// Summary:
	//     The event for WriteObjectProgressEvent notifications. All subscribers will be
	//     notified when a new progress event is raised.
	//     The WriteObjectProgressEvent is fired as data is downloaded from S3. The delegates
	//     attached to the event will be passed information detailing how much data has
	//     been downloaded as well as how much will be downloaded.
	//
	// Remarks:
	//     Subscribe to this event if you want to receive WriteObjectProgressEvent notifications.
	//     Here is how: 1. Define a method with a signature similar to this one: private
	//     void displayProgress(object sender, WriteObjectProgressArgs args) { Console.WriteLine(args);
	//     } 2. Add this method to the WriteObjectProgressEvent delegate's invocation list
	//     TransferUtilityDownloadRequest request = new TransferUtilityDownloadRequest();
	//     request.WriteObjectProgressEvent += displayProgress;
	public event EventHandler<WriteObjectProgressArgs> WriteObjectProgressEvent;
}
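
Here is a minimal sketch of hooking that event up for a download. DownloadWithProgress, RunAsync and Describe are names I have made up for this example; the client, bucket name, key and file path are assumed to be supplied by the caller, in the same way as in the earlier listings.

```csharp
using System;
using System.Threading.Tasks;
using Amazon.S3;
using Amazon.S3.Transfer;

// Hypothetical wrapper class for illustration only.
public static class DownloadWithProgress
{
    // Formats one progress notification as a single line of text.
    public static string Describe(long transferredBytes, long totalBytes, int percentDone)
    {
        return $"{transferredBytes}/{totalBytes} bytes ({percentDone}%)";
    }

    // Downloads bucketName/key to filePath, printing progress as data arrives.
    public static async Task RunAsync(IAmazonS3 client, string bucketName, string key, string filePath)
    {
        var transferUtility = new TransferUtility(client);
        var request = new TransferUtilityDownloadRequest
        {
            BucketName = bucketName,
            Key = key,
            FilePath = filePath
        };

        // The event is raised repeatedly while the object is being written locally.
        request.WriteObjectProgressEvent += (sender, args) =>
            Console.WriteLine(Describe(args.TransferredBytes, args.TotalBytes, args.PercentDone));

        await transferUtility.DownloadAsync(request);
    }
}
```

The important part is that we call the DownloadAsync overload that takes the request object, as the simpler overloads give us nowhere to attach the handler.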

 

See ya later, not goodbye

Ok that’s it for now until the next post


AWS : S3 File System

What are we talking about this time?

This time we are going to talk about the AWS S3 file system. This builds upon what we have covered in the last couple of posts, and will show you how you can work with your own file system and the AWS S3 file system together.

 

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

 

Where is the code?

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Storage/S3FileSystem

 

Ok so how does S3 File System work?

As we have already seen in the previous post, S3 has a concept of buckets and files. The thing is, we can also use S3 to create file-system-like hierarchies, and place new directories/subdirectories and files in them. There is also a decent set of APIs to make it feel much like working with a regular file system in .NET. Since this is in essence just a bit more about AWS S3, I will skip what I have already said about buckets etc.
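
It helps to remember that S3 itself is a flat store of object keys, and that "directories" are really just key prefixes separated by "/". The sketch below shows how a directory view can be inferred from nothing but a list of keys. KeyHierarchy and its two methods are hypothetical names of mine, not SDK types, and I am not claiming this is how S3DirectoryInfo is implemented internally.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not an SDK type): infers a directory view from flat keys.
public static class KeyHierarchy
{
    // Immediate child "directories" under prefix, e.g. "wiki/".
    public static List<string> ChildDirectories(IEnumerable<string> keys, string prefix)
    {
        return keys
            .Where(k => k.StartsWith(prefix, StringComparison.Ordinal))
            .Select(k => k.Substring(prefix.Length))
            .Where(rest => rest.Contains("/"))
            .Select(rest => rest.Substring(0, rest.IndexOf('/') + 1))
            .Distinct()
            .OrderBy(d => d, StringComparer.Ordinal)
            .ToList();
    }

    // Immediate child "files" under prefix (keys with no further "/").
    public static List<string> ChildFiles(IEnumerable<string> keys, string prefix)
    {
        return keys
            .Where(k => k.StartsWith(prefix, StringComparison.Ordinal))
            .Select(k => k.Substring(prefix.Length))
            .Where(rest => rest.Length > 0 && !rest.Contains("/"))
            .OrderBy(f => f, StringComparer.Ordinal)
            .ToList();
    }
}
```

Given the keys README.txt, wiki/Home.txt and wiki/images/logo.png, the root (empty prefix) has one file, README.txt, and one directory, wiki/, while the prefix wiki/ has the file Home.txt and the directory images/.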

 

IAM user privileges needed for S3

You will need to add these permissions to your IAM user to allow them to use S3

 

  • AmazonS3FullAccess

 

Obviously if you are working in a team you will not want to give out full access, but for this series of posts this is fine.

 

 

So what will we try and cover in this post?

So to my mind what I am trying to cover is how to use the main classes in the S3 file system namespace, namely S3DirectoryInfo and S3FileInfo where we will see examples of how to do the following things

 

  • Create a directory
  • Create a file
  • Create a sub directory
  • Enumerate a directory structure
  • Read file contents

 

But before we get onto the rest of the post, let's see some of the methods available on these 2 classes

 

S3DirectoryInfo

Here is a breakdown of the methods/properties you can play with on this one

 

Bucket   
The S3DirectoryInfo for the root of the S3 bucket.

CopyFromLocal(String)   
Copies files from the local file system to S3 in this directory. Sub directories are copied as well.

CopyFromLocal(String, DateTime)   
Copies files from the local file system to S3 in this directory. Sub directories are copied as well. Only files that have been modified since the changesSince property will be copied.

CopyTo(String, String)   
Copies the files from this directory to the target directory specified by the bucket and object key.

CopyTo(String, String, DateTime)   
Copies the files from this directory to the target directory specified by the bucket and object key. Only files that have changed since the changeSince date will be copied.

CopyTo(S3DirectoryInfo)   
Copies the files from this directory to the target directory.

CopyTo(S3DirectoryInfo, DateTime)   
Copies the files from this directory to the target directory. Only files that have changed since the changeSince date will be copied.

CopyToLocal(String)   
Copies the files from the S3 directory to the local file system in the location indicated by the path parameter.

CopyToLocal(String, DateTime)   
Copies the files from the S3 directory to the local file system in the location indicated by the path parameter. Only files that have been modified since the changesSince property will be copied.

Create()   
Creates the directory in S3. If no object key was specified when creating the S3DirectoryInfo then the bucket will be created.

CreateSubdirectory(String)   
Creates a sub directory inside the instance of S3DirectoryInfo.

Delete()   
Deletes all the files in this directory as well as this directory.

Delete(Boolean)   
Deletes all the files in this directory as well as this directory. If recursive is set to true then all sub directories will be deleted as well.

EnumerateDirectories()   
Enumerate the sub directories of this directory.

EnumerateDirectories(String)   
Enumerate the sub directories of this directory.

EnumerateDirectories(String, SearchOption)   
Enumerate the sub directories of this directory.

EnumerateFiles()   
Enumerate the files of this directory.

EnumerateFiles(String)   
Enumerate the files of this directory.

EnumerateFiles(String, SearchOption)   
Enumerate the files of this directory.

EnumerateFileSystemInfos()   
Enumerate the files of this directory.

EnumerateFileSystemInfos(String)   
Enumerate the files of this directory.

EnumerateFileSystemInfos(String, SearchOption)   
Enumerate the files of this directory.

Exists   
Checks with S3 to see if the directory exists and if so returns true. Due to Amazon S3’s eventual consistency model this property can return false for newly created buckets.

FullName   
The full path of the directory including bucket name.

GetDirectories()   
Returns an array of S3DirectoryInfos for the directories in this directory.

GetDirectories(String)   
Returns an array of S3DirectoryInfos for the directories in this directory.

GetDirectories(String, SearchOption)   
Returns an array of S3DirectoryInfos for the directories in this directory.

GetDirectory(String)   
Returns the S3DirectoryInfo for the specified sub directory.

GetFile(String)   
Returns the S3FileInfo for the specified file.

GetFiles()   
Returns an array of S3FileInfos for the files in this directory.

GetFiles(String)   
Returns an array of S3FileInfos for the files in this directory.

GetFiles(String, SearchOption)   
Returns an array of S3FileInfos for the files in this directory.

GetFileSystemInfos()   
Returns an array of IS3FileSystemInfos for the files in this directory.

GetFileSystemInfos(String)   
Returns an array of IS3FileSystemInfos for the files in this directory.

GetFileSystemInfos(String, SearchOption)   
Returns an array of IS3FileSystemInfos for the files in this directory.

LastWriteTime   
Returns the last write time of the latest file written to the directory.

LastWriteTimeUtc   
UTC converted version of LastWriteTime.

MoveFromLocal(String)   
Moves files from the local file system to S3 in this directory. Sub directories are moved as well.

MoveTo(String, String)   
Moves the files from this directory to the target directory specified by the bucket and object key.

MoveTo(S3DirectoryInfo)   
Moves the files from this directory to the target S3 directory.

MoveToLocal(String)   
Moves the files from the S3 directory to the local file system in the location indicated by the path parameter.

Name   
Returns the name of the folder.

Parent   
Return the S3DirectoryInfo of the parent directory.

Root   
Returns the S3DirectoryInfo for the S3 account.

 

S3FileInfo

Here is a breakdown of the methods/properties you can play with on this one

 

CopyFromLocal(String)
Copies the file from the local file system to S3. If the file already exists in S3 then an ArgumentException is thrown.

CopyFromLocal(String, Boolean)
Copies the file from the local file system to S3. If the file already exists in S3 and overwrite is set to false then an ArgumentException is thrown.

CopyTo(String, String)
Copies this file’s content to the file indicated by the S3 bucket and object key. If the file already exists in S3 then an ArgumentException is thrown.

CopyTo(String, String, Boolean)
Copies this file’s content to the file indicated by the S3 bucket and object key. If the file already exists in S3 and overwrite is set to false then an ArgumentException is thrown.

CopyTo(S3DirectoryInfo)
Copies this file to the target directory. If the file already exists in S3 then an ArgumentException is thrown.

CopyTo(S3DirectoryInfo, Boolean)
Copies this file to the target directory. If the file already exists in S3 and overwrite is set to false then an ArgumentException is thrown.

CopyTo(S3FileInfo)
Copies this file to the location indicated by the passed in S3FileInfo. If the file already exists in S3 then an ArgumentException is thrown.

CopyTo(S3FileInfo, Boolean)
Copies this file to the location indicated by the passed in S3FileInfo. If the file already exists in S3 and overwrite is set to false then an ArgumentException is thrown.

CopyToLocal(String)
Copies from S3 to the local file system. If the file already exists on the local file system then an ArgumentException is thrown.

CopyToLocal(String, Boolean)
Copies from S3 to the local file system. If the file already exists on the local file system and overwrite is set to false then an ArgumentException is thrown.

Create()   
Returns a Stream that can be used to write data to S3. The content is persisted to S3 once the Stream is closed.

CreateText()   
Returns a StreamWriter that can be used to write data to S3. The content is persisted to S3 once the StreamWriter is closed.

Delete()   
Deletes the file from S3.

Directory   
Returns the parent S3DirectoryInfo.

DirectoryName   
The full name of parent directory.

Exists   
Checks S3 if the file exists in S3 and return true if it does.

Extension   
Gets the file’s extension.

FullName   
Returns the full path including the bucket.

LastWriteTime   
Returns the last time the file was modified.

LastWriteTimeUtc   
Returns the last time the file was modified in UTC.

Length   
Returns the content length of the file.

MoveFromLocal(String)
Moves the file from the local file system to S3 in this directory. If the file already exists in S3 then an ArgumentException is thrown.

MoveFromLocal(String, Boolean)
Moves the file from the local file system to S3 in this directory. If the file already exists in S3 and overwrite is set to false then an ArgumentException is thrown.

MoveTo(String, String)
Moves the file to a new location in S3.

MoveTo(S3DirectoryInfo)
Moves the file to a new location in S3.

MoveTo(S3FileInfo)
Moves the file to a new location in S3.

MoveToLocal(String)   
Moves the file from S3 to the local file system in the location indicated by the path parameter.

Name   
Returns the file name without its directory name.

OpenRead()   
Returns a Stream for reading the contents of the file.

OpenText()   
Returns a StreamReader for reading the contents of the file.

OpenWrite()   
Returns a Stream for writing to S3. If the file already exists it will be overwritten.

Replace(String, String, String, String)
Replaces the destination file with the content of this file and then deletes the original file. If a backup location is specified then the content of the destination file is backed up to it.

Replace(S3DirectoryInfo, S3DirectoryInfo)
Replaces the destination file with the content of this file and then deletes the original file. If a backupDir is specified then the content of the destination file is backed up to it.

Replace(S3FileInfo, S3FileInfo)
Replaces the destination file with the content of this file and then deletes the original file. If a backupFile is specified then the content of the destination file is backed up to it.

ReplaceLocal(String, String)
Replaces the content of the destination file on the local file system with the content from this file from S3. If a backup file is specified then the content of the destination file is backed up to it.

 

Beware

The reason I went to great pains to list the methods above was to make you realize that NOT ONE of them is async/await friendly: they are all synchronous, blocking calls. So just watch out for that
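
If you do need to call one of these blocking methods from async code, one common workaround is to push the call onto the thread pool. The sketch below is just that, a workaround under my own assumptions: BlockingCallExtensions and RunOffThreadAsync are made-up names, and this does not make the underlying I/O asynchronous, it only stops the calling thread from being blocked.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class BlockingCallExtensions
{
    // Runs a blocking call on the thread pool and returns an awaitable Task.
    // A thread pool thread is still blocked while the call runs.
    public static Task<T> RunOffThreadAsync<T>(Func<T> blockingCall)
    {
        if (blockingCall == null) throw new ArgumentNullException(nameof(blockingCall));
        return Task.Run(blockingCall);
    }
}

// Possible usage with the S3 file system classes (rootDirectory is assumed
// to be an S3DirectoryInfo created as shown later in this post):
//
//   var files = await BlockingCallExtensions.RunOffThreadAsync(
//       () => rootDirectory.GetFiles());
```

For a console demo like the ones in this series the blocking calls are fine as they are; this sort of wrapper is more relevant for UI apps, where you want to keep the calling thread free.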

 

 

Install the nugets

So let's start. The first thing we need to do is install the NuGet packages, which for this demo are

 

  • AWSSDK.S3

 

Ok, now that we have that in place, and we know (thanks to the 1st post) how to use the default Profile which is linked to the IAM user for this demo series, we can just go through the items on the list above one by one

 

Create a directory

This shows how to create a top level directory (a bucket really)

S3DirectoryInfo rootDirectory = new S3DirectoryInfo(client, bucketName);
rootDirectory.Create();

 

Create a file

So now we can create a file like this

S3FileInfo readme = rootDirectory.GetFile("README.txt");
using (StreamWriter writer = new StreamWriter(readme.OpenWrite()))
    writer.WriteLine("This is my readme file.");

 

Create a sub directory

Creating a subdirectory is done like this

S3DirectoryInfo codeDir = rootDirectory.CreateSubdirectory("wiki");

 

Enumerate a directory structure

Should we wish to enumerate a directory structure we can do something like this

WriteDirectoryStructure(rootDirectory, 0);
.....
.....
.....
.....
static void WriteDirectoryStructure(S3DirectoryInfo directory, int level)
{
    StringBuilder indentation = new StringBuilder();
    for (int i = 0; i < level; i++)
        indentation.Append("\t");

    Console.WriteLine("{0}{1}", indentation, directory.Name);
    foreach (var file in directory.GetFiles())
        Console.WriteLine("\t{0}{1}", indentation, file.Name);

    foreach (var subDirectory in directory.GetDirectories())
    {
        WriteDirectoryStructure(subDirectory, level + 1);
    }
}

 

 

Read file contents

We can also read file contents (as a Stream too YAY)

foreach (var file in codeDir.GetFiles())
{
    Console.WriteLine("Content of {0}", file.Name);
    Console.WriteLine("------------------------------------");
    using (StreamReader reader = file.OpenText())
    {
        Console.WriteLine(reader.ReadToEnd());
    }
}

     

     

    After running the demo code associated with this article we should see something like this (provided you have set the deleteAtEnd variable to false in the code)

     

    image

     

     

    See ya later, not goodbye

    Ok that’s it for now until the next post


    AWS : Glacier Storage

    What are we talking about this time?

    This time we will be talking about Glacier storage, which is AWS's answer to slow moving/archival data. Think of a cop movie where some detectives working on a case have a slim file on their desks (this would be S3), but have to go down to some grungy old basement where the rest of the files are kept (that would be Glacier Storage).

     

    Glacier storage is slower, but costs very little. However, don’t let that fool you, it is still quite a useful service, with tools like

    • AWS Athena
    • Glacier Select

    Both of which allow you to query the data in Glacier Storage using SQL syntax (obviously to do that the data files stored in Glacier will need to be readable as related data, so some sort of CSV/JSON format)

     

     

    Initial setup

    If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

     

    Where is the code?

    The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Storage/GlacierStorage

     

    Ok so how does Glacier Storage work?

    There are 2 main concepts you need to understand:

     

    • Vault : This is a top level object that will store Archives
    • Archive : This represents the file you uploaded as an archive (it must be a file when you upload it)

     

     

    IAM user privileges needed for Glacier

    You will need to add these permissions to your IAM user to allow them to use Glacier

     

    • AmazonGlacierFullAccess

     

    Obviously if you are working in a team you will not want to give out full access, but for this series of posts this is fine.

     

     

    So what will we try and cover in this post?

    We will try and cover

    • Creating a vault, and uploading an archive to it
    • Downloading an archive from a vault

     

     

    Install the nugets

    So let's start. The first thing we need to do is install the NuGet packages, which for this demo are

     

    • AWSSDK.Glacier
    • Nito.AsyncEx (nice async await extensions, like awaitable Console apps)

     

    Ok, now that we have that in place, and we know (thanks to the 1st post) how to use the default Profile which is linked to the IAM user for this demo series, we can just go through the items on the list above one by one

     

    Creating a vault and uploading an archive to it

    The C# API starts with the AmazonGlacierClient class. An instance of this then gets fed into an ArchiveTransferManager, which you then use to manage the Vaults/Archives.

     

    This is how you would create a new vault and upload an archive to it

    using System;
    using Amazon.Glacier;
    using Amazon.Glacier.Transfer;
    using System.Collections.Specialized;
    using System.Configuration;
    using Amazon.Runtime;
    using Nito.AsyncEx;
     
    namespace GlacierStorage
    {
        class Program
        {
            // Change the AWSProfileName to the profile you want to use in the App.config file.
            // See http://aws.amazon.com/credentials  for more details.
            // You must also sign up for an Amazon Glacier account for this to work
            // See http://aws.amazon.com/glacier/ for details on creating an Amazon Glacier account
            // Change the vaultName and fileName fields to values that match your vaultName and fileName
     
            static int currentPercentage = -1;
     
            static ArchiveTransferManager manager;
            static string archiveId;
     
            //Set the vault name you want to use here.
            static string vaultName = null;
     
            // Set the file path for the file you want to upload here.
            static string filePath = null;
     
            // Set the file path for the archive to be saved after download.
            static string downloadFilePath = null;
     
     
            static void Main(string[] args)
            {
                AsyncContext.Run(() => MainAsync(args));
            }
     
     
            static async void MainAsync(string[] args)
            {
                if (CheckRequiredFields())
                {
                    var glacierClient = new AmazonGlacierClient();
                    using (manager = new ArchiveTransferManager(glacierClient))
                    {
                        try
                        {
                            // Creates a new Vault
                            Console.WriteLine("Create Vault");
                            await manager.CreateVaultAsync(vaultName);
     
                            // Uploads the specified file to Glacier.
                            Console.WriteLine("Upload an Archive");
                            var uploadResult = await manager.UploadAsync(vaultName, "Archive Description", filePath);
                            archiveId = uploadResult.ArchiveId;
                            Console.WriteLine("Upload successful. Archive Id : {0}  Checksum : {1}",
                                uploadResult.ArchiveId, uploadResult.Checksum);
     
                            .......
                        }
                        catch (AmazonGlacierException e)
                        {
                            Console.WriteLine(e.Message);
                        }
                        catch (AmazonServiceException e)
                        {
                            Console.WriteLine(e.Message);
                        }
                    }
                }
            }
     
        }
    }
    

    Download an archive

    This code shows how to download an archive from Glacier. Because this is a long-running operation that involves creating an SNS topic and an SQS queue and then polling that queue, there is also an event-based mechanism by which progress is reported. This is shown in the example below

     

    using System;
    using Amazon.Glacier;
    using Amazon.Glacier.Transfer;
    using System.Collections.Specialized;
    using System.Configuration;
    using Amazon.Runtime;
    using Nito.AsyncEx; // AsyncContext comes from the Nito.AsyncEx NuGet package
     
    namespace GlacierStorage
    {
        class Program
        {
            // Change the AWSProfileName to the profile you want to use in the App.config file.
            // See http://aws.amazon.com/credentials  for more details.
            // You must also sign up for an Amazon Glacier account for this to work
            // See http://aws.amazon.com/glacier/ for details on creating an Amazon Glacier account
            // Change the vaultName and filePath fields to values that match your vault name and file path
     
            static int currentPercentage = -1;
     
            static ArchiveTransferManager manager;
            static string archiveId;
     
            //Set the vault name you want to use here.
            static string vaultName = null;
     
            // Set the file path for the file you want to upload here.
            static string filePath = null;
     
            // Set the file path for the archive to be saved after download.
            static string downloadFilePath = null;
     
     
            static void Main(string[] args)
            {
                AsyncContext.Run(() => MainAsync(args));
            }
     
     
            static async void MainAsync(string[] args)
            {
                if (CheckRequiredFields())
                {
                    var glacierClient = new AmazonGlacierClient();
                    using (manager = new ArchiveTransferManager(glacierClient))
                    {
                        try
                        {
                            ......
     
                            // Downloads the file from Glacier 
                            // This operation can take a long time to complete. 
                            // The ArchiveTransferManager.Download() method creates an Amazon SNS topic, 
                            // and an Amazon SQS queue that is subscribed to that topic. 
                            // It then initiates the archive retrieval job and polls the queue for the 
                            // archive to be available. This polling takes about 4 hours.
                            // Once the archive is available, download will begin.
                            Console.WriteLine("Download the Archive");
                            var options = new DownloadOptions();
                            options.StreamTransferProgress += OnProgress;
     
                            //This will take a while
                            await manager.DownloadAsync(vaultName, archiveId, downloadFilePath, options);
     
                            Console.WriteLine("Delete the Archive");
                            await manager.DeleteArchiveAsync(vaultName, archiveId);
     
                        }
                        catch (AmazonGlacierException e)
                        {
                            Console.WriteLine(e.Message);
                        }
                        catch (AmazonServiceException e)
                        {
                            Console.WriteLine(e.Message);
                        }
                    }
                }
            }
     
     
            static void OnProgress(object sender, StreamTransferProgressArgs args)
            {
                if (args.PercentDone != currentPercentage)
                {
                    currentPercentage = args.PercentDone;
                    Console.WriteLine("Downloaded {0}%", args.PercentDone);
                }
     
                if (args.TotalBytes == args.TransferredBytes) {
                    ((DownloadOptions)sender).StreamTransferProgress -= OnProgress;
                }
            }
        }
    }
    

     

    Glacier Select

    As I stated at the top of this post, you can actually use SQL to query the data in your S3 buckets and Glacier archives. This is known as “S3 Select” and “Glacier Select”. These are relatively new services, so at the time of writing there is no .NET API for them yet. However, you can still use the following

    • Java
    • Python
    • AWS CLI

     

     

    Here is an example of a REST call that would conduct an S3 Select (read more here https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html)

     

    POST /ObjectName?select&select-type=2 HTTP/1.1
    Host: BucketName.s3.amazonaws.com
    Date: date
    Authorization: authorization string 
    Request body goes here
    

     

    The following XML shows the request body for an object in CSV format with results in CSV format:

    <?xml version="1.0" encoding="UTF-8"?>
    <SelectRequest>
        <Expression>Select * from S3Object</Expression>
        <ExpressionType>SQL</ExpressionType>
        <InputSerialization>
            <CompressionType>GZIP</CompressionType>
            <CSV>
                <FileHeaderInfo>IGNORE</FileHeaderInfo>
                <RecordDelimiter>\n</RecordDelimiter>
                <FieldDelimiter>,</FieldDelimiter>
                <QuoteCharacter>"</QuoteCharacter>
                <QuoteEscapeCharacter>"</QuoteEscapeCharacter>
                <Comments>#</Comments>
                <AllowQuotedRecordDelimiter>FALSE</AllowQuotedRecordDelimiter>
            </CSV>
        </InputSerialization>
        <OutputSerialization>
            <CSV>
                <QuoteFields>ASNEEDED</QuoteFields>
                <RecordDelimiter>\n</RecordDelimiter>
                <FieldDelimiter>,</FieldDelimiter>
                <QuoteCharacter>"</QuoteCharacter>
                <QuoteEscapeCharacter>"</QuoteEscapeCharacter>
            </CSV>
        </OutputSerialization>
        <RequestProgress>
            <Enabled>FALSE</Enabled>
        </RequestProgress>
    </SelectRequest> 
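
    If you would rather build that request body from C# than hand-write the XML, something like the following could work. This is only a minimal sketch: SelectRequestBuilder and BuildCsvSelectBody are names I have made up for illustration (they are not part of the AWS SDK), and it leaves out the delimiter and quote elements on the assumption that the service falls back to its defaults for them. A side benefit of going through XElement is that any special characters in the SQL expression get XML-escaped for you.

```csharp
using System;
using System.Xml.Linq;

// Hypothetical helper (NOT part of the AWS SDK) that builds an S3 Select request body.
public static class SelectRequestBuilder
{
    // Builds a CSV-in / CSV-out SelectRequest body for the given SQL expression.
    // Delimiter/quote elements are omitted, assuming the service defaults apply.
    public static string BuildCsvSelectBody(string sqlExpression, bool gzipInput = true)
    {
        if (string.IsNullOrWhiteSpace(sqlExpression))
        {
            throw new ArgumentException("A SQL expression is required", nameof(sqlExpression));
        }

        var request = new XElement("SelectRequest",
            new XElement("Expression", sqlExpression),
            new XElement("ExpressionType", "SQL"),
            new XElement("InputSerialization",
                new XElement("CompressionType", gzipInput ? "GZIP" : "NONE"),
                new XElement("CSV",
                    new XElement("FileHeaderInfo", "IGNORE"))),
            new XElement("OutputSerialization",
                new XElement("CSV",
                    new XElement("QuoteFields", "ASNEEDED"))),
            new XElement("RequestProgress",
                new XElement("Enabled", "FALSE")));

        // XElement.ToString() does not emit an XML declaration, so prepend one by hand.
        return "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + Environment.NewLine + request;
    }
}
```

    Calling SelectRequestBuilder.BuildCsvSelectBody("Select * from S3Object") gives you a string you could send as the body of the POST shown earlier. Signing the request (the Authorization header) is a separate concern that this sketch does not attempt.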
    

     

    The following XML shows the request body for an object in JSON format with results in JSON format:

    <?xml version="1.0" encoding="UTF-8"?>
    <SelectRequest>
        <Expression>Select * from S3Object</Expression>
        <ExpressionType>SQL</ExpressionType>
        <InputSerialization>
            <CompressionType>GZIP</CompressionType>
            <JSON>
                <Type>DOCUMENT</Type>
            </JSON>
        </InputSerialization>
        <OutputSerialization>
            <JSON>
                <RecordDelimiter>\n</RecordDelimiter>
            </JSON>                                  
        </OutputSerialization>
        <RequestProgress>
            <Enabled>FALSE</Enabled>
        </RequestProgress>                                  
    </SelectRequest> 
    

     

    SELECT

    Amazon S3 Select and Amazon Glacier Select support only the SELECT SQL command. The following ANSI standard clauses are supported for SELECT:

    • SELECT list
    • FROM clause
    • WHERE clause
    • LIMIT clause (Amazon S3 Select only)

     

    Amazon S3 Select and Amazon Glacier Select queries currently do not support subqueries or joins.

     

    AWS ATHENA

    An alternative (which I think will get rolled into S3/Glacier Select) is AWS Athena, which allows you to create EXTERNAL tables using S3/Glacier buckets/archives. There is a relatively new (at time of writing) NuGet package for this here : https://www.nuget.org/packages/AWSSDK.Athena/

     

    Though this is largely undocumented right now (04/09/18), the main class you would interact with is AmazonAthenaClient and its methods/events.

     

     

    If you really want to get jiggy with Athena/C# you will have to dig in there and have a play (which I may yet do in a subsequent article)

     

     

    See ya later, not goodbye

    Ok that’s it for now until the next post

    AWS

    AWS : S3 Storage

    What are we talking about this time?

    This time we are going to talk about AWS S3 storage. I have chosen to start with S3, as storage lies at the heart of a great many cloud services, in both AWS and Azure. For example

     

    • Use S3 blobs to create external SQL tables (AWS Athena)
    • Use S3 storage with Kafka
    • Use S3 with data warehouses such as AWS Redshift
    • Use S3 with Apache Spark
    • Use S3 with AWS Lambda
    • Receive events when a new S3 operation occurs

     

    These are just some of the things you can do using S3 storage. So it’s a great starting point. There are other storage options in AWS such as

    • Glacier (archive / slow moving data storage)
    • EFS (file system)
    • Storage Gateway

    I will probably cover some of these in future posts too, but for now let's stick to what this post will cover, which is standard S3.

     

    Initial setup

    If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

     

    Where is the code?

    The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Storage/S3BucketsAndKeys

     

    Ok so how does S3 work?

    S3 has a concept of a bucket, which is a top level entity. You may have multiple buckets, each of which may have metadata, public/private permissions, automatic encryption and so on enabled on it. Each bucket contains the files (objects) that you have uploaded. Conceptually there is not a lot more to it.

     

    IAM user privileges needed for S3

    You will need to add these permissions to your IAM user to allow them to use S3

     

    • AmazonS3FullAccess

     

    Obviously if you are working in a team you will not want to give out full access, but for this series of posts this is fine.

     

     

    So what will we try and cover in this post?

    To my mind, what I am trying to cover is all/most of the stuff I would want to do had I been using Azure Blob Storage. To this end, this is the list of things I will cover in this post.

     

    • List all buckets
    • Create a bucket
    • Write an object to a bucket
    • Write an object to a bucket with a pre-check to see if it exists
    • Write a Stream to a bucket
    • Read an object from a bucket
    • Delete an object from a bucket
    • List all objects in a bucket

     

    Install the nugets

    So let's start. The first thing we need to do is install the NuGet packages, which for this demo are

     

    • AWSSDK.S3
    • Nito.AsyncEx (nice async await extensions, like awaitable Console apps)

     

    Ok, now that we have that in place, and we know (thanks to the 1st post) how to use the default profile which is linked to the IAM user for this demo series, we can just go through the items on the list above one by one

     

    List all buckets

    var client = new AmazonS3Client();
    ListBucketsResponse response = await client.ListBucketsAsync();
    foreach (S3Bucket bucket in response.Buckets)
    {
        Console.WriteLine("You own Bucket with name: {0}", bucket.BucketName);
    }
    

     

    Create a bucket

    This example shows you how to make the bucket public/private

    if (client.DoesS3BucketExist(bucketToCreate))
    {
        Console.WriteLine($"{bucketToCreate} already exists, skipping this step");
    }
    else
    {
        // Only try to create the bucket when it does not already exist
        PutBucketRequest putBucketRequest = new PutBucketRequest()
        {
            BucketName = bucketToCreate,
            BucketRegion = S3Region.EUW2,
            CannedACL = isPublic ? S3CannedACL.PublicRead : S3CannedACL.Private
        };
        var response = await client.PutBucketAsync(putBucketRequest);
    }
    

     

    Write an object to a bucket

    This example also shows you how to attach metadata to an object in a bucket

    var client = new AmazonS3Client();
    
    // simple object put
    PutObjectRequest request = new PutObjectRequest()
    {
        ContentBody = "this is a test",
        BucketName = bucketName,
        Key = keyName
    };
    
    PutObjectResponse response = await client.PutObjectAsync(request);
    
    // put an object with some custom metadata attached (same key, so this overwrites the object above)
    PutObjectRequest titledRequest = new PutObjectRequest()
    {
        BucketName = bucketName,
        Key = keyName
    };
    titledRequest.Metadata.Add("title", "the title");
    
    await client.PutObjectAsync(titledRequest);
    

     

     

    Write an object to a bucket with a pre-check to see if it exists

    var client = new AmazonS3Client();
    
    if (!S3FileExists(bucketName, uniqueKeyName))
    {
    
        // simple object put
        Console.WriteLine($"Adding file {uniqueKeyName}");
        PutObjectRequest request = new PutObjectRequest()
        {
            ContentBody = "this is a test",
            BucketName = bucketName,
            Key = uniqueKeyName
        };
    
        PutObjectResponse response = await client.PutObjectAsync(request);
    }
    else
    {
        Console.WriteLine($"File {uniqueKeyName} existed");
    }
    
    ....
    ....
    ....
    
    
    
    bool S3FileExists(string bucketName, string keyName)
    {
        var s3FileInfo = new Amazon.S3.IO.S3FileInfo(client, bucketName, keyName);
        return s3FileInfo.Exists;
    }
    

     

    Write a Stream to a bucket

    There are plenty of times when you want to upload/download data as Streams, as you don’t want to take the hit of loading it all into memory in one go. For this reason it is a good idea to work with Stream objects. Here is an upload example using the TransferUtility, which is something we might look at in another dedicated post

    var client = new AmazonS3Client();
    
    //sly inner function
    async Task<Stream> GenerateStreamFromStringAsync(string s)
    {
        var stream = new MemoryStream();
        var writer = new StreamWriter(stream);
        await writer.WriteAsync(s);
        await writer.FlushAsync();
        stream.Position = 0;
        return stream;
    }
    
    var bucketToCreate = $"public-{bucketName}";
    await CreateABucketAsync(bucketToCreate);
    
    var fileTransferUtility = new TransferUtility(client);
    var fileName = Guid.NewGuid().ToString("N");
    using (var streamToUpload = await GenerateStreamFromStringAsync("some random string contents"))
    {
        var uploadRequest = new TransferUtilityUploadRequest()
        {
            InputStream = streamToUpload,
            Key = $"{fileName}.txt",
            BucketName = bucketToCreate,
            CannedACL = S3CannedACL.PublicRead
        };
    
        await fileTransferUtility.UploadAsync(uploadRequest);
    }
    Console.WriteLine($"Upload using stream to file '{fileName}' completed");
    

     

     

    Read an object from a bucket

    var client = new AmazonS3Client();
    
    GetObjectRequest request = new GetObjectRequest()
    {
        BucketName = bucketName,
        Key = keyName
    };
    
    using (GetObjectResponse response = await client.GetObjectAsync(request))
    {
        string title = response.Metadata["x-amz-meta-title"];
        Console.WriteLine("The object's title is {0}", title);
        string dest = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Desktop), keyName);
        if (!File.Exists(dest))
        {
            await response.WriteResponseStreamToFileAsync(dest, true, CancellationToken.None);
        }
    }
    

     

     

    Delete an object from a bucket

    var client = new AmazonS3Client();
    
    DeleteObjectRequest request = new DeleteObjectRequest()
    {
        BucketName = bucketName,
        Key = keyName
    };
    
    await client.DeleteObjectAsync(request);
    

     

    List all objects in a bucket

    var client = new AmazonS3Client();
    
    ListObjectsRequest request = new ListObjectsRequest();
    request.BucketName = bucketName;
    ListObjectsResponse response = await client.ListObjectsAsync(request);
    foreach (S3Object entry in response.S3Objects)
    {
        Console.WriteLine("key = {0} size = {1}", entry.Key, entry.Size);
    }
    
    // list only things starting with "foo"
    request.Prefix = "foo";
    response = await client.ListObjectsAsync(request);
    foreach (S3Object entry in response.S3Objects)
    {
        Console.WriteLine("key = {0} size = {1}", entry.Key, entry.Size);
    }
    
    // list only things that come after "bar" alphabetically
    request.Prefix = null;
    request.Marker = "bar";
    response = await client.ListObjectsAsync(request);
    foreach (S3Object entry in response.S3Objects)
    {
        Console.WriteLine("key = {0} size = {1}", entry.Key, entry.Size);
    }
    
    // only list 3 things
    request.Prefix = null;
    request.Marker = null;
    request.MaxKeys = 3;
    response = await client.ListObjectsAsync(request);
    foreach (S3Object entry in response.S3Objects)
    {
        Console.WriteLine("key = {0} size = {1}", entry.Key, entry.Size);
    }
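
    The MaxKeys example above only ever fetches a single page of results. To walk every object in a large bucket you need to keep asking for the next page until the service says there are no more. Below is a minimal sketch of that loop. Paging and ReadAllPagesAsync are hypothetical names of my own (not SDK types), and the loop is written against a delegate so it can be tried out without touching AWS. The commented-out section shows how I would expect to wire it up to ListObjectsV2Async, which uses a continuation token rather than the Marker shown above; treat that wiring as an assumption to check against the SDK version you have installed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (NOT part of the AWS SDK) that drains a token-based paged API.
public static class Paging
{
    // fetchPage receives the continuation token (null for the first page) and returns
    // that page's items plus the token for the next page (null when there are no more).
    public static async Task<List<T>> ReadAllPagesAsync<T>(
        Func<string, Task<(IReadOnlyList<T> Items, string NextToken)>> fetchPage)
    {
        var all = new List<T>();
        string token = null;
        do
        {
            var page = await fetchPage(token);
            all.AddRange(page.Items);
            token = page.NextToken;
        } while (token != null);

        return all;
    }
}

// Assumed wiring to S3 (needs the AWSSDK.S3 package, so it is left as a comment here):
//
// var objects = await Paging.ReadAllPagesAsync<S3Object>(async token =>
// {
//     var response = await client.ListObjectsV2Async(new ListObjectsV2Request
//     {
//         BucketName = bucketName,
//         ContinuationToken = token
//     });
//     return (response.S3Objects,
//             response.IsTruncated == true ? response.NextContinuationToken : null);
// });
```

    Keeping the loop separate from the S3 call is just a design choice to make it easy to exercise with fake pages; you could equally inline the do/while around ListObjectsV2Async.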
    

     

     

    After running the demo code associated with this article we should see something like this

     

    image

     

     

    Conclusion

    I think I have shown the most common things you may want to do with AWS S3. We will 100% be looking at some of the integration points with other services in the future. I hope I have also shown that this stuff is not that hard, and that the APIs are quite intuitive.