https://kotlinlang.org logo
Title
u

ursus

11/09/2019, 10:58 PM
Hi, how would one migrate Single.fromCallable { } into coroutines? Should I just leave it as regular blocking function?
b

bholota

11/09/2019, 11:19 PM
It depends. It may be plain kotlin function and you can later launch it as coroutine, in background. It may be suspending function if you need to use other suspending functions there.
u

ursus

11/09/2019, 11:22 PM
In this case its long blocking code, i.e. the function was synchronous, wrapped with Single.fromCallable
I mean it works, but, I suspend this is uncancelable right? in order for it to be cancelable I need to wrap the blocking code in suspendCancellableCoroutine ?
b

bholota

11/09/2019, 11:28 PM
coroutines support cooperative cancellation, so your code has to do explicit check for cancel. If function doesn't support cancellation, even if job.cancel() is called it will wait for function to finish.
u

ursus

11/09/2019, 11:31 PM
Yes I understand that I need to break the blocking code manually, but I meant step before that, i.e. this
Single.fromCallable { calculation1() }
 .flatMap { calculation2() }
if this chain gets disposed, yes calculation1 will continue till completion, however calculation2 wont get triggered
which is not the case if I do
launch {
   calculation1()
   calculation2()
}
given calculation1,2 are just regular kotlin blocking functions
b

bholota

11/09/2019, 11:34 PM
so you have to check if flow was cancelled right between your method calls
u

ursus

11/09/2019, 11:35 PM
wouldnt wrapping it in suspendCancellableCoroutine be more idiomatic?
b

bholota

11/09/2019, 11:46 PM
Imho no, good old try-catch is easier to read. With suspendCancelableCoroutine things are getting more callback based
u

ursus

11/10/2019, 12:00 AM
btw I do see isActive in launch block, however this is in a suspend function, where there is no isActive?
b

bholota

11/10/2019, 12:03 AM
you may use CoroutineScope.isActive that returns boolean or use some suspending functions like
yield()
or
delay()
that will throw CancellationException
both approaches use
isActive
but
yield()
also allows suspension (or throws exception)
u

ursus

11/10/2019, 12:09 AM
but what if there is no scope? I mean no scope in that class, lets say its ApiClient, which just declares suspend functions, and they are used elsewhere, where there is scope
b

bholota

11/10/2019, 12:14 AM
with coroutines you always have CoroutineScope. You are executing coroutines in scope. So if you have ApiClient you can execute those in ViewModel scope for example or Activity scope in android if you want to live dangerously
it's kinda like lifecycle for coroutine execution or Disposable for rxjava (oversimplification)
u

ursus

11/10/2019, 12:25 AM
I mean this
interface Api {

    @GET
    suspend fun foo(@Url url: String): Foo
}

class ApiClient(private val api: Api) {

	suspend fun whatever(url: String): Whatever {
        val foo = api.foo(url)
        val bar = blockingCode1(foo)
        val whatever = blockingCode2(bar)
        return whatever
    }

    fun blockingCode1(foo: Foo) {
    	...
    }

    fun blockingCode2(bar: Bar) {
    	...
    }
}
there no scope in apiclient, am I right?
in
whatever
I dont see this.isActive
b

bholota

11/10/2019, 12:32 AM
there is no scope, ApiClient just provides suspending functions that may be called in coroutine launched in some scope, also suspend function can't launch coroutine.
You also can't cancel suspending function but you can cancel job which executes suspending function. If you really need isActive you may consider to create extension function for CoroutineScope. Also
whatever
shouldn't handle cancellation, it's caller responsibility.
u

ursus

11/10/2019, 12:38 AM
I think youre midunderstanding me, yes youre correct, callsite will be canceling that function, but, if it does so while blockingCode1 is executing, also blockingCode2 will be executed, which I want to avoid (blockingCode1 finishing execution after cancel is okay in this case)
b

bholota

11/10/2019, 12:46 AM
fun CoroutineScope.whatever(url: String): Whatever {
        val foo = api.foo(url)
        val bar = blockingCode1(foo)
        if (isActive) throwOrReturnDefault()
        val whatever = blockingCode2(bar)
        return whatever
    }
with that you are 'promising' to execute it in some scope so you can access this scope state.
u

ursus

11/10/2019, 12:56 AM
I see, that seems to work, but doesnt it change semantics?
o

octylFractal

11/10/2019, 12:57 AM
(ignoring all context above) `Single.fromCallable`'s direct translation is likely
flow { emit(yourCallable()) }
Flow is kotlin's counterpart to RxJava
u

ursus

