Håkon Pettersen
01/16/2024, 9:05 AMclass MessageManager(
dispatcher: CoroutineDispatcher = <http://Dispatchers.IO|Dispatchers.IO>,
) {
private val messageFlow = flow {
while (true) {
delay(60_000) // Emit every minute
try {
val messages = service.getMessages().await()
emit(messages)
} catch (error: Exception) {
Log.e("Messages", "Error fetching messages: $error")
}
}
}.flowOn(dispatcher)
.shareIn(scope, SharingStarted.Lazily, 1)
val messages: StateFlow<List<Message>> = messageFlow.stateIn(
CoroutineScope(Dispatchers.Main),
SharingStarted.WhileSubscribed(),
emptyList()
)
}
Test:
@OptIn(ExperimentalCoroutinesApi::class)
@RunWith(RobolectricTestRunner::class)
class MessageManagerTest {
private val testScheduler = TestCoroutineScheduler()
private val application: Application = mockk(relaxed = true)
private val testDispatcher = StandardTestDispatcher(testScheduler)
private val mockMessages = listOf(Message("id1", "Test message 1", listOf("mock")))
@Before
fun setUp() {
Dispatchers.setMain(testDispatcher)
}
@After
fun tearDown() {
Dispatchers.resetMain()
}
@Test
fun `messages emits expected data`() = runTest {
// Initialize BroadcastManager
val messageManager = MessageManager(application, testDispatcher)
// Test the messages flow with Turbine
messageManager.messages.test {
// Assert initial state
assertTrue(awaitItem().isEmpty())
// Advance time and assert next emission
advanceTimeBy(60_000)
assertEquals(mockMessages, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
}
My expectation is that I should only receive one emitted list of messages before the remaning events gets cancelled, but that is not the case. Currently the messageFlow never stops emitting messages. Am I missing something obvious here?hfhbd
01/16/2024, 9:11 AMHåkon Pettersen
01/16/2024, 9:19 AMprivate val messageFlow = flow {
while (true) {
delay(60_000) // Emit every minute
try {
val messages = service.getMessages().await()
emit(messages)
} catch (error: CancellationException) {
Log.e("Messages", "Error fetching messages: $error")
} catch (error: Exception) {
Log.e("Messages", "Error fetching messages: $error")
}
}
}.flowOn(dispatcher)
.shareIn(scope, SharingStarted.Lazily, 1)
hfhbd
01/16/2024, 9:20 AMphldavies
01/16/2024, 9:20 AMHåkon Pettersen
01/16/2024, 9:26 AMprivate val messageFlow = flow {
while (true) {
delay(60_000) // Emit every minute
try {
val messages = service.getMessages().await()
emit(messages)
} catch (error: CancellationException) {
throw error
} catch (error: Exception) {
Log.e("Messages", "Error fetching messages: $error")
}
}
}.flowOn(dispatcher)
.shareIn(scope, SharingStarted.Lazily, 1)
From what I can tell the test is still not able to actually cancel anything, still runs until OOM.Håkon Pettersen
01/16/2024, 9:50 AM@Test
fun test() = runTest {
manager = MessageManager(application, testDispatcher)
val messages = mutableListOf<List<Message>>()
val job = launch {
manager.messages.collect {
println("collecting message")
messages.add(it)
}
}
advanceTimeBy(120_000)
job.cancel()
}
Håkon Pettersen
01/16/2024, 10:03 AMDmitry Khalanskiy [JB]
01/16/2024, 10:45 AMclass MessageManager(
dispatcher: CoroutineDispatcher = <http://Dispatchers.IO|Dispatchers.IO>,
scope: CoroutineScope = CoroutineScope(Dispatchers.Main)
) {
val messages: StateFlow<List<Message>> = messageFlow.stateIn(
scope,
SharingStarted.WhileSubscribed(),
emptyList()
)
}
Then, in the test,
val messageManager = MessageManager(testDispatcher, backgroundScope)
Dmitry Khalanskiy [JB]
01/16/2024, 10:46 AMadvanceTimeBy
is heavily discouraged, delay(120.seconds)
is the preferred solution.Håkon Pettersen
01/16/2024, 11:39 AMDmitry Khalanskiy [JB]
01/16/2024, 11:43 AMsuspend
function, so it won't block the thread;
• When we introduce new features to the test framework, they may not interoperate with advanceTimeBy
well, but are guaranteed to work with `delay`;
• One less thing to learn for anyone who reads the code;
More: https://github.com/Kotlin/kotlinx.coroutines/issues/3919