Francis Reynders
04/06/2023, 5:00 PMMutableSharedFlow
into which events from backend are emitted. I would like for the client to be able to subscribe to it for 2 use cases:
1. Initially, must collect all messages since start, so some replay is needed
2. Subscribe again in case of reconnect after a network issues for example. Ideally this should replay as of last collected message of (1) or previous (2). I believe this can be achieved by having replay=0
without buffer for this SharedFlow and thus letting the emitter suspend, but that is incompatible with 1.
Any idea how this is typically implemented?val activeFlowId = AtomicInteger()
val sharedFlow = eventChannel.consumeAsFlow().shareIn(scope, SharingStarted.Eagerly, 1024)
fun getFlow(minIndex: Int = 0): Flow<ChatEvent> {
val flowId = activeFlowId.incrementAndGet()
return sharedFlow
.filter { it.index >= minIndex }
.takeWhile { flowId == activeFlowId.get() }
.cancellable()
.buffer(16, BufferOverflow.DROP_OLDEST)
}
Basically I'm manually keeping an id
for each subscription and only taking values if the id
matches. That should stop collections from previous ids if they still handle emits. Also adding a small buffer with DROP_OLDEST
so inactive subscriptions do not block the emissions from the shared flow.
There is probably a better way, any insight is appreciated.class HandlerChatSession
private constructor(
val chatEnded: AtomicBoolean,
val eventFlow: MutableSharedFlow<ChatEvent>,
private val eventIndex: AtomicInteger,
private val currentFlowListenerContext: MutableStateFlow<Job?>
) {
fun getFlow(minIndex: Int = 0): Flow<ChatEvent> {
currentFlowListenerContext.value?.cancel("New subscription received. Cancelling current.")
return eventFlow
.asSharedFlow()
.onStart { currentFlowListenerContext.value = currentCoroutineContext().job }
.takeWhile { !chatEnded.get() }
.filter { it.index >= minIndex }
}
When receiving a new subscription for the shared flow, we cancel the coroutineContext.job of the previous one, thus making sure the collection of the old subscription effectively stops.aarkling
04/10/2023, 2:23 PMFrancis Reynders
04/13/2023, 8:04 AM