Hi, I need share operator. The <implementations> s...
# coroutines
j
Hi, I need share operator. The implementations seem to be quite complicated so I've written this. Do you have any idea why it shouldn't work correctly?
Copy code
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.*

fun <T> Flow<T>.share(): Flow<T> {
  val channel = ConflatedBroadcastChannel<T>()
  val counter = AtomicInteger()
  var job: Job? = null

  return channel
    .asFlow()
    .onStart {
      if (counter.incrementAndGet() == 1) {
        job = GlobalScope.launch {
          this@share
            .catch {
              channel.close(it)
            }
            .collect {
              channel.offer(it)
            }
        }
      }
    }
    .onCompletion {
      if (counter.decrementAndGet() == 0) {
        job?.cancelAndJoin()
        job = null
      }
    }
}
e
You have two pieces of state here:
counter
and
job
and there's a race in how you check/update them. E.g. you can end up in an invalid state with
counter > 0 && job == null
👍 1
j
Thanks!
b
Also there is an issue with Conflated channel - it seems unexpected that
share()
operator will conflate emission by default. You probably need to make it
Copy code
fun <T> Flow<T>.share(capacity: Int = CONFLATED/BUFFERED): Flow<T> { 
    val channel = BroadcastChannel<T>(capacity)
.....
👍 2
🤔 1
j
Isn't rendevouz more clean option? share() operator will do the sharing but not replay?
b
there is no rendevouz broadcast channel - only conflated and buffered. Closest to rendevous is buffered with capacity = 1
buffered broadcast channel won't replay last element
j
Why not? It will replay the last element if there is an open subscription, won't it?
b
with Conflated will, with Buffered won't - Conflated: https://pl.kotl.in/tOI0C_NCA Buffered: https://pl.kotl.in/NtvlYT9gV You can check ArrayBroadcastChannel (it's under the hood when you choose BUFFERED) docs:
* Note, that elements that are sent to this channel while there are no subscribers are immediately lost.
👆 it applies to the second subscriber as well - if first subscriber consumed element, it lost