niqo01
12/06/2023, 4:49 AMcallbackFlow
looking somewhat like this:
callbackFlow {
val callback = object : Callback { // Implementation of some callback interface
override fun onProgress(value: Int) {
trySendBlocking(UploadStatus.Progress(value))
}
override fun onError(cause: Throwable) {
trySendBlocking(UploadStatus.Error(cause))
close()
}
override fun onCompleted(key: String){
trySendBlocking(UploadStatus.Completed(key))
close()
}
}
api.register(callback)
awaitClose { api.unregister(callback) }
}
My issue is that the UploadStatus.Completed
or the UploadStatus.Error
are not received by the collector. If I remove the close()
those values are emitted but then my flow never stop.
What am I misunderstanding?Steven Veltema
12/06/2023, 5:30 AMclose()
with channel.close()
.niqo01
12/06/2023, 5:56 AMSam
12/06/2023, 10:57 AMclose()
can only ever be called after those events have been sent. Maybe there is something going wrong in the way the flow is later transformed or collected?ross_a
12/06/2023, 11:47 AMtrySendBlocking
?ross_a
12/06/2023, 11:49 AM.conflate
or .buffer
between the callbackFlow and collector?niqo01
12/06/2023, 6:25 PMsample(2.seconds)
operator I am using later downstream.
I was also able to reproduce like so:
flow {
repeat(10) {
emit(it)
delay(110)
}
}
.sample(200)
.onEach { println(it) }
.collect()
It will print:
1
3
5
7
8
Missing the last value of 9.
The interesting thing is that the example above is taken from the official Kotlin doc: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/sample.html
In there example the output is
1, 3, 5, 7, 9
I think it's a mistake on their side.Sam
12/07/2023, 9:38 AMSam
12/07/2023, 9:40 AMross_a
12/07/2023, 9:41 AMross_a
12/07/2023, 9:41 AM