
spierce7

09/13/2020, 8:34 PM
Is converting a `Flow` to an `Observable` safe? I’m wondering because the `Observable` can be cancelled, but the `Flow` can’t. Is the suspending behavior of the coroutine sufficient? With Kotlin Multiplatform, on the JS side we’re converting `Flow`s to `rxjs` `Observable`s. Code inside the thread.
fun <T> Flow<T>.toObservable(): Observable<T> {
    val flow = this

    return Observable { _, subscriber ->
        GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                flow.collect {
                    if (!subscriber.closed) {
                        subscriber.next(it)
                    }
                }

                if (!subscriber.closed) {
                    subscriber.complete()
                }
            } catch (e: Throwable) {
                if (!subscriber.closed) {
                    subscriber.error(e)
                }
            }
        }
    }
}

louiscad

09/13/2020, 8:39 PM
A `Flow` can definitely be cancelled by cancelling the collector directly or indirectly (e.g. through an operator). Your snippet doesn't seem to link to `Observable` cancellation though.
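To illustrate the point about cancelling the collector, here is a minimal sketch. It assumes a JVM target with `kotlinx-coroutines-core` on the classpath (`runBlocking` is not available on Kotlin/JS), and the names `collectThenCancel`, `ticks`, and `gotThree` are made up for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns the values seen before cancellation and whether the upstream
// flow's `finally` block ran.
fun collectThenCancel(): Pair<List<Int>, Boolean> = runBlocking {
    val received = mutableListOf<Int>()
    var upstreamCancelled = false
    val gotThree = CompletableDeferred<Unit>()

    // An endless flow: the only way to stop it is to cancel its collector.
    val ticks = flow {
        var i = 0
        try {
            while (true) {
                emit(i++)
                delay(10)
            }
        } finally {
            // Runs when the collecting coroutine is cancelled.
            upstreamCancelled = true
        }
    }

    // The collector is just a coroutine; its Job is the cancellation handle.
    val job = launch {
        ticks.collect { value ->
            received += value
            if (received.size == 3) gotThree.complete(Unit)
        }
    }

    gotThree.await()     // wait until three values have arrived
    job.cancelAndJoin()  // cancel the collector, which stops the flow
    received.toList() to upstreamCancelled
}
```

Cancelling the `Job` that runs `collect` is what an `Observable` teardown would need to trigger; the flow itself has no separate "cancel" method.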

spierce7

09/13/2020, 8:39 PM
Oh wait, I should have a scope created inside the `Observable` creation scope, and cancel the internal scope, shouldn’t I?
👍 1

Zach Klippenstein (he/him) [MOD]

09/13/2020, 8:41 PM
Have you looked at the coroutines library's implementation of this method for RxJava?

spierce7

09/13/2020, 8:43 PM
@Zach Klippenstein (he/him) [MOD] I looked for it, but couldn’t find it.
@louiscad What do you think?
return Observable { _, subscriber ->
    val job = Job()
    GlobalScope.launch(job, start = CoroutineStart.UNDISPATCHED) {
        try {
            flow.collect {
                if (!subscriber.closed) {
                    subscriber.next(it)
                }
            }

            if (!subscriber.closed) {
                subscriber.complete()
            }
        } catch (e: Throwable) {
            if (!subscriber.closed) {
                subscriber.error(e)
            }
        }
    }

    subscriber.add {
        job.cancel()
    }
}

louiscad

09/14/2020, 1:52 AM
@spierce7 I don't know the `Observable` API, but I think you can write unit tests to check that it works as expected for different cases like cancellation and exceptions. You can also take inspiration from what Zach just linked, which is definitely tested.
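A rough sketch of what such tests could look like, under stated assumptions: `FakeSubscriber` is a hypothetical stand-in for the rxjs subscriber (only the `next`/`complete`/`error`/`add`/`closed` members used in this thread), `subscribeWith` is an illustrative bridge rather than the real rxjs binding, and the code targets the JVM so it can use `runBlocking`. It covers completion, an upstream exception, and unsubscription.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the rxjs subscriber; records what it receives.
class FakeSubscriber<T> {
    val events = mutableListOf<String>()
    var closed = false
        private set
    private val teardowns = mutableListOf<() -> Unit>()

    fun next(value: T) { if (!closed) events += "next:$value" }
    fun complete() { if (!closed) { events += "complete"; unsubscribe() } }
    fun error(e: Throwable) { if (!closed) { events += "error:${e.message}"; unsubscribe() } }
    // Assumed behavior: a teardown added after closing runs immediately.
    fun add(teardown: () -> Unit) { if (closed) teardown() else teardowns += teardown }
    fun unsubscribe() { closed = true; teardowns.forEach { it() }; teardowns.clear() }
}

// Illustrative bridge: the Job returned by launch is the cancellation handle.
fun <T> Flow<T>.subscribeWith(subscriber: FakeSubscriber<T>, scope: CoroutineScope): Job {
    val flow = this
    val job = scope.launch(start = CoroutineStart.UNDISPATCHED) {
        try {
            flow.collect { subscriber.next(it) }
            subscriber.complete()
        } catch (e: CancellationException) {
            throw e // cancellation is not an error for the subscriber
        } catch (e: Throwable) {
            subscriber.error(e)
        }
    }
    subscriber.add { job.cancel() }
    return job
}

fun runCases(): List<List<String>> = runBlocking {
    // Case 1: normal completion.
    val a = FakeSubscriber<Int>()
    flowOf(1, 2, 3).subscribeWith(a, this).join()

    // Case 2: upstream exception is forwarded as an error.
    val b = FakeSubscriber<Int>()
    flow<Int> { emit(1); throw IllegalStateException("boom") }.subscribeWith(b, this).join()

    // Case 3: unsubscribing cancels collection and releases the upstream.
    val c = FakeSubscriber<Int>()
    var released = false
    val job = flow<Int> {
        try { emit(1); awaitCancellation() } finally { released = true }
    }.subscribeWith(c, this)
    c.unsubscribe()
    job.join()
    c.events += "released=$released"

    listOf(a.events, b.events, c.events)
}
```

Cancelling the `Job` returned by `launch` avoids creating a separate parent `Job()`; that is a design choice of this sketch, not something the thread settled on.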

elizarov

09/14/2020, 2:31 PM
What do you mean by "`Observable` can be cancelled"?