.NET Futures

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 6.1)

Posted in .NET Futures, Rx on March 18th, 2010 by Richard

Preparing For Implementing Concurrency

Introduction

The next step in this series on creating my own IObservable<T> implementation is concurrency. (See below for links to previous parts, Part 5 has a full code listing.)

This is the hard part. It is too easy to think the code is OK when it isn’t, and too hard to validate that it really is thread safe.

The only option is careful analysis, and then double (triple, …) check everything.

After all even the best set of unit tests will fail to find problems if it takes a particular combination of OS version, hardware and other processes running to show the problem. With race conditions, deadlocks and all the other concurrency fun such is not just a possibility but likely.

But first, the new 1.0.2350.0 (2010-03-15) build of Rx requires no change to my code: all tests pass as they are. And this is now running on my main system (Rx has a go-live licence, so why not) with its multiple cores (previously I was working on a VM with a single virtual CPU).

What Needs Protection?

At the heart of my implementation there is some shared data. This needs protecting wherever multiple threads could operate on it concurrently. For example, events are pushed to subscribers using a copy of the list of subscribers (this allows re-entrant subscription and un-subscription), but taking that copy from the values of a Dictionary<K,V> is certainly not safe while another thread may be modifying it.

Some operations are inherently safe (defined by the platform’s memory model, see Joe Duffy’s blog for more details). In this case reading or writing an aligned, volatile int or reference is safe (such reads and writes are inherently atomic). In the following cases only a single field needs to be checked, written to, or compared and exchanged in one step (via Interlocked.CompareExchange):

  • Checking if the endOfEvents field (reference to a Notification<T>) is null.

  • The boolean field pushingEvents used only in PushEvents (the method that loops through a snapshot of the current subscribers) to avoid re-entrant or multi-threaded pushing of events.

    When it is read in a simple check and is true there is no problem (just return from PushEvents). But if it is false it then needs to be set, so two operations are needed: a read and a write. Interlocked.CompareExchange will take care of this without a lock. (Before returning from pushing events it is set back to false, but a single write is also OK.)

    As there is no overload of Interlocked.CompareExchange for bool the field will need to be an int with values 0 and 1.

  • To avoid races in Next, Completed and Error between checking for endOfEvents and setting it one can use Interlocked.CompareExchange.
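The lock-free cases above can be sketched roughly as follows. This is only an illustration of the technique, not the code this series will end up with: PushGate and EndMarker are names made up for this sketch, and the int values 0 and 1 stand in for false and true as described above.

```csharp
using System;
using System.Threading;

// Hypothetical helper: an int used as a boolean gate (0 = idle, 1 = pushing),
// because Interlocked.CompareExchange has no overload for bool.
public sealed class PushGate {
    private int pushing;

    // Atomically: if pushing is 0, set it to 1 and return true.
    // Otherwise leave it unchanged and return false.
    public bool TryEnter() {
        return Interlocked.CompareExchange(ref pushing, 1, 0) == 0;
    }

    // Releasing the gate is a single write.
    public void Exit() {
        Interlocked.Exchange(ref pushing, 0);
    }
}

// Hypothetical helper: a reference that can be set once, from null to a value.
// This is the shape of the endOfEvents check-and-set in Next, Completed and Error.
public sealed class EndMarker<TEnd> where TEnd : class {
    private TEnd end;

    // Returns true only for the one caller that moved the field away from null.
    public bool TrySet(TEnd value) {
        return Interlocked.CompareExchange(ref end, value, null) == null;
    }

    // Reads the current value without changing it.
    public TEnd Value {
        get { return Interlocked.CompareExchange(ref end, null, null); }
    }
}
```

With something like this, PushEvents would start with "if (!gate.TryEnter()) return;" and Completed would only queue its notification when TrySet returns true.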

There are also two pieces of functionality that require multiple operations on fields (or on objects that are not thread safe). These need either a lock or a thread-safe collection.

  • The collection of subscribers and the subscriber id counter, with operations to add or remove a subscriber and to snapshot a list of all subscribers. This will require a lock; there is no way (that I can see) around this.

  • Operations on the queue of pending events. These can be handled by changing to an instance of ConcurrentQueue<T>, with modifications in PushEvents to use TryDequeue rather than a combination of checking the count and then calling Dequeue.
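A minimal sketch of those two pieces, assuming a private lock object and the ConcurrentQueue<T> from .NET 4. SubscriberTable and QueueDrain are hypothetical names used only here; the real changes would be made inside the observable class itself.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical wrapper: every access to the dictionary and the id counter
// happens under the same lock.
public sealed class SubscriberTable<T> {
    private readonly object gate = new object();
    private readonly Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber;

    public int Add(IObserver<T> observer) {
        lock (gate) {
            int id = nextSubscriber++;
            subscribers.Add(id, observer);
            return id;
        }
    }

    public bool Remove(int id) {
        lock (gate) { return subscribers.Remove(id); }
    }

    // The copy is taken inside the lock; callers iterate it outside the lock.
    public IObserver<T>[] Snapshot() {
        lock (gate) { return subscribers.Values.ToArray(); }
    }
}

public static class QueueDrain {
    // TryDequeue replaces the "check Count, then Dequeue" pair with one call,
    // so no other thread can empty the queue between the check and the removal.
    public static int Drain<T>(ConcurrentQueue<T> pending, Action<T> handle) {
        int handled = 0;
        T item;
        while (pending.TryDequeue(out item)) {
            handle(item);
            handled++;
        }
        return handled;
    }
}
```

Calling the subscribers outside the lock matters: a subscriber that unsubscribes from within OnNext would otherwise be re-entering a lock held by the thread pushing events (harmless with a re-entrant lock on the same thread, but a deadlock risk once another thread is involved).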

What Doesn’t Need Protection

This implementation is not about being fair. So if two threads both try and use an instance together, with one calling Next and the other Completed only the Windows thread scheduler gets any say in which wins (if Completed is called first, no subscriber will see the value passed to Next).

Ready to Implement?

Even as I wrote the analysis above (which was already the second pass through what would be needed) I noted that I could avoid a lock for Next, Completed, and Error and use CompareExchange. So I’ve probably still missed something: time for another pass over the code.

And another question is how to test…? Throwing a load of concurrent tasks from the Parallel Extensions (see here) at it will be a start, but that cannot really cover mixing in Completed and Error: I cannot determine which thread will win, and thus have no way to verify that the “right one” won in a unit test assert.
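As a starting point, the "throw tasks at it" idea might look something like the sketch below. It only covers the part that can be asserted deterministically: the number of deliveries. ConcurrentNextCheck is a hypothetical helper, and the lambda in CountDeliveries is a stand-in for a real source with one counting subscriber attached.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrentNextCheck {
    // Calls next(i) for every i in [0, total), spread over worker threads
    // by Parallel.For. In a real test next would be source.Next.
    public static void Hammer(Action<int> next, int total) {
        Parallel.For(0, total, i => next(i));
    }

    // Stand-in run: the "subscriber" only counts calls, using
    // Interlocked.Increment so the count itself is thread safe.
    public static int CountDeliveries(int total) {
        int delivered = 0;
        Hammer(value => Interlocked.Increment(ref delivered), total);
        return delivered;
    }
}
```

The order of values cannot be asserted here, only that none were lost or duplicated, which is exactly the limitation described above once Completed and Error join in.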

More to work on.

Previously

  • Part 1: The basic implementation of Subscribe, unsubscribe and Next.

  • Part 2: Making Subscribe and unsubscribe re-entrant.

  • Part 3: Why re-entrancy is important.

  • Part 4: Making Next re-entrant.

  • Part 5: Implementing OnCompleted and OnError to complete support for full IObserver<T> interface.

  • Part 5a: Updates for build 1.0.2317.0 of the Reactive Extensions (2010-03-05).

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 5a)

Posted in .NET Futures, Rx on March 9th, 2010 by Richard

Updates for Next Reactive Extensions (Rx) Code Drop

The new code drop came through, 2½ months after the last drop, on 5th March. The release notes are suitably long, with a fair amount of change, much of it under the covers for edge cases.

But a fair number of things have moved assemblies and changed name.

Impact on My Implementation

Given all the change, there was relatively little impact. The only thing that broke was my use of Notification<T>: the Current property is now called Value. And it now has an Exception property, which saves a cast when handling an OnError notification.

My (unpublished) test harness used GroupDisposable to clean up subscriptions; this was renamed CompositeDisposable and moved to the CoreEx assembly.

Both of these renames give slightly better and more consistent names, and the addition of the Exception property is definitely better. So overall the new drop is, for me, an improvement so far: once the code compiled all tests just passed.

The only function that changed was SendNotification so following DRY helped. The new implementation:

private static void SendNotification(IObserver<T> obs, Notification<T> n) {
    switch (n.Kind) {
    case NotificationKind.OnNext:
        Debug.Assert(n.HasValue);
        obs.OnNext(n.Value);
        break;
    case NotificationKind.OnCompleted:
        obs.OnCompleted();
        break;
    case NotificationKind.OnError:
        obs.OnError(n.Exception);
        break;
    }
}

So Where’s The Concurrency Support…?

Coming! :-)
I've been rather head down in learning WPF for something else, and trying to avoid too many distractions while getting my head around the basics of WPF. Concurrency support is perhaps the biggest challenge here and I therefore want to be able to focus on it and get it right. This will mean taking a block of focused time.

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 5)

Posted in .NET Futures, Rx on February 24th, 2010 by Richard

Adding Completed and Error Methods

Introduction

So far in this series (links below) the focus has been on implementing Next and Subscribe in a way that is re-entrant. But that leaves two methods uncovered. Both Completed and Error end the sequence, but in different ways. Error passes an exception indicating the source has failed, whereas Completed just indicates it has ended.

Based on Subject<T> when a sequence of events is ended:

  • Existing subscribers receive a call to their OnError or OnCompleted (as applicable).

  • Further calls to any of Subject<T>'s IObserver<T> methods are no-ops.

  • Any new subscribers immediately get a call to their OnError or OnCompleted (as applicable) and nothing more.

Previously

  • Part 1: The basic implementation of Subscribe, unsubscribe and Next.

  • Part 2: Making Subscribe and unsubscribe re-entrant.

  • Part 3: Why re-entrancy is important.

  • Part 4: Making Next re-entrant.

Notification<T> As A Way to Store An Observable Sequence

The reactive Framework (Rx) distributable comes with three assemblies:

  • System.CoreEx

  • System.Reactive

  • System.Interactive

System.CoreEx contains types that would have been in System or System.Core had .NET been designed with Rx in it from the start. System.Interactive extends LINQ to Objects by back-porting many of IObservable<T>'s extension methods (or “operators”) to IEnumerable<T>. System.Reactive implements LINQ-like operators for IObservable<T>.

My observable implementation almost doesn’t need any of these three assemblies (IObservable<T> and IObserver<T> being defined in the mscorlib assembly).

However one rather useful group of types defined in the System.CoreEx assembly, in namespace System.Collections.Generic, is Notification<T> and its subtypes. These types allow the “value” of an observable event to be stored, using one of the three subtypes (one each for OnNext, OnCompleted and OnError) defined as members of Notification<T> (hence created as, for example, new Notification<int>.OnNext(1)). Therefore a collection of events covering all three IObserver<T> methods can easily be created without custom helper types.

These types are provided for the Materialize and Dematerialize enumerable and observable operators (the former being one of the back-ported operators in System.Interactive).

Overview of The Implementation of Completed and Error

Both follow the same pattern as the existing Next implementation:

  1. If ended, return

  2. Add the new event to a queue of pending events.

  3. If not already pushing an event, process the queue.

The first of these steps is new, and handles the requirement that either Completed or Error ends the sequence.

To make handling new subscriptions after the end of the sequence easier, the endOfEvents field is a Notification<T> reference. If it is null the sequence has not ended, otherwise it is the event to push immediately to any new subscriber. Also a helper from System.Reactive is used as the return value: System.Disposables.Disposable.Empty, an instance of a type which implements IDisposable as a no-op. There is no need to track a subscriber that arrives after the end of the sequence (it will never be called again).

The New Implementation

The single biggest change to support storing notifications (rather than values of type T) is in PushEvents, where different kinds of notifications need to call different IObserver<T> methods, so a switch is needed. This is in its own method (SendNotification) because it can then be reused to send the ending event to post-end subscribers.

The implementations of Next, Completed and Error end up being very similar and simple.

Overall, compared to the code from part 4, the implementation is somewhat longer, but certainly better factored, with each method doing one thing. The most complex (PushEvents) does have to loop over both pending events and subscribers, but that is all it does.

The Code

using System;
using System.Collections.Generic;
using System.Linq;

public class Observable4<T> : IObservable<T> {
    private Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;
    private Queue<Notification<T>> pendingEvents = new Queue<Notification<T>>();
    private bool pushingEvent = false;  // If true, only queue the event.
    private Notification<T> endOfEvents;   // Not null => the sequence has ended, and this is the Completed or Error that ended it.

    public IDisposable Subscribe(IObserver<T> observer) {
        if (endOfEvents != null) {
            // Already have end of sequence, so just tell this new subscriber
            // and return no-op.
            SendNotification(observer, endOfEvents);
            return System.Disposables.Disposable.Empty;
        } else {
            subscribers.Add(nextSubscriber, observer);
            return new Observable4Disposer(this, nextSubscriber++);
        }
    }

    private void Unsubscribe(int index) {
        subscribers.Remove(index);
    }

    public void Next(T value) {
        if (endOfEvents != null) { return; }
        pendingEvents.Enqueue(new Notification<T>.OnNext(value));
        PushEvents();
    }

    public void Completed() {
        if (endOfEvents != null) { return; }
        endOfEvents = new Notification<T>.OnCompleted();
        pendingEvents.Enqueue(endOfEvents);
        PushEvents();
    }

    public void Error(Exception exn) {
        if (endOfEvents != null) { return; }
        endOfEvents = new Notification<T>.OnError(exn);
        pendingEvents.Enqueue(endOfEvents);
        PushEvents();
    }

    private void PushEvents() {
        if (!pushingEvent) {
            try {
                pushingEvent = true;
                while (pendingEvents.Count > 0) {
                    // To allow subscribers to unsubscribe while calling them, don't
                    // directly iterate over the collection, but use a copy.
                    // Take this each time, in case some event triggers a change
                    var subs = subscribers.Values.ToArray();
                    var n = pendingEvents.Dequeue();
                    foreach (var s in subs) {
                        SendNotification(s, n);
                    }
                }
            } finally {
                pushingEvent = false;
            }
        }
    }

    private static void SendNotification(IObserver<T> obs, Notification<T> n) {
        switch (n.Kind) {
        case NotificationKind.OnNext:
            obs.OnNext(n.Current);
            break;
        case NotificationKind.OnCompleted:
            obs.OnCompleted();
            break;
        case NotificationKind.OnError:
            obs.OnError(((Notification<T>.OnError)(n)).Exception);
            break;
        }
    }


    private class Observable4Disposer : IDisposable {
        private Observable4<T> target;
        private int index;
        internal Observable4Disposer(Observable4<T> target, int index) {
            this.target = target;
            this.index = index;
        }

        public void Dispose() {
            target.Unsubscribe(index);
        }
    }
}

Further Steps

The toughest part to get right is still left: concurrency. This will be difficult as much to test as to implement, tests using workers are never simple and just being confident that they are doing things concurrently is hard (after all a simple Interlocked.Increment might be slow enough to effectively eliminate any concurrency due to forcing CPU cache synchronisation even with multiple idle cores).

Reactive Framework’s Observer.Create Doesn’t Create a Pure Wrapper

Posted in .NET Futures, Rx on February 22nd, 2010 by Richard

What Happens When One’s Test Doesn’t Fail When It Should, or, Why Observer.Create<T> Isn’t Good For Testing Observables

Introduction

In building my own implementation of IObservable<T> across a series of posts (Part 1, Part 2, Part 3 and Part 4) I’ve been making heavy use of Observer.Create<T> to create helper observer instances to ensure that the right methods of the subscribed observers are called, the right number of times (and even—although somewhat harder to track in all but the simplest cases—the right order).

The Discovery

In working towards Part 5, which will cover implementing the two, so far missing, methods of IObserver<T> (OnCompleted and OnError), I was rather surprised when this test passed:

[TestMethod]
public void Observable4_CallingCompletedAgainIsNoOp() {
    bool hasCompleted = false;
    var sub = Observer.Create<int>(
        i => { Assert.Fail("OnNext should not have been called"); },
        exn => { Assert.Fail("OnError should not have been called"); },
        () => {
            Assert.IsFalse(hasCompleted, "OnCompleted for single subscriber called multiple times");
            hasCompleted = true;
        });

    var source = new Observable4<int>();
    using (var unsub = source.Subscribe(sub)) {
        source.Completed();
        source.Completed();
    }
    Assert.IsTrue(hasCompleted);
}

This was despite my knowing that Observable4<int>.Completed had no logic to prevent IObserver<T>.OnCompleted being called multiple times. Under the debugger it was quite clear that the third lambda was not being called for the second Completed (briefly I considered a debugger bug, but decided to check more fully first).

I looked at the implementation of Observer.Create<T> in Reflector. That factory method creates an instance of an internal type, which derives from another internal type AbstractObserver<T>. So far largely as expected.

But in looking at the implementation of the IObserver<T> methods I saw:

public void OnCompleted() {
    if (!this.IsStopped) {
        this.IsStopped = true;
        this.Completed();
    }
}

I.e. it keeps track of a call to OnCompleted and blocks further events. All three IObserver<T> methods check for IsStopped, and it is also set in OnError.
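To make the effect concrete, here is a small sketch of a wrapper with the same IsStopped behaviour. StoppingObserver is my own stand-in written from the description above, not the actual Rx internal type, and CountingObserver is a hypothetical helper that just counts calls.

```csharp
using System;

// Stand-in for the behaviour described above: once stopped, nothing gets through.
public sealed class StoppingObserver<T> : IObserver<T> {
    private readonly IObserver<T> inner;
    private bool isStopped;

    public StoppingObserver(IObserver<T> inner) { this.inner = inner; }

    public void OnNext(T value) {
        if (!isStopped) { inner.OnNext(value); }
    }

    public void OnError(Exception error) {
        if (!isStopped) { isStopped = true; inner.OnError(error); }
    }

    public void OnCompleted() {
        if (!isStopped) { isStopped = true; inner.OnCompleted(); }
    }
}

// Hypothetical test helper: records how many times each method was called.
public sealed class CountingObserver<T> : IObserver<T> {
    public int Nexts;
    public int Errors;
    public int Completions;

    public void OnNext(T value) { Nexts++; }
    public void OnError(Exception error) { Errors++; }
    public void OnCompleted() { Completions++; }
}
```

Calling OnCompleted twice on a CountingObserver directly records two completions. Calling it twice through a StoppingObserver wrapped around the same counter records one, which is how a misbehaving observable goes unnoticed.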

The Implication

If I want an observer that I can use to test an observable for compliance to the IObserver<T> semantic contract, I cannot use Observer.Create<T>. It will hide any breach of the rule that “OnCompleted or OnError end the observable’s event sequence”.

The Solution

In the end the fix is rather easy: make my own Observer.Create<T> which doesn’t contain any such logic:

public static class Obs {
    public static AnonObserver<T> Create<T>(Action<T> onNext) {
        return new AnonObserver<T>(onNext);
    }
    public static AnonObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onComplete) {
        return new AnonObserver<T>(onNext, onError, onComplete);
    }
}

public class AnonObserver<T> : IObserver<T> {
    private Action<T> onNext;
    private Action onComplete;
    private Action<Exception> onError;

    public AnonObserver(Action<T> onNext) {
        if (onNext == null) { throw new ArgumentNullException("onNext"); }
        this.onNext = onNext;
    }
    public AnonObserver(Action<T> onNext, Action<Exception> onError, Action onComplete) {
        if (onNext == null) { throw new ArgumentNullException("onNext"); }
        if (onError == null) { throw new ArgumentNullException("onError"); }
        if (onComplete == null) { throw new ArgumentNullException("onComplete"); }
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
    }

    public void OnCompleted() {
        if (onComplete != null) {
            onComplete();
        }
    }

    public void OnError(Exception error) {
        if (onError != null) {
            onError(error);
        }
    }

    public void OnNext(T value) {
        onNext(value);
    }
}

I now need to just replace all the use of Observer.Create<T> with Obs.Create<T> (and try and think of a better name).

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 4)

Posted in .NET Futures, Rx on February 17th, 2010 by Richard

Supporting Re-entrant Event Publication: Implementation

Introduction

In Part 3 I looked at the effect of calling Subject<T>.OnNext from within a subscriber to that Subject<T>. I really do not think that the out of order events that creates are going to create anything other than confusion.

In my implementation, continuing on from Part 1 and Part 2, I want to avoid the implicit re-ordering that happens when Next(value) is called from within a subscriber.

The Re-Entrant Implementation

In the basic implementation a snapshot of the subscribers is taken (to avoid problems with the collection being modified by re-entrant subscribe and unsubscribe operations), then iterated over:

var subs = subscribers.Values.ToArray();
foreach (var sub in subs) {
   sub.OnNext(value);
}

In the case of a re-entrant Next call this loop will simply exist on the stack twice:

Observable2.Next()
ATestObserver.OnNext()
Observable2.Next()

With the second call iterating through the list of subscribers before the first moves on to the next.

The solution here is to recognise that an iteration is taking place already when Next is called and, rather than just looping to call all subscribers, buffer up the new value. The original call keeps looping until all buffered values are sent out. (Once, later in this series, concurrency is introduced locking is going to be needed here.)

First two new fields are needed. A Queue to act as buffer, and a flag to indicate events are currently being pushed to the subscribers:

private Queue<T> pendingEvents = new Queue<T>();
private bool pushingEvent = false;

Then reworking the Next implementation:

pendingEvents.Enqueue(value);
if (!pushingEvent) {
    try {
        pushingEvent = true;
        while (pendingEvents.Count > 0) {
            var subs = subscribers.Values.ToArray();
            var v = pendingEvents.Dequeue();
            foreach (var s in subs) {
                s.OnNext(v);
            }
        }
    } finally {
        pushingEvent = false;
    }
}

A few points worth noting:

  • By always putting the new event value in the queue there are no special cases needed for the first event.

  • The try…finally ensures that pushingEvent gets reset, but this could leave un-pushed events in the queue. This is a consequence of the Rx exception model (propagate back to the caller), but with the added twist that a re-entrant caller will not get the exception: the original caller of Next will.

  • I have chosen to re-snapshot the subscribers for each event. This will avoid a subscriber that unsubscribes in response to one event getting more, even if already queued up.
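To see the effect on ordering, the following cut-down sketch runs the same queueing logic with three subscribers, the second of which calls Next re-entrantly. QueuedSource is a simplification for illustration only: it uses plain Action<int> delegates instead of IObserver<T> and has no unsubscribe.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for the queued Next logic above.
public sealed class QueuedSource {
    private readonly List<Action<int>> subscribers = new List<Action<int>>();
    private readonly Queue<int> pendingEvents = new Queue<int>();
    private bool pushingEvent;

    public void Subscribe(Action<int> onNext) { subscribers.Add(onNext); }

    public void Next(int value) {
        pendingEvents.Enqueue(value);
        if (pushingEvent) { return; }   // Re-entrant call: only queue the value.
        try {
            pushingEvent = true;
            while (pendingEvents.Count > 0) {
                var subs = subscribers.ToArray();   // Fresh snapshot per event.
                var v = pendingEvents.Dequeue();
                foreach (var s in subs) { s(v); }
            }
        } finally {
            pushingEvent = false;
        }
    }
}

public static class QueuedSourceDemo {
    // Returns the order in which subscribers saw events.
    public static List<string> Run() {
        var log = new List<string>();
        var source = new QueuedSource();
        source.Subscribe(i => log.Add("sub0:" + i));
        source.Subscribe(i => {
            log.Add("sub1:" + i);
            if (i > 0) { source.Next(-i); }   // Re-entrant publish.
        });
        source.Subscribe(i => log.Add("sub2:" + i));
        source.Next(1);
        return log;
    }
}
```

Run returns sub0:1, sub1:1, sub2:1, sub0:-1, sub1:-1, sub2:-1: every subscriber sees 1 before -1, regardless of where it sits relative to the subscriber that made the re-entrant call.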

Next Steps

This is still missing Error and Completed methods (to call IObserver<T>.OnError and IObserver<T>.OnCompleted).

…and concurrency support of course.

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 3)

Posted in .NET Futures, Rx on February 4th, 2010 by Richard

Supporting Re-entrant Event Publication: Why It is Significant

Introduction

Continuing from Part 1 and Part 2 where the first steps to a reusable IObservable<T> were taken.

But this is something of an aside. Because in starting to look at the reasons to explicitly support this, I found that the RX type Subject<T> (in assembly "System.Reactive", namespace System.Collections.Generic) has exactly the same behaviour as Observable2<T> from Part 2.

Personally I feel that this behaviour will lead to inconsistent and unexpected ordering of events as seen by subscribers, depending on the order in which the observers subscribed and the details of the internals of the observable (what kind of collection is used to store the list of subscribers). This opens up the potential of races to subscribe with different timing leading to seeing out of order events.

One final note here: when I started drafting my implementation of IObservable<T> I was not aware of Subject<T>. When I became aware of this type, I almost decided to give up on my own implementation. With this discovery I am now rather glad I didn't.

The Test Code

var obs = new Subject<int>();
var subCalls = 0;
var sub1Calls = 0;

var sub0 = Observer.Create<int>(i => {
    ++subCalls;
    Console.WriteLine("Sub 0 ({2}): sub1 called {0} times (total subscriber calls {1})", sub1Calls, subCalls, i);
});
var sub1 = Observer.Create<int>(i => {
    ++subCalls;
    ++sub1Calls;
    Console.WriteLine("Sub 1 ({2}): sub1 called {0} times (total subscriber calls: {1})", sub1Calls, subCalls, i);
    if (i > 0 && i < 3) {
        obs.OnNext(-1 * i);
    }
});
var sub2 = Observer.Create<int>(i => {
    ++subCalls;
    Console.WriteLine("Sub 2 ({2}): sub1 called {0} times (total subscriber calls: {1})", sub1Calls, subCalls, i);
});

using (var d0 = obs.Subscribe(sub0))
using (var d1 = obs.Subscribe(sub1))
using (var d2 = obs.Subscribe(sub2)) {
    obs.OnNext(1);
}

Console.WriteLine("END: sub1 called {0} times, all subscribers {1} times", sub1Calls, subCalls);

In this the three subscribers (imaginatively named sub0, sub1 and sub2) all increment a count of subscriber calls, and all print the same details to the console: which subscriber, the event value, the total number of subscriber calls and the number of calls to the second subscriber (sub1).

Additionally the second subscriber will raise an event itself, recursively publishing an event if the current value is 1≤i≤2 with value -1×i. Thus there is no more than a single level of recursion.

The Results

Sub 0 (1): sub1 called 0 times (total subscriber calls 1)
Sub 1 (1): sub1 called 1 times (total subscriber calls: 2)
Sub 0 (-1): sub1 called 1 times (total subscriber calls 3)
Sub 1 (-1): sub1 called 2 times (total subscriber calls: 4)
Sub 2 (-1): sub1 called 2 times (total subscriber calls: 5)
Sub 2 (1): sub1 called 2 times (total subscriber calls: 6)
END: sub1 called 2 times, all subscribers 6 times

As expected there are six calls to subscribers, with each subscriber seeing two events. But there is an oddity in the order in which the subscribers see events.

Not all subscribers receive the first event raised first, sub2 receives event -1 before event 1: in reverse order. But sub0 sees them in order. The only difference between the two subscribers is the order in which they were subscribed relative to sub1.

Hence the comment above about a race-condition, and dependence on the implementation of Subject<T>. If a different container were used the order in which subscribers are called could be changed without any client code changing.

Clearly the ordering between subscribers is subject to the scheduler in use, but a single subscriber seeing events in a different order certainly could be a problem. This is exacerbated by the dependence on when the subscription is performed. A concurrent system could have the relative order of subscription subject to external factors (e.g. how many cores, how slow an assembly is to load, …).

Next

This post is now long enough, so my solution to this will wait for part 4.

Creating a Reusable "ObservableSource" for the Reactive Extensions (part 2)

Posted in .NET Futures, Rx on January 24th, 2010 by Richard

Introduction

Continuing on from Observable1<T> implemented in Part 1 with Observable2<T> here.

The aim in this part is to allow re-entrant subscribe and unsubscribe operations to not fail. Using Observable1<T>, a subscriber which tries to unsubscribe itself in its OnNext implementation will cause an InvalidOperationException to be thrown from Observable1<T>.Next. This is because Subscribe and Unsubscribe both modify the collection of subscribers in Observable1<T>, which Next is iterating over. The .NET Framework collections do not support this.
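The underlying collection behaviour can be shown in isolation with a few lines. This sketch uses a plain Dictionary<int, string> rather than Observable1<T>, and it uses a re-entrant Add (the equivalent of subscribing from inside OnNext), which invalidates the enumerator on every .NET runtime I know of. CollectionModificationDemo is a name made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CollectionModificationDemo {
    // Modifies the dictionary while iterating its Values directly.
    // Returns true if the iteration failed with InvalidOperationException.
    public static bool DirectIterationThrows() {
        var subscribers = new Dictionary<int, string> { { 0, "first" } };
        try {
            foreach (var name in subscribers.Values) {
                subscribers.Add(1, "second");   // Like Subscribe from inside OnNext.
            }
        } catch (InvalidOperationException) {
            return true;
        }
        return false;
    }

    // Same modification, but iterating over a snapshot taken beforehand.
    // Returns the number of entries in the dictionary afterwards.
    public static int SnapshotIterationCount() {
        var subscribers = new Dictionary<int, string> { { 0, "first" } };
        foreach (var name in subscribers.Values.ToArray()) {
            subscribers.Add(1, "second");
        }
        return subscribers.Count;
    }
}
```

The first method fails on the step after the modification, when the enumerator notices the collection has changed. The second completes normally because the loop only ever touches the array copy.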

Supporting Re-entrant Operations

To allow Subscribe and Unsubscribe to work is really very simple: just take a copy of the set of subscribers before iterating over it. Replacing the implementation of Next from Observable1<T>:

public void Next(T value) {
    foreach (var sub in subscribers.Values) {
        sub.OnNext(value);
    }
}

with the following in Observable2<T> (full implementation is given below):

public void Next(T value) {
    var subs = subscribers.Values.ToArray();
    foreach (var sub in subs) {
        sub.OnNext(value);
    }
}

And now code like the following doesn’t fail:

bool hasUnsubscribed = false;
IDisposable unsub = null;
var sub = Observer.Create<int>(i => {
    Console.WriteLine("This is the subscriber, arg: {0}", i);
    if (unsub != null) {
        unsub.Dispose();
        unsub = null;
        hasUnsubscribed = true;
    }
});

var source = new Observable2<int>();
unsub = source.Subscribe(sub);
source.Next(1);
Assert.IsTrue(hasUnsubscribed);
Assert.IsNull(unsub);

Note this test code wouldn’t pass with an arbitrary observable, where the requirement to allow asynchronous subscribe could lead to unsub not yet being set by the time the subscriber code is called (the null check will at least avoid the crash).

While this seems an odd thing to do, consider a subscriber which just wants to react to one event (or the implementation of the RX operator First).

Progress Against the Overall Requirements

See the full list of requirements.

  • #1: Manage subscribers: done. Can subscribe zero or more observers, and freely add or remove them.

  • #2: OnNext, OnCompleted and OnError: OnNext is done (will be called zero or more times). But neither completion nor errors are supported. This also means that the source cannot indicate the end of events: every event sequence is currently infinite.

  • #3 Exceptions from subscribers will fall back to the event producer: this will happen, but will also mean no other subscribers will see the event.

    Based on this forum thread, about Subject<T>, the right approach here in Rx is not completely agreed.

  • #4: Done: can freely add and remove subscribers.

  • #5: Re-entrancy: this, part 2, makes a start. But a subscriber calling Next is likely to lead to some unexpected sequencing of calls.
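The consequence noted under #3 above (an exception from one subscriber means later subscribers never see the event) follows directly from the simple foreach loop. A minimal sketch, using Action<int> delegates in place of IObserver<T> and a hypothetical ThrowingSubscriberDemo class:

```csharp
using System;
using System.Collections.Generic;

public static class ThrowingSubscriberDemo {
    // Pushes one event to two subscribers, the first of which throws.
    // Returns how many times the second subscriber was called.
    public static int Run() {
        int secondCalls = 0;
        var subscribers = new List<Action<int>>();
        subscribers.Add(i => { throw new InvalidOperationException("subscriber failed"); });
        subscribers.Add(i => { secondCalls++; });

        try {
            // Same shape as Next: snapshot, then call each subscriber in turn.
            foreach (var sub in subscribers.ToArray()) {
                sub(1);
            }
        } catch (InvalidOperationException) {
            // The event producer (the caller of Next) sees the exception here.
        }
        return secondCalls;
    }
}
```

Run returns 0: the exception leaves the loop before the second subscriber is reached. Catching inside the loop would change that, but whether that is the right behaviour is the open question from the forum thread.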

The Observable2<T> Implementation

using System;
using System.Collections.Generic;
using System.Linq;

public class Observable2<T> : IObservable<T> {
    private Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;

    public IDisposable Subscribe(IObserver<T> observer) {
        subscribers.Add(nextSubscriber, observer);
        return new Observable1Disposer(this, nextSubscriber++);
    }

    private void Unsubscribe(int index) {
        subscribers.Remove(index);
    }

    public void Next(T value) {
        // To allow subscribers to unsubscribe while calling them, don't
        // directly iterate over the collection, but use a copy.
        var subs = subscribers.Values.ToArray();
        foreach (var sub in subs) {
            sub.OnNext(value);
        }
    }


    private class Observable1Disposer : IDisposable {
        private Observable2<T> target;
        private int index;
        internal Observable1Disposer(Observable2<T> target, int index) {
            this.target = target;
            this.index = index;
        }

        public void Dispose() {
            target.Unsubscribe(index);
        }
    }
}

Creating a Reusable "ObservableSource" for the Reactive Extensions (Part 1)

Posted in .NET Futures, Rx on January 22nd, 2010 by Richard

Introduction

As seen in the requirements for an implementation of IObservable<T> there is a lot of ground to cover. So the idea is to start with a massively simplified subset of the requirements to better understand what it will take to implement the full requirements. In other words, a series of partially functional prototypes.

As a first step, just implement:

  • Subscribe, and maintain a list of subscribed observers.
  • A disposable helper type to act as a return from Subscribe to unsubscribe.
  • Next to call the OnNext of each subscriber.

This implementation does not:

  • Handle concurrency.
  • Support re-entrancy. Any call by a subscriber back into the Observable is, at best, going to fail.
  • Make use of the rest of the IObserver<T> interface. No errors and no end to the events.
  • Call the subscribers asynchronously, in a synchronisation context of their (or the Reactive Framework's) choice. (The System.Linq.Observable.ObserveOn<T>(IObservable<T> source, SynchronizationContext sync) method of RX might be usable to work around this.)

Implementation

using System;
using System.Collections.Generic;

public class Observable1<T> : IObservable<T> {
    private Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;
    
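    public IDisposable Subscribe(IObserver<T> observer) {
        // Key each observer by a unique index so it can be removed later.
        subscribers.Add(nextSubscriber, observer);
        return new Observable1Disposer(this, nextSubscriber++);
    }

    private void Unsubscribe(int index) {
        subscribers.Remove(index);
    }

    public void Next(T value) {
        // Iterates the live collection: not safe if a subscriber
        // subscribes or unsubscribes from within OnNext.
        foreach (var sub in subscribers.Values) {
            sub.OnNext(value);
        }
    }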


    private class Observable1Disposer : IDisposable {
        private Observable1<T> target;
        private int index;
        internal Observable1Disposer(Observable1<T> target, int index) {
            this.target = target;
            this.index = index;
        }

        public void Dispose() {
            target.Unsubscribe(index);
        }
    }
}

Testing

Obviously I have some test code, but it all gets somewhat tedious very quickly as it covers all sorts of edge cases (e.g. check that subscribing and then unsubscribing before any events works—it does). So these are a couple of indicative samples.

First, what must be just about the simplest possible case: subscribe, one event and unsubscribe. Check that the subscriber was called once:

var calls = 0;
var sub = Observer.Create<int>(i => {
    Console.WriteLine("This is the subscriber, arg: {0}", i);
    ++calls;
});

var source = new Observable1<int>();
using (var disp = source.Subscribe(sub)) {
    source.Next(1);
}

if (calls != 1) {
    Console.WriteLine("Test FAILED");
}

And then this to show that arbitrary interleaving of subscribe and unsubscribe operations will also work:

var calls1 = 0;
var sub1 = Observer.Create<int>(i => {
    Console.WriteLine("This is subscriber #1, arg: {0}", i);
    ++calls1;
});
var calls2 = 0;
var sub2 = Observer.Create<int>(i => {
    Console.WriteLine("This is subscriber #2, arg: {0}", i);
    ++calls2;
});

var source = new Observable1<int>();
IDisposable disp1 = null, disp2 = null;
try {
    disp1 = source.Subscribe(sub1);
    disp2 = source.Subscribe(sub2);
    disp2.Dispose();
    source.Next(1); // sub1 gets this
    disp2 = source.Subscribe(sub2);
    source.Next(2); // sub1 and sub2 get this
    disp1.Dispose();
    source.Next(3); // sub2 gets this
    disp1 = source.Subscribe(sub1);
    source.Next(4); // Both get this
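} finally {
    // Either Subscribe call may have thrown, leaving a handle null.
    if (disp1 != null) { disp1.Dispose(); }
    if (disp2 != null) { disp2.Dispose(); }
}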

if (calls1 != 3 || calls2 != 3) {
    Console.WriteLine("Test FAILED");
}

In this second test I ended up adding comments to the Next() calls to help me work out the counts. Running it and checking the console then confirmed that every event went to at least one subscriber. Clearly any real (production) code like this would be condemned by even the most limited of code reviews, but tests need to cover the bases.

Both tests also show just how useful lambdas are when working with Rx: the ability to create test subscribers that are both inline with the test and close over locals makes state handling much easier.

How this first IObservable<T> Is Inadequate

Quick answer: in almost every way. Longer answer: while events are forwarded and subscribers are managed, the integrity of the system is only preserved if the subscribers play by a very limited execution model. In particular no subscriber can unsubscribe in direct response to an event, because the iteration over the set of subscribers will fail if Subscribe or Unsubscribe is called during that iteration.
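The failure is easy to reproduce in isolation. The sketch below is only an illustration: the ReentrancyDemo class and its method are invented names, and a Dictionary<int, string> stands in for the subscriber collection. It adds an entry while enumerating Values, which is what a subscriber calling Subscribe from inside OnNext would do to Observable1<T>. Add invalidates the enumerator, so the next step of the foreach throws InvalidOperationException. Remove behaved the same way on the .NET Framework of the time; as far as I know later .NET runtimes are documented as relaxing this for Remove and Clear, so treat the unsubscribe case as runtime dependent.

```csharp
using System;
using System.Collections.Generic;

public static class ReentrancyDemo {
    // Returns true if adding to the dictionary while enumerating its
    // Values collection throws InvalidOperationException.
    public static bool AddDuringIterationThrows() {
        var subscribers = new Dictionary<int, string> {
            { 0, "first" },
            { 1, "second" }
        };
        try {
            foreach (var sub in subscribers.Values) {
                // Stands in for a subscriber calling Subscribe from OnNext.
                subscribers.Add(subscribers.Count, "late arrival");
            }
            return false; // enumeration completed without complaint
        } catch (InvalidOperationException) {
            return true;  // the enumerator detected the modification
        }
    }
}
```

Calling ReentrancyDemo.AddDuringIterationThrows() from any test or Main method should report that the exception was thrown.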

Next Steps

Deal with the lack of re-entrancy, so a subscriber can be added or removed while events are being sent to the subscribers. Also use a test framework to make testing a little more integrated.

Requirements for IObservable<T>

Posted in .NET Futures, Rx on January 10th, 2010 by Richard

Introduction

The recently released preview of the Reactive Extensions (Rx) from Microsoft has had a lot of coverage from Channel 9. It includes support for compositional handling of observable event streams from:

  • WinForms events,
  • WPF events,
  • the asynchronous pattern (BeginX and EndX method pairs), and
  • an IEnumerable<T>.

But it does not include anything reusable from which to build your own observable object.

This is unfortunate because it turns out that creating an observable (a type implementing IObservable<T>) is harder than might be expected. It is certainly much more involved than implementing IEnumerable<T>/IEnumerator<T> without using yield (as one needed to do to create a fully custom collection in .NET 1.0 and 1.1).

To start with one needs the requirements. There does not appear to be any explicit listing (yet), but from one source or another there is the information one needs. I've collected a few below.

Requirements for an IObservable<T> Implementation

In writing an implementation of IObservable<T> one should follow the documented semantics (the syntax is, of course, enforced by the compiler and runtime), and therefore be consistent with existing implementations, especially those contained within the Reactive Framework.

The documentation for IObservable<T> is (currently, in preview release state) quite brief, but the sources listed below and a little intuition can go a long way.

  1. There will be zero or more subscribers.

    This does mean handling zero subscribers (all operations are no-ops) as well as handling many subscribers. This drives much of the implementation:

    • Need a collection of subscribers.
    • Need a mechanism by which the object returned from Subscribe(observer) identifies which subscriber is unsubscribing when Dispose() is called on it. This need to link back to a specific subscription means the disposable object cannot be trivial.
    • Every event from the observable will require iterating through the list of subscribers. This needs to be both thread safe and re-entrant. In practice this means the observable needs to take a snapshot of the subscribers and iterate through that for each event (this makes use of the assumed asynchronous unsubscribe noted below).
  2. A subscriber, which remains subscribed, will see zero or more calls to its OnNext method, maybe followed by OnError or OnCompleted.

    In pseudo-regular-expression this could be: OnNext*(OnError|OnCompleted)?.

    All the following patterns are acceptable:

    • nothing, no events at all,
    • OnNext one or more times (no OnCompleted or OnError),
    • just OnCompleted.
  3. Exceptions thrown by subscribers cause unspecified effects. The result could be anything from nothing (i.e. the observable absorbs them with a no-op catch block) to killing the whole process.

    This is more significant for subscribers: they should avoid throwing.

  4. Subscribers can be added (via the observable's Subscribe method) or removed (via the IDisposable-implementing object that Subscribe returns).

  5. There is no restriction on the subscriber being notified, or any other piece of code, calling into the observable. A subscriber (or other object called from the subscriber), or an entirely separate type on a different thread, can (directly or indirectly) call observable methods.

    It is, for instance, quite acceptable for a subscriber to unsubscribe itself while handling a notification:

          var input = new[] { 1, 2, 3, 4, 5 };
          IDisposable d = null;
          var sub = Observer.Create<int>(x => {
              Console.WriteLine("Received {0}", x);
              d.Dispose();
          });
          d = input.Subscribe(sub);

    In this code, one line is printed to the console.

    Strictly speaking the code above is incorrect. An observable could start sending events to the subscriber before Subscribe returns, so d may not yet have a value when OnNext is called. In that case the result is a NullReferenceException. This can be fixed by unsubscribing on the first event for which d has been set.
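The call pattern from requirement 2 above (any number of OnNext calls, optionally ended by a single OnError or OnCompleted, the latter being the name IObserver<T> actually uses) can be checked mechanically in tests. What follows is a minimal sketch under my own assumptions: NotificationGrammar is an invented helper, and it assumes a test subscriber has recorded the name of each call it received as a string.

```csharp
using System;
using System.Collections.Generic;

public static class NotificationGrammar {
    // Hypothetical helper: "calls" holds the recorded method names,
    // each one of "OnNext", "OnError" or "OnCompleted".
    public static bool IsValid(IEnumerable<string> calls) {
        var terminated = false;
        foreach (var call in calls) {
            if (terminated) {
                return false; // nothing may follow OnError or OnCompleted
            }
            switch (call) {
                case "OnNext":
                    break;
                case "OnError":
                case "OnCompleted":
                    terminated = true;
                    break;
                default:
                    return false; // not a member of IObserver<T>
            }
        }
        return true; // includes the empty sequence: no events at all
    }
}
```

An empty sequence, a run of OnNext calls and a lone OnCompleted all pass. Anything recorded after a terminating call fails.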

There are a couple of things the observable is not responsible for:

  • The observable does not need to ensure Subscribe returns before events are seen (as noted below the simple one-shot subscriber above).

  • The observable does not need to complete an unsubscribe immediately. Rather the subscriber must handle receiving more events even after unsubscribing (this is also true of .NET events, as there will always be an inherent race condition when removing a listener).
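Both points put the burden on the subscriber, and one possible shape for a subscriber that copes is sketched here. Everything in it is an assumption made for illustration: EagerSource is an invented observable that pushes its values during Subscribe, OneShot is an invented subscriber, and IObservable<T> and IObserver<T> are taken from the System namespace as in .NET 4 and later. It is single threaded only. The subscriber records that it is finished, ignores any later events, and disposes the subscription as soon as it actually holds the handle.

```csharp
using System;

// Hypothetical source that pushes 1..3 to the observer *during* Subscribe,
// i.e. before the caller has the IDisposable in hand.
public sealed class EagerSource : IObservable<int> {
    public IDisposable Subscribe(IObserver<int> observer) {
        var subscription = new Flag();
        for (var i = 1; i <= 3 && !subscription.Disposed; i++) {
            observer.OnNext(i);
        }
        return subscription;
    }

    private sealed class Flag : IDisposable {
        public bool Disposed { get; private set; }
        public void Dispose() { Disposed = true; }
    }
}

// One-shot subscriber: acts on the first value only, tolerates extras.
public sealed class OneShot : IObserver<int> {
    private IDisposable subscription;
    private bool done;
    public int Handled { get; private set; }

    public void Attach(IObservable<int> source) {
        subscription = source.Subscribe(this);
        // Events may have arrived before the assignment above completed.
        if (done) {
            subscription.Dispose();
        }
    }

    public void OnNext(int value) {
        if (done) {
            return; // late event after we decided to stop: ignore it
        }
        done = true;
        Handled++;
        if (subscription != null) {
            subscription.Dispose(); // the handle has already been set
        }
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With new OneShot().Attach(new EagerSource()) the source delivers all three values before Subscribe returns, yet Handled ends up as 1 and no NullReferenceException occurs. A thread-safe version would need more care than this.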

Sources of Requirements

  1. "Reactive Extensions API in depth: Contract" is directly from the team.

    It includes the following non-obvious points:

    • Subscribe(subscriber) is asynchronous: the subscriber could start receiving events before the IDisposable with which to unsubscribe is returned.
    • Dispose() of the object returned by Subscribe(subscriber) is also asynchronous: you may receive events from the observable after you have disposed the object.
    • IObservable<T>.Subscribe(subscriber) must not throw. (I think this should be "should not throw": after all, out of memory and similar cannot effectively be prevented in all cases.)
    • Exceptions from subscribers will not, in general, be caught (and fall through to the context of the observable).
  2. IObservable<T> documentation on MSDN

  3. IObserver<T> documentation on MSDN

  4. Blog posting "An RSS Dashboard in F#, part two (IObservables)" by Brian McNamara (2009-12-20).

    The code is in F#, but the requirement for re-entrancy is clear.

  5. This question on Stack Overflow also has some useful information.

And next: on to an actual implementation.