I'm using `java.util.stream.Stream.consumeAsFlow()...
# coroutines
j
I'm using
java.util.stream.Stream.consumeAsFlow()
to process a stream as a flow. Sometimes, it appears that the
finally { stream.close() }
block is not getting called (possibly on certain cancellation / error conditions?) (confirmed by duplicating the StreamFlow class and adding a log statement), although if I add an
onCompletion {}
to the returned flow, that block does get called even when the finally gets skipped. Anything that can explain this behavior? Will probably move back to using the
onCompletion
to close the Stream in the meantime though.
l
Can you link the sources of the
consumeAsFlow
function?
j
https://github.com/Kotlin/kotlinx.coroutines/blob/master/integration/kotlinx-coroutines-jdk8/src/stream/Stream.kt we basically do:
Copy code
val stream = jooqQuery.fetchLazy().stream().map { /* map result to data class */}
return stream.consumeAsFlow().flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
       .onCompletion { stream.close() } // should not need this line, as the finally block in the StreamFlow should close it
filed in github for anyone coming along later: https://github.com/Kotlin/kotlinx.coroutines/issues/1825
👀 1