ubu
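04/09/2020, 9:13 AM
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Pairs each value of the receiver with the latest values seen from two other flows.
// B and C are both bounded by Any so that null can mean "nothing received yet".
fun <A, B : Any, C : Any, R> Flow<A>.withLatestFrom(
    other: Flow<B>,
    another: Flow<C>,
    transform: suspend (A, B, C) -> R
): Flow<R> = flow {
    coroutineScope {
        val latestB = AtomicReference<B?>()
        val latestC = AtomicReference<C?>()
        val outerScope = this
        // Collect each side flow in its own child coroutine, remembering only the latest value.
        launch {
            try {
                other.collect { latestB.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e) // cancel outer scope on cancellation exception, too
            }
        }
        launch {
            try {
                another.collect { latestC.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e) // cancel outer scope on cancellation exception, too
            }
        }
        // Values of the receiver are dropped until both side flows have emitted at least once.
        collect { a: A ->
            val b = latestB.get()
            val c = latestC.get()
            if (b != null && c != null) {
                emit(transform(a, b, c))
            }
        }
    }
}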
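A minimal usage sketch of the operator above, added for illustration. It assumes kotlinx.coroutines is on the classpath; the operator is repeated in compact form so the block stands alone, and `demo`, the sample values, and the 50 ms delay are hypothetical. The side flows here are finite, so their collectors finish on their own and the resulting flow can complete.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same operator as in the snippet above, with B and C bounded by Any.
fun <A, B : Any, C : Any, R> Flow<A>.withLatestFrom(
    other: Flow<B>,
    another: Flow<C>,
    transform: suspend (A, B, C) -> R
): Flow<R> = flow {
    coroutineScope {
        val latestB = AtomicReference<B?>()
        val latestC = AtomicReference<C?>()
        val outerScope = this
        launch {
            try {
                other.collect { latestB.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e)
            }
        }
        launch {
            try {
                another.collect { latestC.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e)
            }
        }
        collect { a: A ->
            val b = latestB.get()
            val c = latestC.get()
            if (b != null && c != null) emit(transform(a, b, c))
        }
    }
}

// Hypothetical demo: the main flow waits briefly, so both side flows
// have already delivered their last values before the first main value arrives.
fun demo(): List<String> = runBlocking {
    val main = flow {
        delay(50)
        emit(1)
        emit(2)
    }
    val other = flowOf("b1", "b2")
    val another = flowOf(10, 20)
    main.withLatestFrom(other, another) { a, b, c -> "$a-$b-$c" }.toList()
}

fun main() {
    println(demo()) // each main value is paired with "b2" and 20
}
```

Note that a main value arriving before both side flows have emitted is silently dropped, which is the behaviour of the `if (b != null && c != null)` check.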
Hi guys. Inspired by this implementation by @elizarov, I needed to re-implement this operator to accept several streams. Are there any problems inherent to this implementation? The two launch blocks disturb me somehow.
Dominaezzz
04/09/2020, 12:18 PM
ubu
04/09/2020, 2:05 PM
other, another in the method signature above), then this would not cancel the original stream (Flow<T>). But I am really a novice to coroutines.
Dominaezzz
04/09/2020, 2:17 PM
ubu
04/09/2020, 2:26 PM
elizarov
04/09/2020, 2:27 PM
ubu
04/09/2020, 2:29 PM