Adding Completed and Error Methods

Introduction

So far in this series (links below) the focus has been on implementing Next and Subscribe in a way that is re-entrant. But that leaves two methods uncovered. Both Completed and Error end the sequence, but in different ways. Error passes an exception indicating the source has failed, whereas Completed just indicates it has ended.

Following the behaviour of Subject<T>, when a sequence of events has ended:

  • Existing subscribers receive a call to their OnError or OnCompleted (as applicable).

  • Further calls to any of Subject<T>'s IObserver<T> methods are no-ops.

  • Any new subscribers immediately get a call to their OnError or OnCompleted (as applicable) and nothing more.

Previously

  • Part 1: The basic implementation of Subscribe, Unsubscribe, and Next.

  • Part 2: Making Subscribe and Unsubscribe re-entrant.

  • Part 3: Why re-entrancy is important.

  • Part 4: Making Next re-entrant.

Notification<T> As A Way to Store An Observable Sequence

The Reactive Framework (Rx) distributable comes with three assemblies:

  • System.CoreEx

  • System.Reactive

  • System.Interactive

System.CoreEx contains types that would have been in System or System.Core had .NET been designed with Rx in it from the start. System.Interactive extends LINQ to Objects by back-porting many of IObservable<T>'s extension methods (or ’operators’) to IEnumerable<T>. System.Reactive implements LINQ-like operators for IObservable<T>.

My observable implementation almost doesn’t need any of these three assemblies (IObservable<T> and IObserver<T> being defined in the mscorlib assembly).

However, one rather useful group of types defined in the System.CoreEx assembly, in namespace System.Collections.Generic, is Notification<T> and its subtypes. These types allow the “value” of an observable event to be stored, using one of three subtypes (one each for OnNext, OnCompleted and OnError) that are defined as members of Notification<T> (hence created as, for example, new Notification<int>.OnNext(1)). A collection of events covering all three IObserver<T> methods can therefore be created easily, without custom helper types.

These types are provided for the Materialize and Dematerialize enumerable and observable operators (the former being one of the back-ported operators in System.Interactive).
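To make the idea of storing events as values concrete, here is a minimal sketch. It deliberately does not use the Rx types: Note<T>, NoteKind and NoteDemo are hypothetical stand-ins invented for this illustration so that the code compiles without the Rx assemblies, and they are not the actual Notification<T> API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Rx's Notification<T>, kept minimal so the
// sketch compiles without the Rx assemblies.
public enum NoteKind { OnNext, OnError, OnCompleted }

public sealed class Note<T> {
    public NoteKind Kind { get; private set; }
    public T Value { get; private set; }
    public Exception Exception { get; private set; }

    private Note() { }

    public static Note<T> Next(T value) {
        return new Note<T> { Kind = NoteKind.OnNext, Value = value };
    }

    public static Note<T> Error(Exception exn) {
        return new Note<T> { Kind = NoteKind.OnError, Exception = exn };
    }

    public static Note<T> Completed() {
        return new Note<T> { Kind = NoteKind.OnCompleted };
    }
}

public static class NoteDemo {
    // A whole sequence, including how it ended, held as plain data.
    public static List<Note<int>> Recorded() {
        return new List<Note<int>> {
            Note<int>.Next(1),
            Note<int>.Next(2),
            Note<int>.Completed()
        };
    }
}
```

Because each event is now an ordinary object, values and the end of the sequence can share a single queue or list, which is the property the implementation in this post relies on.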

Overview of The Implementation of Completed and Error

Both follow the same pattern as the existing Next implementation:

  1. If ended, return

  2. Add the new event to a queue of pending events.

  3. If not already pushing an event, process the queue.

The first of these steps is new, and handles the requirement that either Completed or Error ends the sequence.
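The three steps can be sketched in isolation. This is a simplified illustration and not the implementation from this post: MiniSource and MiniSourceDemo are hypothetical names, events are plain strings and subscribers are delegates, so that only the pattern itself is on show.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified source: events are strings and subscribers are
// delegates, so only the three-step pattern is visible.
public class MiniSource {
    private readonly List<Action<string>> handlers = new List<Action<string>>();
    private readonly Queue<string> pending = new Queue<string>();
    private bool pushing;
    private bool ended;

    public void Subscribe(Action<string> handler) { handlers.Add(handler); }

    public void Next(int value) { Publish("next " + value, false); }
    public void Completed() { Publish("completed", true); }

    private void Publish(string evt, bool isEnd) {
        if (ended) { return; }          // 1. If ended, return.
        if (isEnd) { ended = true; }
        pending.Enqueue(evt);           // 2. Add the event to the queue.
        if (pushing) { return; }        // 3. Only the outermost call drains it.
        pushing = true;
        try {
            while (pending.Count > 0) {
                var e = pending.Dequeue();
                // Iterate over a copy so handlers may change the list.
                foreach (var h in handlers.ToArray()) { h(e); }
            }
        } finally {
            pushing = false;
        }
    }
}

public static class MiniSourceDemo {
    public static List<string> Run() {
        var log = new List<string>();
        var source = new MiniSource();
        // A re-entrant handler: it ends the sequence while handling "next 1".
        source.Subscribe(e => { if (e == "next 1") { source.Completed(); } });
        source.Subscribe(e => log.Add(e));
        source.Next(1);
        source.Next(2);   // Ignored: the sequence has already ended.
        return log;
    }
}
```

In this sketch the second handler sees "next 1" and then "completed", even though completion was requested while "next 1" was still being delivered, and the later Next(2) is dropped by step 1.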

To make it easier to handle new subscriptions after the end of the sequence, the endOfEvents field is a Notification<T> reference. If it is null the sequence has not ended; otherwise it holds the event to push immediately to the new subscriber. A helper from System.Reactive is also used as the return value: System.Disposables.Disposable.Empty, an instance of a type that implements IDisposable as a no-op. There is no need to track a subscriber that arrives after the end of the sequence, because it will never be called again.
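A rough sketch of the late-subscriber handling follows, under some loud assumptions: it ignores re-entrancy entirely, it stores the ending event as a delegate rather than as a Notification<T>, and NoOpDisposable, EndAwareSource and RecordingObserver are hypothetical names made up for this example (NoOpDisposable plays the role that Disposable.Empty plays in the real code).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical no-op disposable, standing in for Rx's Disposable.Empty.
public sealed class NoOpDisposable : IDisposable {
    public static readonly NoOpDisposable Instance = new NoOpDisposable();
    private NoOpDisposable() { }
    public void Dispose() { }
}

// Hypothetical source that only models ending; no Next and no re-entrancy.
public class EndAwareSource<T> : IObservable<T> {
    private readonly List<IObserver<T>> subscribers = new List<IObserver<T>>();
    // null => still running; otherwise replays the end to an observer.
    private Action<IObserver<T>> endOfEvents;

    public IDisposable Subscribe(IObserver<T> observer) {
        if (endOfEvents != null) {
            // Already ended: tell the newcomer and do not track it.
            endOfEvents(observer);
            return NoOpDisposable.Instance;
        }
        subscribers.Add(observer);
        return new Remover(subscribers, observer);
    }

    public void Completed() { End(o => o.OnCompleted()); }
    public void Error(Exception exn) { End(o => o.OnError(exn)); }

    private void End(Action<IObserver<T>> end) {
        if (endOfEvents != null) { return; }   // Only the first end counts.
        endOfEvents = end;
        foreach (var s in subscribers.ToArray()) { end(s); }
    }

    private sealed class Remover : IDisposable {
        private readonly List<IObserver<T>> list;
        private readonly IObserver<T> item;
        public Remover(List<IObserver<T>> list, IObserver<T> item) {
            this.list = list;
            this.item = item;
        }
        public void Dispose() { list.Remove(item); }
    }
}

// Hypothetical observer that records what it was told.
public class RecordingObserver<T> : IObserver<T> {
    public readonly List<string> Log = new List<string>();
    public void OnNext(T value) { Log.Add("next " + value); }
    public void OnError(Exception error) { Log.Add("error " + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}
```

With this sketch, calling Error and then Completed on a source leaves the error as the stored ending, so an observer that subscribes afterwards is told about the error once and receives the shared no-op disposable.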

The New Implementation

The single biggest change to support storing notifications (rather than values of type T) is in PushEvents where different kinds of notifications need to call different IObserver<T> methods, so a switch is needed. This is in its own method (SendNotification) because it can then be reused to send the ending event to post-end subscribers.

The implementations of Next, Completed and Error end up being very similar and simple.

Overall, compared to the code from part 4, the implementation is somewhat longer, but certainly better factored, with each method doing one thing. The most complex (PushEvents) does have to loop over both pending events and subscribers, but that is all it does.

The Code

using System;
using System.Collections.Generic;
using System.Linq;

public class Observable4<T> : IObservable<T> {
    private Dictionary<int, IObserver<T>> subscribers = new Dictionary<int, IObserver<T>>();
    private int nextSubscriber = 0;
    private Queue<Notification<T>> pendingEvents = new Queue<Notification<T>>();
    private bool pushingEvent = false;  // If true, only queue the event.
    private Notification<T> endOfEvents;   // Not null => the sequence has ended, and this is the Completed or Error that ended it.

    public IDisposable Subscribe(IObserver<T> observer) {
        if (endOfEvents != null) {
            // Already have end of sequence, so just tell this new subscriber
            // and return no-op.
            SendNotification(observer, endOfEvents);
            return System.Disposables.Disposable.Empty;
        } else {
            subscribers.Add(nextSubscriber, observer);
            return new Observable4Disposer(this, nextSubscriber++);
        }
    }

    private void Unsubscribe(int index) {
        subscribers.Remove(index);
    }

    public void Next(T value) {
        if (endOfEvents != null) { return; }
        pendingEvents.Enqueue(new Notification<T>.OnNext(value));
        PushEvents();
    }

    public void Completed() {
        if (endOfEvents != null) { return; }
        endOfEvents = new Notification<T>.OnCompleted();
        pendingEvents.Enqueue(endOfEvents);
        PushEvents();
    }

    public void Error(Exception exn) {
        if (endOfEvents != null) { return; }
        endOfEvents = new Notification<T>.OnError(exn);
        pendingEvents.Enqueue(endOfEvents);
        PushEvents();
    }

    private void PushEvents() {
        if (!pushingEvent) {
            try {
                pushingEvent = true;
                while (pendingEvents.Count > 0) {
                    // To allow subscribers to unsubscribe while calling them, don't
                    // directly iterate over the collection, but use a copy.
                    // Take this each time, in case some event triggers a change.
                    var subs = subscribers.Values.ToArray();
                    var n = pendingEvents.Dequeue();
                    foreach (var s in subs) {
                        SendNotification(s, n);
                    }
                }
            } finally {
                pushingEvent = false;
            }
        }
    }

    private static void SendNotification(IObserver<T> obs, Notification<T> n) {
        switch (n.Kind) {
        case NotificationKind.OnNext:
            obs.OnNext(n.Current);
            break;
        case NotificationKind.OnCompleted:
            obs.OnCompleted();
            break;
        case NotificationKind.OnError:
            obs.OnError(((Notification<T>.OnError)(n)).Exception);
            break;
        }
    }

    private class Observable4Disposer : IDisposable {
        private Observable4<T> target;
        private int index;
        internal Observable4Disposer(Observable4<T> target, int index) {
            this.target = target;
            this.index = index;
        }

        public void Dispose() {
            target.Unsubscribe(index);
        }
    }
}

Further Steps

The toughest part to get right is still left: concurrency. This will be as difficult to test as to implement. Tests using workers are never simple, and just being confident that they really are doing things concurrently is hard (after all, a simple Interlocked.Increment might be slow enough to effectively eliminate any concurrency, by forcing CPU cache synchronisation, even with multiple idle cores).