Creating a Reusable “ObservableSource” for the Reactive Extensions (part 6.1)

Preparing For Implementing Concurrency

Introduction

The next step in this series on creating my own IObservable<T> implementation is concurrency. (See below for links to previous parts; Part 5 has a full code listing.)

This is the hard part. It is too easy to think the code is OK when it isn’t, and too hard to validate that it really is thread safe.

The only option is careful analysis, and then double (triple, …) check everything.

After all, even the best set of unit tests will fail to find a problem if it takes a particular combination of OS version, hardware and other running processes to show it. With race conditions, deadlocks and all the other concurrency fun, that is not just a possibility but likely.

But first, the new 1.0.2350.0 (2010-03-15) build of Rx requires no changes to my code: all tests pass as they are. And this is now running on my main system (Rx has a go-live licence, so why not) with its multiple cores (previously I was working on a VM with a single virtual CPU).

What Needs Protection?

At the heart of my implementation there is some shared data. This needs protecting wherever multiple threads could operate on it concurrently. For example, events are pushed to subscribers using a copy of the list of subscribers (this allows re-entrant subscription and un-subscription), but taking a copy of the values of a Dictionary<K,V> while another thread may be modifying it is certainly not safe.

Some operations are inherently safe (this is defined by the platform’s memory model; see Joe Duffy’s blog for more details). In this case reading or writing an aligned, volatile int or reference is safe (such reads and writes are inherently atomic). The following cases each involve only a single field that needs to be checked, written to, or compared and exchanged in one step (via Interlocked.CompareExchange):

  • Checking if the endOfEvents field (reference to a Notification<T>) is null.

  • The boolean field pushingEvents used only in PushEvents (the method that loops through a snapshot of the current subscribers) to avoid re-entrant or multi-threaded pushing of events.

    When it is read and found to be true in a simple check there is no problem (just return from PushEvents). But if it is false it then needs to be set, so two operations are needed: a read and a write. Interlocked.CompareExchange will take care of this without a lock. (Before returning from pushing events it is set back to false, but that is a single write and so is also OK.)

    As there is no overload of Interlocked.CompareExchange for bool the field will need to be an int with values 0 and 1.

  • To avoid races in Next, Completed and Error between checking for endOfEvents and setting it one can use Interlocked.CompareExchange.
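
The single-field cases above might look something like the following. This is only a minimal sketch, not the code from this series: the class name PushGate and its method names are made up for illustration, and a generic reference type T stands in for Notification<T> so the sample has no dependency on Rx. It also assumes a framework version that has Volatile.Read.

```csharp
using System;
using System.Threading;

// Hypothetical holder for the two fields discussed above (names are illustrative).
public sealed class PushGate<T> where T : class
{
    private int pushingEvents;   // 0 = nobody pushing, 1 = a thread is in PushEvents
    private T endOfEvents;       // null until Completed or Error has been recorded

    // Atomically change 0 -> 1. Returns true only for the one caller that made the change.
    public bool TryEnterPush()
    {
        return Interlocked.CompareExchange(ref pushingEvents, 1, 0) == 0;
    }

    // Leaving is a single write; Exchange also publishes it to other threads.
    public void ExitPush()
    {
        Interlocked.Exchange(ref pushingEvents, 0);
    }

    // Atomically change null -> notification. Only the first terminal call succeeds.
    public bool TrySetEnd(T notification)
    {
        return Interlocked.CompareExchange(ref endOfEvents, notification, null) == null;
    }

    // A single read of a reference is atomic; Volatile.Read prevents a stale cached value.
    public bool IsEnded
    {
        get { return Volatile.Read(ref endOfEvents) != null; }
    }
}
```

A caller would wrap the push loop in TryEnterPush and ExitPush (with the exit in a finally block), and Next would check IsEnded before queuing a value.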

There are also two pieces of functionality that require multiple operations on fields (or on objects that are not thread safe). Each needs a lock or a collection that is itself thread safe.

  • The collection of subscribers and the subscriber id counter, with operations to add or remove a subscriber and to snapshot a list of all subscribers. This will require a lock; there is no way (that I can see) around this.

  • The queue of pending events. This can be handled by changing to an instance of ConcurrentQueue<T>, with modifications in PushEvents to use TryDequeue rather than a combination of checking the count and then calling Dequeue.
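
As a rough sketch of both points (again, not the code from this series): the class SubscriberStore and its members are hypothetical names, subscribers are reduced to plain Action<T> callbacks, and Drain assumes the caller has already won the pushingEvents check so only one thread drains at a time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in: subscribers behind a lock, pending events in a ConcurrentQueue<T>.
public sealed class SubscriberStore<T>
{
    private readonly object gate = new object();
    private readonly Dictionary<int, Action<T>> subscribers = new Dictionary<int, Action<T>>();
    private int nextId;   // only touched while holding the lock
    private readonly ConcurrentQueue<T> pending = new ConcurrentQueue<T>();

    // The id counter and the dictionary change together, so both sit under one lock.
    public int Add(Action<T> onNext)
    {
        lock (gate)
        {
            int id = nextId++;
            subscribers.Add(id, onNext);
            return id;
        }
    }

    public bool Remove(int id)
    {
        lock (gate) { return subscribers.Remove(id); }
    }

    // Copy the values while holding the lock; the copy is then safe to iterate without it.
    private Action<T>[] Snapshot()
    {
        lock (gate) { return subscribers.Values.ToArray(); }
    }

    public void Enqueue(T value)
    {
        pending.Enqueue(value);
    }

    // TryDequeue tests and removes in one step, so there is no gap between
    // "is there an item?" and "take it" for another thread to slip into.
    public void Drain()
    {
        T value;
        while (pending.TryDequeue(out value))
        {
            // Callbacks run outside the lock, so they can subscribe or unsubscribe re-entrantly.
            foreach (var subscriber in Snapshot())
            {
                subscriber(value);
            }
        }
    }
}
```

Holding the lock only long enough to copy the values keeps subscriber callbacks out of the locked region, which is what keeps re-entrant subscription from deadlocking.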

What Doesn’t Need Protection

This implementation is not about being fair. So if two threads both try to use an instance at the same time, with one calling Next and the other Completed, only the Windows thread scheduler gets any say in which wins (if Completed is called first, no subscriber will see the value passed to Next).

Ready to Implement?

Even as I wrote the analysis above (which was already the second pass through what would be needed) I noted that I could avoid a lock for Next, Completed, and Error and use CompareExchange. So I’ve probably still missed something: time for another pass over the code.

And another question is how to test…? Throwing a load of concurrent tasks from the Parallel Extensions (see here) at it will be a start, but that cannot really cover mixing in Completed and Error: I cannot determine which thread will win, so there is no way to verify in a unit test assert that the “right one” won.
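
One partial option might be to assert only on things that must hold whichever thread wins, such as "exactly one terminal notification is recorded". The following is a sketch of that idea only: RaceCheck and Terminal are hypothetical names, Terminal is a tiny stand-in rather than the real source from this series, and a passing run shows nothing more than that this invariant held on this machine this time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RaceCheck
{
    // Hypothetical stand-in for the source under test: keeps the first terminal call only.
    private sealed class Terminal
    {
        private string end;
        public int Wins;   // how many calls believed they were first

        public void Finish(string kind)
        {
            if (Interlocked.CompareExchange(ref end, kind, null) == null)
            {
                Interlocked.Increment(ref Wins);
            }
        }

        public string End
        {
            get { return Volatile.Read(ref end); }
        }
    }

    // Races "Completed" against "Error" from many tasks and checks an invariant
    // that does not depend on which one the scheduler happens to favour.
    public static bool ExactlyOneWinner(int callers)
    {
        var terminal = new Terminal();
        Parallel.For(0, callers, i => terminal.Finish(i % 2 == 0 ? "Completed" : "Error"));
        return terminal.Wins == 1
            && (terminal.End == "Completed" || terminal.End == "Error");
    }
}
```

The assert then becomes a check on ExactlyOneWinner rather than on which notification arrived, at the cost of not testing ordering at all.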

More to work on.

Previously

  • Part 1: The basic implementation of Subscribe, unsubscribe and Next.

  • Part 2: Making Subscribe and unsubscribe re-entrant.

  • Part 3: Why re-entrancy is important.

  • Part 4: Making Next re-entrant.

  • Part 5: Implementing OnCompleted and OnError to complete support for full IObserver<T> interface.

  • Part 5a: Updates for build 1.0.2317.0 of the Reactive Extensions (2010-03-05).

This entry was posted Thursday, March 18th, 2010 17:09. In .NET Futures, Rx.