11/10/2019, 12:58 AM
I dont want to use flow for singles; suspend functions map better to singles
o

octylFractal

11/10/2019, 12:58 AM
if the callable happens to be blocking, and it's not easy to make it cancellable, add
.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
afterward
if you want to check for cancellation, even without a scope, use
coroutineContext[Job]!!.isActive
u

ursus

11/10/2019, 1:01 AM
yes I know, but subsequent blocking code - via fromcallable wont be ran, and here it does
Single.fromCallable { calculation1() }
 .flatMap { calculation2() }
o

octylFractal

11/10/2019, 1:03 AM
I'm not sure what you're trying to get at -- upon cancellation, do you want
calculation2()
to run if the code was in
calculation1()
when cancelled?
u

ursus

11/10/2019, 1:07 AM
Im refactoring rxjava code to coroutines, and obviously I want parity, and Im having problem as how to port Single.fromCallable { blockingCode }. I read that in coroutines its just supposed to be blocking functions and thats it. However, If you imagine code like this
Single.fromCallable { calculation1() }
 .flatMap { calculation2() }
and this gets disposed mid calculation1, then yes it will run to completion because its blocking, however calculation2 wont get executed If I port it like this
val job = scope.launch {
    calculation1()
    calculation2()
}
and job.cancel() mid calculation1, then calculation2 WILL get executed after calculation1 completes
o

octylFractal

11/10/2019, 1:09 AM
correct, because cancellation is cooperative. just add an
if (!coroutineContext[Job]!!.isActive) throw CancellationException()
or just plain
yield()
(which is also cancellable) between them, and it will work
u

ursus

11/10/2019, 1:19 AM
ok yield seems to work as expected, thanks!
btw aside, if I print thread when running via Dispatchers.IO, and Dispatchers.Default, they both seems to be called
DefaultDispatcher-worker-X
, is that okay? in rx io would be called io worker etc
b

bholota

11/10/2019, 1:21 AM
IO and Default shares threads to avoid context switching
u

ursus

11/10/2019, 1:46 AM
thx
g

gildor

11/10/2019, 3:13 AM
The most idiomatic way imo, which also provides cancellation between operations, just wrap each blocking call to own suspend function with required dispatcher and call them from any suspend function without additional code
u

ursus

11/10/2019, 3:16 AM
what do you mean with required dispatcher? withContext() ?
g

gildor

11/10/2019, 3:16 AM
yes
u

ursus

11/10/2019, 3:17 AM
wont that switch thread unnecessarily?
g

gildor

11/10/2019, 3:17 AM
depends on which thread is used on callsite
u

ursus

11/10/2019, 3:18 AM
IO
g

gildor

11/10/2019, 3:18 AM
Than no thread switch, otherwise yes, it wil be context switch
Again, it's just a matter of code simplictiy and idiomatic way to do that. I think you try to do premature optimization instead of actual migration of your code
!coroutineContext[Job]!!.isActive
No need to do that, there is
coroutineContext.isActive
extension property
and if you use IO or Default it wouldn't be any context switch anyway
Code with two separate suspend functions also opens window for future optimizations, such as make cancellation each of them more granular, or make them asynchronous without change of call site
u

ursus

11/10/2019, 3:23 AM
yea makes sense, thanks
private suspend fun foo() {
        return withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
            ...
            while(isActive) {

            }
        }
    }
like this?
g

gildor

11/10/2019, 3:26 AM
yes something like that
u

ursus

11/10/2019, 4:30 AM
Guys, its there doOnDispose equivalent? So I can easily see that the coroutine was canceled?
u

ursus

11/10/2019, 5:02 AM
I mean from within suspend function
o

octylFractal

11/10/2019, 5:03 AM
why can't you use that in the suspend function? you merely need to attach it to the job. if you don't want to keep track of cancellation after the suspend function exits, open a new sub-job using
coroutineScope { }
u

ursus

11/10/2019, 5:27 AM
sorry I'm a noob, but I dont see a way to get reference to the job inside the suspend fun
o

octylFractal

11/10/2019, 5:30 AM
coroutineContext[Job]
g

gildor

11/10/2019, 10:11 AM
Guys, its there doOnDispose equivalent? So I can easily see that the coroutine was canceled?
As it was said before, direct equivalent is invokeOmCompletion, but you probably don't need this (except if you just need some side effect), the easiest and most flexible way is just use try/catch and catch CancellationException, so you can even control invocation flow
u

ursus

11/10/2019, 3:40 PM
Yes its just for debugging. @gildor that exception wont be thrown until blocking code returns it seems
g

gildor

11/10/2019, 3:50 PM
Yes, if blocking code is not cancellable