Creating a Reusable 'ObservableSource' for the Reactive Extensions (Part 2)
Note reloaded into my rebuilt blog: I know the test code should be a unit test now! This was written over 13 years ago!
Introduction
Continuing on from Observable1<T>, implemented in Part 1, with Observable2<T> here.
The aim of this part is to make re-entrant subscribe and unsubscribe operations work rather than fail.
Using Observable1<T>, a subscriber which tries to unsubscribe itself in its OnNext implementation will lead to an InvalidOperationException being thrown from Observable1<T>.OnNext. This is because Subscribe and Unsubscribe both modify the collection of subscribers in Observable1<T>, which OnNext is iterating over. The .NET Framework collections do not support being modified while they are enumerated.
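The failure can be reproduced without any Rx types. The sketch below is not Observable1<T> itself; it assumes only that the subscribers are held in a List<T>, and the names ModifyDuringIteration, handlers and removeSelf are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class ModifyDuringIteration
{
    // Returns true if removing a handler from inside the loop makes the
    // enumeration throw InvalidOperationException.
    public static bool Throws()
    {
        var handlers = new List<Action>();

        // A handler that removes itself, like a subscriber unsubscribing
        // from within its own OnNext.
        Action removeSelf = null;
        removeSelf = () => handlers.Remove(removeSelf);

        handlers.Add(removeSelf);
        handlers.Add(() => { });

        try
        {
            foreach (var handler in handlers)
            {
                handler(); // the first call changes the list being enumerated
            }
            return false;
        }
        catch (InvalidOperationException)
        {
            // Thrown when the enumerator next advances and finds that the
            // list has changed since enumeration started.
            return true;
        }
    }
}
```

Calling ModifyDuringIteration.Throws() returns true: the exception surfaces in the iterating code, not in the handler that made the change.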
Supporting Re-entrant Operations
Allowing Subscribe and Unsubscribe to work is really very simple: just take a copy of the set of subscribers before iterating over it. That means replacing the implementation of Next from Observable1<T> with one in Observable2<T> that iterates over the copy (full implementation is given below).
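A minimal sketch of that change, with both versions side by side. This is an illustration under assumptions, not the code from Observable1<T> or Observable2<T>: HandlerSource<T> is a hypothetical class, subscribers are plain Action<T> delegates to keep it short, and NextWithoutCopy is a made-up name for the "before" version.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical source; not the author's Observable1<T> or Observable2<T>.
public class HandlerSource<T>
{
    private readonly List<Action<T>> handlers = new List<Action<T>>();

    public void Subscribe(Action<T> handler) { handlers.Add(handler); }
    public void Unsubscribe(Action<T> handler) { handlers.Remove(handler); }

    // Before: iterates the live list, so a handler that subscribes or
    // unsubscribes during the loop invalidates the enumerator.
    public void NextWithoutCopy(T value)
    {
        foreach (var handler in handlers)
        {
            handler(value);
        }
    }

    // After: iterates a snapshot, so changes made by handlers only
    // affect later calls to Next.
    public void Next(T value)
    {
        foreach (var handler in handlers.ToArray())
        {
            handler(value);
        }
    }
}
```

One consequence of the snapshot: a handler removed part-way through a call to Next is still in the copy, so it may receive the current value once more before the removal takes effect.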
And now code like the following doesn’t fail:
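A sketch of such a test, written against a hypothetical stand-in for Observable2<T>. CopyingSource<T>, DelegateObserver<T> and SelfUnsubscribeTest are assumed names introduced for this example; only the unsub variable and its null check come from the description in this post.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Observable2<T>; only the parts the test needs.
public class CopyingSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> subscribers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        subscribers.Add(observer);
        return new Subscription(() => subscribers.Remove(observer));
    }

    public void Next(T value)
    {
        foreach (var observer in subscribers.ToArray()) // iterate a copy
        {
            observer.OnNext(value);
        }
    }

    private sealed class Subscription : IDisposable
    {
        private readonly Action onDispose;
        public Subscription(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() { onDispose(); }
    }
}

// Observer that forwards OnNext to a delegate and ignores the rest.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public DelegateObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class SelfUnsubscribeTest
{
    // Returns the values the self-unsubscribing observer received.
    public static List<int> Run()
    {
        var source = new CopyingSource<int>();
        var received = new List<int>();

        IDisposable unsub = null;
        unsub = source.Subscribe(new DelegateObserver<int>(value =>
        {
            received.Add(value);
            if (unsub != null) // could still be null with an asynchronous Subscribe
            {
                unsub.Dispose();
            }
        }));

        source.Next(1); // delivered; the observer then unsubscribes itself
        source.Next(2); // no subscribers left, so nothing is delivered
        return received;
    }
}
```

SelfUnsubscribeTest.Run() returns a list containing only 1, and no exception is thrown.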
Note that this test code wouldn't pass with an arbitrary observable: the requirement to allow asynchronous subscribe means unsub might not yet be set by the time the subscriber code is called (the null check will at least avoid the crash).
While unsubscribing from within OnNext seems an odd thing to do, consider a subscriber which just wants to react to one event (or the implementation of the Rx operator First).
Progress Against the Overall Requirements
See the full list of requirements.
- #1: Manage subscribers: done. Can subscribe zero or more observers, and freely add or remove them.
- #2: OnNext, OnComplete and OnError: OnNext is done (will be called zero or more times). But neither completion nor errors are supported; this also means that the source cannot indicate the end of events, so every event sequence is currently infinite.
- #3: Exceptions from subscribers will fall back to the event producer: this will happen, but will also mean that any subscribers not yet called will not see the event. Based on this forum thread, about Subject<T>, the right approach here in Rx is not completely agreed.
- #4: Done: can freely add and remove subscribers.
- #5: Re-entrancy: this, part 2, makes a start. But a subscriber calling Next is likely to lead to some unexpected sequencing of calls.
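The sequencing problem in #5 can be illustrated with a small sketch. It assumes a source whose Next delivers synchronously to a snapshot of its subscribers, as described above; ReentrantNextDemo and the next delegate are hypothetical names, not part of Observable2<T>.

```csharp
using System;
using System.Collections.Generic;

public static class ReentrantNextDemo
{
    // Returns the order in which the second handler saw the values.
    public static List<int> Run()
    {
        var handlers = new List<Action<int>>();
        var seenBySecond = new List<int>();

        // Hypothetical Next: delivers synchronously to a snapshot of the handlers.
        Action<int> next = null;
        next = value =>
        {
            foreach (var handler in handlers.ToArray())
            {
                handler(value);
            }
        };

        // The first handler reacts to 1 by publishing 2 from inside its callback.
        handlers.Add(value => { if (value == 1) next(2); });

        // The second handler just records what it is given.
        handlers.Add(value => seenBySecond.Add(value));

        next(1);
        return seenBySecond;
    }
}
```

ReentrantNextDemo.Run() returns 2 followed by 1: the nested call finishes delivering 2 to every handler before the outer loop reaches the second handler with 1, so that handler sees the values in the opposite order to the one in which they were published.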