Should canceling a shared Flow cancel it for other...
# coroutines
t
Should canceling a shared Flow cancel it for other subscribers? I have the following. I expect that cancelling the first subscriber should keep the upstream active until the second subscriber is done. However, the moment the first subscriber is cancelled, the upstream is terminated, even though a second subscriber is still present.
@Test
    fun hmm2() = runTest {
        val channel = Channel<Int>()
        val shared = channel.consumeAsFlow()
            .onCompletion { println("CANCELLED") }
            .shareIn(this, SharingStarted.WhileSubscribed())

        val j = launch(UnconfinedTestDispatcher()) {
            shared.collect {
                println("$it in first")
            }
        }

        channel.send(1)
        channel.send(2)

        advanceUntilIdle()
        val j2 = launch(UnconfinedTestDispatcher()) {
            shared.collect {
                println("$it in second")
            }
        }

        channel.send(3)
        advanceUntilIdle()

        j.cancel()

        channel.send(4)

        j2.cancel()
    }
`channel.send(4)` suspends indefinitely, as the upstream has been cancelled despite `j2` not being cancelled.
This is not an issue with `MutableSharedFlow`; however, it is for any `SharedFlow` created with `shareIn`.
d
> `channel.send(4)` suspends indefinitely
Are you sure that's what happens? When I try running your code and add `println("Sent 4")` after `channel.send(4)`, the line gets printed.
launch(UnconfinedTestDispatcher())
Unless you're doing `Dispatchers.setMain`, this line is incorrect: `UnconfinedTestDispatcher` must be given the `testScheduler` used by `runTest`. `Dispatchers.setMain` automatically propagates its scheduler to the newly-created dispatchers.
This test doesn't hang for me:
@Test
    fun hmm2() = runTest {
        val channel = Channel<Int>()
        val shared = channel.consumeAsFlow()
            .onCompletion { println("CANCELLED") }
            .shareIn(backgroundScope, SharingStarted.WhileSubscribed())

        val j = launch(UnconfinedTestDispatcher(testScheduler)) {
            shared.collect {
                println("$it in first")
            }
        }

        channel.send(1)
        channel.send(2)

        advanceUntilIdle()
        val j2 = launch(UnconfinedTestDispatcher(testScheduler)) {
            shared.collect {
                println("$it in second")
            }
        }

        channel.send(3)
        advanceUntilIdle()

        j.cancel()

        println("Sending 4")
        channel.send(4)
        println("Sent 4")

        j2.cancel()
    }
Note the `backgroundScope` (which ensures that the coroutine created by `consumeAsFlow` gets cancelled when the test finishes).
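A smaller sketch of the same idea, assuming kotlinx-coroutines-test 1.7 or newer (where `backgroundScope` is available). The class name and the never-ending upstream are invented for illustration. The sharing coroutine never completes on its own, yet the test can still finish because `backgroundScope` is cancelled once the test body returns.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

// Hypothetical test class, used only to illustrate backgroundScope.
class BackgroundScopeSharingTest {
    @Test
    fun sharingCoroutineIsCancelledWithTheTest() = runTest {
        // An upstream that emits once and then never completes by itself.
        val upstream = flow {
            emit(1)
            awaitCancellation()
        }

        // The sharing coroutine lives in backgroundScope, so runTest does not
        // wait for it; it is cancelled when the test body finishes.
        val shared = upstream.shareIn(backgroundScope, SharingStarted.Eagerly, replay = 1)

        assertEquals(1, shared.first())
    }
}
```

If `this` were passed to `shareIn` instead, the sharing coroutine would be a child of the test scope, and as far as I understand `runTest` would keep waiting for it until the test times out.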
t
I wasn't aware of `backgroundScope`, thank you!