Greetings friends. For some reason I'm drawing a b...
# coroutines
f
Greetings friends. For some reason I'm drawing a blank on this thing; I feel like I've solved this problem many times before. Suppose I have a flow, mid-stream I want to fire-and-forget some side-effect task without blocking the entire flow. Something like:
Copy code
// collected by someone else
return someFlow
  .onEach {
     launch {
        // do the task
     }
  }
The only thing is, I don't have the current CoroutineScope. I could make a child scope inside onEach with
coroutineScope {}
but my understanding is that'll block until the coroutine I'm launching completes. I could do something like
CoroutineScope(coroutineContext).launch
I suppose? But I'm not sure if that's the best practice
k
There's not really such a thing as "fire and forget" in structured concurrency. If you don't have a reference to some other CoroutineScope you can launch work into you can't launch the task. If you intentionally want to break structured concurrency (you shouldn't) to fire these tasks, use GlobalScope.
f
Yeah agreed, I don't want to break structured concurrency. At least in the sense that, if the CoroutineScope that is collecting the Flow gets cancelled, I'd like this Job I'm launching to get cancelled too. But on the other hand, I don't want that Job to hold up the Flow at all. In other words, I would like the element being handled by onEach to immediately move on to the next Flow operator without waiting for this Job to complete. Is it possible to have it both ways?
k
Not without having reference to the scope responsible for collecting the flow. Without that constraint, the easiest way would be to:
Copy code
suspend fun collectTheFlow() = coroutineScope {
  flow.onEach {
    launch { ... }
  }.collect {
    ...
  }
}
My recommendation would be to trigger the launch side effects where the flow is collected if possible.
f
thanks very much!
k
Regarding your original point:
I could do something like
CoroutineScope(coroutineContext).launch
This might work okay? But it'd be very weird code and something I'd definitely flag in code review.
f
yeah that felt fishy
e
creating an unowned
CoroutineScope
is almost equivalent to
GlobalScope
you could define an extension
Copy code
fun <T> Flow<T>.onEachAsync(
    action: suspend (T) -> Unit
): Flow<T> = flow {
    coroutineScope {
        collect {
            launch { action(it) }
            emit(it)
        }
    }
}
which would have a scope per flow collector
👍 1
d
If you have a "fire-and-forget" use-case, it MIGHT make sense to have it be its own scope.
FireAndForgetScope
. Depends on the use-case.
If the task should not delay the collecting job, then it should be in a different scope.
f
That makes sense. Come to think of it, I don't necessarily want the task to cancel if the Flow is canceled in this particular case. So that may be another reason to make a new scope for it. I'll give some thought to what the lifetime of the scope should be
d
Yeah, also consider
supervisorScope
to keep the failures isolated.
💯 1