# coroutines
d
I'm having an issue where a flow isn't being collected and I can't for the life of me figure out why. Basic sealed class hierarchy:
sealed class Update {
    object Success : Update()
    data class Progress(val message: String) : Update()
    data class Failure(val throwable: Throwable) : Update()
}
Success and Failure emissions are collected, but Progress ones are not. I'm applying only two operators:
.filterNotNull()
.transformWhile { update ->
   emit(update)
   update !is Update.Failure && update !is Update.Success
}
I've debugged and logged everywhere I could, and the Progress objects are definitely emitted by the flow, but they never make it to the .collect. Am I missing something really obvious here?
b
Not sure what you intended to do with the last line in your transform block, as it doesn't do anything. The transform operator only returns Unit.
d
transformWhile returns a boolean. I want the flow to complete after a failure or success is emitted, and what I have is more or less exactly like the example in the documentation for transformWhile.
b
My mistake, I thought that was just transform. Reading the doc:
"Applies transform function to each value of the given flow while this function returns true."
This says that it applies the transform function (emits your value) so long as you return true. In your case, you are returning false when the Update object is of type Progress, which is why it doesn't emit that. Also, you should consider using the onCompletion and catch operators to emit your successes or failures.
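A rough sketch of that onCompletion/catch suggestion, assuming a cold upstream flow of progress messages rather than the polling StateFlow discussed in this thread. The names progressMessages and updates are hypothetical and only serve the illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

sealed class Update {
    object Success : Update()
    data class Progress(val message: String) : Update()
    data class Failure(val throwable: Throwable) : Update()
}

// Hypothetical cold source: emits progress messages and optionally fails partway.
fun progressMessages(fail: Boolean): Flow<String> = flow {
    emit("step 1")
    if (fail) throw IllegalStateException("server error")
    emit("step 2")
}

fun updates(fail: Boolean): Flow<Update> =
    progressMessages(fail)
        .map<String, Update> { Update.Progress(it) }
        // cause is null only when the upstream completed normally.
        .onCompletion { cause -> if (cause == null) emit(Update.Success) }
        // Converts an upstream exception into a terminal Failure value.
        .catch { emit(Update.Failure(it)) }

fun main() = runBlocking {
    println(updates(fail = false).toList())
    println(updates(fail = true).toList())
}
```

With this shape the flow ends on its own after Success or Failure, so no terminating predicate is needed. It only applies when the source flow itself completes or throws, which a StateFlow never does.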
d
I'm not sure that's correct either. If update is Update.Progress then update !is Update.Failure is true and update !is Update.Success is also true, so it should continue emitting. Also, the emit call is made before it returns, which should guarantee at least one Progress gets emitted anyhow.
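A minimal sketch that checks this reasoning in isolation, using a fixed flowOf source in place of the real update flow. The helper name untilTerminal is made up for the example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

sealed class Update {
    object Success : Update()
    data class Progress(val message: String) : Update()
    data class Failure(val throwable: Throwable) : Update()
}

// Same two operators as in the question: forward every value,
// then stop once a terminal value (Success or Failure) has been forwarded.
fun untilTerminal(source: Flow<Update?>): Flow<Update> =
    source
        .filterNotNull()
        .transformWhile { update ->
            emit(update)
            update !is Update.Failure && update !is Update.Success
        }

fun main() = runBlocking {
    val source = flowOf<Update?>(
        null,
        Update.Progress("10%"),
        Update.Progress("50%"),
        Update.Success,
        Update.Progress("never collected"),
    )
    println(untilTerminal(source).toList())
}
```

Both Progress values and the Success pass through, and the value after Success does not, which suggests the operators themselves are not what drops the Progress emissions.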
b
Is it possible you aren't emitting any Progress items? Add in an onEach between the filter and transform operators, and print out any Progress objects. Maybe even print your boolean logic there as well.
Or change the boolean logic to update is Progress?
d
I've tried adding an onEach and I also debugged to make sure that I was emitting the Progress items and that they were being updated in the StateFlow, but still no luck.
I've tried removing both operators too, and nothing changed, which is even more odd.
b
So Progress is not emitting with both operators removed?
d
Yep 😕
This is where it's emitted:
while (true) {
    when (val status = getUpdateStatus()) {
        null -> Timber.i("Lost connection to update server")
        is Update.Failure, Update.Success -> {
            _updatesFlow.emit(status)
            break
        }
        else -> _updatesFlow.emit(status)
    }
    delay(UPDATE_POLL_FREQUENCY)
}
I was using .value before but changed to .emit to see if it would make any difference, not that it should. And this is how the flows are declared:
private val _updatesFlow = MutableStateFlow<Update?>(null)
val updateFlow: Flow<Update?> = _updatesFlow
Nothing fancy! But still...
b
The only thing I can think of is that StateFlow is suppressing emits of Progress because they are the same when compared? Is the message the same for Progress on every call to getUpdateStatus()?
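A small sketch of the behaviour being asked about: StateFlow conflates values using equals, so setting a value equal to the current one is not delivered to collectors. The observeProgress helper and the 50 ms pauses are assumptions made for the demo, not part of the original code.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

data class Progress(val message: String)

// Pushes each message into a StateFlow and returns what a collector actually observed.
fun observeProgress(messages: List<String>): List<Progress> = runBlocking {
    val state = MutableStateFlow<Progress?>(null)
    val seen = mutableListOf<Progress>()
    val collector = state.filterNotNull().onEach { seen += it }.launchIn(this)
    delay(50) // let the collector subscribe first
    for (message in messages) {
        state.value = Progress(message) // no-op when equal to the current value
        delay(50) // give the collector time to observe the value
    }
    collector.cancelAndJoin()
    seen
}

fun main() {
    // The second "a" equals the current value, so the collector never sees it.
    println(observeProgress(listOf("a", "a", "b")))
}
```

Because Progress is a data class, two instances with the same message compare equal, so repeated identical messages collapse into one emission while distinct messages still get through.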
d
Some are, but in my tests there have always been at least two different values in the message field. I also tried adding an Instant that defaulted to Instant.now() but that didn't help either.
Ok, I'll have to try reproducing this later, but it seems like the problem was that I was returning the flow from a suspend function and then collecting it? Basically:
suspend fun doUpdates(): Flow<Update> = coroutineScope {
  launch { startUpdates() }
  return@coroutineScope updatesFlow.filterNotNull().transformWhile { 
    // ...
  }
}

// and then where I called it:
viewModelScope.launch {
  doUpdates().collect { update ->
    // Do things here
  }
}
It looks like a bug to me
b
doUpdates() doesn't need to be a suspend function. You can put startUpdates() inside an onStart {} operator and just return the flow. That way you don't start updates prematurely, before you start collecting.
d
Huh that's pretty clever, I had no idea you could do that. Unfortunately it also doesn't seem to work, even though my logging says that it is triggering the startUpdates() call...
I was able to reproduce it in isolation if you'd like to take a look: https://github.com/dellisd/coroutines-bug I'll probably open an issue for it soon
b
I see the problem. Going through each broken case to explain: Broken1
suspend fun doLoop() {
    for (i in 0..5) {
        flow.emit(Status.Progress("Message #$i"))
        println("Emitted: Message #$i")
        delay(1000)
    }
    flow.emit(Status.Finish)
    println("Emitted: Finish")
}

fun exampleUseCase(): Flow<Status> {
    return flow
        .onStart {
            coroutineScope { launch { doLoop() } }
        }
        .transformWhile {
            emit(it)
            it !is Status.Finish
        }
}
In your onStart block, you are creating a new coroutineScope block to launch a new job. However, that coroutineScope block will suspend until all tasks in it complete, which means it'll iterate through all the Progress events, emitting them. Because transformWhile doesn't start receiving values until onStart finishes, all of those events have already been emitted by the time collection begins, and you only get the last one.
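The key point here is that coroutineScope does not return until every coroutine launched inside it has finished. A minimal sketch of just that ordering, with a hypothetical scopeOrdering helper and an arbitrary 100 ms delay:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Records the order in which things happen around a coroutineScope block.
fun scopeOrdering(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    coroutineScope {
        launch {
            delay(100)
            events += "child finished"
        }
        // launch itself returns immediately...
        events += "launch returned"
    }
    // ...but coroutineScope only returns once the child is done.
    events += "coroutineScope returned"
    events
}

fun main() {
    scopeOrdering().forEach(::println)
}
```

Anything placed after the coroutineScope block, such as the rest of the operator chain waiting on onStart, therefore runs only after the launched loop has completed.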
Same thing happens in this case: Broken2
suspend fun exampleUseCase(): Flow<Status> = coroutineScope {
    launch { doLoop() }

    return@coroutineScope flow
        .transformWhile {
            emit(it)
            it !is Status.Finish
        }
}
That coroutineScope block suspends until all jobs in it complete. But in this case, the suspend point is on exampleUseCase(), which means that needs to complete before you start collecting.
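A self-contained sketch of this second broken case, assuming the shared flow is a MutableStateFlow as in the original question. The Status type is simplified, and the names brokenUseCase and collectBroken are invented for the illustration.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

sealed class Status {
    data class Progress(val message: String) : Status()
    object Finish : Status()
}

suspend fun doLoop(state: MutableStateFlow<Status?>) {
    for (i in 0..2) {
        state.emit(Status.Progress("Message #$i"))
        delay(10)
    }
    state.emit(Status.Finish)
}

// Broken on purpose: coroutineScope waits for doLoop to finish
// before this function can hand the flow back to the caller.
suspend fun brokenUseCase(state: MutableStateFlow<Status?>): Flow<Status> = coroutineScope {
    launch { doLoop(state) }
    state.filterNotNull().transformWhile {
        emit(it)
        it !is Status.Finish
    }
}

fun collectBroken(): List<Status> = runBlocking {
    val state = MutableStateFlow<Status?>(null)
    brokenUseCase(state).toList()
}

fun main() {
    // Only the final Finish value is left in the StateFlow by the time collection starts.
    println(collectBroken().size)
}
```

By the time the caller gets the flow, the StateFlow already holds Finish, so the collector receives that single value and completes.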
The last one works, because you aren't waiting on the completion of a new coroutineScope, but launching new (and separate) coroutines to perform the tasks as needed. You aren't suspending on a child scope (and awaiting it to complete).
d
Ohhhh, I was unaware of that behaviour for coroutineScope. Thank you for explaining. Do you know if there's any way to accomplish the desired behaviour without making the function extend CoroutineScope or otherwise having to pass the coroutine scope in as a parameter?
Nvm, I see now that passing it as a parameter is the recommended (and probably only) approach. Thank you again!
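A hedged sketch of what passing the scope as a parameter could look like. This is not the code from the linked repository: the simplified Status type, the MutableStateFlow parameter, and the collectWorking helper are assumptions for the example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

sealed class Status {
    data class Progress(val message: String) : Status()
    object Finish : Status()
}

suspend fun doLoop(state: MutableStateFlow<Status?>) {
    for (i in 0..2) {
        state.emit(Status.Progress("Message #$i"))
        delay(10)
    }
    state.emit(Status.Finish)
}

// The loop is launched in the caller's scope when collection starts,
// so nothing here waits for it to finish before values are collected.
fun exampleUseCase(scope: CoroutineScope, state: MutableStateFlow<Status?>): Flow<Status> =
    state
        .onStart { scope.launch { doLoop(state) } }
        .filterNotNull()
        .transformWhile {
            emit(it)
            it !is Status.Finish
        }

fun collectWorking(): List<Status> = runBlocking {
    val state = MutableStateFlow<Status?>(null)
    exampleUseCase(this, state).toList()
}

fun main() {
    println(collectWorking().size)
}
```

In a ViewModel the caller would presumably pass viewModelScope, which keeps the polling loop tied to the same lifecycle as the collector.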