#coroutines

niqo01

12/06/2023, 4:49 AM
I have defined a callbackFlow looking somewhat like this:
callbackFlow {
    val callback = object : Callback { // Implementation of some callback interface
        override fun onProgress(value: Int) {
            trySendBlocking(UploadStatus.Progress(value))
        }
        override fun onError(cause: Throwable) {
            trySendBlocking(UploadStatus.Error(cause))
            close()
        }
        override fun onCompleted(key: String) {
            trySendBlocking(UploadStatus.Completed(key))
            close()
        }
    }
    api.register(callback)

    awaitClose { api.unregister(callback) }
}
My issue is that the UploadStatus.Completed or the UploadStatus.Error values are not received by the collector. If I remove the close() calls, those values are emitted, but then my flow never stops. What am I misunderstanding?

Steven Veltema

12/06/2023, 5:30 AM
Try replacing close() with channel.close().

niqo01

12/06/2023, 5:56 AM
Same issue.

Sam

12/06/2023, 10:57 AM
I don't see anything obvious in the code you shared that would cause the behaviour you describe. It looks like close() can only ever be called after those events have been sent. Maybe there is something going wrong in the way the flow is later transformed or collected?

ross_a

12/06/2023, 11:47 AM
What's the return result of trySendBlocking?
And is there a .conflate or .buffer between the callbackFlow and collector?
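One way to check that first question is to look at the ChannelResult that trySendBlocking returns. The following is only a minimal sketch: statusFlow and the background thread are hypothetical stand-ins for the upload API and its callback, and the statuses are plain strings rather than UploadStatus objects.

```kotlin
import kotlin.concurrent.thread
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the upload API: a background thread plays the callback.
fun statusFlow(): Flow<String> = callbackFlow {
    thread {
        for (status in listOf("Progress(50)", "Completed(key)")) {
            // trySendBlocking returns a ChannelResult; report the send if it failed.
            trySendBlocking(status).onFailure { cause ->
                println("Could not deliver $status: $cause")
            }
        }
        // Values sent before close() are still delivered to the collector.
        close()
    }
    awaitClose { /* nothing to unregister in this sketch */ }
}

fun main() = runBlocking {
    println(statusFlow().toList()) // [Progress(50), Completed(key)]
}
```

If nothing is logged and every value reaches a direct collector like this, the callbackFlow itself is probably fine and the loss happens in a downstream operator.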

niqo01

12/06/2023, 6:25 PM
Thanks to all your advice, I was able to figure it out. The culprit is the sample(2.seconds) operator I am using later downstream. I was also able to reproduce it like so:
flow {
    repeat(10) {
        emit(it)
        delay(110)
    }
}
    .sample(200)
    .onEach { println(it) }
    .collect()
It will print:
1
3
5
7
8
The last value, 9, is missing. The interesting thing is that the example above is taken from the official Kotlin docs: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/sample.html In their example the output is
1, 3, 5, 7, 9
I think it's a mistake on their side.

Sam

12/07/2023, 9:38 AM
Huh, that is indeed confusing. I can just about see why the example could produce different output if the timings end up slightly off, but... what on earth does "the latest element is not emitted if it does not fit into the sampling window" mean? Surely if an element overflows the sampling period, it just rolls into the next period?
Or is it trying to say that the final period is truncated as soon as the upstream flow terminates, and any elements that fall within the incomplete period are discarded? 🤔
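That second reading can be checked with a variant of the reproduction that depends less on exact timing. This is a sketch only: sampledStatuses is a hypothetical name, and the strings stand in for the UploadStatus values from the original question.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(FlowPreview::class)
suspend fun sampledStatuses(): List<String> =
    flow {
        emit("Progress")  // falls in the first 200 ms window, so the first tick emits it
        delay(300)
        emit("Completed") // falls in the second window, then the flow ends right away
    }
        .sample(200)      // the second window never reaches its tick
        .toList()

fun main() = runBlocking {
    println(sampledStatuses()) // [Progress]
}
```

The final value is dropped because the upstream flow completes before the next sampling tick, which matches a callbackFlow that sends Completed and then immediately calls close().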

ross_a

12/07/2023, 9:41 AM
I think it's saying the latter, but not in the clearest manner
And it lines up with the issue that was experienced here
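If the terminal value must always reach the collector, one possible workaround is to rate-limit with conflate() plus a delay instead of sample. This is a sketch, not a drop-in replacement: throttleLatest is a hypothetical helper that is not part of kotlinx.coroutines, it emits the first value immediately rather than waiting for a tick, and completion is delayed by up to one period.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical helper, not part of kotlinx.coroutines.
fun <T> Flow<T>.throttleLatest(periodMillis: Long): Flow<T> =
    conflate() // keep only the newest value while the collector is busy
        .transform { value ->
            emit(value)
            delay(periodMillis) // values arriving during this pause overwrite each other
        }

fun main() = runBlocking {
    val statuses = flowOf("Progress(10)", "Progress(90)", "Completed(key)")
        .throttleLatest(200)
        .toList()
    println(statuses.last()) // Completed(key)
}
```

The conflated buffer still holds the newest value when the upstream completes, so the last element is delivered before the flow ends instead of being discarded with an unfinished sampling window.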