Kafka

Kafka Streams Introduction

So this post will be an introductory one on Kafka Streams. It is not intended to be one on Apache Kafka itself; for that there are many good books/posts/documents available which cover the topic.

 

That said, there are still a few key concepts that we will need to introduce before we can start talking about Kafka Streams, such as:

 

  • Kafka overall architecture
  • Topics
  • Commit Log
  • Consumer group

 

After we have talked about these (and I will only be skimming the surface; you really would be better off reading up on Kafka if this is brand new territory for you), I will move on to some of the basic building blocks of Kafka Streams.

 

Kafka overall architecture

[Image: Kafka cluster architecture]

 

As shown in the diagram above, Kafka is a message broker and is intended to run as a cluster (which might be a single broker, but that is obviously not what is recommended). It also makes use of ZooKeeper for various state/metadata storage.

 

Topics

Kafka uses topics as a way of organizing what data is produced/read. Producers push data to topics, whilst consumers read data from topics.

 

Topics are divided into a number of partitions (there is a default partitioning strategy at play, but you can write your own). Partitions allow you to parallelize a topic by potentially storing its data across multiple machines, and they also allow the data to be read in parallel.

 

Another key part of a topic is the offset, which is maintained by the consumer and indicates where the consumer should continue reading from within a partition. As you can imagine, for this to be possible the messages must be ordered, and this is something Kafka guarantees for you within each partition.

 

[Image: a Kafka topic, its partitions and offsets]
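
A minimal sketch may help make topics, keys and partitions a little more concrete. It uses the Confluent.Kafka .NET client (not Kafka Streams), and the broker address and the "page-views" topic name are assumptions for illustration only:

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class ProducerSketch
{
    static async Task Main()
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            // The record key ("alice") determines which partition the record is written to
            // (by default, a hash of the key).
            var result = await producer.ProduceAsync(
                "page-views",
                new Message<string, string> { Key = "alice", Value = "/home" });

            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
        }
    }
}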

Commit Log

Kafka retains messages for a configurable amount of time. This means a consumer may go down and restart, and provided it is still within the log retention period, it will simply carry on reading messages from where it left off.

 

Consumer Groups

Kafka has consumers, which read data from the partitions of a topic. Consumers can also be organized into consumer groups for a topic: each consumer in the group reads from its own share of the partitions, and the group as a whole reads the entire topic. If you organize your consumers in such a way that you have more consumers than partitions, some will sit idle. However, if you have more partitions than consumers, some consumers will get messages from multiple partitions.

 

[Image: a Kafka consumer group reading from a topic's partitions]
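
To make the consumer group idea concrete, here is a minimal sketch, again using the Confluent.Kafka .NET client rather than Kafka Streams; the broker address, topic name and group id are assumptions for illustration. Every instance started with the same GroupId shares the topic's partitions:

using System;
using Confluent.Kafka;

class ConsumerGroupSketch
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "page-view-processors",      // the consumer group
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using (var consumer = new ConsumerBuilder<string, string>(config).Build())
        {
            consumer.Subscribe("page-views");

            // Kafka assigns this instance a share of the topic's partitions; starting more
            // instances with the same GroupId rebalances the partitions across them.
            while (true)
            {
                var record = consumer.Consume();
                Console.WriteLine(
                    $"{record.Message.Key} => {record.Message.Value} " +
                    $"(partition {record.Partition.Value}, offset {record.Offset.Value})");
            }
        }
    }
}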

 

Further Reading

 

As I say, I did not want to get too bogged down with the Kafka fundamentals, as this series is really about Kafka Streams, but the few points above, as well as the “further reading” section, should get you to a pretty decent position to understand the rest of this post.

 

Kafka Streams Introduction

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management and real-time querying of application state.

Kafka Streams has a low barrier to entry: You can quickly write and run a small-scale proof-of-concept on a single machine; and you only need to run additional instances of your application on multiple machines to scale up to high-volume production workloads. Kafka Streams transparently handles the load balancing of multiple instances of the same application by leveraging Kafka’s parallelism model.

 

Stream Processing Topology

  • A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
  • A stream processing application is any program that makes use of the Kafka Streams library. It defines its computational logic through one or more processor topologies, where a processor topology is a graph of stream processors (nodes) that are connected by streams (edges).
  • A stream processor is a node in the processor topology; it represents a processing step to transform data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and may subsequently produce one or more output records to its downstream processors.
    There are two special processors in the topology:
    Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.
    Sink Processor: A sink processor is a special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.
    Note that in normal processor nodes other remote systems can also be accessed while processing the current record. Therefore the processed results can either be streamed back into Kafka or written to an external system.

Although this post will not show any Kafka Streams code, I want to introduce a couple of things that will be coming up in the next posts, namely:

  • KStream
  • KTable
  • Global KTable
  • State Stores

 

KStream

A KStream is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded data set. Using the table analogy, data records in a record stream are always interpreted as an “INSERT” — think: adding more entries to an append-only ledger — because no record replaces an existing row with the same key. Examples are a credit card transaction, a page view event, or a server log entry.

To illustrate, let’s imagine the following two data records are being sent to the stream:

(“alice”, 1) --> (“alice”, 3)

If your stream processing application were to sum the values per user, it would return 4 for alice. Why? Because the second data record would not be considered an update of the previous record.

https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#streams_concepts_kstream
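
To see the record-stream (“INSERT”) interpretation in isolation, here is a minimal plain C# sketch (no Kafka Streams API involved); it simply appends every record and aggregates over everything seen so far:

using System;
using System.Collections.Generic;
using System.Linq;

class RecordStreamSketch
{
    static void Main()
    {
        // Record-stream ("INSERT") semantics: every record is kept.
        var records = new List<(string Key, int Value)>();
        records.Add(("alice", 1));
        records.Add(("alice", 3));

        // Summing per key sees both records, so alice totals 4.
        int sum = records.Where(r => r.Key == "alice").Sum(r => r.Value);
        Console.WriteLine(sum); // 4
    }
}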

 

KTable

A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE” or tombstone for the record’s key.

To illustrate, let’s imagine the following two data records are being sent to the stream:

(“alice”, 1) --> (“alice”, 3)

If your stream processing application were to sum the values per user, it would return 3 for alice. Why? Because the second data record would be considered an update of the previous record.

https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
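
By contrast, the changelog (“UPSERT”) interpretation behaves like a dictionary keyed by record key, where a later value replaces an earlier one. Again, this is just a plain C# sketch to illustrate the semantics, not the Kafka Streams API:

using System;
using System.Collections.Generic;

class ChangelogStreamSketch
{
    static void Main()
    {
        // Changelog ("UPSERT") semantics: the latest value per key wins.
        var table = new Dictionary<string, int>();
        table["alice"] = 1;
        table["alice"] = 3;

        Console.WriteLine(table["alice"]); // 3 - the second record updated the first

        // A record with a null value would act as a tombstone,
        // i.e. the equivalent of table.Remove("alice").
    }
}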

 

GlobalKTable

Like a KTable, a GlobalKTable is an abstraction of a changelog stream, where each data record represents an update.

A GlobalKTable differs from a KTable in the data that they are being populated with, i.e. which data from the underlying Kafka topic is being read into the respective table. Slightly simplified, imagine you have an input topic with 5 partitions. In your application, you want to read this topic into a table. Also, you want to run your application across 5 application instances for maximum parallelism.

  • If you read the input topic into a KTable, then the “local” KTable instance of each application instance will be populated with data from only 1 partition of the topic’s 5 partitions.
  • If you read the input topic into a GlobalKTable, then the local GlobalKTable instance of each application instance will be populated with data from all partitions of the topic.

GlobalKTable provides the ability to look up current values of data records by keys.

https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#streams_concepts_globalktable

 

State Stores

Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data. This is an important capability when implementing stateful operations. Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query data required for processing. These state stores can either be a persistent key-value store, an in-memory hashmap, or another convenient data structure. Kafka Streams offers fault-tolerance and automatic recovery for local state stores.

Kafka Streams allows direct read-only queries of the state stores by methods, threads, processes or applications external to the stream processing application that created the state stores. This is provided through a feature called Interactive Queries. All stores are named and Interactive Queries exposes only the read operations of the underlying implementation.

https://kafka.apache.org/21/documentation/streams/core-concepts#streams_state

 

That’s It

So that’s it for now. I have borrowed a lot of material from the official docs for this post, but as we move through the series we will start to form our own code/topologies, and as such our own tests/descriptions. So please forgive me this one.

AWS

AWS : ElastiCache

What are we talking about this time?

Last time we talked about AWS DynamoDB, this time we will talk about AWS ElastiCache.

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

Where is the code

The code for this post can be found here in GitHub :

What are we talking about this time?

This time we will be talking about ElastiCache.

So what exactly is ElastiCache?

ElastiCache is AWS's managed cache cluster service. It comes in two flavours:

  • Redis
  • MemCached

You can use the console to pick your cluster size and node types:

[Image: creating an ElastiCache cluster in the AWS console]

I have chosen to use Memcached for this example. Once you have set up a cluster, you will see the nodes created for you in the AWS console. If you go and examine the nodes, you can click on one of them and you should see a dialog similar to the one below. It shows the cluster's configuration endpoint, and also the actual nodes in the cluster. It also shows you how to download a client library for use with .NET.

 

[Image: node details dialog showing the configuration endpoint and the .NET client download link]

 

For .NET, AWS uses the following Memcached client: https://github.com/enyim/EnyimMemcached. So if you downloaded the client using the link shown in the above dialog, you should see some DLLs like this:

[Image: the downloaded client DLLs]

So once you have them, it is really just a job of referencing these two DLLs:

  • Amazon.ElastiCacheCluster.dll
  • Enyim.Caching.dll

 

Then you should be able to add an App.config like this (make sure you use the configuration endpoint address, the one with “cfg” in it):

<?xml version="1.0" encoding="utf-8"?>
<configuration>
    <configSections>
        <section 
            name="clusterclient" 
            type="Amazon.ElastiCacheCluster.ClusterConfigSettings, Amazon.ElastiCacheCluster" />
    </configSections>

    <clusterclient>
        <!-- the hostname and port values are from step 1 above -->
        <endpoint hostname="sachaXXXXXX.cfg.XXXXXXXXX.cache.amazonaws.com" port="11211" />
    </clusterclient>
</configuration>

And use some simple C# code to write/read from the cache like this

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Amazon.ElastiCacheCluster;
using Enyim.Caching;
using Enyim.Caching.Configuration;
using Enyim.Caching.Memcached;

namespace ElastiCache.Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            // instantiate a new client.
            ElastiCacheClusterConfig config = new ElastiCacheClusterConfig();
            MemcachedClient memClient = new MemcachedClient(config);

            // Store the data in the cluster for 10 minutes.
            // The client will decide which cache host will store this item.
            memClient.Store(StoreMode.Set, "mykey", "This is the data value.", TimeSpan.FromMinutes(10));

            // Read the value back from the cluster.
            var mykey = memClient.Get<string>("mykey");

            Console.ReadLine();
        }
    }
}

And to be honest that is all there really is to it.

 

I guess the final point is, if you do not like this Memcached .NET client, AWS ElastiCache is “memcached compatible”, which really means you can use any Memcached library of your choosing.
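
For example, here is a minimal sketch of pointing Enyim's client directly at individual node endpoints. The node hostnames are placeholders, and this assumes Enyim's programmatic MemcachedClientConfiguration API rather than the Amazon cluster-config wrapper:

using System;
using Enyim.Caching;
using Enyim.Caching.Configuration;
using Enyim.Caching.Memcached;

class PlainMemcachedSketch
{
    static void Main()
    {
        // Add each cache node's own endpoint (not the "cfg" configuration endpoint,
        // which is only understood by the ElastiCache cluster-aware client).
        var config = new MemcachedClientConfiguration();
        config.AddServer("sachaXXXXXX.0001.XXXXXXXXX.cache.amazonaws.com", 11211);
        config.AddServer("sachaXXXXXX.0002.XXXXXXXXX.cache.amazonaws.com", 11211);

        using (var client = new MemcachedClient(config))
        {
            client.Store(StoreMode.Set, "mykey", "This is the data value.", TimeSpan.FromMinutes(10));
            Console.WriteLine(client.Get<string>("mykey"));
        }
    }
}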

 

Anyway, another nice short one. I am saving the AWS Batch post for a rainy day; that one will be quite in-depth, believe me.

AWS

AWS : DynamoDB

What are we talking about this time?

Last time we talked about AWS SWF, this time we will talk about DynamoDB

Initial setup

If you did not read the very first part of this series of posts, I urge you to go and read that one now as it shows you how to get started with AWS, and create an IAM user : https://sachabarbs.wordpress.com/2018/08/30/aws-initial-setup/

Where is the code

The code for this post can be found here in GitHub : https://github.com/sachabarber/AWS/tree/master/Databases/DynamoDB

What are we talking about this time?

This time we will be talking about DynamoDB.

So what exactly is DynamoDB?

Here is what Amazon have to say

Amazon DynamoDB is a key-value and document database that delivers single-digit millisecond performance at any scale. It’s a fully managed, multiregion, multimaster database with built-in security, backup and restore, and in-memory caching for internet-scale applications. DynamoDB can handle more than 10 trillion requests per day and support peaks of more than 20 million requests per second.

PERFORMANCE AT SCALE
DynamoDB supports some of the world’s largest scale applications by providing consistent, single-digit millisecond response times at any scale. You can build applications with virtually unlimited throughput and storage. DynamoDB global tables replicate your data across multiple AWS Regions to give you fast, local access to data for your globally distributed applications. For use cases that require even faster access with microsecond latency, DynamoDB Accelerator (DAX) provides a fully managed in-memory cache.

SERVERLESS
With DynamoDB, there are no servers to provision, patch, or manage and no software to install, maintain, or operate. DynamoDB automatically scales tables up and down to adjust for capacity and maintain performance. Availability and fault tolerance are built in, eliminating the need to architect your applications for these capabilities. DynamoDB provides both on-demand and provisioned capacity modes so that you can optimize costs by specifying capacity per workload, or paying for only the resources you consume.

ENTERPRISE READY
DynamoDB supports ACID transactions to enable you to build business-critical applications at scale. DynamoDB encrypts all data by default and provides fine-grained identity and access control on all your tables. You can create full backups of hundreds of terabytes of data instantly with no performance impact to your tables, and recover to any point in time in the preceding 35 days with no downtime. DynamoDB is also backed by a service level agreement for guaranteed availability.

 

So that is what it is, but how do we use it? Well, as stated above, it is both a key-value store and a document database. We will look at the document side of things, and then use the .NET SDK to examine object persistence with DynamoDB.

 

DynamoDB Document API CRUD

The following snippet illustrates how you can perform some basic CRUD operations using the document approach. You can read more here : https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ItemCRUDDotNetDocumentAPI.html

 

using System;
using System.Collections.Generic;
using System.Linq;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.DocumentModel;
using Amazon.Runtime;

namespace com.amazonaws.codesamples
{
    class MidlevelItemCRUD
    {
        private static AmazonDynamoDBClient client = new AmazonDynamoDBClient();
        private static string tableName = "ProductCatalog";
        // The sample uses the following id PK value to add book item.
        private static int sampleBookId = 555;

        static void Main(string[] args)
        {
            try
            {
                Table productCatalog = Table.LoadTable(client, tableName);
                CreateBookItem(productCatalog);
                RetrieveBook(productCatalog);
                // Couple of sample updates.
                UpdateMultipleAttributes(productCatalog);
                UpdateBookPriceConditionally(productCatalog);

                // Delete.
                DeleteBook(productCatalog);
                Console.WriteLine("To continue, press Enter");
                Console.ReadLine();
            }
            catch (AmazonDynamoDBException e) { Console.WriteLine(e.Message); }
            catch (AmazonServiceException e) { Console.WriteLine(e.Message); }
            catch (Exception e) { Console.WriteLine(e.Message); }
        }

        // Creates a sample book item.
        private static void CreateBookItem(Table productCatalog)
        {
            Console.WriteLine("\n*** Executing CreateBookItem() ***");
            var book = new Document();
            book["Id"] = sampleBookId;
            book["Title"] = "Book " + sampleBookId;
            book["Price"] = 19.99;
            book["ISBN"] = "111-1111111111";
            book["Authors"] = new List<string> { "Author 1", "Author 2", "Author 3" };
            book["PageCount"] = 500;
            book["Dimensions"] = "8.5x11x.5";
            book["InPublication"] = new DynamoDBBool(true);
            book["InStock"] = new DynamoDBBool(false);
            book["QuantityOnHand"] = 0;

            productCatalog.PutItem(book);
        }

        private static void RetrieveBook(Table productCatalog)
        {
            Console.WriteLine("\n*** Executing RetrieveBook() ***");
            // Optional configuration.
            GetItemOperationConfig config = new GetItemOperationConfig
            {
                AttributesToGet = new List<string> { "Id", "ISBN", "Title", "Authors", "Price" },
                ConsistentRead = true
            };
            Document document = productCatalog.GetItem(sampleBookId, config);
            Console.WriteLine("RetrieveBook: Printing book retrieved...");
            PrintDocument(document);
        }

        private static void UpdateMultipleAttributes(Table productCatalog)
        {
            Console.WriteLine("\n*** Executing UpdateMultipleAttributes() ***");
            Console.WriteLine("\nUpdating multiple attributes....");
            int partitionKey = sampleBookId;

            var book = new Document();
            book["Id"] = partitionKey;
            // List of attribute updates.
            // The following replaces the existing authors list.
            book["Authors"] = new List<string> { "Author x", "Author y" };
            book["newAttribute"] = "New Value";
            book["ISBN"] = null; // Remove it.

            // Optional parameters.
            UpdateItemOperationConfig config = new UpdateItemOperationConfig
            {
                // Get updated item in response.
                ReturnValues = ReturnValues.AllNewAttributes
            };
            Document updatedBook = productCatalog.UpdateItem(book, config);
            Console.WriteLine("UpdateMultipleAttributes: Printing item after updates ...");
            PrintDocument(updatedBook);
        }

        private static void UpdateBookPriceConditionally(Table productCatalog)
        {
            Console.WriteLine("\n*** Executing UpdateBookPriceConditionally() ***");

            int partitionKey = sampleBookId;

            var book = new Document();
            book["Id"] = partitionKey;
            book["Price"] = 29.99;

            // For conditional price update, creating a condition expression.
            Expression expr = new Expression();
            expr.ExpressionStatement = "Price = :val";
            expr.ExpressionAttributeValues[":val"] = 19.00;

            // Optional parameters.
            UpdateItemOperationConfig config = new UpdateItemOperationConfig
            {
                ConditionalExpression = expr,
                ReturnValues = ReturnValues.AllNewAttributes
            };
            Document updatedBook = productCatalog.UpdateItem(book, config);
            Console.WriteLine("UpdateBookPriceConditionally: Printing item whose price was conditionally updated");
            PrintDocument(updatedBook);
        }

        private static void DeleteBook(Table productCatalog)
        {
            Console.WriteLine("\n*** Executing DeleteBook() ***");
            // Optional configuration.
            DeleteItemOperationConfig config = new DeleteItemOperationConfig
            {
                // Return the deleted item.
                ReturnValues = ReturnValues.AllOldAttributes
            };
            Document document = productCatalog.DeleteItem(sampleBookId, config);
            Console.WriteLine("DeleteBook: Printing deleted just deleted...");
            PrintDocument(document);
        }

        private static void PrintDocument(Document updatedDocument)
        {
            foreach (var attribute in updatedDocument.GetAttributeNames())
            {
                string stringValue = null;
                var value = updatedDocument[attribute];
                if (value is Primitive)
                    stringValue = value.AsPrimitive().Value.ToString();
                else if (value is PrimitiveList)
                    stringValue = string.Join(",", (from primitive
                                    in value.AsPrimitiveList().Entries
                                                    select primitive.Value).ToArray());
                Console.WriteLine("{0} - {1}", attribute, stringValue);
            }
        }
    }
}

 

The main points from the code above are that we use the following

  • AmazonDynamoDBClient : to access the AWS DynamoDB
  • Table : allows us to interact with the table object, where we can get/put/update a document
  • Document : which is a dictionary of key/value pairs making up the Document

In order to create the table in the above code, we might have something like this for the ProductCatalog table creation

client.CreateTable(new CreateTableRequest
{
    TableName = "ProductCatalog",
    ProvisionedThroughput = new ProvisionedThroughput { ReadCapacityUnits = 3, WriteCapacityUnits = 1 },
    KeySchema = new List<KeySchemaElement>
    {
        new KeySchemaElement
        {
            // Id is the partition (hash) key, matching the GetItem(sampleBookId) calls above
            AttributeName = "Id",
            KeyType = KeyType.HASH
        }
    },
    AttributeDefinitions = new List<AttributeDefinition>
    {
        new AttributeDefinition { AttributeName = "Id", AttributeType = ScalarAttributeType.N }
    }
});

So what about queries? How do we do those? Well, we have two choices; we can use:

  • Table.Query(..)
  • Table.Scan(..)

 

Table.Query

The Query method enables you to query your tables. You can only query tables that have a composite primary key (partition key and sort key); if your table's primary key is made up of only a partition key, the Query operation is not supported. Here is an example of how you might do this (against a table whose partition key is Title and whose sort key is Id):

QueryFilter filter = new QueryFilter();
filter.AddCondition("Title", QueryOperator.Equal, "XXXX");
filter.AddCondition("Id", QueryOperator.Between, 0, 15);
QueryOperationConfig queryConfig = new QueryOperationConfig
{
    Filter = filter,
    Limit = 1
};

Search query = table.Query(queryConfig);
while (!query.IsDone)
{
    Console.WriteLine("Retrieving next set (page) of items");
    List<Document> querySet = query.GetNextSet();
   
    foreach (Document doc in querySet)
    {
        Console.WriteLine("Retrieving individual properties");
        Primitive title = doc["Title"].AsPrimitive();
        Primitive id = doc["Id"].AsPrimitive();
    }
}

It can be seen that Table.Query gives you a Search object, which you can page through using the GetNextSet() method.

Table.Scan

The Scan method performs a full table scan.

ScanFilter scanFilter = new ScanFilter();
scanFilter.AddCondition("Id", ScanOperator.LessThan, 15);
Search scan = table.Scan(scanFilter);
List<Document> scanItems = scan.GetRemaining();

So that is a whirlwind tour of the Document-style API. Next we can look at the object persistence model, which to my mind is what people would probably use more often, as it is more in line with proper .NET objects that can be saved to DynamoDB.

 

Object Persistence Model

The best place to start exploring this is actually the official AWS .NET SDK example, which creates and manipulates two sample tables:

 

  • Movies
  • Actors

 

This is how these sample tables are created using the AmazonDynamoDBClient

static readonly string[] SAMPLE_TABLE_NAMES = { "Actors", "Movies" };
/// <summary>
/// Creates all samples defined in SampleTables map
/// </summary>
/// <param name="client"></param>
public static void CreateSampleTables(AmazonDynamoDBClient client)
{
    Console.WriteLine("Getting list of tables");
    List<string> currentTables = client.ListTables().TableNames;
    Console.WriteLine("Number of tables: " + currentTables.Count);

    bool tablesAdded = false;
    if (!currentTables.Contains("Actors"))
    {
        Console.WriteLine("Table Actors does not exist, creating");
        client.CreateTable(new CreateTableRequest
        {
            TableName = "Actors",
            ProvisionedThroughput = new ProvisionedThroughput { ReadCapacityUnits = 3, WriteCapacityUnits = 1 },
            KeySchema = new List<KeySchemaElement>
            {
                new KeySchemaElement
                {
                    AttributeName = "Name",
                    KeyType = KeyType.HASH
                }
            },
            AttributeDefinitions = new List<AttributeDefinition>
            {
                new AttributeDefinition { AttributeName = "Name", AttributeType = ScalarAttributeType.S }
            }
        });
        tablesAdded = true;
    }

    if (!currentTables.Contains("Movies"))
    {
        Console.WriteLine("Table Movies does not exist, creating");
        client.CreateTable(new CreateTableRequest
        {
            TableName = "Movies",
            ProvisionedThroughput = new ProvisionedThroughput { ReadCapacityUnits = 3, WriteCapacityUnits = 1 },
            KeySchema = new List<KeySchemaElement>
            {
                new KeySchemaElement
                {
                    AttributeName = "Title",
                    KeyType = KeyType.HASH
                },
                new KeySchemaElement
                {
                    AttributeName = "Released",
                    KeyType = KeyType.RANGE
                }
            },
            AttributeDefinitions = new List<AttributeDefinition>
            {
                new AttributeDefinition { AttributeName = "Title", AttributeType = ScalarAttributeType.S },
                new AttributeDefinition { AttributeName = "Released", AttributeType = ScalarAttributeType.S }
            }
        });
        tablesAdded = true;
    }

    if (tablesAdded)
    {
        bool allActive;
        do
        {
            allActive = true;
            Console.WriteLine("While tables are still being created, sleeping for 5 seconds...");
            Thread.Sleep(TimeSpan.FromSeconds(5));

            foreach (var tableName in SAMPLE_TABLE_NAMES)
            {
                TableStatus tableStatus = GetTableStatus(client, tableName);
                if (!object.Equals(tableStatus, TableStatus.ACTIVE))
                    allActive = false;
            }
        } while (!allActive);
    }

    Console.WriteLine("All sample tables created");
}

/// <summary>
/// Retrieves a table status. Returns empty string if table does not exist.
/// </summary>
/// <param name="client"></param>
/// <param name="tableName"></param>
/// <returns></returns>
private static TableStatus GetTableStatus(AmazonDynamoDBClient client, string tableName)
{
    try
    {
        var table = client.DescribeTable(new DescribeTableRequest { TableName = tableName }).Table;
        return (table == null) ? null : table.TableStatus;
    }
    catch (AmazonDynamoDBException db)
    {
        if (db.ErrorCode == "ResourceNotFoundException")
            return string.Empty;
        throw;
    }
}

 

Probably the most important concept to grasp is the part above that deals with KeySchema, which if you recall from earlier will assist with Query/Scan operations.

 

  • Movie has two keys, Title (Hash) and Released (Range); this composite key allows the collection to be queried with the Table.Query method
  • Actor has one key, Name (Hash), so it will only support the Table.Scan method

 

So now that we have the tables defined, you can see how the Movie/Actor classes map to them.

 

Movie

[DynamoDBTable("Movies")]
public class Movie
{
    [DynamoDBHashKey]
    public string Title { get; set; }
    [DynamoDBRangeKey(AttributeName = "Released")]
    public DateTime ReleaseDate { get; set; }

    public List<string> Genres { get; set; }
    [DynamoDBProperty("Actors")]
    public List<string> ActorNames { get; set; }

    public override string ToString()
    {
        return string.Format(@"{0} - {1} Actors: {2}", 
            Title, ReleaseDate, string.Join(", ", ActorNames.ToArray()));
    }
}

Actor

[DynamoDBTable("Actors")]
public class Actor
{
    [DynamoDBHashKey]
    public string Name { get; set; }

    public string Bio { get; set; }
    public DateTime BirthDate { get; set; }

    [DynamoDBProperty(AttributeName = "Height")]
    public float HeightInMeters { get; set; }

    [DynamoDBProperty(Converter = typeof(AddressConverter))]
    public Address Address { get; set; }

    [DynamoDBIgnore]
    public string Comment { get; set; }

    public TimeSpan Age
    {
        get
        {
            return DateTime.UtcNow - BirthDate.ToUniversalTime();
        }
    }

    public override string ToString()
    {
        return string.Format("{0} - {1}", Name, BirthDate);
    }
}

public class Address
{
    public string Street { get; set; }
    public string City { get; set; }
    public string Country { get; set; }
}

public class AddressConverter : IPropertyConverter
{
    private XmlSerializer _serializer = new XmlSerializer(typeof(Address));

    #region IPropertyConverter Members

    public object FromEntry(DynamoDBEntry entry)
    {
        Primitive primitive = entry as Primitive;
        if (primitive == null) return null;

        if (primitive.Type != DynamoDBEntryType.String) throw new InvalidCastException();
        string xml = primitive.AsString();
        using (StringReader reader = new StringReader(xml))
        {
            return _serializer.Deserialize(reader);
        }
    }

    public DynamoDBEntry ToEntry(object value)
    {
        Address address = value as Address;
        if (address == null) return null;

        string xml;
        using (StringWriter stringWriter = new StringWriter())
        {
            _serializer.Serialize(stringWriter, address);
            xml = stringWriter.ToString();
        }
        return new Primitive(xml);
    }

    #endregion
}

 

So there are a couple of attributes we should discuss here, namely:

 

  • DynamoDBTable, which allows us to specify the DynamoDB table for this object type
  • DynamoDBHashKey, which specifies the adorned property as the hash key
  • DynamoDBRangeKey, which specifies the adorned property as the range key
  • DynamoDBProperty, which maps the property to a DynamoDB attribute (optionally under a different attribute name)
  • [DynamoDBProperty(Converter = typeof(AddressConverter))], which allows us to use a converter; in this case the custom AddressConverter saves/hydrates the Address property as XML
  • DynamoDBIgnore, which simply excludes the property from persistence

 

OK, so on to some CRUD. Here is how we can save a new Movie:

AmazonDynamoDBClient client = new AmazonDynamoDBClient();
DynamoDBContext context = new DynamoDBContext(client);

Movie darkKnight = new Movie
{
    Title = "The Dark Knight",
    ReleaseDate = new DateTime(2008, 7, 18),
    Genres = new List<string> { "Action", "Crime", "Drama" },
    // christianBale and michaelCaine are Actor instances created earlier in the SDK sample
    ActorNames = new List<string>
    {
        christianBale.Name,
        michaelCaine.Name
    }
};

context.Save<Movie>(darkKnight);

And here is how we might load an existing Movie:

AmazonDynamoDBClient client = new AmazonDynamoDBClient();
DynamoDBContext context = new DynamoDBContext(client);

Movie existingMovie = context.Load<Movie>("The Dark Knight", new DateTime(2008, 7, 18));

Or using the Query API, which Movie supports since it has a composite key (Hash for Title, Range for the Released DateTime property):

AmazonDynamoDBClient client = new AmazonDynamoDBClient();
DynamoDBContext context = new DynamoDBContext(client);

IEnumerable<Movie> movieQueryResults = context.Query<Movie>("The Dark Knight", QueryOperator.GreaterThan, new DateTime(1995, 1, 1));

And this is how we might search for an Actor (remember this one does not have a composite key, only a Hash key for the Name property), so we can only use a Scan to do the searching:

AmazonDynamoDBClient client = new AmazonDynamoDBClient();
DynamoDBContext context = new DynamoDBContext(client);

IEnumerable<Actor> actorScanResults = context.Scan<Actor>(
	new ScanCondition("HeightInMeters", ScanOperator.LessThan, 1.85f));

And this is how we might update an existing Movie

AmazonDynamoDBClient client = new AmazonDynamoDBClient();
DynamoDBContext context = new DynamoDBContext(client);

Movie existingMovie = context.Load<Movie>("The Dark Knight", new DateTime(2008, 7, 18));	
existingMovie.ActorNames.Add(maggieGyllenhaal.Name);
existingMovie.Genres.Add("Thriller");
context.Save<Movie>(existingMovie);
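
For completeness, here is how we might delete an existing Movie with the object persistence model; a minimal sketch reusing the same composite key as above:

AmazonDynamoDBClient client = new AmazonDynamoDBClient();
DynamoDBContext context = new DynamoDBContext(client);

// Delete by composite key: hash key (Title) plus range key (Released).
context.Delete<Movie>("The Dark Knight", new DateTime(2008, 7, 18));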

 

Conclusion

So that is all I wanted to say this time. We leaned pretty heavily on the standard .NET SDK sample for DynamoDB, but that is OK; it still demonstrates quite nicely how we could go about designing our own objects and tables, and how to make use of the AmazonDynamoDBClient/DynamoDBContext to carry out any bespoke work we may want for our own databases.