Creating a Reusable "ObservableSource" for the Reactive Extensions (part 2)

Introduction

Continuing on from Observable1<T> implemented in Part 1 with Observable2<T> here.

The aim in this part is to allow re-entrant subscribe and unsubscribe operations to not fail. Using Observable1<T> a subscriber which tries to unsubscribe itself in its OnNext implementation will lead to an InvalidOperationException being thrown from Observable1<T>.OnNext. This is because Subscribe and Unsubscribe both modify the collection of subscribers in Observable1<T> and which OnNext is iterating over. The .NET Framework collections do not support this.

Supporting Re-entrant Operations

To allow Subscribe and Unsubscribe to work is really very simple: just take a copy of the set of subscribers before iterating over it. Replacing the implementation of Next from Observable1<T>:

public void Next(T value) {
    foreach (var sub in subscribers.Values) {
        sub.OnNext(value);
    }
}

with the following in Observable2<T> (full implementation is given below):

public void Next(T value) {
    var subs = subscribers.Values.ToArray();
    foreach (var sub in subs) {
        sub.OnNext(value);
    }
}

And now code like the following doesn’t fail:

bool hasUnsubscribed = false;
IDisposable unsub = null;
var sub = Observer.Create<int>(i => {
    Console.WriteLine("This is the subscriber, arg: {0}", i);
    if (unsub != null) {
        unsub.Dispose();
        unsub = null;
        hasUnsubscribed = true;
    }
});

var source = new Observable2<int>();
unsub = source.Subscribe(sub);
source.Next(1);
Assert.IsTrue(hasUnsubscribed);
Assert.IsNull(unsub);

Note this test code wouldn’t pass with an arbitrary observable, where the requirement to allow asynchronous subscribe could lead to unsub not yet being set by the time the subscriber code is called (the null check will at least avoid the crash).

While this seems an odd thing to do, consider a subscriber which just wants to react to one event (or the implementation of the RX operator First).

Progress Against the Overall Requirements

See the full list of requirements.

  • #1: Manage subscribers: done. Can subscribe zero or more observers, and freely add or remove them.

  • #2: OnNext, OnComplete and OnError: OnNext is done (will be called zero or more times). But neither completion or errors are supported, this also means that the source cannot indicate the end of events, every even sequence is currently infinite.

  • #3 Exceptions from subscribers will fall back to the event producer: this will happen, but will also mean no other subscribers will see the event.

    Based on this forum thread, about Subject<T>, the right approach here in Rx is not completely agreed.

  • #4: Done: can freely add and remove subscribers.

  • #5: Re-entrancy: this, part 2, makes a start. But a subscriber calling Next is likely to lead to some unexpected sequencing of calls.

The Observable2<T> Implementation

using System;
using System.Collections.Generic;
using System.Linq;

public class Observable2<T> : IObservable<T> {
    private Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;

    public IDisposable Subscribe(IObserver<T> observer) {
        subscribers.Add(nextSubscriber, observer);
        return new Observable1Disposer(this, nextSubscriber++);
    }

    private void Unsubscribe(int index) {
        subscribers.Remove(index);
    }

    public void Next(T value) {
        // To allow subscribers to unsubscribe while calling them, don't
        // directly iterate over the collection, but use a copy.
        var subs = subscribers.Values.ToArray();
        foreach (var sub in subs) {
            sub.OnNext(value);
        }
    }


    private class Observable1Disposer : IDisposable {
        private Observable2<T> target;
        private int index;
        internal Observable1Disposer(Observable2<T> target, int index) {
            this.target = target;
            this.index = index;
        }

        public void Dispose() {
            target.Unsubscribe(index);
        }
    }
}
This entry was posted Sunday, January 24th, 2010 17:57. In .NET Futures, Rx.
You can follow any responses to this entry through the RSS 2.0 feed.

Comments are closed.