Hi, how would one migrate Single.fromCallable { } ...
# coroutines
u
Hi, how would one migrate Single.fromCallable { } into coroutines? Should I just leave it as regular blocking function?
b
It depends. It can be a plain Kotlin function that you later launch in a coroutine, in the background. It can be a suspending function if you need to call other suspending functions from it.
u
In this case it's long blocking code, i.e. the function was synchronous, wrapped with Single.fromCallable
I mean it works, but I suspect this is uncancellable, right? For it to be cancellable, do I need to wrap the blocking code in suspendCancellableCoroutine?
b
Coroutines support cooperative cancellation, so your code has to check for cancellation explicitly. If a function doesn't support cancellation, then even if job.cancel() is called, the coroutine will wait for the function to finish.
u
Yes I understand that I need to break the blocking code manually, but I meant step before that, i.e. this
Single.fromCallable { calculation1() }
 .flatMap { calculation2() }
if this chain gets disposed, yes, calculation1 will continue until completion; however, calculation2 won't get triggered
which is not the case if I do
launch {
   calculation1()
   calculation2()
}
given calculation1,2 are just regular kotlin blocking functions
b
so you have to check whether the coroutine was cancelled right between your method calls
u
wouldnt wrapping it in suspendCancellableCoroutine be more idiomatic?
b
Imho no, good old try-catch is easier to read. With suspendCancellableCoroutine things get more callback-based
u
btw I do see isActive in launch block, however this is in a suspend function, where there is no isActive?
b
you may use `CoroutineScope.isActive`, which returns a Boolean, or use a suspending function like `yield()` or `delay()`, which throws CancellationException when the coroutine is cancelled. Both approaches use `isActive`, but `yield()` also allows suspension (or throws the exception)
u
but what if there is no scope? I mean no scope in that class: let's say it's ApiClient, which just declares suspend functions, and they are used elsewhere, where there is a scope
b
with coroutines you always have CoroutineScope. You are executing coroutines in scope. So if you have ApiClient you can execute those in ViewModel scope for example or Activity scope in android if you want to live dangerously
it's kinda like lifecycle for coroutine execution or Disposable for rxjava (oversimplification)
u
I mean this
interface Api {

    @GET
    suspend fun foo(@Url url: String): Foo
}

class ApiClient(private val api: Api) {

    suspend fun whatever(url: String): Whatever {
        val foo = api.foo(url)
        val bar = blockingCode1(foo)
        val whatever = blockingCode2(bar)
        return whatever
    }

    fun blockingCode1(foo: Foo): Bar {
        ...
    }

    fun blockingCode2(bar: Bar): Whatever {
        ...
    }
}
there's no scope in ApiClient, am I right? In `whatever` I don't see this.isActive
b
there is no scope; ApiClient just provides suspending functions that may be called in a coroutine launched in some scope. Also, a suspend function can't launch a coroutine without a scope.
You also can't cancel a suspending function, but you can cancel the job that executes it. If you really need isActive, you may consider creating an extension function on CoroutineScope. Also, `whatever` shouldn't handle cancellation; that's the caller's responsibility.
u
I think you're misunderstanding me. Yes, you're correct, the call site will be cancelling that function, but if it does so while blockingCode1 is executing, blockingCode2 will also be executed, which I want to avoid (blockingCode1 finishing execution after cancel is okay in this case)
b
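suspend fun CoroutineScope.whatever(url: String): Whatever {
    val foo = api.foo(url) // suspend call, so this function has to be suspend as well
    val bar = blockingCode1(foo)
    // Bail out if the scope was cancelled while blockingCode1 was running.
    if (!isActive) throwOrReturnDefault()
    val whatever = blockingCode2(bar)
    return whatever
}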
with that you are 'promising' to execute it in some scope so you can access this scope state.
u
I see, that seems to work, but doesnt it change semantics?
o
(ignoring all context above) `Single.fromCallable`'s direct translation is likely `flow { emit(yourCallable()) }`
Flow is Kotlin's counterpart to RxJava
u
I dont want to use flow for singles; suspend functions map better to singles
o
if the callable happens to be blocking, and it's not easy to make it cancellable, add `.flowOn(Dispatchers.IO)` afterward
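A minimal sketch of the Flow translation suggested here, assuming the callable is a plain blocking function. The body of `yourCallable` and the name `asSingleLikeFlow` are hypothetical and only there for illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Hypothetical blocking callable standing in for the body of Single.fromCallable { }.
fun yourCallable(): Int {
    Thread.sleep(50)
    return 42
}

// Cold flow that emits exactly one value; flowOn runs the blocking call on Dispatchers.IO.
fun asSingleLikeFlow(): Flow<Int> =
    flow { emit(yourCallable()) }.flowOn(Dispatchers.IO)

fun main() = runBlocking {
    // first() collects the single emitted value.
    println(asSingleLikeFlow().first())
}
```

Like a cold Single, nothing runs until the flow is collected.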
if you want to check for cancellation, even without a scope, use `coroutineContext[Job]!!.isActive`
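A small sketch of checking for cancellation from a suspend function that has no CoroutineScope receiver. It uses `currentCoroutineContext()` from kotlinx.coroutines, which reads the same context as the `coroutineContext` property; the two blocking functions and their bodies are hypothetical stand-ins.

```kotlin
import kotlinx.coroutines.*

// Hypothetical blocking steps; only the shape matters here.
fun blockingCode1(): String {
    Thread.sleep(100)
    return "bar"
}

fun blockingCode2(bar: String): String = bar.uppercase()

suspend fun whatever(): String {
    val bar = blockingCode1()
    // No scope in this class, so ask the caller's coroutine context instead.
    if (!currentCoroutineContext().isActive) {
        throw CancellationException("cancelled before blockingCode2")
    }
    return blockingCode2(bar)
}

fun main() = runBlocking {
    // Not cancelled here, so both steps run.
    println(whatever())
}
```

`currentCoroutineContext().ensureActive()` does the same check and throw in one call.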
u
yes I know, but with fromCallable the subsequent blocking code won't be run, and here it is
Single.fromCallable { calculation1() }
 .flatMap { calculation2() }
o
I'm not sure what you're trying to get at -- upon cancellation, do you want `calculation2()` to run if the code was in `calculation1()` when cancelled?
u
I'm refactoring RxJava code to coroutines, and obviously I want parity, and I'm having a problem with how to port Single.fromCallable { blockingCode }. I read that in coroutines it's just supposed to be blocking functions and that's it. However, if you imagine code like this
Single.fromCallable { calculation1() }
 .flatMap { calculation2() }
and this gets disposed mid calculation1, then yes, it will run to completion because it's blocking; however, calculation2 won't get executed. If I port it like this
val job = scope.launch {
    calculation1()
    calculation2()
}
and job.cancel() mid calculation1, then calculation2 WILL get executed after calculation1 completes
o
correct, because cancellation is cooperative. just add an `if (!coroutineContext[Job]!!.isActive) throw CancellationException()` or just plain `yield()` (which is also cancellable) between them, and it will work
u
ok yield seems to work as expected, thanks!
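For reference, a sketch of the `yield()` approach that reproduces the scenario from this thread: the job is cancelled while `calculation1` is still blocking. The function bodies and the `calculation2Ran` flag are hypothetical, added only so the effect can be observed.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical flag used only to observe whether calculation2 was reached.
val calculation2Ran = AtomicBoolean(false)

fun calculation1() {
    Thread.sleep(200) // simulated blocking work; it does not react to cancellation
}

fun calculation2() {
    calculation2Ran.set(true)
}

fun cancelMidCalculation1(): Boolean = runBlocking {
    calculation2Ran.set(false)
    val job = launch(Dispatchers.Default) {
        calculation1()
        yield() // throws CancellationException if the job was cancelled in the meantime
        calculation2()
    }
    delay(50)           // give calculation1 time to start
    job.cancelAndJoin() // cancel mid-calculation1 and wait for the coroutine to finish
    calculation2Ran.get()
}

fun main() {
    println("calculation2 ran: ${cancelMidCalculation1()}")
}
```

As with the disposed Rx chain, `calculation1` still runs to completion; only the step after the check is skipped.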
btw, as an aside: if I print the thread when running via Dispatchers.IO and Dispatchers.Default, both seem to be called `DefaultDispatcher-worker-X`. Is that okay? In Rx, io would be called io worker etc.
b
IO and Default share threads to avoid context switching
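A quick way to see the shared thread naming for yourself; this is only a sketch, and the worker numbers will differ from run to run. The function name `threadNames` is made up for the example.

```kotlin
import kotlinx.coroutines.*

// Returns the thread names observed on Dispatchers.IO and Dispatchers.Default.
fun threadNames(): Pair<String, String> = runBlocking {
    val ioName = withContext(Dispatchers.IO) { Thread.currentThread().name }
    val defaultName = withContext(Dispatchers.Default) { Thread.currentThread().name }
    ioName to defaultName
}

fun main() {
    val (ioName, defaultName) = threadNames()
    println("IO ran on $ioName, Default ran on $defaultName")
}
```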
u
thx
g
The most idiomatic way, imo, which also provides cancellation between operations: just wrap each blocking call in its own suspend function with the required dispatcher, and call them from any suspend function without additional code
u
what do you mean with required dispatcher? withContext() ?
g
yes
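A sketch of that suggestion, assuming each blocking call gets its own `withContext` wrapper. The names `step1`, `step2`, and the `step2Ran` flag are hypothetical. `withContext` checks for cancellation when it is entered and when it returns to the caller, so no explicit check is needed between the two calls.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical flag used only to observe whether the second step was reached.
val step2Ran = AtomicBoolean(false)

suspend fun step1(): String = withContext(Dispatchers.IO) {
    Thread.sleep(200) // blocking work that is not cancellable by itself
    "bar"
}

suspend fun step2(bar: String): String = withContext(Dispatchers.IO) {
    step2Ran.set(true)
    bar.uppercase()
}

// The caller contains no cancellation code at all.
suspend fun whatever(): String = step2(step1())

fun cancelDuringStep1(): Boolean = runBlocking {
    step2Ran.set(false)
    val job = launch(Dispatchers.Default) { whatever() }
    delay(50)           // let step1 start
    job.cancelAndJoin() // cancel while step1 is still blocking
    step2Ran.get()
}

fun main() {
    println("step2 ran after cancellation: ${cancelDuringStep1()}")
}
```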
u
wont that switch thread unnecessarily?
g
depends on which thread is used on callsite
u
IO
g
Then there's no thread switch; otherwise yes, it will be a context switch
Again, it's just a matter of code simplicity and the idiomatic way to do that. I think you're trying to do premature optimization instead of actually migrating your code
`!coroutineContext[Job]!!.isActive`: no need to do that, there is a `coroutineContext.isActive` extension property
and if you use IO or Default there wouldn't be any context switch anyway
Code with two separate suspend functions also opens a window for future optimizations, such as making the cancellation of each of them more granular, or making them asynchronous without changing the call site
u
yea makes sense, thanks
private suspend fun foo() {
    return withContext(Dispatchers.IO) {
        ...
        while (isActive) {

        }
    }
}
like this?
g
yes something like that
u
Guys, is there a doOnDispose equivalent? So I can easily see that the coroutine was cancelled?
u
I mean from within suspend function
o
why can't you use that in the suspend function? you merely need to attach it to the job. if you don't want to keep track of cancellation after the suspend function exits, open a new sub-job using `coroutineScope { }`
u
sorry I'm a noob, but I dont see a way to get reference to the job inside the suspend fun
o
coroutineContext[Job]
g
Re "is there a doOnDispose equivalent?": as was said before, the direct equivalent is invokeOnCompletion, but you probably don't need it (unless you just need some side effect). The easiest and most flexible way is to use try/catch and catch CancellationException, so you can even control the invocation flow
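Both options can be sketched side by side. The `work` function and the log messages are hypothetical, and `delay` stands in for any cancellable suspension point.

```kotlin
import kotlinx.coroutines.*
import java.util.Collections

suspend fun work(log: MutableList<String>) {
    try {
        delay(1_000) // cancellable suspension point
        log += "finished"
    } catch (e: CancellationException) {
        log += "cancelled" // debugging side effect, roughly what doOnDispose was used for
        throw e            // rethrow so cancellation keeps propagating
    }
}

fun cancelAndCollectLog(): List<String> = runBlocking {
    val log = Collections.synchronizedList(mutableListOf<String>())
    val job = launch { work(log) }
    // Alternative hook attached to the job: invoked once the job has completed.
    job.invokeOnCompletion { cause ->
        if (cause is CancellationException) log += "completed with cancellation"
    }
    delay(50)
    job.cancelAndJoin()
    log.toList()
}

fun main() {
    println(cancelAndCollectLog())
}
```

Rethrowing inside the catch block matters: swallowing CancellationException would let the code after it keep running in a cancelled coroutine.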
u
Yes, it's just for debugging. @gildor that exception won't be thrown until the blocking code returns, it seems
g
Yes, if blocking code is not cancellable
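One option that was not raised in this thread: if the blocking code responds to thread interruption, kotlinx.coroutines provides `runInterruptible`, which interrupts the thread when the coroutine is cancelled. A sketch under that assumption, with `Thread.sleep` standing in for the real blocking call and hypothetical function names:

```kotlin
import kotlinx.coroutines.*

// Thread.sleep reacts to interruption, so cancelling the coroutine ends it early.
suspend fun interruptibleBlockingCall(): String = runInterruptible(Dispatchers.IO) {
    Thread.sleep(10_000)
    "done"
}

// Returns how long the cancelled call took, in milliseconds.
fun elapsedMillisWhenCancelled(): Long = runBlocking {
    val start = System.nanoTime()
    val job = launch { interruptibleBlockingCall() }
    delay(100)
    job.cancelAndJoin() // returns long before the 10 second sleep would have ended
    (System.nanoTime() - start) / 1_000_000
}

fun main() {
    println("cancelled after ${elapsedMillisWhenCancelled()} ms")
}
```

This only helps for code that honours interruption; a tight CPU loop still needs its own `isActive` or `ensureActive()` checks.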