Brais Gabin
01/03/2021, 11:19 AM
[I want to convert the] Stream returned by Files.walk(Path) to a Flow. I could use the extension function consumeAsFlow(), but Files.walk(Path) blocks the calling thread. So what's the correct way to do it? I implemented this, but it doesn't feel right:
flow<Path> {
    Files.walk(path)
        .forEach { path -> runBlocking { emit(path) } }
}
    .flowOn(Dispatchers.IO)
And, as a side question, is there any library that implements the filesystem API as proper suspend functions?
kenkyee
01/03/2021, 12:13 PM
Brais Gabin
01/03/2021, 12:19 PM
flow<Path> {
    withContext(Dispatchers.IO) { Files.walk(path) }
        .forEach { path -> runBlocking { emit(path) } }
}
kenkyee
01/03/2021, 12:43 PM
Brais Gabin
01/03/2021, 2:35 PM
java.lang.IllegalStateException: Flow invariant is violated:
Emission from another coroutine is detected.
Child of BlockingCoroutine{Active}@34f9275f, expected child of ProducerCoroutine{Active}@547a8927.
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
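The exception text itself points at channelFlow. A minimal sketch of that route follows, assuming a kotlinx.coroutines version that provides trySendBlocking; the helper name walkViaChannel is hypothetical and does not come from the thread. Unlike emit, sending to the channel is allowed from the non-suspending forEach lambda.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper name, used only for illustration.
fun walkViaChannel(root: Path): Flow<Path> = channelFlow {
    // The Stream from Files.walk holds open directory handles, so close it with use.
    Files.walk(root).use { stream ->
        // forEach takes a non-suspending lambda, so each element is handed to the
        // channel with the blocking send variant instead of emit.
        stream.forEach { p -> trySendBlocking(p) }
    }
}.flowOn(Dispatchers.IO) // run the blocking walk on the IO dispatcher

fun main() = runBlocking {
    // Throwaway directory with one file, so the walk has something to find.
    val root = Files.createTempDirectory("walk-demo")
    Files.createFile(root.resolve("a.txt"))
    println(walkViaChannel(root).toList())
}
```

One trade-off of this sketch: if the collector stops early, trySendBlocking starts returning a failed result, but the walk itself still runs to the end of the tree.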
Is there something like Flow.defer(), the way RxJava has Observable.defer()? That way I could do something like:
Flow.defer { Files.walk(Path).consumeAsFlow() }
    .flowOn(Dispatchers.IO)
tseisel
01/03/2021, 4:18 PM
kotlinx-coroutines-jdk8 artifact)
Brais Gabin
01/03/2021, 8:22 PM
dave08
01/04/2021, 7:07 AM
tseisel
01/04/2021, 8:59 AM
fun Path.walk() = flow<Path> {
    val paths = Files.walk(this@walk).consumeAsFlow()
    emitAll(paths)
}.flowOn(Dispatchers.IO)
Brais Gabin
01/04/2021, 10:49 PM
emitAll did the trick!
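For readers who would rather not depend on the Stream version of consumeAsFlow, the same deferral can be sketched with the plain flow builder by iterating the stream from inside the builder, so every emit happens in the flow's own coroutine. This is only an illustration: the extension name walkAsFlow is hypothetical, and it assumes a Kotlin standard library in which use is available for AutoCloseable.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Hypothetical extension name, used only for illustration.
fun Path.walkAsFlow(): Flow<Path> = flow {
    // Nothing runs until collection starts, so Files.walk is deferred.
    // use closes the stream when the walk ends or the collector cancels.
    Files.walk(this@walkAsFlow).use { stream ->
        // A plain for loop keeps emit in the flow's own coroutine,
        // which is what the flow invariant requires.
        for (p in stream.iterator()) emit(p)
    }
}.flowOn(Dispatchers.IO) // the blocking walk runs on the IO dispatcher

fun main() = runBlocking {
    // Throwaway directory with one file, so the walk has something to find.
    val root = Files.createTempDirectory("walk-demo")
    Files.createFile(root.resolve("a.txt"))
    // Taking only the first element cancels the walk early.
    println(root.walkAsFlow().first())
}
```

Because the loop suspends on each emit, the walk advances only as fast as the collector consumes it (subject to the buffer that flowOn introduces), and cancelling the collector stops the traversal.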