is there are way to close a Channel (`channelFlow`...
# coroutines
m
is there are way to close a Channel (
channelFlow
or
callbackFlow
) without sending a error to the consumers?
s
Closing a Channel or ending a Flow?
m
I have a
channelFlow
that I want to close it
s
A channelFlow is a Flow, backed internally by a Channel….
m
sure
k
seems to me you'd want to launch a coroutine for the flow and cancel the job that houses it?
s
How is your (channel)Flow implemented right now?
@kevin.cianfarini That would close/end the Flow from a consumer/collector perspective. Maybe, but i’m not sure, @magnumrocha wants to have the Flow (producer) itself or some other third party close/end it?
m
yes, I would like the receivers get the signal of the close by the .onComplete() without any error
k
Copy code
val flow = channelFlow {
    // do some emissions
    close()
}
m
yes, like this
k
okay I wasn't sure if I was understanding your question
m
but
close()
will throw a CloseSendChannelException
s
You don’t need to call close(). Just exiting the lambda is sufficient
1
Copy code
val flow = channelFlow {
    // Flow start here.
    ... flow emits stuff here for a while.
    ...
    // FLow ends/closes right here by exiting the lambda
}
Often, for long lived flows you’ll see this, preventing the flow from ending prematurely:
Copy code
val flow = channelFlow {
    ...
    ...
    awaitClose { /* do clean up */ }
}
The
awaitClose
suspends forever, until the scope in which the
flow
is collected is cancelled
k
That will also be run if the channel is closed, right?
s
Why would you close the channel? The channel in this
channelFlow
is an implementation detail, mostly.
k
Suspends the current coroutine until the channel is either closed or cancelled and invokes the given block before resuming the coroutine.
m
my situation is most like this:
Copy code
callbackFlow {
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            offer(value)
        }
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }
        override fun onCompleted() = channel.close()
    }
    api.register(callback)

    awaitClose { api.unregister(callback) }
}
k
channelflow is often used for callbacks. Close on error callback
s
Note that
callbackFlow
==
channelFlow
, they are exactly the same 🙂 Just a difference in name 🙂
👌 2
m
the behavior I'am looking for is like the
emitter.onComplete()
of RxJava
this close the Stream and don't emit any Exceptions
s
Ah… a third party (through the onCompleted) like to end the Flow.
Not sure if calling
channel.close()
will work, you may get errors about using a closed channel…., you’d need to exit that lambda.
k
m
yes, am I am using it
s
onCompletion is just a callback, just like Rx’
doOnComplete
m
but I receive a
CloseSendChannelException
k
ah
m
Copy code
.onCompletion { error -> someAction() }
in this case error is a
CloseSendChannelException
and I have different action for errors...
s
yeah… The implementation of
override fun onCompleted()
needs to cause the lambda of the
channelFlow
to end.
m
I have a workaround for now, where I've created a Exception like:
OnFinishException()
, and when I want to close my
channelFlow
with success, I do:
close(OnFinishException())
and on the consumer side:
Copy code
.onCompletion { error -> if (error is OnFinishException) finishAction() else errorAction(error) }
s
What if you just call
cancel()
?
m
just call
cancel()
will make the flow throws a
CloseSendChannelException
to the consumers..
in my case, I have some errors treatments...
k
could you use cancel and close for different things? Cancel throws a cancellation exception, wherreas close throws a close exception
and then base your finish/error treatment on that versus creating your own exception
m
yes that what I'm doing right now
m
but, I don't like it so much
k
for what reason?
cancellation and closing mean different things
👍 1
a channel is usually closed because it's done
👍 1
and cancelled because of an error
m
I agree with you point of view, but I don't understand why close a channel/flow dispatches a Exception in the stream
in some cases, like mine, I want just close the channel/flow and causes the
.onComplete()
in the consumers...
s
Cancelled can happen without an error as well. A CancellationException is somewhat special. A CancellationException allows for the orderly wind-down of a Job
This works for me:
Copy code
fun main() {
    val callback = Observable.just(1, 2, 3).delay(200, TimeUnit.MILLISECONDS)

    val intFlow = channelFlow<Int> {
        suspendCancellableCoroutine<Unit> { cont ->
            callback.subscribe(
                { // onNext
                    offer(it)
                },
                { // onError
                    cont.resumeWithException(it)
                },
                { // onCompleted
                    cont.resume(Unit)
                }
            )
        }
    }

    // Start collecting the flow
    val scope = CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>)

    scope.launch {
        intFlow.collect {
            println(it)
        }
        println("Done")
    }

    Thread.sleep(2000)
}
k
How else would you signal the consumer that a channel is closed without throwing?
s
I used a
suspendCancallableCoroutine
that only resumes on a
onCompleted
(cont.resume(Unit)), which then will cause the channelFlow’s lambda to end.
m
to represent the end of the producing of some data
think in an operartion of scan for some data...
when the scan completes, you want to alert the consumers..
so I close() the flow to alert the end of the scanning process
because, I will not produce any data for the scanning action anymore
s
@magnumrocha Did you try to use the
suspendCancallableCoroutine
so that the lambda of your channel/callback-flow ends in an orderly fashion?
m
I don't know if this will work for my case because I have a subscription of my channelFlow in the consumer side
s
This wont’ work?
Copy code
fun <T> makeFlow(api: API) : Flow<T> = callbackFlow {
    suspendCancellableCoroutine<Unit> { cont ->
        val callback = object : Callback<T> {
            override fun onNextValue(value: T) {
                offer(value)
            }

            override fun onApiError(cause: Throwable) {
                cont.resumeWithException(cause)
            }

            override fun onCompleted() {
                cont.resume(Unit)
            }
        }
        api.register(callback)
        cont.invokeOnCancellation { api.unregister(callback) }
    }
}
m
humm... let me try it 🙂