spierce7
09/13/2020, 8:34 PM
Is converting a `Flow` to an `Observable` safe? The reason I’m wondering is because the `Observable` can be cancelled, but the `Flow` can’t. Is the suspending behavior of the coroutine sufficient?
With Kotlin Multiplatform for the JS side we’re converting `Flow`s to rxjs `Observable`s. Code inside the thread.
fun <T> Flow<T>.toObservable(): Observable<T> {
    val flow = this
    return Observable { _, subscriber ->
        GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                flow.collect {
                    if (!subscriber.closed) {
                        subscriber.next(it)
                    }
                }
                if (!subscriber.closed) {
                    subscriber.complete()
                }
            } catch (e: Throwable) {
                if (!subscriber.closed) {
                    subscriber.error(e)
                }
            }
        }
    }
}
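On the cancellation question: a `Flow` has no cancel method of its own, but its collection stops when the coroutine that collects it is cancelled. Below is a minimal sketch of that behaviour, assuming kotlinx.coroutines on the JVM. `cancelCollectorDemo` and the `ticks` flow are hypothetical names used for illustration; they are not part of the code in this thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects an endless flow, cancels the collector from outside,
// and returns the events observed along the way.
fun cancelCollectorDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    // An endless flow: it only stops when its collector is cancelled.
    val ticks = flow<Int> {
        try {
            var i = 0
            while (true) {
                emit(i++)
                delay(10)
            }
        } finally {
            // Reached when the collecting coroutine is cancelled.
            events.add("flow stopped")
        }
    }

    // The collector is an ordinary coroutine, so it has a Job we can cancel.
    val job = launch {
        ticks.collect { value ->
            if (value == 0) {
                events.add("first value")
            }
        }
    }

    delay(55)            // let the flow emit for a while
    job.cancelAndJoin()  // cancelling the collector cancels the flow
    events.add("collector cancelled")
    events
}

fun main() {
    println(cancelCollectorDemo())
}
```

The `finally` block inside the flow runs before `cancelAndJoin()` returns, so the flow is already torn down by the time the caller continues.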
louiscad
09/13/2020, 8:39 PM
`Flow` can definitely be cancelled by cancelling the collector directly or indirectly (e.g. through an operator). Your snippet doesn't seem to link to `Observable` cancellation though.
spierce7
09/13/2020, 8:39 PM
`Observable` creation scope, and cancel the internal scope, shouldn’t I?
Zach Klippenstein (he/him) [MOD]
09/13/2020, 8:41 PM
spierce7
09/13/2020, 8:43 PM
return Observable { _, subscriber ->
    // Parent job, so the teardown below can cancel the collection.
    val job = Job()
    GlobalScope.launch(job, start = CoroutineStart.UNDISPATCHED) {
        try {
            flow.collect {
                if (!subscriber.closed) {
                    subscriber.next(it)
                }
            }
            if (!subscriber.closed) {
                subscriber.complete()
            }
        } catch (e: Throwable) {
            if (!subscriber.closed) {
                subscriber.error(e)
            }
        }
    }
    // Teardown: runs on unsubscribe and stops collecting the Flow.
    subscriber.add {
        job.cancel()
    }
}
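One way to check that the teardown really stops collection is to run the same subscribe logic against a stand-in subscriber. The sketch below assumes kotlinx.coroutines on the JVM. `FakeSubscriber`, `subscribeTo`, and `unsubscribeDemo` are hypothetical names. `FakeSubscriber` only imitates the few rxjs `Subscriber` members used above (`closed`, `next`, `complete`, `error`, `add`) and does not reproduce real rxjs behaviour.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical, greatly simplified stand-in for an rxjs Subscriber.
class FakeSubscriber<T> {
    val received = mutableListOf<T>()
    var closed = false
        private set
    private val teardowns = mutableListOf<() -> Unit>()

    fun next(value: T) { received.add(value) }
    fun complete() { closed = true }
    fun error(e: Throwable) { closed = true }
    fun add(teardown: () -> Unit) { teardowns.add(teardown) }

    // Marks the subscriber closed, then runs the registered teardowns.
    fun unsubscribe() {
        closed = true
        teardowns.forEach { it() }
    }
}

// Same shape as the body of the Observable lambda in the snippet above.
fun <T> subscribeTo(source: Flow<T>, subscriber: FakeSubscriber<T>) {
    val job = Job()
    // GlobalScope mirrors the thread's snippet; newer kotlinx.coroutines
    // versions flag it as a delicate API.
    GlobalScope.launch(job, start = CoroutineStart.UNDISPATCHED) {
        try {
            source.collect { value ->
                if (!subscriber.closed) subscriber.next(value)
            }
            if (!subscriber.closed) subscriber.complete()
        } catch (e: Throwable) {
            // Also reached on cancellation; the subscriber is already
            // closed by then, so nothing is forwarded.
            if (!subscriber.closed) subscriber.error(e)
        }
    }
    subscriber.add { job.cancel() }
}

// Returns the values received and whether the flow was torn down.
fun unsubscribeDemo(): Pair<List<Int>, Boolean> {
    val flowStopped = CompletableDeferred<Unit>()
    val endless = flow<Int> {
        try {
            emit(1)
            awaitCancellation() // never completes on its own
        } finally {
            flowStopped.complete(Unit)
        }
    }

    val subscriber = FakeSubscriber<Int>()
    subscribeTo(endless, subscriber) // UNDISPATCHED: 1 is delivered here
    subscriber.unsubscribe()         // teardown cancels the job

    val stopped = runBlocking {
        withTimeoutOrNull(1_000) { flowStopped.await() } != null
    }
    return subscriber.received.toList() to stopped
}
```

Without the `subscriber.add { job.cancel() }` line the flow's `finally` block would never run, so `unsubscribeDemo()` would report that the flow was not stopped once the timeout expired.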
Zach Klippenstein (he/him) [MOD]
09/13/2020, 8:52 PM
louiscad
09/14/2020, 1:52 AM
`Observable` API, but I think you can write unit tests to check it works as expected for different cases like cancellation, exceptions, and you can take inspiration from what Zach just linked that is definitely tested.
elizarov
09/14/2020, 2:31 PM