Archive for March, 2010

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 6.1)

Posted in .NET Futures, Rx on March 18th, 2010 by Richard

Preparing For Implementing Concurrency


The next step in this series on creating my own IObservable<T> implementation is concurrency. (See below for links to previous parts; Part 5 has a full code listing.)

This is the hard part. It is too easy to think the code is OK when it isn’t, and too hard to validate that it really is thread safe.

The only option is careful analysis, and then double (triple, …) check everything.

After all, even the best set of unit tests will fail to find a problem that only shows up under a particular combination of OS version, hardware and other running processes. With race conditions, deadlocks and all the other concurrency fun, this is not just possible but likely.

But first: the new 1.0.2350.0 (2010-03-15) build of Rx required no changes to my code, and all tests pass as before. This is now running on my main system (Rx has a go-live licence, so why not) with its multiple cores; previously I was working on a VM with a single virtual CPU.

What Needs Protection?

At the heart of my implementation there is some shared data. This needs protecting wherever multiple threads could operate on it concurrently. For example, events are pushed to subscribers using a copy of the list of subscribers (this allows re-entrant subscription and un-subscription), but taking that copy from the values of a Dictionary<K,V> while another thread modifies it is certainly not safe.

Some operations are inherently safe (defined by the platform’s memory model, see Joe Duffy’s blog for more details). In this case reading or writing an aligned, volatile int or reference is safe (such reads and writes are inherently atomic). The following cases each involve only a single field that is checked, written to, or both (compared and exchanged via Interlocked.CompareExchange):

  • Checking if the endOfEvents field (reference to a Notification<T>) is null.

  • The boolean field pushingEvents used only in PushEvents (the method that loops through a snapshot of the current subscribers) to avoid re-entrant or multi-threaded pushing of events.

    When it is read and found to be true there is no problem (just return from PushEvents). But if it is false it then needs to be set, so two operations are needed: a read and a write. Interlocked.CompareExchange will take care of this without a lock. (Before returning from pushing events it is set back to false, but a single write is also OK.)

    As there is no overload of Interlocked.CompareExchange for bool the field will need to be an int with values 0 and 1.

  • To avoid races in Next, Completed and Error between checking for endOfEvents and setting it one can use Interlocked.CompareExchange.
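The single-field cases above might be sketched as follows. This is a minimal illustration under assumptions, not the code from this series: the class FlagSketch, the EndMarker type (a stand-in for Notification<T>) and the method names TryBeginPush, EndPush and TryEnd are all hypothetical.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the Notification<T> that marks the end of events.
sealed class EndMarker { }

sealed class FlagSketch
{
    // 0 = nobody is pushing events, 1 = a push loop is running.
    // An int because Interlocked.CompareExchange has no bool overload.
    private int pushingEvents;

    // Stays null until a Completed or Error has been accepted.
    private EndMarker endOfEvents;

    // True only for the single caller that flips the flag from 0 to 1.
    public bool TryBeginPush()
    {
        return Interlocked.CompareExchange(ref pushingEvents, 1, 0) == 0;
    }

    // Resetting the flag is a single write.
    public void EndPush()
    {
        Interlocked.Exchange(ref pushingEvents, 0);
    }

    // True only for the first caller to end the sequence; later callers lose.
    public bool TryEnd(EndMarker marker)
    {
        if (marker == null) throw new ArgumentNullException("marker");
        return Interlocked.CompareExchange(ref endOfEvents, marker, null) == null;
    }

    // Reads the reference without changing it (null is swapped for null).
    public bool HasEnded
    {
        get { return Interlocked.CompareExchange(ref endOfEvents, null, null) != null; }
    }
}
```

A caller would wrap its push loop in TryBeginPush and EndPush, and Completed and Error would go through TryEnd so that only the first of them takes effect.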

There are also two pieces of functionality that require multiple operations on fields (or objects that are not thread safe). Each needs either a lock or a thread-safe replacement.

  • The collection of subscribers and the subscriber id counter, with operations to add or remove a subscriber and to snapshot a list of all subscribers. This will require a lock; there is no way (that I can see) around this.

  • The queue of pending events. This can be handled by changing to an instance of ConcurrentQueue<T>, with modifications in PushEvents to use TryDequeue rather than a combination of checking the count and then calling Dequeue.
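These two multi-operation cases might look something like the sketch below. It makes assumptions that are not in the real implementation: subscribers are plain Action<T> delegates rather than IObserver<T>, and the names SubscriberListSketch, gate, Add, Remove, Snapshot, Enqueue and Drain are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

sealed class SubscriberListSketch<T>
{
    private readonly object gate = new object();
    private readonly Dictionary<int, Action<T>> subscribers =
        new Dictionary<int, Action<T>>();
    private int nextId;
    private readonly ConcurrentQueue<T> pending = new ConcurrentQueue<T>();

    // The id counter and the dictionary change together, so both sit under one lock.
    public int Add(Action<T> onNext)
    {
        lock (gate)
        {
            int id = nextId++;
            subscribers.Add(id, onNext);
            return id;
        }
    }

    public bool Remove(int id)
    {
        lock (gate) { return subscribers.Remove(id); }
    }

    // Copies the current subscribers so that callbacks run outside the lock.
    private Action<T>[] Snapshot()
    {
        lock (gate)
        {
            var copy = new Action<T>[subscribers.Count];
            subscribers.Values.CopyTo(copy, 0);
            return copy;
        }
    }

    public void Enqueue(T value) { pending.Enqueue(value); }

    // TryDequeue replaces the "check the count, then Dequeue" pair with one atomic step.
    public void Drain()
    {
        T value;
        while (pending.TryDequeue(out value))
        {
            foreach (var subscriber in Snapshot()) subscriber(value);
        }
    }
}
```

Because the lock is held only while copying, a subscriber can call Add or Remove from inside its callback without deadlocking. The sketch leaves out the guard against two threads draining at once.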

What Doesn’t Need Protection

This implementation is not about being fair. So if two threads both try to use an instance together, with one calling Next and the other Completed, only the Windows thread scheduler gets any say in which wins (if Completed is called first, no subscriber will see the value passed to Next).

Ready to Implement?

Even as I wrote the analysis above (which was already the second pass through what would be needed) I noted that I could avoid a lock for Next, Completed, and Error and use CompareExchange. So I’ve probably still missed something: time for another pass over the code.

And another question is how to test…? Throwing a load of concurrent tasks from the Parallel Extensions at it will be a start, but that cannot really cover mixing in Completed and Error: I cannot determine which thread will win and which will not, so there is no way to verify in a unit test assert that the “right one” won.
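For the Next-only case, one possible shape for such a test is to assert only things that do not depend on thread ordering, such as every value arriving exactly once. The sketch below is an assumption about how that could be done, not an existing test: StressSketch, EachValueSeenOnce and the wire delegate are hypothetical names, and it assumes every value has been delivered by the time all the Next calls return.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class StressSketch
{
    // wire: given the subscriber callback, hooks it up to the source under
    // test and returns that source's Next method as a delegate.
    public static bool EachValueSeenOnce(Func<Action<int>, Action<int>> wire, int calls)
    {
        var hits = new int[calls];

        // The subscriber counts arrivals per value; Increment keeps the count thread safe.
        Action<int> next = wire(v => Interlocked.Increment(ref hits[v]));

        // Call Next with the values 0..calls-1 from many threads at once.
        Parallel.For(0, calls, i => next(i));

        // Order-independent check: nothing lost, nothing duplicated.
        foreach (int h in hits)
        {
            if (h != 1) return false;
        }
        return true;
    }
}
```

Passing onNext => onNext wires the subscriber straight to the caller, which checks the harness itself before a real source is plugged in. A run that passes proves little, but a run that fails does show a real problem.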

More to work on.


  • Part 1: The basic implementation of Subscribe, unsubscribe and Next.

  • Part 2: Making Subscribe and unsubscribe re-entrant.

  • Part 3: Why re-entrancy is important.

  • Part 4: Making Next re-entrant.

  • Part 5: Implementing OnCompleted and OnError to complete support for the full IObserver<T> interface.

  • Part 5a: Updates for build 1.0.2317.0 of the Reactive Extensions (2010-03-05).

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 5a)

Posted in .NET Futures, Rx on March 9th, 2010 by Richard

Updates for Next Reactive Extensions (Rx) Code Drop

The new code drop came through on 5th March, 2½ months after the last drop. The release notes are suitably long, with a fair amount of change, much of it under the covers and affecting edge cases.

But a fair number of things have moved assemblies and changed name.

Impact on My Implementation

Given all the change, there was relatively little impact. The only thing that broke was my use of Notification<T>: the Current property is now called Value. And it now has an Exception property, which saves a cast when handling an OnError notification.

My (unpublished) test harness used GroupDisposable to clean up subscriptions; this was renamed CompositeDisposable and moved to the CoreEx assembly.

Both of these renames give slightly better and more consistent names, and the addition of the Exception property is definitely better. So overall the new drop is, for me, an improvement so far: once the code compiled all tests just passed.

The only function that changed was SendNotification, so following DRY helped. The new implementation:

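private static void SendNotification(IObserver<T> obs, Notification<T> n) {
    // The case bodies are missing from this listing; these follow the renames described above.
    switch (n.Kind) {
    case NotificationKind.OnNext: obs.OnNext(n.Value); break;
    case NotificationKind.OnCompleted: obs.OnCompleted(); break;
    case NotificationKind.OnError: obs.OnError(n.Exception); break;
    }
}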

So Where’s The Concurrency Support…?

Coming! :-)
I've been rather head down in learning WPF for something else, and trying to avoid too many distractions while getting my head around the basics of WPF. Concurrency support is perhaps the biggest challenge here and I therefore want to be able to focus on it and get it right. This will mean taking a block of focused time.