https://kotlinlang.org logo
#coroutines
Title
# coroutines
c

Colton Idle

08/03/2022, 3:37 PM
I'm stress testing some callbackFlow code I have written. Here it is currently
Copy code
fun getBooks(): Flow<List<Book>> {
  return callbackFlow {
    val listener =
      FirebaseFirestore.getInstance()
        .collection("books")
        .addSnapshotListener { value, error ->
          if (error != null || value == null) { /*dosomething*/ }
          var books: List<Book>? = null
          runBlocking(<http://Dispatchers.IO|Dispatchers.IO>) {
              books = (value!!.toObjects())
          }
          trySend(books!!)
        }
    awaitClose { listener.remove() }
  }
}
this seems to work well... but to stress test it I wrapped
books = (value!!.toObjects())
with
repeat (1000)
and now my UI hangs. Shouldn't the dispatchers.io take care of this?
Copy code
runBlocking(<http://Dispatchers.IO|Dispatchers.IO>) {
  repeat(1000) {
    books = (value!!.toObjects())
  }
}
u

uli

08/03/2022, 3:54 PM
Dispatchers.IO schedules to the background all right. But runBlocking blocks your thread waiting for it to be done. If being asynchronous is OK for you, you can use `launch`instead of
runBlocking
If you need to wait for
books
to be assigned, you have an issue. Outside couroutines, you can not wait without blocking your current thread.
maybe you get away with:
Copy code
scope.launch(<http://Dispatchers.IO|Dispatchers.IO>) {
  send(value!!.toObject)
}
And then get rid of the
!!
;-)
c

Colton Idle

08/03/2022, 3:59 PM
Hm. I was trying to use repeat because I'm trying to stress test whether or not toObject() takes a long time. Hence me doing repeat(1000)
u

uli

08/03/2022, 4:02 PM
So, what is your question?
Copy code
Shouldn't the <http://dispatchers.io|dispatchers.io> take care of this?
Not if used with runBlocking. But you can use launch like above. Put in your repeat and see, that it will no longer block your ui thread
c

Colton Idle

08/03/2022, 4:05 PM
You answered my original question. I was just trying to respond to your suggestion of just using
Copy code
scope.launch(<http://Dispatchers.IO|Dispatchers.IO>) {
  send(value!!.toObject)
}
but thank you for teaching. that makes sense.
I have this written now
Copy code
var books: List<Book>? = null
launch(<http://Dispatchers.IO|Dispatchers.IO>) {
  delay(5000)
  repeat(10000) {
    books = (value!!.toObjects())
  }
  trySend(books!!)
}
and no lag!
r

Robert Williams

08/03/2022, 4:08 PM
This isn’t going to handle backpressure well and may lead to events being emitted out of order or other bad things
Good advice is to get out of callback world as fast as you can and do your transforms in flow world
i.e.
trySend(value)
immediately and then use
map
,
filter
,
flowOn
etc to do the transforms
f

Francesc

08/03/2022, 4:13 PM
you should also call this with
flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
if you try to change dispatchers within the flow you may get exceptions due to some guarantees implemented in the flow logic that would be broken
c

Colton Idle

08/03/2022, 4:14 PM
interesting
So this example by google here maybe isn't the best? https://developer.android.com/kotlin/flow#callback
I reallllly wish firestore as a library had first class support for coroutines/flow.
f

Francesc

08/03/2022, 4:16 PM
in that example from Google you could add
flowOn
to the returned flow to make it run off the main thread
u

uli

08/03/2022, 4:17 PM
Copy code
fun getBooks(): Flow<List<Book>> {
  return callbackFlow {
    val listener =
      FirebaseFirestore.getInstance()
        .collection("books")
        .addSnapshotListener { value, error ->
          if (error != null) { /*dosomething*/ }
          trySend(value)
        }
    awaitClose { listener.remove() }
  }
  .filterNotNull()
  .map{
    value.toObjects()
  }
  .flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
@Francesc, @Colton Idle I guess, that’s a about the way
r

Robert Williams

08/03/2022, 4:18 PM
flowOn is not needed in the Google code because the Flow is a simple Channel backed flow so there’s nothing to run off the main thread
The code that runs on the main thread is the addSnapshotListener code but threading there is managed by the library, not by Flows so flowOn will do nothing
u

uli

08/03/2022, 4:19 PM
it was meant for the
value.toObjects()
r

Robert Williams

08/03/2022, 4:19 PM
Hence it’s important to return as soon as possible and do anything expensive on the Flow
u

uli

08/03/2022, 4:20 PM
so the goal is to get the
map
off the main thread
c

Colton Idle

08/03/2022, 4:22 PM
So does everyones generally agree that @uli's last code snippet is what i should be doing?
it breaks out of the callback flow as soon as possible and maps the value and flows onto IO dispatcher... so I think that just about takes everyones suggestion into account
u

uli

08/03/2022, 4:23 PM
Perfect time to close this thread as my train arrives in one minute ;-)
f

Francesc

08/03/2022, 4:25 PM
that works, but if your collector is slow you may drop emissions
c

