# ProactiveX

## The maximum is in the middle

There's a spectrum in the Reactive world. On one side, there's ReactiveX (abbreviated as Rx). On the other side, there is InteractiveX (abbreviated as Ix). For some implementations to see the difference, we can look at the former's many friends — Reactive Streams, Reactor, Flowable — and the latter's more meager and nascent friendbase — really just Kotlin channels for now. Theoretically though, the difference between the two is concise but deep.

### The fundamental gap

We'll step backwards from the endpoint to the data sources to build out this difference.

The API between the two often looks like they only differ by the way the endpoint consumes data. Is it a callback like ReactiveX, or an asynchronous iterator like InteractiveX? Behind the endpoint, there's a bunch of operators that are chained together in an effectively identical way.

// Ix
// given Stream<T> S, Stream<T> S2
foreach(S.map(...).window(S2).flatMap(...) await as v) {
consume(v);
}

// Rx
S.map(...).window(S2).flatMap(...)
.subscribe(v => {
consume(v);
});


In the implementation though, they are markedly different because the consumption pattern matters: each operator is an endpoint for the data from the one before it in the chain. That means life as an operator has much to do with who the operator interacts with:

1. An Ix operator is awoken by the scheduler with a new value, mixes it with ambient state to make something, and hands control back to the scheduler to distribute its creation and wait for the next element.
2. An Rx operator is awoken by either the scheduler or another operator with a new value, makes something out of it, and pushes it into the next consumer itself.

The one consequence italicized above is the most important performance point, and when control hops all over the place, the Ix implementation loses by upwards of orders of magnitude, according to the author of Reagent for Kotlin Jake Wharton.

However, what it loses in performance Ix gains in expressiveness. This comes in a few forms:

1. Exceptions: the execution of the handler isn't separated from the call stack that prepared and arrived to it so exceptions propagate exactly like synchronous code. No need to learn another API to try-catch like in Rx, where exceptions are boxed within the event loop's own call stack and require onError and friends.
2. Cancelling: in Ix, you don't need to tell a stream you're consuming that to stop, because at every element it asks you — it's your choice to await the next item or not. In most Rx implementations, the stream to implements another interface to tell you it's Disposable so you can send it the signal to stop calling you. This is actually a bit weird.
3. Completion: when a stream completes in Ix, you come out of the block and start executing the next code. No frills. In Rx, completion is moved into a handler that doesn't return anywhere, so you have to push signals outward into the ambient state for other routines to pick up the work later.

What's fascinating is that this distinction is fundamental. Rx touts a "functional style" of programming with streams, and this is absolutely precise. The async is completely confined to a functional programming language written in operators — a language built on top of and separate from the host language. It's obviously complete because you can write chunks of programs with it — Church-Turing has something to say about that too, as map-reduce (scan in this case) covers all programs over flat streams, and flatMap + window lets you handle the higher dimensions including time. This creates the divide between the operator pipeline and surrounding code: Rx has no choice but to box error handling, cancellation and completion into its own constructs.

Ix is mostly written in the host language, but while you might expect me to say it's the polar opposite of Rx, it actually isn't really. This is because in our own code, we don't create a new asynchronous iterator every time we do something to a value. It's obviously overkill in the simplest part of the spectrum: consider a filter-map with Ix vs. what a sane person would normally write:

Stream<int> S = random_int_stream();

/** Ix **/
Stream<List<int>> factored = S.filter(is_composite).map(get_factors);
foreach(factored await as factors) {
println(factors[2]);
}
// ↓ expands into
foreach(async {
foreach(async {
foreach(S await as v) {
if(is_composite(v))
yield v;
}
} await as v) {
yield get_factors(v);
}
} await as factors) {
println(factors[2]);
}

/** Hand-written **/
foreach(S await as v) {
if(is_composite(v))
println(get_factors(v)[2]);
}


