Ractive Framework’s Observer.Create Doesn’t Create a Pure Wrapper

What Happens When One’s Test Doesn’t Fail When It Should, or, Why Observer.Create<T> Isn’t Good For Testing Observables


In building my own implementation of IObservable<T> across a series of posts (Part 1, Part 2, Part 3 and Part 4) I’ve been making heavy use of Observer.Create<T> to create helper observer instances to ensure that the right methods of the subscribed observers are called, the right number of times (and even—although somewhat harder to track in all but the simplest cases—the right order).

The Discovery

In working towards Part 5 which will cover implementing the two, so far missing, methods of IObserver<T>: OnCompleted and OnError I was rather surprise when this test passed:

public void Observable4_CallingCompletedAgainIsNoOp() {
    bool hasCompleted = false;
    var sub = Obs.Create<int>(
        i => { Assert.Fail("OnNext should not have been called"); },
        exn => { Assert.Fail("OnError should not have been called"); },
        () => {
            Assert.IsFalse(hasCompleted, "OnCompleted for single subscriber called multiple times");
            hasCompleted = true;

    var source = new Observable4<int>();
    using (var unsub = source.Subscribe(sub)) {

When I knew that Observable4<int>.Completed had no logic to prevent IObserver<T>.OnComplete being called multiple times. Under the debugger it was quite clear that the third lambda was not being called (briefly I considered a debugger bug, but decided to check more fully first).

I looked at the implementation of Observer.Create<T> in Reflector. That factory method creates an instance of an internal type, which derives from another internal type AbstractObserver<T>. So far largely as expected.

But in looking at the implementation of the IObserver<T> methods I saw:

public void OnCompleted() {
    if (!this.IsStopped) {
        this.IsStopped = true;

I.e. it keeps track of a call to OnCompleted and blocks further events. All three IObserver<T> methods check for IsStopped, and it is also set in OnError.

The Implication

If I want an observer that I can use to test an observable for compliance to the IObserver<T> semantic contract, I cannot use Observer.Create<T>. It will hide breaking the “OnCompleted or OnError end the observable’s event sequence”.

The Solution

In the end the fix is rather easy: make my own Observer.Create<T> which doesn’t contain any such logic. This is rather simple:

public static class Obs {
    public static AnonObserver<T> Create<T>(Action<T> onNext) {
        return new AnonObserver<T>(onNext);
    public static AnonObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onComplete) {
        return new AnonObserver<T>(onNext, onError, onComplete);

public class AnonObserver<T> : IObserver<T> {
    private Action<T> onNext;
    private Action onComplete;
    private Action<Exception> onError;

    public AnonObserver(Action<T> onNext) {
        if (onNext == null) { throw new ArgumentNullException("onNext"); }
        this.onNext = onNext;
    public AnonObserver(Action<T> onNext, Action<Exception> onError, Action onComplete) {
        if (onNext == null) { throw new ArgumentNullException("onNext"); }
        if (onError == null) { throw new ArgumentNullException("onError"); }
        if (onComplete == null) { throw new ArgumentNullException("onComplete"); }
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;

    public void OnCompleted() {
        if (onComplete != null) {

    public void OnError(Exception error) {
        if (onError != null) {

    public void OnNext(T value) {

I now need to just replace all the use of Observer.Create<T> with Obs.Create<T> (and try and think of a better name).

This entry was posted Monday, February 22nd, 2010 11:49. In .NET Futures, Rx.
You can follow any responses to this entry through the RSS 2.0 feed.

Comments are closed.