How to make a ReactiveX

I was tipped off to reactive frameworks by @Robert Harvey on a long-winded StackExchange question of mine asking what "reactive databases" were called. At the time I was infatuated with Hack's elegant async API, and let me tell you, the tip absolutely expanded my world in all the right ways.

So, I had in one hand ReactiveX and its bundle of super powerful, super expressive operators, but in a bit of a... backwards wrapper (you'll see what I mean). In the other, Hack had super expressive await-async and asynchronous generators, but generators which couldn't be chopped up and reassembled with the dexterity of ReactiveX. You can see what Hack async generators can and can't do natively at their docs and in terms of ReactiveX operators a bit further below.

"Await-async" is a critical detail here. RxPHP exists, but in my opinion it mostly shows how clumsy the hallmark callback style of ReactiveX can be when the language suggests anonymous functions be used sparingly. But then ask: what is it about streaming operators that needs me to consume values through a subscribe callback? Nothing! Consider an iterable dual of Observable (let's call it Stream). Look! Streaming code can finally just look like... code.

// "InteractiveX" server
Router R = new Router("./");
Logger logger = new Logger("./log.log");
Stream<Request> stream = server(8080).merge(server(80));
while(true) {
	try {
		foreach(stream await as request) {
			await request.send(R.route(request.uri));
			// ...
		}
	}
	catch(ServerException e) {
		await logger.log(e.message);
		switch(e.severity) {
			case E_CRIT: throw e;
			case E_WARN: break;
			// ...
		}
	}
}

Compare with:

// ReactiveX server
Router R = new Router("./");
Logger logger = new Logger("./log.log");
Observable<Request> stream = server(8080).merge(server(80));
stream.materialize().flatMap(async (Notification<Request> notification) => {
	// consume in flatMap here because of the async
	switch(notification.Kind) {
		case NotificationKind.OnNext:
			Request request = notification.Value;
			await request.send(R.route(request.uri));
			// ...
			break;
		case NotificationKind.OnError:
			ServerException e = notification.Exception;
			await logger.log(e.message);
			switch(e.severity) {
				case E_CRIT: throw e;
				case E_WARN: break;
				// ...
			}
			break;
		case NotificationKind.OnComplete:
			// ...
			break;
	}
});
// if `materialize` isn't your style, you'll probably be left using `retry` and friends.

I spent two years of my spare programming time to bring ReactiveX to await-async frameworks in Hack and PHP and released these frameworks within a few weeks of each other: HHReactor for Hack first, then AmpReactor for PHP next (actually, only after being tipped off to an await-async framework in PHP by @assertchris on a Reddit post promoting HHReactor).

During development, I noticed that there was a common core of behavior that was very easy to separate from the operator implementations. This core contributed buffer-, flatmap- and merge-like mechanics, and it turns out the rest of the operators just mix these behaviors with various synchronous steps (mostly state machine transitions). By virtue of... needing to implement them, I also distilled the handful of features that this core needed from the language itself:

  1. Async generators: yielding inside an async function makes an asynchronous generator, which can be iterated with a looping construct. What I call Streams throughout this document are all async generators.
  2. Conditions: condition variables that can be awaited in one context and notified from another.
  3. schedule function: a way to schedule Tasks to run in parallel, preferably arbitrarily. That being said, Hack's scheduler is fairly restrictive in that a task can only be scheduled if something is awaiting it, so arbitrary task scheduling is mostly just a matter of convenience.
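For readers outside Hack, here is a rough sketch of how the three features might look in Python's asyncio. The mapping is an assumption made for illustration (async generator functions, `asyncio.Condition`, and `asyncio.create_task` standing in for `schedule`); the names `ticker` and `demo` are invented and are not part of HHReactor or AmpReactor.

```python
import asyncio

async def ticker(n, delay=0.01):
    """Feature 1: an async generator (a `yield` inside an `async def`)."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield i

async def demo():
    seen = []
    ready = asyncio.Condition()   # Feature 2: awaited here, notified elsewhere

    async def producer():
        async for v in ticker(3):
            async with ready:
                seen.append(v)
                ready.notify_all()   # wake whoever is waiting on the condition

    # Feature 3: schedule the producer so it runs alongside this coroutine.
    task = asyncio.create_task(producer())

    async with ready:
        # Suspend until the producer context has pushed all three values.
        await ready.wait_for(lambda: len(seen) == 3)
    await task
    return seen
```

Unlike the Hack restriction described above, `asyncio.create_task` starts the task whether or not anything awaits it, so this corresponds to the "arbitrary" scheduling case.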

Outside of this core behavior set, at least with await-async, most of the operators are actually just convenience functions!

Sorting operators into neat piles

Let's build up from the foundation of a Stream<T>, which is just an asynchronous generator that produces T-typed emissions.

As promised, this core set of behaviors divides ReactiveX operators into just a handful of categories that are worth distinguishing from an implementation standpoint. The categories are described below, followed by a table sorting the operators into them.

  1. Trivial operators (usually creation, e.g. from) that are already supported by async generators. map-like operators fall in here too: map obviously, but also scan, concat and actually almost 50% of the others.
  2. merge-like, which abstractly speaking reduce space dimensions to time. This actually includes flatmap-like operators, as explained below. Type signatures are similar to static Iterable<Stream<T>> -> Stream<T>
  3. Buffer-like, which group emissions from one stream by the timings of another. Stream<any> -> Stream<T>
  4. GroupBy-like, which split streams based on a key function. (T -> Tk) -> Stream<Stream<T>>
  5. Language-specific/Custom, which are language-specific or offer a different baseline behavior of Stream, like Using or Defer respectively.

From just the type signatures, it's possible to glean the way that the required language features are used to implement the core behaviors.

  1. Trivial operators just need asynchronous generators by definition
  2. merge-like operators need to consume Streams in parallel (schedule), push emissions into a common queue, and notify consumers to resume consumption (Condition)
  3. buffer-like operators need to consume one Stream for timing and another Stream for values simultaneously (schedule), but the loop around the timing stream can yield right back to the consumer, so no need for Conditions.
  4. groupby makes many streams from one, so while it doesn't need to schedule, it might need to notify many consumers that a new value is ready (Condition).
  5. The remaining operators, which are language-specific or need some custom accessories, are somewhat more heterogeneous in their needs. These are elaborated later.

Below is a table of this classification applied to common operators.

Abstract implementation

Below, I've laid out why ReactiveX is really a composition of the five categories above in five sections of compact pseudocode implementations. Let's go!

Trivial operators

Language features required: async generators

These come straight from asynchronous generators. Creation operators from synchronous data structures just use synchronous loops and yield under an async signature. Map-like are generally state machines (e.g. scan, reduce) that are synchronous aside from a simple async loop or two.

async Stream<T> from<T>(Iterable<T> A) {
	foreach(A as v) {
		yield v;
	}
}

async Stream<Tv> scan<T, Tv>(AsyncIterable<T> S, ((Tv, T) -> Tv) f, Tv init) {
	foreach(S await as v) {
		init = f(init, v);
		yield init;
	}
}
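The same two operators can be written as runnable Python async generators. This is only a sketch to show how little machinery the trivial category needs; `from_iterable` is renamed because `from` is a Python keyword, and `collect` is a helper added here for the demo, not a ReactiveX operator.

```python
import asyncio

async def from_iterable(items):
    """Creation operator: a synchronous loop under an async signature."""
    for v in items:
        yield v

async def scan(stream, f, init):
    """Map-like operator: a small state machine around one async loop."""
    acc = init
    async for v in stream:
        acc = f(acc, v)
        yield acc

async def collect(stream):
    """Helper for the demo: drain a stream into a list."""
    return [v async for v in stream]

def demo():
    running_sum = scan(from_iterable([1, 2, 3, 4]), lambda acc, v: acc + v, 0)
    return asyncio.run(collect(running_sum))
```

Calling `demo()` returns the running sums `[1, 3, 6, 10]`.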

Buffer-like

Requires: async generators, schedule

Buffer and its friends are also state machines, but they rely on timing from another stream, so both must be running simultaneously.

async Stream<List<T>> buffer<T>(Stream<T> S, Stream<any> timing) {
	List<T> buffer = new List();
	schedule(Task.run(async () => {
		foreach(S await as v)
			buffer.push(v);
	}));
	foreach(timing await as _) {
		yield buffer;
		buffer = new List();
	}
}
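A runnable Python approximation of the same idea, assuming `asyncio.create_task` as the `schedule` step. As in the pseudocode, values that arrive after the last tick are simply dropped; the `demo` streams are invented for illustration.

```python
import asyncio

async def buffer(stream, timing):
    """Yield lists of `stream` values, cut at each emission of `timing`."""
    pending = []

    async def fill():
        async for v in stream:
            pending.append(v)

    filler = asyncio.create_task(fill())   # the `schedule` step
    try:
        async for _ in timing:
            batch = list(pending)
            pending.clear()
            yield batch
    finally:
        filler.cancel()                     # stop pulling once nobody is listening

async def demo():
    async def values():
        for v in (1, 2, 3):
            await asyncio.sleep(0)          # emit almost immediately
            yield v

    async def one_tick():
        await asyncio.sleep(0.05)           # a single tick, well after the values
        yield None

    return [batch async for batch in buffer(values(), one_tick())]
```

Note that no condition variable appears: the loop over `timing` hands control straight back to the consumer, which is the point made in the classification above.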

Merge-like

Requires: async generators, schedule, Conditions

To support merging, the gist is to iterate all iterators in separate contexts, push values to a common queue, and signal the callers to consume. Condition variable to the rescue!

async Stream<T> merge<T>(Iterable<Stream<T>> streams) {
	Pointer<Condition> can_consume = new Pointer(new Condition());
	Queue<T> buffer = new Queue();
	Iterable<Task<void>> coroutines = streams.map(async (Stream<T> S) => {
		foreach(S await as v) {
			buffer.enqueue(v);
			can_consume.deref().broadcast();
		}
		// wake the consumer on completion too, so it can re-check is_all_finished
		can_consume.deref().broadcast();
	});
	foreach(coroutines as coroutine) {
		schedule(coroutine);
	}
	while(!is_all_finished(coroutines)) {
		await can_consume.deref();
		// swap in a fresh Condition before draining so later broadcasts aren't lost
		can_consume.set(new Condition());
		while(!buffer.is_empty()) {
			yield buffer.dequeue();
		}
	}
}
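Here is one way the merge mechanics could look in runnable Python. It is a sketch under a few assumptions: `asyncio.Event` stands in for the Condition (clearing it plays the role of swapping in a fresh Condition), `asyncio.create_task` is the `schedule` step, and errors raised by source streams are not propagated. The `every` helper in `demo` is invented.

```python
import asyncio
from collections import deque

async def merge(*streams):
    """Interleave several async generators in arrival order."""
    buffer = deque()
    can_consume = asyncio.Event()      # stands in for the Condition
    running = len(streams)

    async def pump(stream):
        nonlocal running
        try:
            async for v in stream:
                buffer.append(v)
                can_consume.set()      # signal: there is something to consume
        finally:
            running -= 1
            can_consume.set()          # signal completion too, or the consumer hangs

    tasks = [asyncio.create_task(pump(s)) for s in streams]   # schedule
    try:
        while running or buffer:
            await can_consume.wait()
            can_consume.clear()        # re-arm before draining
            while buffer:
                yield buffer.popleft()
    finally:
        for t in tasks:
            t.cancel()

async def demo():
    async def every(delay, values):
        for v in values:
            await asyncio.sleep(delay)
            yield v

    return [v async for v in merge(every(0.01, "ab"), every(0.015, "xy"))]
```

The consumer side is still a plain `async for`, which is the whole appeal: the queue and the signalling stay hidden inside the operator.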

Case Study: flatmap

Flatmap is actually a merge of sorts, but the list of iterables grows asynchronously. With arbitrary scheduling, this is a trivial extension to merge:

async Stream<T> flatmap<T>(Stream<Stream<T>> Ss) {
	Pointer<Condition> can_consume = new Pointer(new Condition());
	Queue<T> buffer = new Queue();
	List<Task<void>> coroutines = new List();
	// the outer loop is tracked too, so the consumer doesn't quit before any inner stream arrives
	Task<void> outer = Task.run(async () => {
		foreach(Ss await as S) {
			Task<void> coroutine = Task.run(async () => {
				foreach(S await as v) {
					buffer.enqueue(v);
					can_consume.deref().broadcast();
				}
				can_consume.deref().broadcast(); // wake the consumer on completion
			});
			coroutines.push(coroutine);
			schedule(coroutine);
		}
		can_consume.deref().broadcast();
	});
	coroutines.push(outer);
	schedule(outer);

	while(!is_all_finished(coroutines)) {
		await can_consume.deref();
		can_consume.set(new Condition());
		while(!buffer.is_empty()) {
			yield buffer.dequeue();
		}
	}
}
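A runnable Python sketch of the same extension, with the same assumptions as before (`asyncio.Event` for the Condition, `asyncio.create_task` for `schedule`, no error propagation). The only real change from a merge is that the count of running pumps grows while the outer stream is being consumed; the `inner` and `outer` streams in `demo` are made up.

```python
import asyncio
from collections import deque

async def flatmap(outer):
    """Merge inner streams as they arrive from `outer`, a stream of streams."""
    buffer = deque()
    can_consume = asyncio.Event()
    tasks = set()
    running = 1                        # the outer pump; each inner pump adds one

    async def pump(stream):
        nonlocal running
        try:
            async for v in stream:
                buffer.append(v)
                can_consume.set()
        finally:
            running -= 1
            can_consume.set()          # completion must wake the consumer too

    async def pump_outer():
        nonlocal running
        try:
            async for inner in outer:
                running += 1
                tasks.add(asyncio.create_task(pump(inner)))
        finally:
            running -= 1
            can_consume.set()

    tasks.add(asyncio.create_task(pump_outer()))
    try:
        while running or buffer:
            await can_consume.wait()
            can_consume.clear()
            while buffer:
                yield buffer.popleft()
    finally:
        for t in tasks:
            t.cancel()

async def demo():
    async def inner(k):
        for v in (k, k * 10):
            await asyncio.sleep(0.01)
            yield v

    async def outer():
        for k in (1, 2, 3):
            yield inner(k)

    return sorted([v async for v in flatmap(outer())])
```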

Case study: debounce

Debounce & co. can be implemented by flatmap, by spawning streams that just delay each element and seeing if they've expired at the end. Check it out:

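Stream<T> debounce<T>(Stream<T> S, int delay) {
    int idx = 0;
    // turn each element into a one-element stream that only emits
    // if no newer element showed up while it was sleeping
    return flatmap(S.map(async (T v) => {
        idx++;
        int stashed_idx = idx;
        await sleep(delay);
        if(stashed_idx == idx)
            yield v;
    }));
}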
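To try this out, here is a self-contained Python sketch. It bundles a compact flatmap in which `asyncio.Queue` plays the (Queue, Condition) pair and a sentinel marks the end of each pumped stream; both choices are assumptions for brevity rather than how HHReactor does it. The index is bumped when an element arrives, not when its delayed stream starts running, so a newer element always invalidates older ones.

```python
import asyncio

_DONE = object()   # sentinel: one pumped stream has finished

async def flatmap(outer):
    queue = asyncio.Queue()
    tasks = set()
    running = 1                       # the outer pump; each inner pump adds one

    async def pump(stream):
        try:
            async for v in stream:
                queue.put_nowait(v)
        finally:
            queue.put_nowait(_DONE)

    async def pump_outer():
        nonlocal running
        try:
            async for inner in outer:
                running += 1
                tasks.add(asyncio.create_task(pump(inner)))
        finally:
            queue.put_nowait(_DONE)

    tasks.add(asyncio.create_task(pump_outer()))
    try:
        while running:
            item = await queue.get()
            if item is _DONE:
                running -= 1
            else:
                yield item
    finally:
        for t in tasks:
            t.cancel()

async def debounce(stream, delay):
    latest = 0

    async def delayed(v, idx):
        await asyncio.sleep(delay)
        if idx == latest:             # nothing newer arrived during the delay
            yield v

    async def inners():
        nonlocal latest
        async for v in stream:
            latest += 1
            yield delayed(v, latest)

    async for v in flatmap(inners()):
        yield v

async def demo():
    async def source():
        yield 1
        await asyncio.sleep(0)
        yield 2                       # supersedes 1 before its delay expires
        await asyncio.sleep(0.2)
        yield 3

    return [v async for v in debounce(source(), 0.05)]
```

In the demo, 1 is dropped because 2 arrives within the delay, while 2 and 3 each survive their own quiet period.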

GroupBy

Requires: conditions

GroupBy is unusual among others in that within the operator, it generates Streams. There is a long but efficient implementation (linear in number of elements), and a short but inefficient implementation (grows with number of elements times number of distinct elements). The short is sketched below, the long explained afterwards.

// short, inefficient groupBy
async Stream<Stream<T>> groupBy<T, Tk>(Stream<T> stream, (T -> Tk) key_fn) {
	Set<Tk> keys = new Set();
	foreach(stream await as v) {
		Tk key = key_fn(v);
		if(!keys.has(key)) {
			keys.add(key);
			// every clone re-scans the whole stream for its own key, hence the inefficiency
			yield stream.clone().filter((T u) => key_fn(u) == key);
		}
	}
}
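For comparison, here is one possible linear-time variant in Python. It is not the long implementation referred to above, only a guess at the general shape: each value is pulled from the source once and routed to a per-key queue, and whichever consumer runs dry pulls the next value. It deviates from the text in two labeled ways: an `asyncio.Lock` serializes the pulls where a Condition would notify consumers, and the operator yields `(key, substream)` pairs instead of bare substreams.

```python
import asyncio
from collections import deque

async def group_by(stream, key_fn):
    """Yield (key, substream) pairs; each source value is pulled and queued once."""
    source = stream.__aiter__()
    queues = {}            # key -> deque of values not yet consumed by that group
    new_keys = deque()     # keys whose substream has not been handed out yet
    lock = asyncio.Lock()  # only one consumer advances the source at a time
    done = False

    async def pull_one():
        nonlocal done
        try:
            v = await source.__anext__()
        except StopAsyncIteration:
            done = True
            return
        k = key_fn(v)
        if k not in queues:
            queues[k] = deque()
            new_keys.append(k)
        queues[k].append(v)

    async def substream(k):
        while True:
            async with lock:
                while not queues[k] and not done:
                    await pull_one()
                if not queues[k]:
                    return
                v = queues[k].popleft()
            yield v        # the lock is released before handing the value over

    while True:
        async with lock:
            while not new_keys and not done:
                await pull_one()
            if not new_keys:
                return
            k = new_keys.popleft()
        yield k, substream(k)

async def demo():
    async def numbers():
        for v in (1, 2, 3, 4, 5, 6):
            yield v

    groups = {}
    async for key, sub in group_by(numbers(), lambda v: v % 2):
        groups[key] = sub
    result = {}
    for key, sub in groups.items():
        result[key] = [v async for v in sub]
    return result
```

Because nothing is scheduled, the source only advances when some consumer asks for a value, which matches the earlier observation that groupby does not need `schedule`.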

Custom/Language-specific

Language-specific operators consist of operators that specify schedulers for iterators to run on, plus To. The former consists of ObserveOn, SubscribeOn, Serialize (guarantees of which require some help from the scheduler) and Using. This lies behind the abstraction outlined earlier, so it was actually very useful to mentally separate these while implementing. Hack, for example, has exactly one scheduler, so none of these were implemented in HHReactor.

What remains — Defer and And/Then/When — are not much beyond Stream, but just require some extra objects. Defer is implemented in HHReactor just with a proxy consumer that waits for a next call before attaching onto the Stream it wraps; nothing fancy. Similarly, And/Then/When just needs an extra supporting class, this time Plan.

What about sharing Streams?

When there are multiple consumers to a single stream, ReactiveX defines three sharing policies:

  1. Replay: All consumers get all items regardless of when they joined, requiring all items to be stored forever.
  2. Publish: All consumers get all items after they join.
  3. Share: The items are partitioned among consumers so no consumer gets the same item.

But wait, to protect ourselves against the unpredictable scheduler, we already have a queue to buffer values before consumers finally get control to start consuming them. All of these policies sound like they describe how consumers affect other consumers' access to items of this buffer!

First, note that the (Queue, Condition) pair that's been recurring a bunch is essentially the "get-able" interface that faces the consumer. Let's define a smart way to clone this pair so consumers can get new values from the same source but not affect each other's access to the values they've consumed.

  1. Give the new clone a half-cloned Queue, where the current index becomes independent from the parent, but the tail of the queue is shared so when new elements are enqueued, all consumers can access them.
  2. Give the new clone a reference to the original Condition, so when new elements are enqueued, all consumers try to consume.
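The half-cloned Queue from step 1 can be sketched as a linked list with a shared tail and per-consumer cursors. This is a minimal Python illustration with invented names (`SharedQueue`, `clone`, `demo`); sharing the Condition from step 2 is left out to keep the example synchronous.

```python
class _Node:
    __slots__ = ("value", "next")

    def __init__(self):
        self.value = None
        self.next = None    # None marks the shared, still-empty tail slot


class SharedQueue:
    """A cursor into a linked list whose tail is shared by every clone."""

    def __init__(self):
        placeholder = _Node()
        self._head = placeholder       # private read position
        self._tail = [placeholder]     # one-element box shared by all clones

    def clone(self):
        other = SharedQueue.__new__(SharedQueue)
        other._head = self._head       # start where the parent currently is
        other._tail = self._tail       # but share the tail with it
        return other

    def enqueue(self, value):
        tail = self._tail[0]
        tail.value = value             # fill the empty slot
        tail.next = _Node()            # and append a new empty slot
        self._tail[0] = tail.next

    def is_empty(self):
        return self._head.next is None

    def dequeue(self):
        if self.is_empty():
            raise IndexError("dequeue from an empty SharedQueue")
        value = self._head.value
        self._head = self._head.next   # only this cursor moves
        return value


def demo():
    parent = SharedQueue()
    parent.enqueue(1)
    parent.enqueue(2)
    first = parent.dequeue()           # parent consumes 1 before the clone exists
    child = parent.clone()             # child starts at 2
    parent.enqueue(3)                  # visible to both through the shared tail
    from_parent = [parent.dequeue(), parent.dequeue()]
    from_child = [child.dequeue(), child.dequeue()]
    return first, from_parent, from_child
```

A side effect of this layout is that nodes behind every cursor become unreachable and can be garbage collected, while a cursor kept at the very start retains the full history.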

These three behaviors then only differ on what clones consumers are holding and when they were produced:

GitHub

acrylic-origami/HHReactor

ReactiveX

Operators

GitHub

Reactive Manifesto