
Reagent is theoretically the best of ReactiveX

This extended article discusses the merits of Jake Wharton's Reagent framework for Kotlin, which has deep theoretical advantages as a stand-in for ReactiveX. I've stripped away the class definitions to make a lightweight functional version that I'll derive and justify from first principles here, with implementations for Hacklang, JavaScript and Kotlin. Jake Wharton has put the project on hiatus, and while I agree that it's hard to fight the massive mindshare of ReactiveX, Reagent is at least a powerful lens on the semantics of streaming and the minimum we require of our languages to make them reactive. At the very least, I hope to convey that if you feel ReactiveX is wasteful on top of an async-await language, that feeling isn't wrong.

The maximum is in the middle

There's a spectrum in the Reactive world. On one side, there's ReactiveX (abbreviated Rx). On the other, there's InteractiveX (abbreviated Ix). Reactive Streams, Reactor, and Flowable are Rx. Kotlin channels are Ix. Rx.NET provides implementations of both, so we're assured of some equivalence between the two. What separates them?

The fundamental gap

We'll step backwards from the endpoint to the data sources to build out this difference.

The APIs of the two often look like they differ only in how the endpoint consumes data. Is it a callback, like ReactiveX, or an asynchronous iterator, like InteractiveX? Behind the endpoint, there's a bunch of operators chained together in an effectively identical way.

// Ix
// given `Stream<T> S`, `Stream<T> S2`
foreach(S.map(...).window(S2).flatMap(...) await as v) {
	consume(v);
}

// Rx
S.map(...).window(S2).flatMap(...)
 .subscribe(v => {
 	consume(v);
 });

In the implementation, though, they are markedly different, because the consumption pattern matters: each operator is an endpoint for the data from the one before it in the chain. That means life as an operator has much to do with whom the operator interacts with:

  1. An Ix operator is awoken by the scheduler with a new value, mixes it with ambient state to make something, and hands control back to the scheduler to distribute its creation and wait for the next element.
  2. An Rx operator is awoken by either the scheduler or another operator with a new value, makes something out of it, and pushes it into the next consumer itself.

The consequence emphasized in the first point (handing control back to the scheduler) is the most important performance point: when control hops all over the place, the Ix implementation loses by upwards of orders of magnitude, according to Jake Wharton, the author of Reagent for Kotlin.

However, what it loses in performance Ix gains in expressiveness. This comes in a few forms:

  1. Exceptions: the execution of the handler isn't separated from the call stack that prepared and arrived at it, so exceptions propagate exactly like they do in synchronous code. There's no need to learn another API to try-catch like in Rx, where exceptions are boxed within the event loop's own call stack and require onError and friends (see the sketch after this list).

  2. Cancelling: in Ix, you don't need to tell a stream you're consuming to stop, because at every element it asks you — it's your choice to await the next item or not. In most Rx implementations, the stream implements another interface, Disposable, so you can send it the signal to stop calling you. This is actually a bit weird.

  3. Completion: when a stream completes in Ix, you come out of the block and start executing the next code. No frills. In Rx, completion is moved into a handler that doesn't return anywhere, so you have to push signals outward into the ambient state for other routines to pick up the work later.
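
To make these three differences concrete, here's a minimal TypeScript sketch (the names randomInts and RxLikeSource are hypothetical, not from any real library): the Ix-style loop gets try/catch, break, and fall-through completion straight from the host language, while the Rx-style subscription has to surface each of these through its own callbacks.

// Hypothetical Ix-style source: a plain async generator.
async function* randomInts(): AsyncGenerator<number> {
	while (true) yield Math.floor(Math.random() * 100);
}

async function ixStyle(): Promise<void> {
	try {
		for await (const v of randomInts()) {
			if (v > 90) break;     // cancelling: just don't await the next item
			console.log(v);        // exceptions thrown here climb this very stack
		}
		console.log("done");       // completion: fall through to the next statement
	} catch (e) {
		console.error(e);          // ordinary try/catch, no extra API
	}
}

// Hypothetical Rx-style source: the same three behaviors need dedicated machinery.
interface RxLikeSource<T> {
	subscribe(handlers: {
		onNext: (v: T) => void;
		onError: (e: unknown) => void;  // exceptions are boxed and re-delivered here
		onComplete: () => void;         // completion is its own handler
	}): { dispose(): void };            // cancelling goes through a Disposable
}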

What's fascinating is that this distinction is fundamental. Rx touts a "functional style" of programming with streams, and this is absolutely precise. The async is completely confined to a functional programming language written in operators — a language built on top of and separate from the host language. It's a complete programming language because Church-Turing has something to say about that too, as map-reduce (scan in this case) covers all programs over flat streams, and flatMap + window lets you handle the higher dimensions, including time. This creates the divide between the operator pipeline and the surrounding code: Rx has no choice but to box error handling, cancellation and completion into its own constructs.

Ix is mostly written in the host language, but while you might expect me to say it's the polar opposite of Rx, it actually isn't. This is because in our own code, we don't create a new asynchronous iterator every time we do something to a value. It's obviously overkill at the simplest end of the spectrum: consider a filter-map written with Ix vs. what a sane person would normally write:

Stream<int> S = random_int_stream();

/** Ix **/
Stream<List<int>> factored = S.filter(is_composite).map(get_factors);
foreach(factored await as factors) {
	println(factors[2]);
}
// ↓ expands into
foreach(async {
	foreach(async {
		foreach(S await as v) {
			if(is_composite(v))
				yield v;
		}
	} await as v) {
		yield get_factors(v);
	}
} await as factors) {
	println(factors[2]);
}

/** Hand-written **/
foreach(S await as v) {
	if(is_composite(v))
		println(get_factors(v)[2]);
}

This disconnect is also fundamental. Think about what each expects of its consumer: the former always expects the surrounding code to be async (Stream<T>) -> Task<*>; the latter expects the "inner" code within the block to act on individual values: async (T) -> Task<*> (we'll see this * eventually be used to replace break semantics). This is the function type we would write if we really wanted to encapsulate the consumers we write by hand. Let's call this type Consumer, then.

Of course, while illustrative, this example is silly; verging on hypocritical even since I call these operators "trivial" in "How to Implement a ReactiveX". What happens when we take this further? What if we considered the encapsulation of the whole range behaviors of Rx?

Encapsulating both an operation and its receiver

I'll be using the implementations from "How to Implement a ReactiveX". I've shown there that the unique implementation challenges of all the operators can be lumped into just a handful of categories, and the final verdict is that they admit very concise implementations with just three tools:

  1. State machines,
  2. Condition variables, and
  3. Arbitrary scheduling.

This is a crucial fact, because we can give this combination a simple description: a bunch of individual scopes chugging along concurrently (via arbitrary scheduling) and communicating data through shared state and control through condition variables. Here's the problem statement then:

Problem statement: we want to encapsulate all the repetitive interactions while letting the handlers keep full access to the shared state in the middle for anything application-specific.

I'd also like to point out an insight which will be handy: from the straightforward implementations in "How to Implement...", it's easy to see that Ix, because it's embedded in the host language, doesn't need operators to be complete the way Rx does; its operators are just convenience functions.

What do these convenience functions take? We can actually get this answer directly from the structure of a foreach-await block:

In words, we want our operator to:

  1. Take a stream from another operator
  2. Be parameterized by values from the ambient scope
  3. Have a Task that encapsulates the work of consumption to pass exceptions around and permit some completion logic.
  4. Take a consumer that consumes values one at a time (and might define its state outside of the operator to make a custom state machine)
  5. Let the consumer decide at every element whether it wants to continue consuming
  6. Be transparent to exceptions passing from the consumer to the outer scope

With some squinting, we can see something kind of awesome: these are all type contracts. We already have the type Consumer<T> := T -> Task<*>. Those requirements fill in the rest.

  1. We know that Operators take Consumers at the output to accept data. To be chainable, an operator must also consume from other operators by passing a Consumer<T> to them. So, does an operator look like Consumer<T> -> Consumer<T>? There are a few problems with this type:

    • Operators that make substreams can't take this type. False: this is just one inversion. Of course flatmap: Supplier<T> -> Supplier<Supplier<T>> to flatmap: Consumer<T> -> Consumer<Consumer<T>> makes no sense both variance-wise and semantically, but flatmap: Consumer<Supplier<T>> -> Consumer<T> does.
    • The return type is a Consumer, so a partially-constructed pipeline would be passed around as a consumer looking for a source. This goes against familiar (though only customary) practice, where Observables and Iterables, the primitives of Rx and Ix, are data sources, not data sinks.
    • Generally, we have a greater variety of consumers than producers so we want to parameterize them last. [Why?]

This means that the return from an operator definition is actually a Consumer<T> -> †, where † is a type we'll find later in this list. Call this Supplier<T>.

  2. The operator-specific parameters are a potentially new set of types {T1, ..., Tn} and arguments {T1 x1, ..., Tn xn} that define its behavior.

  3. The † in the Supplier definition is still free, and we want the operator to eventually return some kind of lifetime Task of the consumption. What kind of Task is it? It turns out it doesn't matter, because completion is a pure time signal: onComplete in Rx isn't called with any arguments, nor does foreach-await give anything to the continuation. The definition of Supplier is almost complete, but as we'll see in 5., it is still lacking. So far we have Supplier := Consumer<T> -> ‡ -> Task<Unit>, where ‡ has yet to be found.

  4. By the suggestive language, we'll "take" a consumer as an argument to the operator, of Consumer<T> type of course.

  5. The consumer is a function, so it can return something to signal whether it wants the next element. In Ix, the consumer communicated its desire for the next element to the scheduler through an await or lack thereof. It's a bit strange, then, that consumers in Rx return void/Unit.

We'll eschew this practice and give the consumer a return type. Let's have the `Consumer` return `Task<Unit|CANCEL>`, where `CANCEL` is any symbol we want besides `Unit`. This is returned into the `Supplier`, which implements the logic to ensure the consumer is never called after that. The `Supplier` could enforce this by permanently setting some internal state, or it could cancel its own suppliers.

- In the latter case, if this chain of cancellation propagates all the way to base suppliers, all the corresponding event sources are finished because the base suppliers will `break` out of their loops. This is an important case because the lifetime of the base supplier ends right at that point. Then, all consumers downstream of these event sources that are blocking on `Supplier` lifetimes _could complete sooner_. This leads to the "hard" completion in the above figure. More specifically we're interested in the completion handler: a block of code that follows the `await` on a `Supplier` lifetime.
- In either case, if none of the cancellations propagate to the base suppliers, the enforcement of the "never called again" rule is totally artificial: it only manifests as changes to the states of some `Supplier`s. This change of state could eventually give rise to a hard cancellation, but only in conjunction with other changes to state. By itself then, these cancellations do not affect the lifetime of `Suppliers`. This leads to the "soft" completion in the above figure. The completion handler in this case is a function that must be passed into the operator.

Most basic operators would prefer to promote hard cancellation if they can, because it's efficient and surefire, so they will propagate the cancellation signal to their own suppliers. However, when suppliers have many consumers, sometimes one consumer only wants to stop the flow to itself without halting the rest. I'll stress that enforcing this in the supplier is _not necessary_ for the completeness of the framework: the consumer could instead be called anyway and use its own state to determine whether it should do work. However, the semantics of cancellation are strong, insofar as the consumer should expect to never do work again after returning the `CANCEL` signal. Since the suppliers are usually standardized into operators while the consumers are specified by the framework users, this is a useful thing to implement on the framework side.

Therefore, the `Supplier` should accept an `onComplete` callback: a function that is called exactly once when the consumer returns `CANCEL`. This replaces the equivalent, clumsy inline code of calling the completion handler in every clause in the consumer that results in a cancellation.

This completes the `Supplier` type where the free parameter `‡` is our onComplete handler: `Consumer -> onComplete -> Task<Unit>` where `onComplete := () -> Task<Unit>`.
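
To pin down these semantics, here's a minimal TypeScript sketch of a hypothetical guard_supplier wrapper (not part of any of the linked implementations) that enforces both rules: after the consumer returns CANCEL it is never invoked again, and onComplete fires exactly once.

const CANCEL = Symbol("CANCEL"); type Cancel = typeof CANCEL;
type Consumer<T> = (v: T) => Promise<void | Cancel>;
type OnComplete = () => Promise<void>;
type Supplier<T> = (downstream: Consumer<T>) => (onComplete: OnComplete) => Promise<void>;

// Hypothetical wrapper: soft-complete a supplier on the consumer's CANCEL signal.
function guard_supplier<T>(upstream: Supplier<T>): Supplier<T> {
	return (downstream) => (onComplete) => {
		let done = false;
		const complete = async () => { if (!done) { done = true; await onComplete(); } };
		return upstream(async (v) => {
			if (done) return CANCEL;                  // never call the consumer again
			const status = await downstream(v);
			if (status === CANCEL) await complete();  // exactly once, driven by the consumer
			return status;                            // propagate the cancellation upstream too
		})(complete);                                 // the upstream ending also counts as completion
	};
}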

One issue arises from this definition: we can pass around the partial application after passing in a `Consumer`, so there is both an `onComplete` argument and a `Task` return that we can wait for and execute completion logic on afterward. Which should the receiver use? I'll make the case that the `onComplete` is semantically stronger, since `onComplete` is driven by the `Consumer` that's passed in, which the receiver has much more understanding of than the opaque `Task` that's derived from base suppliers. We can also easily enforce that the `onComplete` handler be triggered by the end of the `Task` to make it more practical still. Therefore, unless the receiver has complete visibility of the whole pipeline and a deep understanding of the `Task`, it should prefer the `onComplete` handler.
	
| In Hack, I'm especially lucky because the return from a void function is actually `null`, so I can just impose the semantics that `null` from a consumer means to supply the next value, and anything else signals a cancellation.
  6. To keep exceptions in this stack, the operator should ideally be executed inline instead of being scheduled. This just comes directly from following the imperative structure.

Therefore, we arrive at the type for an Operator and the supporting types:

Operator<in Tu, out Tv, T1, ..., Tn> := (T1 x1, ..., Tn xn) -> Supplier<Tu> -> Supplier<Tv>
Supplier<out T> := Consumer<T> -> onComplete -> Task<Unit>
onComplete := () -> Task<Unit>
Consumer<in T> := T -> Task<Unit|CANCEL>
	for any CANCEL != Unit
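
For concreteness, here's how these types could be transcribed into TypeScript, with Task<T> as Promise<T> and Unit as void; this is a sketch, not the article's Hack/JS/Kotlin code. A map operator is included to show the currying order.

const CANCEL = Symbol("CANCEL"); type Cancel = typeof CANCEL;	// any symbol besides void
type Consumer<T> = (v: T) => Promise<void | Cancel>;
type OnComplete = () => Promise<void>;
type Supplier<T> = (downstream: Consumer<T>) => (onComplete: OnComplete) => Promise<void>;
type Operator<Tu, Tv, Args extends unknown[] = []> =
	(...args: Args) => (upstream: Supplier<Tu>) => Supplier<Tv>;

// map: parameterize with f, hook into a source, then accept a consumer and onComplete.
function map<Tu, Tv>(f: (v: Tu) => Tv): (upstream: Supplier<Tu>) => Supplier<Tv> {
	return (upstream) => (downstream) => (onComplete) =>
		upstream(async (v) => downstream(f(v)))(onComplete);	// CANCEL propagates by returning it
}
// e.g. const doubled = map((v: number) => v * 2); // : (upstream: Supplier<number>) => Supplier<number>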

General structure

The above types describe the sinks of the data pipeline (Consumers) and the nodes within the pipeline (Operators). However, while we have a Supplier type, we still need to fill in how we get from an async iterator — the truly fundamental supplier of values — to a Supplier. Since we designed the framework to be directly equivalent to an async iterator, all of these "base suppliers" can be created with the same, unopinionated, dead-simple conversion function:

Supplier<T> stream_to_base_supplier(Stream S) {
	return downstream => async onComplete => {
		foreach(S await as v) {
			if(await downstream(v) == CANCEL) {
				await onComplete();
				break;
			}
		}
	};
}
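
For comparison, here's what the same anchor function might look like in TypeScript over a native async iterable, using the same CANCEL/Consumer/Supplier aliases as the earlier sketches (someAsyncIterable in the usage comment stands for any async iterable you already have).

const CANCEL = Symbol("CANCEL"); type Cancel = typeof CANCEL;
type Consumer<T> = (v: T) => Promise<void | Cancel>;
type OnComplete = () => Promise<void>;
type Supplier<T> = (downstream: Consumer<T>) => (onComplete: OnComplete) => Promise<void>;

// One foreach-await loop, broken as soon as the consumer answers CANCEL.
function stream_to_base_supplier<T>(stream: AsyncIterable<T>): Supplier<T> {
	return (downstream) => async (onComplete) => {
		for await (const v of stream) {
			if ((await downstream(v)) === CANCEL) {
				await onComplete();
				break;	// the base supplier's lifetime Task ends right here
			}
		}
	};
}
// e.g. await stream_to_base_supplier(someAsyncIterable)(async (v) => { console.log(v); })(async () => {});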

This function anchors the whole framework.

These base suppliers are an interesting "phase boundary" if you will, where exceptions stop propagating through the pipeline and start propagating through the stack, where control switches hands from the framework to the language runtime, and where the onComplete call coincides with the end of the lifetime Task. All of these will be explained in more depth starting... right away.

Currying is everything

Remember that in both Rx and Ix, the first-class stream object is an object — Observable in the former and whatever the async iterator type is named in the latter. In Px, the first-class stream object is a partial function. This is why the Operator definition is curried: it's first parameterized, then hooked into a source, and then one or many consumers are fed into it. Between any function applications, it can be passed around to represent any of its three levels of behavior: as an operator with certain dynamics, as a stream that will spit out certain values, or as a consumption in progress that we can block on until it ends.

The reusability that partial application enables goes beyond convenience. Remembering that operators are not strictly necessary for completeness, this reusability is the fundamental reason we want to have operators at all:

  1. To let the same chunk of code consume many streams,
  2. To let a source under the same transformations give values to many consumers, and
  3. To let many consumers define completion logic.

The first point is notable for making merge trivial:

/** MERGE **/
// given `Supplier<T>[] suppliers`;
// given `all := Task<T>[] -> Task<T[]>`;
Supplier<T> merged = (Consumer<T> downstream) =>
	all(map(suppliers, (Supplier<T> supplier) => supplier(downstream)));
// e.g. await merged(println);
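
Under the full Supplier type, the same construction could also thread an onComplete through. Here's a TypeScript sketch under one possible policy (fire onComplete once after every branch's lifetime has ended; Promise.all plays the role of `all`):

const CANCEL = Symbol("CANCEL"); type Cancel = typeof CANCEL;
type Consumer<T> = (v: T) => Promise<void | Cancel>;
type OnComplete = () => Promise<void>;
type Supplier<T> = (downstream: Consumer<T>) => (onComplete: OnComplete) => Promise<void>;

// Merge: bind the same consumer to every supplier and await all the lifetimes.
function merge<T>(suppliers: Supplier<T>[]): Supplier<T> {
	return (downstream) => async (onComplete) => {
		await Promise.all(suppliers.map((s) => s(downstream)(async () => {})));
		await onComplete();	// one policy: complete once every branch has finished
	};
}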

The second point brings up some interesting things to think about, especially because it shines light on how we can implement scheduling policies.

Exactly how much scheduling do you want?

I went into detail about one major disadvantage of implementing operators in Rx vs. Ix: you have to implement your own scheduler in Rx. In short, this is inevitable when an Observable holds onto all its subscribers and has to decide who gets what. In Ix, all the work is done by a top-level scheduler, which is far nicer for the developer, but comes with the risk of degraded performance.

There is no silver bullet, but the advantage with Px is that it can be tuned to either one. Let's consider the two extreme cases:

  1. Preface: in my design, the async iterators at the bottom are each turned inside out by one function, which waits for a Consumer, then iterates the iterator and throws its values into the Consumer one by one. This turns them into Suppliers. From these base suppliers, flow networks of operators are drawn towards the terminal consumers.

  2. Ix-like: Here, the operators are pure in consumers, so they never throw the consumers into a hidden state. This way, when the network is finally fully parameterized, consumers pulling from the same leaf will fork the entire branch all the way to the base suppliers, so the decision for what values are sent to which consumers is decided entirely by the scheduler.

  3. Rx-like: All the paths through the flow network have at least one operator that records all consumers and schedules them itself. Notable examples are sharing operators: publish and replay do this. Here's a stripped version of publish for example, that omits the logic for cancellation for brevity:

    Supplier<T> publish<T>(Supplier<T> upstream) { // := Operator<T, T>
    	List<Consumer<T>> downstreams = new List();
    	Task<Unit> lifetime = upstream((v) =>
    		all(map(downstreams, (downstream) => downstream(v)))
    	);
    	return downstream => {
    		downstreams.push(downstream);
    		return lifetime;
    	}
    }
    // The full version can be seen at https://github.com/acrylic-origami/PxHack.
    

The difference between Ix-like and Rx-like can be traced to the presence of internal state in the operators. If no operators have any internal state, then each time a supplier is filled out with a new consumer and onComplete, it binds new consumers and onCompletes to its own suppliers, all the way down to the base suppliers, which run a fresh foreach-await loop. If we visualize it as a flow network, all the branches that lead to this supplier are cloned and grafted back onto the same base. In the Rx-like case, scheduling operators typically expose a Supplier that takes the Consumer and shoves it into internal state, returning a common lifetime of its own internal Consumer, akin to the publish example above. Here, the branches that lead to the terminal supplier are cloned only up to a scheduling operator, then grafted back onto that operator. By stopping short of the base suppliers, the cloned branches come under the command of an artificial scheduler instead.
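
To make the cloning and grafting concrete, here's a hypothetical TypeScript sketch mirroring the stripped publish above: binding two consumers to a pure pipeline re-runs the base foreach-await loop twice, while binding them through a publish-like operator feeds both from a single run.

const CANCEL = Symbol("CANCEL"); type Cancel = typeof CANCEL;
type Consumer<T> = (v: T) => Promise<void | Cancel>;
type OnComplete = () => Promise<void>;
type Supplier<T> = (downstream: Consumer<T>) => (onComplete: OnComplete) => Promise<void>;
const nop: OnComplete = async () => {};

// A base supplier over 0..9 (a stand-in for any event source).
const ticks: Supplier<number> = (downstream) => async (onComplete) => {
	for (let i = 0; i < 10; i++) {
		if ((await downstream(i)) === CANCEL) break;
	}
	await onComplete();
};

// A pure (stateless) branch on top of it.
const doubled: Supplier<number> = (downstream) => (onComplete) =>
	ticks(async (v) => downstream(v * 2))(onComplete);

// Ix-like: two consumers fork the whole branch, so the base loop runs twice.
const ixLike = Promise.all([
	doubled(async (v) => { console.log("a", v); })(nop),
	doubled(async (v) => { console.log("b", v); })(nop),
]);

// Rx-like: a publish-style operator keeps consumers in internal state and
// schedules them itself, so both are fed from one run of the base loop.
function publish<T>(upstream: Supplier<T>): Supplier<T> {
	const downstreams: Consumer<T>[] = [];
	const lifetime = upstream(async (v) => {
		await Promise.all(downstreams.map((d) => d(v)));
	})(nop);
	return (downstream) => async (_onComplete) => {
		downstreams.push(downstream);
		await lifetime;
	};
}
const shared = publish(doubled);
const rxLike = Promise.all([
	shared(async (v) => { console.log("a", v); })(nop),
	shared(async (v) => { console.log("b", v); })(nop),
]);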

The cloning of branches in the flow network is a powerful thing that generalizes well beyond scheduling. Something analogous to the Ix-like vs. Rx-like distinction will happen in the next section, which examines onComplete.

The in-between comes down to how much of the flow network these scheduling operators cover. Addressing areas of the flow network that require special scheduling rules is dead simple: just slot in pure scheduling operators.

onComplete weirdness: a groupBy case study

The last point from our "fundamental reasons to have operators" was "letting many consumers define completion logic". This was the basis of having an onComplete handler in the Supplier type (see the previous section). We're going to justify this choice with a particularly tricky operator: groupBy.

First, we'll make a theoretical observation. For any supplier that has no internal state, note that the following cases are always exactly equivalent:

// given `Supplier S`, and a nop for onComplete `() -> Task<Unit> nop`:
// CASE 1
await new List(
	S(v => f1(v))(nop),
	S(v => f2(v))(nop)
);

// ---
// CASE 2
await S(async v => await new List(f1(v), f2(v)))(nop);

This is the branch cloning that we saw in the last section. Sometimes we can even get functional equivalence for suppliers that do have internal state. groupBy suppliers are one such example. For illustrative purposes, suppose we groupBy on a base supplier, so cancellations translate directly into breaking out of core loops and completing the lifetime Task. There are two distinct ways to implement groupBy (as I've described before). In both cases, every new key emits a Supplier to the outer Consumer. What this Supplier does is what's different.

  1. The emitted Supplier waits to be bound with an inner Consumer, then maps this inner Consumer to that key in a dictionary hanging out in the internal state. Every key that's been seen before queries the dictionary and calls the inner consumer with the latest value.
  2. The emitted Supplier is an ad hoc operator that resubscribes to the parent Supplier and checks every value to see if the key matches the one that created it.

Without a doubt, the latter is much less efficient. It boils down to the difference between a dictionary lookup in the former, O(1), vs. a linear search in the latter over the number of unique keys, O(k). However, in the latter, the consumer of each key does have its own Supplier — it can force the supplier to cancel its parent, which opens up the possibility of hard completion! See it in action in this inlined groupBy:

// given `Stream<T> stream`,
// given key function `T -> Tk keyfn`
// given list of unique keys `List<Tk> keys`,
// given dictionary of consumers `Map<Tk, Consumer> C`
List<Task<void>> lifetimes = keys.map(async (k, idx) => {
	foreach(stream await as v) {
		if(keyfn(v) == k) {
			if(await C[k](v) == CANCEL) {
				break;
			}
		}
	}
});
await lifetimes.map(async lifetime => {
	await lifetime;
	// onComplete logic here
	// note we've used the lifetime Task to run completion logic
});

One implementation can be efficient, the other can use the lifetime Task to signal completion. Nothing can do both nicely. Where does this gap come from then?

Unlike other operators, groupBy discriminates between the consumers it gives values to. Contrast this with a Task with multiple consumers: it distributes its eventual value to all consumers indiscriminately. Because of this indiscriminate signalling, all consumers are awoken on every value from the asynchronous iterator — itself a sequence of Tasks. If we want to control the duration of each lifetime Task through asynchronous functions alone (in contrast to using condition variables), we have no choice but to pay the linear penalty. The general conclusion: if we want the lifetime of the Supplier to differ for each of many consumers, we need to connect each to a distinct base async iterator.

Propagating cancellation, and why we don't need condition variables

Our initial model included condition variables to complement state in passing control around. However, in the context of the definition of Operator, what could condition variables really be used for?

  1. In Ix, they were used to alert a consumer that it can consume, as a generalized form of yield that could be called from anywhere.
  2. All consumers cancel on a given branch of the operator flow network, and we want this to translate into the cancellation of the iterator in the base supplier.
  3. Our consumer isn't getting Awaitables for the next item like it would in Ix, and has no onComplete handler like in Rx. We might worry that if the producing stream ever needs to cut one offshooting consumer's stream short before the bulk stream ends, it might need to use a Condition to act as a pure-time Task that is decoupled from the timing of elements in the bulk stream.

The first is solved by design via inversion of control, since consumers are Rx-like in the sense that they're functions that can be called anywhere and aren't awaiting the source.

The second is also solved by design, and reveals something interesting in the process. Note that the only Awaitables in the ecosystem are given by the completion of Suppliers. Value signals that propagate through the flow network can always be traced to a single async iterator — in other words, the top of any stack leading to a Consumer must be an async iterator. This makes a lot of sense, since the value being consumed has to come from somewhere. Note also that the stack must be entirely filled with Consumers, so everybody can return a cancellation status Unit|CANCEL. Crucially, if the operators are implemented correctly to propagate cancellation, then it will reach the loop in the base supplier, i.e. the if(await downstream(v) == CANCEL) in our stream_to_base_supplier().

That's where the propagation ends. The base supplier receives the cancel signal directly from the consumer (as filtered by operators, which might have to AND together the cancellations of multiple consumers), suggesting that this direction of control is correct on a fundamental level.
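
That ANDing can itself be written as an ordinary Consumer. Here's a TypeScript sketch of a hypothetical fan_out helper (not from any of the linked implementations): the upstream only sees CANCEL once every downstream has asked to stop.

const CANCEL = Symbol("CANCEL"); type Cancel = typeof CANCEL;
type Consumer<T> = (v: T) => Promise<void | Cancel>;

// AND together the cancellations of several consumers.
function fan_out<T>(downstreams: Consumer<T>[]): Consumer<T> {
	const live = new Set(downstreams);
	return async (v) => {
		await Promise.all([...live].map(async (d) => {
			if ((await d(v)) === CANCEL) live.delete(d);	// stop calling this one
		}));
		return live.size === 0 ? CANCEL : undefined;	// only cancel upstream once all have cancelled
	};
}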

The third is also intriguing: there's a whole set of operators where we have to end child streams with signals unrelated to the bulk stream. These are the buffer-like operators. In these operators, child Suppliers from one stream are gated by signals from another, so to get this information across the function scope gap of the internal consumers, we would imagine that we need to use conditions. However, I argue this is a hasty proposition altogether. The alternative: don't make them into operators at all.

Let's take window as the representative operator of the group, since it's more interesting than buffer which we could easily see implemented as a state machine. I'd like to start from this skeleton of some window-like behavior:

// given `Supplier<any> gating` and `Supplier <T> upstream`;
// given some shared state of some type `S state` and 
//   state machines `f, g := S -> S`
launch(gating(() => { state = f(state); }));
launch(upstream((v) => { state = g(state); h(v); }));

This might look like a weird window, but it's actually completely general. Let's examine what we can do with the regular window operator:

  1. Treat each window distinctly, i.e. denote each with an integer
  2. Do something at the end of each window

Let's do the former by using a single state variable to denote which window we're on, which is incremented by the gating consumer. Window transitions are just one more state variable stashing the window the upstream consumer saw last time it checked in.

The completion of a window is given exactly by an emission from gating, so we can just perform this logic in that consumer. See test_limit in PxHack for an example of a window-like operator.
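
Here's a TypeScript sketch of that single-state-variable scheme (gating and upstream are assumed given, as in the skeleton above; launch is just fire-and-forget): the gating consumer advances the window index and runs end-of-window logic, while the upstream consumer detects transitions by comparing against the index it saw last.

const CANCEL = Symbol("CANCEL"); type Cancel = typeof CANCEL;
type Consumer<T> = (v: T) => Promise<void | Cancel>;
type OnComplete = () => Promise<void>;
type Supplier<T> = (downstream: Consumer<T>) => (onComplete: OnComplete) => Promise<void>;
const nop: OnComplete = async () => {};
const launch = (p: Promise<void>): void => { p.catch(console.error); };	// fire and forget

declare const gating: Supplier<unknown>;	// given: a timer or another stream
declare const upstream: Supplier<number>;	// given: the bulk stream

let winIdx = 0;		// which window we're on, advanced only by the gating consumer
let lastSeen = -1;	// the window the upstream consumer saw the last time it checked in

launch(gating(async () => {
	console.log(`window ${winIdx} closed`);	// end-of-window logic lives right here
	winIdx++;
})(nop));

launch(upstream(async (v) => {
	if (lastSeen !== winIdx) {
		lastSeen = winIdx;	// window transition detected
	}
	console.log(winIdx, v);	// every value tagged with its window index
})(nop));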

Out of the theory, into the jury

I've put all of these concepts together into a Hack library (which believe it or not all take up less space on disk right now than this article!) If you read code faster than prose or want to give it a try, here's the repo for the Hack implementation, the JS implementation and the Kotlin implementation.
