How can I do non blocking side effects? Let’s say ...
# coroutines
p
How can I do non-blocking side effects? Let’s say I have a search flow, and on each emission I want to call a suspending track function, but it should not delay the flow itself.
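import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

val search = MutableStateFlow("")

suspend fun track(query: String) {
    delay(1000)
    println(query)
}

fun main() = runBlocking {
    search
        // track() suspends here, so it delays every emission by a second
        .onEach { track(it) }
        .map {
            "The search is $it"
        }
        // A StateFlow never completes, so this collection runs until cancelled
        .collectLatest {
            println("Collected $it")
        }
}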
In onEach I could launch a coroutine on a scope and store the job to cancel the previous tracking action but is there a better solution for that?
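A minimal sketch of the manual approach mentioned above (launch on a scope, keep the job, cancel the previous one), for comparison. `onEachLatestIn` is a hypothetical helper name, not a kotlinx.coroutines operator, and a finite `flowOf` stands in for the `search` StateFlow so the demo terminates.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: starts `action` in `scope` for every value and cancels
// the previously started action, without suspending the flow itself.
fun <T> Flow<T>.onEachLatestIn(
    scope: CoroutineScope,
    action: suspend (T) -> Unit
): Flow<T> = flow {
    var job: Job? = null // one job per collection of this flow
    collect { value ->
        job?.cancel()
        job = scope.launch { action(value) }
        emit(value)
    }
}

suspend fun runDemo(): List<String> {
    val tracked = mutableListOf<String>()
    coroutineScope {
        flowOf("a", "ab", "abc")
            .onEachLatestIn(this) { query ->
                delay(100) // stands in for the slow track() call
                tracked += query
            }
            .map { "The search is $it" }
            .collect { println("Collected $it") }
    } // coroutineScope waits for the last tracking job to finish
    return tracked
}

fun main() = runBlocking {
    println("Tracked: ${runDemo()}")
}
```

In this demo all three values are collected without waiting for tracking, and only "abc" ends up tracked, because each new emission cancels the previous tracking job before it finishes.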
e
Maybe create a separate MutableSharedFlow<String> for your query and do collectLatest?
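val queryTrackFlow = MutableSharedFlow<String>()

runBlocking {
    // Track in a separate coroutine; collectLatest cancels the previous track() call
    launch { queryTrackFlow.collectLatest { track(it) } }

    search
        .onEach { queryTrackFlow.emit(it) }
        ...
        .collectLatest { }
}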
p
@Guilherme Almeida 😁
g
I used this Flow approach at some point, but I don't remember the solution I ended up with 😂 For tracking I would not use collectLatest though, because you could miss events if tracking takes longer than expected.
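A rough sketch of one way to avoid missing tracking events, under the assumption that every query should be tracked in order. It is not the solution referred to above (which was not recalled); it swaps `collectLatest` for an unbounded `Channel` drained by a separate coroutine. The names `queries` and `runDemo` are made up for illustration, and a finite `flowOf` replaces the `search` StateFlow so the demo terminates.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

suspend fun runDemo(): List<String> {
    val tracked = mutableListOf<String>()
    // Unbounded buffer: trySend never suspends, so the flow is not delayed.
    val queries = Channel<String>(Channel.UNLIMITED)
    coroutineScope {
        // Tracker coroutine: handles every query one after another, none cancelled.
        launch {
            for (query in queries) {
                delay(100) // stands in for the slow track() call
                tracked += query
            }
        }
        flowOf("a", "ab", "abc")
            .onEach { queries.trySend(it) }
            .map { "The search is $it" }
            .collect { println("Collected $it") }
        queries.close() // lets the tracker loop finish once the buffer is drained
    }
    return tracked
}

fun main() = runBlocking {
    println("Tracked: ${runDemo()}")
}
```

Here all of "a", "ab" and "abc" are tracked. The trade-off is that the buffer can grow without bound if tracking is consistently slower than the search input.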
e
search.transformLatest {
    emit(it)
    track(it)
}
p
You are sooo helpful, thanks ❤️
@Guilherme Almeida Your approach was the `job?.cancel(); job = scope.launch { track() }` one, and I’m trying to work out whether it can be simplified
e
although this does have the potential to behave like `conflate` if the collector is slower than the producer, so ymmv
but it's definitely the simplest way to use existing building blocks to cancel the previous action
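To make the `transformLatest` suggestion concrete, here is a small self-contained sketch. The upstream flow, the timings, and the `tracked`/`collected` lists are assumptions added for the demo; `delay(100)` stands in for `track()`. Note that `transformLatest` is marked `@ExperimentalCoroutinesApi`, hence the opt-in.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun runDemo(): Pair<List<String>, List<String>> {
    val tracked = mutableListOf<String>()
    val collected = mutableListOf<String>()
    flow {
        emit("a")
        delay(50) // the next query arrives while "a" is still being tracked
        emit("ab")
    }
        .transformLatest { query ->
            emit(query)      // pass the value downstream first
            delay(100)       // slow tracking; cancelled when a newer value arrives
            tracked += query
        }
        .map { "The search is $it" }
        .collect { collected += it }
    return tracked to collected
}

fun main() = runBlocking {
    val (tracked, collected) = runDemo()
    println("Tracked: $tracked")
    println("Collected: $collected")
}
```

With a fast collector like this one, both values reach the collector, while only "ab" is tracked because the tracking of "a" is cancelled by the second emission.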
s
How does `transformLatest` work when emitting something and then still doing some more work? These two things seem to be happening one after the other from the same coroutine, but I am very unsure about the order it decides to do them in. I’d imagine it’d first continue to finish the block provided in `transformLatest` until it reaches a suspension point, which it seems to do in the first example below, but not in the second one 🤔 For this snippet, for example, I can see it does:
Waiting to be done
gonna  emit
gonna track <--- continued on after emit
Map: it:first <--- since it suspended with `delay` inside track(), it went on with the chain
Collected first
<--- since another item was emitted in the meantime, `track` got cancelled in-between here
gonna  emit
gonna track
Map: it:second <--- again suspending made it continue down the chain
Collected second
track done: second <---  then back to finishing the `transformLatest` block
finish track
second  emit
Map: it:second
Collected second
Done
BUT after a seemingly unrelated change, as in this snippet (changing the last `collect` to `collectLatest`), it seems to always output this:
Waiting to be done
gonna  emit
Map: it:first <-- How did it decide to do this first?
Collected first
gonna track <-- And THEN it continued on with starting the `track()` method
gonna  emit
gonna track
Map: it:second
Collected second
track done: second
finish track
second  emit
Map: it:second
Collected second
Done
This is quite unintuitive to me. Maybe I’m messing up some setup though; if not, I’d love it if someone could explain this.