Ali Sabzevari

05/15/2020, 11:30 PM
I have a shell script that prints a counter value every second:
COUNTER=0
while :
do
	COUNTER=$((COUNTER + 1))
	echo "$COUNTER"
	sleep 1
done
And Kotlin code that runs this script in a process:
fun InputStream.linesToFlow() = channelFlow<String> {
    reader().forEachLine {
        offer(it)
    }
}

fun main() {
    runBlocking {
        val p = ProcessBuilder("./cmd.sh").start()
        p.inputStream.linesToFlow().map {
            println(it)
            it
        }.filter { "3" in it }.first()
        println("Done")
        p.waitFor()
    }
}
I expect this script to have this output:
1
2
3
Done
But it doesn't print anything!

octylFractal

05/15/2020, 11:34 PM
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
The default CoroutineDispatcher for this builder is an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine.
I think this single-thread limitation applies in some way

Zach Klippenstein (he/him) [MOD]

05/15/2020, 11:35 PM
I think it’s because your linesToFlow loop is running on the dispatcher from runBlocking and never yields or suspends, so the rest of your coroutines never have a chance to execute.

octylFractal

05/15/2020, 11:35 PM
ah yes, it's due to the channelFlow + consuming in a separate coroutine
the linesToFlow() body will never stop emitting, since you said offer and not send, and therefore the code to process the channel cannot resume
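A rough sketch of the two points above, not a definitive fix: it swaps offer for the suspending send and, as an added assumption, moves the blocking read off the runBlocking thread with flowOn(Dispatchers.IO). The use of useLines and the in-memory input in main are illustrative choices so the example runs without cmd.sh.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import java.io.InputStream

// Variant of linesToFlow from the question.
fun InputStream.linesToFlow(): Flow<String> = channelFlow {
    // useLines is inline, so the suspending send() may be called in its lambda.
    bufferedReader().useLines { lines ->
        // send() suspends when the channel buffer is full; offer() never suspends.
        for (line in lines) send(line)
    }
}.flowOn(Dispatchers.IO) // assumption: run the blocking read on the IO dispatcher

fun main() = runBlocking {
    // In-memory stand-in for the process output (illustrative only).
    val input = "1\n2\n3\n4\n".byteInputStream()
    val match = input.linesToFlow().first { "3" in it }
    println(match)
    println("Done")
}
```

With a real process the read still blocks between lines, so the producer only notices cancellation once the next line arrives.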

Ali Sabzevari

05/15/2020, 11:37 PM
So if I launch a new coroutine on a separate thread inside linesToFlow, it should work, right?

octylFractal

05/15/2020, 11:38 PM
possibly, but this is poorly designed IMO

Ali Sabzevari

05/15/2020, 11:38 PM
How can I emit the input stream of my process to a flow?

octylFractal

05/15/2020, 11:39 PM
I think this is better:
fun main() {
    runBlocking {
        val p = ProcessBuilder("./cmd.sh").start()
        p.inputStream.bufferedReader().lineSequence().asFlow()
            .flowOn(Dispatchers.IO)
            .map {
                println(it)
                it
            }.filter { "3" in it }.first()
        println("Done")
        p.waitFor()
    }
}
you can condense p.inputStream.bufferedReader().lineSequence().asFlow().flowOn(Dispatchers.IO) to a function if you want
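One way that condensed function could look, as a minimal sketch: the name lineFlow is hypothetical, and the in-memory input in main is only there so the example runs without a child process.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import java.io.InputStream

// Hypothetical helper name; wraps the chain from the message above.
// lineSequence() can be iterated only once, so the resulting flow
// should be collected only once as well.
fun InputStream.lineFlow(): Flow<String> =
    bufferedReader().lineSequence().asFlow().flowOn(Dispatchers.IO)

fun main() = runBlocking {
    // In-memory stand-in for p.inputStream (illustrative only).
    val line = "1\n2\n3\n".byteInputStream().lineFlow().first { "3" in it }
    println(line)
}
```

In the original program the call site would then read p.inputStream.lineFlow().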

Ali Sabzevari

05/15/2020, 11:40 PM
Nice, thank you. I'll give it a try and come back to you if I have more questions.
One quick question though
Why did you decide to use IO dispatcher here?

octylFractal

05/15/2020, 11:42 PM
because it's doing IO work

Ali Sabzevari

05/15/2020, 11:42 PM
So it doesn't necessarily need to be on a separate thread?

octylFractal

05/15/2020, 11:42 PM
that is a separate thread

Ali Sabzevari

05/15/2020, 11:43 PM
I thought it was a separate pool, but not necessarily a separate thread
I need to read more about this

octylFractal

05/15/2020, 11:44 PM
that's only true for Dispatchers.Default + Dispatchers.IO, and it will never create a state where there aren't enough threads (e.g. Default can have 8 and IO 64; even if IO uses some of the same 8 threads as the default dispatcher, the default dispatcher will spawn more if it needs to)
ref. https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-i-o.html
This dispatcher shares threads with a Default dispatcher, so using withContext(Dispatchers.IO) { ... } does not lead to an actual switching to another thread — typically execution continues in the same thread.
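A small sketch for observing the behaviour the documentation describes: it records the thread name on Dispatchers.Default and again inside withContext(Dispatchers.IO). The function name is hypothetical, and the names printed depend on scheduling, so no particular output is assumed.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: returns the thread name seen on Default and,
// after the nested switch, the one seen on IO.
suspend fun threadNamesAcrossSwitch(): Pair<String, String> =
    withContext(Dispatchers.Default) {
        val onDefault = Thread.currentThread().name
        val onIo = withContext(Dispatchers.IO) { Thread.currentThread().name }
        onDefault to onIo
    }

fun main() = runBlocking {
    val (onDefault, onIo) = threadNamesAcrossSwitch()
    println("Default: $onDefault")
    println("IO:      $onIo")
}
```

If the two names match on a given run, the switch to Dispatchers.IO stayed on the same worker thread, which is the case the quoted documentation calls typical.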

Ali Sabzevari

05/15/2020, 11:51 PM
This dispatcher shares threads with a Default dispatcher, so using withContext(Dispatchers.IO) { ... } does not lead to an actual switching to another thread — typically execution continues in the same thread.
Thank you for the great answer 😊
I have a follow-up question regarding Dispatchers here: it looks like even if I remove .flowOn(Dispatchers.IO), the code works as expected. Based on your explanation, I don't understand why it works correctly.
There is also another interesting catch here. When I use withTimeout, it only works when I run the flow on the IO dispatcher:
fun main() {
    runBlocking {
        val p = ProcessBuilder("./cmd.sh").start()
        withTimeout(2000) {
            p.inputStream.linesToFlow().map {
                println(it)
                it
            }.filter { "3" in it }.first()
        }
        println("Done")
        p.waitFor()
    }
}

octylFractal

05/16/2020, 4:29 PM
I suspect it may have to do with the fact that I used a Sequence, which itself is suspending. that means that there isn't another coroutine processing events, it's all in the same stack. not sure why withTimeout might change that, it shouldn't be blocking anywhere...

Ali Sabzevari

05/17/2020, 9:26 AM
Roman Elizarov left a comment on my Stack Overflow question that might be helpful: https://stackoverflow.com/questions/61825334/convert-inputstream-to-flow/61833785?noredirect=1#comment109394481_61833785