Introduction

The recently released preview of the Reactive Extensions (Rx) from Microsoft has had a lot of coverage from Channel 9. It includes support for compositional handling of observable event streams from:

  • WinForms events,
  • WPF events,
  • the asynchronous pattern (BeginX and EndX method pairs), and
  • an IEnumerable<T>.

But it does not include anything reusable from which to build your own observable object.

This is unfortunate because it turns out creating an observable (a type implementing IObservable<T> is harder than it might be expected to be. It is certainly much more involved than implementing IEnumerable<T>/​IEnumerator<T> without using yield (as one needed to do to create a fully custom collection in .NET 1.0 and 1.1).

To start with one needs the requirements. There does not appear to be any explicit listing (yet), but from one source or another there is the information one needs. I've collected a few below.

Requirements for an IObservable<T> Implementation

In writing an implementation of IObservable<T> one should follow the documented semantics (the syntax is, of course, enforced by the compiler and runtime). And therefore be consistent with existing implementation, especially with the implementations contained within the Reactive Framework.

The documentation for IObservable<T> is (currently, in preview release state) quite brief, but the sources listed below, and a little intuition can get a long way.

  1. There will be zero or more subscribers.

    This does mean handling zero subscribers (all operations are no-ops) as well as handling many subscribers. This drives much of the implementation:

    • Need a collection of subscribers.
    • Need a mechanism for the object returned by the Subscribe(observer) to the subscriber to identify which subscriber is unsubscribing when Dispose() is called on that retuned object. This need to link back to a specific subscription means this disposable object cannot be trivial.
    • Every event from the observable will require iterating through the list of subscribers. This needs to be both thread safe and re-entrant. In practice this means the observable needs to take a snapshot of the subscribers and iterate through that for each event (this makes use of the assumed asynchronous unsubscribe noted below).
  2. A subscriber, which remains subscribed, will see zero or more calls to it's OnNext method maybe followed by OnError or OnComplete.

    In pseudo-regular-expression this could be: OnNext*(OnError|OnComplete)?.

    All the following patterns are acceptable

    • nothing, no events at all,
    • OnNext one or more times (no OnComplete or OnError),
    • Just OnComplete.
  3. Exceptions thrown by subscribers caused non-specified effects. It could do anything from nothing (i.e. observable absorbs them with a no-op catch block) to killing the whole process.

    This is more significant for subscribers: they should avoid throwing.

  4. Subscribers can be added or removed (via the IDisposable implementing object returned from the observable's Subscribe method).

  5. There is no restriction on the subscriber being notified, or any other piece of code, from calling into the observable. A subscriber (or other object called from the subscriber) or an entirely separate type on a different thread, can (directly or indirectly) call observable methods.

    For example it is quite acceptable for a subscriber to unsubscribe itself while handling a notification. For example:

    var input = new[] { 1, 2, 3, 4, 5 };
    IDisposable d = null;
    var sub = Observer.Create&lt;int&gt;(x =&gt; {
        Console.WriteLine("Received {0}", x);
        d.Dispose();
    });
    d = input.Subscribe(sub);

    In this code, one line is printed to the console.

    Strictly speaking the code above is incorrect. An observable could start sending events to the subscriber before Subscribe returns, thus d may not yet have a value when the OnNext is called. In this case that will lead to a NullReferenceException. This can be fixed by unsubscribing on the first event in which d has been set.

There are a couple of things the observable is not responsible for:

  • The observable does not need to ensure Subscribe returns before events are seen (as noted below the simple one-shot subscriber above).

  • The observable does not need to complete an unsubscribe immediately. Rather the subscriber must handle receiving more events even after unsubscribing (this is true of .NET events as will always be an inherent race-condition when removing a listener).

Sources of Requirements

  1. "Reactive Extensions API in depth: Contract" is directly from the team.

    It includes the following non-obvious points:

    • Subscribe(subscriber) is asynchronous: the subscriber could start receiving events before the IDisposable with which to unsubscribe is returned.
    • Dispose() of the object returned by Subscribe(subscriber) is also asynchronous: you may receive events from the observable after you have disposed the object.
    • IObservable<T>.Subscribe(subscriber) must not throw. (I think this should be should not throw—after all out of memory and similar cannot be effectively be prevented in all cases.
    • Exceptions from subscribers will not, in general, be caught (and fall through to the context of the observable).
  2. IObservable<T> documentation on MSDN

  3. IObserver<T> documentation on MSDN

  4. Blog posting An RSS Dashboard in F#, part two (IObservables) by Brian McNamara (2009-12-20).

    In F#, but the requirements for re-entrancy is clear.

  5. This question on Stack Overflow also has some useful information.

And next: on to an actual implementation.