Creating a Reusable 'ObservableSource' for the Reactive Extensions (part 5)
Adding Completed
and Error
Methods
Introduction
So far in this series (links below) the focus has been on implementing
Next
and Subscribe
in a way that is re-entrant.
But that leaves two methods uncovered. Both Completed
and
Error
end the sequence, but in different ways.
Error
passes an exception indicating the source has failed,
whereas Completed
just indicates it has ended.
Based on Subject<T>
when a sequence of events is ended:
-
Existing subscribers received a call to their
OnError
orOnCompleted
(as applicable). -
Further calls to any of
Subject<T>
'sIObserver<T>
methods is a no-op. -
Any new subscribers immediately get a call to their
OnError
orOnCompleted
(as applicable) and nothing more.
Previously
-
Part 1: The basic implementation of
Subscribe
,Unsubscribe
, andNext
. -
Part 2: Making
Subscribe
andUnsubscribe
re-entrant. -
Part 3: Why re-entrancy is important.
-
Part 4: Making
Next
re-entrant.
Notification<T>
As A Way to Store An Observable Sequence
The reactive Framework (Rx) distributable comes with three assemblies:
System.CoreEx
System.Reactive
System.Interactive
System.CoreEx contains types that would have been in System or System.Core
had .NET been designed with Rx in it from the start. System.Interactive
extends LINQ to Objects by back-porting many of IObservable<T>
's
extension methods (or ’operators’) to IEnumerable<T>
.
System.Reactive implements LINQ like operators for IObservable<T>
.
My observable implementation almost doesn’t need any of these
three assemblies (IObservable<T>
and IObserver<T>
being defined in the mscorlib assembly).
However one rather useful group of types defined in the System.CoreEx
assembly in namespace System.Collections.Generic are Notification<T>
and its subtypes. These types allow the “value” of an observable event
to be stored. Using one of the three subtypes (one each for OnNext
,
OnCompleted
and OnError
) defined as members of
Notification<T>
(hence created as, for example,
new Notification<int>.OnNext(1)
). Therefore a collection
of events covering all three IObserver<T>
methods can be
easily created without some custom helper types.
These types are provided for the Materialize
and
Dematerialise
enumerable and observable operators (the former being
one the back-ported operators in System.Interactive).
Overview of The Implementation of Completed
and OnError
Both follow the same pattern as the existing Next
implementation:
If ended, return
Add the new event to a queue of pending events.
If not already pushing an event, process the queue.
The first of these steps is new, and handles the requirement that either
Completed
or OnError
ends the sequence.
To make handling new subscriptions after the end of the sequence
the endOfEvents
field is a
Notification<T>
reference. If null the sequence has not
ended, otherwise this is the event to immediately push. Also a helper
from System.Reactive is used as the return value:
System.Disposables.Disposable.Empty
which is an instance of
a type which implements IDisposable
as a no-op and these is
no need to track this after the end subscriber (it will never be called
again).
The New Implementation
The single biggest change to support storing notifications (rather than
values of type T
) is in PushEvents
where
different kinds of notifications need to call different
IObserver<T>
methods, so a switch is needed. This is
in its own method (SendNotification
) because it can then
be reused to send the ending event to post-end subscribers.
The implementations of Next
, Completed
and
Error
end up being very similar and simple.
Overall, compared to the code from
part 4, the
implementation is a somewhat longer, but certainly better factored with
each method doing one thing. The most complex (PushEvents
)
does have to loop over both pending events and subscribers, but that
is all it does.
The Code
Further Steps
The toughest part to get right is still left: concurrency. This will be
difficult as much to test as to implement, tests using workers are never
simple and just being confident that they are doing things concurrently
is hard (after all a simple Interlocked.Increment
might
be slow enough to effectively eliminate any concurrency due to forcing
CPU cache synchronisation even with multiple idle cores).