Derek Ellis
04/27/2021, 10:16 PM
sealed class Update {
    object Success : Update()
    data class Progress(val message: String) : Update()
    data class Failure(val throwable: Throwable) : Update()
}
Success and Failure emissions are collected, but Progress ones are not, even though I'm applying only two operators:
.filterNotNull()
.transformWhile { update ->
    emit(update)
    update !is Update.Failure && update !is Update.Success
}
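For reference, here is a minimal sketch of how this operator pair behaves on a plain cold flow, assuming kotlinx.coroutines 1.5 or later (where transformWhile is available). The helper name untilTerminal and the sample values are hypothetical and do not come from the thread; the Update class is copied from the snippet above.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

sealed class Update {
    object Success : Update()
    data class Progress(val message: String) : Update()
    data class Failure(val throwable: Throwable) : Update()
}

// Hypothetical helper: forward every non-null update, then complete the
// flow right after the first terminal one (Success or Failure).
fun Flow<Update?>.untilTerminal(): Flow<Update> =
    filterNotNull().transformWhile { update ->
        emit(update) // emitted before the predicate is evaluated
        update !is Update.Failure && update !is Update.Success
    }

fun main() = runBlocking {
    val source = flowOf<Update?>(
        null,
        Update.Progress("10%"),
        Update.Progress("50%"),
        Update.Success,
        Update.Progress("after the terminal value, never collected"),
    )
    source.untilTerminal().collect { println(it) }
}
```

On a cold source like this one, both Progress values and the Success reach the collector, which suggests the operators themselves are not what drops the Progress items.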
I've debugged and logged everywhere I could, and the Progress objects are definitely emitted by the flow, but they never make it to the .collect. Am I missing something really obvious here?
baxter
04/28/2021, 3:39 AM
transform operator only returns Unit.
Derek Ellis
04/28/2021, 3:43 AM
transformWhile returns a boolean. I want the flow to complete after a failure or success is emitted, and what I have is more or less exactly like the example in the documentation for transformWhile.
baxter
04/28/2021, 6:26 AM
transform. Reading the doc:
Applies transform function to each value of the given flow while this function returns true.
This says that it applies the transform function (emits your value) so long as you return true. In your case, you are returning false when the Update object is of type Progress, which is why it doesn't emit that. Also, you should consider using the onCompletion and catch operators to emit your success or failures.
Derek Ellis
04/28/2021, 11:59 AM
If update is Update.Progress, then update !is Update.Failure is true and update !is Update.Success is also true, so it should continue emitting.
Also, the emit call is made before the lambda returns, which should guarantee that at least one Progress gets emitted anyhow.
baxter
04/28/2021, 5:04 PM
onEach between the filter and transform operators, and print out any Progress objects. Maybe even print your boolean logic there as well.
baxter
04/28/2021, 5:05 PM
update is Progress?
Derek Ellis
04/28/2021, 5:27 PM
onEach and I also debugged to make sure that I was emitting the Progress items and that they were being updated in the StateFlow, but still no luck.
Derek Ellis
04/28/2021, 5:28 PM
baxter
04/28/2021, 5:46 PM
Progress is not emitting with both operators removed?
Derek Ellis
04/28/2021, 5:47 PM
Derek Ellis
04/28/2021, 5:49 PM
while (true) {
    when (val status = getUpdateStatus()) {
        null -> Timber.i("Lost connection to update server")
        is Update.Failure, Update.Success -> {
            _updatesFlow.emit(status)
            break
        }
        else -> _updatesFlow.emit(status)
    }
    delay(UPDATE_POLL_FREQUENCY)
}
I was using .value before but changed to .emit to see if it would make any difference, not that it should
and this is how the Flow(s) are declared:
private val _updatesFlow = MutableStateFlow<Update?>(null)
val updateFlow: Flow<Update?> = _updatesFlow
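One property of this declaration is worth keeping in mind: a MutableStateFlow only holds its latest value and skips any new value that is equal to the current one. The sketch below illustrates the equality part under simplified assumptions; the Progress class here is a hypothetical stand-in for Update.Progress, and the collector is arranged so that it gets a chance to run after every update.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for Update.Progress; data classes compare by content.
data class Progress(val message: String)

fun observedValues(): List<Progress?> = runBlocking {
    val state = MutableStateFlow<Progress?>(null)
    val seen = mutableListOf<Progress?>()
    // UNDISPATCHED runs the collector immediately, so it sees the initial null.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen.add(it) }
    }
    state.value = Progress("10%"); yield() // new value: delivered
    state.value = Progress("10%"); yield() // equal to the current value: skipped
    state.value = Progress("20%"); yield() // different message: delivered
    collector.cancel()
    seen
}

fun main() {
    println(observedValues())
}
```

If the collector cannot keep up, or is not yet subscribed while values are being set, intermediate values are also lost, because only the most recent one is retained.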
Nothing fancy! But still...
baxter
04/28/2021, 7:16 PM
StateFlow is suppressing emits of Progress because they are the same when compared? Is the message the same for Progress on every call to getUpdateStatus()?
Derek Ellis
04/28/2021, 7:21 PM
message field. I also tried adding an Instant that defaulted to Instant.now() but that didn't help either.
Derek Ellis
04/28/2021, 8:19 PM
suspend fun doUpdates(): Flow<Update> = coroutineScope {
    launch { startUpdates() }
    return@coroutineScope updatesFlow.filterNotNull().transformWhile {
        // ...
    }
}
// and then where I called it:
viewModelScope.launch {
    doUpdates().collect { update ->
        // Do things here
    }
}
It looks like a bug to me
baxter
04/28/2021, 8:33 PM
doUpdates() doesn't need to be a suspend function. You can put startUpdates() inside an onStart {} operator and just return the flow; that way you don't start updates before you start collecting.
Derek Ellis
04/28/2021, 8:48 PM
startUpdates() call...
Derek Ellis
04/28/2021, 9:15 PM
baxter
04/29/2021, 3:25 AM
suspend fun doLoop() {
    for (i in 0..5) {
        flow.emit(Status.Progress("Message #$i"))
        println("Emitted: Message #$i")
        delay(1000)
    }
    flow.emit(Status.Finish)
    println("Emitted: Finish")
}
fun exampleUseCase(): Flow<Status> {
    return flow
        .onStart {
            coroutineScope { launch { doLoop() } }
        }
        .transformWhile {
            emit(it)
            it !is Status.Finish
        }
}
In your onStart block, you are creating a new coroutineScope block to launch a new job. However, that coroutineScope block will suspend until all tasks in it complete, which means it'll iterate through all the Progress events, emitting them. Because transformWhile doesn't start receiving values until onStart finishes, all of those events are gone by then and you only get the last one.
baxter
04/29/2021, 3:27 AM
suspend fun exampleUseCase(): Flow<Status> = coroutineScope {
    launch { doLoop() }
    return@coroutineScope flow
        .transformWhile {
            emit(it)
            it !is Status.Finish
        }
}
That coroutineScope block suspends until all jobs in it complete. But in this case, the suspension point is in exampleUseCase() itself, which means the function has to complete before you can start collecting.
baxter
04/29/2021, 3:29 AM
coroutineScope, but launching new (and separate) coroutines to perform the tasks as needed. You aren't suspending on a child scope (and waiting for it to complete).
Derek Ellis
04/29/2021, 11:38 AM
coroutineScope. Thank you for explaining. Do you know if there's any way to accomplish the desired behaviour without making the function an extension on CoroutineScope or otherwise having to pass the coroutine scope in as a parameter?
Derek Ellis
04/29/2021, 12:54 PM
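On the closing question, one possible approach (a sketch, not something confirmed in this thread) is to wrap the work in channelFlow, whose block runs in a scope tied to the collector, so no scope has to be passed in. The names Status, poll, untilFinish, eagerUseCase, and lazyUseCase below are hypothetical stand-ins modelled on the snippets above, with short delays so the example finishes quickly. The eager variant is included only to reproduce the coroutineScope pitfall for comparison.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

sealed class Status {
    data class Progress(val message: String) : Status()
    object Finish : Status()
}

// Hypothetical stand-in for the polling loop discussed in the thread.
suspend fun poll(state: MutableStateFlow<Status?>) {
    for (i in 0..2) {
        state.value = Status.Progress("Message #$i")
        delay(10)
    }
    state.value = Status.Finish
}

// Forward values and complete after Finish has been emitted.
fun Flow<Status>.untilFinish(): Flow<Status> =
    transformWhile { emit(it); it !is Status.Finish }

// Pitfall: coroutineScope returns only once poll() has finished, so the
// caller gets the flow when the state already holds Finish.
suspend fun eagerUseCase(state: MutableStateFlow<Status?>): Flow<Status> =
    coroutineScope {
        launch { poll(state) }
        state.filterNotNull().untilFinish()
    }

// Alternative: polling starts when collection starts and runs concurrently
// with it, inside the scope that channelFlow provides.
fun lazyUseCase(state: MutableStateFlow<Status?>): Flow<Status> = channelFlow {
    launch { poll(state) }
    state.filterNotNull().untilFinish().collect { send(it) }
}

fun main() = runBlocking {
    println(eagerUseCase(MutableStateFlow<Status?>(null)).toList())
    println(lazyUseCase(MutableStateFlow<Status?>(null)).toList())
}
```

Because the backing StateFlow still keeps only its latest value, this relies on the collector keeping up with the poller. If every Progress must be delivered, having the polling loop call send directly inside channelFlow (or emit inside a flow builder) would avoid the intermediate StateFlow altogether.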