Creating a Reusable 'ObservableSource' for the Reactive Extensions (part 6.1)
Note: reloaded into my rebuilt blog; some links fixed up.
Preparing For Implementing Concurrency
Introduction
The next step in this series on creating my own IObservable<T> implementation is concurrency. (See below for links to previous parts; Part 5 has a full code listing.)
This is the hard part. It is too easy to think it is OK when it isn’t, and too hard to validate that it really is thread safe.
The only option is careful analysis, and then double (triple, …) check everything.
After all, even the best set of unit tests will fail to find a problem that only shows up with a particular combination of OS version, hardware and other running processes. With race conditions, deadlocks and all the other concurrency fun, that is not just a possibility but likely.
But first: the new 1.0.2350.0 (2010-03-15) build of Rx needs no changes to my code, and all tests pass as they are. This is now running on my main system (Rx has a go-live licence, so why not), which has multiple cores. Previously I was working on a VM with a single virtual CPU.
What Needs Protection?
At the heart of my implementation there is some shared data. It needs protecting wherever multiple threads could operate on it concurrently. For example, events are pushed to subscribers using a copy of the list of subscribers (this allows re-entrant subscription and unsubscription). However, taking a copy of the values of a Dictionary<K,V> while another thread is modifying it certainly is not safe.
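Here is a minimal sketch of keeping that snapshot safe. It assumes subscribers are stored in a Dictionary<int, Action<T>> keyed by a subscriber id. The class and member names are hypothetical, not the code from Part 5. Every add, remove and copy takes the same lock, and events are then pushed from the copy outside the lock.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical registry: every access to 'subscribers' and 'nextId' happens
// while holding 'gate', so a snapshot never sees a half-updated Dictionary.
public sealed class SubscriberRegistry<T>
{
    private readonly object gate = new object();
    private readonly Dictionary<int, Action<T>> subscribers = new Dictionary<int, Action<T>>();
    private int nextId;

    // Adds a subscriber and returns its id (the id counter is also guarded).
    public int Add(Action<T> onNext)
    {
        lock (gate)
        {
            int id = nextId++;
            subscribers.Add(id, onNext);
            return id;
        }
    }

    // Removes a subscriber; returns false if the id was not present.
    public bool Remove(int id)
    {
        lock (gate)
        {
            return subscribers.Remove(id);
        }
    }

    // Copies the values under the lock. The caller iterates the copy without
    // holding the lock, so a subscriber can (un)subscribe re-entrantly.
    public Action<T>[] Snapshot()
    {
        lock (gate)
        {
            return subscribers.Values.ToArray();
        }
    }
}
```

Invoking the delegates outside the lock also avoids running subscriber code while holding a lock, which is a common source of deadlocks.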
Some operations are inherently safe, as defined by the platform’s object model (see Joe Duffy’s blog, http://www.bluebytesoftware.com/blog/, for more details). In this case, reading or writing an aligned, volatile int or reference is safe, because such reads and writes are inherently atomic. Each of the following cases involves only a single field that needs to be checked, written to, or both compared and exchanged (via Interlocked.CompareExchange):
- Checking whether the endOfEvents field (a reference to a Notification<T>) is null.
- The boolean field pushingEvents, used only in PushEvents (the method that loops through a snapshot of the current subscribers) to avoid re-entrant or multi-threaded pushing of events. When it is read and is true, a simple check is enough: just return from PushEvents. But if it is false, it then needs to be set, so two operations are needed: a read and a write. Interlocked.CompareExchange will take care of this without a lock. (Before returning from pushing events it is set back to false, but a single write is also OK.) As there is no overload of Interlocked.CompareExchange for bool, the field will need to be an int with values 0 and 1.
- To avoid races in Next, Completed and Error between checking endOfEvents and setting it, Interlocked.CompareExchange can be used.
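The two single-field cases above can be sketched as follows. This is a hypothetical class, not the series’ code. EndMarker stands in for the Notification<T> held in endOfEvents. Volatile.Read and Volatile.Write (from .NET Framework 4.5, so later than this Rx build) are an assumption used to make the ordering explicit.

```csharp
using System;
using System.Threading;

// Stand-in for the Notification<T> that endOfEvents refers to.
public sealed class EndMarker
{
    public EndMarker(string kind) { Kind = kind; }
    public string Kind { get; }
}

public sealed class PushGuard
{
    // 0 = not pushing, 1 = pushing. An int because Interlocked.CompareExchange
    // has no bool overload.
    private int pushingEvents;

    // null until Completed or Error has been accepted.
    private EndMarker endOfEvents;

    // Read-and-set in one atomic step: only the caller that flips 0 -> 1 gets true,
    // so a re-entrant or concurrent caller simply returns instead of pushing.
    public bool TryBeginPush()
    {
        return Interlocked.CompareExchange(ref pushingEvents, 1, 0) == 0;
    }

    // Resetting is a single aligned int write, so no compare-exchange is needed.
    public void EndPush()
    {
        Volatile.Write(ref pushingEvents, 0);
    }

    // Stores the marker only if none is stored yet; the first caller wins.
    public bool TrySetEnd(EndMarker marker)
    {
        return Interlocked.CompareExchange(ref endOfEvents, marker, null) == null;
    }

    // Single reference read, e.g. for Next to check whether events have ended.
    public bool IsEnded => Volatile.Read(ref endOfEvents) != null;
}
```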
There are also two pieces of functionality that require multiple operations on fields (or on objects that are not thread safe), so a single atomic operation is not enough:
- The collection of subscribers and the subscriber id counter, with operations to add or remove a subscriber and to snapshot a list of all subscribers. This will require a lock; there is no way (that I can see) around this.
- Operations on the queue of pending events can be handled by changing to an instance of ConcurrentQueue<T>. PushEvents then needs to use TryDequeue rather than checking the count and then calling Dequeue.
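Here is a sketch of the ConcurrentQueue<T> change. It assumes the queue holds pending values and a delivery callback pushes each one to the current subscribers. The PendingEvents and Drain names are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;

public static class PendingEvents
{
    // Delivers every queued item and returns how many were delivered.
    public static int Drain<T>(ConcurrentQueue<T> pending, Action<T> deliver)
    {
        int delivered = 0;
        // TryDequeue checks and removes in one atomic step. With a Queue<T>,
        // 'if (queue.Count > 0) queue.Dequeue()' lets another thread empty the
        // queue between the check and the Dequeue.
        while (pending.TryDequeue(out T item))
        {
            deliver(item);
            delivered++;
        }
        return delivered;
    }
}
```

Producers (callers of Next) can call Enqueue from any thread without a lock. The pushingEvents guard still ensures only one thread runs the drain loop at a time.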
What Doesn’t Need Protection
This implementation is not about being fair. If two threads both try to use an instance at the same time, one calling Next and the other Completed, only the Windows thread scheduler gets any say in which wins. If Completed is called first, no subscriber will see the value passed to Next.
Ready to Implement?
Even as I wrote the analysis above (which was already my second pass through what would be needed), I noticed that I could avoid a lock for Next, Completed and Error by using CompareExchange instead. So I’ve probably still missed something: time for another pass over the code.
Another question is how to test this. Throwing a load of concurrent tasks from the Parallel Extensions at an instance will be a start, but it cannot really cover mixing in Completed and Error. I cannot determine which thread will win, so there is no way to verify in a unit test assert that the “right one” won.
More to work on.
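One partial answer, offered as an assumption rather than a worked-out test: even when the winner cannot be predicted, a stress test can still assert that exactly one of Completed and Error takes effect. In the sketch below, the local endOfEvents stands in for the real field, and TerminalRace is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TerminalRace
{
    // Runs 'rounds' races between a Completed-like and an Error-like call and
    // returns how many rounds ended with exactly one winner.
    public static int Run(int rounds)
    {
        int goodRounds = 0;
        for (int round = 0; round < rounds; round++)
        {
            object endOfEvents = null;      // stands in for the endOfEvents field
            int winners = 0;
            var completed = new object();   // stands in for a Completed notification
            var error = new object();       // stands in for an Error notification

            // Which delegate wins is up to the scheduler; only the invariant is checked.
            Parallel.Invoke(
                () =>
                {
                    if (Interlocked.CompareExchange(ref endOfEvents, completed, null) == null)
                        Interlocked.Increment(ref winners);
                },
                () =>
                {
                    if (Interlocked.CompareExchange(ref endOfEvents, error, null) == null)
                        Interlocked.Increment(ref winners);
                });

            if (winners == 1 && endOfEvents != null) goodRounds++;
        }
        return goodRounds;
    }
}
```

This checks an invariant rather than an outcome, so it does not answer the “right one” question. It does catch a double terminal notification.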
Previously
- Part 1: The basic implementation of Subscribe, unsubscribe and Next.
- Part 2: Making Subscribe and unsubscribe re-entrant.
- Part 3: Why re-entrancy is important.
- Part 4: Making Next re-entrant.
- Part 5: Implementing OnCompleted and OnError to complete support for the full IObserver<T> interface.
- Part 5a: Updates for build 1.0.2317.0 of the Reactive Extensions (2010-03-05).