#coroutines

Andrea Giuliano

07/24/2020, 7:43 AM
Here’s another question from an experiment I’m doing. I’m trying to write an async EventBus. It simply registers EventListeners associated with a class, and when it receives a notification it finds the right listeners and invokes them asynchronously. I was thinking it’s a good idea to initialize this component with a CoroutineScope, so that someone from outside can decide which scope it runs in (i.e. I want to decide which thread pool will power the event bus). I came up with something like the code below and wanted to ask your opinion: is this good practice, or am I missing something?
class AsyncEventBus(
    private val eventBusScope: CoroutineScope
) : EventBus {
    private val eventListeners: MutableMap<Class<*>, MutableList<EventListener<*>>> = mutableMapOf()

    override fun <E : Event> registerListener(aClass: Class<out E>, eventListener: EventListener<E>) {
        val listeners = retrieveListeners(aClass)
        listeners.add(eventListener)
    }

    override fun <E : Event> notify(event: E) {
        eventListeners[event::class.java]?.asSequence()
            ?.filterIsInstance<EventListener<E>>()
            ?.forEach {
                eventBusScope.launch {
                    it.handle(event)
                }
            }
    }

    @Suppress("UNCHECKED_CAST")
    private fun <E : Event> retrieveListeners(aClass: Class<out E>): MutableList<EventListener<E>> =
        eventListeners.getOrPut(aClass) { mutableListOf() } as MutableList<EventListener<E>>
}
My questions on my code (apart from whether I’m failing to follow best practices for some reason) are:
• Is there anything that can happen that I’m not considering?
• I’m happy that the EventListener handle is not a suspending function, since I wanted to make it transparent to whoever implements handle(). Of course this has the drawback that if someone starts using async blocks inside the handle() function (with a different scope), that code may keep running even though the listener has been killed. Is this something I should be worried about? Or is it fair to say that if handle() goes async, the implementer takes care of what can happen?

octylFractal

07/24/2020, 7:44 AM
will `registerListener` and `notify` ever be called concurrently? you may need to make the map thread-safe

Andrea Giuliano

07/24/2020, 7:50 AM
good point. I think the design would be to never call them concurrently, but you’re right, it could happen. I have to make the map thread-safe. Thanks!
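A minimal sketch of what "make the map thread-safe" could look like, assuming a JVM target. `ListenerRegistry`, `UserCreated`, and the simplified `Event` / `EventListener` declarations are hypothetical stand-ins, not the poster's actual types. The point it illustrates is that both the map and the per-class lists need to be safe, since `notify` may iterate a list while `registerListener` adds to it.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-ins for the Event / EventListener types used in the thread.
interface Event

fun interface EventListener<E : Event> {
    fun handle(event: E)
}

class ListenerRegistry {
    // ConcurrentHashMap protects the map itself; CopyOnWriteArrayList lets one
    // thread iterate a listener list while another thread appends to it.
    private val listeners =
        ConcurrentHashMap<Class<*>, CopyOnWriteArrayList<EventListener<*>>>()

    fun <E : Event> register(type: Class<E>, listener: EventListener<E>) {
        // computeIfAbsent creates the list atomically, at most once per key.
        listeners.computeIfAbsent(type) { CopyOnWriteArrayList() }.add(listener)
    }

    @Suppress("UNCHECKED_CAST")
    fun <E : Event> listenersFor(event: E): List<EventListener<E>> =
        (listeners[event.javaClass] ?: emptyList<EventListener<*>>()) as List<EventListener<E>>
}

data class UserCreated(val name: String) : Event

fun main() {
    val registry = ListenerRegistry()
    val seen = mutableListOf<String>()
    registry.register(UserCreated::class.java, EventListener<UserCreated> { seen += it.name })

    val event = UserCreated("ada")
    registry.listenersFor(event).forEach { it.handle(event) }
    println(seen)
}
```

`CopyOnWriteArrayList` copies the backing array on every write, so it suits the case where listeners are registered rarely and events are dispatched often.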

octylFractal

07/24/2020, 8:03 AM
here is an alternative model that I think fits more with coroutines https://gist.github.com/octylFractal/7b43cce07b135cbbd246b34e24269075
instead of using a listener, you subscribe to a Flow of events, which can be scheduled on a dispatcher of the caller's choice
the notifier also now has back-pressure though, which can be more "correct" but that depends on how your code uses the event bus

Andrea Giuliano

07/24/2020, 8:29 AM
thanks a lot! When you say
the notifier also now has back-pressure though, which can be more “correct” but that depends on how your code uses the event bus
do you mean that with the Flow implementation the consumer of the bus can have back-pressure? Can you expand on that a little?

octylFractal

07/24/2020, 8:31 AM
because the BroadcastChannel has only `BUFFERED` size, if a downstream event listener backs up the Flow (say, events are fired every 1 second but it takes 10 seconds to process an event), then eventually `notify` will block waiting for it to process
👍 1
this can be avoided by having the downstream event listener move the processing of the event to be async, so that it only takes a few milliseconds to set up event processing for each event, but the downside is that you have potentially unbounded tasks
you can of course adjust the size of the BroadcastChannel to have more leeway, or even have individual event listeners call `.buffer(X)` on their flow to re-buffer the events
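A rough sketch of the back-pressure effect described above, assuming kotlinx.coroutines is on the classpath. It uses a plain cold `flow` as a stand-in for the BroadcastChannel-backed flow from the gist, and the names `events` and `drain` as well as the timings are made up for illustration. Without a buffer, the producer is suspended while the slow collector works; with `.buffer(10)` it can run ahead.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical producer: needs about 100 ms to produce each event.
fun events(): Flow<Int> = flow {
    repeat(3) {
        delay(100)
        emit(it)
    }
}

// Hypothetical slow listener: needs about 300 ms per event.
// Returns the total time taken to drain the source.
suspend fun drain(source: Flow<Int>): Long = measureTimeMillis {
    source.collect { delay(300) }
}

fun main() = runBlocking<Unit> {
    // Producer and collector take turns, so their delays add up.
    val unbuffered = drain(events())
    // The producer fills the buffer while the collector is still busy.
    val buffered = drain(events().buffer(10))
    println("unbuffered=$unbuffered ms, buffered=$buffered ms")
}
```

A buffer only absorbs bursts. If the listener is slower than the producer on average, a bounded buffer eventually fills and the producer is suspended again.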

Andrea Giuliano

07/24/2020, 10:09 AM
got it, thanks for the explanation. So you think that in general in these cases it’s better to use flows rather than coroutines, right? Any references you can share to study them?

octylFractal

07/24/2020, 10:11 AM
Flows are a part of coroutines, https://kotlinlang.org/docs/reference/coroutines/flow.html is the basic intro docs

Andrea Giuliano

07/24/2020, 12:12 PM
gotcha, thanks, I was reading that
anything more in-depth that you recommend reading?

Christopher Elías

07/24/2020, 2:27 PM
It would be nice if this became a lightweight Kotlin library (with coroutines & Flow support). 👍
👍 1

octylFractal

07/24/2020, 6:04 PM
I don't have any other recommendations, I think. I usually learn by looking at the implementation and KDocs

Andrea Giuliano

07/26/2020, 4:28 PM
Resuming this thread 🙂 @octylFractal I was looking at your gist but I have some doubts: what do you see as the advantage of using flows instead of simply launch in this specific case of an event bus?

octylFractal

07/26/2020, 6:40 PM
the consumer of the event is able to control the context in which their event listener is run, rather than it being forced by the event bus. this means they can choose to listen in a custom thread pool, on a "main" event thread, or directly on the IO dispatcher if needed
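To make the point about consumer-controlled context concrete, here is a small sketch, assuming kotlinx.coroutines on the JVM. The helper `collectOn` and the thread name `listener-thread` are invented for the example, and `flowOf("a", "b")` stands in for the bus's event flow. The collector body runs in whatever context the subscriber calls `collect` from, so each subscriber can pick its own dispatcher.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors

// Collects [events] on a dispatcher chosen by the subscriber and records
// the name of the thread that handled each event.
suspend fun collectOn(dispatcher: CoroutineDispatcher, events: Flow<String>): List<String> =
    withContext(dispatcher) {
        val threads = mutableListOf<String>()
        events.collect { threads += Thread.currentThread().name }
        threads
    }

fun main() = runBlocking<Unit> {
    // A dedicated single thread owned by this particular subscriber.
    val listenerDispatcher = Executors
        .newSingleThreadExecutor { r -> Thread(r, "listener-thread") }
        .asCoroutineDispatcher()

    // use { } closes the dispatcher (and its thread) when we are done.
    listenerDispatcher.use { dispatcher ->
        println(collectOn(dispatcher, flowOf("a", "b")))
    }
}
```

Another subscriber could pass `Dispatchers.IO` or a UI dispatcher to the same helper without the bus knowing about it.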

Andrea Giuliano

07/26/2020, 10:48 PM
gotcha, thanks. Actually the idea behind the event bus is that you should be able to run it on a thread pool for the command bus itself. In case you need to run (for some weird reason) on a different thread pool you can still have 2 event buses, but in general that should never happen. My idea was to abstract 'how' the bus handles events asynchronously by giving its consumers just a plain interface, hiding all the infrastructure of flows, suspend and channels from them, if that makes sense?

octylFractal

07/26/2020, 10:55 PM
it makes sense, but personally I would not do it. I think coroutines try to reduce the need for other thread pools for anything besides I/O. `Dispatchers.Default` should have all you need for CPU-bound tasks; adding more threads is just extra threading pressure on the OS, and will probably result in very slightly slower code, depending on how many extra threads you have in that pool and how many extra tasks they are executing for the events
though I do not know what your EventListener interface looks like -- if it (`handle`) is not `suspend`, then most of this is moot -- you'll be in a blocking situation, and you will probably need a custom thread pool for that
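A sketch of the "custom thread pool for blocking listeners" case, assuming kotlinx.coroutines on the JVM. `SlowListener`, the pool size of 4, and the thread name `event-bus-worker` are illustrative choices, not anything from the original code. The scope built here is the kind of object that could be injected into the bus constructor.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical blocking listener: handle() is a plain, non-suspending function.
class SlowListener {
    @Volatile
    var handledOn: String? = null

    fun handle(event: String) {
        Thread.sleep(50) // simulates blocking work such as JDBC or file I/O
        handledOn = Thread.currentThread().name
    }
}

fun main() {
    // Dedicated pool so blocking listeners do not starve Dispatchers.Default.
    val dispatcher = Executors
        .newFixedThreadPool(4) { r -> Thread(r, "event-bus-worker") }
        .asCoroutineDispatcher()
    val eventBusScope = CoroutineScope(SupervisorJob() + dispatcher)

    val listener = SlowListener()
    val job = eventBusScope.launch { listener.handle("hello") }

    runBlocking { job.join() }
    println(listener.handledOn)

    dispatcher.close() // shuts down the underlying executor
}
```

With a fixed pool, at most four blocking `handle` calls run at once and further launches queue up behind them.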

Andrea Giuliano

07/27/2020, 8:31 AM
yeah, the EventListener interface is handle(), not suspending. That’s because I may have different event listeners, sync and async. My idea was to let you configure how the event listener should behave without carrying that information all over the place (if possible) and without breaking implementations that might already be there. As far as I understand your reasons, I think you are referring to which dispatcher is used to process events, and I agree with you. My doubt is more: “is it not good practice to have a Scope injected into the bus so you can decide how the asynchronicity works (possibly with Dispatchers.Default, but you can change it in case you need to)?”
@octylFractal I was giving your code a try again after studying coroutines 🙂 (sorry, I’m still a bit confused). I wrote a test that uses the bus with channel and flow, like this:
@Test
fun testingFlow() = runBlocking {
    val bus = EventBus()
    bus.notify(AsyncEvent("hello"))
    val flow = bus.subscribe<AsyncEvent>()

    launch {
        flow.collect {
            println(it)
        }
    }

    delay(Long.MAX_VALUE)
}
Now, the test keeps waiting on collect, and it seems the event is never dispatched on the flow… am I missing something here?

octylFractal

08/02/2020, 7:47 PM
yes, https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel.html
when capacity positive, but less than UNLIMITED – creates ArrayBroadcastChannel with a buffer of given capacity. Note: this channel looses all items that are send to it until the first subscriber appears;
the latter part of that phrase is important

Andrea Giuliano

08/02/2020, 7:48 PM
correct, that’s why I send an event using notify, then I subscribe to those events, but I don’t receive anything 😞

octylFractal

08/02/2020, 7:49 PM
you sent the event first, so it was lost because nothing was subscribed
try swapping the `notify` and `subscribe`

Andrea Giuliano

08/02/2020, 7:50 PM
yeah sorry, my bad, I thought it was a problem with sending the event, but even if I swap the order of notify and subscribe, it still hangs waiting 😞

octylFractal

08/02/2020, 7:50 PM
oh, right, there's another detail there -- flow doesn't subscribe until `collect` occurs
testing can be tricky because it's difficult to send something after the flow has subscribed....

Andrea Giuliano

08/02/2020, 7:51 PM
should this work?
@Test
fun testingFlow() = runBlocking {
    val bus = EventBus()
    val flow = bus.subscribe<AsyncEvent>()

    launch {
        flow.collect {
            println(it)
        }
    }

    bus.notify(AsyncEvent("hello"))
    delay(Long.MAX_VALUE)
}

octylFractal

08/02/2020, 7:52 PM
I would doubt it, try doing `delay(100L)` first
that is, `launch`, `delay(100L)`, `notify`

Andrea Giuliano

08/02/2020, 7:52 PM
you’re right, with the delay it works
now I’m a bit confused though 😄 why does collect also need to be attached to the flow for the broadcast channel to deliver the events? Is that a peculiarity of the broadcast channel?

octylFractal

08/02/2020, 7:54 PM
because flows are cold by default, so nothing is subscribed or flows until you invoke a terminal operation like `collect`
it is partially the fact that the broadcast channel loses the items when there are no subscribers
I think https://github.com/Kotlin/kotlinx.coroutines/issues/2034 is supposed to solve these issues
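A tiny illustration of "flows are cold", assuming kotlinx.coroutines is available. The counter `started` is just instrumentation for the example. Building the flow does not run its body; the body only runs once a terminal operation such as `collect` is called, which is also the moment a channel-backed flow would actually subscribe.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    var started = 0

    // Declaring the flow does not execute the builder block.
    val numbers = flow {
        started++
        emit(1)
    }
    println("before collect: started=$started")

    // The builder block runs only now, inside collect.
    numbers.collect { }
    println("after collect: started=$started")
}
```

This is why holding the `Flow` returned by `subscribe` is not enough in the test above: no subscription exists until the launched coroutine reaches `collect`.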

Andrea Giuliano

08/02/2020, 7:56 PM
gotcha, thanks for the reference. Would it be easier at that point to use a ReceiveChannel instead of a flow, then?

octylFractal

08/02/2020, 7:56 PM
no, I don't think it will help

Andrea Giuliano

08/02/2020, 8:01 PM
got it, ok, thanks for the help. I’m honestly a bit nervous about using broadcast channels and flows like this. Apart from the fact that it seems broadcast channels will be killed soon, I’m not 100% sure why to use a flow in this specific case. Do you believe that avoiding BroadcastChannel and just using Channel would be better?

octylFractal

08/02/2020, 8:02 PM
no, just Channel is not designed for this sort of shared state. my suggestion was just a prototype that's untested, I think that eventually SharedFlow will be what you want, but for now this solution might be lacking
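For readers arriving after SharedFlow shipped (kotlinx.coroutines 1.4.0 and later), here is a hedged sketch of how a bus along these lines might look. `FlowEventBus` and the buffer size of 64 are assumptions for illustration, not the contents of the gist. `onSubscription` is used to learn when the subscription is really in place, which replaces the `delay(100L)` workaround from the test discussion.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical bus built on MutableSharedFlow instead of BroadcastChannel.
class FlowEventBus {
    // replay = 0: events sent while nobody is subscribed are dropped,
    // matching the BroadcastChannel behaviour discussed above.
    private val _events = MutableSharedFlow<Any>(extraBufferCapacity = 64)
    val events: SharedFlow<Any> = _events.asSharedFlow()

    suspend fun notify(event: Any) = _events.emit(event)
}

fun main() = runBlocking<Unit> {
    val bus = FlowEventBus()
    val subscribed = CompletableDeferred<Unit>()

    val received = async {
        bus.events
            .onSubscription { subscribed.complete(Unit) } // subscription is live here
            .filterIsInstance<String>()
            .first()
    }

    subscribed.await()  // wait for the real subscription, not a guessed delay
    bus.notify("hello")
    println(received.await())
}
```

Setting `replay` to a positive value would let late subscribers see recent events, at the cost of keeping them in memory.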

Andrea Giuliano

08/02/2020, 8:04 PM
got it. To have production-ready code, do you think it’s better to launch {} the event listeners as in my first example until SharedFlow gets released?

octylFractal

08/02/2020, 8:04 PM
yea, it will work well enough

Andrea Giuliano

08/02/2020, 8:06 PM
yeah, I do have an interface, so I can provide more buses depending on the use case, got it. So back to my initial design, do you think something like this is a good compromise? Or do you see some disadvantages that I can’t see?
class AsyncEventBus(
    private val eventBusScope: CoroutineScope
) : EventBus {
    private val eventListeners: ConcurrentHashMap<Class<*>, MutableList<EventListener<*>>> = ConcurrentHashMap()

    override fun <E : Event> registerListener(aClass: Class<out E>, eventListener: EventListener<E>) {
        val listeners = retrieveListeners(aClass)
        listeners.add(eventListener)
    }

    override fun <E : Event> notify(event: E) {
        eventListeners[event::class.java]?.asSequence()
            ?.filterIsInstance<EventListener<E>>()
            ?.forEach {
                eventBusScope.launch {
                    it.handle(event)
                }
            }
    }

    @Suppress("UNCHECKED_CAST")
    private fun <E : Event> retrieveListeners(aClass: Class<out E>): MutableList<EventListener<E>> =
        eventListeners.getOrPut(aClass) { mutableListOf() } as MutableList<EventListener<E>>
}
I believe the only missing bit there is to avoid killing all the `handle` children if an event listener throws an exception for some reason, but a supervisor should do the job there
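A small sketch of the supervisor idea, assuming kotlinx.coroutines. The helper `survivesFailure` is invented for the comparison. It launches one failing child in a scope built on the given parent job and reports whether the scope is still active afterwards. The `CoroutineExceptionHandler` is there so the failure is logged instead of going to the thread's uncaught exception handler.

```kotlin
import kotlinx.coroutines.*

// Launches one failing child in a scope built on [parent] and reports
// whether that scope can still be used for further launches.
fun survivesFailure(parent: CompletableJob): Boolean = runBlocking {
    val handler = CoroutineExceptionHandler { _, e ->
        println("listener failed: ${e.message}")
    }
    val scope = CoroutineScope(parent + Dispatchers.Default + handler)

    scope.launch { error("boom") }.join() // stands in for a throwing handle()
    scope.isActive
}

fun main() {
    println(survivesFailure(SupervisorJob())) // true: siblings and scope keep running
    println(survivesFailure(Job()))           // false: the failure cancelled the scope
}
```

With a plain `Job`, the first throwing listener would leave the bus unable to launch anything else, which is the situation the message above wants to avoid.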

octylFractal

08/02/2020, 8:09 PM
you may want to try / catch and log instead

Andrea Giuliano

08/02/2020, 8:10 PM
you mean instead of a supervisor? just doing something like
forEach {
    eventBusScope.launch {
        try {
            it.handle(event)
        } catch (e: Exception) {
            // log is a placeholder for whatever logger is in use
            log.error("Event listener failed", e)
        }
    }
}

octylFractal

08/02/2020, 8:11 PM
yes
🙏 1
👍 1