Creating a Reusable “ObservableSource” for the Reactive Extensions (part 3)

Supporting Re-entrant Event Publication: Why It is Significant

Introduction

Continuing from Part 1 and Part 2 where the first steps to a reusable IObservable<T> were taken.

But this is something of an aside. Because in starting to look at the reasons to explicitly support this, I found that the RX type Subject<T> (in assembly "System.Reactive", namespace System.Collections.Generic) has exactly the same behaviour as Observable2<T> from Part 2.

Personally I feel that this behaviour will lead to inconsistent and unexpected ordering of events as seen by subscribers, depending on the order in which the observers subscribed and the details of the internals of the observable (what kind of collection is used to store the list of subscribers). This opens up the potential of races to subscribe with different timing leading to seeing out of order events.

One final note here, where I started drafting my implementation of IObservable<T> I was not aware of Subject<T>. When I became aware of this type, I almost decided to give up on my own implementation. With this discovery I am not rather glad I didn't.

The Test Code

var obs = new Subject<int>();
var subCalls = 0;
var sub1Calls = 0;

var sub0 = Observer.Create<int>(i => {
    ++subCalls;
    Console.WriteLine("Sub 0 ({2}): sub1 called {0} times (total subscriber calls {1})", sub1Calls, subCalls, i);
});
var sub1 = Observer.Create<int>(i => {
    ++subCalls;
    ++sub1Calls;
    Console.WriteLine("Sub 1 ({2}): sub1 called {0} times (total subscriber calls: {1})", sub1Calls, subCalls, i);
    if (i > 0 && i < 3) {
	obs.OnNext(-1 * i);
    }
});
var sub2 = Observer.Create<int>(i => {
    ++subCalls;
    Console.WriteLine("Sub 2 ({2}): sub1 called {0} times (total subscriber calls: {1})", sub1Calls, subCalls, i);
});

using (var d0 = obs.Subscribe(sub0))
using (var d1 = obs.Subscribe(sub1))
using (var d2 = obs.Subscribe(sub2)) {
    obs.OnNext(1);
}

Console.WriteLine("END: sub1 called {0} times, all subscribers {1} times", sub1Calls, subCalls);

In this the three subscribers (imaginatively named sub0, sub1 and sub2) all increment a count of subscriber calls, and all print the same details to the console: which subscriber, the event value, the total number of subscriber calls and the number of calls to the second subscriber (sub1).

Additionally the second subscriber will raise an event itself, recursively publishing an event if the current value is 1≤i≤2 with value -1×i. Thus there is no more than a single level of recursion.

The Results

Sub 0 (1): sub1 called 0 times (total subscriber calls 1)
Sub 1 (1): sub1 called 1 times (total subscriber calls: 2)
Sub 0 (-1): sub1 called 1 times (total subscriber calls 3)
Sub 1 (-1): sub1 called 2 times (total subscriber calls: 4)
Sub 2 (-1): sub1 called 2 times (total subscriber calls: 5)
Sub 2 (1): sub1 called 2 times (total subscriber calls: 6)
END: sub1 called 2 times, all subscribers 6 times

As expected there are six calls to subscribers, with each subscriber seeing two events. But there is an oddity in the order in which the subscribers see events.

Not all subscribers receive the first event raised first, sub2 receives event -1 before event 1: in reverse order. But sub0 sees them in order. The only difference between the two subscribers is the order in which they were subscribed relative to sub1.

Hence the comment above about a race-condition, and dependence on the implementation of Subject<T>. If a different container were used the order in which subscribers are called could be changed without any client code changing.

Clearly the ordering between subscribers is subject to the scheduler in use, but a single subscriber seeing events in a different order certainly could be a problem. This is exacerbated by the dependence on when the subscription is performed. A concurrent system could have the relative order of subscription subject to external factors (e.g. how many cores, how slow an assembly is to load, …).

Next

This post is now long enough, so my solution to this will wait for part 4.

This entry was posted Thursday, February 4th, 2010 17:59. In .NET Futures, Rx.
You can follow any responses to this entry through the RSS 2.0 feed.

Comments are closed.