Corrected URL Spelling

Posted in Meta on February 17th, 2010 by Richard

Oops…

Just noticed a few of the URLs for the Reusable “ObservableSource” for the Reactive Extensions series of posts were misspelt. Now corrected.

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 4)

Posted in .NET Futures, Rx on February 17th, 2010 by Richard

Supporting Re-entrant Event Publication: Implementation

Introduction

In Part 3 I looked at the effect of calling Subject<T>.OnNext from within a subscriber to that Subject<T>. I really do not think that the out-of-order events this creates will cause anything other than confusion.

In my implementation, continuing on from Part 1 and Part 2, I want to avoid the implicit re-ordering that happens when Next(value) is called from within a subscriber.

The Re-Entrant Implementation

In the basic implementation a snapshot of the subscribers is taken (to avoid problems with the collection being modified by re-entrant subscribe and unsubscribe operations), then iterated over:

var subs = subscribers.Values.ToArray();
foreach (var sub in subs) {
   sub.OnNext(value);
}

In the case of a re-entrant Next call this loop will simply exist on the stack twice:

Observable2.Next()
ATestObserver.OnNext()
Observable2.Next()

The second call iterates through the whole list of subscribers before the first call moves on to its next subscriber.

The solution here is to recognise that an iteration is already taking place when Next is called and, rather than just looping to call all subscribers, buffer up the new value. The original call keeps looping until all buffered values are sent out. (Once concurrency is introduced, later in this series, locking is going to be needed here.)

First, two new fields are needed: a Queue<T> to act as a buffer, and a flag to indicate that events are currently being pushed to the subscribers:

private Queue<T> pendingEvents = new Queue<T>();
private bool pushingEvent = false;

Then reworking the Next implementation:

pendingEvents.Enqueue(value);
if (!pushingEvent) {
    try {
        pushingEvent = true;
        while (pendingEvents.Count > 0) {
            var subs = subscribers.Values.ToArray();
            var v = pendingEvents.Dequeue();
            foreach (var s in subs) {
                s.OnNext(v);
            }
        }
    } finally {
        pushingEvent = false;
    }
}

A few points worth noting:

  • By always putting the new event value in the queue, no special case is needed for the first event.

  • The try…finally ensures that pushingEvent gets reset, but an exception could leave un-pushed events in the queue. This is a consequence of the Rx exception model (propagate back to the caller), with the added twist that a re-entrant caller of Next will not get the exception; the original caller of Next will.

  • I have chosen to re-snapshot the subscribers for each event. This will avoid a subscriber that unsubscribes in response to one event getting more, even if already queued up.
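To see the pieces together, here is a minimal sketch of a complete class built around the Next implementation above. The names Observable3<T>, ActionObserver<T> and ReentrantDemo are assumptions made for this illustration (the post does not name them), ActionObserver<T> merely stands in for Observer.Create so the sketch needs no Rx assemblies, and nothing here is thread safe.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical name: the post does not say what the class holding this Next is called.
public class Observable3<T> : IObservable<T> {
    private readonly Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private readonly Queue<T> pendingEvents = new Queue<T>();
    private bool pushingEvent = false;
    private int nextSubscriber = 0;

    public IDisposable Subscribe(IObserver<T> observer) {
        var index = nextSubscriber++;
        subscribers.Add(index, observer);
        return new Disposer(this, index);
    }

    public void Next(T value) {
        pendingEvents.Enqueue(value);
        if (pushingEvent) {
            return; // Re-entrant call: the loop further up the stack delivers the value.
        }
        try {
            pushingEvent = true;
            while (pendingEvents.Count > 0) {
                // Re-snapshot per event so unsubscribed observers get nothing further.
                var subs = subscribers.Values.ToArray();
                var v = pendingEvents.Dequeue();
                foreach (var s in subs) {
                    s.OnNext(v);
                }
            }
        } finally {
            pushingEvent = false;
        }
    }

    private sealed class Disposer : IDisposable {
        private readonly Observable3<T> target;
        private readonly int index;
        internal Disposer(Observable3<T> target, int index) {
            this.target = target;
            this.index = index;
        }
        public void Dispose() {
            target.subscribers.Remove(index);
        }
    }
}

// Stand-in for Observer.Create, so that the sketch does not depend on Rx.
public sealed class ActionObserver<T> : IObserver<T> {
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ReentrantDemo {
    // Returns every delivery, in order, as "subscriber:value".
    public static List<string> Run() {
        var log = new List<string>();
        var source = new Observable3<int>();
        source.Subscribe(new ActionObserver<int>(i => log.Add("sub0:" + i)));
        source.Subscribe(new ActionObserver<int>(i => {
            log.Add("sub1:" + i);
            if (i > 0 && i < 3) {
                source.Next(-1 * i); // Re-entrant publication, as in the Part 3 test.
            }
        }));
        source.Subscribe(new ActionObserver<int>(i => log.Add("sub2:" + i)));
        source.Next(1);
        return log;
    }

    public static void Main() {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Because event 1 is fully dispatched before -1 is taken from the queue, every subscriber in this sketch sees 1 before -1, whatever order the subscribers themselves are called in.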

Next Steps

This is still missing Exception and Complete methods (to call IObserver<T>.OnError() and .OnCompleted()).

…and concurrency support of course.

Parallel Framework Extensions: Three Minor Things I Didn’t Know This Morning

Posted in Parallel Extensions on February 8th, 2010 by Richard

1. You Can Add a Continuation To A Completed Task

And the continuation will be run. Given the code:

var t1 = Task.Factory.StartNew(() => { /* Noop */ });
Thread.Sleep(100); // Ensure t1 has run
Trace.Assert(t1.Status == TaskStatus.RanToCompletion);
// Now t1 is completed... we can still add a continuation:
bool continued = false;
var t2 = t1.ContinueWith(t => { continued = true; });
t2.Wait();
Trace.Assert(continued);

Neither assert will fire: the continuation has executed despite being added to t1 after t1 had run. This of course prevents a race between TaskFactory.StartNew returning a task that is already scheduled (and therefore possibly already completed) and a subsequent call to Task.ContinueWith.

2. You Can Continue A Task Multiple Times

And the continuations run concurrently. I’m not sure that this would be useful very often (only if multiple tasks need to be started following a single initial task), but nice to know there is not an arbitrary zero or one multiplicity on continuations. Sample code:

var t1 = new Task(() => { Console.WriteLine("  This is the original task"); });
var t2 = t1.ContinueWith(t => {
    Console.WriteLine("   This is the first continuation (before sleep: thread #{0})", Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(250);
    Console.WriteLine("   This is the first continuation (after sleep: thread #{0})", Thread.CurrentThread.ManagedThreadId);
});
var t3 = t1.ContinueWith(t => {
    Console.WriteLine("   This is the second continuation (before sleep: thread #{0})", Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(250);
    Console.WriteLine("   This is the second continuation (after sleep: thread #{0})", Thread.CurrentThread.ManagedThreadId);
});
t1.Start();
Task.WaitAll(new[] { t1, t2, t3 });

Depending on the system this can give different results, but one should see the two “before sleep” messages before either of the two “after sleep” messages: both tasks are executing concurrently.

3. TaskFactory.FromAsync Starts the Task

This just isn’t stated in the documentation. I discovered it when an InvalidOperationException was thrown for starting an already complete task (only two statements later, the intermediate one adding a continuation). To be fair to the designers, there is really no reason to want to delay the execution of the task, as the creation is a single statement (and if you really needed to, you could just use a helper (anonymous) function).
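A minimal sketch of the behaviour, assuming a MemoryStream as a convenient source of a Begin/End method pair (the post does not say which asynchronous operation was originally involved). FromAsyncDemo and StartThrows are names invented for this example.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class FromAsyncDemo {
    // Returns true if calling Start() on the task returned by FromAsync throws.
    public static bool StartThrows() {
        var stream = new MemoryStream(new byte[] { 1, 2, 3, 4 });
        var buffer = new byte[4];

        // FromAsync calls BeginRead itself, so the operation is already under way
        // by the time the task is returned.
        Task<int> read = Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);

        try {
            read.Start();
            return false;
        } catch (InvalidOperationException) {
            return true;
        } finally {
            read.Wait(); // Let the read finish before the stream goes out of scope.
        }
    }

    public static void Main() {
        Console.WriteLine("Start() threw: {0}", StartThrows());
    }
}
```

The task represents an operation that FromAsync has already begun, so there is nothing left for Start() to schedule.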

Creating a Reusable “ObservableSource” for the Reactive Extensions (part 3)

Posted in .NET Futures, Rx on February 4th, 2010 by Richard

Supporting Re-entrant Event Publication: Why It is Significant

Introduction

Continuing from Part 1 and Part 2 where the first steps to a reusable IObservable<T> were taken.

But this is something of an aside, because in starting to look at the reasons to explicitly support re-entrant publication, I found that the Rx type Subject<T> (in assembly "System.Reactive", namespace System.Collections.Generic) has exactly the same behaviour as Observable2<T> from Part 2.

Personally I feel that this behaviour will lead to inconsistent and unexpected ordering of events as seen by subscribers, depending on the order in which the observers subscribed and the details of the internals of the observable (what kind of collection is used to store the list of subscribers). This opens up the potential of races to subscribe with different timing leading to seeing out of order events.

One final note here: when I started drafting my implementation of IObservable<T> I was not aware of Subject<T>. When I became aware of this type, I almost decided to give up on my own implementation. With this discovery I am now rather glad I didn't.

The Test Code

var obs = new Subject<int>();
var subCalls = 0;
var sub1Calls = 0;

var sub0 = Observer.Create<int>(i => {
    ++subCalls;
    Console.WriteLine("Sub 0 ({2}): sub1 called {0} times (total subscriber calls {1})", sub1Calls, subCalls, i);
});
var sub1 = Observer.Create<int>(i => {
    ++subCalls;
    ++sub1Calls;
    Console.WriteLine("Sub 1 ({2}): sub1 called {0} times (total subscriber calls: {1})", sub1Calls, subCalls, i);
    if (i > 0 && i < 3) {
        obs.OnNext(-1 * i);
    }
});
var sub2 = Observer.Create<int>(i => {
    ++subCalls;
    Console.WriteLine("Sub 2 ({2}): sub1 called {0} times (total subscriber calls: {1})", sub1Calls, subCalls, i);
});

using (var d0 = obs.Subscribe(sub0))
using (var d1 = obs.Subscribe(sub1))
using (var d2 = obs.Subscribe(sub2)) {
    obs.OnNext(1);
}

Console.WriteLine("END: sub1 called {0} times, all subscribers {1} times", sub1Calls, subCalls);

In this the three subscribers (imaginatively named sub0, sub1 and sub2) all increment a count of subscriber calls, and all print the same details to the console: which subscriber, the event value, the total number of subscriber calls and the number of calls to the second subscriber (sub1).

Additionally the second subscriber will raise an event itself, recursively publishing an event if the current value is 1≤i≤2 with value -1×i. Thus there is no more than a single level of recursion.

The Results

Sub 0 (1): sub1 called 0 times (total subscriber calls 1)
Sub 1 (1): sub1 called 1 times (total subscriber calls: 2)
Sub 0 (-1): sub1 called 1 times (total subscriber calls 3)
Sub 1 (-1): sub1 called 2 times (total subscriber calls: 4)
Sub 2 (-1): sub1 called 2 times (total subscriber calls: 5)
Sub 2 (1): sub1 called 2 times (total subscriber calls: 6)
END: sub1 called 2 times, all subscribers 6 times

As expected there are six calls to subscribers, with each subscriber seeing two events. But there is an oddity in the order in which the subscribers see events.

Not all subscribers receive the first event raised first: sub2 receives event -1 before event 1, that is, in reverse order. But sub0 sees them in order. The only difference between the two subscribers is the order in which they were subscribed relative to sub1.

Hence the comment above about a race-condition, and dependence on the implementation of Subject<T>. If a different container were used the order in which subscribers are called could be changed without any client code changing.

Clearly the ordering between subscribers is subject to the scheduler in use, but a single subscriber seeing events in a different order certainly could be a problem. This is exacerbated by the dependence on when the subscription is performed. A concurrent system could have the relative order of subscription subject to external factors (e.g. how many cores, how slow an assembly is to load, …).
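The ordering can be reproduced without Rx at all. The following is a minimal sketch in which NaiveSource is a hypothetical stand-in that dispatches immediately even when re-entered; it is not the Subject<T> implementation, only a model of the behaviour observed above.

```csharp
using System;
using System.Collections.Generic;

public static class OrderingDemo {
    // Hypothetical stand-in for a source that dispatches at once, even when re-entered.
    private sealed class NaiveSource {
        private readonly List<Action<int>> subscribers = new List<Action<int>>();
        public void Subscribe(Action<int> onNext) {
            subscribers.Add(onNext);
        }
        public void Next(int value) {
            foreach (var sub in subscribers.ToArray()) {
                sub(value);
            }
        }
    }

    // Returns, for each subscriber, the values in the order that subscriber saw them.
    public static List<int>[] Run() {
        var seen = new[] { new List<int>(), new List<int>(), new List<int>() };
        var source = new NaiveSource();
        source.Subscribe(i => seen[0].Add(i));
        source.Subscribe(i => {
            seen[1].Add(i);
            if (i > 0 && i < 3) {
                source.Next(-1 * i); // Re-entrant publication from inside a subscriber.
            }
        });
        source.Subscribe(i => seen[2].Add(i));
        source.Next(1);
        return seen;
    }

    public static void Main() {
        var seen = Run();
        for (var n = 0; n < seen.Length; n++) {
            Console.WriteLine("sub{0} saw: {1}", n, string.Join(", ", seen[n]));
        }
    }
}
```

Here the third subscriber records -1 before 1, while the first two record 1 before -1, matching the results shown for Subject<T>.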

Next

This post is now long enough, so my solution to this will wait for part 4.

Creating a Reusable "ObservableSource" for the Reactive Extensions (part 2)

Posted in .NET Futures, Rx on January 24th, 2010 by Richard

Introduction

Continuing on from Observable1<T> implemented in Part 1 with Observable2<T> here.

The aim in this part is to allow re-entrant subscribe and unsubscribe operations to succeed. Using Observable1<T>, a subscriber which tries to unsubscribe itself in its OnNext implementation will lead to an InvalidOperationException being thrown from Observable1<T>.Next. This is because Subscribe and Unsubscribe both modify the collection of subscribers in Observable1<T>, which Next is iterating over. The .NET Framework collections do not support this.
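The underlying collection behaviour can be shown in isolation. This is a minimal sketch with invented names (ModifyWhileIteratingDemo, AddDuringEnumerationThrows) and strings standing in for observers. It deliberately uses Add, the Subscribe case, because newer runtimes (.NET Core 3.0 onward) document that Remove and Clear no longer invalidate a Dictionary enumerator, so whether the Unsubscribe case throws depends on the runtime in use.

```csharp
using System;
using System.Collections.Generic;

public static class ModifyWhileIteratingDemo {
    // Returns true if adding to the dictionary while enumerating it throws.
    public static bool AddDuringEnumerationThrows() {
        var subscribers = new Dictionary<int, string>();
        subscribers.Add(0, "first");
        subscribers.Add(1, "second");
        try {
            foreach (var name in subscribers.Values) {
                // Stands in for a subscriber calling Subscribe from inside OnNext.
                subscribers.Add(subscribers.Count, "added by " + name);
            }
            return false;
        } catch (InvalidOperationException) {
            // The enumerator notices the modification on its next MoveNext.
            return true;
        }
    }

    public static void Main() {
        Console.WriteLine("Add during enumeration threw: {0}", AddDuringEnumerationThrows());
    }
}
```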

Supporting Re-entrant Operations

To allow Subscribe and Unsubscribe to work is really very simple: just take a copy of the set of subscribers before iterating over it. Replacing the implementation of Next from Observable1<T>:

public void Next(T value) {
    foreach (var sub in subscribers.Values) {
        sub.OnNext(value);
    }
}

with the following in Observable2<T> (full implementation is given below):

public void Next(T value) {
    var subs = subscribers.Values.ToArray();
    foreach (var sub in subs) {
        sub.OnNext(value);
    }
}

And now code like the following doesn’t fail:

bool hasUnsubscribed = false;
IDisposable unsub = null;
var sub = Observer.Create<int>(i => {
    Console.WriteLine("This is the subscriber, arg: {0}", i);
    if (unsub != null) {
        unsub.Dispose();
        unsub = null;
        hasUnsubscribed = true;
    }
});

var source = new Observable2<int>();
unsub = source.Subscribe(sub);
source.Next(1);
Assert.IsTrue(hasUnsubscribed);
Assert.IsNull(unsub);

Note this test code wouldn’t pass with an arbitrary observable, where the requirement to allow asynchronous subscribe could lead to unsub not yet being set by the time the subscriber code is called (the null check will at least avoid the crash).

While this seems an odd thing to do, consider a subscriber which just wants to react to one event (or the implementation of the RX operator First).

Progress Against the Overall Requirements

See the full list of requirements.

  • #1: Manage subscribers: done. Can subscribe zero or more observers, and freely add or remove them.

  • #2: OnNext, OnCompleted and OnError: OnNext is done (will be called zero or more times). But neither completion nor errors are supported; this also means that the source cannot indicate the end of events, so every event sequence is currently infinite.

  • #3 Exceptions from subscribers will fall back to the event producer: this will happen, but will also mean no other subscribers will see the event.

    Based on this forum thread, about Subject<T>, the right approach here in Rx is not completely agreed.

  • #4: Done: can freely add and remove subscribers.

  • #5: Re-entrancy: this, part 2, makes a start. But a subscriber calling Next is likely to lead to some unexpected sequencing of calls.

The Observable2<T> Implementation

using System;
using System.Collections.Generic;
using System.Linq;

public class Observable2<T> : IObservable<T> {
    private Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;

    public IDisposable Subscribe(IObserver<T> observer) {
        subscribers.Add(nextSubscriber, observer);
        return new Observable1Disposer(this, nextSubscriber++);
    }

    private void Unsubscribe(int index) {
        subscribers.Remove(index);
    }

    public void Next(T value) {
        // To allow subscribers to unsubscribe while calling them, don't
        // directly iterate over the collection, but use a copy.
        var subs = subscribers.Values.ToArray();
        foreach (var sub in subs) {
            sub.OnNext(value);
        }
    }


    private class Observable1Disposer : IDisposable {
        private Observable2<T> target;
        private int index;
        internal Observable1Disposer(Observable2<T> target, int index) {
            this.target = target;
            this.index = index;
        }

        public void Dispose() {
            target.Unsubscribe(index);
        }
    }
}

Creating a Reusable "ObservableSource" for the Reactive Extensions (Part 1)

Posted in .NET Futures, Rx on January 22nd, 2010 by Richard

Introduction

As seen in the requirements for an implementation of IObservable<T> there is a lot of ground to cover. So the idea is to start with a massively simplified subset of the requirements to better understand what it will take to implement the full requirements. In other words, a series of partially functional prototypes.

As a first step, just implement:

  • Subscribe, and maintain a list of subscribed observers.
  • A disposable helper type to act as a return from Subscribe to unsubscribe.
  • Next to call the OnNext of each subscriber

This implementation does not:

  • Handle concurrency.
  • Support re-entrancy. Any call by a subscriber back into the Observable is, at best, going to fail.
  • Make use of the rest of the IObserver<T> interface. No errors and no end to the events.
  • Call the subscribers asynchronously, in a synchronisation context of their (or the Reactive Framework's) choice. (The System.Linq.Observable.ObserveOn<T>(IObservable<T> source, SynchronizationContext sync) method of Rx might be usable to work around this.)

Implementation

using System;
using System.Collections.Generic;

public class Observable1<T> : IObservable<T> {
    private Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;
    
    public IDisposable Subscribe(IObserver<T> observer) {
        subscribers.Add(nextSubscriber, observer);
        return new Observable1Disposer(this, nextSubscriber++);
    }

    private void Unsubscribe(int index) {
        subscribers.Remove(index);
    }

    public void Next(T value) {
        foreach (var sub in subscribers.Values) {
            sub.OnNext(value);
        }
    }


    private class Observable1Disposer : IDisposable {
        private Observable1<T> target;
        private int index;
        internal Observable1Disposer(Observable1<T> target, int index) {
            this.target = target;
            this.index = index;
        }

        public void Dispose() {
            target.Unsubscribe(index);
        }
    }
}

Testing

Obviously I have some test code, but it all gets somewhat tedious very quickly as it covers all sorts of edge cases (e.g. check that subscribing and then unsubscribing before any events works—it does). So these are a couple of indicative samples.

First what must be just about the simplest possible case, subscribe, one event and unsubscribe. Check that subscriber was called once:

var calls = 0;
var sub = Observer.Create<int>(i => {
    Console.WriteLine("This is the subscriber, arg: {0}", i);
    ++calls;
});

var source = new Observable1<int>();
using (var disp = source.Subscribe(sub)) {
    source.Next(1);
}

if (calls != 1) {
    Console.WriteLine("Test FAILED");
}

And then this to show that arbitrary interleaving of subscribe and unsubscribe operations will also work:

var calls1 = 0;
var sub1 = Observer.Create<int>(i => {
    Console.WriteLine("This is subscriber #1, arg: {0}", i);
    ++calls1;
});
var calls2 = 0;
var sub2 = Observer.Create<int>(i => {
    Console.WriteLine("This is subscriber #2, arg: {0}", i);
    ++calls2;
});

var source = new Observable1<int>();
IDisposable disp1 = null, disp2 = null;
try {
    disp1 = source.Subscribe(sub1);
    disp2 = source.Subscribe(sub2);
    disp2.Dispose();
    source.Next(1); // sub1 gets this
    disp2 = source.Subscribe(sub2);
    source.Next(2); // sub1 and sub2 get this
    disp1.Dispose();
    source.Next(3); // sub2 gets this
    disp1 = source.Subscribe(sub1);
    source.Next(4); // Both get this
} finally {
    disp1.Dispose();
    disp2.Dispose();
}

if (calls1 != 3 || calls2 != 3) {
    Console.WriteLine("Test FAILED");
}

In this second approach, I ended up adding the comments to the Next() calls to help me work out the counts. Then running and checking the console ensured all the events went to at least one subscriber. Clearly any real (production) code like this should be condemned with even the most limited of code reviews, but tests need to cover the bases.

Both tests also show just how useful lambdas are when working with Rx, the ability to create test subscribers that are both inline with the test, and close over locals makes state handling much easier.

How this first IObservable<T> Is Inadequate

Quick answer: in almost every way. Longer answer: while events are forwarded and subscribers are managed, the integrity of the system is only preserved by the subscribers playing by a very limited execution model. In particular no subscriber can unsubscribe in direct response to an event, because the iteration over the set of subscribers will fail if Subscribe or Unsubscribe is called during that iteration.

Next Steps

Deal with the lack of re-entrancy, so a subscriber can be added or removed while events are being sent to the subscribers. Also use a test framework to make testing a little more integrated.

Requirements for IObservable<T>

Posted in .NET Futures, Rx on January 10th, 2010 by Richard

Introduction

The recently released preview of the Reactive Extensions (Rx) from Microsoft has had a lot of coverage from Channel 9. It includes support for compositional handling of observable event streams from:

  • WinForms events,
  • WPF events,
  • the asynchronous pattern (BeginX and EndX method pairs), and
  • an IEnumerable<T>.

But it does not include anything reusable from which to build your own observable object.

This is unfortunate because it turns out creating an observable (a type implementing IObservable<T>) is harder than might be expected. It is certainly much more involved than implementing IEnumerable<T>/IEnumerator<T> without using yield (as one needed to do to create a fully custom collection in .NET 1.0 and 1.1).

To start with one needs the requirements. There does not appear to be any explicit listing (yet), but from one source or another there is the information one needs. I've collected a few below.

Requirements for an IObservable<T> Implementation

In writing an implementation of IObservable<T> one should follow the documented semantics (the syntax is, of course, enforced by the compiler and runtime), and therefore be consistent with existing implementations, especially those contained within the Reactive Framework.

The documentation for IObservable<T> is (currently, in preview release state) quite brief, but the sources listed below, and a little intuition can get a long way.

  1. There will be zero or more subscribers.

    This does mean handling zero subscribers (all operations are no-ops) as well as handling many subscribers. This drives much of the implementation:

    • Need a collection of subscribers.
    • Need a mechanism for the object returned by Subscribe(observer) to identify which subscriber is unsubscribing when Dispose() is called on that returned object. This need to link back to a specific subscription means this disposable object cannot be trivial.
    • Every event from the observable will require iterating through the list of subscribers. This needs to be both thread safe and re-entrant. In practice this means the observable needs to take a snapshot of the subscribers and iterate through that for each event (this makes use of the assumed asynchronous unsubscribe noted below).
  2. A subscriber, which remains subscribed, will see zero or more calls to its OnNext method, maybe followed by OnError or OnCompleted.

    In pseudo-regular-expression this could be: OnNext*(OnError|OnCompleted)?.

    All the following patterns are acceptable:

    • nothing, no events at all,
    • OnNext one or more times (no OnCompleted or OnError),
    • just OnCompleted.
  3. Exceptions thrown by subscribers cause unspecified effects. They could do anything from nothing (i.e. the observable absorbs them with a no-op catch block) to killing the whole process.

    This is more significant for subscribers: they should avoid throwing.

  4. Subscribers can be added or removed (via the IDisposable implementing object returned from the observable's Subscribe method).

  5. There is no restriction on the subscriber being notified, or any other piece of code, from calling into the observable. A subscriber (or other object called from the subscriber) or an entirely separate type on a different thread, can (directly or indirectly) call observable methods.

    For example, it is quite acceptable for a subscriber to unsubscribe itself while handling a notification:

          var input = new[] { 1, 2, 3, 4, 5 };
          IDisposable d = null;
          var sub = Observer.Create<int>(x => {
              Console.WriteLine("Received {0}", x);
              d.Dispose();
          });
          d = input.Subscribe(sub);

    In this code, one line is printed to the console.

    Strictly speaking the code above is incorrect. An observable could start sending events to the subscriber before Subscribe returns, thus d may not yet have a value when the OnNext is called. In this case that will lead to a NullReferenceException. This can be fixed by unsubscribing on the first event in which d has been set.
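One way to sketch a one-shot subscriber that tolerates events arriving before Subscribe returns is shown below. It is a variation on the fix just described: it handles only the first event, ignores any later ones, and disposes the subscription as soon as it is available. OneShotObserver<T>, Attach and EagerSource are names invented for this sketch, EagerSource is a deliberately awkward hypothetical source that pushes everything before Subscribe returns, and none of it is thread safe.

```csharp
using System;
using System.Collections.Generic;

public sealed class OneShotObserver<T> : IObserver<T> {
    private readonly Action<T> handler;
    private IDisposable subscription;
    private bool done;

    public OneShotObserver(Action<T> handler) { this.handler = handler; }

    // Pass in the value returned by Subscribe. If the single event has already
    // been handled, the subscription is disposed straight away.
    public void Attach(IDisposable value) {
        subscription = value;
        if (done) {
            subscription.Dispose();
        }
    }

    public void OnNext(T value) {
        if (done) {
            return; // Later events are ignored, even if still subscribed.
        }
        done = true;
        handler(value);
        if (subscription != null) {
            subscription.Dispose();
        }
    }

    public void OnError(Exception error) { done = true; }
    public void OnCompleted() { done = true; }
}

// Hypothetical source that pushes every value before Subscribe returns.
public sealed class EagerSource : IObservable<int> {
    public bool Disposed;

    public IDisposable Subscribe(IObserver<int> observer) {
        foreach (var i in new[] { 1, 2, 3, 4, 5 }) {
            observer.OnNext(i);
        }
        return new Flag(this);
    }

    private sealed class Flag : IDisposable {
        private readonly EagerSource owner;
        internal Flag(EagerSource owner) { this.owner = owner; }
        public void Dispose() { owner.Disposed = true; }
    }
}

public static class OneShotDemo {
    // Returns the values the handler saw; reports whether the subscription was disposed.
    public static List<int> Run(out bool unsubscribed) {
        var received = new List<int>();
        var source = new EagerSource();
        var sub = new OneShotObserver<int>(received.Add);
        sub.Attach(source.Subscribe(sub));
        unsubscribed = source.Disposed;
        return received;
    }

    public static void Main() {
        bool unsubscribed;
        var received = Run(out unsubscribed);
        Console.WriteLine("Received {0} value(s), unsubscribed: {1}", received.Count, unsubscribed);
    }
}
```

Keeping the "done" flag separate from the subscription reference means the observer never dereferences a subscription it has not yet been given.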

There are a couple of things the observable is not responsible for:

  • The observable does not need to ensure Subscribe returns before events are seen (as noted below the simple one-shot subscriber above).

  • The observable does not need to complete an unsubscribe immediately. Rather the subscriber must handle receiving more events even after unsubscribing (this is also true of .NET events, as there will always be an inherent race condition when removing a listener).
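The call pattern from requirement 2 above can be checked mechanically, which is handy in tests. In this minimal sketch each call a subscriber receives is recorded as one letter (N for OnNext, E for OnError, C for OnCompleted, the name IObserver<T> uses for completion); the encoding and the ContractCheck name are assumptions made for illustration.

```csharp
using System;
using System.Text.RegularExpressions;

public static class ContractCheck {
    // N = OnNext, E = OnError, C = OnCompleted: zero or more N, then at most one E or C.
    private static readonly Regex Grammar = new Regex("^N*(E|C)?$");

    public static bool IsValid(string calls) {
        return Grammar.IsMatch(calls);
    }

    public static void Main() {
        foreach (var calls in new[] { "", "NNN", "C", "NNE", "NCN", "EC" }) {
            Console.WriteLine("'{0}' valid: {1}", calls, IsValid(calls));
        }
    }
}
```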

Sources of Requirements

  1. "Reactive Extensions API in depth: Contract" is directly from the team.

    It includes the following non-obvious points:

    • Subscribe(subscriber) is asynchronous: the subscriber could start receiving events before the IDisposable with which to unsubscribe is returned.
    • Dispose() of the object returned by Subscribe(subscriber) is also asynchronous: you may receive events from the observable after you have disposed the object.
    • IObservable<T>.Subscribe(subscriber) must not throw. (I think this should be "should not throw": after all, out of memory and similar cannot effectively be prevented in all cases.)
    • Exceptions from subscribers will not, in general, be caught (and fall through to the context of the observable).
  2. IObservable<T> documentation on MSDN

  3. IObserver<T> documentation on MSDN

  4. Blog posting An RSS Dashboard in F#, part two (IObservables) by Brian McNamara (2009-12-20).

    In F#, but the requirement for re-entrancy is clear.

  5. This question on Stack Overflow also has some useful information.

And next: on to an actual implementation.

Winter Photographs

Posted in Photos on January 9th, 2010 by Richard

Uploaded a few photos of the recent “once in a lifetime” snow here.

(And I do know that for many places this would be just a thin covering that is barely noticed. I live in the south of England, a place where 2cm of snow is above average; this was close to 20cm.)

Blog Created

Posted in Meta on December 23rd, 2009 by Richard

… but no content yet.