This disconnect is also very fundamental. Thinking about what each expects of its consumer: the former always expects the code that its returning to to be async (Stream<T>) -> Task<*>; but the latter expects the "inner" code within the block to act on individual values: async (T) -> Task<*> (we'll see this * will eventually be used to replace break semantics). This is the function type if we really wanted to encapsulate consumers that we would write. Let's call this type Consumer then.

Of course, while illustrative, this example is silly; verging on hypocritical even since I call these operators "trivial" in "How to Implement a ReactiveX". What happens when we take this further? What if we considered the encapsulation of the whole range behaviors of Rx?

### Encapsulating both an operation and its receiver

I'll be using the implementations from "How to Implement a ReactiveX". I've shown there that the unique implementation challenges of all the operators can be lumped into just a handful of categories, and the final verdict is that they admit very concise implementations with just three tools:

1. State machines,
2. Condition variables, and
3. Arbitrary scheduling.

This is a crucial fact, because we can give this combination a simple description: a bunch of individual scopes chugging along concurrently (via arbitrary scheduling) and communicating data through shared state and control through condition variables. Here's the problem statement then:

Problem statement: we want to encapsulate all the repetitive interactions while allowing the handlers maintain all the access to the sharing in the middle for anything application-specific.

I'd also like to point out this insight which will be handy: from the straightforward implementations in "How to Implement...", it's easy to see Ix doesn't need operators like Rx does to be complete because it's embedded in the host language, so the operators are then just convenience functions.

What do these convenience functions take? We can actually get this answer directly from the structure of a foreach-await block:

In words, we want our operator to:

1. Take a stream from another operator
2. Be parameterized by values from the ambient scope
3. Take a consumer that consumes values one at a time (and might define its state outside of the operator to make a custom state machine)
4. Let the consumer decide at every element (i.e. the only instances it has control) whether it wants to continue consuming
5. Be transparent to exceptions passing from the consumer to the outer scope
6. Have a Task that represents the end of consumption so we can do things upon completion.

With some squinting, we can see something kind of awesome: these are all type contracts. We already have the type Consumer<T> := T -> Task<*>. Those requirements fill in the rest.

1. To be chainable and keep the interface consistent, the input of an operator must be a Consumer<T> to the output of the other. This means that the return from an operator definition is actually a Consumer<T> -> †; we'll find † later in this list. Call this Supplier<T>.

2. The operator-specific parameters are a potentially new set of types {T1, ..., Tn} and arguments {T1 x1, ..., Tn xn} that define its behavior.

3. By the suggestive language, we'll "take" a consumer as an argument to the operator, of Consumer<T> type of course.

4. The consumer is a function, so it can return something to signal whether it wants the next element. In Ix, remember that the consumer was giving this signal straight to the scheduler through an await or lack thereof. It's a bit strange then that consumers in Rx return void/Unit.

We'll eschew this practice and give the consumer a return type. Minimally, we want Consumer<T> := T -> Task<Unit|CANCEL> where CANCEL is any symbol we want besides Unit.

5. To keep exceptions in this stack, the operator should ideally be executed inline instead of being scheduled. This just comes directly from following the imperative structure.

6. † in the Supplier definition is still free, and we want to return some kind of lifetime Task of the consumption so the outer scope can run logic on completion. Let's return it from Supplier then! What kind of Task is it? It turns out it doesn't matter, because completion is a pure time signal. onComplete in Rx isn't called with any arguments, nor does foreach-await give anything to the continuation. The definition of Supplier is complete: Supplier := Consumer<T> -> Task<Unit>.

Therefore, we arrive at the type for an Operator and the supporting types:

Operator<in Tu, out Tv, T1, ..., Tn> := (T1 x1, ..., Tn xn) -> Supplier<Tu> -> Supplier<Tv>
Supplier<out T> := Consumer<T> -> Task<Unit>
Consumer<in T> := T -> Task<Unit|CANCEL>
for any CANCEL != Unit


### Currying is everything

Remember that in both Rx and Ix, the first-class stream object is an object — Observable in the former and whatever the async iterator type is named in the latter. In Px, the first-class stream object is a partial function. This is why the Operator definition is curried: it's first parameterized, then hooked into a source, and then one or many consumers are fed into it. Between any function applications, it can be passed around to represent any of its three levels of behavior: as an operator with certain dynamics, as a stream that will spit out certain values, or as a consumption in progress that we can block on until it ends.

The reusability that the partial application enables goes beyond convenience. Remembering that operators are not strictly necessary for completeness, it's the fundamental reasons why we want to have operators at all:

1. To let the same chunk of code consume many streams, and
2. To let a source under the same transformations give values to many consumers.

The former point makes merge trivial:

/** MERGE **/
// given Supplier<T>[] suppliers;
// given all := Task<T>[] -> Task<T[]>;
Supplier<T> merged = (Consumer<T> downstream) =>
all(map(suppliers, (Supplier<T> supplier) => supplier(downstream)));
// e.g. await merged(println);


The latter point is very interesting too because it's how we implement scheduling policies.

#### Exactly how much scheduling do you want?

I went into detail about one major disadvantage of operator implementation in Rx vs. Ix: you have to implement your own scheduler in Rx. In short, this is inevitable when an Observable holds onto all its subscribers and has to decide who gets what. In Ix, all the work is done by a top-level scheduler, which is much far nicer on the developer, but comes with the risk of degrading the performance.

There is no silver bullet, but the advantage with Px is that it can be tuned to either one. Let's consider the two extreme cases:

1. Preface: in my design, the async iterators at the bottom are turned inside out by one function each, which waits on a Consumer then iterates them and throws the values into the Consumer one by one. This turns them into Suppliers. From these base suppliers, flow networks of operators are drawn towards the terminal consumers.

2. Ix-like: Here, the operators are pure in consumers, so they never throw the consumers into a hidden state. This way, when the network is finally fully parameterized, consumers pulling from the same leaf will fork the entire branch all the way to the base suppliers, so the decision for what values are sent to which consumers is decided entirely by the scheduler.

3. Rx-like: All the paths through the flow network have at least one operator that records all consumers and schedules them itself. Notable examples are sharing operators: publish and replay do this. Here's a stripped version of publish for example, that omits the logic for cancellation for brevity:

Supplier<T> publish<T>(Supplier<T> upstream) { // := Operator<T, T>
List<Consumer<T>> downstreams = new List();
all(map(downstreams, (downstream) => downstream(v)))
);
return downstream => {
downstreams.push(downstream);
}
}


