
svenjacobs

07/30/2018, 11:38 AM
Hi all, I'm looking for a simple coroutine-based event bus. I found this implementation (https://gist.github.com/takahirom/f2dbcc3053adfd87ac7e321d95a23021) but I have an issue with it. It uses `ConflatedBroadcastChannel`, which keeps the last sent item and delivers it to new subscribers in `openSubscription()`. I'm looking for an event bus implementation that does not cache items: if an event is sent while there are no active subscriptions, the bus should just drop the event. Unfortunately, there doesn't seem to be a default implementation of `BroadcastChannel` with these characteristics. Does anyone have an idea how to implement a simple non-caching event bus?
Actually, what I'm looking for would probably be a `RendezvousBroadcastChannel`.
I basically rewrote my event bus without using a `BroadcastChannel`. This is my implementation:
```kotlin
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.RendezvousChannel
import kotlinx.coroutines.experimental.channels.filter
import kotlinx.coroutines.experimental.channels.map
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext

/**
 * Coroutine-based event bus.
 */
class EventBus {

    // Snapshot of the current subscribers; replaced atomically on every change.
    private val subscribers = atomic<Iterable<Subscriber>>(emptyList())

    // Sends the event to each current subscriber from a newly launched coroutine.
    fun send(event: Any, context: CoroutineContext = DefaultDispatcher) {
        launch(context) {
            subscribers.value.forEach { subscriber ->
                subscriber.send(event)
            }
        }
    }

    fun subscribe(): ReceiveChannel<Any> =
        Subscriber(subscribers).also { subscriber ->
            // update() retries on contention, so concurrent changes are not lost.
            subscribers.update { current -> current + subscriber }
        }

    inline fun <reified T> subscribeToEvent() =
        subscribe().filter { it is T }.map { it as T }

    private class Subscriber(private val subscribers: AtomicRef<Iterable<Subscriber>>) : RendezvousChannel<Any>() {
        // Cancelling a subscription closes its channel and removes it from the bus.
        override fun cancel(cause: Throwable?): Boolean =
            close(cause).also {
                subscribers.update { current -> current - this }
            }
    }
}
```

louiscad

07/30/2018, 11:10 PM
`BroadcastChannel(capacity = 1)` is likely to be what you're looking for.

svenjacobs

07/31/2018, 4:58 AM
@louiscad No, what I would need is a BroadcastChannel with capacity 0 but that's not possible 😉
```kotlin
when (capacity) {
    0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
```

louiscad

07/31/2018, 9:17 AM
@svenjacobs IIRC, the behavior is different when it's a `BroadcastChannel`, and a capacity of 1 behaves similarly to a `RendezvousChannel`. I suggest you give it a try.

svenjacobs

08/02/2018, 5:49 AM
@louiscad You are right. BroadcastChannel(1) is what I'm looking for. Actually it's an ArrayBroadcastChannel then and the behaviour is even documented: "Note, that elements that are sent to this channel while there are no subscribers are immediately lost.".
Thank you 🙂

louiscad

08/03/2018, 11:46 PM
Glad I helped you! ☺️
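
For anyone landing on this thread later, the approach it settles on could be sketched roughly as below. This is only a minimal sketch, assuming the same `kotlinx.coroutines.experimental` API used in the code above; the class name `BroadcastEventBus` is made up for illustration and is not from the gist or from svenjacobs' final code.

```kotlin
import kotlinx.coroutines.experimental.channels.BroadcastChannel
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.filter
import kotlinx.coroutines.experimental.channels.map

/**
 * Hypothetical event bus backed by BroadcastChannel(1).
 * Capacity 1 selects the array-backed broadcast channel, which does not
 * replay earlier elements to new subscribers.
 */
class BroadcastEventBus {

    private val channel = BroadcastChannel<Any>(1)

    // Suspends while the single buffer slot is still occupied for a slow subscriber.
    suspend fun send(event: Any) = channel.send(event)

    // Each call opens an independent subscription; cancel it when done.
    fun subscribe(): ReceiveChannel<Any> = channel.openSubscription()

    // Same type-filtering helper as in the EventBus above.
    inline fun <reified T> subscribeToEvent(): ReceiveChannel<T> =
        subscribe().filter { it is T }.map { it as T }
}
```

Per the documentation note quoted above, an event sent while there are no subscribers is lost immediately, so a subscription opened afterwards only sees events sent from that point on. Unlike a true rendezvous channel, one element can still sit in the buffer while a subscriber catches up.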