How do I synchronize the dispatchAction function b...
# coroutines
b
How do I synchronize the dispatchAction function below so that it only starts sending items once there is a downstream collector? onStart unfortunately get called right before collect.
Copy code
class StateMachine(val scope: CoroutineScope) {  

  private val inputActions: BroadcastChannel<Rankings.Action> = BroadcastChannel(Channel.BUFFERED)
  private var isInitialized = atomic(false)

  init {
    scope.launch {
      inputActions.asFlow()
        .onStart { isInitialized.value = true //onStart hook is called just before collect gets called }
        .collect { action ->
          //handle action
        }
      // CANNOT CALL FUNCTION HERE BECAUSE `collect` SUSPENDS
    }
  }

  fun dispatchAction(action: Rankings.Action) = scope.launch {
    while(!isInitialized.value) {
    }
    inputActions.send(action)
  }
}
k
What is you intention for actions dispatched before initialization happens? Do you want them to get lost or to get buffered in channel and consumed later when a collector gets attached to the flow? Also I can't quite get how could
dispatchAction
get called before a downstream collector is attached, for it is attached right in the class constructor.
u
@Kiryushin Andrey a job is launched in the constructor to attach the collector. This is not the same as the collector being attached in the constructor
k
Oh, right, I've overlooked that, thank you for noticing
Not sure though what behavior is desired for the
dispatchAction
method while initialization hasn't yet completed. If it should just drop actions while a collector isn't there, then simple condition check like the following will do:
Copy code
if (isInitialized.value) {
    inputActions.send(action)
}
b
The desired behavior is for send to suspend until the first collector has subscribed to upstream events
This is a code pared down a little bit from implementing an mvi/redux-like pattern for mobile. Events are dispatched from the view and get pipelined through a scan operator that spits out a Flow of ViewState's back to the view
u
how about making inputActions a
Deferred<BroadcastChannel<Rankings.Action>>
and then:
Copy code
fun dispatchAction(action: Rankings.Action) = scope.launch {
    inputActions.await().send(action)
  }
b
ooo like that idea. I will give that a try and report back
ah sorry -- I was overly optimistic and responded while walking to work! I guess the crux of the question is how to define a hook for when the first collector starts collecting on a flow. I probably should have framed the question that way from the get-go
The challenge is that collect is a suspending function, so you must call it within a coroutine and you cannot execute any code after the collect call.
Copy code
@InternalCoroutinesApi
fun main() {
    val channel = BroadcastChannel<Int>(Channel.BUFFERED)

    GlobalScope.launch {
        var i = 0
        delay(10_000)
        while (true) {
            delay(1000)
            channel.send(i)
            i++
        }
    }

    GlobalScope.launch {
        val flow = channel.asFlow().asBindFlow {
            println("onBind")
        }

        delay(5000)
        println("adding terminal downstream collector")
        flow.collect {
            println("collected: $it")
        }
    }
    while(true) {

    }
}

@InternalCoroutinesApi
fun <T> Flow<T>.asBindFlow(onBind: () -> Unit): Flow<T> {
    return BindFlow(onBind = onBind, flow = this)
}

@InternalCoroutinesApi
class BindFlow<T>(val onBind: () -> Unit, val flow: Flow<T>) : Flow<T> {
    private var hasBinded = AtomicBoolean(false)
    private val mutex = Mutex()

    @InternalCoroutinesApi
    override suspend fun collect(collector: FlowCollector<T>) {
        if (!hasBinded.get()) {
            mutex.withLock {
                if (!hasBinded.get()) {
                    onBind()
                    hasBinded.set(true)
                }
            }
        }
        flow.collect {
            collector.emit(it)
        }
    }
}
And we can avoid the mutex in BindFlow with compareAndSet
Copy code
@InternalCoroutinesApi
class BindFlow<T>(val onBind: () -> Unit, val flow: Flow<T>) : Flow<T> {
    private var hasBinded = AtomicBoolean(false)

    @InternalCoroutinesApi
    override suspend fun collect(collector: FlowCollector<T>) {
        if (hasBinded.compareAndSet(false, true)) {
            onBind()
        }
        flow.collect {
            collector.emit(it)
        }
    }
}