The full version can be seen at the repo.

The in-between comes down to what coverage of the flow network that these scheduling operators take up. Note that addressing areas of the flow network that require special scheduling rules is straightforward: just slot in pure scheduling operators.

### Propogating cancellation, and why we don't need condition variables

Our initial model included condition variables to complement state in passing control around. However, in the context of the definition of Operator, what could condition variables really be used for?

1. In Ix, they used to be used for alerting a consumer that they can consume, as a generalized form of yield that could be called from anywhere.
2. All consumers cancel on a given branch of the operator flow network, and we want this to translate into the cancellation of the iterator in the base supplier.
3. Our consumer isn't getting Awaitables for the next item like it would in Ix, and has no onComplete handler like in Rx. We might worry that if the producing stream ever needs to cut one offshooting consumer's stream short before the bulk stream ends, it might need to use a Condition to act as a pure-time Task that is decoupled from the timing of elements in the bulk stream.

The first is solved by design via inversion of control, since consumers are Rx-like in the sense that they're functions that can be called anywhere and aren't awaiting the source.

The second is also solved by design, and reveals something interesting in the process. Note that the only Awaitables in the ecosystem are given by the completion of Supplierss. Value signals that propagate through the network flow can always be traced to a single async iterator — in other words, the top of any stack leading to a Consumer must be an async iterator. This makes a lot of sense, since the value being consumed has to come from somewhere. Note also that the stack must be entirely filled with Consumers, so everybody can return a cancellation status Unit|CANCEL. Crucially, if the operators are implemented correctly to propagate cancellation, then it will reach inside the iterator in the base supplier. My implementation of PxHack provides a helper function P2S to convert async iterators ("Producers") into suppliers, which is currently looks like this:

function P2S<T>(AsyncIterator<T> $P): Supplier<T> { return async$C ==> {
foreach($P await as$v) {
$cancel = await$C($v); if($cancel !== null)
return;
}
};
}


That's where the propagation ends. It receives the cancel signal directly from the consumer (as filtered by operators which might have to AND together the cancellations of multiple consumers), suggesting that this direction of control is correct on a fundamental level.

The third is also intriguing: there's a whole set of operators where we have to end child streams with signals unrelated to the bulk stream. These are the buffer-like operators. In these operators, child Suppliers from one stream are gated by signals from another, so to get this information across the function scope gap of the internal consumers, we would imagine that we need to use conditions. However, I argue this is a hasty proposition altogether. The alternative: don't make them into operators at all.

Let's take window as the representative operator of the group, since it's more interesting than buffer which we could easily see implemented as a state machine. I'd like to start from this skeleton of some window-like behavior:

// given Supplier<any> gating and Supplier <T> upstream;
// given some shared state of some type S state and
//   state machines f, g := S -> S
launch(gating(() => { state = f(state); }));
launch(upstream((v) => { state = g(state); h(v); }));


This might look like a weird window, but it's actually completely general. Let's examine what we can do with the regular window operator:

1. Treat each window distinctly, i.e. denote each with an integer
2. Do something at the end of each window

Let's do the former by using a single state variable to denote which window we're on, which is incremented by the gating consumer. Window transitions are just one more state variable stashing the window the upstream consumer saw last time it checked in.

The completion of a window is given exactly by an emission from gating, so we can just perform this logic in that consumer. See test_limit in PxHack for an example of a window-like operator.

## Out of the theory, into the jury: Hack implementation

I've put all of these concepts together into a Hack library (which believe it or not takes up less space on disk right now than this article!) If you read code faster than prose or want to give it a try, here's the repo for PxHack. JS, Kotlin and C# are not safe from implementations, I'm about to come into a wealth of free time with the winter break coming up, and want to push these out. If this excites you too, I would collaborate in a heartbeat. HMU @acrylicorigami or email at the contact panel.

GitHub

Hack implementation