- WinForms events,
- WPF events,
- the asynchronous pattern (
EndXmethod pairs), and
But it does not include anything reusable from which to build your own observable object.
This is unfortunate because it turns out creating an observable (a type implementing
IObservable<T> is harder than it might be expected to be. It is certainly
much more involved than implementing
yield (as one needed to do to create a fully custom collection in .NET 1.0 and
To start with one needs the requirements. There does not appear to be any explicit listing (yet), but from one source or another there is the information one needs. I've collected a few below.
Requirements for an
In writing an implementation of IObservable<T> one should follow the documented semantics (the syntax is, of course, enforced by the compiler and runtime). And therefore be consistent with existing implementation, especially with the implementations contained within the Reactive Framework.
The documentation for IObservable<T> is (currently, in preview release state) quite brief, but the sources listed below, and a little intuition can get a long way.
There will be zero or more subscribers.
This does mean handling zero subscribers (all operations are no-ops) as well as handling many subscribers. This drives much of the implementation:
- Need a collection of subscribers.
Need a mechanism for the object returned by the
Subscribe(observer)to the subscriber to identify which subscriber is unsubscribing when
Dispose()is called on that retuned object. This need to link back to a specific subscription means this disposable object cannot be trivial.
- Every event from the observable will require iterating through the list of subscribers. This needs to be both thread safe and re-entrant. In practice this means the observable needs to take a snapshot of the subscribers and iterate through that for each event (this makes use of the assumed asynchronous unsubscribe noted below).
A subscriber, which remains subscribed, will see zero or more calls to it's
OnNextmethod maybe followed by
In pseudo-regular-expression this could be:
All the following patterns are acceptable
- nothing, no events at all,
- OnNext one or more times (no OnComplete or OnError),
- Just OnComplete.
Exceptions thrown by subscribers caused non-specified effects. It could do anything from nothing (i.e. observable absorbs them with a no-op
catchblock) to killing the whole process.
This is more significant for subscribers: they should avoid throwing.
Subscribers can be added or removed (via the
IDisposableimplementing object returned from the observable's
There is no restriction on the subscriber being notified, or any other piece of code, from calling into the observable. A subscriber (or other object called from the subscriber) or an entirely separate type on a different thread, can (directly or indirectly) call observable methods.
For example it is quite acceptable for a subscriber to unsubscribe itself while handling a notification. For example:
In this code, one line is printed to the console.
Strictly speaking the code above is incorrect. An observable could start sending events to the subscriber before
dmay not yet have a value when the
OnNextis called. In this case that will lead to a
NullReferenceException. This can be fixed by unsubscribing on the first event in which
dhas been set.
There are a couple of things the observable is not responsible for:
The observable does not need to ensure
Subscribereturns before events are seen (as noted below the simple one-shot subscriber above).
The observable does not need to complete an unsubscribe immediately. Rather the subscriber must handle receiving more events even after unsubscribing (this is true of .NET events as will always be an inherent race-condition when removing a listener).
Sources of Requirements
"Reactive Extensions API in depth: Contract" is directly from the team.
It includes the following non-obvious points:
Subscribe(subscriber)is asynchronous: the subscriber could start receiving events before the
IDisposablewith which to unsubscribe is returned.
Dispose()of the object returned by
Subscribe(subscriber)is also asynchronous: you may receive events from the observable after you have disposed the object.
IObservable<T>.Subscribe(subscriber)must not throw. (I think this should be should not throw—after all out of memory and similar cannot be effectively be prevented in all cases.
- Exceptions from subscribers will not, in general, be caught (and fall through to the context of the observable).
Blog posting An RSS Dashboard in F#, part two (IObservables) by Brian McNamara (2009-12-20).
In F#, but the requirements for re-entrancy is clear.
And next: on to an actual implementation.