How to make a ReactiveX
I was tipped off to Reactive Frameworks from @Robert Harvey on a long-winded StackExchange question of mine for what "reactive databases" were called. I was infatuated with Hack's elegant async API and let me tell you it absolutely expanded my world in all the right ways.
So, I had in one hand ReactiveX and its bundle of super powerful, super expressive operators, but in a bit of a... backwards wrapper (you'll see what I mean). In the other, Hack had super expressive await-async and asynchronous generators, but generators which couldn't be chopped up and reassembled with the dexterity of ReactiveX. You can see what Hack async generators can and can't do natively in their docs, and in terms of ReactiveX operators a bit further below.
"Await-async" is a critical detail here. RxPHP exists, but in my opinion it only further shows how clumsy the hallmark callback style of ReactiveX can be when the language suggests anonymous functions be used sparingly. But then ask: what is it about streaming operators that requires me to consume values through a subscribe callback? Nothing! Consider an iterable dual of Observable; let's call it Stream. Look! Streaming code can finally just look like... code.
    // "InteractiveX" server
    Router R = new Router("./");
    Logger logger = new Logger("./log.log");
    Stream<Request> stream = server(8080).merge(server(80));
    while(true) {
        try {
            foreach(stream await as request) {
                await request.send(R.route(request.uri));
                // ...
            }
        }
        catch(ServerException e) {
            await logger.log(e.message);
            switch(e.severity) {
                case E_CRIT: throw e;
                case E_WARN: break;
                // ...
            }
        }
    }
Compare with:
    // ReactiveX server
    Router R = new Router("./");
    Logger logger = new Logger("./log.log");
    Observable<Request> stream = server(8080).merge(server(80));
    stream.materialize().flatMap(async (Notification<Request> notification) => {
        // consume in flatMap here because of the async
        switch(notification.Kind) {
            case NotificationKind.OnNext:
                // unwrap the emission before using it
                Request request = notification.Value;
                await request.send(R.route(request.uri));
                // ...
                break;
            case NotificationKind.OnError:
                // unwrap the error before using it
                ServerException e = notification.Exception;
                await logger.log(e.message);
                switch(e.severity) {
                    case E_CRIT: throw e;
                    case E_WARN: break;
                    // ...
                }
                break;
            case NotificationKind.OnCompleted:
                // ...
                break;
        }
    });
    // if `materialize` isn't your style, you'll probably be left using `retry` and friends.
I spent two years of my spare programming time to bring ReactiveX to await-async frameworks in Hack and PHP and released these frameworks within a few weeks of each other: HHReactor for Hack first, then AmpReactor for PHP next (actually, only after being tipped off to an await-async framework in PHP by @assertchris on a Reddit post promoting HHReactor).
During development, I noticed that there was a common core of behavior that was very easy to separate from the operator implementations. This core contributed buffer-, flatmap- and merge-like mechanics, and it turns out the rest of the operators just mix these behaviors with various synchronous steps (mostly state machine transitions). By virtue of... needing to implement them, I also distilled the handful of features that this core needed from the language itself:
- Async Generators: yielding inside an async function makes an asynchronous generator, which can be iterated with a looping construct. What I call Streams throughout this document are all async generators.
- Conditions: Condition variables that can be awaited and notified from a separate context.
- schedule function: A way to schedule Tasks to run in parallel, preferably arbitrarily. That being said, Hack's scheduler is fairly restrictive in that a task can only be scheduled if something is awaiting it, so arbitrary task scheduling is mostly just a matter of convenience.
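For readers more at home outside Hack, here is a rough sketch of how these three requirements might map onto Python's asyncio. It is only an illustration: ticker, producer and demo are hypothetical names, asyncio.Event stands in for the Condition, and asyncio.create_task plays the role of schedule.

```python
import asyncio
from typing import AsyncIterator, List


async def ticker(n: int) -> AsyncIterator[int]:
    """Async generator: `yield` inside `async def` gives a Stream-like object."""
    for i in range(n):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield i


async def demo() -> List[int]:
    seen: List[int] = []
    ready = asyncio.Event()  # stands in for a Condition

    async def producer() -> None:
        async for v in ticker(3):
            seen.append(v)
        ready.set()  # notify from a separate context

    # "schedule": start the task without awaiting it right away
    task = asyncio.create_task(producer())
    await ready.wait()  # await the Condition
    await task
    return seen
```

Running asyncio.run(demo()) drives all three pieces at once: the generator produces values, the scheduled task consumes them, and the event tells the caller when to look.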
Outside of this core behavior set, at least with await-async, most of the operators are actually just convenience functions!
Sorting operators into neat piles
Let's build up from the foundation of a Stream<T>, which is just an asynchronous generator that produces T-typed emissions.
As promised, this core set of behaviors divides ReactiveX operators into just a handful of categories that are worth distinguishing from an implementation standpoint. Below are the descriptions of these categories, and a table dividing the operators is just below it.
- Trivial operators (usually creation, e.g. from) that are already supported by async generators. map-like operators fall in here too: map obviously, but also scan, concat and actually almost 50% of the others.
- merge-like, which abstractly speaking reduce space dimensions to time. This actually includes flatmap-like operators, to be explained below. Type signatures are similar to static Iterable<Stream<T>> -> Stream<T>
- Buffer-like, which group emissions from one stream by the timings of another. Stream<any> -> Stream<T>
- GroupBy-like, which split streams based on a key function. (T -> Tk) -> Stream<Stream<T>>
- Language-specific/Custom, which are language-specific or offer a different baseline behavior of Stream, like Using or Defer respectively.
From just the type signatures, it's possible to glean the way that the required language features are used to implement the core behaviors.
- Trivial operators just need asynchronous generators by definition.
- merge-like operators need to consume Streams in parallel (schedule), push emissions into a common queue, and notify consumers to resume consumption (Condition).
- buffer-like operators need to consume one Stream for timing and another Stream for values simultaneously (schedule), but the loop around the timing stream can yield right back to the consumer, so no need for Conditions.
- groupby makes many streams from one, so while it doesn't need to schedule, it might need to notify many consumers that a new value is ready (Condition).
- The remaining operators, which are language-specific or need some custom accessories, are somewhat more heterogeneous in their needs. These are elaborated later.
Below is a table of this classification applied to common operators.
Abstract implementation
Below, I've laid out why ReactiveX is really a composition of the five categories above in five sections of compact pseudocode implementations. Let's go!
Trivial operators
Language features required: async generators
These come straight from asynchronous generators. Creation operators from synchronous data structures just use synchronous loops and yield under an async signature. Map-like operators are generally state machines (e.g. scan, reduce) that are synchronous aside from a simple async loop or two.
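    async Stream<T> from<T>(Iterable<T> A) {
        // a synchronous loop under an async signature
        foreach(A as v) {
            yield v;
        }
    }
    async Stream<Tv> scan<T, Tv>(AsyncIterable<T> S, ((Tv, T) -> Tv) f, Tv init) {
        // emits every intermediate accumulator, so the element type is Tv
        foreach(S await as v) {
            init = f(init, v);
            yield init;
        }
    }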
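The same two operators can be sketched as runnable Python async generators. This is a minimal translation of the pseudocode above, not HHReactor's or AmpReactor's actual code; from_iterable and collect are hypothetical helper names.

```python
import asyncio
from typing import AsyncIterator, Callable, Iterable, List, TypeVar

T = TypeVar("T")
A = TypeVar("A")


async def from_iterable(items: Iterable[T]) -> AsyncIterator[T]:
    """Creation operator: a synchronous loop under an async signature."""
    for v in items:
        yield v


async def scan(
    stream: AsyncIterator[T], f: Callable[[A, T], A], init: A
) -> AsyncIterator[A]:
    """Map-like operator: a tiny state machine around one async loop."""
    acc = init
    async for v in stream:
        acc = f(acc, v)
        yield acc


async def collect(stream: AsyncIterator[T]) -> List[T]:
    """Helper (hypothetical): drain a stream into a list."""
    return [v async for v in stream]
```

For example, asyncio.run(collect(scan(from_iterable([1, 2, 3, 4]), lambda a, v: a + v, 0))) produces the running sums of the input.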
Buffer-like
Requires: async generators, schedule
Buffer and its friends are also state machines, but they rely on timing from another stream, so both must be running simultaneously.
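    async Stream<List<T>> buffer<T>(Stream<T> S, Stream<any> timing) {
        List<T> buffer = new List();
        // collect values in the background while the timing stream is awaited
        schedule(Task.run(async () => {
            foreach(S await as v)
                buffer.push(v);
        }));
        foreach(timing await as _) {
            // each tick flushes whatever has accumulated since the last one
            yield buffer;
            buffer = new List();
        }
    }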
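A runnable Python approximation of this buffer, assuming asyncio.create_task as the schedule function. The names fill and pending are illustrative only, and cancelling the background task when the timing stream ends is my own choice, not something the pseudocode specifies.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def buffer(
    stream: AsyncIterator[T], timing: AsyncIterator[object]
) -> AsyncIterator[List[T]]:
    pending: List[T] = []

    async def fill() -> None:
        # Runs concurrently with the timing loop below.
        async for v in stream:
            pending.append(v)

    filler = asyncio.create_task(fill())  # the "schedule" step
    try:
        async for _ in timing:
            batch = pending.copy()
            pending.clear()
            yield batch  # yield straight back to the consumer, no Condition needed
    finally:
        filler.cancel()  # stop collecting once the timing stream is done
```

Note that values arriving after the last tick are dropped in this sketch; a fuller version would need to decide whether to flush them.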
Merge-like
Requires: async generators, schedule, Conditions
To support merging, the gist is to iterate all iterators in separate contexts, push values to a common queue, and signal the callers to consume. Condition variable to the rescue!
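    async Stream<T> merge<T>(Iterable<Stream<T>> streams) {
        // the Condition is swapped after every wake-up, so producers reach it through a pointer
        Pointer<Condition> can_consume = new Pointer(new Condition());
        Queue<T> buffer = new Queue();
        Iterable<Task<void>> coroutines = streams.map(async (Stream<T> S) => {
            foreach(S await as v) {
                buffer.enqueue(v);
                can_consume.deref().broadcast();
            }
            // wake the consumer on completion too, so it can re-check the exit condition
            can_consume.deref().broadcast();
        });
        foreach(coroutines as coroutine) {
            schedule(coroutine);
        }
        while(true) {
            // drain first; an emptiness check avoids stopping early on falsy values
            while(!buffer.is_empty()) {
                yield buffer.dequeue();
            }
            if(is_all_finished(coroutines)) break;
            await can_consume.deref();
            can_consume.set(new Condition());
        }
    }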
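Here is a minimal runnable Python sketch of the same idea, under a few assumptions of mine: asyncio.create_task is the schedule function, a deque is the common queue, and a single asyncio.Event that is cleared and re-armed replaces the swapped Condition. The name pump is hypothetical, and error propagation from the inner streams is left out.

```python
import asyncio
from collections import deque
from typing import AsyncIterator, Deque, Iterable, TypeVar

T = TypeVar("T")


async def merge(streams: Iterable[AsyncIterator[T]]) -> AsyncIterator[T]:
    queue: Deque[T] = deque()
    can_consume = asyncio.Event()  # stands in for the Condition

    async def pump(stream: AsyncIterator[T]) -> None:
        try:
            async for v in stream:
                queue.append(v)
                can_consume.set()
        finally:
            can_consume.set()  # wake the consumer on completion too

    tasks = [asyncio.create_task(pump(s)) for s in streams]
    try:
        while True:
            while queue:  # drain everything buffered so far
                yield queue.popleft()
            if all(t.done() for t in tasks):
                break
            can_consume.clear()  # re-arm, then sleep until a producer signals
            await can_consume.wait()
    finally:
        for t in tasks:
            t.cancel()
```

The order of merged values depends on how the event loop interleaves the producers, so callers should not rely on it.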
Case Study: flatmap
Flatmap is actually a merge of sorts, but the list of iterables grows asynchronously. With arbitrary scheduling, this is a trivial extension to merge:
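    async Stream<T> flatmap<T>(Stream<Stream<T>> Ss) {
        Pointer<Condition> can_consume = new Pointer(new Condition());
        Queue<T> buffer = new Queue();
        List<Task<void>> coroutines = new List();
        // the spawning loop is tracked too, so the consumer can't exit before Ss is exhausted
        Task<void> spawner = Task.run(async () => {
            foreach(Ss await as S) {
                Task<void> coroutine = Task.run(async () => {
                    foreach(S await as v) {
                        buffer.enqueue(v);
                        can_consume.deref().broadcast();
                    }
                    can_consume.deref().broadcast(); // wake the consumer on completion too
                });
                coroutines.push(coroutine);
                schedule(coroutine);
            }
            can_consume.deref().broadcast();
        });
        coroutines.push(spawner);
        schedule(spawner);
        while(true) {
            // drain first; an emptiness check avoids stopping early on falsy values
            while(!buffer.is_empty()) {
                yield buffer.dequeue();
            }
            if(is_all_finished(coroutines)) break;
            await can_consume.deref();
            can_consume.set(new Condition());
        }
    }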
Case study: debounce
Debounce & co. can be implemented by flatmap, by spawning streams that just delay each element and seeing if they've expired at the end. Check it out:
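    Stream<T> debounce<T>(Stream<T> S, int delay) {
        int idx = 0;
        // map each element to a one-shot stream, then flatten the stream of streams
        return flatmap(S.map(async (v) => {
            idx++;
            int stashed_idx = idx;
            await sleep(delay);
            // emit only if no newer element arrived while sleeping
            if(stashed_idx == idx)
                yield v;
        }));
    }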
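Both case studies can be sketched together in runnable Python. As before, this is an illustration under my own assumptions (asyncio.create_task as schedule, asyncio.Event as the Condition), and pump, spawn, one_shot and stamped are hypothetical names.

```python
import asyncio
from collections import deque
from typing import AsyncIterator, Deque, Set, TypeVar

T = TypeVar("T")


async def flatmap(outer: AsyncIterator[AsyncIterator[T]]) -> AsyncIterator[T]:
    queue: Deque[T] = deque()
    can_consume = asyncio.Event()
    tasks: Set[asyncio.Task] = set()

    async def pump(inner: AsyncIterator[T]) -> None:
        try:
            async for v in inner:
                queue.append(v)
                can_consume.set()
        finally:
            can_consume.set()

    async def spawn() -> None:
        # The list of inner streams grows as the outer stream emits them.
        try:
            async for inner in outer:
                tasks.add(asyncio.create_task(pump(inner)))
        finally:
            can_consume.set()

    tasks.add(asyncio.create_task(spawn()))
    try:
        while True:
            while queue:
                yield queue.popleft()
            if all(t.done() for t in tasks):
                break
            can_consume.clear()
            await can_consume.wait()
    finally:
        for t in tasks:
            t.cancel()


async def debounce(stream: AsyncIterator[T], delay: float) -> AsyncIterator[T]:
    latest = 0

    async def one_shot(v: T, stamp: int) -> AsyncIterator[T]:
        await asyncio.sleep(delay)
        if stamp == latest:  # nothing newer arrived while sleeping
            yield v

    async def stamped() -> AsyncIterator[AsyncIterator[T]]:
        nonlocal latest
        async for v in stream:
            latest += 1
            yield one_shot(v, latest)

    async for v in flatmap(stamped()):
        yield v
```

The delay here is in seconds because that is what asyncio.sleep takes; the pseudocode leaves the unit unspecified.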
GroupBy
Requires: conditions
GroupBy is unusual among the operators in that it generates Streams from within the operator. There is a long but efficient implementation (linear in the number of elements) and a short but inefficient one (cost grows with the number of elements times the number of distinct keys). The short one is sketched below; the long one is explained afterwards.
    // short, inefficient groupBy
    async Stream<Stream<T>> groupBy<T, Tk>(Stream<T> stream, (T -> Tk) key_fn) {
        Set<Tk> keys = new Set();
        foreach(stream await as v) {
            Tk key = key_fn(v);
            if(!keys.has(key)) {
                keys.add(key);
                // each group re-scans a clone of the source, keeping only its own key
                yield stream.clone().filter((T w) => key_fn(w) == key);
            }
        }
    }
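A runnable Python sketch of the short version follows. Python async generators cannot be cloned, so this sketch assumes the source can be replayed and takes a hypothetical make_stream factory in place of stream.clone(); each group re-reads the whole source, which is exactly where the inefficiency comes from.

```python
import asyncio
from typing import AsyncIterator, Callable, Hashable, Set, TypeVar

T = TypeVar("T")


async def group_by(
    make_stream: Callable[[], AsyncIterator[T]],
    key_fn: Callable[[T], Hashable],
) -> AsyncIterator[AsyncIterator[T]]:
    async def only(key: Hashable) -> AsyncIterator[T]:
        # Assumption: calling make_stream() again replays the same elements.
        async for v in make_stream():
            if key_fn(v) == key:
                yield v

    seen: Set[Hashable] = set()
    async for v in make_stream():
        key = key_fn(v)
        if key not in seen:
            seen.add(key)
            yield only(key)  # one new Stream per distinct key
```

With k distinct keys and n elements, the source is scanned k + 1 times, so the work is on the order of n times k.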
Custom/Language-specific
Language-specific operators consist of operators that specify schedulers for iterators to run on, plus To. The former consists of ObserveOn, SubscribeOn, Serialize (guarantees of which require some help from the scheduler) and Using. This lies behind the abstraction outlined earlier, so it was actually very useful to mentally separate these while implementing. Hack, for example, has exactly one scheduler, so none of these were implemented in HHReactor.
What remains, Defer and And/Then/When, are not much beyond Stream, but just require some extra objects. Defer is implemented in HHReactor just with a proxy consumer that waits for a next call before attaching onto the Stream it wraps; nothing fancy. Similarly, And/Then/When just needs an extra supporting class, this time Plan.
What about sharing Streams?
When there are multiple consumers to a single stream, ReactiveX defines three sharing policies:
- Replay: All consumers get all items regardless of when they joined, requiring all items to be stored forever.
- Publish: All consumers get all items after they join.
- Share: The items are partitioned among consumers so no consumer gets the same item.
But wait, to protect ourselves against the unpredictable scheduler, we already have a queue to buffer values before consumers finally get control to start consuming them. All of these policies sound like they describe how consumers affect other consumers' access to items of this buffer!
First, note that the (Queue, Condition) pair that's been recurring a bunch is essentially the "get-able" interface that faces the consumer. Let's define a smart way to clone this pair so consumers can get new values from the same source but not affect each other's access to the values they've consumed.
- Give the new clone a half-cloned Queue, where the current index becomes independent from the parent, but the tail of the queue is shared so when new elements are enqueued, all consumers can access them.
- Give the new clone a reference to the original Condition, so when new elements are enqueued, all consumers try to consume.
These three behaviors then only differ on what clones consumers are holding and when they were produced:
- Replay: never advancing the original iterator, and giving new clients clones of it, where the current index will always start at the beginning.
- Publish: cloning any generator that's actively consumed, so the current index will start near the tail.
- Share: consuming the same instance as another consumer, i.e. without cloning. Then the queue is exactly identical between the two consumers and they fight for values. Ultimately, the contention comes from the fairness and optimism of notifications on Condition.