Is converting a `Flow` to an `Observable` safe? Th...
# coroutines
s
Is converting a `Flow` to an `Observable` safe? The reason I’m wondering is because the `Observable` can be cancelled, but the `Flow` can’t. Is the suspending behavior of the coroutine sufficient? With Kotlin Multiplatform for the JS side we’re converting `Flow`s to `rxjs` `Observable`s. Code inside the thread.
fun <T> Flow<T>.toObservable(): Observable<T> {
    val flow = this

    return Observable { _, subscriber ->
        GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                flow.collect {
                    if (!subscriber.closed) {
                        subscriber.next(it)
                    }
                }

                if (!subscriber.closed) {
                    subscriber.complete()
                }
            } catch (e: Throwable) {
                if (!subscriber.closed) {
                    subscriber.error(e)
                }
            }
        }
    }
}
l
A `Flow` can definitely be cancelled by cancelling the collector directly or indirectly (e.g. through an operator). Your snippet doesn't seem to link to `Observable` cancellation though.
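To illustrate the point about cancelling the collector, here is a minimal sketch that runs on plain `kotlinx.coroutines`, with no rxjs involved. The function name `collectUntilCancelled` is hypothetical. It relies on the `flow { }` builder checking for cancellation on each `emit`, so an otherwise endless flow stops as soon as the collecting coroutine is cancelled.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects an endless flow and cancels the collecting
// coroutine once `limit` values have been seen.
fun collectUntilCancelled(limit: Int): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = launch {
        flow {
            var i = 0
            while (true) {
                emit(i++) // the flow builder checks for cancellation on each emit
            }
        }.collect { value ->
            seen += value
            // Cancel the coroutine that is doing the collecting.
            if (seen.size == limit) this@launch.cancel()
        }
    }
    job.join() // returns once the cancelled collector has finished
    seen
}
```

Calling `collectUntilCancelled(3)` should return the first three values, because the emit that follows the cancellation throws a `CancellationException` inside the collector.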
s
oh wait - I should have a scope created inside the `Observable` creation scope, and cancel the internal scope, shouldn’t I?
👍 1
z
Have you looked at the coroutines libraries implementation of this method for RxJava?
s
@Zach Klippenstein (he/him) [MOD] I looked for it, but couldn’t find it.
@louiscad What do you think?
return Observable { _, subscriber ->
    val job = Job()
    GlobalScope.launch(job, start = CoroutineStart.UNDISPATCHED) {
        try {
            flow.collect {
                if (!subscriber.closed) {
                    subscriber.next(it)
                }
            }

            if (!subscriber.closed) {
                subscriber.complete()
            }
        } catch (e: Throwable) {
            if (!subscriber.closed) {
                subscriber.error(e)
            }
        }
    }

    subscriber.add {
        job.cancel()
    }
}
l
@spierce7 I don't know the `Observable` API, but I think you can write unit tests to check that it works as expected for different cases, like cancellation and exceptions, and you can take inspiration from what Zach just linked, which is definitely tested.
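A cancellation test along those lines could look roughly like the sketch below. `FakeSubscriber`, `subscribeWith`, and `unsubscribeCancelsCollection` are all hypothetical names. The fake only mimics the `closed`, `next`, and `add` members used in the thread and is not the real rxjs API. The bridge takes a `CoroutineScope` parameter instead of `GlobalScope` purely so the check can run inside `runBlocking`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for an rxjs Subscriber; NOT the real rxjs API.
class FakeSubscriber<T> {
    val received = mutableListOf<T>()
    var closed = false
        private set
    private val teardowns = mutableListOf<() -> Unit>()

    fun next(value: T) { received += value }
    fun add(teardown: () -> Unit) { teardowns += teardown }
    fun unsubscribe() {
        closed = true
        teardowns.forEach { it() } // run registered teardown logic
    }
}

// Same shape as the bridge in the thread, written against the fake subscriber.
fun <T> Flow<T>.subscribeWith(scope: CoroutineScope, subscriber: FakeSubscriber<T>): Job {
    val flow = this
    val job = scope.launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collect { if (!subscriber.closed) subscriber.next(it) }
    }
    subscriber.add { job.cancel() } // link unsubscription to coroutine cancellation
    return job
}

// Returns true if unsubscribing cancels the collecting coroutine.
fun unsubscribeCancelsCollection(): Boolean = runBlocking {
    val subscriber = FakeSubscriber<Int>()
    val neverEnding = flow {
        emit(1)
        awaitCancellation() // suspends until the collector is cancelled
    }
    val job = neverEnding.subscribeWith(this, subscriber)
    subscriber.unsubscribe()
    job.join()
    job.isCancelled && subscriber.received == listOf(1)
}
```

The same structure could be repeated for the completion and error paths, and against the real `Observable` binding once its API is confirmed.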
e
What do you mean by "`Observable` can be cancelled"?