So far in this series (links below) the focus has been on implementing
OnNext and Subscribe in a way that is re-entrant.
But that leaves two methods uncovered. Both OnCompleted and
OnError end the sequence, but in different ways:
OnError passes an exception indicating the source has failed, while
OnCompleted just indicates it has ended.
Subject<T> behaves as follows when a sequence of events is ended:
Existing subscribers receive a call to their OnCompleted or OnError.
Further calls to any of the IObserver<T> methods are no-ops.
Any new subscribers immediately get a call to their OnCompleted or
OnError (as applicable) and nothing more.
Part 1: The basic implementation of
Part 2: Making
Part 3: Why re-entrancy is important.
Part 4: Making
Notification<T> As A Way to Store An Observable Sequence
The Reactive Framework (Rx) distributable comes with three assemblies.
System.CoreEx contains types that would have been in System or System.Core
had .NET been designed with Rx in it from the start. System.Interactive
extends LINQ to Objects by back-porting many of the observable
extension methods (or 'operators') to IEnumerable<T>.
System.Reactive implements LINQ-like operators for IObservable<T>.
My observable implementation needs almost nothing from these
three assemblies (IObservable<T> and IObserver<T>
being defined in the mscorlib assembly).
However, one rather useful group of types defined in the System.CoreEx
assembly, in namespace System.Collections.Generic, is Notification<T>
and its subtypes. These types allow the "value" of an observable event
to be stored. There are three subtypes (one each for OnNext,
OnCompleted and OnError), defined as members of
Notification<T> (hence created as, for example,
new Notification<int>.OnNext(1)). Therefore a collection
of events covering all three
IObserver<T> methods can be
easily created without any custom helper types.
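To make the idea of storing events as data concrete, here is a minimal sketch. It deliberately does not use the Rx types: Note<T>, NoteKind and NoteDemo are hypothetical stand-ins invented for this example, so it compiles without the Rx assemblies and should not be read as the real Notification<T> API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Rx's Notification<T>; names are illustrative only.
public enum NoteKind { OnNext, OnError, OnCompleted }

public sealed class Note<T>
{
    public NoteKind Kind { get; }
    public T Value { get; }          // meaningful only when Kind is OnNext
    public Exception Error { get; }  // meaningful only when Kind is OnError

    private Note(NoteKind kind, T value, Exception error)
    {
        Kind = kind;
        Value = value;
        Error = error;
    }

    public static Note<T> Next(T value)
    {
        return new Note<T>(NoteKind.OnNext, value, null);
    }

    public static Note<T> Failed(Exception error)
    {
        return new Note<T>(NoteKind.OnError, default(T), error);
    }

    public static Note<T> Completed()
    {
        return new Note<T>(NoteKind.OnCompleted, default(T), null);
    }
}

public static class NoteDemo
{
    // A whole sequence, including how it ends, held as plain data.
    public static List<Note<int>> Sample()
    {
        return new List<Note<int>>
        {
            Note<int>.Next(1),
            Note<int>.Next(2),
            Note<int>.Completed()
        };
    }
}
```

The point is only that values, a failure and a normal end can all sit in one collection, which is what makes a queue of pending events possible.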
These types are provided for the Materialize and
Dematerialize enumerable and observable operators (the former being
among the back-ported operators in System.Interactive).
Overview of The Implementation of OnCompleted and OnError
Both follow the same pattern as the existing OnNext:
If ended, return.
Add the new event to a queue of pending events.
If not already pushing an event, process the queue.
The first of these steps is new, and handles the requirement that either
OnCompleted or OnError ends the sequence.
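The three steps can be sketched roughly as follows. This is not the code from this series: EndingSource, Accept and Delivered are hypothetical names, events are reduced to strings, and "pushing" just appends to a log so that only the control flow is visible. It also assumes single-threaded use and that the sequence counts as ended as soon as the ending event is accepted.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the "if ended return / enqueue / drain" pattern.
public sealed class EndingSource
{
    private readonly Queue<string> pending = new Queue<string>();
    private bool ended;    // set once OnCompleted or OnError has been accepted
    private bool pushing;  // true while the queue is being drained

    // Stands in for the subscribers: records what would have been pushed.
    public List<string> Delivered { get; } = new List<string>();

    public void OnNext(int value)
    {
        Accept("OnNext(" + value + ")", false);
    }

    public void OnCompleted()
    {
        Accept("OnCompleted", true);
    }

    public void OnError(Exception error)
    {
        Accept("OnError(" + error.Message + ")", true);
    }

    private void Accept(string evt, bool endsSequence)
    {
        if (ended) return;                // step 1: if ended, return
        if (endsSequence) ended = true;   // assumption: ends when accepted

        pending.Enqueue(evt);             // step 2: add to the pending queue

        if (pushing) return;              // step 3: drain unless already draining
        pushing = true;
        try
        {
            while (pending.Count > 0)
            {
                Delivered.Add(pending.Dequeue());
            }
        }
        finally
        {
            pushing = false;
        }
    }
}
```

With this sketch, calling OnNext(1), OnCompleted(), OnNext(2) in that order leaves only the first two events in Delivered; the third is dropped by step 1.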
To make handling new subscriptions after the end of the sequence
simple, the endOfEvents field is a
Notification<T> reference. If null, the sequence has not
ended; otherwise this is the event to push immediately. Also a helper
from System.Reactive is used as the return value:
System.Disposables.Disposable.Empty, which is an instance of
a type that implements
IDisposable as a no-op. There is
no need to track this after-the-end subscriber (it will never be called
again).
The New Implementation
The single biggest change to support storing notifications (rather than
values of type
T) is in how they are delivered:
different kinds of notifications need to call different
IObserver<T> methods, so a switch is needed. This is
in its own method (
SendNotification) because it can then
be reused to send the ending event to post-end subscribers.
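A rough sketch of that factoring is below. Only SendNotification and endOfEvents are names from this post; StoredEvent<T>, NoOpDisposable, EndAwareSubject<T>, Publish and RecordingObserver<T> are hypothetical, standing in for Notification<T>, Disposable.Empty and the real subject. Queueing, re-entrancy and thread safety are left out to keep the switch and its reuse in view.

```csharp
using System;
using System.Collections.Generic;

public enum EventKind { OnNext, OnError, OnCompleted }

// Hypothetical stand-in for Notification<T>.
public sealed class StoredEvent<T>
{
    public EventKind Kind;
    public T Value;
    public Exception Error;
}

// Does nothing on Dispose; plays the role of Disposable.Empty.
public sealed class NoOpDisposable : IDisposable
{
    public static readonly NoOpDisposable Instance = new NoOpDisposable();
    public void Dispose() { }
}

public sealed class EndAwareSubject<T> : IObservable<T>
{
    private readonly List<IObserver<T>> subscribers = new List<IObserver<T>>();
    private StoredEvent<T> endOfEvents; // null until the sequence has ended

    // The one place that maps a stored event onto an IObserver<T> method.
    private static void SendNotification(IObserver<T> observer, StoredEvent<T> evt)
    {
        switch (evt.Kind)
        {
            case EventKind.OnNext: observer.OnNext(evt.Value); break;
            case EventKind.OnError: observer.OnError(evt.Error); break;
            case EventKind.OnCompleted: observer.OnCompleted(); break;
            default: throw new InvalidOperationException("Unknown event kind");
        }
    }

    public void Publish(StoredEvent<T> evt)
    {
        if (endOfEvents != null) return;               // already ended: no-op
        if (evt.Kind != EventKind.OnNext) endOfEvents = evt;
        foreach (var observer in subscribers.ToArray())
        {
            SendNotification(observer, evt);
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (endOfEvents != null)
        {
            // Post-end subscriber: replay the ending event, track nothing.
            SendNotification(observer, endOfEvents);
            return NoOpDisposable.Instance;
        }
        subscribers.Add(observer);
        return new Remover(subscribers, observer);
    }

    private sealed class Remover : IDisposable
    {
        private readonly List<IObserver<T>> list;
        private readonly IObserver<T> observer;

        public Remover(List<IObserver<T>> list, IObserver<T> observer)
        {
            this.list = list;
            this.observer = observer;
        }

        public void Dispose() { list.Remove(observer); }
    }
}

// Test helper: records which IObserver<T> methods were called.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Calls = new List<string>();
    public void OnNext(T value) { Calls.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Calls.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Calls.Add("OnCompleted"); }
}
```

An observer that subscribes after the end sees just the single ending call, delivered through the same switch that serves live subscribers.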
The implementations of OnCompleted and
OnError end up being very similar and simple.
Overall, compared to the code from
part 4, the
implementation is somewhat longer, but certainly better factored, with
each method doing one thing. The most complex method
does have to loop over both pending events and subscribers, but that
is all it does.
The toughest part to get right is still left: concurrency. This will be
as difficult to test as to implement. Tests using workers are never
simple, and just being confident that they are doing things concurrently
is hard (after all, even something simple can
be slow enough to effectively eliminate any concurrency by forcing
CPU cache synchronisation, even with multiple idle cores).