fun <A, B, C : Any, R> Flow<A>.withLatestFrom(
other: Flow<B>,
another: Flow<C>,
transform: suspend (A, B, C) -> R
): Flow<R> = flow {
coroutineScope {
val latestB = AtomicReference<B?>()
val latestC = AtomicReference<C?>()
val outerScope = this
launch {
try {
other.collect { latestB.set(it) }
} catch (e: CancellationException) {
outerScope.cancel(e) // cancel outer scope on cancellation exception, too
}
}
launch {
try {
another.collect { latestC.set(it) }
} catch (e: CancellationException) {
outerScope.cancel(e) // cancel outer scope on cancellation exception, too
}
}
collect { a: A ->
val b = latestB.get()
val c = latestC.get()
if (b != null && c != null) {
emit(transform(a, b, c))
}
}
}
}
Hi guys. Inspired by
this implementation by
@elizarov, I needed to re-implement this operator to accept several streams. Is there any problems inherent to this implementation? Two
launch
blocks disturb me somehow.