Andrea Giuliano
07/24/2020, 7:43 AM
class AsyncEventBus(
    private val eventBusScope: CoroutineScope
) : EventBus {
    private val eventListeners: MutableMap<Class<*>, MutableList<EventListener<*>>> = mutableMapOf()

    override fun <E : Event> registerListener(aClass: Class<out E>, eventListener: EventListener<E>) {
        val listeners = retrieveListeners(aClass)
        listeners.add(eventListener)
    }

    override fun <E : Event> notify(event: E) {
        eventListeners[event::class.java]?.asSequence()
            ?.filterIsInstance<EventListener<E>>()
            ?.forEach {
                eventBusScope.launch {
                    it.handle(event)
                }
            }
    }

    @Suppress("UNCHECKED_CAST")
    private fun <E : Event> retrieveListeners(aClass: Class<out E>): MutableList<EventListener<E>> =
        eventListeners.getOrPut(aClass) { mutableListOf() } as MutableList<EventListener<E>>
}
My questions about this code (apart from whether I'm breaking any best practices) are:
• Is there anything that can happen that I'm not considering?
• I'm happy that the EventListener's handle() is not a suspending function, since I wanted to keep coroutines transparent to whoever implements handle(). Of course, this has the drawback that if someone starts using async blocks inside handle() (with a different scope), that code may keep running even after the listener has been killed. Is this something I should be worried about? Or is it fair to say that if your handle() goes async, you are responsible for what happens?
octylFractal
07/24/2020, 7:44 AM
Can registerListener and notify ever be called concurrently? You may need to make the map thread-safe.
Andrea Giuliano
07/24/2020, 7:50 AM
octylFractal
07/24/2020, 8:03 AM
Andrea Giuliano
07/24/2020, 8:29 AM
> the notifier also now has back-pressure though, which can be more “correct” but that depends on how your code uses the event bus
Do you mean that with the Flow implementation the consumer of the bus can have back-pressure? Can you expand on that a little bit?
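For readers following along, here is a minimal sketch of the kind of back-pressure being asked about. It assumes a bounded Channel stands in for the bus's internal buffer; the capacity of 1, the timings, and the name thirdSendIsHeldBack are all made up for illustration and are not taken from the bus discussed in this thread.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns true if a send was held back because the listener had not caught up.
fun thirdSendIsHeldBack(): Boolean = runBlocking {
    // Hypothetical bus buffer that can hold a single pending event.
    val events = Channel<Int>(capacity = 1)

    // Slow listener: it only starts consuming after 200 ms.
    val listener = launch {
        delay(200)
        for (e in events) { /* handle e */ }
    }

    // Fits in the buffer, so this returns immediately.
    events.send(1)

    // The buffer is full and nobody is receiving yet, so this send suspends.
    // It cannot complete within 50 ms and is cancelled by the timeout.
    val sent = withTimeoutOrNull(50) { events.send(2); true } ?: false

    events.close()
    listener.join()
    !sent
}
```

The point is only that the producer's send waits on the consumer once the buffer is full; a real bus would pick its own buffer size and overflow policy.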
octylFractal
07/24/2020, 8:31 AM
[...] BUFFERED size, if a downstream event listener backs up the Flow (say, events are fired every 1 second but it takes 10 seconds to process an event), then eventually notify will block waiting for it to process.
[...] buffer(X) on their flow to re-buffer the events
Andrea Giuliano
07/24/2020, 10:09 AM
octylFractal
07/24/2020, 10:11 AM
Andrea Giuliano
07/24/2020, 12:12 PM
Christopher Elías
07/24/2020, 2:27 PM
octylFractal
07/24/2020, 6:04 PM
Andrea Giuliano
07/26/2020, 4:28 PM
octylFractal
07/26/2020, 6:40 PM
Andrea Giuliano
07/26/2020, 10:48 PM
octylFractal
07/26/2020, 10:55 PM
Dispatchers.Default should have all you need for CPU-bound tasks; adding more threads is just extra threading pressure on the OS, and will probably result in very slightly slower code, depending on how many extra threads you have in that pool and how many extra tasks they are executing for the events.
[...] handle) is not suspend, then most of this is moot -- you'll be in a blocking situation, and you will probably need a custom thread pool for that.
Andrea Giuliano
07/27/2020, 8:31 AM
@Test
fun testingFlow() = runBlocking {
    val bus = EventBus()
    bus.notify(AsyncEvent("hello"))
    val flow = bus.subscribe<AsyncEvent>()
    launch {
        flow.collect {
            println(it)
        }
    }
    delay(Long.MAX_VALUE)
}
Now the test keeps waiting on collect, and it seems the event is never dispatched on the flow… Am I missing something here?
octylFractal
08/02/2020, 7:47 PM
> when capacity positive, but less than UNLIMITED – creates ArrayBroadcastChannel with a buffer of given capacity. Note: this channel looses all items that are send to it until the first subscriber appears;
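The behaviour described in that note can be reproduced in a few lines. This is only a sketch: it uses MutableSharedFlow, a newer kotlinx.coroutines API than the ArrayBroadcastChannel named in the quote, as a stand-in that also drops values emitted while nobody is subscribed. The function name firstEventSeen is hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Returns the first event that a subscriber actually receives.
fun firstEventSeen(): String = runBlocking {
    // replay = 0 by default: nothing is kept for subscribers that arrive later.
    val events = MutableSharedFlow<String>()

    // No subscriber exists yet, so this value is simply dropped.
    events.emit("early")

    // UNDISPATCHED runs the coroutine right away up to its first suspension,
    // which means the subscription is registered before the next line runs.
    val seen = async(start = CoroutineStart.UNDISPATCHED) { events.first() }

    // A subscriber exists now, so this value is delivered.
    events.emit("late")
    seen.await()
}
```

Calling firstEventSeen() returns "late": the value emitted before the subscription is never observed.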
Andrea Giuliano
08/02/2020, 7:48 PM
octylFractal
08/02/2020, 7:49 PM
[...] notify and subscribe
Andrea Giuliano
08/02/2020, 7:50 PM
octylFractal
08/02/2020, 7:50 PM
[...] collect occurs
Andrea Giuliano
08/02/2020, 7:51 PM
@Test
fun testingFlow() = runBlocking {
    val bus = EventBus()
    val flow = bus.subscribe<AsyncEvent>()
    launch {
        flow.collect {
            println(it)
        }
    }
    bus.notify(AsyncEvent("hello"))
    delay(Long.MAX_VALUE)
}
octylFractal
08/02/2020, 7:52 PM
[...] delay(100L) first
launch, delay(100L), notify
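One way to get the launch, delay(100L), notify ordering without a fixed delay is to wait until a subscriber is actually registered. The following is a rough sketch, assuming a bus backed by MutableSharedFlow. FlowEventBus, stream, awaitSubscriber and receivedMessage are hypothetical names, and AsyncEvent is redefined here only so the snippet stands alone; none of this is the bus from the thread.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

data class AsyncEvent(val message: String)

// Hypothetical flow-based bus, reduced to what the test needs.
class FlowEventBus {
    private val events = MutableSharedFlow<Any>()

    // Read-only view, public so the inline subscribe() below can use it.
    val stream: SharedFlow<Any> = events

    suspend fun notify(event: Any) = events.emit(event)

    // Suspends until at least one collector has subscribed.
    suspend fun awaitSubscriber() {
        events.subscriptionCount.first { it > 0 }
    }

    inline fun <reified E : Any> subscribe(): Flow<E> = stream.filterIsInstance<E>()
}

fun receivedMessage(): String = runBlocking {
    val bus = FlowEventBus()
    // Start collecting first; first() completes after one matching event.
    val received = async { bus.subscribe<AsyncEvent>().first() }
    // Replaces delay(100L): wait for the subscription, however long it takes.
    bus.awaitSubscriber()
    bus.notify(AsyncEvent("hello"))
    received.await().message
}
```

Because the collector stops after the first event, the test finishes on its own and needs no delay(Long.MAX_VALUE) at the end.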
Andrea Giuliano
08/02/2020, 7:52 PM
octylFractal
08/02/2020, 7:54 PM
[...] collect
Andrea Giuliano
08/02/2020, 7:56 PM
octylFractal
08/02/2020, 7:56 PM
Andrea Giuliano
08/02/2020, 8:01 PM
octylFractal
08/02/2020, 8:02 PM
Andrea Giuliano
08/02/2020, 8:04 PM
octylFractal
08/02/2020, 8:04 PM
Andrea Giuliano
08/02/2020, 8:06 PM
class AsyncEventBus(
    private val eventBusScope: CoroutineScope
) : EventBus {
    private val eventListeners: ConcurrentHashMap<Class<*>, MutableList<EventListener<*>>> = ConcurrentHashMap()

    override fun <E : Event> registerListener(aClass: Class<out E>, eventListener: EventListener<E>) {
        val listeners = retrieveListeners(aClass)
        listeners.add(eventListener)
    }

    override fun <E : Event> notify(event: E) {
        eventListeners[event::class.java]?.asSequence()
            ?.filterIsInstance<EventListener<E>>()
            ?.forEach {
                eventBusScope.launch {
                    it.handle(event)
                }
            }
    }

    @Suppress("UNCHECKED_CAST")
    private fun <E : Event> retrieveListeners(aClass: Class<out E>): MutableList<EventListener<E>> =
        eventListeners.getOrPut(aClass) { mutableListOf() } as MutableList<EventListener<E>>
}
[...] handle child if an event listener throws an exception for some reason, but a supervisor should do the job there
octylFractal
08/02/2020, 8:09 PM
Andrea Giuliano
08/02/2020, 8:10 PM
forEach {
    eventBusScope.launch {
        try {
            it.handle(event)
        } catch (e: Exception) {
            // log the failure here
        }
    }
}
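Put together, the try/catch idea and the supervisor idea might look like the sketch below. It is an assumption-laden illustration, not the bus from this thread: listeners are reduced to plain lambdas, and the scope, the handlers list and the name deliveredDespiteFailure are invented for the example.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns the events that reached the healthy listener.
fun deliveredDespiteFailure(): List<String> = runBlocking {
    // SupervisorJob: a child that fails does not cancel its siblings.
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    // Thread-safe list, since the listeners run on a thread pool.
    val delivered = CopyOnWriteArrayList<String>()

    // Hypothetical listeners: the first always throws, the second records the event.
    val handlers: List<(String) -> Unit> = listOf(
        { _ -> error("boom") },
        { e -> delivered += e }
    )

    val jobs = handlers.map { handler ->
        scope.launch {
            try {
                handler("hello")
            } catch (e: Exception) {
                // Stand-in for real logging.
                println("listener failed: ${e.message}")
            }
        }
    }
    jobs.joinAll()
    scope.cancel()
    delivered.toList()
}
```

With the try/catch in place the exception never reaches the scope, so the SupervisorJob only acts as a second line of defence for failures that escape the catch.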
octylFractal
08/02/2020, 8:11 PM