https://kotlinlang.org logo
#coroutines
Title
# coroutines
u

ubu

04/09/2020, 9:13 AM
Copy code
fun <A, B, C : Any, R> Flow<A>.withLatestFrom(
  other: Flow<B>,
  another: Flow<C>,
  transform: suspend (A, B, C) -> R
): Flow<R> = flow {
  coroutineScope {
    val latestB = AtomicReference<B?>()
    val latestC = AtomicReference<C?>()
    val outerScope = this
    launch {
      try {
        other.collect { latestB.set(it) }
      } catch (e: CancellationException) {
        outerScope.cancel(e) // cancel outer scope on cancellation exception, too
      }
    }
    launch {
      try {
        another.collect { latestC.set(it) }
      } catch (e: CancellationException) {
        outerScope.cancel(e) // cancel outer scope on cancellation exception, too
      }
    }
    collect { a: A ->
      val b = latestB.get()
      val c = latestC.get()
      if (b != null && c != null) {
        emit(transform(a, b, c))
      }
    }
  }
}
Hi guys. Inspired by this implementation by @elizarov, I needed to re-implement this operator to accept several streams. Is there any problems inherent to this implementation? Two
launch
blocks disturb me somehow.
d

Dominaezzz

04/09/2020, 12:18 PM
I don't think you need the try catch, since you have structured concurrency.
u

ubu

04/09/2020, 2:05 PM
as far as I understand, if we have an error from one of the streams (
other
,
another
in the method signature above), then this would not cancel the original stream (
Flow<T>
). but i am really a novice to coroutines.
d

Dominaezzz

04/09/2020, 2:17 PM
Oh I see, you're specifically catching cancellations. I wish there was a nicer way to achieve this.
u

ubu

04/09/2020, 2:26 PM
I wish there was this operator implemented by Kotlin Coroutines library. It’s really useful, a must-have.
e

elizarov

04/09/2020, 2:27 PM
I would be greatful if you add your specific use-case (how, why and when you use it) to this issue: https://github.com/Kotlin/kotlinx.coroutines/issues/1498
u

ubu

04/09/2020, 2:29 PM
sure, I will.
5 Views