fxCancellable.kt
# arrow
r
fxCancellable.kt
p
in here, cancel would cancel all current and future runs of the program, right?
shouldn’t we have something that works per run?
t
Thank you! I think so, too. I wrote code that works with CoroutineScope, but since the fx program is not an actual child coroutine of that scope, it may cause trouble.
package test

import arrow.core.Either
import arrow.effects.ForIO
import arrow.effects.OnCancel
import arrow.effects.data.internal.BindingCancellationException
import arrow.effects.extensions.io.async.defer
import arrow.effects.extensions.io.fx.fxCancellable
import arrow.effects.fix
import arrow.effects.typeclasses.ConcurrentCancellableContinuation
import arrow.unsafe
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

fun main(args: Array<String>) {
    val job = SupervisorJob()
    val viewModel = ViewModel(job)
    viewModel.loadSessions()
    Thread.sleep(200L)
    viewModel.loadSessions()
    Thread.sleep(200L)
    job.cancel()
    Thread.sleep(200L)

    /**
     * output:
     * Key note
     * How to use Arrow Fx
     */
}

class ViewModel(private val job: Job) :
        CoroutineScope by (CoroutineScope(job + Dispatchers.Default)) {
    private val repository = Repository()

    fun loadSessions() {
        fxCancellable({
            !effect {
                repository.fetchSessions()
            }
        }, { either: Either<Throwable, List<String>> ->
            either.fold({ throwable: Throwable ->
                if (throwable !is BindingCancellationException) {
                    throwable.printStackTrace()
                }
            }, { list: List<String> ->
                list.forEach(::println)
            })
        })
    }
}

class Repository {
    suspend fun fetchSessions(): List<String> {
        delay(300L)
        return listOf("Key note", "How to use Arrow Fx")
    }
}

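/**
 * Runs a cancellable fx program and ties its cancel token to this scope's Job.
 * The program is not a child coroutine of the scope; cancellation is only
 * bridged through invokeOnCompletion.
 */
fun <A> CoroutineScope.fxCancellable(
        arg0: suspend ConcurrentCancellableContinuation<ForIO, *>.() -> A,
        onResult: (Either<Throwable, A>) -> Unit
) {
    val (program, cancel) = fxCancellable(arg0)
    // Invoke the fx cancel token once the scope's Job completes (including by cancellation).
    // A new handler is registered on every call and is never disposed.
    val job: Job? = coroutineContext[Job]
    job?.invokeOnCompletion {
        cancel()
    }
    unsafe {
        // Run on the scope's dispatcher, falling back to Dispatchers.Default.
        val coroutineDispatcher: CoroutineContext =
                coroutineContext[ContinuationInterceptor] ?: Dispatchers.Default
        defer(coroutineDispatcher) { program }
                .fix()
                .unsafeRunAsyncCancellable(OnCancel.Silent, onResult)
    }
}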
r
If you want cancelling à la carte, we have Fibers for that.
In any case, you can create multiple fxCancellable blocks if you don't want to use fibers: https://arrow-kt.io/docs/effects/fx/async/#cancellation
Each of those returns an IO op together with its cancel token, since fxCancellable returns them as a tuple. Keep the cancel tokens in values first, then pass the IO ops to parMapN: https://arrow-kt.io/docs/effects/fx/async/#parmapn
I think they fit different use cases, but you can always get hold of the cancel tokens.
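To illustrate the "get hold of the cancel tokens" idea per run, here is a minimal sketch in plain Kotlin. It does not call Arrow at all: `CancelToken` is a hypothetical stand-in for the token that fxCancellable returns next to the program, and `RunRegistry` is an illustrative helper name, not part of any library.

```kotlin
// Hypothetical stand-in for the cancel token returned alongside an fx program.
typealias CancelToken = () -> Unit

// Illustrative helper: keeps one cancel token per run so runs can be cancelled individually.
class RunRegistry {
    private val tokens = mutableMapOf<Int, CancelToken>()
    private var nextId = 0

    // Registers the token of a freshly started run and returns an id for it.
    @Synchronized
    fun register(token: CancelToken): Int {
        val id = nextId++
        tokens[id] = token
        return id
    }

    // Cancels a single run; other runs are left untouched.
    // Returns false if the id is unknown or was already cancelled.
    @Synchronized
    fun cancel(id: Int): Boolean {
        val token = tokens.remove(id) ?: return false
        token()
        return true
    }

    // Cancels every run that is still registered, in registration order.
    @Synchronized
    fun cancelAll() {
        val pending = tokens.values.toList()
        tokens.clear()
        pending.forEach { it() }
    }
}

fun main() {
    val cancelled = mutableListOf<String>()
    val registry = RunRegistry()

    val first = registry.register { cancelled.add("first") }
    registry.register { cancelled.add("second") }

    registry.cancel(first)
    println(cancelled) // [first]

    registry.cancelAll()
    println(cancelled) // [first, second]
}
```

In the ViewModel above, each loadSessions call would register its own token, so a new run could cancel just the previous one, and cancelAll could be hooked to the scope's Job.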
p
@takahirom CoroutineScope is exclusive to kotlinx.coroutines. I don’t believe fx coroutines get cancelled with it, but I’ve never tested it.
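A small sketch of the kotlinx.coroutines side of that point, using only kotlinx.coroutines. Cancelling a scope's Job cancels coroutines launched in that scope, while anything outside it has to be bridged explicitly, for example with invokeOnCompletion. The "external" work here is a stand-in modelled as a bare cancel token; it is an assumption for illustration and says nothing about how Arrow fx behaves internally. `runDemo` is a hypothetical name.

```kotlin
import kotlinx.coroutines.*

// Returns (child coroutine cancelled?, external cancel token invoked?).
suspend fun runDemo(): Pair<Boolean, Boolean> {
    val job = SupervisorJob()
    val scope = CoroutineScope(job + Dispatchers.Default)

    // A real child coroutine: it is cancelled automatically with the scope's Job.
    val child = scope.launch { delay(10_000L) }

    // Stand-in for work that is NOT a child coroutine and only exposes a cancel token.
    val externalCancelled = CompletableDeferred<Unit>()
    val cancelExternal: () -> Unit = { externalCancelled.complete(Unit) }

    // Bridge: invoke the token once the Job has completed (here, through cancellation).
    job.invokeOnCompletion { cancelExternal() }

    job.cancel()
    child.join()
    // Wait for the completion handler instead of assuming it has already run.
    withTimeout(1_000L) { externalCancelled.await() }

    return child.isCancelled to externalCancelled.isCompleted
}

fun main() = runBlocking {
    val (childCancelled, externalCancelled) = runDemo()
    println("child cancelled: $childCancelled")
    println("external cancelled: $externalCancelled")
}
```

Without the invokeOnCompletion line, the child would still be cancelled but the external token would never be invoked, which is the gap the extension function above tries to close.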
t
Thanks! I'll check it 👍