How do i stop subscription of the flow. below is m...
# coroutines
t
How do i stop subscription of the flow. below is my scenario 1. poll data from some non blocking io (possibly http endpoint) 2. process the data 3. acknowledge that I processed the data Now let's say after some specific period, I want to stop polling the data. How to do it? so that my coroutines are not leaked. gist of my tried approach. https://gist.github.com/thiyagu06/974ede71f4ec0da24bee4574450ab6e7
o
if you check for cancellation (using
isActive
or
yield
) in
onEach
, cancelling the context should suffice
launchIn
also returns the
Job
that it starts, so if you hold on to that you can cancel it too
b
another option is to use structured concurrency that you already have established in your
InfiniteFlowProcessor
Copy code
class InfiniteFlowProcessor : CoroutineScope {
    private val job = Job()

    override val coroutineContext get() = job + Executors...asCoroutineDispatcher()

    fun start() {
        ...launchIn(this)
    }

    fun stop() {
        job.cancelChildren()
    }
}
or, if you don't want to expose the processor's scope publically, do:
Copy code
private val scope = CoroutineScope(job + Executors...asCoroutineDispatcher())
Copy code
fun start() {
    ...launchIn(scope)
}
t
so my stop function will look this, and coroutines started inside
channelFlow
block also will be cancelled.
Copy code
fun stop () {
  scope.cancel()
}
b
IIRC, yes
👍 1
one thing to note about
.cancel
vs
.cancelChildren
is you won't be able to have
start()
do anything after
stop()
has been called because the
val job
has been cancelled fully, vs just its children jobs