Hi all, I'm looking for a simple coroutine-based e...
# coroutines
Hi all, I'm looking for a simple coroutine-based event bus. I found this implementation https://gist.github.com/takahirom/f2dbcc3053adfd87ac7e321d95a23021 however I have an issue with it. It uses
which keeps the last sent item and delivers it to new subscribers in
. I'm looking for an event bus implementation that does not cache items. So if an event is send but there are no active subscriptions the bus should just drop the event. Unfortunately there doesn't seem to be a default implementation of
with these characteristics. Has anyone an idea how to implement a simple non-caching event bus?
Actually what I'm looking for probably would be a
I basically rewrote my event bus without using a BroadcastChannel. This is my implementation
Copy code
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.RendezvousChannel
import kotlinx.coroutines.experimental.channels.filter
import kotlinx.coroutines.experimental.channels.map
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext

 * Coroutine-based event bus.
class EventBus {

    private val subscribers = atomic<Iterable<Subscriber>>(emptyList())

    fun send(event: Any, context: CoroutineContext = DefaultDispatcher) {
        launch(context) {
            subscribers.value.forEach { subscriber ->

    fun subscribe(): ReceiveChannel<Any> =
        Subscriber(subscribers).also { subscriber ->
            subscribers.value = subscribers.value + subscriber

    inline fun <reified T> subscribeToEvent() =
        subscribe().filter { it is T }.map { it as T }

    private class Subscriber(private val subscribers: AtomicRef<Iterable<Subscriber>>) : RendezvousChannel<Any>() {
        override fun cancel(cause: Throwable?): Boolean =
            close(cause).also {
                subscribers.value = subscribers.value - this
BroadcastChannel(capacity = 1)
is likely to be what you're looking for
@louiscad No, what I would need is a BroadcastChannel with capacity 0 but that's not possible 😉
Copy code
when (capacity) {
        0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
@svenjacobs IIRC, the behavior is different when it's a
, and a capacity of 1 behaves similarly to a
, I suggest you give it a try
@louiscad You are right. BroadcastChannel(1) is what I'm looking for. Actually it's an ArrayBroadcastChannel then and the behaviour is even documented: "Note, that elements that are sent to this channel while there are no subscribers are immediately lost.".
Thank you 🙂
Glad I helped you! ☺️