spierce7
09/13/2020, 8:34 PM
Is converting a Flow to an Observable safe? The reason I’m wondering is that the Observable can be cancelled, but the Flow can’t. Is the suspending behavior of the coroutine sufficient?
With Kotlin Multiplatform, on the JS side we’re converting `Flow`s to rxjs `Observable`s. Code inside the thread.
spierce7
09/13/2020, 8:35 PM
fun <T> Flow<T>.toObservable(): Observable<T> {
    val flow = this
    return Observable { _, subscriber ->
        GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                flow.collect {
                    if (!subscriber.closed) {
                        subscriber.next(it)
                    }
                }
                if (!subscriber.closed) {
                    subscriber.complete()
                }
            } catch (e: Throwable) {
                if (!subscriber.closed) {
                    subscriber.error(e)
                }
            }
        }
    }
}
louiscad
09/13/2020, 8:39 PM
Flow can definitely be cancelled by cancelling the collector directly or indirectly (e.g. through an operator). Your snippet doesn't seem to link to Observable cancellation though.
spierce7
09/13/2020, 8:39 PM
Observable creation scope, and cancel the internal scope, shouldn’t I?
Zach Klippenstein (he/him) [MOD]
09/13/2020, 8:41 PM
spierce7
09/13/2020, 8:43 PM
spierce7
09/13/2020, 8:43 PM
return Observable { _, subscriber ->
    val job = Job()
    GlobalScope.launch(job, start = CoroutineStart.UNDISPATCHED) {
        try {
            flow.collect {
                if (!subscriber.closed) {
                    subscriber.next(it)
                }
            }
            if (!subscriber.closed) {
                subscriber.complete()
            }
        } catch (e: Throwable) {
            if (!subscriber.closed) {
                subscriber.error(e)
            }
        }
    }
    subscriber.add {
        job.cancel()
    }
}
Zach Klippenstein (he/him) [MOD]
09/13/2020, 8:52 PM
louiscad
09/14/2020, 1:52 AM
Observable API, but I think you can write unit tests to check it works as expected for different cases like cancellation, exceptions, and you can take inspiration from what Zach just linked that is definitely tested.
elizarov
09/14/2020, 2:31 PM
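
The unit tests louiscad suggests for completion, exceptions, and cancellation could look roughly like the sketch below. It makes several assumptions: `FakeSubscriber` is a hypothetical stand-in for the rxjs subscriber (it is not the real rxjs API), and `subscribeWith` is a hypothetical helper that mirrors the second snippet in the thread but takes a `CoroutineScope` parameter instead of using `GlobalScope`, so the checks run deterministically under `runBlocking`. It also assumes kotlinx.coroutines 1.4 or later for `awaitCancellation()`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the rxjs subscriber; NOT the real rxjs API.
class FakeSubscriber<T> {
    val items = mutableListOf<T>()
    var failure: Throwable? = null
    var completed = false
    var closed = false
        private set
    private val teardowns = mutableListOf<() -> Unit>()

    fun next(value: T) { items += value }
    fun error(e: Throwable) { failure = e; closed = true }
    fun complete() { completed = true; closed = true }
    fun add(teardown: () -> Unit) { teardowns += teardown }

    // Mimics unsubscribing: mark closed, then run the registered teardowns.
    fun unsubscribe() {
        closed = true
        teardowns.forEach { it() }
    }
}

// Hypothetical helper with the same shape as the second snippet in the thread.
fun <T> Flow<T>.subscribeWith(scope: CoroutineScope, subscriber: FakeSubscriber<T>): Job {
    val flow = this
    val job = Job()
    scope.launch(job, start = CoroutineStart.UNDISPATCHED) {
        try {
            flow.collect { if (!subscriber.closed) subscriber.next(it) }
            if (!subscriber.closed) subscriber.complete()
        } catch (e: Throwable) {
            if (!subscriber.closed) subscriber.error(e)
        }
    }
    // Teardown: unsubscribing cancels the job, which cancels the collector.
    subscriber.add { job.cancel() }
    return job
}

fun main(): Unit = runBlocking {
    // Case 1: normal completion is forwarded to the subscriber.
    val done = FakeSubscriber<Int>()
    flowOf(1, 2, 3).subscribeWith(this, done)
    check(done.items == listOf(1, 2, 3) && done.completed)

    // Case 2: an exception thrown by the flow reaches subscriber.error.
    val failed = FakeSubscriber<Int>()
    flow<Int> { emit(1); throw IllegalStateException("boom") }.subscribeWith(this, failed)
    check(failed.items == listOf(1) && failed.failure is IllegalStateException)

    // Case 3: unsubscribing cancels the collector, so the flow's cleanup runs.
    var cleanedUp = false
    val endless = flow {
        try { emit(1); awaitCancellation() } finally { cleanedUp = true }
    }
    val cancelled = FakeSubscriber<Int>()
    val job = endless.subscribeWith(this, cancelled)
    cancelled.unsubscribe()
    job.join() // wait until the cancelled collector has finished
    check(cleanedUp && cancelled.failure == null && !cancelled.completed)
}
```

In the third case the `CancellationException` is caught by the `catch (e: Throwable)` block, but it is not forwarded because the subscriber is already closed by then. That is the same guard the snippets in the thread rely on.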