Brendan Weinstein
02/24/2020, 9:10 AM
class StateMachine(val scope: CoroutineScope) {
// Buffered broadcast channel that carries dispatched actions to the collector started in `init`.
private val inputActions: BroadcastChannel<Rankings.Action> = BroadcastChannel(Channel.BUFFERED)
// Flipped to true by the `onStart` hook of the collecting flow. Declared `val` because the
// atomic holder itself is never reassigned; only its `.value` changes.
// NOTE(review): `atomic` is presumably kotlinx.atomicfu, which expects atomic fields to be `val` - confirm.
private val isInitialized = atomic(false)
init {
    // Start the single collector for dispatched actions as soon as the state machine is constructed.
    scope.launch {
        inputActions.asFlow()
            // The onStart hook is called just before collect gets called; it marks the
            // state machine as ready so that dispatchAction may start sending.
            .onStart { isInitialized.value = true }
            .collect { action ->
                // handle action
            }
        // Code placed here is not reached while collection is active, because `collect` suspends.
    }
}
/**
 * Sends [action] to the input channel from a new coroutine in [scope], waiting first until
 * the collector has signalled readiness through `isInitialized`.
 *
 * @return the launched coroutine (the result of `scope.launch`).
 */
fun dispatchAction(action: Rankings.Action) = scope.launch {
    // Wait cooperatively instead of spinning: an empty busy-loop pins the dispatcher thread
    // (and can starve the collector on a single-threaded dispatcher) and never observes
    // cancellation. `yield()` gives other coroutines a chance to run and is cancellable.
    while (!isInitialized.value) {
        kotlinx.coroutines.yield()
    }
    inputActions.send(action)
}
}
Kiryushin Andrey
02/24/2020, 9:56 AM
`dispatchAction` get called before a downstream collector is attached, for it is attached right in the class constructor.
uli
02/24/2020, 12:57 PM
Kiryushin Andrey
02/24/2020, 1:00 PM
`dispatchAction` method while initialization hasn't yet completed. If it should just drop actions while a collector isn't there, then a simple condition check like the following will do:
// Forward the action only once `isInitialized` has been set; until then the action is dropped.
if (isInitialized.value) {
inputActions.send(action)
}
Brendan Weinstein
02/24/2020, 4:43 PM
uli
02/24/2020, 4:47 PM
`Deferred<BroadcastChannel<Rankings.Action>>`
and then:
/**
 * Launches a coroutine in [scope] that awaits the deferred input channel and then sends
 * [action] to it.
 */
fun dispatchAction(action: Rankings.Action) = scope.launch {
    // Suspends until the deferred channel is available.
    val readyChannel = inputActions.await()
    readyChannel.send(action)
}
Brendan Weinstein
02/24/2020, 4:52 PM
@InternalCoroutinesApi
// Demo entry point: a producer starts sending integers after 10 s, while a consumer wraps the
// channel's flow with `asBindFlow` and attaches its collector after 5 s.
fun main() {
    val channel = BroadcastChannel<Int>(Channel.BUFFERED)
    // runBlocking parks the main thread until its children complete, replacing the previous
    // empty `while (true)` spin loop (which burned a full CPU core) and the unstructured
    // GlobalScope launches. The producer never finishes, so the program still runs until killed.
    kotlinx.coroutines.runBlocking {
        // Producer: wait 10 s, then send an incrementing counter once per second.
        launch {
            var i = 0
            delay(10_000)
            while (true) {
                delay(1000)
                channel.send(i)
                i++
            }
        }
        // Consumer: build the bind-aware flow immediately, attach the collector after 5 s.
        launch {
            val flow = channel.asFlow().asBindFlow {
                println("onBind")
            }
            delay(5000)
            println("adding terminal downstream collector")
            flow.collect {
                println("collected: $it")
            }
        }
    }
}
/**
 * Wraps this flow in a [BindFlow], passing [onBind] through as its bind callback.
 */
@InternalCoroutinesApi
fun <T> Flow<T>.asBindFlow(onBind: () -> Unit): Flow<T> =
    BindFlow(onBind = onBind, flow = this)
/**
 * A [Flow] wrapper that invokes [onBind] before delegating collection to [flow].
 *
 * The callback is guarded by a flag that is re-checked under a [Mutex]; the flag is set only
 * after [onBind] returns.
 */
@InternalCoroutinesApi
class BindFlow<T>(val onBind: () -> Unit, val flow: Flow<T>) : Flow<T> {
    private val hasBinded = AtomicBoolean(false)
    private val mutex = Mutex()

    @InternalCoroutinesApi
    override suspend fun collect(collector: FlowCollector<T>) {
        bindIfNeeded()
        // Forward every upstream value to the downstream collector.
        flow.collect { value ->
            collector.emit(value)
        }
    }

    /** Runs [onBind] and records it, unless the flag is already set. */
    private suspend fun bindIfNeeded() {
        // Fast path: skip the lock once the flag has been set.
        if (hasBinded.get()) return
        mutex.withLock {
            // Re-check under the lock before invoking the callback.
            if (!hasBinded.get()) {
                onBind()
                hasBinded.set(true)
            }
        }
    }
}
/**
 * A [Flow] wrapper that invokes [onBind] before delegating collection to [flow].
 *
 * The callback runs only for the caller that wins the compare-and-set on the flag; the flag
 * is flipped before [onBind] is invoked.
 */
@InternalCoroutinesApi
class BindFlow<T>(val onBind: () -> Unit, val flow: Flow<T>) : Flow<T> {
    private val hasBinded = AtomicBoolean(false)

    @InternalCoroutinesApi
    override suspend fun collect(collector: FlowCollector<T>) {
        // Only the caller that flips the flag from false to true runs the callback.
        val wonBind = hasBinded.compareAndSet(false, true)
        if (wonBind) onBind()
        // Forward every upstream value to the downstream collector.
        flow.collect { value ->
            collector.emit(value)
        }
    }
}