Azure Databricks platform exploration

Now those of you that know me will know I like lots of different technologies, but as soon as I saw Apache Spark I fell in love with the fact that you could take a dataset, parallelize it, and send it out to a cluster of workers (think LINQ running across a cluster).

The only thing that was not great about Apache Spark was that you kind of had to treat it as an analytics engine for one-off jobs, where you could only get results back after the job had finished.

This was possible with Apache Spark, but it was a bit clunky.

Luckily we now have a cloud-hosted platform for working with Apache Spark, called Databricks.

Databricks exposes a bunch of REST APIs, which really make working with Apache Spark shine even more.

I just had to find some time to have an explore. To this end I have written up an article on it, which I hope some of you may find of interest: https://www.codeproject.com/Articles/1244429/Azure-Examining-Databricks-Apache-Spark-platform
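To give a flavour of those REST APIs, listing the clusters in a workspace is just a single authenticated GET. This is an illustration only; `<region>` and `$DATABRICKS_TOKEN` are placeholders you would substitute with your own workspace URL and personal access token:

```shell
# Databricks REST API 2.0 – list the clusters in a workspace.
# <region> and $DATABRICKS_TOKEN are placeholders for your own
# workspace and personal access token.
curl -H "Authorization: Bearer $DATABRICKS_TOKEN" \
  "https://<region>.azuredatabricks.net/api/2.0/clusters/list"
```

There are similar endpoints for jobs, runs and DBFS, which is what makes the platform so scriptable.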


Azure WebApp : Checking the deployed WebApp file system

 

So at work we are using Azure a lot, and one of the things we use a heck of a lot is Web Apps. We have a lot of these: some full-blown web sites, some simple Service Stack REST APIs. We orchestrate the deployment of these Azure WebApps via standard VSTS build/release steps and the use of some custom ARM templates.

 

Just as a quick diversion, this is what ARM templates are, if you have not heard of them:

 

What is Azure Resource Manager

Azure Resource Manager (ARM) allows you to provision your applications using a declarative template. In a single template, you can deploy multiple services along with their dependencies. You use the same template to repeatedly deploy your application during every stage of the application life cycle.
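As a sketch only, here is the rough shape of a minimal ARM template for a WebApp. The `$schema`, resource `type` and `apiVersion` values are real ones, but the parameter name and the stripped-down `properties` are purely illustrative; a real template carries many more settings:

```json
{
  "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "parameters": {
    "siteName": { "type": "string" }
  },
  "resources": [
    {
      "type": "Microsoft.Web/sites",
      "apiVersion": "2016-08-01",
      "name": "[parameters('siteName')]",
      "location": "[resourceGroup().location]",
      "properties": {}
    }
  ]
}
```

The point is just that the template is declarative: you state what should exist, and Resource Manager works out how to get there.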

 

This post is NOT about ARM templates, but I just thought it worth calling out what they are, in case you have not heard of them before.

 

So what is this post about?

Well, as I say, we have a bunch of WebApps that we deploy to Azure, which most of the time is just fine; we rarely need to check up on this automated deployment mechanism, it just works. However, as most engineers will attest, the shit fairy does occasionally come to town, and when she does she is sure to stop by and sprinkle a little chaos on your lovely deployment setup.

 

Now I don’t really believe in fairies, but I have certainly witnessed first-hand that things can end up deployed all wrong, and I have found myself in a situation where I needed to check the following things, to make sure that what I think I have configured is what is actually happening when I deploy:

 

  1. VSTS steps are correct
  2. VSTS variables are correct
  3. Web.Config/AppSettings.json have the correct values in them when deployed

 

Items 1 and 2 from that list are easy, as we can check them inside VSTS. However, item 3 requires us to get onto the actual file system of the VM running the Azure WebApp that we deployed. This is certainly not possible (to my knowledge) from VSTS.

 

So how can we see what was deployed for our Azure WebApp?

So it would seem we need to get access to the file system of the VM running the Azure WebApp. Now if you know anything about scale sets, and how Azure deals with WebApps, you will know that you can’t really trust that the VMs that are there right now are guaranteed to be the exact same VMs tomorrow. Azure just doesn’t work that way. If a VM is deemed unhealthy, it can and will be taken away, and a new one will be provisioned under the covers.

 

You generally don’t have to care about this, Azure just does its magic and we are blissfully unaware. Happy days.

 

However, if we do need to do something like check a deployment, how do we do that? Which VM should I try to gain access to? Will that VM be the same one tomorrow? Maybe, maybe not. So we can’t really write any funky scripts with fixed VM host names in them, as we may not get the same VM hosting our WebApp from one day to the next. So how do we deal with this exactly?

 

Luckily there is a magic button in the Azure Portal that allows us to do just what we want.

 

Say we have a WebApp that we have created via some automated VSTS deployment setup

 

[screenshot: the deployed WebApp in the Azure Portal]

 

We can open the web app, and drill into its blade and look for the Advanced Tools

 

[screenshot: the Advanced Tools entry in the WebApp blade]

 

Then select the “Go” option from the panel that is displayed in the portal. Once you have done that, a new tab will open in your browser that shows something like this:

 

[screenshot: the Kudu site opened in a new browser tab]

 

It can be seen that this opens up a Kudu portal for the selected Azure WebApp.

 

But just what is Kudu?

Kudu is the engine behind git/hg deployments, WebJobs, and various other features in Azure Web Sites. It can also run outside of Azure.

 

Anyway, once we have the Kudu portal open for our selected WebApp we can do various things; the one we are interested in for this post is Debug Console –> PowerShell.

 

[screenshot: the Debug Console –> PowerShell menu in Kudu]

 

So let’s launch that; we then get something like this:

 

[screenshot: the Kudu PowerShell console showing the WebApp file system]

 

Aha, a file system for the WebApp. Cool. So now all we need to do is explore it, say by changing directory to site\wwwroot. From there we could have a look at the Web.config (this is a standard .NET web site, so no AppSettings.json for this one).

 

We could examine the Web.config content like this, using the Get-Content PowerShell cmdlet:

 

[screenshot: Get-Content output showing the Web.config contents]
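For reference, what gets typed into that console is just standard PowerShell, something along these lines (D:\home\site\wwwroot is the usual Kudu location for deployed WebApp content, but check your own setup):

```powershell
cd D:\home\site\wwwroot       # the deployed site content lives here
dir                           # list what actually got deployed
Get-Content .\Web.config      # dump the config so we can verify the values
```

That last command is all we need to confirm whether the appSettings values we configured in VSTS actually made it into the deployed file.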

 

 

 

Conclusion

So that’s it. This was a small post, but I hope it showed you that even though the VMs you are running on may change from one day to the next, you still have the tools you need to get in there and have a look around. Until next time then….

Small Azure EventGrid + Azure Functions Demo

I am a big fan of reactive programming, and love things like Rx/Akka and service buses, and I have been meaning to try the new (preview) Azure EventGrid service out for a while.

 

To this end I have given it a little go, where I hooked it up to an Azure Function, and written a small article about it here: https://www.codeproject.com/Articles/1220389/Azure-EventGrid-Azure-Function-demo

Azure Service Fabric Demo App

At work we have made use of Azure Service Fabric, and I thought it might be nice to write up some of the fun we had with that. To this end I have written an article on it at CodeProject, which you can read here: https://www.codeproject.com/Articles/1217885/Azure-Service-Fabric-demo

The article covers :

  • Service Fabric basics
  • IoC
  • Logging (Serilog/Seq)
  • Encryption of connection strings

Anyway, I hope you like it.

I’m going to write up this big Scala thing I have been doing, then I may post some more Azure bits and bobs. Adios until then.

 

 

Azure : Upload and stream video content to WPF from blob storage

A while back, when Azure first came out, I toyed with the idea of uploading video content to Azure Blob Storage and having it play back in my WPF app. At the time (I can’t recall exactly when that was, but quite a while ago) I had some major headaches doing this. The problem stemmed from the fact that the WPF MediaElement and Azure Blob Storage did not play nicely together.

You just could not seek (that is, jump to an unbuffered / not-yet-downloaded segment of the video) and play. It just did not work; you would have to wait for the video to download ALL the content up to the point you requested.

 

There is a very good post that discusses this old problem right here : http://programmerpayback.com/2013/01/30/hosting-progressive-download-videos-on-azure-blobs/

 

Previously you had to set the Blob storage API version. Starting from the 2011-08-18 version, you can do partial and pause/resume downloads on blob objects. The nice thing is that your client code doesn’t have to change to achieve this. 
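In other words, from that API version onwards the blob service honours HTTP range requests, which is what the MediaElement relies on when you seek. A request/response exchange looks roughly like this (the account name, blob name and byte ranges are made up for illustration):

```http
GET /mycontainer/myblob HTTP/1.1
Host: youraccount.blob.core.windows.net
x-ms-version: 2011-08-18
Range: bytes=1048576-2097151

HTTP/1.1 206 Partial Content
Content-Range: bytes 1048576-2097151/52428800
Content-Length: 1048576
```

The 206 Partial Content response means the client can grab just the chunk it needs for the seek position, rather than downloading everything up to that point.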

 

Luckily this is no longer a problem, so nowadays it is as simple as following these steps:

 

  1. Upload a video (say MP4) to Azure Blob Storage
  2. Grab the Uri of the uploaded video
  3. Use that Uri for a WPF MediaElement

 

I have created a small demo app for you; here is what it looks like after I have uploaded a video and pressed the play button:

 

[screenshot: the demo app playing the uploaded video]

 

The code is dead simple. Here is the XAML (it’s a WPF app):

 

<Window x:Class="WpfMediaPlayerFromBlobstorage.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        Title="MainWindow" Height="350" Width="525" WindowState="Maximized">
    <Grid>
        <DockPanel LastChildFill="True">
           
            <StackPanel Orientation="Horizontal" DockPanel.Dock="Top">
                <Button x:Name="btnUpload" 
                        Click="BtnUpload_OnClick" 
                        Content="Pick MP4 file to upload" 
                        Width="Auto" 
                        Margin="5"
                        Height="23"/>
                <StackPanel Orientation="Horizontal" Margin="50,5,5,5">
                    <StackPanel x:Name="controls" 
                                HorizontalAlignment="Center" 
                                Orientation="Horizontal">

                        <Button x:Name="btnPlay" 
                                Height="23" 
                                Content="Play" 
                                VerticalAlignment="Center"
                                Margin="5"
                                Click="BtnPlay_OnClick" />
                        <Button x:Name="btnPause" 
                                Height="23" 
                                Content="Pause" 
                                VerticalAlignment="Center"
                                Margin="5"
                                Click="BtnPause_OnClick" />
                        <Button x:Name="btnStop" 
                                Height="23" 
                                Content="Stop" 
                                VerticalAlignment="Center"
                                Click="BtnStop_OnClick"
                                Margin="5" />

                        <TextBlock VerticalAlignment="Center" 
                                   Text="Seek To"
                                   Margin="5" />
                        <Slider Name="timelineSlider" 
                                Margin="5" 
                                Height="23"
                                VerticalAlignment="Center"
                                Width="70"
                                ValueChanged="SeekToMediaPosition" />

                    </StackPanel>
                </StackPanel>
            </StackPanel>
            <MediaElement x:Name="player" 
                          Volume="1"
                          LoadedBehavior="Manual"
                          UnloadedBehavior="Manual"
                          HorizontalAlignment="Stretch" 
                          VerticalAlignment="Stretch"
                          Margin="10"
                          MediaOpened="Element_MediaOpened" 
                          MediaEnded="Element_MediaEnded"/>
        </DockPanel>
    </Grid>
</Window>

And here is the code-behind (for simplicity I did not use MVVM for this demo):

 

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;

using Microsoft.Win32;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Shared.Protocol;

namespace WpfMediaPlayerFromBlobstorage
{
    /// <summary>
    /// Interaction logic for MainWindow.xaml
    /// </summary>
    public partial class MainWindow : Window
    {
        private static string blobStorageConnectionString =
            "DefaultEndpointsProtocol=http;AccountName=YOUR_ACCOUNT_HERE;AccountKey=YOUR_KEY_HERE";
        private Uri uploadedBlobUri=null;


        public MainWindow()
        {
            InitializeComponent();
            this.controls.IsEnabled = false;
        }

        private async void BtnUpload_OnClick(object sender, RoutedEventArgs e)
        {
            this.controls.IsEnabled = false;
            OpenFileDialog fd = new OpenFileDialog();
            fd.InitialDirectory=@"c:\";
            var result = fd.ShowDialog();
            if (result.HasValue && result.Value)
            {
                try
                {
                    var storageAccount = CloudStorageAccount.Parse(blobStorageConnectionString);
                    CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
                    CloudBlobContainer container = blobClient.GetContainerReference("mycontainer");
                    container.CreateIfNotExists();
                    CloudBlockBlob blockBlob = container.GetBlockBlobReference("myblob");
                    container.SetPermissions(
                        new BlobContainerPermissions
                        {
                            PublicAccess =
                                BlobContainerPublicAccessType.Blob
                        }
                    );

                    using (var fileStream = File.OpenRead(fd.FileName))
                    {
                        await blockBlob.UploadFromStreamAsync(fileStream);
                        uploadedBlobUri = blockBlob.Uri;
                        this.controls.IsEnabled = true;
                        MessageBox.Show("File uploaded ok");
                    }
                }
                catch (Exception exception)
                {
                    MessageBox.Show("Ooops : " + exception.Message);
                }
            }


           
        }

        private void BtnPlay_OnClick(object sender, RoutedEventArgs e)
        {
            player.Source = uploadedBlobUri;
            player.Play();
            timelineSlider.Value = 0;
        }

        private void BtnPause_OnClick(object sender, RoutedEventArgs e)
        {
            player.Pause();
        }

        private void BtnStop_OnClick(object sender, RoutedEventArgs e)
        {
            player.Stop();
            timelineSlider.Value = 0;
        }

        private void Element_MediaOpened(object sender, EventArgs e)
        {
            timelineSlider.Maximum = player.NaturalDuration.TimeSpan.TotalMilliseconds;
        }

        private void Element_MediaEnded(object sender, EventArgs e)
        {
            player.Stop();
            timelineSlider.Value = 0;
        }


        private void SeekToMediaPosition(object sender, 
		RoutedPropertyChangedEventArgs<double> args)
        {
            int sliderValue = (int)timelineSlider.Value;
            TimeSpan ts = new TimeSpan(0, 0, 0, 0, sliderValue);
            player.Position = ts;
        }
    }
}

And there you have it: a very simple media player that allows play/pause/stop and seek on a video uploaded to Azure Blob Storage.

You can grab this project (you will need to fill in the Azure Blob Storage connection string details with your own account settings) from my github account here : https://github.com/sachabarber/WpfMediaPlayerFromBlobstorage

 

NOTE : If you want more control over encoding/streaming etc. you should check out Azure Media Services.

Azure : Event Hub A First Look

Over the next few weeks I am going to be looking at a couple of things I have had on my backlog for a while (I need to get these things done, so I can make my pushy work colleague happy by learning Erlang). One of the things on that backlog is having a look at Azure Event Hubs.

 

Event Hubs come under the Azure Service Bus umbrella, but are quite different. They are high-throughput pub/sub at massive scale, with low latency and high reliability. To be honest this post will not add much more than you could find on MSDN; in fact even the demo associated with this post is one directly from MSDN. However, in the next series of post(s) I will be showing you some more novel uses of Event Hubs, which will be my own material.

 

I guess if you have not heard of Azure Event Hubs there will still be some goodness in here, even if I have poached a lot of the content for this post (please forgive me) from MSDN.

 

Event Hubs provides a message stream handling capability and though an Event Hub is an entity similar to queues and topics, it has very different characteristics than traditional enterprise messaging. Enterprise messaging scenarios commonly require a number of sophisticated capabilities such as sequencing, dead-lettering, transaction support, and strong delivery assurances, while the dominant concern for event ingestion is high throughput and processing flexibility for event streams. Therefore, the Azure Event Hubs capability differs from Service Bus topics in that it is strongly biased towards high throughput and event processing scenarios. As such, Event Hubs does not implement some of the messaging capabilities that are available for topics. If you need those capabilities, topics remain the optimal choice.

An Event Hub is created at the namespace level in Service Bus, similar to queues and topics. Event Hubs uses AMQP and HTTP as its primary API interfaces.

 

https://msdn.microsoft.com/library/azure/dn836025.aspx

 

Partitions

In order to create such a high-throughput ingestor (Event Hub), Microsoft used the idea of partitions. I like to use this set of images to help me understand what partitions bring to the table.

 

Regular messaging may be something like this (one lane):

[diagram: a single-lane road – one queue, limited throughput]

 

Whilst an Event Hub may be more like this (many lanes):

[diagram: a multi-lane highway – many partitions, much higher throughput]

 

 

What I am trying to show there is that with only one lane, less traffic may travel, but with more lanes, more traffic will flow.

Event Hubs get their throughput by holding n-many partitions. Using the Azure portal, the maximum number of partitions you may allocate is 16; this may be extended if you contact the Microsoft Azure Service Bus team. Each partition can be thought of as a queue (FIFO) of messages. Messages are held for a configurable amount of time. This setting is global across the entire Event Hub, and as such will affect messages held across ALL partitions.

In order to use partitions from your code you should assign a partition key, which ensures that the correct partition gets used. If your publishing code does not supply a partition key, a round-robin assignment is used, ensuring that each partition is fairly balanced in terms of throughput.

Stream Offsets

Within each partition an offset is held; this offset can be thought of as a client-side cursor, giving the position in the message stream that has been dealt with. This offset should be maintained by the event consumer, and may be used to indicate the position in the stream to start processing from, should communication with the Event Hub be lost.

Checkpoints

Checkpoints are the responsibility of the consumer, and mark or commit their position within a partition event stream. The consumer can inform the Event Hub when it considers an event stream complete. If a consumer disconnects from a partition, when connection is re-established it begins reading at the checkpoint that was previously submitted. Due to the fact that event data is held for a specified period, it is possible to return older data by specifying a lower offset from this checkpointing process. Through this mechanism, checkpointing enables both failover resiliency and controlled event stream replay.

So How About A Demo

I simply followed the getting started example, which you can find here : https://azure.microsoft.com/en-gb/documentation/articles/service-bus-event-hubs-csharp-ephcs-getstarted/

The Publisher

Here is the entire code for a FULLY working Event Hub publisher

using System;
using System.Text;
using System.Threading;

using Microsoft.ServiceBus.Messaging;

namespace Sender
{
    class Program
    {

        static string eventHubName = "{Your hub name}";
        static string connectionString = "{Your hub connection string}";
   

        static void Main(string[] args)
        {
            Console.WriteLine("Press Ctrl-C to stop the sender process");
            Console.WriteLine("Press Enter to start now");
            Console.ReadLine();
            SendingRandomMessages();
        }



        static void SendingRandomMessages()
        {
            var eventHubClient = 
                EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
            while (true)
            {
                try
                {
                    var message = Guid.NewGuid().ToString();
                    Console.WriteLine("{0} > Sending message: {1}", 
                        DateTime.Now, message);

                    EventData eventData = new EventData(
                        Encoding.UTF8.GetBytes(message));

                    //This is how you can include metadata
                    //eventData.Properties["someProp"] = "MyEvent"

                    //this is how you would set the partition key
                    //eventData.PartitionKey = 1.ToString();
                    eventHubClient.Send(eventData);
                }
                catch (Exception exception)
                {
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("{0} > Exception: {1}", 
                        DateTime.Now, exception.Message);
                    Console.ResetColor();
                }

                Thread.Sleep(5000);
            }
        }
    }
}

 

It can be seen above that there is an EventHubClient class that you may use to send events. The code above also shows how you create a new event using the EventData class. Although I have not used these features, the code also shows how to associate metadata with the event, and how to set a partition key for the message.

The Consumer

The consumer is a little trickier, but not too much; there are only 2 classes of interest in the demo app. The main entry point uses an EventProcessorHost, which the Service Bus team describe like this:

In an effort to alleviate this overhead the Service Bus team has created EventProcessorHost, an intelligent agent for .NET consumers that manages partition access and per-partition offsets for consumers.

To use this class you first must implement the IEventProcessor interface, which has three methods: OpenAsync, CloseAsync, and ProcessEventsAsync (a simple implementation is shown a bit further down). Before that though, here is the main entry point that hosts the EventProcessorHost.

 

using System;
using System.Threading.Tasks;

using Microsoft.ServiceBus.Messaging;
using Microsoft.Threading;

namespace Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            AsyncPump.Run(MainAsync);
        }


        static async Task MainAsync()
        {
            string eventHubConnectionString = "{Your hub connection string}";
            string eventHubName = "{Your hub name}";
            string storageAccountName = "{Your storage account name}";
            string storageAccountKey = "{Your storage account key}";
            string storageConnectionString = 
                string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
                storageAccountName, storageAccountKey);

            string eventProcessorHostName = Guid.NewGuid().ToString();
            EventProcessorHost eventProcessorHost = 
                new EventProcessorHost(
                    eventProcessorHostName, 
                    eventHubName, 
                    EventHubConsumerGroup.DefaultGroupName, 
                    eventHubConnectionString, storageConnectionString);
            var epo = new EventProcessorOptions()
            {
                MaxBatchSize = 100,
                PrefetchCount = 1,
                ReceiveTimeOut = TimeSpan.FromSeconds(20)
            };
            await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(epo);


            Console.WriteLine("Receiving. Press enter key to stop worker.");
            Console.ReadLine();
        }
    }
}

 

After implementing this class, instantiate EventProcessorHost, providing the necessary parameters to the constructor:

  • Hostname – be sure not to hard-code this; each instance of EventProcessorHost must have a unique value for this within a consumer group.
  • EventHubPath – this is an easy one.
  • ConsumerGroupName – also an easy one; “$Default” is the name of the default consumer group, but it is generally a good idea to create a consumer group for your specific aspect of processing.
  • EventHubConnectionString – this is the connection string to the particular Event Hub, which can be retrieved from the Azure portal. This connection string should have Listen permissions on the Event Hub.
  • StorageConnectionString – this is the storage account that will be used for partition distribution and leases. When checkpointing, the latest offset values will also be stored here.

Finally, call RegisterEventProcessorAsync on the EventProcessorHost and register your implementation of IEventProcessor. At this point the agent will begin obtaining leases for partitions and creating receivers to read from them. For each partition that a lease is acquired for, an instance of your IEventProcessor class will be created and then used for processing events from that specific partition.

 

http://blogs.msdn.com/b/servicebus/archive/2015/01/16/event-processor-host-best-practices-part-1.aspx 

Lease management

Checkpointing is not the only use of the storage connection string performed by EventProcessorHost.  Partition ownership (that is reader ownership) is also performed for you.  This way only a single reader can read from any given partition at a time within a consumer group.  This is accomplished using Azure Storage Blob Leases and implemented using Epoch.  This greatly simplifies the auto-scale nature of EventProcessorHost.  As an instance of EventProcessorHost starts it will acquire as many leases as possible and begin reading events. As the leases draw near expiration EventProcessorHost will attempt to renew them by placing a reservation. If the lease is available for renewal the processor continues reading, but if it is not the reader is closed and CloseAsync is called – this is a good time to perform any final cleanup for that partition.

EventProcessorHost has a member PartitionManagerOptions. This member allows for control over lease management. Set these options before registering your IEventProcessor implementation.

 

Controlling the runtime

Additionally the call to RegisterEventProcessorAsync allows for a parameter EventProcessorOptions. This is where you can control the behavior of the EventProcessorHost itself. There are four properties and one event that you should be aware of.

 

  • MaxBatchSize – this is the maximum size of the collection the user wants to receive in an invocation of ProcessEventsAsync. Note that this is not the minimum, only the maximum. If there are not this many messages to be received the ProcessEventsAsync will execute with as many as were available.
  • PrefetchCount – this is a value used by the underlying AMQP channel to determine the upper limit of how many messages the client should receive. This value should be greater than or equal to MaxBatchSize.
  • InvokeProcessorAfterReceiveTimeout – setting this parameter to true will result in ProcessEventsAsync being called when the underlying call to receive events on a partition times out. This is useful for taking time-based actions during periods of inactivity on the partition.
  • InitialOffsetProvider – this allows a function pointer or lambda expression to be set that will be called to provide the initial offset when a reader begins reading a partition. Without setting this the reader will start at the oldest event unless a JSON file with an offset has already been saved in the storage account supplied to the EventProcessorHost constructor. This is useful when you want to change the behavior of reader start up. When this method is invoked the object parameter will contain the partition id that the reader is being started for.
  • ExceptionReceived  – this event allows you to receive notification of any underlying exceptions that occur in the EventProcessorHost. If things aren’t working as you expect, this is a great place to start looking.

 

 

Here is the demo code’s implementation of IEventProcessor:

 

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;

using Microsoft.ServiceBus.Messaging;

namespace Receiver
{
    class SimpleEventProcessor : IEventProcessor
    {
        Stopwatch checkpointStopWatch;

        async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", 
                context.Lease.PartitionId, reason);
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }

        Task IEventProcessor.OpenAsync(PartitionContext context)
        {
            Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", 
                context.Lease.PartitionId, context.Lease.Offset);
            this.checkpointStopWatch = new Stopwatch();
            this.checkpointStopWatch.Start();
            return Task.FromResult<object>(null);
        }

        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context,
            IEnumerable<EventData> messages)
        {
            foreach (EventData eventData in messages)
            {
                string data = Encoding.UTF8.GetString(eventData.GetBytes());

                Console.WriteLine(string.Format("Message received.  Partition: '{0}', Data: '{1}'",
                    context.Lease.PartitionId, data));
            }

            //Call checkpoint every 5 minutes, so that worker can resume processing 
            //from the 5 minutes back if it restarts.
            if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
            {
                await context.CheckpointAsync();
                this.checkpointStopWatch.Restart();
            }
        }
    }
}

 

This code probably needs a little explanation, and one of the best explanations you are likely to find is over on the Service Bus team’s web site, which again I will blatantly steal here:

Thread safety & processor instances
It’s important to know that by default EventProcessorHost is thread safe and will behave in a synchronous manner as far as your instance of IEventProcessor is concerned. When events arrive for a particular partition ProcessEventsAsync will be called on the IEventProcessor instance for that partition and will block further calls to ProcessEventsAsync for the particular partition.  Subsequent messages and calls to ProcessEventsAsync will queue up behind the scenes as the message pump continues to run in the background on other threads.  This thread safety removes the need for thread safe collections and dramatically increases performance.
 
Receiving Messages
Each call to ProcessEventsAsync will deliver a collection of events.  It is your responsibility to do whatever it is you intend to do with these events.  Keep in mind you want to keep whatever it is you’re doing relatively fast – i.e. don’t try to do many processes from here – that’s what consumer groups are for.  If you need to write to storage and do some routing it is generally better to use two consumer groups and have two IEventProcessor implementations that run separately.
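 
The two-consumer-group idea above can be sketched roughly as follows, using the same era Microsoft.ServiceBus.Messaging SDK as the code earlier. The consumer group names ("storagegroup", "routinggroup"), the connection strings, and the two IEventProcessor implementations are all hypothetical stand-ins, each written along the same lines as the SimpleEventProcessor shown above:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

class TwoConsumerGroupsDemo
{
    //Hypothetical values - substitute your own Event Hub / storage details
    const string EventHubConnectionString = "Endpoint=sb://...";
    const string StorageConnectionString = "DefaultEndpointsProtocol=https;...";
    const string EventHubName = "myeventhub";

    static async Task RunAsync()
    {
        //One EventProcessorHost per consumer group, each with its own
        //IEventProcessor implementation, so storage writing and routing
        //each get their own independent read of the event stream
        var storageHost = new EventProcessorHost(
            Guid.NewGuid().ToString(), EventHubName, "storagegroup",
            EventHubConnectionString, StorageConnectionString);
        var routingHost = new EventProcessorHost(
            Guid.NewGuid().ToString(), EventHubName, "routinggroup",
            EventHubConnectionString, StorageConnectionString);

        //StorageEventProcessor / RoutingEventProcessor are hypothetical
        //IEventProcessor implementations, shaped like SimpleEventProcessor above
        await storageHost.RegisterEventProcessorAsync<StorageEventProcessor>();
        await routingHost.RegisterEventProcessorAsync<RoutingEventProcessor>();
    }
}
```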
 
At some point during your processing you’re going to want to keep track of what you have read and completed.  This will be critical if you have to restart reading – so you don’t start back at the beginning of the stream.  EventProcessorHost greatly simplifies this with the concept of Checkpoints.  A Checkpoint is a location, or offset, for a given partition, within a given consumer group, where you are satisfied that you have processed the messages up to that point. It is where you are currently “done”. Marking a checkpoint in EventProcessorHost is accomplished by calling the CheckpointAsync method on the PartitionContext object.  This is generally done within the ProcessEventsAsync method but can be done in CloseAsync as well.
 
CheckpointAsync has two overloads: the first, with no parameters, checkpoints to the highest event offset within the collection returned by ProcessEventsAsync.  This is a “high water mark” in that it is optimistically assuming you have processed all recent events when you call it.  If you use this method in this way be aware that you are expected to perform this after your other event processing code has returned.  The second overload allows you to specify an EventData instance to checkpoint to.  This allows you to use a different type of watermark to checkpoint to.  With this you could implement a “low water mark” – the lowest sequenced event you are certain has been processed. This overload is provided to enable flexibility in offset management.
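 
To make the two overloads concrete, here is a minimal sketch (same SDK as the demo code above) of a ProcessEventsAsync body that uses either the parameterless "high water mark" overload or the EventData overload as a "low water mark"; the lowestCompleted bookkeeping here is a hypothetical, deliberately simplified illustration:

```csharp
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context,
    IEnumerable<EventData> messages)
{
    EventData lowestCompleted = null;
    foreach (EventData eventData in messages)
    {
        // ... actually process eventData here ...

        //Remember the lowest-sequenced event we are certain is fully processed
        if (lowestCompleted == null)
            lowestCompleted = eventData;
    }

    //High water mark: checkpoint to the highest offset in this batch.
    //Note this is only safe AFTER all the processing above has completed
    await context.CheckpointAsync();

    //OR low water mark: checkpoint to a specific event instead, e.g.
    //await context.CheckpointAsync(lowestCompleted);
}
```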

 
When the checkpoint is performed a JSON file with partition specific information, the offset in particular, is written to the storage account supplied in the constructor to EventProcessorHost.  This file will be continually updated.  It is critical to consider checkpointing in context – it would be unwise to checkpoint every message.  The storage account used for checkpointing probably wouldn’t handle this load, but more importantly checkpointing every single event is indicative of a queued messaging pattern for which a Service Bus Queue may be a better option than an Event Hub.  The idea behind Event Hubs is that you will get at least once delivery at great scale.  By making your downstream systems idempotent it is easy to recover from failures or restarts that result in the same events being received multiple times.
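 
Since Event Hubs gives you at-least-once delivery, the downstream side needs to tolerate replays. One way (a hypothetical sketch, not from the MSDN sample) is to key de-duplication on the partition id plus EventData.SequenceNumber, skipping anything at or below the last sequence number already applied:

```csharp
using System;
using System.Collections.Concurrent;

//Hypothetical helper: remembers the last sequence number applied per partition,
//so that replayed events (same or earlier sequence) are skipped, not re-applied
class IdempotentSink
{
    private readonly ConcurrentDictionary<string, long> lastApplied =
        new ConcurrentDictionary<string, long>();

    public bool TryApply(string partitionId, long sequenceNumber, Action apply)
    {
        long seen;
        if (lastApplied.TryGetValue(partitionId, out seen) && sequenceNumber <= seen)
            return false; //duplicate delivery - safe to ignore

        apply();
        lastApplied[partitionId] = sequenceNumber;
        return true;
    }
}
```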
 
Shutting down gracefully
Finally EventProcessorHost.UnregisterEventProcessorAsync allows for the clean shut down of all partition readers and should always be called when shutting down an instance of EventProcessorHost. Failure to do this can cause delays when starting other instances of EventProcessorHost due to lease expiration and Epoch conflicts.

 

http://blogs.msdn.com/b/servicebus/archive/2015/01/16/event-processor-host-best-practices-part-1.aspx
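 
Putting the register/unregister lifecycle together, the hosting side of the demo looks roughly like this (shaped after the MSDN getting started sample; the connection string variables are placeholders you would fill in from your own Event Hub and storage account):

```csharp
var host = new EventProcessorHost(
    Guid.NewGuid().ToString(), "myeventhub",
    EventHubConsumerGroup.DefaultGroupName,
    eventHubConnectionString, storageConnectionString);

await host.RegisterEventProcessorAsync<SimpleEventProcessor>();

Console.WriteLine("Receiving. Press enter key to stop worker.");
Console.ReadLine();

//Always unregister on the way out - the partition leases get released cleanly,
//so other EventProcessorHost instances do not sit waiting on lease expiration
await host.UnregisterEventProcessorAsync();
```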

 

 

When you run this demo code you will see that 16 partitions are initialized and then messages are dispatched to the partitions.

 

You can grab a starter for this demo from here : https://github.com/sachabarber/EventHubDemo though you WILL need to create an Event Hub in Azure as well as a Storage account. Like I say, full instructions are available on MSDN for this one; I simply followed the getting started example, which you can find here : https://azure.microsoft.com/en-gb/documentation/articles/service-bus-event-hubs-csharp-ephcs-getstarted/

 

 


 

 

This post adds absolutely ZERO to the example shown in the link above, and I have borrowed A LOT of material from MSDN. That said, if you have not heard of the Azure Event Hub you may have learnt something here. In my next post (which may become an article, where I like to show original work), I will be looking to use an Azure Event Hub along with the Azure Stream Analytics service, which I think should be quite cool, and original. I am however sorry this post is so borrowed……a case of could not have said it better myself.

Azure Cloud Service : Inter role communications

I have been doing a bit more with Azure of late, and one of the things I wanted to try out was inter role communications between cloud service roles. You can obviously use the Azure Service Bus, where topic based subscriptions may make some sense, but I felt there might be a lighter approach, and set out to explore this.
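 
To give a flavour of the lighter approach, one option is to have each role declare an internal endpoint in its service definition, and then discover sibling instances at runtime via the RoleEnvironment API. A minimal sketch, assuming a worker role named "WorkerRole" that declares an internal endpoint called "InterRole" (both names are hypothetical):

```csharp
using System.Collections.Generic;
using System.Net;
using Microsoft.WindowsAzure.ServiceRuntime;

static class InterRoleDiscovery
{
    //Enumerates the IP/port of every instance of the (hypothetical) worker role,
    //which you can then talk to directly, e.g. over a raw TCP socket
    public static IEnumerable<IPEndPoint> GetWorkerEndpoints()
    {
        foreach (RoleInstance instance in RoleEnvironment.Roles["WorkerRole"].Instances)
        {
            yield return instance.InstanceEndpoints["InterRole"].IPEndpoint;
        }
    }
}
```

The full details of wiring this up are in the article linked below.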

I was not alone in this thinking, and found some good stuff out there, which I have reworked into the article below:

 

http://www.codeproject.com/Articles/888469/Azure-Cloud-Service-Inter-role-communications

 

I hope some of you find it useful.