magnumrocha
02/20/2020, 4:33 PM
(channelFlow or callbackFlow) without sending an error to the consumers?
streetsofboston
02/20/2020, 4:35 PM
magnumrocha
02/20/2020, 4:37 PM
channelFlow that I want to close it
streetsofboston
02/20/2020, 4:37 PM
magnumrocha
02/20/2020, 4:37 PM
kevin.cianfarini
02/20/2020, 4:38 PM
streetsofboston
02/20/2020, 4:38 PM
streetsofboston
02/20/2020, 4:39 PM
magnumrocha
02/20/2020, 4:40 PM
kevin.cianfarini
02/20/2020, 4:40 PM
val flow = channelFlow {
    // do some emissions
    close()
}
magnumrocha
02/20/2020, 4:40 PM
kevin.cianfarini
02/20/2020, 4:41 PM
kevin.cianfarini
02/20/2020, 4:41 PM
magnumrocha
02/20/2020, 4:41 PM
close() will throw a ClosedSendChannelException
streetsofboston
02/20/2020, 4:41 PM
streetsofboston
02/20/2020, 4:42 PM
val flow = channelFlow {
    // Flow starts here.
    // ... flow emits stuff here for a while.
    // ...
    // Flow ends/closes right here by exiting the lambda
}
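A minimal sketch of the point above, assuming a recent kotlinx.coroutines release: when the channelFlow block returns, the collector simply finishes and no exception reaches it. The helper name collectUntilBlockExits is hypothetical and not from the thread.

```kotlin
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects a channelFlow whose block just returns.
fun collectUntilBlockExits(): List<Int> = runBlocking {
    val flow = channelFlow<Int> {
        send(1)
        send(2)
        // No close() call: returning from the block completes the flow.
    }
    // toList() returns normally once the flow has completed.
    flow.toList()
}

fun main() {
    println(collectUntilBlockExits()) // prints [1, 2]
}
```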
streetsofboston
02/20/2020, 4:43 PM
val flow = channelFlow {
    ...
    ...
    awaitClose { /* do clean up */ }
}
The awaitClose suspends forever, until the scope in which the flow is collected is cancelled
kevin.cianfarini
02/20/2020, 4:44 PM
streetsofboston
02/20/2020, 4:44 PM
channelFlow is an implementation detail, mostly.
kevin.cianfarini
02/20/2020, 4:45 PM
Suspends the current coroutine until the channel is either closed or cancelled and invokes the given block before resuming the coroutine.
magnumrocha
02/20/2020, 4:45 PM
callbackFlow {
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            offer(value)
        }
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }
        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    awaitClose { api.unregister(callback) }
}
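The snippet above can be tried against a stand-in API. The sketch below is an illustration under assumptions, not the code from the thread: Callback, FakeApi and flowFrom are hypothetical names, FakeApi replays its values synchronously, and trySend (kotlinx.coroutines 1.5 or later) is used in place of offer. It suggests that calling close() with no cause from onCompleted lets awaitClose resume, runs the cleanup, and completes the flow normally for the collector.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical callback interface standing in for the API in the thread.
interface Callback<T> {
    fun onNextValue(value: T)
    fun onApiError(cause: Throwable)
    fun onCompleted()
}

// Hypothetical API: replays fixed values synchronously, then completes.
class FakeApi(private val values: List<Int>) {
    var registeredCallbacks = 0
        private set

    fun register(callback: Callback<Int>) {
        registeredCallbacks++
        values.forEach { callback.onNextValue(it) }
        callback.onCompleted()
    }

    fun unregister(callback: Callback<Int>) {
        registeredCallbacks--
    }
}

fun flowFrom(api: FakeApi): Flow<Int> = callbackFlow {
    val callback = object : Callback<Int> {
        override fun onNextValue(value: Int) {
            trySend(value) // result ignored in this sketch
        }
        override fun onApiError(cause: Throwable) {
            close(cause) // the collector fails with this cause
        }
        override fun onCompleted() {
            close() // no cause: the flow completes normally
        }
    }
    api.register(callback)
    // Suspends until the channel is closed or cancelled, then cleans up.
    awaitClose { api.unregister(callback) }
}

fun main() = runBlocking {
    val api = FakeApi(listOf(1, 2, 3))
    var cause: Throwable? = Throwable("onCompletion not called")
    val values = flowFrom(api).onCompletion { cause = it }.toList()
    println(values)                  // prints [1, 2, 3]
    println(cause)                   // prints null
    println(api.registeredCallbacks) // prints 0
}
```

Values already buffered before close() are still delivered, because closing a channel only stops new sends.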
kevin.cianfarini
02/20/2020, 4:45 PM
streetsofboston
02/20/2020, 4:45 PM
callbackFlow == channelFlow, they are exactly the same 🙂
Just a difference in name 🙂
magnumrocha
02/20/2020, 4:46 PM
emitter.onComplete() of RxJava
magnumrocha
02/20/2020, 4:46 PM
streetsofboston
02/20/2020, 4:47 PM
streetsofboston
02/20/2020, 4:48 PM
channel.close() will work, you may get errors about using a closed channel... you’d need to exit that lambda.
kevin.cianfarini
02/20/2020, 4:48 PM
magnumrocha
02/20/2020, 4:48 PM
streetsofboston
02/20/2020, 4:48 PM
doOnComplete
magnumrocha
02/20/2020, 4:48 PM
ClosedSendChannelException
kevin.cianfarini
02/20/2020, 4:49 PM
magnumrocha
02/20/2020, 4:49 PM
.onCompletion { error -> someAction() }
magnumrocha
02/20/2020, 4:49 PM
ClosedSendChannelException
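For context on the exception named here: in kotlinx.coroutines it is raised when something sends on a channel that was already closed without a cause. The sketch below uses a plain Channel rather than a flow builder, and the helper name sendAfterClose is hypothetical. It assumes kotlinx.coroutines 1.5 or later, where trySend is available.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports what happens when sending after close().
fun sendAfterClose(): Pair<Boolean, Boolean> = runBlocking {
    val channel = Channel<Int>(Channel.BUFFERED)
    channel.close()

    // trySend does not throw; it reports the closed channel in its result.
    val trySendSawClosed = channel.trySend(1).isClosed

    // send throws because the channel was closed without a cause.
    val sendThrew = try {
        channel.send(1)
        false
    } catch (e: ClosedSendChannelException) {
        true
    }
    trySendSawClosed to sendThrew
}

fun main() {
    println(sendAfterClose()) // prints (true, true)
}
```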
magnumrocha
02/20/2020, 4:49 PM
streetsofboston
02/20/2020, 4:50 PM
override fun onCompleted() needs to cause the lambda of the channelFlow to end.
magnumrocha
02/20/2020, 4:52 PM
OnFinishException(), and when I want to close my channelFlow with success, I do: close(OnFinishException())
magnumrocha
02/20/2020, 4:53 PM
.onCompletion { error -> if (error is OnFinishException) finishAction() else errorAction(error) }
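A sentinel exception may not be needed to tell success from failure: onCompletion passes null as the cause when the flow completes normally. The sketch below is a simplified illustration that uses the plain flow builder instead of channelFlow. The helper describeCompletion and its outcome strings are hypothetical.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports how a small flow completed.
fun describeCompletion(fail: Boolean): String = runBlocking {
    var outcome = "not completed"
    flow<Int> {
        emit(1)
        if (fail) throw IllegalStateException("API Error")
    }
        .onCompletion { cause ->
            // cause is null on normal completion, non-null on failure.
            outcome = if (cause == null) "finished" else "failed: ${cause.message}"
        }
        .catch { /* swallow the error so this sketch can return the outcome */ }
        .toList()
    outcome
}

fun main() {
    println(describeCompletion(fail = false)) // prints finished
    println(describeCompletion(fail = true))  // prints failed: API Error
}
```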
streetsofboston
02/20/2020, 4:54 PM
cancel()?
magnumrocha
02/20/2020, 4:56 PM
cancel() will make the flow throw a ClosedSendChannelException to the consumers.
magnumrocha
02/20/2020, 4:56 PM
kevin.cianfarini
02/20/2020, 4:58 PM
kevin.cianfarini
02/20/2020, 4:58 PM
kevin.cianfarini
02/20/2020, 4:59 PM
magnumrocha
02/20/2020, 4:59 PM
kevin.cianfarini
02/20/2020, 4:59 PM
magnumrocha
02/20/2020, 4:59 PM
kevin.cianfarini
02/20/2020, 4:59 PM
kevin.cianfarini
02/20/2020, 4:59 PM
kevin.cianfarini
02/20/2020, 4:59 PM
kevin.cianfarini
02/20/2020, 4:59 PM
magnumrocha
02/20/2020, 5:02 PM
magnumrocha
02/20/2020, 5:02 PM
.onComplete() in the consumers...
streetsofboston
02/20/2020, 5:03 PM
streetsofboston
02/20/2020, 5:05 PM
fun main() {
    val callback = Observable.just(1, 2, 3).delay(200, TimeUnit.MILLISECONDS)
    val intFlow = channelFlow<Int> {
        suspendCancellableCoroutine<Unit> { cont ->
            callback.subscribe(
                { // onNext
                    offer(it)
                },
                { // onError
                    cont.resumeWithException(it)
                },
                { // onCompleted
                    cont.resume(Unit)
                }
            )
        }
    }
    // Start collecting the flow
    val scope = CoroutineScope(Dispatchers.IO)
    scope.launch {
        intFlow.collect {
            println(it)
        }
        println("Done")
    }
    Thread.sleep(2000)
}
kevin.cianfarini
02/20/2020, 5:05 PM
streetsofboston
02/20/2020, 5:06 PM
suspendCancellableCoroutine that only resumes on an onCompleted (cont.resume(Unit)), which then will cause the channelFlow’s lambda to end.
magnumrocha
02/20/2020, 5:06 PM
magnumrocha
02/20/2020, 5:06 PM
magnumrocha
02/20/2020, 5:07 PM
magnumrocha
02/20/2020, 5:11 PM
magnumrocha
02/20/2020, 5:11 PM
streetsofboston
02/20/2020, 5:18 PM
suspendCancellableCoroutine so that the lambda of your channel/callback-flow ends in an orderly fashion?
magnumrocha
02/20/2020, 5:20 PM
streetsofboston
02/20/2020, 5:33 PM
fun <T> makeFlow(api: API): Flow<T> = callbackFlow {
    suspendCancellableCoroutine<Unit> { cont ->
        val callback = object : Callback<T> {
            override fun onNextValue(value: T) {
                offer(value)
            }
            override fun onApiError(cause: Throwable) {
                cont.resumeWithException(cause)
            }
            override fun onCompleted() {
                cont.resume(Unit)
            }
        }
        api.register(callback)
        cont.invokeOnCancellation { api.unregister(callback) }
    }
}
magnumrocha
02/20/2020, 5:34 PM