AWS

AWS : Simple Queue Service (SQS)

 

What are we talking about this time?

This time we are going to talk about AWS SQS (simple queue system)

 

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

 

Where is the code?

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Messaging/SQSSinglePublisherManyReceivers

 

Ok so how does SQS File System work? What is it?

So SQS is in essence a simple cloud based queue. That is once a message has been read from the queue, it is gone. AWS also offers other messaging solutions, which compete with some of the big boys like Kafka/EventHub for your IOT type applications. SQS however is not that. It is, as I say a simple queue based system in the cloud, if you have used Azure its more akin to Azure storage queues

 

IAM user privileges needed for S3

You will need to add these permissions to your IAM user to allow them to use SQS

 

  • AmazonSQSFullAccess

 

Obviously if you are working in a team you will not want to give out full access, but for this series of posts this is fine.

 

 

So what will we try and cover in this post?

This post will cover the basics of using SQS. As I say it is the basics of using SQS, in some future posts we will look at using S3 as a trigger for SQS, and also maybe look at SQS triggering Lambda functions. But the bulk of this post is about plain SQS functionality

 

Creating a SQS queue from the AWS console

You can create a new queue directly from the AWS console (as well as programmatically, which we will see later). Shown below are a number of screen shots that show you how to create a new SQS queue in the AWS console

 

 

 

image

Initial queue creation

 

image

SQS also supports encryption. This can be turned on when you first create the queue

 

image

 

 

image

Once the queue is created you will see a screen like that above

 

How about programmatically?

We are devs though right? So how do we do this stuff programmatically?

 

Create queues/send to queue

Lets see. Here is how we do that using the AmazonSQSClient

 

using System;
using System.Linq;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Nito.AsyncEx;

namespace SQSSinglePublisherManyReceivers
{
    class Program
    {
        private static bool _receieverShouldDeleteMessage = false;
        private static AmazonSQSClient _sqs = new AmazonSQSClient();
        private static string _myQueueUrl;


        static void Main(string[] args)
        {
            AsyncContext.Run(() => MainAsync(args));
        }

        static async void MainAsync(string[] args)
        {
            try
            {
                Console.WriteLine("===========================================");
                Console.WriteLine("Getting Started with Amazon SQS");
                Console.WriteLine("===========================================\n");

                //Creating a queue
                Console.WriteLine("Create a queue called MyQueue.\n");
                var sqsRequest = new CreateQueueRequest { QueueName = "MyQueue11" };
                var createQueueResponse = await _sqs.CreateQueueAsync(sqsRequest);
                _myQueueUrl = createQueueResponse.QueueUrl;

                //Confirming the queue exists
                var listQueuesRequest = new ListQueuesRequest();
                var listQueuesResponse = await _sqs.ListQueuesAsync(listQueuesRequest);

                Console.WriteLine("Printing list of Amazon SQS queues.\n");
                if (listQueuesResponse.QueueUrls != null)
                {
                    foreach (String queueUrl in listQueuesResponse.QueueUrls)
                    {
                        Console.WriteLine("  QueueUrl: {0}", queueUrl);
                    }
                }
                Console.WriteLine();

                //Sending a message
                for (int i = 0; i < 10; i++)
                {
                    var message = $"This is my message text-Id-{Guid.NewGuid().ToString("N")}";
                    //var message = $"This is my message text";
                    Console.WriteLine($"Sending a message to MyQueue : {message}");
                    var sendMessageRequest = new SendMessageRequest
                    {
                        QueueUrl = _myQueueUrl, //URL from initial queue creation
                        MessageBody = message
                    };
                    await _sqs.SendMessageAsync(sendMessageRequest);
                }

               .....

            }
            catch (AmazonSQSException ex)
            {
                Console.WriteLine("Caught Exception: " + ex.Message);
                Console.WriteLine("Response Status Code: " + ex.StatusCode);
                Console.WriteLine("Error Code: " + ex.ErrorCode);
                Console.WriteLine("Error Type: " + ex.ErrorType);
                Console.WriteLine("Request ID: " + ex.RequestId);
            }

            Console.WriteLine("Press Enter to continue...");
            Console.Read();
        }


        
        }
    }
}

 

Receiving

And to keep things interesting lets create a number of receivers (I published 10 messages, so lets create 5 receivers)

 

using System;
using System.Linq;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Nito.AsyncEx;

namespace SQSSinglePublisherManyReceivers
{
    class Program
    {
        private static bool _receieverShouldDeleteMessage = false;
        private static AmazonSQSClient _sqs = new AmazonSQSClient();
        private static string _myQueueUrl;


        static void Main(string[] args)
        {
            AsyncContext.Run(() => MainAsync(args));
        }

