nkiesel
05/28/2020, 7:56 PMfun readAll(): Stream<Metadata> {
return runBlocking {
channelFlow<Stream<Metadata>> {
Type.supportedTypes.forEach { type ->
launch(<http://Dispatchers.IO|Dispatchers.IO> + CoroutineName("Read all for $type")) {
send(metadataPersistenceHelper(type).readAll())
}
}
}.reduce { accumulator, value -> Stream.concat(accumulator, value) }
}
}
(Type
has multiple supported types and metadataPersistenceHelper
does the heavy IO). Does this makes sense or are we abusing coroutines here?octylFractal
05/28/2020, 7:58 PMnkiesel
05/28/2020, 8:02 PMList
(and thus return List<Metadata>
)henrikhorbovyi
05/28/2020, 8:03 PMrunBlocking
in a non-test scenario š¤octylFractal
05/28/2020, 8:04 PMsuspend
modifiersuspend
function is, you either have to launch
or use runBlocking
henrikhorbovyi
05/28/2020, 8:04 PMnkiesel
05/28/2020, 8:05 PMhenrikhorbovyi
05/28/2020, 8:06 PMnkiesel
05/28/2020, 8:06 PMoctylFractal
05/28/2020, 8:07 PMnkiesel
05/28/2020, 8:08 PMchannelFlow
+ launch
+ reduce
the right approach (I do understand that it will blow up with a large set of supportedTypes but this comes from an enum which will never have mor ethan a dozen or so items)octylFractal
05/28/2020, 8:18 PMmetadataPersistenceHelper(type)
return?nkiesel
05/28/2020, 8:27 PMreadAll
octylFractal
05/28/2020, 8:38 PMreturn runBlocking {
flow {
withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
Type.supportedTypes.forEach { type ->
val deferredStream = async(CoroutineName("Read all for $type")) {
metadataPersistenceHelper(type).readAll()
}
emit(deferredStream)
}
}
}
.buffer(Channel.UNLIMITED)
.map { it.await() }
.reduce { accumulator, value -> Stream.concat(accumulator, value) }
}
since it's a little more sequential-likenkiesel
05/28/2020, 9:37 PM.buffer(Channel.UNLIMITED)
hereoctylFractal
05/28/2020, 9:41 PMnkiesel
05/28/2020, 9:53 PMchannelFlow {
instead of channelFLow<Stream<Metadata>>
, no?octylFractal
05/28/2020, 9:54 PMnkiesel
05/28/2020, 10:01 PM./gradlew jar