https://kotlinlang.org logo
#coroutines
Title
# coroutines
b

Brais Gabin

01/03/2021, 11:19 AM
👋 I want to convert the
Stream
returned by
Files.walk(Path)
to a
Flow
. I could use the extension function
consumeAsFlow()
but
Files.walk(Path)
blocks the calling thread. So, what's the correct way to do it? I implemented this but it doesn't feel right:
Copy code
flow<Path> {
    Files.walk(path)
        .forEach { path -> runBlocking { emit(path) } }
}
.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
And, as side question, is there any library that implements the filesystem api as correct suspend functions?
k

kenkyee

01/03/2021, 12:13 PM
Run file IO in a coroutine...withContext(Dispatchers.IO) { file stuff {
b

Brais Gabin

01/03/2021, 12:19 PM
So, is this better?
Copy code
flow<Path> {
    withContext(<http://Dispatchers.IO|Dispatchers.IO>) { Files.walk(path) }
        .forEach { path -> runBlocking { emit(path) } }
}
k

kenkyee

01/03/2021, 12:43 PM
That won't block at least 🙂 Could also just run whatever you need to do in the coroutine... Not sure what you're doing with the flow..
☝🏼 1
b

Brais Gabin

01/03/2021, 2:35 PM
That doesn't work I get an exception:
Copy code
java.lang.IllegalStateException: Flow invariant is violated:
		Emission from another coroutine is detected.
		Child of BlockingCoroutine{Active}@34f9275f, expected child of ProducerCoroutine{Active}@547a8927.
		FlowCollector is not thread-safe and concurrent emissions are prohibited.
		To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
Is there something like
Flow.defer()
as RxJava have
Observable.defer()
. This way I could do something like:
Copy code
Flow.defer { Files.walk(Path).consumeAsFlow() }
  .flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
t

tseisel

01/03/2021, 4:18 PM
Have you tried converting your Stream to a Flow with https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.stream/java.util.stream.-stream/index.html ? (This is provided by the
kotlinx-coroutines-jdk8
artifact)
b

Brais Gabin

01/03/2021, 8:22 PM
Yes, the problem is that it seems that the creation of the stream is the blocking thing.
d

dave08

01/04/2021, 7:07 AM
Maybe use a callbackFlow builder? But you would probably need the withContext there too, you'll just avoid the runBlocking... you could maybe even make an extension function to do all this if you do this many times.
t

tseisel

01/04/2021, 8:59 AM
This should do the trick:
Copy code
fun Path.walk() = flow<Path> {
    val paths = Files.walk(this@walk).consumeAsFlow()
    emitAll(paths)
}.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
b

Brais Gabin

01/04/2021, 10:49 PM
That was it!!
emitAll
did the trick!
5 Views