https://kotlinlang.org logo
#coroutines
Title
# coroutines
d

deviant

03/05/2017, 4:14 PM
fetchData is something like firebaseReference.addValueEventListener(listener) in my real code. it is totally safe to call from UI thread. it registers callback that can be triggered many times. Also it is necessary to unregister callback when activity is destroyed or when we dont need updates anymore. so i need some mechanism to catch cancel event inside this method
Copy code
fun dataChannel(context: CoroutineContext) = produce<Any>(context) {
        val listener = object : Listener {
            override fun onSuccess(value: Any) {
                offer(value)    // emit to channel
            }

            override fun onError(throwable: Throwable) {
                close(throwable)    // close channel
                //throw throwable
            }
        }
        addListener(listener)
        
        fetchData() // invoke callback's onResult here
        
        // need to register removeListener() somewhere
    }
k

kingsley

03/05/2017, 4:36 PM
deviant: Hey. Sorry I'm not quite familiar with the way "value event listener" works. From you you have here, the
fun dataChannel()
will be creating a producer job that you can read new values from. I suppose it doesn't matter if firebase continues sending those values as long as you have removed your listener? If that is the case, then I think this is fine. You might need to add
removeListener
before you call
close
. Also consider suspending
send(value)
over offer
d

deviant

03/05/2017, 4:40 PM
@kingsley 1. send() is not allowed inside callback method, because it is not a coroutine. 2. i need to trigger removeListener in onCancel (when it canceled outside), not just in the end of method or in the onError callback i was thinking about wrapping this whole method into another cancellable coroutine, but didn't success yet
k

kingsley

03/05/2017, 4:56 PM
Okay. I see. I'll suggest an opposite approach to handling this: 1. Create an actor, which will process whatever value you receive from Firebase. 2. the actor exposes a job that you can use to feed in new values for processing. 3. Create your firebase listener like you normally would, and using this job, send values to the actor 4. Cancel your firebase listener like you normally would, and when you are done, shutdown the actor Something like:
Copy code
val actorJob = actor<Any>(context) {
  for (value in channel) {
    // do something with value
  }
}

firebase.addListener(object: Listener {
  override fun onSuccess(value: Any) = actorJob.offer(value)

  override fun onError(throwable: Throwable) = Unit // Not sure what you want to do here
})

firebase.onCancel { actorJob.close() }

// Or when activity is destroyed
override fun onDestroy() {
  actorJob.close()
  super.onDestroy()
}
Will this work for you?
👍 1
e

elizarov

03/05/2017, 4:58 PM
The code above will need to
offer
instead of send
k

kingsley

03/05/2017, 4:59 PM
Oh. Yes. Fixed. Thank you
e

elizarov

03/05/2017, 4:59 PM
Actor can (should?) be created with unlimited size buffer (
capacity = Channel.UNLIMITED
), this way no values will ever get lost even if processing in actor is slow.
But unfortunatel, there is no backpressure with
offer
. Which might be Ok, depending on how mcuh data is coming in.
d

deviant

03/05/2017, 5:06 PM
well, in my case the processing is pretty fast, and emitting is rare, so it's ok. thanks @kingsley will try that approach
ended up with this approach:
Copy code
suspend fun dataChannel() = produce<Any>(CommonPool) {
        log("start the channel")
        val listener = ListenerChannel()
        addListener(listener)

        Database.floodListener()

        try {
            for (any in listener) {
                offer(any)
            }
        } catch (e: IllegalStateException) {
            log("canceled!")
            removeListener(listener)
            close(e)
        }

    }

    class ListenerChannel : LinkedListChannel<Any>(), Listener {
        override fun onSuccess(value: Any) {
            offer(value)
        }

        override fun onError(throwable: Throwable) {
            close(throwable)
        }

    }
so when channel being closed outside (e.g. on activity destroy) the listener will be recycled in the catch section
5 Views