Colton Idle

08/03/2022, 4:26 PM
if my collector is slow... i may drop emissions. pardon my ignorance. but is this "backpressure"?
f

Francesc

08/03/2022, 4:27 PM
yes, you have a
trySend
so it will try, but if the buffer is full, it will drop it
r

Robert Williams

08/03/2022, 4:30 PM
Yeah. callback flow has a fixed buffer so this is unavoidable when you don’t control the source
But the new code handles it much better because it’ll only map things that come out of the buffer rather than everything
n

Nick Allen

08/03/2022, 4:32 PM
Just add
.conflate()
after
callbackFlow
so that it always just keeps the latest value from firestore, I assume that's the only one you care about. And it looks like you can ditch
.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
, I don't see any blocking code
r

Robert Williams

08/03/2022, 4:35 PM
Yep,
conflate
and/or replace
map
with
mapLatest
c

Colton Idle

08/03/2022, 4:39 PM
ooh. Map latest...
So mapLatest or conflate. any real reason to use one over the other?
and yes @Nick Allen the flowOn I believe is necessary because toObjects is essetnially json deserialization which can take some time with larger lists that i get back.
r

Robert Williams

08/03/2022, 4:43 PM
They do slightly different things and you can actually use both
c

Colton Idle

08/03/2022, 4:43 PM
both... at the same time?
r

Robert Williams

08/03/2022, 4:43 PM
conflate reduces buffer size to 1 and only keeps latest
mapLatest will cancel the old map as soon as it receives a new event
n

Nick Allen

08/03/2022, 4:43 PM
Do not use
mapLatest
for back-pressure. Only use it if
mapLatest
lambda is suspending code that actually suspends (not blocking on IO dispatcher). It processes every item and when it gets a new item it cancels and waits for the previous lambda (just the lambda, nothing downstream). If the code after mapLatest is slow, then you'll still have backpressure and could miss updates. They are completely separate and you should not interchange them
c

Colton Idle

