Sharing operators effectively turn cold streams into hot streams. Consider this example, with a fake shareInWithoutScope operator:
    val upstream = flow {
        emit("starting")
        delay(1000)
        emit("finished")
    }.shareInWithoutScope()

    // First collector
    launch { // this: scope1
        withTimeout(500) {
            upstream.collect {}
        }
    }

    // Second collector
    launch { // this: scope2
        delay(250)
        upstream.collect {}
    }

In order to collect upstream, the operator needs a scope to collect in. If this collection happens lazily, it could use the scope of the first collector (scope1). The second collector then starts collecting in scope2 after 250 ms, which is fine. But at 500 ms, scope1 gets cancelled while scope2 is still collecting, so the upstream collection needs to keep running. If it were tied to scope1, it would be cancelled along with it, and the second collector would never receive "finished". Sharing operators therefore can't rely on their downstream collectors' scopes to manage the shared collection.
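
For contrast, here is a minimal sketch of the same scenario using the real shareIn operator from kotlinx.coroutines, which takes the sharing scope as an explicit parameter. The names sharingScope and secondCollectorSees are hypothetical, chosen for this illustration, and SharingStarted.Lazily is just one possible start policy. Because the upstream collection runs in sharingScope, the first collector's timeout should not affect it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper for this sketch: returns the first value the second collector receives.
suspend fun secondCollectorSees(): String = coroutineScope {
    // Dedicated scope that owns the shared upstream collection (an assumption of this sketch).
    val sharingScope = CoroutineScope(Dispatchers.Default + Job())
    try {
        val upstream = flow {
            emit("starting")
            delay(1000)
            emit("finished")
        }.shareIn(sharingScope, SharingStarted.Lazily) // upstream starts when the first subscriber appears

        // First collector: gives up after 500 ms.
        launch {
            withTimeoutOrNull<Unit>(500) {
                upstream.collect { }
            }
        }

        // Second collector: subscribes at 250 ms and waits for the next emitted value.
        val second = async {
            delay(250)
            upstream.first()
        }
        second.await()
    } finally {
        // Cancelling the sharing scope is what stops the upstream collection.
        sharingScope.cancel()
    }
}

fun main() = runBlocking {
    println("second collector saw: ${secondCollectorSees()}") // prints "second collector saw: finished"
}
```

With the default replay of 0, the second collector misses "starting" but still receives "finished" at around the one-second mark, well after the first collector has timed out.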