Francis Reynders

04/06/2023, 5:00 PM
Also another question; I've been struggling with this for a while now. Say I have a SharedFlow into which events from the backend are emitted. I would like the client to be able to subscribe to it for 2 use cases: 1. Initially, it must collect all messages since the start, so some replay is needed. 2. It subscribes again after a reconnect, for example following a network issue. Ideally this should replay from the last collected message of (1) or of a previous (2). I believe (2) can be achieved by having no buffer for this SharedFlow and thus letting the emitter suspend, but that is incompatible with (1). Any idea how this is typically implemented?
Still looking for a solution. I'm currently thinking about using this:
    val activeFlowId = AtomicInteger()
    val sharedFlow = eventChannel.consumeAsFlow().shareIn(scope, SharingStarted.Eagerly, 1024)

    fun getFlow(minIndex: Int = 0): Flow<ChatEvent> {
        val flowId = activeFlowId.incrementAndGet()

        return sharedFlow
            .filter { it.index >= minIndex }
            .takeWhile { flowId == activeFlowId.get() }
            .buffer(16, BufferOverflow.DROP_OLDEST)
    }
Basically I'm manually keeping an id for each subscription and only taking values while the flowId matches the current activeFlowId. That should stop collections from previous ids if they still handle emits. I'm also adding a small buffer with BufferOverflow.DROP_OLDEST so inactive subscriptions do not block the emissions from the shared flow. There is probably a better way; any insight is appreciated.
Reconsidered the approach and now using this:
class HandlerChatSession
private constructor(
    val chatEnded: AtomicBoolean,
    val eventFlow: MutableSharedFlow<ChatEvent>,
    private val eventIndex: AtomicInteger,
    private val currentFlowListenerContext: MutableStateFlow<Job?>
) {

    fun getFlow(minIndex: Int = 0): Flow<ChatEvent> {
        currentFlowListenerContext.value?.cancel("New subscription received. Cancelling current.")
        return eventFlow
            .onStart { currentFlowListenerContext.value = currentCoroutineContext().job }
            .takeWhile { !chatEnded.get() }
            .filter { it.index >= minIndex }
    }
}
When receiving a new subscription for the shared flow, we cancel the coroutineContext.job of the previous one, thus making sure the collection of the old subscription effectively stops.


04/10/2023, 2:23 PM
I think the reason you're struggling here is because this use case is really much more suited for Channels rather than SharedFlows. Channels already have all of the traits you are looking for out of the box (mainly ensuring that all messages reach a consumer).
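To illustrate the point about Channels, here is a minimal sketch assuming kotlinx.coroutines is on the classpath. The function name drainInTwoSteps is made up for this example. It shows that elements sent before any collector exists stay buffered, and that a channel exposed through receiveAsFlow() hands each element to a collector exactly once, so a second collection continues where the first one stopped. Note that nothing already received is replayed.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper for this sketch: collects the same channel-backed flow twice.
fun drainInTwoSteps(): Pair<List<Int>, List<Int>> = runBlocking {
    // UNLIMITED capacity so send() never suspends in this small example.
    val channel = Channel<Int>(Channel.UNLIMITED)
    repeat(3) { channel.send(it) } // no collector yet, elements are buffered

    // Unlike consumeAsFlow(), receiveAsFlow() may be collected several times
    // and does not cancel the channel when a collector stops.
    val events = channel.receiveAsFlow()

    val first = events.take(2).toList()  // "initial" subscription
    val second = events.take(1).toList() // "reconnect" picks up the rest

    channel.close()
    first to second
}

fun main() {
    println(drainInTwoSteps()) // prints ([0, 1], [2])
}
```

An element that was received but not fully processed before a disconnect is gone from the channel, so this alone does not cover replay from an arbitrary index.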

Francis Reynders

04/13/2023, 8:04 AM
@aarkling I must provide a flow due to the underlying library (grpc-kotlin). I see what you are saying though. I may reconsider the approach: I'm thinking about having a single consumer of the channel at all times and emitting those events into a flow.
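One way the "single consumer of the channel, re-emitted as a flow" idea could look is sketched below. This is not code from the thread: the class name ReplayableEvents is hypothetical, the index is assumed to be the event's position in an in-memory log (rather than the ChatEvent.index field used above), and the log is assumed to be small enough to keep entirely in memory.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical sketch: one coroutine drains the channel into a growing log;
// subscribers read the log starting at any index.
class ReplayableEvents<T>(source: ReceiveChannel<T>, scope: CoroutineScope) {
    // Each value is a full snapshot of the log, so StateFlow conflation
    // cannot lose events. Copying the list on every append is O(n).
    private val log = MutableStateFlow<List<T>>(emptyList())

    init {
        // The single consumer of the channel; it never waits for subscribers.
        scope.launch { for (event in source) log.update { it + event } }
    }

    // Emits every event whose position is >= minIndex, then keeps following the log.
    // The returned flow does not complete on its own.
    fun getFlow(minIndex: Int = 0): Flow<T> = flow {
        var next = minIndex
        log.collect { snapshot ->
            while (next < snapshot.size) emit(snapshot[next++])
        }
    }
}

fun main() {
    runBlocking {
        val channel = Channel<String>(Channel.UNLIMITED)
        val events = ReplayableEvents(channel, this)
        listOf("a", "b", "c").forEach { channel.send(it) }

        println(events.getFlow().take(2).toList())             // prints [a, b]
        println(events.getFlow(minIndex = 2).take(1).toList()) // prints [c]

        channel.close() // ends the draining coroutine so runBlocking can return
    }
}
```

With this shape a reconnecting client passes the index after the last event it processed, and a slow or abandoned subscriber never suspends the emitter. The trade-off is unbounded memory growth, so a long-lived session would likely need some trimming policy.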