08/03/2022, 4:45 PM
Okay. it looks like my end result is...
Copy code
fun getBooks(): Flow<List<Book>> {
  return callbackFlow {
    val listener =
      FirebaseFirestore.getInstance()
        .collection("books")
        .addSnapshotListener { value, error ->
          if (error != null) { /*dosomething*/ }
          trySend(value)
        }
    awaitClose { listener.remove() }
  }
  .conflate()
  .filterNotNull()
  .map{
    value.toObjects()
  }
  .flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
thank you everyone for teaching me a bunch of new things!
r

Robert Williams

08/03/2022, 4:49 PM
mapLatest
does work better if the block supports cooperative cancellation but it won’t caused missed updates any more than conflate will
f

Francesc

08/03/2022, 4:49 PM
looks good. A thing to consider is that you should not hardcode dispatchers, consider using a class that provides a wrapper for the dispatchers so that you can then replace those for tests
r

Robert Williams

08/03/2022, 4:49 PM
Unless the producing source is constantly faster than the mapping in which case it’ll never be able to emit anything
The alternative with just conflate is that you’ll get old data before it starts the next mapping
Oh, also sounds like your mapping is actually CPU bound and not I/O bound so better to use Default Dispatcher
n

Nick Allen

08/03/2022, 4:55 PM
If
mapLatest
is used with a block that doesn't support cooperative cancellation (like deserialization), then every new event is blocked waiting for the previous lambda invocation to finish, and every item is processed, which could result in backpressure causing trySend to fail and so the most recent value could be dropped. This is the opposite of what you want, which is to keep the latest item.
u

uli

08/03/2022, 5:03 PM
Next train … Actually I left error handling in your callback. Whatever ‘dosomething’ is should probably also go into the stream:
trySend(value to error)
So here comes an update:
Copy code
@Inject
@DefaultDispatcher
lateinit val defaultDispatcher : Disptachers

fun getBooks(): Flow<List<Book>> {
  return callbackFlow {
    val listener =
      FirebaseFirestore.getInstance()
        .collection("books")
        .addSnapshotListener { value, error ->
          trySend(value to error)
        }
    awaitClose { listener.remove() }
  }
  .doSomethingWithNullValueAndError() // Depends. It might be better after conflate
  .conflate()
  .map { (value, _) -> 
    value.toObjects()
  }
  .flowOn(defaultDispatcher)
r

Robert Williams

08/03/2022, 5:21 PM
@Nick Allen You’re right, for some reason I was thinking it’d still drop the final output but I guess it’s actually identical to map if your block can’t cancel
c

Colton Idle

08/03/2022, 6:06 PM
oooh that last snippet @uli looks like it ticks all of the boxes
I do have to admit though. flows are super powerful and i dont see how i would've gotten this far without all of you.
u

uli

08/03/2022, 6:13 PM
I just update some of my own code, a shared preferences listener. It received a key and an instance of the shared preferences. And it used to load the value in the callback. I do not think that this ever was an issue, as shared preferences are usually read from memory. But I feel the pattern of minimal work in the callback is worth following just as a habit. No reason to ever question it.
c

Colton Idle

08/06/2022, 5:19 PM
Sorry this bring this conversation up again, but I essentially have 10 functions now that are pretty much identical to the above (since I have like 10 different queries in my app). If I want to make a generic function that could do this, I basically came up with this. Thoughts?
Copy code
fun <T> firestoreFlow(query: Query, clazz: Class<T>, dispatcher: CoroutineDispatcher): Flow<List<T>> {
  return callbackFlow {
        val listener = query.addSnapshotListener { value, _ -> trySend(value) }
        awaitClose { listener.remove() }
      }
      .conflate()
      .filterNotNull()
      .map { value -> value.toObjects(clazz) }
      .flowOn(dispatcher)
}
f

Francesc

08/06/2022, 5:22 PM
I would remove the dispatcher from your method and let the caller add the
flowOn
. I would also pass a lambda that does the mapping for you.
so in the
map
you call a transform method that the caller provides
c

Colton Idle

08/06/2022, 5:23 PM
Interesting. yeah. I can see how those things could be passed in. I guess I'm trying to find the right level of abstraction and convenience for whoever has to come into my ApiService.kt and add a new flow. I want them to have an easy way to not mess this up (as flowables are still new to our team)
f

Francesc

08/06/2022, 5:24 PM
I see, what you had is more rigid but if that's all you need, it's easier to use
you could have 2 methods a more generic one that you could use everywhere you use Firebase, and a 2nd one that builds on top of the first that is more fine-tuned for this scenario of yours
c

Colton Idle

08/06/2022, 5:45 PM
yeah. i think that's what im going to do. essentially one could be shared online in a gist and anyone in the world could use (so to speak) and then I can have one that's a convenience for my codebase that builds on top of that. awesome. This has me so excited. I've learned so much the past few days. I think the only thing I'm still a bit uneasy about is how to handle errors and stuff, but maybe firestoer doesn't really send down errors if your query is correct. /shruggie
f

Francesc

08/06/2022, 5:48 PM
you can propagate the error down the flow, you can wrap the error in your custom exception and throw it, then whoever is collecting your flow can add a
.catch
block to the flow and decide how to handle the error. The collector should have a much better context to be able to decide what to do with the error
c

Colton Idle

08/06/2022, 5:50 PM
Pardon my ignorance here... but how would I do that? Now that you saw it. Putting the "error handling" on the caller does make a lot more sense. But I'm not sure how that changes my observable and what that looks like on the observer.
f

Francesc

08/06/2022, 5:50 PM
give me a minute to put together a gist
something like this, you can expand on the custom exception to add more details on what went wrong to give better context to the collector
Copy code
fun <T> firestoreFlow(query: Query, clazz: Class<T>, dispatcher: CoroutineDispatcher): Flow<List<T>> {
  return callbackFlow {
        val listener = query.addSnapshotListener { value, error -> 
            if (error != null) {
                throw MyException(exception = error)
            }
            if (value == null) {
                throw MyException(
                    "Null value returned from Firestore",
                    IOException()
                )
            }
        trySend(value)
        awaitClose { listener.remove() }
      }
      .conflate()
      .filterNotNull()
      .map { value -> value.toObjects(clazz) }
      .flowOn(dispatcher)
}

fun sample() {
    getBooks()
        .onEach { /* do stuff */ }
        .catch { ex ->
            // do other stuff
        }
        .launchIn(scope)
}

class MyException(
    message: String = "Failed to load data from firestore",
    exception: Exception,
): IOException(message, exception)
you can also add a
.retry
block to the flow to re-attempt, based on what error you got and how many times you've already retried
I realize I copied your original flow implementation with
runBlocking
instead of the latest one, but the idea is the same
I've amended it to avoid confusion
n

Nick Allen

08/06/2022, 6:15 PM
To send an exception to the flow, you need to call
close
with the exception instead of throwing it.
Otherwise you are just throwing the exception up to the Firestore code that called the listener.
f

Francesc

08/06/2022, 6:23 PM
thanks for pointing that out
c

Colton Idle

08/06/2022, 8:57 PM
Does an exception/"close" terminate the flow? I would think that I would want the flow to continue even if there's one error that's thrown in there.
f

Francesc

08/06/2022, 9:02 PM
You can use retry for that
c

Colton Idle

08/06/2022, 9:07 PM
thanks. im going to try a bit to force some failures just to see what its all about.
r

Robert Williams

08/08/2022, 9:02 AM
One more tip: you often can avoid passing Class in kotlin by using inline and reified https://kotlinlang.org/docs/inline-functions.html#reified-type-parameters
c

Colton Idle

08/13/2022, 1:35 PM
Looks like firestore IS getting support for Flows! Time to see how they implemented it! https://github.com/firebase/firebase-android-sdk/pull/1252#issuecomment-1213179027
Looks like they went for
trySendBlocking(snapshot)
instead of
trySend(snapshot)
Intersting
@martinbonnin Why not just use trySend instead of risking blocking the thread, which is likely to be the main one?
Good question. I'm not sure TBH. I took inspiration from https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/callback-flow.html
3 Views