Jesse Hill
08/02/2022, 7:09 PMimport kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Question: Is it hacky to hang onto the old Job where the first flow is collected
// and then to cancel it to achieve the desired results?
private val globalScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
val remoteData = listOf(
"abc",
"ab",
"abb",
"anb",
"abcd",
"abcdef",
"ged",
"maybe"
)
class SimpleCache {
private var initial = setOf(
"one",
"two"
)
private val mutableStateFlow = MutableStateFlow(initial)
fun resetCache() = mutableStateFlow.update { initial }
fun search(term: String) =
mutableStateFlow.asStateFlow().map { valuesSet ->
valuesSet.filter { it.contains(term) }
}
suspend fun fetchData(term: String) {
// Taking 3 to simulate an API where there are more matches for the
// filter and a subsequent fetch can increase the results for a previous
// call to `search`
val newItems = remoteData.filter { it.contains(term) }.take(3)
delay(500)
mutableStateFlow.update { it + newItems }
}
var previousSearchJob: Job? = null
}
suspend fun searchWithCancellation(term: String, cache: SimpleCache) {
globalScope.launch {
cache.previousSearchJob?.cancel()
cache.previousSearchJob = launch {
cache.search(term).collect {
println("For: $term")
println("Collected: $it")
}
}
launch {
cache.fetchData(term)
}
}
}
suspend fun searchWithoutCancellation(term: String, cache: SimpleCache) {
globalScope.launch {
launch {
cache.search(term).collect {
println("For: $term")
println("Collected: $it")
}
}
launch {
cache.fetchData(term)
}
}
}
fun main() {
val cancellationCache = SimpleCache()
val baseCache = SimpleCache()
runBlocking {
// This section is fine for the first flow but once there are
// more than one being collected at the same time things break.
println("Without Cancellation")
searchWithoutCancellation(term = "a", baseCache)
delay(1000)
searchWithoutCancellation(term = "abc", baseCache)
delay(3000)
// This section prints the desired results
println()
println("With Cancellation")
searchWithCancellation(term = "a", cancellationCache)
delay(1000)
searchWithCancellation(term = "abc", cancellationCache)
delay(3000)
}
/* Output
Without Cancellation
For: a
Collected: []
For: a
Collected: [abc, ab, abb]
For: abc
Collected: [abc]
For: a <-- This collection is undesired and could come before or after the collection of "abc"
Collected: [abc, ab, abb, abcd, abcdef]
For: abc
Collected: [abc, abcd, abcdef]
With Cancellation
For: a
Collected: []
For: a
Collected: [abc, ab, abb]
For: abc
Collected: [abc]
For: abc
Collected: [abc, abcd, abcdef]
*/
}
George Theocharis
08/02/2022, 8:16 PMNick Allen
08/02/2022, 9:01 PMcancel
only starts the cancellation process.
When you want to switch your Flow
based on some change, look to the *Latest
operators.
val searchTerm = MutableStateFlow("")
...
searchTerm
.transformLatest {
if (!it.isBlank()) {
emitAll(search(it)) //Let's say search returns a Flow
}
}
.collect { println("Collecting: $it") }
...
searchTerm.value = "a" //result will start emitting for search("a")
searchTerm.value = "abc" //result will cancel search("a") and start emitting for search("abc")
Jesse Hill
08/02/2022, 9:20 PM*Latest
operators. Thanks for the example. I figured there’d be some sort of subtle issue with the way I had it. So since cancel
only starts the cancellation process, does cancelAndJoin
wait until the cancellation has been completed before continuing? It looks that way from the docs and because cancelAndJoin
is a suspend function. I think that the *Latest
would still make more sense for what I’m needing.Nick Allen
08/05/2022, 12:26 AMlaunch
. If order matters, then you must use one coroutine with a loop (like a Flow.collect
) or explicitly control it some other way like with a CompletableDeferred
.
Also, there's no guarantees between the launched code and the code that called launch
. job = launch { job.cancel(); ... }
could cancel then assign, or assign then cancel.
And something like previousJob.cancelAndJoin()
is dangerous because you could cancel the previously launched coroutine before it has a chance to cancel the one before that. And cancelAndJoin
will only wait on the preview job, it doesn't wait on the job before that. If a coroutine is cancelled while calling cancelAndJoin
then, it'll throw the cancellation exception immediately and not keep waiting.