Skip to main content

Guide to InteractiveX in Kotlin

I was just peeking at Kotlin to see if they supported asynchronous generators. I wasn't expecting much, because when I was making InteractiveX implementations for PHP and Hack last year, Kotlin coroutines weren't publicly documented. Man, I got way more than I bargained for. I knew even just looking at this title to this guide in kotlinx.coroutines:

Guide to reactive streams with coroutines

Not only does Kotlin 1.3.11 have async generators, but they have hot vs. cold generators: hot produce vs. cold publish; not only do they have coroutines but they have arbitrary scheduling via launch and schedulers/scoping. To marry it all together is that bare-braces lambda syntax that gives me pangs of nostalgia to Hack's async blocks.

The rest of "Guide to reactive streams with coroutines" is just a gold mine of validation:

Operators

Full-featured reactive stream libraries, like Rx, come with a very large set of operators to create, transform, combine and otherwise process the corresponding streams. Creating your own operators with support for back-pressure is notoriously difficult.

Coroutines and channels are designed to provide an opposite experience. There are no built-in operators, but processing streams of elements is extremely simple and back-pressure is supported automatically without you having to explicitly think about it.

There's actually some fundamental reasons why the above is true: check out "why ReactiveX operators are hard to make and InteractiveX opereators aren't"

I've argued before that all ReactiveX operators fall neatly into five categories of implementation: trivial, merge-like, buffer-like, groupby-like, and miscellaneous or language-specific ones. I called one of these categories "trivial" because they follow directly from the behavior of an asynchronous generator. After implementing these categories of operators in Kotlin though, this designation loses its meaning a bit because they're all basically trivial. Really, there's almost left to do because the core library takes care of buffering and signalling consumers.

Implement those operators!

Grab these operators from this gist to try them out. I skip trivial operators because I trivialize them at length in "How to make a ReactiveX".

Merge-like

Kotlin's reactive channels guide has this operator covered in depth, so go there to see the implementation! It's 4 function calls. Insane.

Flatmap case study

@ExperimentalCoroutinesApi
fun <T, U> Publisher<T>.flatmap(f: (T) -> Publisher<U>, ctx: CoroutineContext) = GlobalScope.publish(ctx) {
	consumeEach {
		launch {
			f(it).consumeEach {
				send(it)
			}
		}
	}
}

Stressing how much more elegant this is than my C#-like implementation from "How to make a ReactiveX", let's look at the step-by-step:

  1. Extend Publisher — which is returned from publish {}, the cold generator factory — with method flatmap
  2. Within the generator:
    1. At every emission in the outer stream, launch (fire and forget) a new coroutine that pipes the inner stream (mapped by f) to the outer generator. Done.

I like this example a lot because it highlights a nice difference between typical generator semantics with yield and Kotlin's send emitter function, where in the latter, inner lambdas can send directly to the generator instead of buffering to be yielded by a loop in the top level.

In Kotlin, there's a problem I still haven't pinned down for general nested Producers, when the outer producer finishes before inner producers are started. I asked about it on SO.

Debounce case study

@ExperimentalCoroutinesApi
fun <T, U> Publisher<T>.debounce(timeout: Long, ctx: CoroutineContext) = GlobalScope.produce(ctx) {
	var idx = 0;
	
	 flatmap(ctx) {
		idx++
		val stashed_idx = idx
		produce {
			delay(timeout)
			if(stashed_idx == idx)
				send(it)
		}
	 }
}

It's word-for-word the pseudocode from How to make a ReactiveX.

Buffer-like

@ExperimentalCoroutinesApi
fun <T> Publisher<T>.buffer(P: Publisher<*>, ctx: CoroutineContext) = GlobalScope.publish<List<T>>(ctx) {
	var buffer = mutableListOf<T>()
	launch {
		consumeEach {
			buffer.add(it)
		}
	}
	P.consumeEach {
		val buffer_to_send = buffer;
		buffer = mutableListOf<T>()
		send(buffer_to_send)
	}
}

It's word-for-word the pseudocode from How to make a ReactiveX.

GroupBy

Kotlin has writeable channels (known as BroadcastChannel), so we can implement an efficient version of groupBy (there are two implementations — efficient and inefficient — outlined here).

@ExperimentalCoroutinesApi
fun <T, K> Publisher<T>.groupBy(ctx: CoroutineContext, k: (T) -> K) = GlobalScope.publish<Publisher<T>>(ctx) {
	val M = mutableMapOf<K, ConflatedBroadcastChannel<T>>()
	consumeEach {
		val should_emit = !M.contains(k(it))
		val ch = M.getOrPut(k(it), { ConflatedBroadcastChannel<T>() })
		if(should_emit)
			send(publish {
				send(it)
			}) // -> readonly
		
		ch.offer(it)
		yield()
	}
	for((_, ch) in M) {
		ch.close()
	}
}

One note at // -> readonly: besides Iterator there are no common interfaces between Publisher and BroadcastChannel unfortunately; Kotlin doesn't have duck typing. That means to enforce read-only access from consumers, we need to wrap the channel in a generator. ¯\_(ツ)_/¯

And that's it!

Easy as pie. Again, try these operators yourself with this gist.

I can't overstate my joy to see more languages going interactive. Now we have:

Top image credit: JetBrains. Referenced from //blog.jetbrains.com/kotlin/2018/10/kotlin-1-3/

Operator implementations

Gist with operators

Pseudocode and explanations