# coroutines
n
We have a function that converts on-disk data into a stream of objects. We have multiple files and want to use coroutines to process them concurrently. The solution we came up with looks like this:
```kotlin
fun readAll(): Stream<Metadata> {
    return runBlocking {
        channelFlow<Stream<Metadata>> {
            Type.supportedTypes.forEach { type ->
                launch(Dispatchers.IO + CoroutineName("Read all for $type")) {
                    send(metadataPersistenceHelper(type).readAll())
                }
            }
        }.reduce { accumulator, value -> Stream.concat(accumulator, value) }
    }
}
```
(`Type` has multiple supported types and `metadataPersistenceHelper` does the heavy IO). Does this make sense or are we abusing coroutines here?
o
unless you have some need to stick with Streams, it's probably better if you make it all Flow
☝️ 1
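A minimal sketch of what an all-`Flow` version might look like, using hypothetical stand-ins for `Metadata`, `supportedTypes`, and `metadataPersistenceHelper` (the real types are not shown in the thread); one flow per type is moved onto `Dispatchers.IO` with `flowOn` and the flows are then merged so the reads run concurrently:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the types discussed in the thread.
data class Metadata(val name: String)

interface MetadataReader {
    fun readAll(): List<Metadata>
}

val supportedTypes = listOf("audio", "video", "image")

fun metadataPersistenceHelper(type: String): MetadataReader =
    object : MetadataReader {
        // Stand-in for the heavy IO read.
        override fun readAll() = listOf(Metadata("$type-1"), Metadata("$type-2"))
    }

// All-Flow sketch: one flow per type, each moved onto Dispatchers.IO,
// then merged so the per-type reads run concurrently.
fun readAllFlow(): Flow<Metadata> =
    supportedTypes
        .map { type ->
            flow { emitAll(metadataPersistenceHelper(type).readAll().asFlow()) }
                .flowOn(Dispatchers.IO)
        }
        .merge()
```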
n
No real reason to stick with streams; that is just a left-over from an earlier Java implementation. We always consume all the results anyway, so I was actually thinking of converting to use `List` (and thus return `List<Metadata>`)
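A sketch of that `List<Metadata>` variant, again with hypothetical stand-ins for the thread's types: one `async` per type on `Dispatchers.IO`, then `awaitAll` and `flatten`, which keeps the reads concurrent while dropping `Stream` entirely:

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-ins for the types discussed in the thread.
data class Metadata(val name: String)

interface MetadataReader {
    fun readAll(): List<Metadata>
}

val supportedTypes = listOf("audio", "video", "image")

fun metadataPersistenceHelper(type: String): MetadataReader =
    object : MetadataReader {
        override fun readAll() = listOf(Metadata("$type-1"), Metadata("$type-2"))
    }

// List-based sketch: one async per type, then awaitAll and flatten
// into a single List<Metadata>.
suspend fun readAllList(): List<Metadata> = coroutineScope {
    supportedTypes
        .map { type ->
            async(Dispatchers.IO + CoroutineName("Read all for $type")) {
                metadataPersistenceHelper(type).readAll()
            }
        }
        .awaitAll()
        .flatten()
}
```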
h
I'm wondering if it's a good or bad idea to use `runBlocking` in a non-test scenario 🤔
o
it depends on how far upwards you can propagate the `suspend` modifier
if you're being called by some Java code that won't know what a `suspend` function is, you either have to `launch` or use `runBlocking`
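A sketch of that bridging pattern, with hypothetical function names: the `suspend` function stays the primary Kotlin API, and a thin `runBlocking` wrapper is exposed as an entry point for Java callers until everything is migrated:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Primary Kotlin API: a suspend function.
suspend fun readCount(): Int {
    delay(10)  // stand-in for suspending IO
    return 42
}

// Java-friendly entry point: blocks the calling thread via runBlocking,
// since Java callers cannot use suspend functions directly.
fun readCountBlocking(): Int = runBlocking { readCount() }
```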
h
Got it
@nkiesel are you calling this code from Java?
n
yes (until we switch everything to Kotlin but that might be a few years...).
h
😄 well... Okay
n
Really I have 2 concerns: (1) does that do what we want (i.e. concurrently run the internal `readAll`) and (2) are we abusing coroutines here
o
yes, it works. I don't really know if it's abusing them, but I can think of a better way to write this
n
and perhaps (3) is `channelFlow` + `launch` + `reduce` the right approach (I do understand that it will blow up with a large set of supportedTypes, but this comes from an enum which will never have more than a dozen or so items)
o
I'm curious, what does `metadataPersistenceHelper(type)` return?
n
it returns a type-specific reader; they all implement an interface containing `readAll`
o
so personally I prefer to implement like this:
```kotlin
return runBlocking {
    flow {
        // coroutineScope plus flowOn below, rather than withContext here:
        // emitting from inside withContext would violate the flow
        // context-preservation invariant and fail at runtime.
        coroutineScope {
            Type.supportedTypes.forEach { type ->
                val deferredStream = async(CoroutineName("Read all for $type")) {
                    metadataPersistenceHelper(type).readAll()
                }
                emit(deferredStream)
            }
        }
    }
        .flowOn(Dispatchers.IO)
        .buffer(Channel.UNLIMITED)
        .map { it.await() }
        .reduce { accumulator, value -> Stream.concat(accumulator, value) }
}
```
since it's a little more sequential-like
n
interesting! I will go through this and try to understand why we e.g. need the `.buffer(Channel.UNLIMITED)` here
o
as a hint, the UNLIMITED part is not strictly necessary; a smaller buffer size would put a bound on how many tasks run in parallel, but since you said that's irrelevant (the set of types is already small and bounded), I left it UNLIMITED
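A toy demonstration (hypothetical tasks, not the thread's code) of what the buffer changes: each emitted value is a `Deferred` that `async` has already started, but without a buffer `emit` suspends until the downstream `map` has awaited the previous value, so the tasks effectively run one at a time:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Runs four 100ms tasks through the flow-of-Deferreds pipeline and
// returns the elapsed time. Unbuffered, emit suspends on every value
// until map has awaited it (~4 x 100ms); with buffer(Channel.UNLIMITED)
// all Deferreds are emitted (and thus started) up front (~100ms).
fun runTasks(buffered: Boolean): Long = runBlocking {
    measureTimeMillis {
        flow {
            coroutineScope {
                repeat(4) {
                    emit(async { delay(100) })
                }
            }
        }
            .let { if (buffered) it.buffer(Channel.UNLIMITED) else it }
            .map { it.await() }
            .collect()
    }
}
```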
n
in my original version, looks like I could use `channelFlow {` instead of `channelFlow<Stream<Metadata>>`, no?
o
I think so, it might depend on the new inference system which is enabled in the IDE but not in the compiler by default
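A small sketch of the inference point (hypothetical example): the element type of `channelFlow` can be inferred from the declared return type, or with builder-type inference from the `send` calls, so the explicit type argument can usually be dropped:

```kotlin
import kotlinx.coroutines.flow.*

// No explicit type argument on channelFlow: the compiler infers the
// element type Int from the declared return type Flow<Int>.
fun numbers(): Flow<Int> = channelFlow {
    send(1)
    send(2)
}
```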
n
Good point; forgot about that. But actually in this case it also works when building with `./gradlew jar`