# coroutines
n
How would you guys deal with the following scenario?
• I have a `Channel` which is fed some input data and then closed. This can’t be changed.
• Data must go through some expensive processing. I do `consumeAsFlow().flatMapConcat { … }`.
• It must be collected by a few collectors concurrently, and I don’t want the expensive processing to run again (`consumeAsFlow` wouldn’t allow it anyway). So I added `shareIn(scope)`.
• But I still want the flow to end when the channel is closed. One trick is to emit an “end” signal such as `null` before sharing and then, after sharing, apply `takeWhile { it != null }.filterNotNull()` in each collector.
• I also want collectors to receive all elements regardless of timing, so I add something like `replay = 50` to the `shareIn` operator. This is annoying because it’s an arbitrarily high number with no guarantees. At the same time, I don’t think `shareIn` is designed to handle unlimited replay? It feels wrong to me, but I can’t think of any other solution.
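The end marker is needed because a `SharedFlow` never completes, even after its upstream has finished, so a plain `collect` on it suspends forever. A minimal sketch of that behavior, where `compareCompletion` is a hypothetical name, `flowOf` stands in for the processed channel, and `withTimeoutOrNull` is only there to make the hang observable:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical demo: returns (result without marker, result with marker).
fun compareCompletion(): Pair<List<Int>?, List<Int>?> = runBlocking {
    // Without a marker, collecting a SharedFlow never finishes on its own.
    val plain = flowOf(1, 2, 3)
        .shareIn(this, SharingStarted.Eagerly, replay = 3)
    val withoutMarker = withTimeoutOrNull(200) { plain.toList() } // times out -> null

    // With a null marker, takeWhile ends each collection at the marker.
    val marked = flowOf<Int?>(1, 2, 3)
        .onCompletion { emit(null) } // end-of-stream marker
        .shareIn(this, SharingStarted.Eagerly, replay = 4)
    val withMarker = withTimeoutOrNull(200) {
        marked.takeWhile { it != null }.filterNotNull().toList()
    }
    withoutMarker to withMarker
}
```

The first collection is cancelled by the timeout, while the second completes with all three values.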
f
For the second problem (replay capacity): if you want new subscribers to also get all the previous values, then you have to set the replay cache to unlimited. Obviously this will keep consuming memory for all the previous values until the flow is garbage-collected (i.e., if you emit 1M events, you will keep a cache with 1M entries). If you want subscribers to just process values at their own pace, but not older values, then you need to buffer the flow (without a replay cache, if you want).
consumeAsFlow().flatMapConcat { … }.buffer(UNLIMITED).shareIn(...)
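A minimal sketch of the unlimited-replay suggestion, assuming `Int.MAX_VALUE` is an acceptable stand-in for "unlimited" (`shareIn` takes a plain `Int` for `replay`). `runUnlimitedReplay` is a hypothetical name, the `map` to `"processed-$it"` is a placeholder for the expensive `flatMapConcat` step, and the `delay` only makes the second collector subscribe after the upstream has finished:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Feeds and closes a channel, shares the processed values with an unbounded
// replay cache, and returns what an early and a late collector each received.
fun runUnlimitedReplay(): Pair<List<String>, List<String>> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    (1..5).forEach { channel.trySend(it) }
    channel.close() // fed once, then closed

    val shared = channel
        .consumeAsFlow()
        .map<Int, String?> { "processed-$it" } // placeholder for the expensive step
        .onCompletion { emit(null) }           // end-of-stream marker
        .shareIn(this, SharingStarted.Eagerly, replay = Int.MAX_VALUE)

    val early = async { shared.takeWhile { it != null }.filterNotNull().toList() }
    val late = async {
        delay(100) // subscribe long after the upstream has completed
        shared.takeWhile { it != null }.filterNotNull().toList()
    }
    early.await() to late.await()
}
```

Both collectors receive all five values. The memory trade-off still applies: the replay cache holds every element for as long as the shared flow is reachable.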
j
So you want every collector to get all values, right? Even a collector that starts collecting after another collector has already processed all values, is that correct? If so, you will need to buffer everything somewhere anyway, for as long as you want to allow new collectors to come and start collecting.
n
you have to put replay cache to unlimited
I will do, thanks.
but not older values
No, I do want older values. Subscription will start right after sharing, but I might miss the first values depending on the dispatcher.
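One possible alternative, not suggested in the thread and assuming the number of collectors is known in advance, is to delay sharing until that many collectors have subscribed, so nobody can miss the first values and no replay is needed. `startWhenSubscribed` and `runWithoutReplay` are hypothetical names; `SharingStarted`, `SharingCommand.START` and the `subscriptionCount` passed to `command` are kotlinx.coroutines APIs. The `map { it * 2 }` is a placeholder for the expensive processing:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: start collecting upstream only once `n` subscribers exist.
fun startWhenSubscribed(n: Int): SharingStarted = object : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        subscriptionCount
            .filter { it >= n }
            .take(1)
            .map { SharingCommand.START }
}

fun runWithoutReplay(): List<List<Int>> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    (1..100).forEach { channel.trySend(it) }
    channel.close()

    val shared = channel
        .consumeAsFlow()
        .map<Int, Int?> { it * 2 }   // placeholder for the expensive processing
        .onCompletion { emit(null) } // end-of-stream marker
        .shareIn(this, startWhenSubscribed(2), replay = 0)

    // Both collectors subscribe before the upstream emits anything.
    List(2) {
        async { shared.takeWhile { it != null }.filterNotNull().toList() }
    }.awaitAll()
}
```

The design choice is to make the producer wait for the consumers instead of letting late consumers catch up from a cache, so memory stays bounded by the shared flow's default buffer. The downside is that if one of the expected collectors never subscribes, the upstream never starts.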
So you want every collector to get all values, right?
Correct. My solution works, it just looks ugly: I’m using `shareIn` to cache and broadcast the results, but then I have to undo the `shareIn` with the `emit(null)` trick so that the flow can end, and add infinite replay to emulate a cold flow. See some code below.
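import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

scope.launch {
  val shared = channel
    .consumeAsFlow()
    .flatMapConcat { ... } // expensive processing, runs once for all collectors
    .map { it as X? } // "cast" to a nullable type so null can be emitted on completion
    .onCompletion { emit(null) } // end-of-stream marker
    .shareIn(this, SharingStarted.Eagerly, replay = 500) // random high enough number

  coroutineScope {
    launch(Dispatchers.IO) {
      shared.takeWhile { it != null } // stop at the end marker
        .filterNotNull()
        .collect { ... }
    }

    launch {
      shared.takeWhile { it != null } // stop at the end marker
        .filterNotNull()
        .collect { ... }
    }
  }
}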
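To keep the call sites cleaner, the same trick can be wrapped in an extension function. A sketch, where `shareUntilComplete` and `demo` are hypothetical names (not kotlinx.coroutines APIs) and `replay = Int.MAX_VALUE` is assumed as the "unlimited" replay; the `T : Any` bound keeps `null` free to act as the end marker:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs the upstream once in `scope`, shares it with an
// unbounded replay cache, and returns a Flow that completes with the upstream.
fun <T : Any> Flow<T>.shareUntilComplete(scope: CoroutineScope): Flow<T> =
    map<T, T?> { it }                // widen to T? so null can act as a marker
        .onCompletion { emit(null) } // end-of-stream marker
        .shareIn(scope, SharingStarted.Eagerly, replay = Int.MAX_VALUE)
        .takeWhile { it != null }    // stop at the marker...
        .filterNotNull()             // ...and drop the nullability again

// Usage: two concurrent collectors each receive every element, and both finish.
fun demo(): List<List<Int>> = runBlocking {
    val results = flowOf(1, 2, 3).shareUntilComplete(this)
    List(2) { async { results.toList() } }.awaitAll()
}
```

`shareIn` runs once, when `shareUntilComplete` is called; the `takeWhile`/`filterNotNull` wrapper is cold, so each collector gets its own subscription to the shared flow.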