# coroutines
a
Here’s another question from an experiment I’m doing. I’m trying to write an async EventBus. It simply registers EventListeners associated with a class, and when it receives a notification it finds the right listeners and invokes them asynchronously. I was thinking it would be a good idea to initialize this component with a CoroutineScope, so that someone outside can decide which scope it runs in (i.e. I want to decide which thread pool powers the event bus). I came up with something like the code below and wanted to ask your opinion: is that good practice, or am I missing something?
class AsyncEventBus(
    private val eventBusScope: CoroutineScope
) : EventBus {
    private val eventListeners: MutableMap<Class<*>, MutableList<EventListener<*>>> = mutableMapOf()

    override fun <E : Event> registerListener(aClass: Class<out E>, eventListener: EventListener<E>) {
        val listeners = retrieveListeners(aClass)
        listeners.add(eventListener)
    }

    override fun <E : Event> notify(event: E) {
        eventListeners[event::class.java]?.asSequence()
            ?.filterIsInstance<EventListener<E>>()
            ?.forEach {
                eventBusScope.launch {
                    it.handle(event)
                }
            }
    }

    @Suppress("UNCHECKED_CAST")
    private fun <E : Event> retrieveListeners(aClass: Class<out E>): MutableList<EventListener<E>> =
        eventListeners.getOrPut(aClass) { mutableListOf() } as MutableList<EventListener<E>>
}
My questions about my code (apart from whether I’m failing to follow best practices for some reason) are:
• Is there anything that can happen that I’m not considering?
• I’m happy that the EventListener’s handle is not a suspending function, since I wanted to make it transparent to whoever implements handle(). Of course this has the drawback that if someone starts using async blocks inside the handle() function (with a different scope), that code may keep running even though the listener has been killed. Is this something I should be worried about? Or is it fair to say that if handle() goes async, you take care of what can happen?
o
will registerListener and notify ever be called concurrently? you may need to make the map thread-safe
a
good point, I think the design would be to never call them concurrently, but you’re right, it could happen. I have to make the map thread-safe. thanks!
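For reference, a minimal sketch of one way to make the listener storage thread-safe on the JVM. `ListenerRegistry`, `register`, `listenersFor` and `registerConcurrently` are hypothetical names, not part of the code above. It assumes registrations are rare compared to notifications, which is the case where `CopyOnWriteArrayList` tends to fit.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical registry: names are illustrative, not from the original code.
class ListenerRegistry<L : Any> {
    // ConcurrentHashMap guards the map itself; CopyOnWriteArrayList guards each
    // per-class list, so add() and iteration may overlap safely.
    private val listeners = ConcurrentHashMap<Class<*>, CopyOnWriteArrayList<L>>()

    fun register(type: Class<*>, listener: L) {
        // computeIfAbsent is atomic: two threads registering the first listener
        // for the same class end up sharing one list.
        listeners.computeIfAbsent(type) { CopyOnWriteArrayList() }.add(listener)
    }

    // Returns a snapshot, so callers can iterate without locking.
    fun listenersFor(type: Class<*>): List<L> = listeners[type]?.toList() ?: emptyList()
}

// Registers from several threads at once and returns how many listeners were stored.
fun registerConcurrently(threads: Int, perThread: Int): Int {
    val registry = ListenerRegistry<String>()
    val workers = List(threads) { t ->
        Thread { repeat(perThread) { i -> registry.register(String::class.java, "listener-$t-$i") } }
    }
    workers.forEach { it.start() }
    workers.forEach { it.join() }
    return registry.listenersFor(String::class.java).size
}
```

Note that a thread-safe map alone is not enough: a plain mutableListOf() stored inside a ConcurrentHashMap can still be corrupted by concurrent add() calls, which is why the list type changes too.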
o
here is an alternative model that I think fits more with coroutines https://gist.github.com/octylFractal/7b43cce07b135cbbd246b34e24269075
instead of using a listener, you subscribe to a Flow of events, which can be scheduled on a dispatcher of the caller's choice
the notifier also now has back-pressure though, which can be more "correct" but that depends on how your code uses the event bus
a
thanks a lot! When you say
the notifier also now has back-pressure though, which can be more “correct” but that depends on how your code uses the event bus
do you mean that with the flow implementation the consumer of the bus can apply back-pressure? Can you expand on that a little bit?
o
because the BroadcastChannel has only BUFFERED size, if a downstream event listener backs up the Flow (say, events are fired every 1 second but it takes 10 seconds to process an event), then eventually notify will block waiting for it to process
👍 1
this can be avoided by having the downstream event listener move the processing of the event to be async, so that it only takes a few milliseconds to set up event processing for each event, but the downside is that you have potentially unbounded tasks
you can of course adjust the size of the BroadcastChannel to have more leeway, or even have individual event listeners call .buffer(X) on their flow to re-buffer the events
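A small sketch of the buffer limit described above. It swaps in a plain `Channel` instead of the gist's BroadcastChannel to keep it short, and `acceptedWithoutConsumer` is a hypothetical helper. With no consumer draining the channel, only `capacity` events fit; `trySend` reports failure at the point where a suspending `send` would start waiting.

```kotlin
import kotlinx.coroutines.channels.Channel

// Tries to push `events` items into a channel with the given buffer and no consumer.
// Returns how many were accepted before the buffer was full.
fun acceptedWithoutConsumer(capacity: Int, events: Int): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    for (i in 1..events) {
        // trySend never suspends: it fails where send() would have suspended.
        if (channel.trySend(i).isSuccess) accepted++
    }
    channel.close()
    return accepted
}
```

A larger capacity only postpones the moment the sender has to wait; it does not remove it if the listener stays slower than the producer.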
a
got it, thanks for the explanation, so you think in general in these cases it’s better to use flows rather than coroutines, right? Any reference you can share to study them?
o
Flows are a part of coroutines, https://kotlinlang.org/docs/reference/coroutines/flow.html is the basic intro docs
a
gotcha thanks was reading that
anything more deep that you recommend to read?
c
Would be nice if this became a lightweight Kotlin library (with coroutines & Flow support). 👍
👍 1
o
I don't have any other recommendations I think, I usually learn by looking at the implementation and kdocs
a
Resuming this thread 🙂 @octylFractal I was looking at your gist but I am a bit doubtful, what is the advantage in your mind to use flows instead of simply launch in this specific case of an event bus?
o
the consumer of the event is able to control the context in which their event listener is run, rather than it being forced by the event bus. this means they can choose to listen in a custom thread pool, on a "main" event thread, or directly on the IO dispatcher if needed
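To illustrate the point, here is a sketch in which the consumer picks the context. `flowOf(...)` stands in for whatever the bus's subscribe call would return, and `collectOnOwnThread` and the thread name `listener-thread` are made up for the example. The collecting code runs wherever the consumer calls `collect`, here a single-thread pool the consumer owns.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors

// Collects a stand-in event flow on a dispatcher chosen by the consumer and
// returns the names of the threads that ran the listener body.
fun collectOnOwnThread(): List<String> = runBlocking {
    // Hypothetical consumer-owned pool; "listener-thread" is an illustrative name.
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "listener-thread") }
    val dispatcher = executor.asCoroutineDispatcher()
    val events = flowOf("a", "b", "c") // stands in for the flow returned by the bus
    val threads = mutableListOf<String>()
    try {
        // The consumer, not the bus, decides the context of the collector.
        withContext(dispatcher) {
            events.collect { threads += Thread.currentThread().name }
        }
    } finally {
        dispatcher.close() // also shuts down the underlying executor
    }
    threads
}
```

The returned names start with `listener-thread`; in coroutine debug mode the library may append a coroutine id to the thread name.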
a
gotcha thanks, actually the idea behind the event bus is that you should be able to run it in a thread pool for the command bus itself. In case you need to run (for some weird reason) on a different thread pool you can still have 2 event buses, but in general that should never happen. My idea was to abstract 'how' the bus handles events asynchronously by giving its consumers just a plain interface, hiding all the infrastructure of flows, suspend and channels from them, if that makes sense?
o
it makes sense, but personally I would not do it, I think coroutines tries to reduce the idea of needing other threadpools for stuff besides I/O. the Dispatchers.Default should have all you need for CPU-bound tasks, adding more threads is just extra threading pressure on the OS, and will probably result in very very slightly slower code, depending on how many extra threads you have in that pool, and how many extra tasks they are executing for the events
though I do not know what your EventListener interface looks like -- if it (handle) is not suspend, then most of this is moot -- you'll be in a blocking situation, and you will probably need a custom threadpool for that
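A sketch of what "a custom threadpool for that" could look like when the handlers block. `runBlockingHandlers` is a hypothetical helper and `Thread.sleep` stands in for a blocking, non-suspending `handle()`. The scope is built from a fixed pool, which is the kind of scope that could be injected into the bus.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Runs `tasks` blocking handlers through a scope backed by a dedicated pool and
// returns how many completed.
fun runBlockingHandlers(tasks: Int, poolSize: Int): Int {
    val dispatcher = Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher()
    // This is the scope one would hand to the bus constructor.
    val scope = CoroutineScope(SupervisorJob() + dispatcher)
    val completed = AtomicInteger()
    try {
        val jobs = List(tasks) {
            scope.launch {
                Thread.sleep(10) // stands in for a blocking, non-suspending handle()
                completed.incrementAndGet()
            }
        }
        runBlocking { jobs.joinAll() }
    } finally {
        scope.cancel()
        dispatcher.close()
    }
    return completed.get()
}
```

With blocking handlers, at most `poolSize` of them run at once; the rest queue up behind them, so the pool size bounds the damage a slow listener can do.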
a
yeah, the EventListener interface is a non-suspending handle(). That’s because I may have different event listeners, sync and async. My idea was to let you configure how the event listener should behave without carrying that information all over the place (if possible) and without breaking an implementation that might already be there. As far as I understand your reasons, I think you are referring to which dispatcher is used to process events, and I agree with you. My doubt is more: is it not good practice to have a scope injected into the bus so you can decide how the asynchronicity works (possibly with Dispatchers.Default, but you can change it in case you need to)?
@octylFractal I was giving your code a try again after studying coroutines🙂 (sorry I’m still a bit confused). I wrote a test to use the bus with channel and flow like this
@Test
fun testingFlow() = runBlocking {
    val bus = EventBus()
    bus.notify(AsyncEvent("hello"))
    val flow = bus.subscribe<AsyncEvent>()

    launch {
        flow.collect {
            println(it)
        }
    }

    delay(Long.MAX_VALUE)
}
Now, the test keeps waiting on collect, and it seems the event is never dispatched on the flow… am I missing something here?
o
yes, https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel.html
when capacity positive, but less than UNLIMITED – creates ArrayBroadcastChannel with a buffer of given capacity. Note: this channel looses all items that are send to it until the first subscriber appears;
the latter part of that phrase is important
a
correct, that’s why I send an event using notify, then I subscribe to those events, but I don’t receive anything 😞
o
you sent the event first, so it was lost because nothing was subscribed
try swapping the notify and subscribe
a
yeah sorry, my bad, I thought it was a problem with sending the event, but even if I invert the order of notify and subscribe, it still hangs waiting 😞
o
oh, right, there's another detail there -- flow doesn't subscribe until collect occurs
testing can be tricky because it's difficult to send something after the flow has subscribed....
a
should this work?
@Test
fun testingFlow() = runBlocking {
    val bus = EventBus()
    val flow = bus.subscribe<AsyncEvent>()

    launch {
        flow.collect {
            println(it)
        }
    }

    bus.notify(AsyncEvent("hello"))
    delay(Long.MAX_VALUE)
}
o
I would doubt it, try doing delay(100L) first
that is, launch, delay(100L), notify
a
you’re right, by doing delay it works
now I’m a bit confused though 😄 why does collect also need to be attached to the flow for the broadcast channel to deliver the events? is that a peculiarity of the Broadcast channel?
o
because flows are cold by default, so nothing is subscribed or flows until you invoke a terminal operation like collect
it is partially the fact that the broadcast channel loses the items when there are no subscribers
https://github.com/Kotlin/kotlinx.coroutines/issues/2034 I think is supposed to solve these issues
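The "cold" behaviour can be seen in isolation with a plain `flow { }` builder, independent of the bus. `coldFlowDemo` is a hypothetical helper; it records whether the builder body has run before and after a collector is attached.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns (ranBeforeCollect, ranAfterCollect) for a cold flow's builder body.
fun coldFlowDemo(): Pair<Boolean, Boolean> {
    var bodyRan = false
    val events = flow {
        bodyRan = true // executes only once a collector shows up
        emit("hello")
    }
    val before = bodyRan // creating the flow has not run the body
    runBlocking { events.collect { } }
    return before to bodyRan
}
```

In the test above the same thing happens one level up: calling subscribe only builds the flow, and the subscription to the channel is opened when the launched coroutine actually reaches collect, which is why the delay was needed.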
a
gotcha, thanks for the reference, would it be easier at that stage to use a receive channel instead of a flow then?
o
no, I don't think it will help
a
got it, ok thanks for the help, I’m honestly a bit nervous about using Broadcast channels and flows like this. Apart from the fact that it seems Broadcast channels will be killed soon, I’m not 100% sure why to use flow in this specific case. Do you believe that avoiding BroadcastChannel and just using Channel would be better?
o
no, just Channel is not designed for this sort of shared state. my suggestion was just a prototype that's untested, I think that eventually SharedFlow will be what you want, but for now this solution might be lacking
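SharedFlow has since been released as part of kotlinx.coroutines, so a rough sketch of the idea is possible. Everything named here (`SharedFlowEventBus`, `subscribe`, `sharedFlowRoundTrip`, the buffer size of 16) is an assumption for illustration, not the gist's code. The round-trip function starts the collector undispatched so that it is subscribed before the event is sent.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical SharedFlow-backed bus; class and member names are illustrative.
class SharedFlowEventBus(bufferSize: Int = 16) {
    // replay = 0 (the default): events sent before a subscriber appears are not replayed.
    private val mutableEvents = MutableSharedFlow<Any>(extraBufferCapacity = bufferSize)
    val events: SharedFlow<Any> = mutableEvents.asSharedFlow()

    // Suspends when a slow subscriber has filled the buffer (back-pressure).
    suspend fun notify(event: Any) = mutableEvents.emit(event)
}

// Each subscriber gets a flow narrowed to the event type it cares about.
inline fun <reified E : Any> SharedFlowEventBus.subscribe(): Flow<E> =
    events.filterIsInstance<E>()

fun sharedFlowRoundTrip(): String = runBlocking {
    val bus = SharedFlowEventBus()
    // UNDISPATCHED runs the collector up to its first suspension, so it is
    // subscribed before notify() is called and no delay() is needed.
    val received = async(start = CoroutineStart.UNDISPATCHED) {
        bus.subscribe<String>().first()
    }
    bus.notify("hello")
    received.await()
}
```

Events sent while nobody is subscribed are still dropped with these settings; a non-zero `replay` would change that, at the cost of late subscribers seeing old events.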
a
got it, to have production-ready code, do you think it’s better to launch {} event listeners as in my first example until SharedFlow gets released?
o
yea, it will work well enough
a
yeah I do have an interface, so I can provide more buses depending on the use case, got it. So back to my initial design, do you think something like this is a good compromise? or do you see some disadvantages that I can’t see?
class AsyncEventBus(
    private val eventBusScope: CoroutineScope
) : EventBus {
    private val eventListeners: ConcurrentHashMap<Class<*>, MutableList<EventListener<*>>> = ConcurrentHashMap()

    override fun <E : Event> registerListener(aClass: Class<out E>, eventListener: EventListener<E>) {
        val listeners = retrieveListeners(aClass)
        listeners.add(eventListener)
    }

    override fun <E : Event> notify(event: E) {
        eventListeners[event::class.java]?.asSequence()
            ?.filterIsInstance<EventListener<E>>()
            ?.forEach {
                eventBusScope.launch {
                    it.handle(event)
                }
            }
    }

    @Suppress("UNCHECKED_CAST")
    private fun <E : Event> retrieveListeners(aClass: Class<out E>): MutableList<EventListener<E>> =
        eventListeners.getOrPut(aClass) { mutableListOf() } as MutableList<EventListener<E>>
}
I believe the only missing bit there is to avoid killing all the handle children if an event listener throws an exception for some reason, but a supervisor should do the job there
o
you may want to try / catch and log instead
a
you mean instead of a supervisor? just doing something like
forEach {
    eventBusScope.launch {
        try {
            it.handle(event)
        } catch (e: Exception) {
            // log the failure here; the other listeners keep running
        }
    }
}
o
yes
🙏 1
👍 1
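A self-contained sketch of the try/catch approach agreed on above, reduced to a single event type. `StringListener`, `IsolatingBus` and `failingListenerDemo` are hypothetical names, and returning the launched jobs from `notify` is an assumption made so the result can be awaited. The scope deliberately uses a plain `Job` to show that the catch alone keeps one failing listener from cancelling the rest.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical, simplified listener and bus; single event type to keep it short.
fun interface StringListener { fun handle(event: String) }

class IsolatingBus(private val scope: CoroutineScope) {
    private val listeners = CopyOnWriteArrayList<StringListener>()

    fun register(listener: StringListener) { listeners.add(listener) }

    // Returns the launched jobs so callers can join them; an assumption of this
    // sketch, the notify in the thread returns nothing.
    fun notify(event: String): List<Job> = listeners.map { listener ->
        scope.launch {
            try {
                listener.handle(event)
            } catch (e: Exception) {
                // handle() is not suspending here; if it ever becomes suspend,
                // CancellationException should be rethrown instead of swallowed.
                println("listener failed: ${e.message}")
            }
        }
    }
}

// Returns what the healthy listener received and whether the scope survived.
fun failingListenerDemo(): Pair<List<String>, Boolean> {
    val scope = CoroutineScope(Job() + Dispatchers.Default) // deliberately not a SupervisorJob
    val bus = IsolatingBus(scope)
    val received = CopyOnWriteArrayList<String>()
    bus.register { throw IllegalStateException("boom") }
    bus.register { received += it }
    runBlocking { bus.notify("hello").joinAll() }
    val alive = scope.isActive
    scope.cancel()
    return received.toList() to alive
}
```

A SupervisorJob in the injected scope would also isolate the children, but an uncaught exception would then go to the scope's exception handler; catching inside launch keeps the logging decision in the bus.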