Colton Idle
08/03/2022, 3:37 PMfun getBooks(): Flow<List<Book>> {
return callbackFlow {
val listener =
FirebaseFirestore.getInstance()
.collection("books")
.addSnapshotListener { value, error ->
if (error != null || value == null) { /*dosomething*/ }
var books: List<Book>? = null
runBlocking(<http://Dispatchers.IO|Dispatchers.IO>) {
books = (value!!.toObjects())
}
trySend(books!!)
}
awaitClose { listener.remove() }
}
}
this seems to work well... but to stress test it I wrapped books = (value!!.toObjects())
with repeat (1000)
and now my UI hangs. Shouldn't the dispatchers.io take care of this?
runBlocking(<http://Dispatchers.IO|Dispatchers.IO>) {
repeat(1000) {
books = (value!!.toObjects())
}
}
uli
08/03/2022, 3:54 PMrunBlocking
If you need to wait for books
to be assigned, you have an issue. Outside couroutines, you can not wait without blocking your current thread.scope.launch(<http://Dispatchers.IO|Dispatchers.IO>) {
send(value!!.toObject)
}
And then get rid of the !!
;-)Colton Idle
08/03/2022, 3:59 PMuli
08/03/2022, 4:02 PMShouldn't the <http://dispatchers.io|dispatchers.io> take care of this?
Not if used with runBlocking. But you can use launch like above. Put in your repeat and see, that it will no longer block your ui threadColton Idle
08/03/2022, 4:05 PMscope.launch(<http://Dispatchers.IO|Dispatchers.IO>) {
send(value!!.toObject)
}
var books: List<Book>? = null
launch(<http://Dispatchers.IO|Dispatchers.IO>) {
delay(5000)
repeat(10000) {
books = (value!!.toObjects())
}
trySend(books!!)
}
and no lag!Robert Williams
08/03/2022, 4:08 PMtrySend(value)
immediately and then use map
, filter
, flowOn
etc to do the transformsFrancesc
08/03/2022, 4:13 PMflowOn(<http://Dispatchers.IO|Dispatchers.IO>)
Colton Idle
08/03/2022, 4:14 PMFrancesc
08/03/2022, 4:16 PMflowOn
to the returned flow to make it run off the main threaduli
08/03/2022, 4:17 PMfun getBooks(): Flow<List<Book>> {
return callbackFlow {
val listener =
FirebaseFirestore.getInstance()
.collection("books")
.addSnapshotListener { value, error ->
if (error != null) { /*dosomething*/ }
trySend(value)
}
awaitClose { listener.remove() }
}
.filterNotNull()
.map{
value.toObjects()
}
.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
@Francesc, @Colton Idle I guess, that’s a about the wayRobert Williams
08/03/2022, 4:18 PMuli
08/03/2022, 4:19 PMvalue.toObjects()
Robert Williams
08/03/2022, 4:19 PMuli
08/03/2022, 4:20 PMmap
off the main threadColton Idle
08/03/2022, 4:22 PMuli
08/03/2022, 4:23 PMFrancesc
08/03/2022, 4:25 PMColton Idle
08/03/2022, 4:26 PMFrancesc
08/03/2022, 4:27 PMtrySend
so it will try, but if the buffer is full, it will drop itRobert Williams
08/03/2022, 4:30 PMNick Allen
08/03/2022, 4:32 PM.conflate()
after callbackFlow
so that it always just keeps the latest value from firestore, I assume that's the only one you care about.
And it looks like you can ditch .flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
, I don't see any blocking codeuli
08/03/2022, 4:33 PMRobert Williams
08/03/2022, 4:35 PMconflate
and/or replace map
with mapLatest
Colton Idle
08/03/2022, 4:39 PMRobert Williams
08/03/2022, 4:43 PMColton Idle
08/03/2022, 4:43 PMRobert Williams
08/03/2022, 4:43 PMNick Allen
08/03/2022, 4:43 PMmapLatest
for back-pressure.
Only use it if mapLatest
lambda is suspending code that actually suspends (not blocking on IO dispatcher). It processes every item and when it gets a new item it cancels and waits for the previous lambda (just the lambda, nothing downstream).
If the code after mapLatest is slow, then you'll still have backpressure and could miss updates.
They are completely separate and you should not interchange themColton Idle
08/03/2022, 4:45 PMfun getBooks(): Flow<List<Book>> {
return callbackFlow {
val listener =
FirebaseFirestore.getInstance()
.collection("books")
.addSnapshotListener { value, error ->
if (error != null) { /*dosomething*/ }
trySend(value)
}
awaitClose { listener.remove() }
}
.conflate()
.filterNotNull()
.map{
value.toObjects()
}
.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
Robert Williams
08/03/2022, 4:49 PMmapLatest
does work better if the block supports cooperative cancellation but it won’t caused missed updates any more than conflate willFrancesc
08/03/2022, 4:49 PMRobert Williams
08/03/2022, 4:49 PMNick Allen
08/03/2022, 4:55 PMmapLatest
is used with a block that doesn't support cooperative cancellation (like deserialization), then every new event is blocked waiting for the previous lambda invocation to finish, and every item is processed, which could result in backpressure causing trySend to fail and so the most recent value could be dropped. This is the opposite of what you want, which is to keep the latest item.uli
08/03/2022, 5:03 PMtrySend(value to error)
@Inject
@DefaultDispatcher
lateinit val defaultDispatcher : Disptachers
fun getBooks(): Flow<List<Book>> {
return callbackFlow {
val listener =
FirebaseFirestore.getInstance()
.collection("books")
.addSnapshotListener { value, error ->
trySend(value to error)
}
awaitClose { listener.remove() }
}
.doSomethingWithNullValueAndError() // Depends. It might be better after conflate
.conflate()
.map { (value, _) ->
value.toObjects()
}
.flowOn(defaultDispatcher)
Robert Williams
08/03/2022, 5:21 PMColton Idle
08/03/2022, 6:06 PMuli
08/03/2022, 6:13 PMColton Idle
08/06/2022, 5:19 PMfun <T> firestoreFlow(query: Query, clazz: Class<T>, dispatcher: CoroutineDispatcher): Flow<List<T>> {
return callbackFlow {
val listener = query.addSnapshotListener { value, _ -> trySend(value) }
awaitClose { listener.remove() }
}
.conflate()
.filterNotNull()
.map { value -> value.toObjects(clazz) }
.flowOn(dispatcher)
}
Francesc
08/06/2022, 5:22 PMflowOn
. I would also pass a lambda that does the mapping for you.map
you call a transform method that the caller providesColton Idle
08/06/2022, 5:23 PMFrancesc
08/06/2022, 5:24 PMColton Idle
08/06/2022, 5:45 PMFrancesc
08/06/2022, 5:48 PM.catch
block to the flow and decide how to handle the error. The collector should have a much better context to be able to decide what to do with the errorColton Idle
08/06/2022, 5:50 PMFrancesc
08/06/2022, 5:50 PMfun <T> firestoreFlow(query: Query, clazz: Class<T>, dispatcher: CoroutineDispatcher): Flow<List<T>> {
return callbackFlow {
val listener = query.addSnapshotListener { value, error ->
if (error != null) {
throw MyException(exception = error)
}
if (value == null) {
throw MyException(
"Null value returned from Firestore",
IOException()
)
}
trySend(value)
awaitClose { listener.remove() }
}
.conflate()
.filterNotNull()
.map { value -> value.toObjects(clazz) }
.flowOn(dispatcher)
}
fun sample() {
getBooks()
.onEach { /* do stuff */ }
.catch { ex ->
// do other stuff
}
.launchIn(scope)
}
class MyException(
message: String = "Failed to load data from firestore",
exception: Exception,
): IOException(message, exception)
.retry
block to the flow to re-attempt, based on what error you got and how many times you've already retriedrunBlocking
instead of the latest one, but the idea is the sameNick Allen
08/06/2022, 6:15 PMclose
with the exception instead of throwing it.Francesc
08/06/2022, 6:23 PMColton Idle
08/06/2022, 8:57 PMFrancesc
08/06/2022, 9:02 PMColton Idle
08/06/2022, 9:07 PMRobert Williams
08/08/2022, 9:02 AMColton Idle
08/13/2022, 1:35 PMtrySendBlocking(snapshot)
instead of trySend(snapshot)
@martinbonnin Why not just use trySend instead of risking blocking the thread, which is likely to be the main one?
Good question. I'm not sure TBH. I took inspiration from https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/callback-flow.html