Archive for January, 2010

Creating a Reusable "ObservableSource" for the Reactive Extensions (part 2)

Posted in .NET Futures, Rx on January 24th, 2010 by Richard

Introduction

This part continues from Observable1<T>, implemented in Part 1, with Observable2<T>.

The aim in this part is to make re-entrant subscribe and unsubscribe operations work rather than fail. With Observable1<T>, a subscriber that tries to unsubscribe itself in its OnNext implementation will cause an InvalidOperationException to be thrown from Observable1<T>.Next. This is because Subscribe and Unsubscribe both modify the collection of subscribers in Observable1<T> that Next is iterating over, and the .NET Framework collections do not support modification during enumeration.
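A minimal C# sketch of the failure in isolation, with plain strings standing in for observers (the ReentrancyDemo class and its methods are hypothetical, written only for illustration): adding to a Dictionary while enumerating its Values, which is what a subscriber calling Subscribe from inside Observable1<T>.Next amounts to, throws InvalidOperationException, whereas enumerating a copy does not. One caveat if you try this on a current runtime: since .NET Core 3.0, Dictionary.Remove no longer invalidates active enumerators, so there it is the add (subscribe) case that reliably throws; on the .NET Framework both did.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReentrancyDemo {
    // Mirrors Observable1<T>.Next: enumerate the live Values collection while
    // a "subscriber" adds another entry (a re-entrant Subscribe).
    // Returns true if the enumeration failed.
    public static bool AddDuringEnumerationThrows() {
        var subscribers = new Dictionary<int, string> { { 0, "first" }, { 1, "second" } };
        var nextId = 2;
        try {
            foreach (var name in subscribers.Values) {
                subscribers.Add(nextId++, "added while visiting " + name);
            }
        } catch (InvalidOperationException) {
            return true; // "Collection was modified; enumeration operation may not execute."
        }
        return false;
    }

    // Mirrors Observable2<T>.Next: enumerate a copy, so the same additions
    // only change the live dictionary. Returns the final subscriber count.
    public static int AddDuringSnapshotEnumeration() {
        var subscribers = new Dictionary<int, string> { { 0, "first" }, { 1, "second" } };
        var nextId = 2;
        foreach (var name in subscribers.Values.ToArray()) {
            subscribers.Add(nextId++, "added while visiting " + name);
        }
        return subscribers.Count; // 2 originals + 1 addition per original
    }
}
```

AddDuringEnumerationThrows() returns true, and AddDuringSnapshotEnumeration() returns 4: the copy costs an allocation per event, but the iteration no longer cares what the subscribers do to the live collection.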

Supporting Re-entrant Operations

Allowing Subscribe and Unsubscribe to work is very simple: just take a copy of the set of subscribers before iterating over it. This means replacing the implementation of Next from Observable1<T>:

public void Next(T value) {
    foreach (var sub in subscribers.Values) {
        sub.OnNext(value);
    }
}

with the following in Observable2<T> (full implementation is given below):

public void Next(T value) {
    var subs = subscribers.Values.ToArray();
    foreach (var sub in subs) {
        sub.OnNext(value);
    }
}

And now code like the following doesn’t fail:

bool hasUnsubscribed = false;
IDisposable unsub = null;
var sub = Observer.Create<int>(i => {
    Console.WriteLine("This is the subscriber, arg: {0}", i);
    if (unsub != null) {
        unsub.Dispose();
        unsub = null;
        hasUnsubscribed = true;
    }
});

var source = new Observable2<int>();
unsub = source.Subscribe(sub);
source.Next(1);
Assert.IsTrue(hasUnsubscribed);
Assert.IsNull(unsub);

Note that this test would not necessarily pass with an arbitrary observable: because Subscribe is allowed to be asynchronous, unsub might not yet be set when the subscriber is first called (the null check at least avoids a crash).

While this may seem an odd thing to do, consider a subscriber that only wants to react to one event (or the implementation of the Rx operator First).
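One way to make such a one-shot subscriber robust against an asynchronous Subscribe is to let it request an unsubscribe before it has the IDisposable, and apply that request once Subscribe returns. A minimal sketch, with hypothetical names throughout (DeferredDisposable, FirstValueObserver<T> and EagerSource are illustrations, not Rx types, although later Rx releases include a comparable SingleAssignmentDisposable). EagerSource deliberately delivers every value from inside Subscribe, the worst case the contract permits, and never checks whether the subscriber has asked to stop.

```csharp
using System;

// Hypothetical helper: records an unsubscribe request made before the real
// IDisposable is known, and honours it as soon as Set is called.
public sealed class DeferredDisposable : IDisposable {
    private readonly object gate = new object();
    private IDisposable inner;
    private bool disposeRequested;

    // Call with the IDisposable returned from Subscribe.
    public void Set(IDisposable subscription) {
        bool disposeNow;
        lock (gate) {
            inner = subscription;
            disposeNow = disposeRequested;
        }
        // The subscriber already asked to stop during Subscribe: do it now.
        if (disposeNow) subscription.Dispose();
    }

    // Safe to call from inside OnNext, even before Set has been called.
    public void Dispose() {
        IDisposable toDispose;
        lock (gate) {
            if (disposeRequested) return; // repeated calls are no-ops
            disposeRequested = true;
            toDispose = inner;
        }
        if (toDispose != null) toDispose.Dispose();
    }
}

// Hypothetical one-shot observer: keeps the first value, then unsubscribes.
// HasValue also makes it ignore any values that arrive afterwards.
public sealed class FirstValueObserver<T> : IObserver<T> {
    public readonly DeferredDisposable Subscription = new DeferredDisposable();
    public T Value { get; private set; }
    public bool HasValue { get; private set; }

    public void OnNext(T value) {
        if (HasValue) return;
        HasValue = true;
        Value = value;
        Subscription.Dispose();
    }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Hypothetical worst-case source: delivers all values inside Subscribe,
// before the caller has received the IDisposable.
public sealed class EagerSource : IObservable<int>, IDisposable {
    public bool Unsubscribed { get; private set; }

    public IDisposable Subscribe(IObserver<int> observer) {
        for (var i = 1; i <= 3; i++) observer.OnNext(i);
        return this; // for brevity, the source is its own subscription token
    }
    public void Dispose() { Unsubscribed = true; }
}
```

Subscribing with first.Subscription.Set(source.Subscribe(first)) leaves first.Value equal to 1 and source.Unsubscribed true, even though the dispose request was made before Subscribe returned.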

Progress Against the Overall Requirements

See the full list of requirements.

  • #1: Manage subscribers: done. Can subscribe zero or more observers, and freely add or remove them.

  • #2: OnNext, OnComplete and OnError: OnNext is done (will be called zero or more times). But neither completion nor errors are supported. This also means that the source cannot indicate the end of events: every event sequence is currently infinite.

  • #3: Exceptions from subscribers will fall back to the event producer: this will happen, but it also means that subscribers later in the iteration will not see the event.

    Based on this forum thread about Subject<T>, the right approach here in Rx is not yet fully agreed.

  • #4: Done: can freely add and remove subscribers.

  • #5: Re-entrancy: this part makes a start. However, a subscriber calling Next is likely to lead to some unexpected sequencing of calls.
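To make the sequencing problem concrete, here is a minimal sketch using a stripped-down stand-in for Observable2<T> (ReentrantSource<T> and ReentrantNextDemo are hypothetical names; Action<T> callbacks replace IObserver<T> and unsubscribe is omitted, purely for brevity). Subscriber A reacts to the value 1 by calling Next(2) on the same source.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for Observable2<T>: Next iterates a copy of the subscribers.
public sealed class ReentrantSource<T> {
    private readonly List<Action<T>> subscribers = new List<Action<T>>();

    public void Subscribe(Action<T> onNext) { subscribers.Add(onNext); }

    public void Next(T value) {
        foreach (var sub in subscribers.ToArray()) sub(value);
    }
}

public static class ReentrantNextDemo {
    // Returns the order in which notifications were delivered.
    public static List<string> Run() {
        var log = new List<string>();
        var source = new ReentrantSource<int>();

        // Subscriber A: on 1, re-enters the source with Next(2).
        source.Subscribe(x => {
            log.Add("A:" + x);
            if (x == 1) source.Next(2);
        });
        // Subscriber B: just records what it sees.
        source.Subscribe(x => log.Add("B:" + x));

        source.Next(1);
        return log;
    }
}
```

Run() returns A:1, A:2, B:2, B:1. Subscriber B sees 2 before 1, because the nested Next(2) runs to completion before the outer loop reaches B.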

The Observable2<T> Implementation

using System;
using System.Collections.Generic;
using System.Linq;

public class Observable2<T> : IObservable<T> {
    private Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;

    public IDisposable Subscribe(IObserver<T> observer) {
        subscribers.Add(nextSubscriber, observer);
        return new Observable2Disposer(this, nextSubscriber++);
    }

    private void Unsubscribe(int index) {
        subscribers.Remove(index);
    }

    public void Next(T value) {
        // To allow subscribers to unsubscribe while calling them, don't
        // directly iterate over the collection, but use a copy.
        var subs = subscribers.Values.ToArray();
        foreach (var sub in subs) {
            sub.OnNext(value);
        }
    }


    private class Observable2Disposer : IDisposable {
        private Observable2<T> target;
        private int index;
        internal Observable2Disposer(Observable2<T> target, int index) {
            this.target = target;
            this.index = index;
        }

        public void Dispose() {
            target.Unsubscribe(index);
        }
    }
}

Creating a Reusable "ObservableSource" for the Reactive Extensions (Part 1)

Posted in .NET Futures, Rx on January 22nd, 2010 by Richard

Introduction

As seen in the requirements for an implementation of IObservable<T>, there is a lot of ground to cover. So the idea is to start with a massively simplified subset of the requirements, to better understand what it will take to implement them in full. In other words, a series of partially functional prototypes.

As a first step, just implement:

  • Subscribe, and maintain a list of subscribed observers.
  • A disposable helper type to act as a return from Subscribe to unsubscribe.
  • Next, to call OnNext on each subscriber.

This implementation does not:

  • Handle concurrency.
  • Support re-entrancy. Any call by a subscriber back into the Observable is, at best, going to fail.
  • Make use of the rest of the IObserver<T> interface. No errors and no end to the events.
  • Call the subscribers asynchronously, in a synchronisation context of their (or the Reactive Framework's) choice. (Rx's System.Linq.Observable.ObserveOn<T>(IObservable<T> source, SynchronizationContext sync) method might be usable to work around this.)
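None of these prototypes handle concurrency. One common way to do so is copy-on-write, sketched here under my own assumptions (CopyOnWriteObservable<T> and CountingObserver<T> are hypothetical names, not part of Rx or of this series). Subscribe and Unsubscribe build a new subscriber array under a lock, while Next simply reads whichever array is current and iterates it. Because a published array is never modified, Next always iterates a consistent snapshot, whether the concurrent change comes from another thread or from a subscriber re-entering the observable.

```csharp
using System;
using System.Threading;

public sealed class CopyOnWriteObservable<T> : IObservable<T> {
    private readonly object gate = new object();
    private IObserver<T>[] subscribers = new IObserver<T>[0];

    public IDisposable Subscribe(IObserver<T> observer) {
        lock (gate) {
            var updated = new IObserver<T>[subscribers.Length + 1];
            Array.Copy(subscribers, updated, subscribers.Length);
            updated[subscribers.Length] = observer;
            Volatile.Write(ref subscribers, updated); // publish the new array
        }
        return new Subscription(this, observer);
    }

    private void Unsubscribe(IObserver<T> observer) {
        lock (gate) {
            var index = Array.IndexOf(subscribers, observer);
            if (index < 0) return; // already removed
            var updated = new IObserver<T>[subscribers.Length - 1];
            Array.Copy(subscribers, 0, updated, 0, index);
            Array.Copy(subscribers, index + 1, updated, index, subscribers.Length - index - 1);
            Volatile.Write(ref subscribers, updated);
        }
    }

    public void Next(T value) {
        // Read the reference once: later Subscribe/Unsubscribe calls publish
        // new arrays and leave this one untouched.
        var snapshot = Volatile.Read(ref subscribers);
        foreach (var sub in snapshot) sub.OnNext(value);
    }

    private sealed class Subscription : IDisposable {
        private CopyOnWriteObservable<T> target;
        private readonly IObserver<T> observer;

        public Subscription(CopyOnWriteObservable<T> target, IObserver<T> observer) {
            this.target = target;
            this.observer = observer;
        }

        public void Dispose() {
            // Interlocked.Exchange makes Dispose idempotent, even across threads.
            var t = Interlocked.Exchange(ref target, null);
            if (t != null) t.Unsubscribe(observer);
        }
    }
}

// Tiny observer used to exercise the sketch: counts OnNext calls.
public sealed class CountingObserver<T> : IObserver<T> {
    public int Count;
    public void OnNext(T value) { Count++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

The trade-off is that every Subscribe or Unsubscribe copies the array, which suits sources where events are far more frequent than subscription changes. Identifying a subscription by observer reference, rather than by the integer keys used below, is a simplification: if the same observer is subscribed twice, disposing either subscription removes one copy, which is equivalent.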

Implementation

using System;
using System.Collections.Generic;

public class Observable1<T> : IObservable<T> {
    private Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;
    
    public IDisposable Subscribe(IObserver<T> observer) {
        subscribers.Add(nextSubscriber, observer);
        return new Observable1Disposer(this, nextSubscriber++);
    }

    private void Unsubscribe(int index) {
        subscribers.Remove(index);
    }

    public void Next(T value) {
        foreach (var sub in subscribers.Values) {
            sub.OnNext(value);
        }
    }


    private class Observable1Disposer : IDisposable {
        private Observable1<T> target;
        private int index;
        internal Observable1Disposer(Observable1<T> target, int index) {
            this.target = target;
            this.index = index;
        }

        public void Dispose() {
            target.Unsubscribe(index);
        }
    }
}

Testing

Obviously I have some test code, but it gets tedious very quickly as it covers all sorts of edge cases (e.g. checking that subscribing and then unsubscribing before any events works; it does). So here are a couple of indicative samples.

First, what must be just about the simplest possible case: subscribe, send one event and unsubscribe, then check that the subscriber was called once:

var calls = 0;
var sub = Observer.Create<int>(i => {
    Console.WriteLine("This is the subscriber, arg: {0}", i);
    ++calls;
});

var source = new Observable1<int>();
using (var disp = source.Subscribe(sub)) {
    source.Next(1);
}

if (calls != 1) {
    Console.WriteLine("Test FAILED");
}

And then this to show that arbitrary interleaving of subscribe and unsubscribe operations will also work:

var calls1 = 0;
var sub1 = Observer.Create<int>(i => {
    Console.WriteLine("This is subscriber #1, arg: {0}", i);
    ++calls1;
});
var calls2 = 0;
var sub2 = Observer.Create<int>(i => {
    Console.WriteLine("This is subscriber #2, arg: {0}", i);
    ++calls2;
});

var source = new Observable1<int>();
IDisposable disp1 = null, disp2 = null;
try {
    disp1 = source.Subscribe(sub1);
    disp2 = source.Subscribe(sub2);
    disp2.Dispose();
    source.Next(1); // sub1 gets this
    disp2 = source.Subscribe(sub2);
    source.Next(2); // sub1 and sub2 get this
    disp1.Dispose();
    source.Next(3); // sub2 gets this
    disp1 = source.Subscribe(sub1);
    source.Next(4); // Both get this
} finally {
    disp1.Dispose();
    disp2.Dispose();
}

if (calls1 != 3 || calls2 != 3) {
    Console.WriteLine("Test FAILED");
}

In this second test, I ended up adding the comments to the Next() calls to help me work out the expected counts. Then running it and checking the console confirmed that every event went to at least one subscriber. Clearly any real (production) code like this should be condemned by even the most limited of code reviews, but tests need to cover the bases.

Both tests also show just how useful lambdas are when working with Rx: being able to create test subscribers inline in the test, closing over locals, makes state handling much easier.
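The tests above rely on Rx's Observer.Create<T> to turn a lambda into an IObserver<T>. If you want to run them without the Rx assemblies, a minimal equivalent is easy to write. This is a sketch only: DelegateObserver<T> is a hypothetical name, and its default error behaviour (rethrow) is my own choice rather than a documented Rx guarantee.

```csharp
using System;
using System.Runtime.ExceptionServices;

// Hypothetical stand-in for Observer.Create<T>: wraps lambdas in an
// IObserver<T> so test subscribers can be written inline and close over locals.
public sealed class DelegateObserver<T> : IObserver<T> {
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public DelegateObserver(Action<T> onNext,
                            Action<Exception> onError = null,
                            Action onCompleted = null) {
        if (onNext == null) throw new ArgumentNullException(nameof(onNext));
        this.onNext = onNext;
        // Defaults: rethrow errors (preserving the original stack trace),
        // and ignore completion.
        this.onError = onError ?? (e => ExceptionDispatchInfo.Capture(e).Throw());
        this.onCompleted = onCompleted ?? (() => { });
    }

    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { onError(error); }
    public void OnCompleted() { onCompleted(); }
}
```

With it, the first test's subscriber could be created as new DelegateObserver<int>(i => { Console.WriteLine("This is the subscriber, arg: {0}", i); ++calls; }), leaving the rest of the test unchanged.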

How this first IObservable<T> Is Inadequate

Quick answer: in almost every way. Longer answer: while events are forwarded and subscribers are managed, the integrity of the system is preserved only if the subscribers follow a very limited execution model. In particular, no subscriber can unsubscribe in direct response to an event, because the iteration over the set of subscribers will fail if Subscribe or Unsubscribe is called during that iteration.

Next Steps

Deal with the lack of re-entrancy, so a subscriber can be added or removed while events are being sent to the subscribers. Also use a test framework to make testing a little more integrated.

Requirements for IObservable<T>

Posted in .NET Futures, Rx on January 10th, 2010 by Richard

Introduction

The recently released preview of the Reactive Extensions (Rx) from Microsoft has had a lot of coverage from Channel 9. It includes support for compositional handling of observable event streams from:

  • WinForms events,
  • WPF events,
  • the asynchronous pattern (BeginX and EndX method pairs), and
  • an IEnumerable<T>.

But it does not include anything reusable from which to build your own observable object.

This is unfortunate because it turns out that creating an observable (a type implementing IObservable<T>) is harder than might be expected. It is certainly much more involved than implementing IEnumerable<T>/IEnumerator<T> without using yield (as one needed to do to create a fully custom collection in .NET 1.0 and 1.1).

To start with, one needs the requirements. There does not appear to be any explicit listing (yet), but the necessary information is available from one source or another. I've collected a few below.

Requirements for an IObservable<T> Implementation

In writing an implementation of IObservable<T>, one should follow the documented semantics (the syntax is, of course, enforced by the compiler and runtime), and therefore be consistent with existing implementations, especially those contained within the Reactive Framework.

The documentation for IObservable<T> is (currently, in its preview release state) quite brief, but the sources listed below, and a little intuition, can go a long way.

  1. There will be zero or more subscribers.

    This does mean handling zero subscribers (all operations are no-ops) as well as handling many subscribers. This drives much of the implementation:

    • Need a collection of subscribers.
    • Need a mechanism for the object returned by Subscribe(observer) to identify which subscriber is unsubscribing when Dispose() is called on it. This need to link back to a specific subscription means the disposable object cannot be trivial.
    • Every event from the observable will require iterating through the list of subscribers. This needs to be both thread safe and re-entrant. In practice this means the observable needs to take a snapshot of the subscribers and iterate through that for each event (this relies on unsubscribe being allowed to be asynchronous, as noted below).
  2. A subscriber that remains subscribed will see zero or more calls to its OnNext method, optionally followed by a call to either OnError or OnComplete.

    In pseudo-regular-expression this could be: OnNext*(OnError|OnComplete)?.

    All the following patterns are acceptable:

    • nothing, no events at all,
    • OnNext one or more times (no OnComplete or OnError),
    • Just OnComplete.
  3. Exceptions thrown by subscribers have unspecified effects. The outcome could be anything from nothing (i.e. the observable absorbs them with a no-op catch block) to killing the whole process.

    This is more significant for subscribers: they should avoid throwing.

  4. Subscribers can be added or removed (via the IDisposable implementing object returned from the observable's Subscribe method).

  5. Nothing prevents the subscriber being notified, or any other piece of code, from calling into the observable. A subscriber (or another object called from the subscriber), or an entirely separate type on a different thread, can (directly or indirectly) call observable methods.

    It is quite acceptable, for instance, for a subscriber to unsubscribe itself while handling a notification:

          var input = new[] { 1, 2, 3, 4, 5 };
          IDisposable d = null;
          var sub = Observer.Create<int>(x => {
              Console.WriteLine("Received {0}", x);
              d.Dispose();
          });
          d = input.ToObservable().Subscribe(sub);

    In this code, one line is printed to the console.

    Strictly speaking, the code above is incorrect. An observable could start sending events to the subscriber before Subscribe returns, so d may not yet have a value when OnNext is called, which will lead to a NullReferenceException. This can be fixed by unsubscribing on the first event for which d has been set.
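The grammar in item 2, OnNext*(OnError|OnComplete)?, is simple enough to check mechanically, which can be handy when testing an observable. A minimal sketch, assuming a hypothetical NotificationKind enum that records which observer method was called; note that in .NET 4's System.IObserver<T> the completion method is named OnCompleted, which is the name used here.

```csharp
using System.Collections.Generic;

// Hypothetical record of which IObserver<T> method was called.
public enum NotificationKind { OnNext, OnError, OnCompleted }

public static class ObserverContract {
    // True if the calls seen by one subscriber match OnNext*(OnError|OnCompleted)?:
    // any number of OnNext calls, then at most one terminal call, then nothing.
    public static bool IsValidSequence(IEnumerable<NotificationKind> calls) {
        var terminated = false;
        foreach (var call in calls) {
            if (terminated) return false; // nothing may follow OnError/OnCompleted
            if (call != NotificationKind.OnNext) terminated = true;
        }
        return true;
    }
}
```

For example, the empty sequence, OnNext followed by OnNext, and a lone OnCompleted are all accepted, while OnCompleted followed by OnNext is rejected.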

There are a couple of things the observable is not responsible for:

  • The observable does not need to ensure Subscribe returns before events are seen (as noted after the simple one-shot subscriber above).

  • The observable does not need to complete an unsubscribe immediately. Rather, the subscriber must handle receiving more events even after unsubscribing (this is also true of .NET events, as there will always be an inherent race condition when removing a listener).
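The snapshot approach from requirement 1 is itself one source of such late events: a subscriber removed partway through a Next call is still in that call's snapshot. A minimal sketch (SnapshotSource<T> and LateEventDemo are hypothetical names; Action<T> callbacks stand in for IObserver<T>, and Subscribe returns an Action as its "unsubscribe" token, purely for brevity) in which subscriber A unsubscribes subscriber B while handling a value, and B optionally guards against acting on it.

```csharp
using System;
using System.Collections.Generic;

// Snapshot-based source: Next iterates a copy of the subscriber list.
public sealed class SnapshotSource<T> {
    private readonly List<KeyValuePair<int, Action<T>>> subscribers =
        new List<KeyValuePair<int, Action<T>>>();
    private int nextId;

    public Action Subscribe(Action<T> onNext) {
        var id = nextId++;
        subscribers.Add(new KeyValuePair<int, Action<T>>(id, onNext));
        return () => subscribers.RemoveAll(s => s.Key == id);
    }

    public void Next(T value) {
        foreach (var sub in subscribers.ToArray()) sub.Value(value);
    }
}

public static class LateEventDemo {
    // Returns how many values subscriber B acted on.
    public static int ValuesHandledByB(bool guardAgainstLateEvents) {
        var source = new SnapshotSource<int>();
        Action unsubscribeB = null;
        var bUnsubscribed = false;
        var handledByB = 0;

        // Subscriber A: unsubscribes B on the first value it sees.
        source.Subscribe(x => {
            if (unsubscribeB != null && !bUnsubscribed) {
                unsubscribeB();
                bUnsubscribed = true;
            }
        });

        // Subscriber B: with the guard on, ignores anything delivered after
        // its unsubscribe was requested.
        unsubscribeB = source.Subscribe(x => {
            if (guardAgainstLateEvents && bUnsubscribed) return;
            handledByB++;
        });

        source.Next(1); // B is removed during this call but still receives 1
        source.Next(2); // B is no longer subscribed
        return handledByB;
    }
}
```

ValuesHandledByB(false) returns 1, showing B acting on a value delivered after it was unsubscribed; ValuesHandledByB(true) returns 0.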

Sources of Requirements

  1. "Reactive Extensions API in depth: Contract" is directly from the team.

    It includes the following non-obvious points:

    • Subscribe(subscriber) is asynchronous: the subscriber could start receiving events before the IDisposable with which to unsubscribe is returned.
    • Dispose() of the object returned by Subscribe(subscriber) is also asynchronous: you may receive events from the observable after you have disposed the object.
    • IObservable<T>.Subscribe(subscriber) must not throw. (I think this should be "should not throw": after all, out of memory and similar conditions cannot be prevented in all cases.)
    • Exceptions from subscribers will not, in general, be caught (and fall through to the context of the observable).
  2. IObservable<T> documentation on MSDN

  3. IObserver<T> documentation on MSDN

  4. Blog posting An RSS Dashboard in F#, part two (IObservables) by Brian McNamara (2009-12-20).

    In F#, but the requirement for re-entrancy is clear.

  5. This question on Stack Overflow also has some useful information.

And next: on to an actual implementation.

Winter Photographs

Posted in Photos on January 9th, 2010 by Richard

Uploaded a few photos of the recent “once in a lifetime” snow here.

(And I do know that for many places this would be just a thin covering that is barely noticed. I live in the south of England, a place where 2cm of snow is above average; this was close to 20cm.)