:wave: I want to convert the `Stream` returned by ...
# coroutines
b
👋 I want to convert the `Stream` returned by `Files.walk(Path)` to a `Flow`. I could use the extension function `consumeAsFlow()`, but `Files.walk(Path)` blocks the calling thread. So, what's the correct way to do it? I implemented this, but it doesn't feel right:
flow<Path> {
    Files.walk(path)
        .forEach { path -> runBlocking { emit(path) } }
}
.flowOn(Dispatchers.IO)
And, as a side question, is there any library that implements the filesystem API as proper suspend functions?
k
Run the file IO in a coroutine: `withContext(Dispatchers.IO) { /* file stuff */ }`
b
So, is this better?
flow<Path> {
    withContext(Dispatchers.IO) { Files.walk(path) }
        .forEach { path -> runBlocking { emit(path) } }
}
k
That won't block, at least 🙂 You could also just run whatever you need to do in the coroutine... Not sure what you're doing with the flow.
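For reference, the "just run it in the coroutine" suggestion could look roughly like the sketch below, for the case where no `Flow` is needed at all. `listTree` is a hypothetical helper name, not something from this thread, and it assumes the whole tree fits comfortably in memory as a list.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import java.util.stream.Collectors
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: walks the tree on Dispatchers.IO and returns every path at once.
suspend fun listTree(root: Path): List<Path> = withContext(Dispatchers.IO) {
    // Files.walk keeps directory handles open, so close the Stream with use {}.
    Files.walk(root).use { stream -> stream.collect(Collectors.toList()) }
}

fun main() = runBlocking {
    // Demo on a throwaway directory containing a single file.
    val dir = Files.createTempDirectory("walk-demo")
    Files.createFile(dir.resolve("a.txt"))
    listTree(dir).forEach { println(it) }
}
```

The blocking work stays inside `withContext(Dispatchers.IO)`, so the caller only suspends.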
b
That doesn't work; I get an exception:
java.lang.IllegalStateException: Flow invariant is violated:
		Emission from another coroutine is detected.
		Child of BlockingCoroutine{Active}@34f9275f, expected child of ProducerCoroutine{Active}@547a8927.
		FlowCollector is not thread-safe and concurrent emissions are prohibited.
		To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
Is there something like `Flow.defer()`, the way RxJava has `Observable.defer()`? That way I could do something like:
Flow.defer { Files.walk(Path).consumeAsFlow() }
  .flowOn(Dispatchers.IO)
t
Have you tried converting your `Stream` to a `Flow` with https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.stream/java.util.stream.-stream/index.html ? (This is provided by the `kotlinx-coroutines-jdk8` artifact.)
b
Yes, but the problem seems to be that creating the stream is itself the blocking call.
d
Maybe use a `callbackFlow` builder? You would probably still need the `withContext` there, but you'd avoid the `runBlocking`. If you do this in many places, you could even wrap it all in an extension function.
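The exception message above points at `channelFlow`, a sibling of `callbackFlow`, so one way to read this suggestion is the sketch below. `walkAsChannelFlow` is a hypothetical name; the sketch assumes that moving the whole builder to `Dispatchers.IO` with `flowOn` is acceptable, which removes the need for both `withContext` and `runBlocking`.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical extension: the builder block runs on Dispatchers.IO because of flowOn.
fun Path.walkAsChannelFlow(): Flow<Path> = channelFlow {
    // Close the Stream when the walk finishes, fails, or the collector cancels.
    Files.walk(this@walkAsChannelFlow).use { stream ->
        // send() suspends on backpressure, so no runBlocking is required here.
        for (p in stream.iterator()) send(p)
    }
}.flowOn(Dispatchers.IO)

fun main() = runBlocking {
    // Demo on a throwaway directory containing a single file.
    val dir = Files.createTempDirectory("walk-demo")
    Files.createFile(dir.resolve("a.txt"))
    println(dir.walkAsChannelFlow().toList())
}
```

Iterating with a plain `for` loop instead of `Stream.forEach` keeps the body inside the suspending builder, which is what makes the direct `send` call legal.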
t
This should do the trick:
fun Path.walk() = flow<Path> {
    val paths = Files.walk(this@walk).consumeAsFlow()
    emitAll(paths)
}.flowOn(Dispatchers.IO)
b
That was it!! `emitAll` did the trick!
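A note on why this answers the earlier `Flow.defer()` question: the `flow { }` builder block runs once per collection, so `Files.walk` is not called until someone collects, and `flowOn` then moves that call to `Dispatchers.IO`. The sketch below tries to show this. The extension is renamed to the hypothetical `walkAsFlow` only to avoid clashing with other `walk` functions, and the `does-not-exist` path is made up for the demo.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.stream.consumeAsFlow

// Same shape as the accepted answer, under a hypothetical name.
fun Path.walkAsFlow(): Flow<Path> = flow {
    // Runs only when the flow is collected, and on Dispatchers.IO thanks to flowOn.
    emitAll(Files.walk(this@walkAsFlow).consumeAsFlow())
}.flowOn(Dispatchers.IO)

fun main() = runBlocking {
    // Building the flow for a missing directory does not touch the filesystem yet.
    val missing = Paths.get("does-not-exist").walkAsFlow()

    // The failure from Files.walk only surfaces once the flow is collected.
    val result = runCatching { missing.toList() }
    println("collecting failed: ${result.isFailure}")
}
```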