Ali Sabzevari
05/15/2020, 11:30 PM
COUNTER=0
while :
do
    COUNTER=$((COUNTER + 1))
    echo "$COUNTER"
    sleep 1
done
And Kotlin code that runs this script in a process:
fun InputStream.linesToFlow() = channelFlow<String> {
    reader().forEachLine {
        offer(it)
    }
}
fun main() {
    runBlocking {
        val p = ProcessBuilder("./cmd.sh").start()
        p.inputStream.linesToFlow().map {
            println(it)
            it
        }.filter { "3" in it }.first()
        println("Done")
        p.waitFor()
    }
}
I expect this script to have this output:
1
2
3
Done
But it doesn't print anything!
octylFractal
05/15/2020, 11:34 PM
The default CoroutineDispatcher for this builder is an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine.
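
The quoted behaviour can be seen in a small sketch that needs no external process. It assumes kotlinx.coroutines 1.6 or newer, where trySend is the non-suspending replacement for offer; the function name eventOrder is made up for illustration. Because producer and collector share the single thread blocked by runBlocking, a producer that never suspends should run to completion before the collector handles any element.

```kotlin
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.runBlocking

// Records the order in which producer and collector steps run when both
// share runBlocking's single event-loop thread.
// eventOrder is a hypothetical name used only for this illustration.
fun eventOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    channelFlow<Int> {
        repeat(3) { i ->
            events += "produced $i"
            trySend(i) // non-suspending, like the older offer()
        }
    }.collect { i -> events += "collected $i" }
    events
}

fun main() {
    eventOrder().forEach(::println)
}
```

In the original question the producer loop is a blocking read that never ends, so the collector never gets its turn at all.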
Zach Klippenstein (he/him) [MOD]
05/15/2020, 11:35 PM
The linesToFlow loop is running on the dispatcher from runBlocking and never yields or suspends, so the rest of your coroutines never have a chance to execute.
octylFractal
05/15/2020, 11:35 PM
channelFlow + consuming in a separate coroutine
linesToFlow() body will never stop emitting, since you said offer and not send, and therefore the code to process the channel cannot resume
Ali Sabzevari
05/15/2020, 11:37 PM
linesToFlow it should work, right?
octylFractal
05/15/2020, 11:38 PM
Ali Sabzevari
05/15/2020, 11:38 PM
octylFractal
05/15/2020, 11:39 PM
fun main() {
    runBlocking {
        val p = ProcessBuilder("./cmd.sh").start()
        p.inputStream.bufferedReader().lineSequence().asFlow()
            .flowOn(Dispatchers.IO)
            .map {
                println(it)
                it
            }.filter { "3" in it }.first()
        println("Done")
        p.waitFor()
    }
}
You can extract p.inputStream.bufferedReader().lineSequence().asFlow().flowOn(Dispatchers.IO) to a function if you want
Ali Sabzevari
05/15/2020, 11:40 PM
octylFractal
05/15/2020, 11:42 PM
Ali Sabzevari
05/15/2020, 11:42 PM
octylFractal
05/15/2020, 11:42 PM
Ali Sabzevari
05/15/2020, 11:43 PM
octylFractal
05/15/2020, 11:44 PM
Dispatchers.Default + Dispatchers.IO, and it will never create this state where there aren't enough threads (e.g. Default can have 8 and IO 64; even if IO uses some of the same 8 threads as the default dispatcher, the default dispatcher will spawn more if it needs to)
This dispatcher shares threads with a Default dispatcher, so using withContext(Dispatchers.IO) { ... } does not lead to an actual switching to another thread; typically execution continues in the same thread.
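
The thread sharing described in that quote can be observed with a short sketch. The function name threadNames is made up for illustration, and nothing here guarantees a particular result: the two names will often be the same, but the scheduler is free to pick a different worker.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the thread name seen on Dispatchers.Default and the one seen
// after switching to Dispatchers.IO from inside it.
// threadNames is a hypothetical helper, not part of any library.
fun threadNames(): Pair<String, String> = runBlocking {
    withContext(Dispatchers.Default) {
        val onDefault = Thread.currentThread().name
        val onIo = withContext(Dispatchers.IO) { Thread.currentThread().name }
        onDefault to onIo
    }
}

fun main() {
    val (onDefault, onIo) = threadNames()
    println("Default: $onDefault")
    println("IO:      $onIo")
}
```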
Ali Sabzevari
05/15/2020, 11:51 PM
Dispatchers here:
It looks like even if I remove .flowOn(Dispatchers.IO), the code works as expected. Based on your elaboration, I don't understand why it works correctly.
withTimeout, it only works when I run the flow on the IO dispatcher:
fun main() {
    runBlocking {
        val p = ProcessBuilder("./cmd.sh").start()
        withTimeout(2000) {
            p.inputStream.linesToFlow().map {
                println(it)
                it
            }.filter { "3" in it }.first()
        }
        println("Done")
        p.waitFor()
    }
}
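
For comparison, here is a minimal sketch of the IO-dispatcher variant suggested earlier in the thread, wrapped in withTimeout. The names linesAsFlow and firstMatchingLine are made up for illustration, and an in-memory stream stands in for p.inputStream so the sketch runs without cmd.sh. It assumes a recent kotlinx.coroutines release.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import java.io.InputStream

// Hypothetical helper: reads lines lazily on Dispatchers.IO so the
// blocking reads do not occupy the collector's thread.
fun InputStream.linesAsFlow(): Flow<String> =
    bufferedReader().lineSequence().asFlow().flowOn(Dispatchers.IO)

// Hypothetical helper: returns the first line containing [needle].
// withTimeout bounds the wait; cancellation is cooperative, so a read
// that is already blocked is not interrupted by cancellation alone.
fun firstMatchingLine(input: InputStream, needle: String, timeoutMs: Long): String =
    runBlocking {
        withTimeout(timeoutMs) {
            input.linesAsFlow().first { needle in it }
        }
    }

fun main() {
    // In-memory stand-in for the process output.
    val fakeOutput = "1\n2\n3\n4\n".byteInputStream()
    println(firstMatchingLine(fakeOutput, "3", 2000))
}
```

With a real process, p.inputStream would be passed in place of the in-memory stream.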
octylFractal
05/16/2020, 4:29 PM
withTimeout might change that, it shouldn't be blocking anywhere...
Ali Sabzevari
05/17/2020, 9:26 AM