# coroutines
j
Is there a best practice for handling a situation where you want to cancel/invalidate a flow when some input changes? The line of thought goes:
1. Get a flow representing the values for the term “a”.
2. Trigger a network call that updates the search with new results for “a”.
3. The collected flow now receives the updated data for the search “a”.
4. Start a new search for the term “abc”.
5. The previously collected flow is canceled and only the new flow with results for “abc” is collected.
It works to hang onto the old Job where the initial flow is collected and then cancel it before collecting the new flow, but that feels hacky. Playground Link (Code in the thread)
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Question: Is it hacky to hang onto the old Job where the first flow is collected
// and then to cancel it to achieve the desired results?

private val globalScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

val remoteData = listOf(
    "abc",
    "ab",
    "abb",
    "anb",
    "abcd",
    "abcdef",
    "ged",
    "maybe"
)

class SimpleCache {
    private var initial = setOf(
        "one",
        "two"
    )
    private val mutableStateFlow = MutableStateFlow(initial)

    fun resetCache() = mutableStateFlow.update { initial }

    fun search(term: String) =
        mutableStateFlow.asStateFlow().map { valuesSet ->
            valuesSet.filter { it.contains(term) }
        }

    suspend fun fetchData(term: String) {
        // Taking 3 to simulate an API where there are more matches for the
        // filter and a subsequent fetch can increase the results for a previous
        // call to `search`
        val newItems = remoteData.filter { it.contains(term) }.take(3)

        delay(500)

        mutableStateFlow.update { it + newItems }
    }

    var previousSearchJob: Job? = null
}

suspend fun searchWithCancellation(term: String, cache: SimpleCache) {
    globalScope.launch {
        cache.previousSearchJob?.cancel()

        cache.previousSearchJob = launch {
            cache.search(term).collect {
                println("For: $term")
                println("Collected: $it")
            }
        }
        launch {
            cache.fetchData(term)
        }
    }
}

suspend fun searchWithoutCancellation(term: String, cache: SimpleCache) {
    globalScope.launch {
        launch {
            cache.search(term).collect {
                println("For: $term")
                println("Collected: $it")
            }
        }
        launch {
            cache.fetchData(term)
        }
    }
}

fun main() {
    val cancellationCache = SimpleCache()
    val baseCache = SimpleCache()

    runBlocking {
        // This section is fine for the first flow but once there are
        // more than one being collected at the same time things break.
        println("Without Cancellation")
        searchWithoutCancellation(term = "a", baseCache)

        delay(1000)

        searchWithoutCancellation(term = "abc", baseCache)

        delay(3000)

        // This section prints the desired results
        println()
        println("With Cancellation")

        searchWithCancellation(term = "a", cancellationCache)

        delay(1000)

        searchWithCancellation(term = "abc", cancellationCache)

        delay(3000)
    }
    /* Output
        Without Cancellation
        For: a
        Collected: []
        For: a
        Collected: [abc, ab, abb]
        For: abc
        Collected: [abc]
        For: a <-- This collection is undesired and could come before or after the collection of "abc"
        Collected: [abc, ab, abb, abcd, abcdef]
        For: abc
        Collected: [abc, abcd, abcdef]

        With Cancellation
        For: a
        Collected: []
        For: a
        Collected: [abc, ab, abb]
        For: abc
        Collected: [abc]
        For: abc
        Collected: [abc, abcd, abcdef]
     */
}
g
No, it's not hacky to keep the job and cancel it, and it's the correct approach. If you don't like it, you can use `flatMapLatest` instead, as it automatically cancels collection of the previous flow when a new value arrives.
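A minimal sketch of the `flatMapLatest` suggestion, for illustration only. `searchFlow` and `collectLatestSearches` are hypothetical names that do not appear in the thread; `searchFlow` stands in for something like `SimpleCache.search`, and the delays are arbitrary values chosen so the second term arrives before the first search refreshes.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a search: emits a cached result, then a refreshed one.
fun searchFlow(term: String): Flow<String> = flow {
    emit("$term: cached")
    delay(200) // simulated wait for a network refresh
    emit("$term: refreshed")
}

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun collectLatestSearches(): List<String> = coroutineScope {
    val terms = MutableStateFlow("a")
    val seen = mutableListOf<String>()
    // One long-lived collector; flatMapLatest cancels the inner flow
    // for the old term whenever a new term is emitted.
    val collector = launch {
        terms.flatMapLatest { searchFlow(it) }.collect { seen += it }
    }
    delay(50)
    terms.value = "abc" // the search for "a" is cancelled before it refreshes
    delay(500)
    collector.cancel()
    seen
}

fun main() = runBlocking {
    println(collectLatestSearches())
    // [a: cached, abc: cached, abc: refreshed]
}
```

With this shape there is no `Job` to store: changing the term is just an assignment to `terms.value`.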
n
Doing this yourself can lead to subtle errors. For example, with your current code you could collect an old result after you've collected a new result, since `cancel` only starts the cancellation process. When you want to switch your `Flow` based on some change, look to the `*Latest` operators.
val searchTerm = MutableStateFlow("")
...
searchTerm
    .transformLatest {
        if (!it.isBlank()) {
            emitAll(search(it)) //Let's say search returns a Flow
        }
    }
    .collect { println("Collecting: $it") }
...
searchTerm.value = "a" //result will start emitting for search("a")

searchTerm.value = "abc" //result will cancel search("a") and start emitting for search("abc")
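A fuller, runnable sketch of the same idea, wired to a cache shaped like the one in the question. `DemoCache`, `remoteItems`, `searchResults`, and `runDemo` are hypothetical names introduced here, and starting the fetch inside the `transformLatest` block is an assumption about how the pieces could fit together rather than something stated in the thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the question's remoteData and SimpleCache.
val remoteItems = listOf("abc", "ab", "abb", "anb", "abcd", "abcdef", "ged", "maybe")

class DemoCache {
    private val items = MutableStateFlow(setOf("one", "two"))

    fun search(term: String): Flow<List<String>> =
        items.map { set -> set.filter { it.contains(term) } }

    suspend fun fetchData(term: String) {
        delay(100) // simulated network latency
        val newItems = remoteItems.filter { it.contains(term) }.take(3)
        items.update { it + newItems }
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun searchResults(terms: Flow<String>, cache: DemoCache): Flow<List<String>> =
    terms.transformLatest { term ->
        if (term.isNotBlank()) {
            coroutineScope {
                // The fetch is a child of this block, so a new term cancels it
                // together with the collection below.
                launch { cache.fetchData(term) }
                emitAll(cache.search(term))
            }
        }
    }

suspend fun runDemo(): List<List<String>> = coroutineScope {
    val cache = DemoCache()
    val terms = MutableStateFlow("")
    val seen = mutableListOf<List<String>>()
    val collector = launch { searchResults(terms, cache).collect { seen += it } }
    delay(50); terms.value = "a"
    delay(300); terms.value = "abc"
    delay(300)
    collector.cancel()
    seen
}

fun main() = runBlocking {
    runDemo().forEach { println("Collected: $it") }
}
```

Because the fetch and the collection share one scope per term, nothing for "a" can be delivered after the switch to "abc".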
j
Very interesting! It makes sense to look at the `*Latest` operators. Thanks for the example. I figured there’d be some sort of subtle issue with the way I had it. So since `cancel` only starts the cancellation process, does `cancelAndJoin` wait until the cancellation has completed before continuing? It looks that way from the docs and because `cancelAndJoin` is a suspend function. I think that the `*Latest` operators would still make more sense for what I need.
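To make the difference concrete, here is a small sketch (not from the thread) that checks `isCompleted` right after `cancel()` and again after `cancelAndJoin()`. The function name and the `NonCancellable` cleanup delay are illustrative assumptions, used only to give the cancelled coroutine some work to finish.

```kotlin
import kotlinx.coroutines.*

suspend fun cancelVersusCancelAndJoin(): Pair<Boolean, Boolean> = coroutineScope {
    val job = launch {
        try {
            delay(10_000)
        } finally {
            // Cleanup that takes time; it runs after cancel() has already returned.
            withContext(NonCancellable) { delay(100) }
        }
    }
    delay(50) // let the job start and suspend

    job.cancel() // only requests cancellation
    val completedRightAfterCancel = job.isCompleted

    job.cancelAndJoin() // suspends until the job has actually finished
    val completedAfterJoin = job.isCompleted

    completedRightAfterCancel to completedAfterJoin
}

fun main() = runBlocking {
    println(cancelVersusCancelAndJoin()) // (false, true)
}
```

This only shows that `cancelAndJoin` waits for the one job it is called on; it says nothing about the ordering of other launched coroutines.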
n
It's more complicated than that. Coroutines do not run in the order you call `launch`. If order matters, then you must use one coroutine with a loop (like a `Flow.collect`) or explicitly control it some other way, like with a `CompletableDeferred`. Also, there are no guarantees between the launched code and the code that called `launch`. `job = launch { job.cancel(); ... }` could cancel then assign, or assign then cancel. And something like `previousJob.cancelAndJoin()` is dangerous because you could cancel the previously launched coroutine before it has a chance to cancel the one before that. And `cancelAndJoin` will only wait on the previous job; it doesn't wait on the job before that. If a coroutine is cancelled while calling `cancelAndJoin`, it'll throw the cancellation exception immediately and not keep waiting.
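One way to read the "one coroutine with a loop" advice is sketched below, using a `Channel` as the queue of search terms. The channel, the `processInOrder` name, and the sample terms are assumptions made for illustration; the thread itself only mentions a loop such as `Flow.collect`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

suspend fun processInOrder(): List<String> = coroutineScope {
    val terms = Channel<String>(Channel.UNLIMITED)
    val handled = mutableListOf<String>()

    // A single consumer coroutine: terms are handled strictly in the order
    // they were sent, no matter how many callers send them.
    val worker = launch {
        for (term in terms) {
            handled += term
        }
    }

    listOf("a", "ab", "abc").forEach { terms.send(it) }
    terms.close() // lets the for loop finish
    worker.join()
    handled
}

fun main() = runBlocking {
    println(processInOrder()) // [a, ab, abc]
}
```

Launching a separate coroutine per term would give no such ordering guarantee, which is the situation described above.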