Note reloaded into my rebuilt blog: I know the test code should be a unit test now! This was written over 13 years ago!
Continuing on from
Observable1<T> implemented in
Part 1 with
The aim in this part is to allow re-entrant subscribe and unsubscribe operations to not fail.
Observable1<T> a subscriber which tries to unsubscribe itself in its
OnNext implementation will lead to an
Observable1<T>.OnNext. This is because
Unsubscribe both modify the collection of subscribers in
OnNext is iterating over. The .NET Framework collections do not support this.
Supporting Re-entrant Operations
Unsubscribe to work is really very simple: just
take a copy of the set of subscribers before iterating over it. Replacing the implementation of
with the following in
Observable2<T> (full implementation is given
And now code like the following doesn’t fail:
Note this test code wouldn’t pass with an arbitrary observable, where
the requirement to allow asynchronous subscribe could lead to
yet being set by the time the subscriber code is called (the null check will at least
avoid the crash).
While this seems an odd thing to do, consider a subscriber which just wants to react to
one event (or the implementation of the RX operator
Progress Against the Overall Requirements
See the full list of requirements.
#1: Manage subscribers: done. Can subscribe zero or more observers, and freely add or remove them.
#2: OnNext, OnComplete and OnError: OnNext is done (will be called zero or more times). But neither completion or errors are supported, this also means that the source cannot indicate the end of events, every even sequence is currently infinite.
#3 Exceptions from subscribers will fall back to the event producer: this will happen, but will also mean no other subscribers will see the event.
Based on this forum thread, about
Subject<T>, the right approach here in Rx is not completely agreed.
#4: Done: can freely add and remove subscribers.
#5: Re-entrancy: this, part 2, makes a start. But a subscriber calling
Nextis likely to lead to some unexpected sequencing of calls.