Creating a Reusable 'ObservableSource' for the Reactive Extensions (Part 1)
Note reloaded into my rebuilt blog: I know the test code should be a unit test now! This was written over 13 years ago!
Introduction
As seen in the requirements for an implementation of IObservable<T>, there is a lot of ground to cover. So the idea is to start with a massively simplified subset of the requirements to better understand what it will take to implement the full requirements. In other words, a series of partially functional prototypes.
As a first step, just implement:
- Subscribe, and maintain a list of subscribed observers.
- A disposable helper type to act as a return from Subscribe to unsubscribe.
- Next, to call the OnNext of each subscriber.
This implementation does not:
- Handle concurrency.
- Support re-entrancy. Any call by a subscriber back into the Observable is, at best, going to fail.
- Make use of the rest of the IObserver<T> interface. No errors and no end to the events.
- Call the subscribers asynchronously, in a synchronisation context of their (or the Reactive Framework's) choice. (The System.Linq.Observable.ObserveOn<T>(IObservable<T> source, SynchronizationContext sync) method of Rx might be able to be used to work around this.)
Implementation
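A minimal sketch of what this first step could look like, under the limits listed above (no concurrency, no re-entrancy, only OnNext). The class name ObservableSource<T> is taken from the post title; the nested Unsubscriber helper and the exact shape of Next are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape: a plain list of observers, walked on every Next call.
// Not thread-safe and not re-entrant, as stated in the limitations.
public sealed class ObservableSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Forward one value to every current subscriber.
    public void Next(T value)
    {
        foreach (var observer in observers)
        {
            observer.OnNext(value);
        }
    }

    // Disposable helper returned from Subscribe; disposing it removes the observer.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;
        private bool disposed;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose()
        {
            if (disposed) return; // a second Dispose is a no-op
            observers.Remove(observer);
            disposed = true;
        }
    }
}
```

The helper holds a reference to the same list as the source, so unsubscribing is just a list removal. That simplicity is exactly what makes the type fragile once subscribers start calling back in.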
Testing
Obviously I have some test code, but it all gets somewhat tedious very quickly as it covers all sorts of edge cases (e.g. check that subscribing and then unsubscribing before any events works—it does). So these are a couple of indicative samples.
First, what must be just about the simplest possible case: subscribe, one event, and unsubscribe. Check that the subscriber was called once:
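A sketch of how such a test might read. Everything here is illustrative: the compact ObservableSource<T> is a stand-in repeated so the snippet compiles on its own, and ActionObserver<T> is a hypothetical lambda-backed observer (Rx ships its own helpers for this, which are not used here).

```csharp
using System;
using System.Collections.Generic;

// Compact stand-in for the source under test (assumed shape, not re-entrant).
public sealed class ObservableSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Remover(() => observers.Remove(observer));
    }

    public void Next(T value)
    {
        foreach (var o in observers) o.OnNext(value);
    }

    private sealed class Remover : IDisposable
    {
        private readonly Action remove;
        public Remover(Action remove) { this.remove = remove; }
        public void Dispose() { remove(); }
    }
}

// Hypothetical helper: wraps a lambda as an IObserver<T>.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }  // unused by this prototype
    public void OnCompleted() { }             // unused by this prototype
}

public static class SimplestCase
{
    // Returns how many times the single subscriber was called.
    public static int Run()
    {
        var source = new ObservableSource<int>();
        int calls = 0; // local closed over by the lambda subscriber

        IDisposable subscription = source.Subscribe(new ActionObserver<int>(_ => calls++));
        source.Next(1);
        subscription.Dispose();

        return calls;
    }
}
```

With one event between subscribe and unsubscribe, SimplestCase.Run() should return 1.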
And then this to show that arbitrary interleaving of subscribe and unsubscribe operations will also work:
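One possible shape for an interleaved test, again as a sketch under assumptions: the compact ObservableSource<T> and the lambda-backed ActionObserver<T> are hypothetical stand-ins, and the particular sequence of operations is made up for illustration. The comment on each Next call lists the subscribers expected to receive that event.

```csharp
using System;
using System.Collections.Generic;

// Compact stand-in for the source under test (assumed shape, not re-entrant).
public sealed class ObservableSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Remover(() => observers.Remove(observer));
    }

    public void Next(T value)
    {
        foreach (var o in observers) o.OnNext(value);
    }

    private sealed class Remover : IDisposable
    {
        private readonly Action remove;
        public Remover(Action remove) { this.remove = remove; }
        public void Dispose() { remove(); }
    }
}

// Hypothetical helper: wraps a lambda as an IObserver<T>.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }  // unused by this prototype
    public void OnCompleted() { }             // unused by this prototype
}

public static class InterleavedCase
{
    // Returns the number of events seen by subscribers 0, 1 and 2.
    public static int[] Run()
    {
        var source = new ObservableSource<string>();
        var counts = new int[3];

        // Builds a subscriber that counts its events and logs them to the console.
        Func<int, IObserver<string>> make = i => new ActionObserver<string>(v =>
        {
            counts[i]++;
            Console.WriteLine("subscriber {0} got {1}", i, v);
        });

        IDisposable s0 = source.Subscribe(make(0));
        source.Next("a");   // 0
        IDisposable s1 = source.Subscribe(make(1));
        source.Next("b");   // 0, 1
        s0.Dispose();
        source.Next("c");   // 1
        IDisposable s2 = source.Subscribe(make(2));
        source.Next("d");   // 1, 2
        s1.Dispose();
        s2.Dispose();

        return counts;      // tallying the comments above: 2, 3, 1
    }
}
```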
In this second approach, I ended up adding the comments to the Next() calls to help me work out the counts. Then running and checking the console ensured all the events went to at least one subscriber. Clearly any real (production) code like this should be condemned with even the most limited of code reviews, but tests need to cover the bases.
Both tests also show just how useful lambdas are when working with Rx: the ability to create test subscribers that are both inline with the test and close over locals makes state handling much easier.
How this first IObservable<T> Is Inadequate
Quick answer: in almost every way. Longer answer: while events are forwarded and subscribers are managed, the integrity of the system is only preserved by the subscribers playing by a very limited execution model. In particular, no subscriber can unsubscribe in direct response to an event, because the iteration over the set of subscribers will fail if Subscribe or Unsubscribe is called during that iteration.
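The failure is easy to provoke. The sketch below assumes the subscribers are kept in a List<T> that is walked with foreach (an assumption about the implementation; the compact ObservableSource<T> and ActionObserver<T> are hypothetical stand-ins). A subscriber that disposes its own subscription from inside OnNext modifies the list mid-iteration, and the list's enumerator then throws InvalidOperationException.

```csharp
using System;
using System.Collections.Generic;

// Compact stand-in for the source (assumed shape: List<T> walked with foreach).
public sealed class ObservableSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Remover(() => observers.Remove(observer));
    }

    public void Next(T value)
    {
        foreach (var o in observers) o.OnNext(value);
    }

    private sealed class Remover : IDisposable
    {
        private readonly Action remove;
        public Remover(Action remove) { this.remove = remove; }
        public void Dispose() { remove(); }
    }
}

// Hypothetical helper: wraps a lambda as an IObserver<T>.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }  // unused by this prototype
    public void OnCompleted() { }             // unused by this prototype
}

public static class ReentrancyFailure
{
    // Returns true when unsubscribing from inside OnNext breaks the iteration.
    public static bool Run()
    {
        var source = new ObservableSource<int>();
        IDisposable subscription = null;

        // The subscriber unsubscribes itself as soon as it sees an event.
        subscription = source.Subscribe(
            new ActionObserver<int>(_ => subscription.Dispose()));

        try
        {
            source.Next(1);
            return false;
        }
        catch (InvalidOperationException)
        {
            // The List<T> enumerator detected that the list changed under it.
            return true;
        }
    }
}
```

An "unsubscribe after the first event" subscriber is a perfectly reasonable thing to want, which is why this limitation matters more than it might first appear.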
Next Steps
Deal with the lack of re-entrancy, so a subscriber can be added or removed while events are being sent to the subscribers. Also use a test framework to make testing a little more integrated.