Archive for February, 2010

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 5)

Posted in .NET Futures, Rx on February 24th, 2010 by Richard

Adding Completed and Error Methods

Introduction

So far in this series (links below) the focus has been on implementing Next and Subscribe in a way that is re-entrant. But that leaves two methods uncovered. Both Completed and Error end the sequence, but in different ways. Error passes an exception indicating the source has failed, whereas Completed just indicates it has ended.

Based on the behaviour of Subject<T>, when a sequence of events is ended:

  • Existing subscribers receive a call to their OnError or OnCompleted (as applicable).

  • Further calls to any of Subject<T>'s IObserver<T> methods are no-ops.

  • Any new subscribers immediately get a call to their OnError or OnCompleted (as applicable) and nothing more.

Previously

  • Part 1: The basic implementation of Subscribe, unsubscribe and Next.

  • Part 2: Making Subscribe and unsubscribe re-entrant.

  • Part 3: Why re-entrancy is important.

  • Part 4: Making Next re-entrant.

Notification<T> As A Way to Store An Observable Sequence

The Reactive Framework (Rx) distributable comes with three assemblies:

  • System.CoreEx

  • System.Reactive

  • System.Interactive

System.CoreEx contains types that would have been in System or System.Core had .NET been designed with Rx in it from the start. System.Interactive extends LINQ to Objects by back-porting many of IObservable<T>'s extension methods (or “operators”) to IEnumerable<T>. System.Reactive implements LINQ-like operators for IObservable<T>.

My observable implementation almost doesn’t need any of these three assemblies (IObservable<T> and IObserver<T> being defined in the mscorlib assembly).

However, one rather useful group of types defined in the System.CoreEx assembly (in namespace System.Collections.Generic) is Notification<T> and its subtypes. These types allow the “value” of an observable event to be stored. There are three subtypes, one each for OnNext, OnCompleted and OnError, defined as members of Notification<T> (hence created as, for example, new Notification<int>.OnNext(1)). A collection of events covering all three IObserver<T> methods can therefore be created easily, without custom helper types.

These types are provided for the Materialize and Dematerialize enumerable and observable operators (the former being among the back-ported operators in System.Interactive).
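To make the idea of storing events concrete without depending on the Rx assemblies, here is a minimal sketch. Note<T>, NoteKind, RecordingObserver<T> and NoteDemo are hypothetical stand-ins invented for this illustration; they are not the Rx Notification<T> types, only an approximation of the same idea: one object per observer call, which can later be replayed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Notification<T>: one object per observer call.
public enum NoteKind { Next, Completed, Error }

public sealed class Note<T> {
    public readonly NoteKind Kind;
    public readonly T Value;            // Meaningful only when Kind == Next.
    public readonly Exception Failure;  // Meaningful only when Kind == Error.

    private Note(NoteKind kind, T value, Exception failure) {
        Kind = kind;
        Value = value;
        Failure = failure;
    }

    public static Note<T> Next(T value) { return new Note<T>(NoteKind.Next, value, null); }
    public static Note<T> Completed() { return new Note<T>(NoteKind.Completed, default(T), null); }
    public static Note<T> Error(Exception e) { return new Note<T>(NoteKind.Error, default(T), e); }

    // Turn the stored event back into the matching IObserver<T> call.
    public void SendTo(IObserver<T> observer) {
        switch (Kind) {
        case NoteKind.Next: observer.OnNext(Value); break;
        case NoteKind.Completed: observer.OnCompleted(); break;
        case NoteKind.Error: observer.OnError(Failure); break;
        }
    }
}

// Records each call as text so the replay can be inspected.
public sealed class RecordingObserver<T> : IObserver<T> {
    public readonly List<string> Calls = new List<string>();
    public void OnNext(T value) { Calls.Add("OnNext(" + value + ")"); }
    public void OnCompleted() { Calls.Add("OnCompleted"); }
    public void OnError(Exception error) { Calls.Add("OnError(" + error.Message + ")"); }
}

public static class NoteDemo {
    // Store a short sequence as data, then replay it into an observer.
    public static List<string> Replay() {
        var stored = new List<Note<int>> {
            Note<int>.Next(1), Note<int>.Next(2), Note<int>.Completed()
        };
        var observer = new RecordingObserver<int>();
        foreach (var note in stored) { note.SendTo(observer); }
        return observer.Calls;
    }
}
```

Calling NoteDemo.Replay() returns the three recorded calls in the order they were stored: OnNext(1), OnNext(2), OnCompleted.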

Overview of The Implementation of Completed and Error

Both follow the same pattern as the existing Next implementation:

  1. If ended, return

  2. Add the new event to a queue of pending events.

  3. If not already pushing an event, process the queue.

The first of these steps is new, and handles the requirement that either Completed or Error ends the sequence.

To make handling new subscriptions after the end of the sequence easy, the endOfEvents field is a Notification<T> reference. If null the sequence has not ended; otherwise this is the event to push immediately. A helper from System.Reactive is also used as the return value: System.Disposables.Disposable.Empty, an instance of a type which implements IDisposable as a no-op. There is no need to track an after-the-end subscriber (it will never be called again).

The New Implementation

The single biggest change to support storing notifications (rather than values of type T) is in PushEvents, where different kinds of notifications need to call different IObserver<T> methods, so a switch is needed. This is in its own method (SendNotification) because it can then be reused to send the ending event to post-end subscribers.

The implementations of Next, Completed and Error end up being very similar and simple.

Overall, compared to the code from part 4, the implementation is somewhat longer, but certainly better factored, with each method doing one thing. The most complex (PushEvents) does have to loop over both pending events and subscribers, but that is all it does.

The Code

using System;
using System.Collections.Generic;
using System.Linq;

public class Observable4<T> : IObservable<T> {
    private Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;
    private Queue<Notification<T>> pendingEvents = new Queue<Notification<T>>();
    private bool pushingEvent = false;  // If true, only queue the event.
    private Notification<T> endOfEvents;   // Not null => the sequence has ended, and this is the Completed or Error that ended it.

    public IDisposable Subscribe(IObserver<T> observer) {
        if (endOfEvents != null) {
            // Already have end of sequence, so just tell this new subscriber
            // and return no-op.
            SendNotification(observer, endOfEvents);
            return System.Disposables.Disposable.Empty;
        } else {
            subscribers.Add(nextSubscriber, observer);
            return new Observable4Disposer(this, nextSubscriber++);
        }
    }

    private void Unsubscribe(int index) {
        subscribers.Remove(index);
    }

    public void Next(T value) {
        if (endOfEvents != null) { return; }
        pendingEvents.Enqueue(new Notification<T>.OnNext(value));
        PushEvents();
    }

    public void Completed() {
        if (endOfEvents != null) { return; }
        endOfEvents = new Notification<T>.OnCompleted();
        pendingEvents.Enqueue(endOfEvents);
        PushEvents();
    }

    public void Error(Exception exn) {
        if (endOfEvents != null) { return; }
        endOfEvents = new Notification<T>.OnError(exn);
        pendingEvents.Enqueue(endOfEvents);
        PushEvents();
    }

    private void PushEvents() {
        if (!pushingEvent) {
            try {
                pushingEvent = true;
                while (pendingEvents.Count > 0) {
                    // To allow subscribers to unsubscribe while calling them, don't
                    // directly iterate over the collection, but use a copy.
                    // Take this each time, in case some event triggers a change
                    var subs = subscribers.Values.ToArray();
                    var n = pendingEvents.Dequeue();
                    foreach (var s in subs) {
                        SendNotification(s, n);
                    }
                }
            } finally {
                pushingEvent = false;
            }
        }
    }

    private static void SendNotification(IObserver<T> obs, Notification<T> n) {
        switch (n.Kind) {
        case NotificationKind.OnNext:
            obs.OnNext(n.Current);
            break;
        case NotificationKind.OnCompleted:
            obs.OnCompleted();
            break;
        case NotificationKind.OnError:
            obs.OnError(((Notification<T>.OnError)(n)).Exception);
            break;
        }
    }


    private class Observable4Disposer : IDisposable {
        private Observable4<T> target;
        private int index;
        internal Observable4Disposer(Observable4<T> target, int index) {
            this.target = target;
            this.index = index;
        }

        public void Dispose() {
            target.Unsubscribe(index);
        }
    }
}

Further Steps

The toughest part to get right is still left: concurrency. This will be as difficult to test as to implement. Tests using worker threads are never simple, and just being confident that they are doing things concurrently is hard (after all, a simple Interlocked.Increment might be slow enough to effectively eliminate any concurrency by forcing CPU cache synchronisation, even with multiple idle cores).

Reactive Framework’s Observer.Create Doesn’t Create a Pure Wrapper

Posted in .NET Futures, Rx on February 22nd, 2010 by Richard

What Happens When One’s Test Doesn’t Fail When It Should, or, Why Observer.Create<T> Isn’t Good For Testing Observables

Introduction

In building my own implementation of IObservable<T> across a series of posts (Part 1, Part 2, Part 3 and Part 4) I’ve been making heavy use of Observer.Create<T> to create helper observer instances to ensure that the right methods of the subscribed observers are called, the right number of times (and even—although somewhat harder to track in all but the simplest cases—the right order).

The Discovery

In working towards Part 5, which will cover implementing the two so far missing methods of IObserver<T> (OnCompleted and OnError), I was rather surprised when this test passed:

[TestMethod]
public void Observable4_CallingCompletedAgainIsNoOp() {
    bool hasCompleted = false;
    var sub = Observer.Create<int>(
        i => { Assert.Fail("OnNext should not have been called"); },
        exn => { Assert.Fail("OnError should not have been called"); },
        () => {
            Assert.IsFalse(hasCompleted, "OnCompleted for single subscriber called multiple times");
            hasCompleted = true;
        });

    var source = new Observable4<int>();
    using (var unsub = source.Subscribe(sub)) {
        source.Completed();
        source.Completed();
    }
    Assert.IsTrue(hasCompleted);
}

This was despite knowing that Observable4<int>.Completed had no logic to prevent IObserver<T>.OnCompleted being called multiple times. Under the debugger it was quite clear that the third lambda was not being called (briefly I considered a debugger bug, but decided to check more fully first).

I looked at the implementation of Observer.Create<T> in Reflector. That factory method creates an instance of an internal type, which derives from another internal type AbstractObserver<T>. So far largely as expected.

But in looking at the implementation of the IObserver<T> methods I saw:

public void OnCompleted() {
    if (!this.IsStopped) {
        this.IsStopped = true;
        this.Completed();
    }
}

I.e. it keeps track of a call to OnCompleted and blocks further events. All three IObserver<T> methods check for IsStopped, and it is also set in OnError.
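The effect of such a guard can be reproduced without Rx. The sketch below is an illustration only: GuardedObserver<T>, CountingObserver<T> and GuardDemo are hypothetical names, and the guard is written from the description above rather than copied from the Rx source. It shows the same misbehaving call pattern (OnCompleted twice) being visible to a plain observer but hidden by the guarded wrapper.

```csharp
using System;

// Hypothetical wrapper that mimics the IsStopped guard described above.
public sealed class GuardedObserver<T> : IObserver<T> {
    private readonly IObserver<T> inner;
    private bool isStopped;

    public GuardedObserver(IObserver<T> inner) { this.inner = inner; }

    public void OnNext(T value) {
        if (!isStopped) { inner.OnNext(value); }
    }

    public void OnError(Exception error) {
        if (!isStopped) { isStopped = true; inner.OnError(error); }
    }

    public void OnCompleted() {
        if (!isStopped) { isStopped = true; inner.OnCompleted(); }
    }
}

// Plain observer that simply counts how often OnCompleted arrives.
public sealed class CountingObserver<T> : IObserver<T> {
    public int Completions;
    public void OnNext(T value) { }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completions++; }
}

public static class GuardDemo {
    // Simulates a source that wrongly completes twice.
    // Returns { completions seen directly, completions seen through the guard }.
    public static int[] CompleteTwice() {
        var direct = new CountingObserver<int>();
        direct.OnCompleted();
        direct.OnCompleted();

        var behindGuard = new CountingObserver<int>();
        var guarded = new GuardedObserver<int>(behindGuard);
        guarded.OnCompleted();
        guarded.OnCompleted();

        return new[] { direct.Completions, behindGuard.Completions };
    }
}
```

GuardDemo.CompleteTwice() returns { 2, 1 }: the direct observer sees the duplicate completion, while the observer behind the guard sees only one, so a test written against the guarded observer cannot detect the fault.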

The Implication

If I want an observer that I can use to test an observable for compliance with the IObserver<T> semantic contract, I cannot use Observer.Create<T>. It will hide any breach of the rule that “OnCompleted or OnError end the observable’s event sequence”.

The Solution

In the end the fix is rather easy: make my own version of Observer.Create<T> that doesn’t contain any such logic:

public static class Obs {
    public static AnonObserver<T> Create<T>(Action<T> onNext) {
        return new AnonObserver<T>(onNext);
    }
    public static AnonObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onComplete) {
        return new AnonObserver<T>(onNext, onError, onComplete);
    }
}

public class AnonObserver<T> : IObserver<T> {
    private Action<T> onNext;
    private Action onComplete;
    private Action<Exception> onError;

    public AnonObserver(Action<T> onNext) {
        if (onNext == null) { throw new ArgumentNullException("onNext"); }
        this.onNext = onNext;
    }
    public AnonObserver(Action<T> onNext, Action<Exception> onError, Action onComplete) {
        if (onNext == null) { throw new ArgumentNullException("onNext"); }
        if (onError == null) { throw new ArgumentNullException("onError"); }
        if (onComplete == null) { throw new ArgumentNullException("onComplete"); }
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
    }

    public void OnCompleted() {
        if (onComplete != null) {
            onComplete();
        }
    }

    public void OnError(Exception error) {
        if (onError != null) {
            onError(error);
        }
    }

    public void OnNext(T value) {
        onNext(value);
    }
}

I now just need to replace all the uses of Observer.Create<T> with Obs.Create<T> (and try to think of a better name).

Corrected URL Spelling

Posted in Meta on February 17th, 2010 by Richard

Oops…

Just noticed a few of the URLs for the Reusable “ObservableSource” for the Reactive Extensions series of posts were misspelt. Now corrected.

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 4)

Posted in .NET Futures, Rx on February 17th, 2010 by Richard

Supporting Re-entrant Event Publication: Implementation

Introduction

In Part 3 I looked at the effect of calling Subject<T>.OnNext from within a subscriber to that Subject<T>. I really do not think that the out-of-order events this creates are going to cause anything other than confusion.

In my implementation, continuing on from Part 1 and Part 2, I want to avoid the implicit re-ordering that happens when Next(value) is called from within a subscriber.

The Re-Entrant Implementation

In the basic implementation a snapshot of the subscribers is taken (to avoid problems with the collection being modified by re-entrant subscribe and unsubscribe operations), then iterated over:

var subs = subscribers.Values.ToArray();
foreach (var sub in subs) {
   sub.OnNext(value);
}

In the case of a re-entrant Next call this loop will simply exist on the stack twice:

Observable2.Next()
ATestObserver.OnNext()
Observable2.Next()

The second call iterates through the whole list of subscribers before the first call moves on to its next subscriber.

The solution here is to recognise that an iteration is taking place already when Next is called and, rather than just looping to call all subscribers, buffer up the new value. The original call keeps looping until all buffered values are sent out. (Once, later in this series, concurrency is introduced locking is going to be needed here.)

First, two new fields are needed: a Queue to act as a buffer, and a flag to indicate that events are currently being pushed to the subscribers:

private Queue<T> pendingEvents = new Queue<T>();
private bool pushingEvent = false;

Then reworking the Next implementation:

pendingEvents.Enqueue(value);
if (!pushingEvent) {
    try {
        pushingEvent = true;
        while (pendingEvents.Count > 0) {
            var subs = subscribers.Values.ToArray();
            var v = pendingEvents.Dequeue();
            foreach (var s in subs) {
                s.OnNext(v);
            }
        }
    } finally {
        pushingEvent = false;
    }
}

A few points worth noting:

  • By always putting the new event value in the queue there are no special cases needed for the first event.

  • The try…finally ensures that pushingEvent gets reset, but this could leave un-pushed events. This is a consequence of the Rx exception model (propagate back to the caller), but with the added twist that a re-entrant call to Next will not get the exception; the original caller of Next will.

  • I have chosen to re-snapshot the subscribers for each event. This will avoid a subscriber that unsubscribes in response to one event getting more, even if already queued up.
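The effect of the queue on ordering can be checked with a stripped-down sketch that does not need Rx. QueueingSource and QueueingDemo are hypothetical names used only here, and subscribers are plain Action<int> delegates rather than IObserver<T> instances; the Next logic follows the reworked implementation above, and the scenario is the one from Part 3 (the second subscriber publishes -1 × i from inside its handler).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified source: subscribers are delegates and there is
// no unsubscribe, so only the queueing behaviour of Next is on show.
public sealed class QueueingSource {
    private readonly List<Action<int>> subscribers = new List<Action<int>>();
    private readonly Queue<int> pendingEvents = new Queue<int>();
    private bool pushingEvent = false;

    public void Subscribe(Action<int> onNext) { subscribers.Add(onNext); }

    public void Next(int value) {
        pendingEvents.Enqueue(value);
        if (pushingEvent) { return; }   // Re-entrant call: the outer loop will deliver it.
        try {
            pushingEvent = true;
            while (pendingEvents.Count > 0) {
                var subs = subscribers.ToArray();   // Fresh snapshot for each event.
                var v = pendingEvents.Dequeue();
                foreach (var s in subs) { s(v); }
            }
        } finally {
            pushingEvent = false;
        }
    }
}

public static class QueueingDemo {
    // Returns the order in which the three subscribers saw the events.
    public static List<string> Run() {
        var log = new List<string>();
        var source = new QueueingSource();
        source.Subscribe(i => log.Add("sub0:" + i));
        source.Subscribe(i => {
            log.Add("sub1:" + i);
            if (i > 0 && i < 3) { source.Next(-1 * i); }   // Re-entrant publication.
        });
        source.Subscribe(i => log.Add("sub2:" + i));
        source.Next(1);
        return log;
    }
}
```

QueueingDemo.Run() yields sub0:1, sub1:1, sub2:1, sub0:-1, sub1:-1, sub2:-1, so every subscriber sees event 1 before event -1.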

Next Steps

This is still missing Exception and Complete methods (to call IObserver<T>.OnError and IObserver<T>.OnCompleted).

…and concurrency support of course.

Parallel Framework Extensions: Three Minor Things I Didn’t Know This Morning

Posted in Parallel Extensions on February 8th, 2010 by Richard

1. You Can Add a Continuation To A Completed Task

And the continuation will be run. Given the code:

var t1 = Task.Factory.StartNew(() => { /* Noop */ });
Thread.Sleep(100); // Ensure t1 has run
Trace.Assert(t1.Status == TaskStatus.RanToCompletion);
// Now t1 is completed... we can still add a continuation:
bool continued = false;
var t2 = t1.ContinueWith(t => { continued = true; });
t2.Wait();
Trace.Assert(continued);

Neither assert will fire. Thus the continuation has executed despite being added to Task t1 after t1 had run. This does of course prevent a race between TaskFactory.StartNew returning a task that is already scheduled (and therefore possibly already completed) and a call to Task.ContinueWith.

2. You Can Continue A Task Multiple Times

And the continuations run concurrently. I’m not sure that this would be useful very often (only if multiple tasks need to be started following a single initial task), but nice to know there is not an arbitrary zero or one multiplicity on continuations. Sample code:

var t1 = new Task(() => { Console.WriteLine("  This is the original task"); });
var t2 = t1.ContinueWith(t => {
    Console.WriteLine("   This is the first continuation (before sleep: thread #{0})", Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(250);
    Console.WriteLine("   This is the first continuation (after sleep: thread #{0})", Thread.CurrentThread.ManagedThreadId);
});
var t3 = t1.ContinueWith(t => {
    Console.WriteLine("   This is the second continuation (before sleep: thread #{0})", Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(250);
    Console.WriteLine("   This is the second continuation (after sleep: thread #{0})", Thread.CurrentThread.ManagedThreadId);
});
t1.Start();
Task.WaitAll(new[] { t1, t2, t3 });

Depending on the system this can give different results, but one should see the two “before sleep” messages before either of the two “after sleep” messages: both tasks are executing concurrently.

3. TaskFactory.FromAsync Starts the Task

The documentation just doesn’t say so. I discovered this when an InvalidOperationException was thrown for starting an already complete task (only two statements later, the intermediate one adding a continuation). To be fair to the designers, there is really no reason to want to delay the execution of the task, as the creation is a single statement (and if you really needed to, you could just use a helper (anonymous) function).
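A small sketch of the behaviour, under the assumption that wrapping MemoryStream's BeginRead/EndRead pair is representative of any Begin/End pair (FromAsyncDemo and StartThrows are names made up for this illustration): the task returned by FromAsync is already under way, so calling Start on it is rejected.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class FromAsyncDemo {
    // Returns true if Start() on a task created by FromAsync throws
    // InvalidOperationException, i.e. the task did not need starting.
    public static bool StartThrows() {
        var stream = new MemoryStream(new byte[] { 1, 2, 3 });
        var buffer = new byte[3];

        // FromAsync calls BeginRead itself; no separate Start is needed.
        Task<int> read = Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);

        try {
            read.Start();   // Expected to be rejected.
            return false;
        } catch (InvalidOperationException) {
            return true;
        } finally {
            read.Wait();    // Let the read finish before the stream goes away.
        }
    }
}
```

If the operation really must be deferred, wrapping the FromAsync call in a lambda and invoking that lambda later achieves the same effect as a delayed start.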

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 3)

Posted in .NET Futures, Rx on February 4th, 2010 by Richard

Supporting Re-entrant Event Publication: Why It is Significant

Introduction

This post continues from Part 1 and Part 2, where the first steps to a reusable IObservable<T> were taken.

But this is something of an aside, because in starting to look at the reasons to explicitly support re-entrant publication, I found that the Rx type Subject<T> (in assembly "System.Reactive", namespace System.Collections.Generic) has exactly the same behaviour as Observable2<T> from Part 2.

Personally I feel that this behaviour will lead to inconsistent and unexpected ordering of events as seen by subscribers, depending on the order in which the observers subscribed and the details of the internals of the observable (what kind of collection is used to store the list of subscribers). This opens up the potential of races to subscribe with different timing leading to seeing out of order events.

One final note here: when I started drafting my implementation of IObservable<T> I was not aware of Subject<T>. When I became aware of this type, I almost decided to give up on my own implementation. With this discovery I am now rather glad I didn't.

The Test Code

var obs = new Subject<int>();
var subCalls = 0;
var sub1Calls = 0;

var sub0 = Observer.Create<int>(i => {
    ++subCalls;
    Console.WriteLine("Sub 0 ({2}): sub1 called {0} times (total subscriber calls {1})", sub1Calls, subCalls, i);
});
var sub1 = Observer.Create<int>(i => {
    ++subCalls;
    ++sub1Calls;
    Console.WriteLine("Sub 1 ({2}): sub1 called {0} times (total subscriber calls: {1})", sub1Calls, subCalls, i);
    if (i > 0 && i < 3) {
        obs.OnNext(-1 * i);
    }
});
var sub2 = Observer.Create<int>(i => {
    ++subCalls;
    Console.WriteLine("Sub 2 ({2}): sub1 called {0} times (total subscriber calls: {1})", sub1Calls, subCalls, i);
});

using (var d0 = obs.Subscribe(sub0))
using (var d1 = obs.Subscribe(sub1))
using (var d2 = obs.Subscribe(sub2)) {
    obs.OnNext(1);
}

Console.WriteLine("END: sub1 called {0} times, all subscribers {1} times", sub1Calls, subCalls);

In this the three subscribers (imaginatively named sub0, sub1 and sub2) all increment a count of subscriber calls, and all print the same details to the console: which subscriber, the event value, the total number of subscriber calls and the number of calls to the second subscriber (sub1).

Additionally the second subscriber will raise an event itself, recursively publishing an event if the current value is 1≤i≤2 with value -1×i. Thus there is no more than a single level of recursion.

The Results

Sub 0 (1): sub1 called 0 times (total subscriber calls 1)
Sub 1 (1): sub1 called 1 times (total subscriber calls: 2)
Sub 0 (-1): sub1 called 1 times (total subscriber calls 3)
Sub 1 (-1): sub1 called 2 times (total subscriber calls: 4)
Sub 2 (-1): sub1 called 2 times (total subscriber calls: 5)
Sub 2 (1): sub1 called 2 times (total subscriber calls: 6)
END: sub1 called 2 times, all subscribers 6 times

As expected there are six calls to subscribers, with each subscriber seeing two events. But there is an oddity in the order in which the subscribers see events.

Not all subscribers receive the first event raised first: sub2 receives event -1 before event 1, i.e. in reverse order. But sub0 sees them in order. The only difference between the two subscribers is the order in which they were subscribed relative to sub1.

Hence the comment above about a race-condition, and dependence on the implementation of Subject<T>. If a different container were used the order in which subscribers are called could be changed without any client code changing.

Clearly the ordering between subscribers is subject to the scheduler in use, but a single subscriber seeing events in a different order certainly could be a problem. This is exacerbated by the dependence on when the subscription is performed. A concurrent system could have the relative order of subscription subject to external factors (e.g. how many cores, how slow an assembly is to load, …).
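The same ordering can be reproduced without Subject<T>. The sketch below is an assumption-laden simplification: NaiveSource and NaiveDemo are hypothetical names, subscribers are plain Action<int> delegates held in a List, and Next simply loops over a snapshot of them. It is not the Rx implementation, only the simplest source that behaves the same way in this scenario.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified source: Next calls each subscriber directly,
// so a re-entrant Next runs to completion inside the outer loop.
public sealed class NaiveSource {
    private readonly List<Action<int>> subscribers = new List<Action<int>>();

    public void Subscribe(Action<int> onNext) { subscribers.Add(onNext); }

    public void Next(int value) {
        foreach (var s in subscribers.ToArray()) { s(value); }
    }
}

public static class NaiveDemo {
    // Returns the order in which the three subscribers saw the events.
    public static List<string> Run() {
        var log = new List<string>();
        var source = new NaiveSource();
        source.Subscribe(i => log.Add("sub0:" + i));
        source.Subscribe(i => {
            log.Add("sub1:" + i);
            if (i > 0 && i < 3) { source.Next(-1 * i); }   // Re-entrant publication.
        });
        source.Subscribe(i => log.Add("sub2:" + i));
        source.Next(1);
        return log;
    }
}
```

NaiveDemo.Run() yields sub0:1, sub1:1, sub0:-1, sub1:-1, sub2:-1, sub2:1, matching the results above: sub2 sees -1 before 1 purely because it subscribed after sub1.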

Next

This post is now long enough, so my solution to this will wait for part 4.