Use case: I want a flow of events from a callback ...
# coroutines
s
Use case: I want a flow of events from a callback based API. But whenever there is a new value incoming, I need to call a suspending function (may make a network request). However this callback is not suspending, so I am not in a suspending context. More details in thread 🧵
Copy code
fun getLoginStatusAsFlow(): Flow<LoginStatus> {
  return callbackFlow {
    val callback: (data: Any) -> Unit = { data ->
      val result = someSuspendingFunction(data) //Error here, not in a suspending scope
      channel.trySend(result) // Or .send
    }
    foo.registerCallback(callback)
    awaitClose { foo.unregisterCallback(callback) }
  }
}
Any ideas on how to approach this?
j
Send the data into the
Flow
,
map
the network call.
h
How about using
flow.map {}
?
Copy code
fun getLoginStatusAsFlow(): Flow<LoginStatus> {
  return callbackFlow {
    val callback: (data: Any) -> Unit = { data ->
      channel.send(data) // Or .send
    }
    foo.registerCallback(callback)
    awaitClose { foo.unregisterCallback(callback) }
  }.map {data ->
    someSuspendingFunction(data)
  }
}
c
The
callbackFlow
is probably fine for what you need, unless it's emitting really quickly. callbackFlow basically just sends events into a buffered
Channel
(default buffer capacity of 64), and then exposes that channel as a
Flow
. So like the others have said,
callbackFlow { }.map { }
should be perfectly fine for this usecase, since you can post up to 64 events to get queued up and processed one-by-one from the Flow before anything bad happens
s
Wow of course, I didn’t think of this. In my use case I’d only be interested in the latest value, no interest in historical information. In this case I guess I can also replace
map
with
mapLatest
which would cancel the old request right? I also don’t quite care for storing older triggers, would then removing the default buffer capacity be a smart idea, with something like
.conflate()
right before the map? Thanks a lot for all the suggestions and help btw, really appreciated!
n
I'd actually be more concerned about conflating after
mapLatest
.
conflate
only affects behavior if items are sent faster than they are collected.
mapLatest
cancels on new events so the upstream rarely runs into this. Not that it's impossible, cancellation and just receiving from the
Channel
can take time to dispatch and finish.
s
But if the
callbackFlow
callback sends a lot of values, while I’m still inside the
mapLatest
and I’m pending cancellation, I don’t want all the incoming value to wait in queue and go into the
mapLatest
only to be cancelled again, conflating them would simply drop the oldest events and have the latest one wait until the ongoing
mapLatest
is being cancelled. * After testing it a bit, it seems like it will enter the
mapLatest
even if there are many queued things. This is problematic in the case that my
mapLatest
isn’t cooperating with cancelation immediately. I can add a yield() as the first line of
mapLatest
and it achieves the same goal. But you’re right, conflating them after the
mapLatest
as well would also help the observer to only receive the latest value in case it’s for some reason consuming the new values at a lower pace. Thanks for this tip as well!