Creating a Reusable “ObservableSource” for the Reactive Extensions (part 4)

Supporting Re-entrant Event Publication: Implementation

Introduction

In Part 3 I looked at the effect of calling Subject<T>.OnNext from within a subscriber to that Subject<T>. I really do not think that the out of order events that creates are going to create anything other than confusion.

In my implementation, continuing on from Part 1 and Part 2, I want to avoid the implicit re-ordering that happens when Next(value) is called from within a subscriber.

The Re-Entrant Implementation

In the basic implementation a snapshot of the subscribers is taken (to avoid problems with the collection being modified by re-entrant subscribe and unsubscribe operations), then iterated over:

var subs = subscribers.Values.ToArray();
foreach (var sub in subs) {
   sub.OnNext(value);
}

In the case of a re-entrant Next call this loop will simply exist on the stack twice:

Observable2.Next()
ATestObserver.OnNext()
Observable2.Next()

With the second call iterating through the list of subscribers before the first moves on to the next.

The solution here is to recognise that an iteration is taking place already when Next is called and, rather than just looping to call all subscribers, buffer up the new value. The original call keeps looping until all buffered values are sent out. (Once, later in this series, concurrency is introduced locking is going to be needed here.)

First two new fields are needed. A Queue to act as buffer, and a flag to indicate events are currently being pushed to the subscribers:

private Queue<T> pendingEvents = new Queue<T>();
private bool pushingEvent = false;

Then reworking the Next implementation:

pendingEvents.Enqueue(value);
if (!pushingEvent) {
    try {
        pushingEvent = true;
        while (pendingEvents.Count > 0) {
            var subs = subscribers.Values.ToArray();
            var v = pendingEvents.Dequeue();
            foreach (var s in subs) {
                s.OnNext(v);
            }
        }
    } finally {
        pushingEvent = false;
    }
}

A few points worth noting:

  • By always putting the new event value in the queue there is no special cases needed for the first event

  • The try…finally ensures the pushingEvent gets reset, but this could leave un-pushed events. This is a consequence of the RX exception model (propogate back to the called), but with the added twist that a re-entrant call to this will not get the exception, the original caller of Next will.

  • I have chosen to re-snapshot the subscribers for each event. This will avoid a subscriber that unsubscribes in response to one event getting more, even if already queued up.

Next Steps

This is still missing Exception and Complete methods (to call IObserver<T>.Error() and .OnCompleted.

…and concurrency support of course.

This entry was posted Wednesday, February 17th, 2010 18:07. In .NET Futures, Rx.
You can follow any responses to this entry through the RSS 2.0 feed.

Comments are closed.