Preparing For Implementing Concurrency
This is the hard part. It is too easy to think the code is OK when it isn’t, and too hard to validate that it really is thread safe.
The only option is careful analysis, and then double (triple, …) check everything.
After all, even the best set of unit tests will fail to find a problem if it takes a particular combination of OS version, hardware and other running processes to show it. With race conditions, deadlocks and all the other concurrency fun, that is not just a possibility but likely.
But first: the new 1.0.2350.0 (2010-03-15) build of Rx requires no changes to my code, and all tests pass. And this is now running on my main system (Rx has a go-live licence, so why not) with its multiple cores (previously I was working on a VM with a single virtual CPU).
What Needs Protection?
At the heart of my implementation there is some shared data. This needs
protecting where multiple threads could operate concurrently. E.g. while
events are pushed to subscribers using a copy of the list of subscribers
(this allows re-entrant subscription and un-subscription), taking a copy of
the values of a Dictionary<K,V> certainly is not thread safe.
Some operations are inherently safe (defined by the platform's memory model,
see Joe Duffy’s blog for more details). In this case reading or writing an
aligned, volatile int or reference is safe (such reads and writes are
inherently
atomic). And only a single field needs to be checked, written to, or both
compared & exchanged (via
Checking if the
endOfEvents field (reference to a
The boolean field pushingEvents, used only in PushEvents (the method that loops through a snapshot of the current subscribers) to avoid re-entrant or multi-threaded pushing of events.
When it is read and found to be true in a simple check there is no problem (just return from PushEvents). But if it is false it then needs to be set to true, so two operations are needed: a read and a write. Interlocked.CompareExchange will take care of this without a lock. (Before returning from pushing events it is set back to false, but a single write is also OK.)
As there is no overload of
bool the field will need to be an
int with values 0 and 1.
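The flag handling described above can be sketched roughly as follows. This is a minimal illustration, not the actual class from this series: the type name `PushGuardSketch`, the `PushCount` property and the `Action` parameter are made up for the example, and only `pushingEvents` and `PushEvents` come from the text.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the real class; only the guard is sketched.
public sealed class PushGuardSketch
{
    // 0 = nobody is pushing, 1 = a push is in progress.
    // An int rather than a bool so Interlocked.CompareExchange can be used.
    private int pushingEvents;

    // How many times the body actually ran (illustration only).
    public int PushCount { get; private set; }

    // Returns false if another (or a re-entrant) call is already pushing.
    public bool PushEvents(Action body)
    {
        // Atomically: if pushingEvents is 0, set it to 1.
        // The return value is the value seen before the exchange.
        if (Interlocked.CompareExchange(ref pushingEvents, 1, 0) != 0)
        {
            return false;
        }

        try
        {
            PushCount++;   // Only one caller can be inside the guarded region.
            body();
            return true;
        }
        finally
        {
            // Releasing the guard is a single write.
            Interlocked.Exchange(ref pushingEvents, 0);
        }
    }
}
```

A re-entrant call made from inside `body` sees the flag already set and returns immediately, which is the behaviour wanted from the simple check.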
To avoid races in Error between checking for endOfEvents and setting it one can use
There are also two pieces of functionality that require multiple operations on fields (or objects that are not thread safe). In each case a lock is needed.
The collection of subscribers and the subscriber id counter, with operations to add or remove a subscriber and to snapshot a list of all subscribers. This will require a lock; there is no way (that I can see) around this.
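As a rough sketch of what that lock has to cover (all names here, including `SubscriberListSketch`, `Add`, `Remove` and `Snapshot`, are invented for illustration and are not the code from the earlier parts):

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: one lock guards both the dictionary and the id counter.
public sealed class SubscriberListSketch<T>
{
    private readonly object sync = new object();
    private readonly Dictionary<int, Action<T>> subscribers =
        new Dictionary<int, Action<T>>();
    private int nextId;

    // Allocating the id and adding the entry must happen as one step.
    public int Add(Action<T> onNext)
    {
        lock (sync)
        {
            int id = nextId++;
            subscribers.Add(id, onNext);
            return id;
        }
    }

    public bool Remove(int id)
    {
        lock (sync)
        {
            return subscribers.Remove(id);
        }
    }

    // Copy the current subscribers while holding the lock; the caller
    // invokes them afterwards, outside the lock.
    public Action<T>[] Snapshot()
    {
        lock (sync)
        {
            var copy = new Action<T>[subscribers.Count];
            subscribers.Values.CopyTo(copy, 0);
            return copy;
        }
    }
}
```

Because the callbacks are invoked on the returned array rather than under the lock, a subscriber can still subscribe or unsubscribe from inside its own callback without deadlocking or invalidating the enumeration.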
Operations on the queue of pending events can be handled by changing to an instance of
ConcurrentQueue<T> with modifications in
TryDequeue rather than a combination of checking the count and then
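To illustrate why `TryDequeue` matters: checking a count and then dequeuing is two operations, and another thread can empty the queue between them, whereas `TryDequeue` tests and removes in one call. A small sketch follows; the helper name `PendingEventsSketch.Drain` and its parameters are assumptions made for the example.

```csharp
using System;
using System.Collections.Concurrent;

public static class PendingEventsSketch
{
    // Drains whatever is currently queued, pushing each item to 'push'.
    // Returns the number of items this caller actually dequeued.
    public static int Drain<T>(ConcurrentQueue<T> pending, Action<T> push)
    {
        int pushed = 0;
        T item;

        // TryDequeue either removes an item and returns true, or returns
        // false if the queue is empty; there is no separate count check
        // for another thread to invalidate.
        while (pending.TryDequeue(out item))
        {
            push(item);
            pushed++;
        }

        return pushed;
    }
}
```

If two threads drain the same queue concurrently, each item is still handed to exactly one of them, although nothing here says which one.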
What Doesn’t Need Protection
This implementation is not about being fair. So if two threads
both try and use an instance together, with one calling
Next and the other
the Windows thread scheduler gets any say in which wins (if
Completed is called first, no subscriber will see
the value passed to
Ready to Implement?
Even as I wrote the analysis above (which was already the second pass through
what would be needed) I noted that I could avoid a lock for
Error and use
So I’ve probably still missed something: time for another pass over the code.
And another question is how to test…? Throwing a load of concurrent tasks
from the Parallel Extensions (see here)
will be a start, but cannot really cover mixing in
Error because I cannot determine which thread will win and which
will not, and thus have no way to verify the “right one” won in a unit test.
More to work on.
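One possible angle on the testing question, offered as a suggestion rather than anything settled: instead of asserting which thread wins, assert something that must hold whoever wins. The sketch below uses `Parallel.For` from the Parallel Extensions to race many callers at a set-once flag and checks that exactly one of them wins. The names `RaceInvariantSketch` and `CountWinners`, and the local `ended` standing in for the end-of-events state, are invented for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RaceInvariantSketch
{
    // Races 'contenders' callers at a set-once flag and returns how many
    // of them believed they were the one to set it.
    public static int CountWinners(int contenders)
    {
        int ended = 0;    // 0 = still open, 1 = terminated (hypothetical flag)
        int winners = 0;

        Parallel.For(0, contenders, i =>
        {
            // Only the caller that sees the old value 0 has won the race.
            if (Interlocked.CompareExchange(ref ended, 1, 0) == 0)
            {
                Interlocked.Increment(ref winners);
            }
        });

        return winners;
    }
}
```

A test can then assert `CountWinners(1000) == 1`. A pass does not prove the absence of races, for the reasons given at the top of this post, but a failure would be conclusive.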
Part 1: The basic implementation of Subscribe, unsubscribe and Next.
Part 2: Making Subscribe and unsubscribe re-entrant.
Part 3: Why re-entrancy is important.
Part 4: Making
Part 5: Implementing
OnError to complete support for full
Part 5a: Updates for build 1.0.2317.0 of the Reactive Extensions (2010-03-05).