Still a flow/coroutine noob. I'm using firebase fi...
# coroutines
c
Still a flow/coroutine noob. I'm using firebase firestore and it has a listener approach to listen to updates on my data. I want to convert this into a Flowable so I can observe it. Is this an okay approach
Copy code
suspend fun getThingFlow(myId: String): Flow<Thing?> {
    val thingFlow = MutableStateFlow<Thing?>(null)
    FirebaseFirestore.getInstance()
        .document("things/$myId")
        .addSnapshotListener { value, error ->
            thingFlow.emit(value!!.toObject<Thing>())
        }

    return thingFlow
}
It also seems like
Copy code
suspendCoroutine {
and
Copy code
callbackFlow
are popular. Just overall confused on which would be the best approach here.
k
There is a special API for transforming callback-based API to coroutines:
Copy code
suspend fun getThingFlow(myId: String): Flow<Thing?> =
        suspendCancellableCoroutine<Thing> { continuation ->
            FirebaseFirestore.getInstance()
                .document("things/$myId")
                .addSnapshotListener { value, error ->
                    continuation.resume(value!!.toObject<Thing>()) // note that this `resume` method is extension function one, so you need to import it
                }
            it.invokeOnCancellation {
                // do the cleanup here
            }
        }
I don’t know
FirebaseFirestore
API, so be careful copy-pasting it, but generally it should be something like above
c
@KamilH weird that the api is "coroutine" (suspendCancellableCoroutine) and not "flow" (like suspendCancellableFlow) Makes it seem like I wont keep getting updates besides the first one. But okay. I will give it a whirl. thanks
Oh yeah. Docs for suspendCancellableCoroutine say "For multi-shot callback APIs see callbackFlow."
k
Yes, sorry, you’re right I miss-read your request. You should use `callbackFlow`:
Copy code
suspend fun getThingFlow(myId: String): Flow<Thing?> =
        callbackFlow<Thing?> {
            FirebaseFirestore.getInstance()
                .document("things/$myId")
                .addSnapshotListener { value, error ->
                    trySend(value!!.toObject<Thing>()) 
                }
            
            invokeOnClose {
                // do the cleanup here
            }
        }
☝️ 3
As I said, I don’t know 
FirebaseFirestore
 API, so if there is any indication that this stream is finished (closed) you can use
close()
function to close this
flow
c
Cool. I will keep researching. This gets me closer seemingly... 🤞
oh fuck yeah. it works! LIVE UPDATES! Thanks for teaching!
t
callbackFlow
is the way to go, but honestly I’m impressed there’s no flow api for Firebase SDK yet
3
h
BTW StateFlow is a hot flow, callbackFlow is a cold one.
j
+1 to @hfhbd, with the
callbackFlow
approach, the function shouldn't be suspending. Also, the listener should be assigned to a variable so it can be unregistered in
awaitClose
. (see the docs for callbackFlow, they are pretty helpful)
c
With @Joffreys advice I changed it to:
Copy code
fun getThingFlow(myId: String): Flow<Thing?> =
        callbackFlow<Thing?> {
            val listener = FirebaseFirestore.getInstance()
                .document("things/$myId")
                .addSnapshotListener { value, error ->
                    trySend(value!!.toObject<Thing>()) 
                }
            
            invokeOnClose {
                listener.close()
            }
        }
Thanks everyone!
👍 2
h
@Colton Idle Not exactly 😄
Copy code
fun getThingFlow(myId: String): Flow<Thing?> =
        callbackFlow<Thing?> {
            val listener = FirebaseFirestore.getInstance()
                .document("things/$myId")
                .addSnapshotListener { value, error ->
                    trySend(value!!.toObject<Thing>()) 
                }
            
            invokeOnClose {
               close()
            }
        awaitClose {
          listener.close()
        }
c
ooOoh. invokeOnClose and awaitClose?
j
I don't think
invokeOnClose
should be called at all here. Only
awaitClose
h
@Joffrey Yeah, the
invokeOnClose
function should be part of
listener
to finish/close the flow when all items are emitted. Otherwise, this flow will never finish by itself.
j
No,
invokeOnClose
shouldn't be called in the listener either, only
close()
should (when you want to close the channel of the flow).
invokeOnClose
is a callback on the
SendChannel
itself, why doesn't really have its place in a
callbackFlow
setup AFAIU
h
I mean this:
Copy code
fun getThingFlow(myId: String): Flow<Thing?> =
        callbackFlow<Thing?> {
            val listener = FirebaseFirestore.getInstance()
                .document("things/$myId")
                .addSnapshotListener { value, error ->
                    trySend(value!!.toObject<Thing>()) 
                }
            listener.onError { // I dont know the FireStore API
close(it)
}
            listener.invokeOnClose { // or onFinished. I dont know the FireStore API...
               close()
            }
        awaitClose {
          listener.close()
        }
j
Oh ok, in your previous snippet
invokeOnClose
was that of the
SendChannel
, and is an existing function. I didn't get that you were talking about a hypothetical closing callback of the listener. There is no such thing AFAIK in this case, it's just a subscription to receive updates. It has no end in itself, so the only way to end the flow is by cancelling it from the collector side, which
awaitClose
is meant to handle:
Copy code
fun getThingFlow(myId: String): Flow<Thing?> =
        callbackFlow<Thing?> {
            val listener = FirebaseFirestore.getInstance()
                .document("things/$myId")
                .addSnapshotListener { value, error ->
                    trySend(value!!.toObject<Thing>()) 
                }
            
            awaitClose {
                listener.close()
            }
        }
👍 1
h
Thanks, I wasn't aware this is an infinite stream.
c
Wait, so @Joffrey's last example is what I should go with? 😅
j
Yes, I believe so
c
i also just submitted a FR to firebase.
🆒 1
j
If you don't call
awaitClose
anywhere, your flow will terminate immediately.
invokeOnClose
just registers a handler but doesn't wait
Please link your issue here, it might help other people 😉
c
theres no public issue. the FR for firebase is only through a form
j
Ah ok, fair enough then 😄
c
Okay. Here's my final usage
Copy code
fun getThingFlow(myId: String): Flow<Thing?> =
        callbackFlow<Thing?> {
            val listener = FirebaseFirestore.getInstance()
                .document("things/$myId")
                .addSnapshotListener { value, error ->
                    trySend(value!!.toObject<Thing>()) 
                }
            
            awaitClose {
                listener.remove()
            }
        }
The only real change I made was listener.remove() since there is no close() api (i dont know where I got that from)
🎉 1
For anyone following along. It looks like firebase is getting flow support! https://github.com/firebase/firebase-android-sdk/pull/1252#issuecomment-1213179027
104 Views