Jan Skrasek
03/31/2020, 9:42 AMimport kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.*
fun <T> Flow<T>.share(): Flow<T> {
val channel = ConflatedBroadcastChannel<T>()
val counter = AtomicInteger()
var job: Job? = null
return channel
.asFlow()
.onStart {
if (counter.incrementAndGet() == 1) {
job = GlobalScope.launch {
this@share
.catch {
channel.close(it)
}
.collect {
channel.offer(it)
}
}
}
}
.onCompletion {
if (counter.decrementAndGet() == 0) {
job?.cancelAndJoin()
job = null
}
}
}
elizarov
03/31/2020, 9:52 AMcounter
and job
and there's a race in how you check/update them. E.g. you can end up in an invalid state with counter > 0 && job == null
Jan Skrasek
03/31/2020, 9:59 AMbezrukov
03/31/2020, 10:18 AMshare()
operator will conflate emission by default. You probably need to make it
fun <T> Flow<T>.share(capacity: Int = CONFLATED/BUFFERED): Flow<T> {
val channel = BroadcastChannel<T>(capacity)
.....
Jan Skrasek
04/01/2020, 9:59 AMbezrukov
04/01/2020, 10:08 AMJan Skrasek
04/01/2020, 10:21 AMbezrukov
04/01/2020, 11:46 AM* Note, that elements that are sent to this channel while there are no subscribers are immediately lost.š it applies to the second subscriber as well - if first subscriber consumed element, it lost