Lilly
04/21/2021, 12:20 PM
… `Flow` manually but also waits until all launched coroutines in this flow have finished? I have a channel which is consumed via `receiveAsFlow`, which in turn keeps the flow running until the channel is closed (by the user or by an exception). I would like to escape from this flow but also wait until all coroutines have finished...
Zach Klippenstein (he/him) [MOD]
04/21/2021, 1:10 PM
What do you mean by escape from the flow – you want the collector to stop collecting (execute the next line after the `collect{}` call)?
Lilly
04/21/2021, 1:25 PM
Lilly
04/21/2021, 1:27 PM
`subscribeToPacketChannel()` is just a call to `channel.receiveAsFlow()`. So as I wrote in the comments, the flow collects multiple packets, but this happens faster than each packet can be parsed. So when the last packet has been collected, there might be 1, 4 or even 6 packets still being parsed. After all packets are parsed I would expect the flow to complete, but since it's collecting from a channel it never completes.
> What do you mean by escape from the flow – you want the collector to stop collecting (execute the next line after the `collect{}` call)?
By "escape" I mean that I would expect the flow to complete after all packets have been parsed, and the next line to be executed.
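The behaviour described above (a flow backed by a channel only completes once the channel is closed) can be reproduced with a minimal sketch. The helper `collectWithTimeout` and the `Int` payload are hypothetical and only serve the illustration; `receiveAsFlow`, `toList` and `withTimeoutOrNull` are regular kotlinx.coroutines calls.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: collects everything from a channel-backed flow,
// giving up after timeoutMs. Returns null if the flow did not complete in time.
suspend fun collectWithTimeout(channel: Channel<Int>, timeoutMs: Long): List<Int>? =
    withTimeoutOrNull(timeoutMs) { channel.receiveAsFlow().toList() }

fun main() = runBlocking {
    // Case 1: three values are sent, but the channel is never closed.
    val open = Channel<Int>(Channel.UNLIMITED)
    repeat(3) { open.send(it) }
    println(collectWithTimeout(open, 200)) // null: the flow never completed

    // Case 2: same values, but the channel is closed afterwards.
    val closed = Channel<Int>(Channel.UNLIMITED)
    repeat(3) { closed.send(it) }
    closed.close()
    println(collectWithTimeout(closed, 200)) // [0, 1, 2]
}
```

In the first case the collector is still suspended waiting for a fourth element when the timeout fires, which is the "never completes" situation from the message above.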
Zach Klippenstein (he/him) [MOD]
04/21/2021, 1:40 PM
Do you mean to launch those parsing coroutines as children of that outer coroutine scope (`packetCollectorJob`, which isn’t actually a job but a `CoroutineScope`)?
Zach Klippenstein (he/him) [MOD]
04/21/2021, 1:42 PM
Lilly
04/21/2021, 1:49 PM
Lilly
04/21/2021, 2:02 PM
> Do you mean to launch those parsing coroutines as children of that outer coroutine scope (`packetCollectorJob`, which isn’t actually a job but a `CoroutineScope`)?
I forgot to remove this line with `packetCollectorJob`. It is part of my current workaround:
coroutineScope {
    launch {
        val packetCollectorJob = this
        dataSource.subscribeToPacketChannel().collect { packet ->
            try {
                coroutineScope {
                    // Launch parsing in its own coroutine because
                    // it might happen that parsing hasn't finished
                    // while next packet arrives
                    launch(Dispatchers.IO) {
                        packet.deserialize(destination) // non-suspending function, destination is a Map
                    }
                }
            } catch (e: LastPacketException) {
                // Within deserialize: when last packet is determined
                // this exception is thrown. But detection of last packet
                // does not mean all packets are parsed/ all child coroutines
                // have been finished. So I delay execution before canceling the scope
                // which in turn cancels the flow which is the goal
                launch {
                    delay(6000L) // TODO dirty workaround
                    /*
                     * Cancel coroutine which in turn cancels the flow.
                     * This is necessary because [subscribeToPacketChannel] function
                     * relies on a channel, i.e. the flow collector is only cancelled when
                     * channel is closed by user or by exception of the flow collector itself.
                     */
                    packetCollectorJob.cancel()
                }
            }
        }
    }
    launch { /* another coroutine */ }
}
Zach Klippenstein (he/him) [MOD]
04/21/2021, 2:18 PM
`packet.deserialize(destination, deviceProfile)`, is `destination` a callback of some sort?
Lilly
04/21/2021, 2:19 PM
`Map`
Zach Klippenstein (he/him) [MOD]
04/21/2021, 2:19 PM
Lilly
04/21/2021, 2:20 PM
Zach Klippenstein (he/him) [MOD]
04/21/2021, 2:21 PM
… `deserialize` actually do IO, or just process data that’s already in memory?
Lilly
04/21/2021, 2:23 PM
Zach Klippenstein (he/him) [MOD]
04/21/2021, 2:31 PM
`Buffer` doesn’t mean you’re doing IO, `Buffer` is just an in-memory data structure
Zach Klippenstein (he/him) [MOD]
04/21/2021, 2:33 PM
… `Default` dispatcher, not `IO`. And if that’s the case, then are you sure that deserializing is actually so expensive that each packet has to be done in parallel?
Zach Klippenstein (he/him) [MOD]
04/21/2021, 2:35 PM
Lilly
04/21/2021, 2:39 PM
… `destination`) faster
Zach Klippenstein (he/him) [MOD]
04/21/2021, 2:40 PM
coroutineScope {
    launch(Dispatchers.Default) {
        dataSource.subscribeToPacketChannel()
            .map { packet ->
                try {
                    packet.deserialize(destination, …)
                    return@map true
                } catch (e: LastPacketException) {
                    // Last packet detected: emit false so takeWhile stops the flow
                    return@map false
                }
            }
            .takeWhile { it }
            .collect()
    }
    // launch other coroutines
}
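A self-contained sketch of the same `map` + `takeWhile` idea, to show that the collector returns even though the channel is never closed. `Packet`, `LastPacketException`, `deserialize` and `parseUntilLast` are hypothetical stand-ins for the types in this thread; in particular, it is an assumption that `deserialize` stores the last packet before throwing.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the packet types discussed in the thread.
class LastPacketException : Exception()
data class Packet(val id: Int, val isLast: Boolean)

// Assumption: the packet is stored first, then the last packet is signalled by throwing.
fun Packet.deserialize(destination: MutableMap<Int, String>) {
    destination[id] = "payload-$id"
    if (isLast) throw LastPacketException()
}

// Parses packets sequentially until the last one has been seen, then returns.
suspend fun parseUntilLast(channel: Channel<Packet>): Map<Int, String> {
    val destination = mutableMapOf<Int, String>()
    channel.receiveAsFlow()
        .map { packet ->
            try {
                packet.deserialize(destination)
                true
            } catch (e: LastPacketException) {
                false // tells takeWhile to stop collecting
            }
        }
        .takeWhile { it }
        .collect()
    return destination
}

fun main() = runBlocking {
    val channel = Channel<Packet>(Channel.UNLIMITED)
    (1..3).forEach { channel.send(Packet(it, isLast = it == 3)) }
    // The channel stays open, yet parseUntilLast returns.
    println(parseUntilLast(channel).keys) // [1, 2, 3]
}
```

Because parsing happens inside `map`, every packet is fully parsed before the next one is received, so no delay or manual cancellation is needed once the last packet is detected.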
Lilly
04/21/2021, 2:46 PM
Zach Klippenstein (he/him) [MOD]
04/21/2021, 2:49 PM
`IO` is good for this sort of thing because it uses a thread pool with a large thread count. It assumes that, at any given time, all of its threads will be blocked on OS requests. That is, not doing any actual work, just waiting for the OS to reschedule them. Because these threads aren’t actually doing any processing themselves, there’s no reason to limit them to the number of available processors/cores. It’s bad for CPU-bound work because the system can only run as many tasks in parallel as there are physical processors/cores – any more than that, and the OS is going to have to start swapping between threads, giving them each a chance to run. Switching threads is relatively expensive, since the OS has to swap out all the registers and stuff. So while the OS can handle more threads actively doing work than there are processors to do it, it’s not as efficient.
`Default` is good for CPU-bound work because it uses a thread pool that basically has only as many threads as there are actual physical cores/processors available. This means that, if the `Default` dispatcher is saturated with work, the available processing hardware will be saturated with work, but no more coroutines will be resumed until there’s another physical processor available. It’s bad for IO because if one of the threads is blocked on IO, that means there’s effectively one physical processor that can’t be used to do actual work while the JVM is waiting for the OS to return a response.
Lilly
04/21/2021, 3:44 PM
Zach Klippenstein (he/him) [MOD]
04/21/2021, 4:01 PM
… `Buffer` in memory that’s been filled already, then working with that `Buffer` directly is not IO in this sense, it’s effectively the same thing as doing operations on a `MutableList`.
Lilly
04/21/2021, 4:31 PM
Zach Klippenstein (he/him) [MOD]
04/21/2021, 4:36 PM
Lilly
04/21/2021, 4:41 PM
coroutineScope {
    launch(Dispatchers.IO) {
        packet.deserialize(destination)
    }
}
isn't it the same as just calling `packet.deserialize(destination)`? So in the end it doesn't really make sense, or does it?
Zach Klippenstein (he/him) [MOD]
04/21/2021, 4:42 PM
withContext(Dispatchers.IO) {
    packet.deserialize(destination)
}
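To make the comparison concrete, here is a small sketch that puts both forms side by side. `parse` is a hypothetical CPU-bound stand-in for `packet.deserialize(destination)`, and the sketch uses `Dispatchers.Default` on the assumption, raised earlier in the thread, that the work is in-memory processing rather than blocking IO.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical CPU-bound work standing in for packet.deserialize(destination).
fun parse(bytes: ByteArray): Int = bytes.sum()

// coroutineScope suspends until its only child has finished,
// so the caller gains no parallelism from the launch.
suspend fun parseWithLaunch(bytes: ByteArray): Int {
    var result = 0
    coroutineScope {
        launch(Dispatchers.Default) { result = parse(bytes) }
    }
    return result
}

// Same observable behaviour: switch dispatcher, run the block, suspend until it is done.
suspend fun parseWithContext(bytes: ByteArray): Int =
    withContext(Dispatchers.Default) { parse(bytes) }

fun main() = runBlocking {
    val bytes = byteArrayOf(1, 2, 3)
    println(parseWithLaunch(bytes))  // 6
    println(parseWithContext(bytes)) // 6
}
```

Both functions return only after the work has finished; `withContext` also returns the block's result directly, which avoids the captured `var`.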
Zach Klippenstein (he/him) [MOD]
04/21/2021, 4:42 PM
`withContext` instead of `coroutineScope`+`launch`
Lilly
04/21/2021, 4:50 PM
coroutineScope {
    launch(Dispatchers.IO) {
        packet.deserialize(destination)
    }
}
does not make sense, because I run each packet in its own coroutine, but since it's wrapped in `coroutineScope` the effect of parallelism is neutralized (because `coroutineScope` waits for completion of its children, so in the end the packets are parsed sequentially), so it's the same as just calling `packet.deserialize(destination)`. Am I right with this assumption?
Zach Klippenstein (he/him) [MOD]
04/21/2021, 4:57 PM
… `packetCollectorJob` scope, since that seemed to be your intent.
Lilly
04/21/2021, 5:03 PM
andylamax
04/26/2021, 2:11 AM
… `Default` and `IO` Dispatchers. Funny thing is I already knew this stuff, but damn... That explanation made me learn again something I already knew 😂
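For the case where parsing really should run in parallel, the idea raised earlier in the thread (launching the parsing coroutines as children of the outer scope) could look roughly like the sketch below. It is not taken from the thread: `Packet`, `parseAllInParallel`, and the `isLast` flag that can be read without parsing are all assumptions made for illustration, and `delay(50)` merely simulates slow parsing.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical packet type; isLast is assumed to be readable without parsing.
data class Packet(val id: Int, val isLast: Boolean)

suspend fun parseAllInParallel(channel: Channel<Packet>): Map<Int, String> {
    val destination = ConcurrentHashMap<Int, String>() // written from several threads
    coroutineScope {
        val outerScope = this
        channel.receiveAsFlow()
            .onEach { packet ->
                // Child of the outer scope, not of the flow: it keeps running
                // after takeWhile has stopped the collection.
                outerScope.launch(Dispatchers.Default) {
                    delay(50) // simulated parsing time
                    destination[packet.id] = "payload-${packet.id}"
                }
            }
            .takeWhile { !it.isLast } // stop collecting once the last packet was handed off
            .collect()
        // collect() has returned here; coroutineScope still waits for all children.
    }
    return destination
}

fun main() = runBlocking {
    val channel = Channel<Packet>(Channel.UNLIMITED)
    (1..5).forEach { channel.send(Packet(it, isLast = it == 5)) }
    println(parseAllInParallel(channel).size) // 5
}
```

The flow ends as soon as the last packet is seen, and the enclosing `coroutineScope` then waits for the parsing coroutines that are still running, which removes the need for a fixed `delay` before cancelling.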