deviant
03/05/2017, 4:14 PM
fun dataChannel(context: CoroutineContext) = produce<Any>(context) {
    val listener = object : Listener {
        override fun onSuccess(value: Any) {
            offer(value) // emit to channel
        }
        override fun onError(throwable: Throwable) {
            close(throwable) // close channel
            //throw throwable
        }
    }
    addListener(listener)
    fetchData() // invoke callback's onResult here
    // need to register removeListener() somewhere
}
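One possible place to register removeListener() is a cleanup block that runs when the channel is closed or cancelled. The following is a minimal sketch, not the poster's code: it assumes a recent kotlinx.coroutines release (where trySend replaces offer and awaitClose is available on ProducerScope), and FakeSource, its fetchData() and demo() are hypothetical stand-ins for the real data source.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Same shape as the Listener used in the snippet above.
interface Listener {
    fun onSuccess(value: Any)
    fun onError(throwable: Throwable)
}

// Hypothetical data source, only here to make the sketch runnable.
class FakeSource {
    val listeners = mutableListOf<Listener>()
    fun addListener(l: Listener) { listeners += l }
    fun removeListener(l: Listener) { listeners -= l }
    // Delivers two values synchronously to every registered listener.
    fun fetchData() {
        for (l in listeners.toList()) {
            l.onSuccess("a")
            l.onSuccess("b")
        }
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.dataChannel(source: FakeSource): ReceiveChannel<Any> =
    produce<Any>(capacity = Channel.UNLIMITED) {
        val listener = object : Listener {
            override fun onSuccess(value: Any) { trySend(value) }
            override fun onError(throwable: Throwable) { close(throwable) }
        }
        source.addListener(listener)
        source.fetchData()
        // Keeps the producer alive until the channel is closed or cancelled,
        // then runs the cleanup block.
        awaitClose { source.removeListener(listener) }
    }

fun demo(): Pair<List<Any>, Int> {
    val source = FakeSource()
    val received = runBlocking {
        val channel = dataChannel(source)
        val values = listOf(channel.receive(), channel.receive())
        channel.cancel() // consumer is done; this triggers the cleanup block
        values
    }
    // runBlocking returns only after the producer coroutine has finished,
    // so the listener has already been removed at this point.
    return received to source.listeners.size
}

fun main() {
    println(demo())
}
```

Without the awaitClose call the produce block would return right after fetchData(), which closes the channel while the listener is still registered.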
kingsley
03/05/2017, 4:36 PM
fun dataChannel() will create a producer job that you can read new values from.
I suppose it doesn't matter if Firebase continues sending those values as long as you have removed your listener? If that is the case, then I think this is fine. You might need to add removeListener before you call close. Also consider the suspending send(value) over offer.
deviant
03/05/2017, 4:40 PM
kingsley
03/05/2017, 4:56 PM
val actorJob = actor<Any>(context) {
    for (value in channel) {
        // do something with value
    }
}
firebase.addListener(object : Listener {
    // Block body: offer returns a Boolean, so an expression body would not match a Unit-returning onSuccess
    override fun onSuccess(value: Any) { actorJob.offer(value) }
    override fun onError(throwable: Throwable) = Unit // Not sure what you want to do here
})
firebase.onCancel { actorJob.close() }
// Or when activity is destroyed
override fun onDestroy() {
    actorJob.close()
    super.onDestroy()
}
Will this work for you?
elizarov
03/05/2017, 4:58 PM
offer instead of send
kingsley
03/05/2017, 4:59 PM
elizarov
03/05/2017, 4:59 PM
(capacity = Channel.UNLIMITED), this way no values will ever get lost even if processing in the actor is slow.
offer. Which might be OK, depending on how much data is coming in.
deviant
03/05/2017, 5:06 PM
suspend fun dataChannel() = produce<Any>(CommonPool) {
    log("start the channel")
    val listener = ListenerChannel()
    addListener(listener)
    Database.floodListener()
    try {
        for (any in listener) {
            offer(any)
        }
    } catch (e: IllegalStateException) { // CancellationException is an IllegalStateException
        log("canceled!")
        removeListener(listener)
        close(e)
    }
}

// Channel that doubles as the listener: callbacks feed straight into it
class ListenerChannel : LinkedListChannel<Any>(), Listener {
    override fun onSuccess(value: Any) {
        offer(value)
    }
    override fun onError(throwable: Throwable) {
        close(throwable)
    }
}
So when the channel is closed from outside (e.g. on activity destroy), the listener is removed in the catch section.
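The capacity remark earlier in the thread can be illustrated with a small sketch. It assumes a recent kotlinx.coroutines release, where trySend is the non-suspending replacement for offer. The helper name acceptedWithoutReceiver is hypothetical. It counts how many non-suspending sends a channel accepts while nothing is receiving, which approximates a consumer that is slower than the callbacks.

```kotlin
import kotlinx.coroutines.channels.Channel

// Counts how many of `total` non-suspending trySend calls the channel accepts
// when no receiver is draining it.
fun acceptedWithoutReceiver(capacity: Int, total: Int): Int {
    val channel = Channel<Int>(capacity)
    val accepted = (1..total).count { channel.trySend(it).isSuccess }
    channel.close()
    return accepted
}

fun main() {
    // Bounded buffer: sends beyond the buffer size are rejected, so values are lost.
    println(acceptedWithoutReceiver(capacity = 2, total = 5))
    // Unlimited buffer: every send is accepted, at the cost of unbounded memory.
    println(acceptedWithoutReceiver(capacity = Channel.UNLIMITED, total = 5))
}
```

Whether dropped values are acceptable depends on how much data is coming in. The suspending send avoids both the loss and the unbounded buffer, but it cannot be called directly from a plain callback.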