        static async void MainAsync(string[] args)
        {
            try
            {
				//publish code
				//publish code
				//publish code
				//publish code
				......
				
                //start of 5 receiver tasks
                var tasks = Enumerable.Range(0, 5).Select(number =>
                    Task.Run(async () =>
                        await ReceiveMessage(number)
                    )).ToList();

                await Task.WhenAll(tasks);

            }
            catch (AmazonSQSException ex)
            {
                Console.WriteLine("Caught Exception: " + ex.Message);
                Console.WriteLine("Response Status Code: " + ex.StatusCode);
                Console.WriteLine("Error Code: " + ex.ErrorCode);
                Console.WriteLine("Error Type: " + ex.ErrorType);
                Console.WriteLine("Request ID: " + ex.RequestId);
            }

            Console.WriteLine("Press Enter to continue...");
            Console.Read();
        }


        private static async Task ReceiveMessage(int state)
        {
            //Receiving a message
            var receiveMessageRequest = new ReceiveMessageRequest { QueueUrl = _myQueueUrl };
            var receiveMessageResponse = await _sqs.ReceiveMessageAsync(receiveMessageRequest);
            if (receiveMessageResponse.Messages != null)
            {
                Console.WriteLine($"Receiever {state} Printing received message.\n");
                foreach (var message in receiveMessageResponse.Messages)
                {
                    Console.WriteLine($"Receiever {state}   Message");
                    if (!string.IsNullOrEmpty(message.MessageId))
                    {
                        Console.WriteLine($"Receiever {state}     MessageId: {message.MessageId}");
                    }
                    if (!string.IsNullOrEmpty(message.ReceiptHandle))
                    {
                        Console.WriteLine($"Receiever {state}     ReceiptHandle: {message.ReceiptHandle}");
                    }
                    if (!string.IsNullOrEmpty(message.MD5OfBody))
                    {
                        Console.WriteLine($"Receiever {state}     MD5OfBody: {message.MD5OfBody}");
                    }
                    if (!string.IsNullOrEmpty(message.Body))
                    {
                        Console.WriteLine($"Receiever {state}     Body: {message.Body}");
                    }

                    foreach (string attributeKey in message.Attributes.Keys)
                    {
                        Console.WriteLine("  Attribute");
                        Console.WriteLine("    Name: {0}", attributeKey);
                        var value = message.Attributes[attributeKey];
                        Console.WriteLine("    Value: {0}", string.IsNullOrEmpty(value) ? "(no value)" : value);
                    }
                }

                var messageRecieptHandle = receiveMessageResponse.Messages[0].ReceiptHandle;

                if (_receieverShouldDeleteMessage)
                {
                    //Deleting a message
                    Console.WriteLine("Deleting the message.\n");
                    var deleteRequest = new DeleteMessageRequest { QueueUrl = _myQueueUrl, ReceiptHandle = messageRecieptHandle };
                    _sqs.DeleteMessage(deleteRequest);
                }
            }
        }
    }
}

 

 

The demos full output should be something like this

image

 

 

Sending output

This is the output (see how I sent 10 messages)

 

Sending a message to MyQueue : This is my message text-Id-ec6bfabda87d4347ac717680b46665dc
Sending a message to MyQueue : This is my message text-Id-13221ee1edd4451bb69171f9f3903f72
Sending a message to MyQueue : This is my message text-Id-2b3bcdcf9f6149cbbccaf992a8ce770d
Sending a message to MyQueue : This is my message text-Id-c0f29bc20c904b66a0b09931bff3d689
Sending a message to MyQueue : This is my message text-Id-57c3681bbb214b4db21598e42522635a
Sending a message to MyQueue : This is my message text-Id-0466901408014641b4cc33dd5c5a0034
Sending a message to MyQueue : This is my message text-Id-297aa21d624645fbbd10429a53361011
Sending a message to MyQueue : This is my message text-Id-a8c7430e40914e7d936116a596abf222
Sending a message to MyQueue : This is my message text-Id-481f1e058d75417a992eda1d7be35e0d
Sending a message to MyQueue : This is my message text-Id-6aa6423c182643d183c72c0f9f2ba256

 

And if I copy the output from the console and filter it in Notepad++, I will get this

 

Receiving output

See how I get one message being picked up by each of the 5 threads I created. This also leaves the remaining 5 in the SQS queue

Receiever 4     Body: This is my message text-Id-ec6bfabda87d4347ac717680b46665dc
Receiever 0     Body: This is my message text-Id-2b3bcdcf9f6149cbbccaf992a8ce770d
Receiever 1     Body: This is my message text-Id-a8c7430e40914e7d936116a596abf222
Receiever 3     Body: This is my message text-Id-13221ee1edd4451bb69171f9f3903f72
Receiever 2     Body: This is my message text-Id-57c3681bbb214b4db21598e42522635a

 

What this shows us is that once a message is marked as received, the message is gone, it is not available to another thread or process. AWS also supports another type of messaging service called Kinesis, which works a bit more like Apache Kafka and Azure EventHub, where each consumer/receiver has to mark their own offset into the message log.

 

This is quite useful for things like Event Sourcing, but it is a little out of scope for this post. I will be talking about Kinesis in the next post, so lets leave this discussion until then.

 

 

See ya later, not goodbye

Ok that’s it for now until the next post

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s