Brais Gabin
01/03/2021, 11:19 AM
I want to convert the Stream returned by Files.walk(Path) to a Flow. I could use the extension function consumeAsFlow(), but Files.walk(Path) blocks the calling thread. So, what's the correct way to do it? I implemented this, but it doesn't feel right:
flow<Path> {
    Files.walk(path)
        .forEach { path -> runBlocking { emit(path) } }
}
    .flowOn(Dispatchers.IO)
And, as a side question, is there any library that implements the filesystem API as proper suspend functions?
kenkyee
01/03/2021, 12:13 PM
Brais Gabin
01/03/2021, 12:19 PM
flow<Path> {
    withContext(Dispatchers.IO) { Files.walk(path) }
        .forEach { path -> runBlocking { emit(path) } }
}
kenkyee
01/03/2021, 12:43 PM
Brais Gabin
01/03/2021, 2:35 PM
java.lang.IllegalStateException: Flow invariant is violated:
Emission from another coroutine is detected.
Child of BlockingCoroutine{Active}@34f9275f, expected child of ProducerCoroutine{Active}@547a8927.
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
Is there something like Flow.defer(), similar to RxJava's Observable.defer()? That way I could do something like:
Flow.defer { Files.walk(Path).consumeAsFlow() }
    .flowOn(Dispatchers.IO)
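The exception above recommends the channelFlow builder, which, unlike flow, allows sending from a different coroutine context. A minimal sketch of that route, assuming a hypothetical helper named walkViaChannel (not from this thread) and iterating the stream with a for loop so that no runBlocking is needed:

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// walkViaChannel is a hypothetical name used only for this sketch.
fun walkViaChannel(root: Path): Flow<Path> = channelFlow {
    // Unlike flow { }, channelFlow permits sending from another context.
    withContext(Dispatchers.IO) {
        // use { } closes the stream when iteration ends. A for loop over the
        // iterator (instead of Stream.forEach) stays inside the suspending
        // scope, so send() can be called directly.
        Files.walk(root).use { stream ->
            for (path in stream.iterator()) send(path)
        }
    }
}

fun main() = runBlocking {
    // Throwaway directory created only for this demo.
    val root = Files.createTempDirectory("channel-demo")
    Files.createFile(root.resolve("a.txt"))
    println(walkViaChannel(root).toList())
}
```

The trade-off is that channelFlow goes through a buffered channel, so it is probably heavier than needed when all emissions come from a single coroutine.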
tseisel
01/03/2021, 4:18 PM
kotlinx-coroutines-jdk8 artifact)
Brais Gabin
01/03/2021, 8:22 PM
dave08
01/04/2021, 7:07 AM
tseisel
01/04/2021, 8:59 AM
fun Path.walk() = flow<Path> {
    val paths = Files.walk(this@walk).consumeAsFlow()
    emitAll(paths)
}.flowOn(Dispatchers.IO)
Brais Gabin
01/04/2021, 10:49 PM
emitAll did the trick!
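For reference, a self-contained sketch of the emitAll approach from this thread, with imports and a small usage example. The name walkAsFlow and the temporary directory layout are assumptions made for the demo. Wrapping the call in flow { } gives the deferred behavior asked about earlier: Files.walk only runs once a collector starts, and flowOn moves that blocking call onto Dispatchers.IO.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.stream.consumeAsFlow

// walkAsFlow is a hypothetical name; the body mirrors the snippet in the thread.
// A fresh Stream is created on every collection, so the flow can be collected
// more than once even though each consumeAsFlow() result is single-use.
fun Path.walkAsFlow(): Flow<Path> = flow {
    emitAll(Files.walk(this@walkAsFlow).consumeAsFlow())
}.flowOn(Dispatchers.IO)

fun main() = runBlocking {
    // Throwaway directory tree created only for this demo.
    val root = Files.createTempDirectory("walk-demo")
    Files.createFile(root.resolve("a.txt"))
    Files.createDirectory(root.resolve("sub"))
    Files.createFile(root.resolve("sub").resolve("b.txt"))

    val found = root.walkAsFlow().toList()
    println(found.size) // 4: root, a.txt, sub, sub/b.txt
}
```

On older kotlinx.coroutines versions, consumeAsFlow for Stream comes from the separate kotlinx-coroutines-jdk8 artifact mentioned above.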