Derek Ellis
04/27/2021, 10:16 PM
sealed class Update {
    object Success : Update()
    data class Progress(val message: String) : Update()
    data class Failure(val throwable: Throwable) : Update()
}
Success and Failure emissions are collected, but Progress ones are not. I'm applying only two operators:
.filterNotNull()
.transformWhile { update ->
    emit(update)
    update !is Update.Failure && update !is Update.Success
}
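For comparison, here is a minimal, self-contained sketch of the same two operators applied to a plain cold flow. The sample values, the collectUntilTerminal helper and main are assumptions made for illustration, not code from the thread. On a source like this, Progress values pass through and collection stops right after the first Success or Failure.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

sealed class Update {
    object Success : Update()
    data class Progress(val message: String) : Update()
    data class Failure(val throwable: Throwable) : Update()
}

// Hypothetical helper: applies the two operators and gathers everything
// that reaches the collector into a list.
suspend fun collectUntilTerminal(source: Flow<Update?>): List<Update> =
    source
        .filterNotNull()
        .transformWhile { update ->
            emit(update) // always forward the current value first
            // keep going only while no terminal update has been seen
            update !is Update.Failure && update !is Update.Success
        }
        .toList()

fun main(): Unit = runBlocking {
    // Made-up emissions: a null, two progress updates, a terminal update,
    // and one value that arrives after the terminal update.
    val source = flowOf(
        null,
        Update.Progress("1/2"),
        Update.Progress("2/2"),
        Update.Success,
        Update.Progress("late")
    )
    val collected = collectUntilTerminal(source)
    // Both Progress values and Success are collected; "late" is not.
    println(collected.size) // prints 3
}
```

If a sketch like this behaves as expected, the operators themselves are unlikely to be the cause, which points toward how and when the upstream values are produced.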
I've debugged and logged everywhere I could, and the Progress objects are definitely emitted by the flow, but they never make it to the .collect. Am I missing something really obvious here?
baxter
04/28/2021, 3:39 AM
transform operator only returns Unit.
Derek Ellis
04/28/2021, 3:43 AM
transformWhile returns a boolean. I want the flow to complete after a failure or success is emitted, and what I have is more or less exactly like the example in the documentation for transformWhile.
baxter
04/28/2021, 6:26 AM
transform. Reading the doc:
Applies transform function to each value of the given flow while this function returns true.
This says that it applies the transform function (emits your value) so long as you return true. In your case, you are returning false when the Update object is of type Progress, which is why it doesn't emit that. Also, you should consider using the onCompletion and catch operators to emit your success or failures.
Derek Ellis
04/28/2021, 11:59 AM
If update is Update.Progress, then update !is Update.Failure is true and update !is Update.Success is also true, so it should continue emitting.
Also, the emit call is made before it returns, which should guarantee at least one Progress gets emitted anyhow.
baxter
04/28/2021, 5:04 PM
onEach between the filter and transform operators, and print out any Progress objects. Maybe even print your boolean logic there as well.
update is Progress?
Derek Ellis
04/28/2021, 5:27 PM
onEach and I also debugged to make sure that I was emitting the Progress items and that they were being updated in the StateFlow, but still no luck.
baxter
04/28/2021, 5:46 PM
Progress is not emitting with both operators removed?
Derek Ellis
04/28/2021, 5:47 PM
while (true) {
    when (val status = getUpdateStatus()) {
        null -> Timber.i("Lost connection to update server")
        is Update.Failure, Update.Success -> {
            _updatesFlow.emit(status)
            break
        }
        else -> _updatesFlow.emit(status)
    }
    delay(UPDATE_POLL_FREQUENCY)
}
I was using .value before but changed to .emit to see if it would make any difference, not that it should.
And this is how the Flow(s) are declared:
private val _updatesFlow = MutableStateFlow<Update?>(null)
val updateFlow: Flow<Update?> = _updatesFlow
Nothing fancy! But still...
baxter
04/28/2021, 7:16 PM
StateFlow is suppressing emits of Progress because they are the same when compared? Is the message the same for Progress on every call to getUpdateStatus()?
Derek Ellis
04/28/2021, 7:21 PM
message field. I also tried adding an Instant that defaulted to Instant.now() but that didn't help either.
suspend fun doUpdates(): Flow<Update> = coroutineScope {
    launch { startUpdates() }
    return@coroutineScope updatesFlow.filterNotNull().transformWhile {
        // ...
    }
}
// and then where I called it:
viewModelScope.launch {
    doUpdates().collect { update ->
        // Do things here
    }
}
It looks like a bug to me
baxter
04/28/2021, 8:33 PM
doUpdates() doesn't need to be a suspend function. You can put startUpdates() inside an onStart {} operator and just return the flow; that way you don't start updates prematurely before you start collecting.
Derek Ellis
04/28/2021, 8:48 PM
startUpdates() call...
baxter
04/29/2021, 3:25 AM
suspend fun doLoop() {
    for (i in 0..5) {
        flow.emit(Status.Progress("Message #$i"))
        println("Emitted: Message #$i")
        delay(1000)
    }
    flow.emit(Status.Finish)
    println("Emitted: Finish")
}
fun exampleUseCase(): Flow<Status> {
    return flow
        .onStart {
            coroutineScope { launch { doLoop() } }
        }
        .transformWhile {
            emit(it)
            it !is Status.Finish
        }
}
In your onStart block, you are creating a new coroutineScope block to launch a new job. However, that coroutineScope block will suspend until all tasks in it complete, which means it'll iterate through all the Progress events, emitting them, but because transformWhile doesn't start until onStart finishes, you've consumed all events, and only get the last one.
suspend fun exampleUseCase(): Flow<Status> = coroutineScope {
    launch { doLoop() }
    return@coroutineScope flow
        .transformWhile {
            emit(it)
            it !is Status.Finish
        }
}
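The point made above, that coroutineScope does not return until every coroutine launched inside it has finished, can be checked in isolation. The following is a minimal sketch; the scopeOrder helper, the event strings and the 100 ms delay are assumptions made for illustration and do not come from the thread.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which things happen around
// a coroutineScope block that launches one child coroutine.
suspend fun scopeOrder(): List<String> {
    val events = mutableListOf<String>()
    val result = coroutineScope {
        launch {
            delay(100) // stands in for a long-running loop such as doLoop()
            events += "child finished"
        }
        events += "block body returned"
        "value" // the block's result is ready here, but not yet handed back
    }
    // This line runs only after the launched child has completed.
    events += "coroutineScope returned $result"
    return events
}

fun main(): Unit = runBlocking {
    println(scopeOrder())
    // prints [block body returned, child finished, coroutineScope returned value]
}
```

The return value of the block is computed immediately, yet the caller only receives it once the child is done, which matches the behaviour seen with the suspend version of exampleUseCase().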
The coroutineScope block in this suspend version of exampleUseCase() suspends until all jobs in it complete. In this case the suspension point is exampleUseCase() itself, which means it has to complete before you start collecting.
coroutineScope, but launching new (and separate) coroutines to perform the tasks as needed. You aren't suspending on a child scope (and awaiting it to complete).
Derek Ellis
04/29/2021, 11:38 AM
coroutineScope. Thank you for explaining. Do you know if there's any way to accomplish the desired behaviour without making the function extend CoroutineScope or otherwise having to pass the coroutine scope in as a parameter?
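One possible direction, which is not taken from the thread, is to wrap the work in channelFlow. Its block runs in a scope tied to the collector, so the polling coroutine can be launched there without a scope parameter and only starts once collection begins. The sketch below is an assumption-laden illustration: the Updater class, the hard-coded statuses and the 50 ms delay are made up, and startUpdates() is a stand-in for the real polling loop.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

sealed class Update {
    object Success : Update()
    data class Progress(val message: String) : Update()
    data class Failure(val throwable: Throwable) : Update()
}

// Hypothetical holder class, used only to keep the sketch self-contained.
class Updater {
    private val updatesFlow = MutableStateFlow<Update?>(null)

    // Stand-in for the polling loop: publishes made-up statuses and stops
    // after the first terminal one.
    private suspend fun startUpdates() {
        val statuses = listOf(Update.Progress("1/2"), Update.Progress("2/2"), Update.Success)
        for (status in statuses) {
            updatesFlow.value = status
            if (status !is Update.Progress) break
            delay(50)
        }
    }

    // Not a suspend function: nothing runs until the returned flow is collected.
    fun doUpdates(): Flow<Update> = channelFlow {
        // Launched in the channelFlow scope, so it is cancelled together
        // with the collector and is not awaited before collection starts.
        launch { startUpdates() }
        updatesFlow
            .filterNotNull()
            .transformWhile { update ->
                emit(update)
                update !is Update.Failure && update !is Update.Success
            }
            .collect { send(it) } // forward each update to the collector
    }
}

fun main(): Unit = runBlocking {
    val updates = Updater().doUpdates().toList()
    println(updates.size) // prints 3
}
```

Because the source is still a StateFlow, updates published faster than the collector can keep up may be conflated; the delay between statuses in this sketch avoids that.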