System.Threading.Channels

So I had a little discussion with a colleague today about the use of Rx (which I love), and he mentioned that they had used it but had moved to a simpler model by using System.Threading.Channels.

 

Now I must admit I had not used this, so I decided to give it a try tonight.

 

Here is the blurb from the readme on GitHub:

 

The System.Threading.Tasks.Channels library provides a set of synchronization data structures for passing data between producers and consumers. Whereas the existing System.Threading.Tasks.Dataflow library is focused on pipelining and connecting together dataflow “blocks” which encapsulate both storage and processing, System.Threading.Tasks.Channels is focused purely on the storage aspect, with data structures used to provide the hand-offs between participants explicitly coded to use the storage. The library is designed to be used with async/await in C#.

 

https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md

 

I work in finance, in particular in the trading of cryptocurrencies, so producer/consumer problems usually make my ears prick up.

 

The channels API is based around a Channel<T> object, which is made up of two halves: a ChannelReader<T> and a ChannelWriter<T>, exposed through its Reader and Writer properties. There are factory methods on the static Channel class to give you a channel that is either bounded (with a maximum capacity) or unbounded (no limit).
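
To make the bounded/unbounded difference concrete, here is a minimal sketch. The class and method names are made up for illustration, and it assumes the default full mode of a bounded channel (BoundedChannelFullMode.Wait), where a synchronous TryWrite simply fails once the channel is full.

```csharp
using System;
using System.Threading.Channels;

public static class ChannelCreationSketch
{
    // Hypothetical helper: counts how many of `attempts` synchronous writes
    // each kind of channel accepts when nobody is reading.
    public static (int bounded, int unbounded) CountAcceptedWrites(int capacity, int attempts)
    {
        var bounded = Channel.CreateBounded<int>(capacity);
        var unbounded = Channel.CreateUnbounded<int>();

        int boundedAccepted = 0, unboundedAccepted = 0;
        for (int i = 0; i < attempts; i++)
        {
            // TryWrite returns false when a bounded channel is full
            if (bounded.Writer.TryWrite(i)) boundedAccepted++;
            // an unbounded channel keeps accepting items until it is completed
            if (unbounded.Writer.TryWrite(i)) unboundedAccepted++;
        }
        return (boundedAccepted, unboundedAccepted);
    }

    public static void Main()
    {
        var (b, u) = CountAcceptedWrites(capacity: 3, attempts: 10);
        Console.WriteLine($"bounded accepted {b}, unbounded accepted {u}");
        // prints "bounded accepted 3, unbounded accepted 10"
    }
}
```

The unbounded channel never refuses a write, which is exactly why a slow consumer can make it grow without limit.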

 

From a producer point of view you may use the TryWrite/WriteAsync/WaitToWriteAsync/Complete methods.

From a consumer point of view you may use the TryRead/ReadAsync/WaitToReadAsync methods and the Completion property.

 

These are described in the official readme as follows:

 

  • TryRead/TryWrite: Attempt to read or write an item synchronously, returning whether the read or write was successful.
  • ReadAsync/WriteAsync: Read or write an item asynchronously. These will complete synchronously if data/space is already available.
  • TryComplete/Completion: Channels may be completed, such that no additional items may be written; such channels will “complete” when marked as completed and all existing data in the channel has been consumed. Channels may also be marked as faulted by passing an optional Exception to Complete; this exception will emerge when awaiting on the Completion Task, as well as when trying to ReadAsync from an empty completed collection.
  • WaitToReadAsync/WaitToWriteAsync: Return a Task<bool> that will complete when reading or writing can be attempted. If the task completes with a true result, at that moment the channel was available for reading or writing, though because these channels may be used concurrently, it’s possible the status changed the moment after the operation completed. If the task completes with a false result, the channel has been completed and will not be able to satisfy a read or write.
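
As a rough illustration of how these members fit together, here is a small sketch. The class and method names are hypothetical; it only uses an unbounded channel so that every TryWrite succeeds. The first method writes, completes, and drains; the second shows a fault passed to Complete surfacing when Completion is awaited.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelMethodsSketch
{
    // Writes all items, marks the channel complete, then drains it.
    public static async Task<List<string>> WriteThenDrainAsync(params string[] items)
    {
        var channel = Channel.CreateUnbounded<string>();
        foreach (var toWrite in items)
            channel.Writer.TryWrite(toWrite);
        channel.Writer.Complete();            // no more writes allowed

        var received = new List<string>();
        // WaitToReadAsync yields false once the channel is completed and empty
        while (await channel.Reader.WaitToReadAsync())
            while (channel.Reader.TryRead(out var read))
                received.Add(read);

        await channel.Reader.Completion;      // completes after the last item is consumed
        return received;
    }

    // Completes the channel with an error and reports what Completion surfaces.
    public static async Task<string> FaultAsync(string message)
    {
        var channel = Channel.CreateUnbounded<string>();
        channel.Writer.Complete(new InvalidOperationException(message));
        try
        {
            await channel.Reader.Completion;
            return "no error";
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message;
        }
    }

    public static async Task Main()
    {
        var items = await WriteThenDrainAsync("a", "b", "c");
        Console.WriteLine(string.Join(",", items));   // prints "a,b,c"
        Console.WriteLine(await FaultAsync("boom"));  // prints "boom"
    }
}
```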

 

This may all sound a bit complicated, but here is a fully functional snippet with adjustable input parameters that you can play with:

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace ChannelsExample
{
    class Program
    {
        static void Main(string[] args)
        {
            Task.Run(async () =>
            {
                await ChannelRun(2000, 1, 100, 5);
            });
            Console.WriteLine("running");
            Console.ReadLine();
        }


        public static async Task ChannelRun(int delayMs, int numberOfReaders, 
            int howManyMessages = 100, int maxCapacity=10)
        {
            var finalDelayMs = 25;
            var finalNumberOfReaders = 1;

            if (delayMs >= 25)
                finalDelayMs = delayMs;

            if (numberOfReaders >= 1)
                finalNumberOfReaders = numberOfReaders;


            //a bounded channel is useful if you have a slow consumer;
            //an unbounded one may grow until you hit an OutOfMemoryException
            var channel = Channel.CreateBounded<string>(maxCapacity);

            var reader = channel.Reader;
            var writer = channel.Writer;

            async Task Read(ChannelReader<string> theReader, int readerNumber)
            {
                //WaitToReadAsync returns false once the channel is completed and drained
                while (await theReader.WaitToReadAsync())
                {
                    //drain whatever is currently available
                    while (theReader.TryRead(out var theMessage))
                    {
                        Console.WriteLine($"Reader {readerNumber} read '{theMessage}' at {DateTime.Now.ToLongTimeString()}");
                        //simulate some work, using the validated delay
                        await Task.Delay(finalDelayMs);
                    }
                }
            }

            var tasks = new List<Task>();
            for (int i = 0; i < finalNumberOfReaders; i++)
            {
                //copy the loop variable so each lambda captures its own reader id
                var readerId = i + 1;
                tasks.Add(Task.Run(() => Read(reader, readerId)));
            }

            //Write messages to the channel. Since Read has a delay, back pressure
            //is applied to the writer: once the channel is full, WriteAsync does not
            //complete until a reader frees up space. Unbounded channels never make the writer wait
            for (int i = 0; i < howManyMessages; i++) {
                Console.WriteLine($"Writing at {DateTime.Now.ToLongTimeString()}");
                await writer.WriteAsync($"SomeText message '{i}");
            }

            //Tell readers we are complete with writing, to stop them awaiting 
            //WaitToReadAsync() forever
            writer.Complete();



            await reader.Completion;
            await Task.WhenAll(tasks);

        }
    }
}

 

Here is a run using 1 reader with a simulated 2 second delay per message. Once the channel is full, the slow reader applies back pressure to the writer, which can then only write a new value each time the reader takes one.

 

running
Writing at 21:58:09
Writing at 21:58:09
Writing at 21:58:09
Writing at 21:58:09
Writing at 21:58:09
Writing at 21:58:09
Writing at 21:58:09
Reader 1 read 'SomeText message '0' at 21:58:09
Reader 1 read 'SomeText message '1' at 21:58:11
Writing at 21:58:11
Reader 1 read 'SomeText message '2' at 21:58:13
Writing at 21:58:13
Reader 1 read 'SomeText message '3' at 21:58:15
Writing at 21:58:15
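
The same back pressure can be observed without any timing involved. The following is a minimal sketch with hypothetical names, assuming the default BoundedChannelFullMode.Wait: with a capacity of 1, a second WriteAsync stays pending until a read frees the slot.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackPressureSketch
{
    public static async Task<(bool pendingWhileFull, bool completedAfterRead)> DemonstrateAsync()
    {
        var channel = Channel.CreateBounded<string>(1);

        channel.Writer.TryWrite("first");     // fills the only slot

        // The channel is full, so this write cannot complete yet
        Task pendingWrite = channel.Writer.WriteAsync("second").AsTask();
        bool pendingWhileFull = !pendingWrite.IsCompleted;

        channel.Reader.TryRead(out _);        // frees the slot
        await pendingWrite;                   // the waiting write can now go through

        return (pendingWhileFull, pendingWrite.IsCompletedSuccessfully);
    }

    public static async Task Main()
    {
        var (pending, completed) = await DemonstrateAsync();
        Console.WriteLine($"pending while full: {pending}, completed after read: {completed}");
        // prints "pending while full: True, completed after read: True"
    }
}
```

Note that the writer is not blocking a thread while it waits; the await simply does not resume until space is available.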

Here is another run using a shorter delay for the readers, and more readers:

 

running
Writing at 22:00:26
Writing at 22:00:26
Writing at 22:00:26
Writing at 22:00:26
Writing at 22:00:26
Writing at 22:00:26
Reader 2 read 'SomeText message '0' at 22:00:26
Reader 4 read 'SomeText message '3' at 22:00:26
Reader 1 read 'SomeText message '1' at 22:00:26
Reader 3 read 'SomeText message '2' at 22:00:26
Writing at 22:00:26
Writing at 22:00:26
Writing at 22:00:26
Writing at 22:00:26
Reader 3 read 'SomeText message '5' at 22:00:26
Reader 2 read 'SomeText message '4' at 22:00:26
Reader 4 read 'SomeText message '7' at 22:00:26
Reader 1 read 'SomeText message '6' at 22:00:26
Writing at 22:00:26
Writing at 22:00:26
Writing at 22:00:26
Writing at 22:00:26
Reader 4 read 'SomeText message '9' at 22:00:26
Reader 2 read 'SomeText message '8' at 22:00:26
Reader 1 read 'SomeText message '10' at 22:00:26
Reader 3 read 'SomeText message '11' at 22:00:26

 

It’s a nice little API that I think will be very useful when working with publishers/subscribers that operate at different rates, without resorting to Rx, which quite frankly does take a lot more getting used to.
