Czar
03/07/2020, 9:40 AM
louiscad
03/07/2020, 11:10 AM
Deferred that could break structured concurrency.
Czar
03/07/2020, 12:15 PM
louiscad
03/07/2020, 12:44 PM
Czar
03/07/2020, 12:47 PM
louiscad
03/07/2020, 12:59 PM
val turnOnEvents: Flow<Unit> = getTurnOnEvents() // Can come from anywhere and carry whatever type you want.
turnOnEvents.collectLatest {
    ensureTurnedOn()
    delay(10.minutes.toLongMilliseconds())
    turnOff()
}
There are also ways to turn it into a Flow<Boolean>, where the Boolean represents whether the thing should be on or off. Let me know (with a mention to ping me) if you need that or a variation for optimal integration with your use case.
Czar
03/07/2020, 1:06 PM
louiscad
03/07/2020, 1:11 PM
turnOff can only be called if delay completes, which does not happen if it's cancelled because a new element came into turnOnEvents (as per the documented behavior of the collectLatest operator). As for creating a flow, there are multiple (infinite) ways to do so. The most basic way is to use the flow { } builder, do whatever you want and call emit once you have a value. If your source is always hot, you'll probably want to use a BroadcastChannel, offer values to it (Unit if there's nothing special to provide apart from the event itself), and use asFlow (or copy-paste its implementation into your code, since it's @FlowPreview API that can change).
Czar
03/07/2020, 1:12 PM
interface EventHandler<T : Event> {
    suspend fun onEvent(event: T)
}
The handler is a singleton.
louiscad
03/07/2020, 1:12 PM
callbackFlow for that (wrap calls to offer with runCatching to avoid race conditions on cancellation and unregistering of your callback).
Flow can help you not just for the use case you're having today: https://kotlinconf.com/2019/talks/video/2019/131903/
Czar
03/07/2020, 1:36 PM
louiscad
03/07/2020, 1:38 PM
callbackFlow
Czar
03/07/2020, 1:43 PM
louiscad
03/07/2020, 1:44 PM
callbackFlow builder, and call offer (wrapped in runCatching) in your onEvent callback. I mean, just like in the KDoc of callbackFlow.
BroadcastChannel instead and use asFlow() on it as I said before.
Czar
03/07/2020, 2:02 PM
object Handler : EventHandler<OnEvent> {
    private val cdc: BroadcastChannel<Long> = BroadcastChannel(1)
    private val countDownFlow = cdc.asFlow()
    override fun handle(event: OnEvent) {
        cdc.send(0L)
        countDownFlow.collectLatest {
            ensureOn()
            delay(10.minutes)
            turnOff()
        }
    }
}
This doesn't work for me, by the way. Nothing in the collectLatest block gets executed.
louiscad
03/07/2020, 2:07 PM
EventHandler ever registered and unregistered somewhere?
Czar
03/07/2020, 2:08 PM
louiscad
03/07/2020, 2:08 PM
Czar
03/07/2020, 2:09 PM
louiscad
03/07/2020, 2:09 PM
callbackFlow?
Czar
03/07/2020, 2:10 PM
louiscad
03/07/2020, 2:10 PM
callbackFlow
Czar
03/07/2020, 2:10 PM
louiscad
03/07/2020, 2:10 PM
EventHandler coming from, first and foremost, and is it already registered, or do you pass it somewhere?
Czar
03/07/2020, 2:12 PM
val handlers = List<EventHandler<*>>
handlers.forEach { eventBus.register(it) }
louiscad
03/07/2020, 2:13 PM
private val broadcastChannel: BroadcastChannel<Unit> = BroadcastChannel(1)
val handler = object : EventHandler<OnEvent> {
    override fun handle(event: OnEvent) {
        runCatching { broadcastChannel.offer(Unit) } // Workaround for https://github.com/Kotlin/kotlinx.coroutines/issues/974
    }
}
val countDownFlow = broadcastChannel.asFlow()
countDownFlow.collectLatest {
    ensureOn()
    delay(10.minutes)
    turnOff()
}
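A minimal runnable sketch of the pattern in the snippet above, with the collector launched in a coroutine instead of collected inline. Several things here are assumptions not taken from the thread: MutableSharedFlow and tryEmit stand in for BroadcastChannel and offer (recent kotlinx.coroutines releases deprecate the latter pair), Device is a hypothetical stand-in for whatever ensureOn() and turnOff() control, and the 10-minute delay is shortened to 300 ms so the behavior can be observed quickly.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for whatever ensureOn()/turnOff() control in the thread.
class Device {
    var isOn = false
        private set
    fun ensureOn() { isOn = true }
    fun turnOff() { isOn = false }
}

// Returns the device state sampled (1) after a second event restarted the
// countdown and (2) after the countdown was allowed to finish.
fun runDemo(): List<Boolean> = runBlocking {
    val device = Device()
    // extraBufferCapacity = 1 lets tryEmit succeed from a non-suspending callback.
    val turnOnEvents = MutableSharedFlow<Unit>(extraBufferCapacity = 1)

    // Collect in a launched coroutine so the caller is not blocked.
    val job = launch {
        turnOnEvents.collectLatest {
            device.ensureOn()
            delay(300)          // cancelled if a newer event arrives
            device.turnOff()    // reached only when the delay completes
        }
    }
    // Events emitted before the collector subscribes would be dropped.
    turnOnEvents.subscriptionCount.first { it > 0 }

    turnOnEvents.tryEmit(Unit)
    delay(200)
    turnOnEvents.tryEmit(Unit)       // restarts the countdown
    delay(200)
    val afterRestart = device.isOn   // 400 ms after the first event, 200 ms after the second
    delay(300)
    val afterQuietPeriod = device.isOn
    job.cancel()
    listOf(afterRestart, afterQuietPeriod)
}

fun main() {
    println(runDemo())
}
```

With these timings the device should still be on at the first sample, because the second event cancelled the pending turnOff, and off at the second sample, once no further event arrived during the delay.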
Czar
03/07/2020, 2:15 PM
louiscad
03/07/2020, 2:16 PM
callbackFlow is designed to transform a single callback's lifecycle (registration, callbacks arriving, and unregistration) into a Flow that you can collect multiple times.
If you're using multiple callbacks together (I'm not debating whether that's a bad idea or not), you cannot use callbackFlow, but the BroadcastChannel trick I corrected for you should do.
Flow extensions, maybe there's already something to replace the EventHandler you have. In that case, you'd just need to use the collectLatest operator on it.
Czar
03/07/2020, 2:22 PM
"you can launch a coroutine for that in the scope you need"
What do you mean? I don't need it anywhere, I just need it to happen 😄 Thanks for all the help by the way, I'm very confused by all of this at the moment.
handle function, so where would I launch it? Tried to put it in init, but then realized it will block the construction of the handler.
asFlow().collectLatest{}
// in the handler object:
init {
    thread(start = true) {
        runBlocking {
            broadcastChannel.asFlow()
                .collectLatest { }
        }
    }
}
It is also unsustainable, as in fact I'll have many such objects, and a thread per object is exactly what I'm trying to avoid by using coroutines 😞
louiscad
03/07/2020, 2:57 PM
GlobalScope.launch. Otherwise, you can reuse an existing scope, and cancel it if/when you need to (in which case it's probably wiser to replace BroadcastChannel with a proper flow built using callbackFlow that correctly unregisters the callback in the awaitClose lambda).
Czar
03/07/2020, 3:22 PM
louiscad
03/07/2020, 3:34 PM
Czar
03/08/2020, 11:01 AM
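
The callbackFlow suggestion from the 2:57 PM message could look roughly like the sketch below. It is not code from the thread: EventBus, its unregister and post methods, the eventsOf helper and the shortened 300 ms delay are all hypothetical, and trySend is used where the thread wrapped offer in runCatching, which assumes kotlinx.coroutines 1.5 or newer.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical types; the thread only shows EventHandler and eventBus.register(...).
interface Event
object OnEvent : Event
interface EventHandler<T : Event> { fun handle(event: T) }

class EventBus {
    private val handlers = CopyOnWriteArrayList<EventHandler<OnEvent>>()
    val handlerCount: Int get() = handlers.size
    fun register(handler: EventHandler<OnEvent>) { handlers.add(handler) }
    fun unregister(handler: EventHandler<OnEvent>) { handlers.remove(handler) }
    fun post(event: OnEvent) = handlers.forEach { it.handle(event) }
}

// Each collection registers its own handler and unregisters it when the
// collector is cancelled or the flow is otherwise closed.
fun eventsOf(bus: EventBus): Flow<Unit> = callbackFlow {
    val handler = object : EventHandler<OnEvent> {
        override fun handle(event: OnEvent) {
            trySend(Unit) // non-suspending; the result is deliberately ignored
        }
    }
    bus.register(handler)
    awaitClose { bus.unregister(handler) }
}

// Returns the recorded on/off log and the number of handlers left registered.
fun runDemo(): Pair<List<String>, Int> = runBlocking {
    val bus = EventBus()
    val log = mutableListOf<String>()
    val job = launch {
        eventsOf(bus).collectLatest {
            log += "on"
            delay(300) // stands in for the 10-minute delay
            log += "off"
        }
    }
    delay(50)            // give the collector time to register its handler
    bus.post(OnEvent)
    delay(500)           // long enough for the countdown to finish
    job.cancelAndJoin()  // cancelling the collector runs the awaitClose block
    log.toList() to bus.handlerCount
}

fun main() {
    println(runDemo())
}
```

Launching the collector in an existing scope avoids the thread-per-handler approach, and cancelling that scope or job is what triggers the unregistration.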