Creating a Reusable 'ObservableSource' for the Reactive Extensions (Part 4)
Note reloaded into my rebuilt blog: I know the test code should be a unit test now! This was written over 13 years ago!
Supporting Re-entrant Event Publication: Implementation
Introduction
In Part 3 I looked
at the effect of calling Subject<T>.OnNext
from within
a subscriber to that Subject<T>
. I really do not think
that the out of order events that creates are going to create anything
other than confusion.
In my implementation, continuing on from
Part 1 and
Part 2, I
want to avoid the implicit re-ordering that happens when
Next(value)
is called from within a subscriber.
The Re-Entrant Implementation
In the basic implementation a snapshot of the subscribers is taken (to avoid problems with the collection being modified by re-entrant subscribe and unsubscribe operations), then iterated over:
In the case of a re-entrant Next
call this loop will simply
exist on the stack twice:
With the second call iterating through the list of subscribers before the first moves on to the next.
The solution here is to recognise that an iteration is taking place already
when Next
is called and, rather than just looping to call all
subscribers, buffer up the new value. The original call keeps looping
until all buffered values are sent out. (Once, later in this series,
concurrency is introduced locking is going to be needed here.)
First two new fields are needed. A Queue
to act as buffer, and
a flag to indicate events are currently being pushed to the subscribers:
private Queue<T> pendingEvents = new Queue<T>(); private bool pushingEvent = false;
Then reworking the Next
implementation:
A few points worth noting:
-
By always putting the new event value in the queue there is no special cases needed for the first event
-
The try…finally ensures the
pushingEvent
gets reset, but this could leave un-pushed events. This is a consequence of the RX exception model (propogate back to the called), but with the added twist that a re-entrant call to this will not get the exception, the original caller ofNext
will. -
I have chosen to re-snapshot the subscribers for each event. This will avoid a subscriber that unsubscribes in response to one event getting more, even if already queued up.
Next Steps
This is still missing Exception
and Complete
methods (to call IObserver<T>.Error()
and
.OnCompleted
.
…and concurrency support of course.