I have a shell script that prints a counter value ...
# coroutines
a
I have a shell script that prints a counter value every second:
COUNTER=0
while :
do
	COUNTER=$((COUNTER + 1))
	echo "$COUNTER"
	sleep 1
done
And some Kotlin code that runs this script in a process:
fun InputStream.linesToFlow() = channelFlow<String> {
    reader().forEachLine {
        offer(it)
    }
}

fun main() {
    runBlocking {
        val p = ProcessBuilder("./cmd.sh").start()
        p.inputStream.linesToFlow().map {
            println(it)
            it
        }.filter { "3" in it }.first()
        println("Done")
        p.waitFor()
    }
}
I expect this script to have this output:
1
2
3
Done
But it doesn't print anything!
o
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
The default CoroutineDispatcher for this builder is an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine.
I think this single-thread limitation applies in some way
z
I think it’s because your linesToFlow loop is running on the dispatcher from runBlocking and never yields or suspends, so the rest of your coroutines never have a chance to execute.
o
ah yes, it's due to the channelFlow + consuming in a separate coroutine
the linesToFlow() body will never stop emitting, since you said offer and not send, and therefore the code to process the channel cannot resume
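The starvation described above can be reproduced without any process or stream. This is a minimal sketch, assuming kotlinx.coroutines is on the classpath; the name blockingStarvesSiblings is made up for illustration, and Thread.sleep stands in for the blocking read. Inside runBlocking, the launched child cannot run until the parent reaches a suspension point:

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: records the order in which the two coroutines make progress.
fun blockingStarvesSiblings(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    // Queued on runBlocking's single-threaded event loop; it does not start yet.
    launch { events += "child ran" }
    // Blocks the only thread (stand-in for a blocking read); nothing suspends here.
    Thread.sleep(200)
    events += "parent finished blocking"
    // First suspension point: the event loop can now run the queued child.
    yield()
    events
}

fun main() {
    println(blockingStarvesSiblings()) // prints [parent finished blocking, child ran]
}
```

A loop that blocks forever, such as reading lines from a process that never exits, never reaches the equivalent of the yield() call, so the consumer side stays queued.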
a
So if I launch a new coroutine with a separate thread inside linesToFlow it should work, right?
o
possibly, but this is poorly designed IMO
a
How can I emit the input stream of my process to a flow?
o
I think this is better:
fun main() {
    runBlocking {
        val p = ProcessBuilder("./cmd.sh").start()
        p.inputStream.bufferedReader().lineSequence().asFlow()
            .flowOn(Dispatchers.IO)
            .map {
                println(it)
                it
            }.filter { "3" in it }.first()
        println("Done")
        p.waitFor()
    }
}
you can condense p.inputStream.bufferedReader().lineSequence().asFlow().flowOn(Dispatchers.IO) to a function if you want
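A possible shape for that helper, as a sketch only: the name linesAsFlow is hypothetical, and the in-memory stream in main is a stand-in for p.inputStream so the example runs without cmd.sh.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import java.io.InputStream

// Hypothetical name; wraps the chain from the message above.
fun InputStream.linesAsFlow(): Flow<String> =
    bufferedReader()
        .lineSequence()          // lazy, blocking line reads
        .asFlow()
        .flowOn(Dispatchers.IO)  // run the blocking reads off the collector's thread

fun main() = runBlocking {
    // In-memory stand-in for p.inputStream (assumption for this sketch).
    val input = "1\n2\n3\n4\n".byteInputStream()
    val match = input.linesAsFlow().first { "3" in it }
    println(match)
}
```

Two caveats with this sketch: lineSequence() can only be iterated once, so the returned flow should be collected once, and the helper does not close the underlying reader.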
a
Nice, thank you. I will give it a try and come back to you if I have more questions
One quick question though
Why did you decide to use IO dispatcher here?
o
because it's doing IO work
a
So it does not necessarily need to be in a separate thread?
o
that is a separate thread
a
I thought it is a separate pool but not necessarily a separate thread
I need to read more about this
o
that's only true for Dispatchers.Default + Dispatchers.IO, and it will never create this state where there aren't enough threads (e.g. Default can have 8 and IO 64, even if IO uses some of the same 8 threads as the default dispatcher, the default dispatcher will spawn more if it needs to)
ref. https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-i-o.html
This dispatcher shares threads with a Default dispatcher, so using withContext(Dispatchers.IO) { ... } does not lead to an actual switching to another thread — typically execution continues in the same thread.
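The quoted note is about switching from Dispatchers.Default to Dispatchers.IO. When the caller is runBlocking on the main thread, as in this conversation, moving to Dispatchers.IO does leave the calling thread. The sketch below (the helper name observeThreads is made up) simply reports the thread names seen in each context; on the JVM the worker threads of both dispatchers are expected to share the DefaultDispatcher-worker name prefix, while the exact worker numbers vary from run to run.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: returns the thread name observed in each context.
fun observeThreads(): Triple<String, String, String> = runBlocking {
    val caller = Thread.currentThread().name                                  // runBlocking's own thread
    val io = withContext(Dispatchers.IO) { Thread.currentThread().name }      // a shared pool worker
    val default = withContext(Dispatchers.Default) { Thread.currentThread().name }
    Triple(caller, io, default)
}

fun main() {
    val (caller, io, default) = observeThreads()
    println("caller:  $caller")
    println("IO:      $io")
    println("Default: $default")
}
```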
a
Thank you for the great answer 😊
I have a follow-up question regarding Dispatchers here: It looks like even if I remove .flowOn(Dispatchers.IO), the code works as expected. Based on your elaboration, I don't understand why it works correctly.
There is also another interesting catch here. When I use withTimeout, it only works when I run the flow on IO dispatcher:
fun main() {
    runBlocking {
        val p = ProcessBuilder("./cmd.sh").start()
        withTimeout(2000) {
            p.inputStream.linesToFlow().map {
                println(it)
                it
            }.filter { "3" in it }.first()
        }
        println("Done")
        p.waitFor()
    }
}
o
I suspect it may have to do with the fact that I used a Sequence, which itself is suspending. that means that there isn't another coroutine processing events, it's all in the same stack. not sure why withTimeout might change that, it shouldn't be blocking anywhere...
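One possible explanation, not confirmed in this thread: withTimeout is cooperative and can only cancel a body at a suspension point. If the body blocks runBlocking's own thread (a blocking line read would do that), the timeout cannot be delivered until the blocking call returns. The sketch below uses Thread.sleep as a stand-in for the blocking read and delay as a stand-in for a collector that suspends while another dispatcher does the reading; both function names are hypothetical. Under these assumptions the first call is expected to return "completed" and the second "timed out".

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Blocking body: the thread is held for 300 ms and never suspends,
// so the 100 ms timeout has no chance to cancel it.
fun timeoutAroundBlockingCall(): String = runBlocking {
    try {
        withTimeout(100) {
            Thread.sleep(300)
            "completed"
        }
    } catch (e: TimeoutCancellationException) {
        "timed out"
    }
}

// Suspending body: delay() is a suspension point, so the timeout fires.
fun timeoutAroundSuspendingCall(): String = runBlocking {
    try {
        withTimeout(100) {
            delay(300)
            "completed"
        }
    } catch (e: TimeoutCancellationException) {
        "timed out"
    }
}

fun main() {
    println(timeoutAroundBlockingCall())
    println(timeoutAroundSuspendingCall())
}
```

With .flowOn(Dispatchers.IO), the blocking reads happen on another thread and the collector inside withTimeout suspends while waiting, which would match the second case.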
a
Roman Elizarov commented on my Stack Overflow question, which might be helpful: https://stackoverflow.com/questions/61825334/convert-inputstream-to-flow/61833785?noredirect=1#comment109394481_61833785