# coroutines
u
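```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// B and C are both bounded by Any so that null can only mean "no value received yet".
fun <A, B : Any, C : Any, R> Flow<A>.withLatestFrom(
  other: Flow<B>,
  another: Flow<C>,
  transform: suspend (A, B, C) -> R
): Flow<R> = flow {
  coroutineScope {
    val latestB = AtomicReference<B?>()
    val latestC = AtomicReference<C?>()
    val outerScope = this
    // Collect each side flow concurrently, remembering only its latest value.
    launch {
      try {
        other.collect { latestB.set(it) }
      } catch (e: CancellationException) {
        outerScope.cancel(e) // cancel outer scope on cancellation exception, too
      }
    }
    launch {
      try {
        another.collect { latestC.set(it) }
      } catch (e: CancellationException) {
        outerScope.cancel(e) // cancel outer scope on cancellation exception, too
      }
    }
    // Emit only once both side flows have produced at least one value.
    collect { a: A ->
      val b = latestB.get()
      val c = latestC.get()
      if (b != null && c != null) {
        emit(transform(a, b, c))
      }
    }
  }
}
```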
Hi guys. Inspired by this implementation by @elizarov, I needed to re-implement this operator to accept several streams. Are there any problems inherent to this implementation? The two `launch` blocks bother me somehow.
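For anyone who wants to try the operator, here is a rough usage sketch. It reproduces the operator so that it compiles on its own, and it assumes `B : Any` in addition to `C : Any`. The demo function `withLatestFromDemo`, the two `MutableStateFlow` inputs, and the delays are made up for illustration and are not part of the original message.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// The operator under discussion, reproduced so this sketch is self-contained.
fun <A, B : Any, C : Any, R> Flow<A>.withLatestFrom(
  other: Flow<B>,
  another: Flow<C>,
  transform: suspend (A, B, C) -> R
): Flow<R> = flow {
  coroutineScope {
    val latestB = AtomicReference<B?>()
    val latestC = AtomicReference<C?>()
    val outerScope = this
    launch {
      try { other.collect { latestB.set(it) } }
      catch (e: CancellationException) { outerScope.cancel(e) }
    }
    launch {
      try { another.collect { latestC.set(it) } }
      catch (e: CancellationException) { outerScope.cancel(e) }
    }
    collect { a: A ->
      val b = latestB.get()
      val c = latestC.get()
      if (b != null && c != null) emit(transform(a, b, c))
    }
  }
}

// Hypothetical demo: a delayed source combined with two state holders.
suspend fun withLatestFromDemo(): List<String> {
  // The delay lets the two side collectors start before the first source value arrives.
  val source = flow { for (i in 1..3) { delay(10); emit(i) } }
  val letters = MutableStateFlow("x")
  val flags = MutableStateFlow(true)
  return source
    .withLatestFrom(letters, flags) { a, b, c -> "$a-$b-$c" }
    .take(3) // state flows never complete, so the result is bounded here
    .toList()
}

fun main(): Unit = runBlocking {
  println(withLatestFromDemo())
}
```

Two things this sketch makes visible: source values that arrive before both side flows have emitted are dropped, and because `coroutineScope` waits for its children, the resulting flow does not complete on its own while a side flow such as a `MutableStateFlow` is still active. The `take(3)` is what ends collection here.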
d
I don't think you need the `try`/`catch`, since you have structured concurrency.
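To illustrate the structured-concurrency point, here is a minimal sketch, assuming kotlinx.coroutines on the JVM. The function name `failingChildDemo` and the exception message are made up for illustration. An ordinary exception thrown in a child `launch` fails the enclosing `coroutineScope` and cancels its siblings, without any `try`/`catch` inside the child.

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo: a regular (non-cancellation) failure in a child
// propagates to the enclosing coroutineScope automatically.
suspend fun failingChildDemo(): String = try {
  coroutineScope {
    launch { throw IllegalStateException("boom") }
    delay(1_000) // cancelled as soon as the sibling coroutine fails
    "completed"
  }
} catch (e: IllegalStateException) {
  // coroutineScope rethrows the child's failure to its caller.
  "scope failed: ${e.message}"
}

fun main(): Unit = runBlocking {
  println(failingChildDemo())
}
```

This covers ordinary exceptions only. A `CancellationException` is handled differently, which is the case the `try`/`catch` in the operator is aimed at.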
u
As far as I understand, if we get an error from one of the streams (`other`, `another` in the method signature above), then this would not cancel the original stream (`Flow<A>`). But I am really a novice to coroutines.
d
Oh I see, you're specifically catching cancellations. I wish there was a nicer way to achieve this.
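A small sketch of why cancellations are the special case, assuming kotlinx.coroutines on the JVM (the function name `cancelledChildDemo` is made up for illustration): a child that ends with a `CancellationException` is treated as normally cancelled, so the parent scope keeps running unless something cancels it explicitly, as `outerScope.cancel(e)` does in the operator.

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo: a CancellationException in a child does NOT cancel the parent scope.
suspend fun cancelledChildDemo(): String = coroutineScope {
  val child = launch { throw CancellationException("child cancelled") }
  child.join() // waits for the child; the calling coroutine itself is not cancelled
  "parent still active: $isActive"
}

fun main(): Unit = runBlocking {
  println(cancelledChildDemo())
}
```

Compare this with an ordinary exception in a child, which fails the whole scope without extra code.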
u
I wish the Kotlin Coroutines library implemented this operator. It's really useful, a must-have.
e
I would be grateful if you added your specific use case (how, why, and when you use it) to this issue: https://github.com/Kotlin/kotlinx.coroutines/issues/1498
u
sure, I will.