Creating a Reusable "ObservableSource" for the Reactive Extensions (Part 1)

Introduction

As seen in the requirements for an implementation of IObservable<T>, there is a lot of ground to cover. So the idea is to start with a massively simplified subset of the requirements, to better understand what it will take to implement them in full. In other words, a series of partially functional prototypes.

As a first step, just implement:

  • Subscribe, and maintain a list of subscribed observers.
  • A disposable helper type to act as a return from Subscribe to unsubscribe.
  • Next, to call the OnNext of each subscriber.

This implementation does not:

  • Handle concurrency.
  • Support re-entrancy. Any call by a subscriber back into the Observable is, at best, going to fail.
  • Make use of the rest of the IObserver<T> interface. No errors and no end to the events.
  • Call the subscribers asynchronously, in a synchronisation context of their (or the Reactive Framework's) choice. (The System.Linq.Observable.ObserveOn<T>(IObservable<T> source, SynchronizationContext sync) method of Rx might be usable to work around this.)

Implementation

using System;
using System.Collections.Generic;

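public class Observable1<T> : IObservable<T> {
    // Subscribers keyed by an ever-increasing id, so each disposer can
    // remove exactly the subscription it was created for.
    private readonly Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;

    public IDisposable Subscribe(IObserver<T> observer) {
        subscribers.Add(nextSubscriber, observer);
        return new Observable1Disposer(this, nextSubscriber++);
    }

    private void Unsubscribe(int index) {
        // Removing a key that is already gone is a no-op, so a second
        // Dispose on the same subscription is harmless.
        subscribers.Remove(index);
    }

    // Not re-entrant: a subscriber must not call Subscribe or Dispose
    // from inside OnNext while this loop is running.
    public void Next(T value) {
        foreach (var sub in subscribers.Values) {
            sub.OnNext(value);
        }
    }

    // Returned from Subscribe; disposing it unsubscribes the observer.
    private class Observable1Disposer : IDisposable {
        private readonly Observable1<T> target;
        private readonly int index;
        internal Observable1Disposer(Observable1<T> target, int index) {
            this.target = target;
            this.index = index;
        }

        public void Dispose() {
            target.Unsubscribe(index);
        }
    }
}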

Testing

Obviously I have some test code, but it all gets somewhat tedious very quickly as it covers all sorts of edge cases (e.g. check that subscribing and then unsubscribing before any events works—it does). So these are a couple of indicative samples.
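
As an illustration of one of those edge cases, the subscribe-then-unsubscribe-before-any-event check might look something like the sketch below. It repeats a compact copy of Observable1<T> so that it compiles on its own. CountingObserver<T> and UnsubscribeBeforeEventsTest are hypothetical names introduced here, with CountingObserver<T> standing in for Observer.Create so that no Rx reference is needed.

```csharp
using System;
using System.Collections.Generic;

// Compact copy of Observable1<T> from the Implementation section.
public class Observable1<T> : IObservable<T> {
    private readonly Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;

    public IDisposable Subscribe(IObserver<T> observer) {
        subscribers.Add(nextSubscriber, observer);
        return new Disposer(this, nextSubscriber++);
    }

    public void Next(T value) {
        foreach (var sub in subscribers.Values) {
            sub.OnNext(value);
        }
    }

    private class Disposer : IDisposable {
        private readonly Observable1<T> target;
        private readonly int index;
        internal Disposer(Observable1<T> target, int index) {
            this.target = target;
            this.index = index;
        }
        public void Dispose() { target.subscribers.Remove(index); }
    }
}

// Hypothetical helper (not part of Rx): counts the OnNext calls it receives.
public class CountingObserver<T> : IObserver<T> {
    public int Calls;
    public void OnNext(T value) { ++Calls; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class UnsubscribeBeforeEventsTest {
    // Returns the number of events seen by an observer that unsubscribed
    // before the first event was sent.
    public static int Run() {
        var sub = new CountingObserver<int>();
        var source = new Observable1<int>();
        source.Subscribe(sub).Dispose(); // subscribe, then immediately unsubscribe
        source.Next(1);                  // nobody is subscribed any more
        return sub.Calls;
    }

    public static void Main() {
        Console.WriteLine(Run() == 0 ? "Test passed" : "Test FAILED");
    }
}
```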

First, what must be just about the simplest possible case: subscribe, send one event, and unsubscribe. Then check that the subscriber was called exactly once:

var calls = 0;
var sub = Observer.Create<int>(i => {
    Console.WriteLine("This is the subscriber, arg: {0}", i);
    ++calls;
});

var source = new Observable1<int>();
using (var disp = source.Subscribe(sub)) {
    source.Next(1);
}

if (calls != 1) {
    Console.WriteLine("Test FAILED");
}

And then this to show that arbitrary interleaving of subscribe and unsubscribe operations will also work:

var calls1 = 0;
var sub1 = Observer.Create<int>(i => {
    Console.WriteLine("This is subscriber #1, arg: {0}", i);
    ++calls1;
});
var calls2 = 0;
var sub2 = Observer.Create<int>(i => {
    Console.WriteLine("This is subscriber #2, arg: {0}", i);
    ++calls2;
});

var source = new Observable1<int>();
IDisposable disp1 = null, disp2 = null;
try {
    disp1 = source.Subscribe(sub1);
    disp2 = source.Subscribe(sub2);
    disp2.Dispose();
    source.Next(1); // sub1 gets this
    disp2 = source.Subscribe(sub2);
    source.Next(2); // sub1 and sub2 get this
    disp1.Dispose();
    source.Next(3); // sub2 gets this
    disp1 = source.Subscribe(sub1);
    source.Next(4); // Both get this
} finally {
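    // Guard against null in case a Subscribe call threw before assignment.
    if (disp1 != null) disp1.Dispose();
    if (disp2 != null) disp2.Dispose();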
}

if (calls1 != 3 || calls2 != 3) {
    Console.WriteLine("Test FAILED");
}

In this second test, I ended up adding the comments to the Next() calls to help me work out the expected counts. Running it and checking the console then confirmed that every event went to at least one subscriber. Clearly any real (production) code like this should be condemned in even the most limited of code reviews, but tests need to cover the bases.

Both tests also show just how useful lambdas are when working with Rx: the ability to create test subscribers that are inline with the test and close over its locals makes state handling much easier.

How this first IObservable<T> Is Inadequate

Quick answer: in almost every way. Longer answer: while events are forwarded and subscribers are managed, the integrity of the system is preserved only if the subscribers play by a very limited execution model. In particular, no subscriber can unsubscribe in direct response to an event, because Next is still iterating over the set of subscribers at that point, and that iteration will fail if Subscribe or Unsubscribe is called while it is in progress.
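
The failure is easy to reproduce. The sketch below uses a compact copy of Observable1<T> and a hypothetical ResubscribingObserver that calls Subscribe from inside OnNext. Modifying a Dictionary while it is being enumerated is not supported, and here the next step of the foreach throws InvalidOperationException. Unsubscribing from inside OnNext is the same unsupported pattern, although whether a removal is actually detected may depend on the runtime version.

```csharp
using System;
using System.Collections.Generic;

// Compact copy of Observable1<T> from the Implementation section.
public class Observable1<T> : IObservable<T> {
    private readonly Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;

    public IDisposable Subscribe(IObserver<T> observer) {
        subscribers.Add(nextSubscriber, observer);
        return new Disposer(this, nextSubscriber++);
    }

    public void Next(T value) {
        foreach (var sub in subscribers.Values) {
            sub.OnNext(value);
        }
    }

    private class Disposer : IDisposable {
        private readonly Observable1<T> target;
        private readonly int index;
        internal Disposer(Observable1<T> target, int index) {
            this.target = target;
            this.index = index;
        }
        public void Dispose() { target.subscribers.Remove(index); }
    }
}

// Hypothetical observer that calls back into the source from OnNext.
public class ResubscribingObserver : IObserver<int> {
    private readonly Observable1<int> source;
    public ResubscribingObserver(Observable1<int> source) { this.source = source; }

    // Adds an entry to the dictionary that Next() is currently enumerating.
    public void OnNext(int value) { source.Subscribe(this); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ReentrancyDemo {
    // Returns true if the re-entrant Subscribe broke the iteration in Next().
    public static bool Run() {
        var source = new Observable1<int>();
        source.Subscribe(new ResubscribingObserver(source));
        try {
            source.Next(1);
            return false;
        } catch (InvalidOperationException) {
            // Thrown when the foreach tries to advance after the Add.
            return true;
        }
    }

    public static void Main() {
        Console.WriteLine(Run() ? "Iteration failed, as expected" : "No failure observed");
    }
}
```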

Next Steps

Deal with the lack of re-entrancy, so a subscriber can be added or removed while events are being sent to the subscribers. Also use a test framework to make testing a little more integrated.
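
One common way to approach the re-entrancy problem, offered purely as a sketch and not necessarily the route the next part takes, is to have Next iterate over a snapshot of the subscribers, so that Subscribe and Unsubscribe calls made from inside OnNext only affect later events. SnapshotObservable<T>, SelfRemovingObserver and SnapshotDemo are hypothetical names, and concurrency is still not handled.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical variant of Observable1<T> that tolerates re-entrant calls.
public class SnapshotObservable<T> : IObservable<T> {
    private readonly Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;

    public IDisposable Subscribe(IObserver<T> observer) {
        subscribers.Add(nextSubscriber, observer);
        return new Disposer(this, nextSubscriber++);
    }

    public void Next(T value) {
        // Copy the current subscribers first; the loop below never touches
        // the live dictionary, so it can be modified from inside OnNext.
        var snapshot = new List<IObserver<T>>(subscribers.Values);
        foreach (var sub in snapshot) {
            sub.OnNext(value);
        }
    }

    private class Disposer : IDisposable {
        private readonly SnapshotObservable<T> target;
        private readonly int index;
        internal Disposer(SnapshotObservable<T> target, int index) {
            this.target = target;
            this.index = index;
        }
        public void Dispose() { target.subscribers.Remove(index); }
    }
}

// Hypothetical observer that unsubscribes itself on its first event.
public class SelfRemovingObserver : IObserver<int> {
    public int Calls;
    public IDisposable Subscription;

    public void OnNext(int value) {
        ++Calls;
        Subscription.Dispose(); // re-entrant unsubscribe
    }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class SnapshotDemo {
    // Returns how many events the self-removing observer received.
    public static int Run() {
        var source = new SnapshotObservable<int>();
        var sub = new SelfRemovingObserver();
        sub.Subscription = source.Subscribe(sub);
        source.Next(1); // delivered; the observer then unsubscribes itself
        source.Next(2); // not delivered, the observer is gone
        return sub.Calls;
    }

    public static void Main() {
        Console.WriteLine("Calls received: {0}", Run());
    }
}
```

The trade-off in this sketch is that a subscriber removed part-way through a dispatch (by some other subscriber) still receives the event already in flight, because it is in the snapshot. Copying the list on every Next call also costs an allocation per event.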

This entry was posted Friday, January 22nd, 2010 15:51. In .NET Futures, Rx.