
vitaliy

10/26/2020, 5:26 PM
Hi everybody, I’m having an issue with `BroadcastChannel` and `Flow`. Essentially, I want `navigate` to behave like Rx `PublishSubject`. The issue: since the `navigate` flow is created from the channel, `firstOrNull()` on it never finishes. I’m pretty new to coroutines and would appreciate it if somebody could have a look. Thanks!
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test

class RetainStateTest {

    val navigateChannel = BroadcastChannel<List<String>>(Channel.BUFFERED)
    val navigate = navigateChannel.asFlow()

    val data = listOf("blah", "foo")

    @Test
    fun `should not retain state`() {
        runBlocking {
            val jobPrecondition = launch {
                navigate.firstOrNull() shouldBe data
            }
            navigateChannel.send(data)
            jobPrecondition.join()

            val job = launch {
                navigate.firstOrNull() shouldBe null
            }
            job.join()
        }
    }
}

Zach Klippenstein (he/him) [MOD]

10/26/2020, 5:29 PM
Wouldn’t `PublishSubject` have the same issue here? It would wait indefinitely for something to be sent to the subject, and would only return null if the subject was completed before something was sent to it.

bezrukov

10/26/2020, 5:29 PM
A buffered broadcast channel doesn't replay the last item to a subscriber who subscribes after the element was sent (in your case, the coroutine that calls `firstOrNull()` only starts running after the `send` call). To fix it, you can call `yield()` after `launch`, or launch the first coroutine with `start = CoroutineStart.UNDISPATCHED`. To receive `null`, the flow has to complete; to achieve that, you need to close `navigateChannel`.
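A minimal sketch of the two suggestions above, assuming a kotlinx.coroutines release from the time of this thread (1.4.x), where `BroadcastChannel` is still available; later releases deprecate it. The function name `observeBeforeAndAfterClose` is hypothetical and only exists so the result can be inspected.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns what an early and a late subscriber observed.
fun observeBeforeAndAfterClose(): Pair<List<String>?, List<String>?> = runBlocking {
    val navigateChannel = BroadcastChannel<List<String>>(Channel.BUFFERED)
    val navigate = navigateChannel.asFlow()
    val data = listOf("blah", "foo")

    // UNDISPATCHED runs the coroutine body right away, up to its first
    // suspension point, so the subscription exists before send() is called.
    val early = async(start = CoroutineStart.UNDISPATCHED) { navigate.firstOrNull() }
    navigateChannel.send(data)
    val received = early.await()

    // A late subscriber gets no replay; firstOrNull() returns null only
    // once the channel is closed and the flow therefore completes.
    val late = async(start = CoroutineStart.UNDISPATCHED) { navigate.firstOrNull() }
    navigateChannel.close()

    received to late.await()
}
```

Under these assumptions the early subscriber should observe the sent list and the late one should observe `null`.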

vitaliy

10/26/2020, 5:30 PM
AFAIR, with `PublishSubject` I would be able to check that it does not emit anything, and that would be enough.
@bezrukov great, `start = CoroutineStart.UNDISPATCHED` solves it indeed, thanks! Is there any other trick I can do in the second coroutine to verify that there is nothing there for me?

Zach Klippenstein (he/him) [MOD]

10/26/2020, 5:36 PM
To solve this for Rx, I’d probably use a test subscriber (`navigate.test()`). You can do something similar with flows by launching a coroutine at the start of your test that collects to a channel; then you can peek into the channel to see if anything was emitted since the last check by calling `poll()`.
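The "collect into a channel and peek" idea could look roughly like this. It is a sketch, again assuming kotlinx.coroutines 1.4.x (where `BroadcastChannel` and `poll()` are available; newer releases replace `poll()` with `tryReceive()`). The names `pollCollectedEmissions` and `emissions` are made up for illustration.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns the results of two consecutive poll() checks.
fun pollCollectedEmissions(): List<List<String>?> = runBlocking {
    val navigateChannel = BroadcastChannel<List<String>>(Channel.BUFFERED)
    val navigate = navigateChannel.asFlow()
    val data = listOf("blah", "foo")

    // Stand-in for a test subscriber: forward every emission into an
    // unlimited channel that the test can inspect without suspending.
    val emissions = Channel<List<String>>(Channel.UNLIMITED)
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        navigate.onEach { emissions.send(it) }.collect()
    }

    navigateChannel.send(data)
    yield() // give the collector a chance to run and forward the item

    val firstCheck = emissions.poll()  // the item sent above
    val secondCheck = emissions.poll() // null: nothing new since the last check

    collector.cancel() // stop collecting so runBlocking can return
    listOf(firstCheck, secondCheck)
}
```

Because `poll()` never suspends, the "nothing was emitted" check does not need the channel to be closed.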

vitaliy

10/26/2020, 5:42 PM
I see what you mean here. I was hoping I could keep it all on the flow level, but it looks like that is not possible. At least not for now. Thanks a lot to both of you anyway! 🍻

Zach Klippenstein (he/him) [MOD]

10/26/2020, 5:59 PM
Well, Flow is specifically designed to provide a solution for cold streams. This test subscriber use case isn’t strictly a cold stream thing, but it’s a great use case for channels. Rx also uses separate testing infrastructure for this. The most interesting difference, in my mind, is that the coroutine solution re-uses a general primitive (channels) that has many other use cases, and doesn’t require a new specially-designed construct (like Rx’s TestObserver/TestSubscriber). The fact that this doesn’t require special-casing is a good thing, imo.

bezrukov

10/26/2020, 6:12 PM
"Is there any other trick I can do in the second coroutine to verify, that there is nothing there for me?"
As I said, you need to call `navigateChannel.close()` before the last join. Another option (but it's weird) is to use `withTimeout`.
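A sketch of the timeout option. It swaps in `withTimeoutOrNull` for the `withTimeout` mentioned above, so that "nothing arrived in time" shows up as `null` instead of an exception. It assumes kotlinx.coroutines 1.4.x as elsewhere in this thread; the function name and the 100 ms default are arbitrary choices for illustration.

```kotlin
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: waits a bounded time for an emission from a late subscription.
fun firstWithinTimeout(timeoutMillis: Long = 100): List<String>? = runBlocking {
    val navigateChannel = BroadcastChannel<List<String>>(Channel.BUFFERED)
    val navigate = navigateChannel.asFlow()

    // Sent before anyone subscribes, so no later subscriber will see it.
    navigateChannel.send(listOf("blah", "foo"))

    // first() would wait forever on the open channel; the timeout cancels
    // the wait and yields null instead.
    withTimeoutOrNull(timeoutMillis) { navigate.first() }
}
```

The trade-off is that the test now depends on wall-clock time, which is probably why this option was called weird.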

vitaliy

10/26/2020, 6:14 PM
@Zach Klippenstein (he/him) [MOD] I agree, it’s just something to get used to. I also expect this to be somehow incorporated into some special kind of flow, like we already have `StateFlow`/`MutableStateFlow`, so there will be no need to expose the underlying channel. @bezrukov closing the channel is possible in the scope of the test, but in the app this would live pretty much permanently, so I would rather use some other way.

bezrukov

10/26/2020, 6:18 PM
If it never finishes, `firstOrNull` will never return null by design, so it's safe to use `first()`.
The same applies to `PublishSubject`: it has the functions `firstOrError`/`firstOrDefault`, and to make them work (throw an error or return the default value) the subject needs to be completed via `subject.onComplete`.
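To illustrate the point about completion: `first()` and `firstOrNull()` only differ when the flow completes without emitting. The sketch below uses `emptyFlow()` as a stand-in for a completed source; the function name is hypothetical.

```kotlin
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (result of firstOrNull, whether first() threw).
fun firstOnCompletedFlow(): Pair<String?, Boolean> = runBlocking {
    val completed = emptyFlow<String>() // completes immediately without emitting

    val orNull = completed.firstOrNull() // null, because the flow completed empty
    val firstThrew = try {
        completed.first()
        false
    } catch (e: NoSuchElementException) {
        true // first() signals an empty, completed flow with an exception
    }

    orNull to firstThrew
}
```

On a flow that never completes, neither outcome can occur, so both calls simply wait for the first element.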

vitaliy

10/27/2020, 11:14 AM
good point, thanks for the explanation!