I always have trouble seeing a "bug" in my code si...
# coroutines
c
I always have trouble seeing a "bug" in my code similar to this. It just doesn't make sense to me why if I'm collecting a flow that lasts forever, that my next coroutine will never get called.
Copy code
viewModelScope.launch {
  repository.getBooks().collect { list ->
    state.books.clear()
    state.books.addAll(list)
  }

  repository.doSomething() <--- Will never get called
}
I almost want to write a lint check or something to prevent myself from shooting myself in the foot. Anyone have any suggestions on how to change my mindset on this?
j
collect
is a function call that processes all elements of the flow. It's like a
forEach
on a collection. If the collection (flow) is infinite, then
forEach
(
collect
) never ends
c
Yeah. But to a reader, how do you know if it's infinite. I've contemplated naming my methods
getBooksInfinite
because of this.
j
Even if it's not infinite, a
collect
call will process all elements in the flow (unless the current coroutine is cancelled before that of course), so it's still important to note regardless of the size of the flow. That said, the naming indeed could be better here because you don't know whether it gets the current collection of books or if it subscribes to books indefinitely, getting new ones as they come.
There is no clear convention to my knowledge for such naming. I guess I would name
getBooks()
a suspend function that returns a list of books, but rather
booksFlow()
or
booksSubscription()
a non-suspending function that returns a (likely infinite) flow of such items
c
Wait. My books subscription doesn't need to be a suspend call? i.e.
Copy code
suspend fun getBooks(): Flow<List<Book>>
can just be
Copy code
fun getBooks(): Flow<List<Book>>
hm. seems to compile just fine. i wonder why i didn't know that and wonder why there wasn't any error.
j
Usually functions that return flows do nothing by themselves, so they rarely have a reason to be
suspend
. The body of the
flow { .. }
builders is only run when collected, that's why
collect
itself has to be suspend, but not the flow builder call.
wonder why there wasn't any error
It's not an error to mark a function as
suspend
needlessly, but the compiler should have given you a warning that the
suspend
keyword was unnecessary (unless maybe it's via an interface, in which case it cannot know that some implementations wouldn't want to be suspend)
💯 1
c
thanks. i definitely learned some things. i still feel like maybe naming is unforunately my issue. but yeah. it just feels like "collect" is also maybe the misleading piece. because it could be collecting one item, or it could be collecting a bunch of items. If I want the eollection to be running in parralel with doSomething() my only course of action is this?
Copy code
viewModelScope.launch {
  repository.getBooks().collect { list ->
    state.books.clear()
    state.books.addAll(list)
  }
}

viewModelScope.launch {
  repository.doSomething()
}
👌 1
n
Yes, so every launch is in parallel and if stuff inside the first launch fails, the other launch isn't affected
j
@Colton Idle yes, you should indeed launch both, or at least run
doSomething()
outside of the first launch (if you're already in a coroutine). If you want to express concurrency between these 2 things, you have to put them in separate coroutines, regardless of the flow's size/finiteness. Statements that are sequential within the same coroutine (the same
launch
) are executed sequentially.
e
like this: (
.onEach.launchIn
doesn't require as much nesting as
launch { .collect }
)
Copy code
repository.getBooks().onEach { list ->
    state.books = list
}.launchIn(viewModelScope)
viewModelScope.launch {
    repository.doSomething()
}
p
If you are sure the flow is endless you could write an extension called sth like collectInfiniteFlow that returns nothing.
While nothing means kotlin.Nothing
m
You can use oneach+launchIn everytime and you solve the problem. Some will say you should always use collect, but the argument for that is bad tbh, so it is fine
t
Personally I use:
Copy code
inline fun <T> Flow<T>.eachInScope(scope: CoroutineScope, crossinline block: suspend (T) -> Unit): Job = onEach {
    block(it)
}.launchIn(scope)
This is explicit and properly return a Job that you can keep if you need cancellation without cancelling the scope.
s
Wrap them in launch {} if you want them to be done in parallel. Be explicit about what you’re doing. You want to in parallel, launch a job which is going to be collecting something, so make the code show that you’re doing that. I like to avoid launchIn for that reason among others. I would write this snippet as:
Copy code
viewModelScope.launch {
  launch {
    repository.getBooks().collect { list ->
      state.books.clear()
      state.books.addAll(list)
    }
  }
  repository.doSomething()
}
Just my personal opinion on this 😅
c
Ahhh. so many good opinions and discussions here. thank you all!
j
Great discussion. If I may add my 2 cents: I don't think naming a flow to indicate whether it is finite or not is a great design choice. For one, there's no guarantee that the name and implementation (finite or infinite) will stay in sync in the future. Implementations change all the time. And the compiler can't help with automating enforcement of the connection between name and implementation. Furthermore, the finiteness of a flow is an implementation detail that clients are better off never assuming they know anything about. All clients of a flow should treat a flow as if it is infinite. If it's not so now, fine, but it could become infinite in the future. And if a client is safe for infinite flows now, then it will still be safe if the flow ever changes from finite to infinite, or even back again.
👍 3
c
@julian what's your typical approach for this? I agree with what you said, but always find myself making the same mistake. lol
j
@Colton Idle My approach is pretty much some variation of the excellent suggestions in this thread, which boils down to having the goal of making flow collection the last thing that executes in a coroutine. Because you have no idea how long that coroutine will be suspended. Another way to think of it is, code that must run should not follow suspend function calls, because, again there's no way to know when the suspend call will return. Instead pack the suspend calls into their own coroutine, and have the must-run code run concurrently with the coroutine. This example is adapted from @Stylianos Gakis’s snippet:
Copy code
launch {
    launch {
        flow.collect { }
    }
    someSuspendFn()
}
Note that collection is the last thing to execute in the inner coroutine. But the inner coroutine does not need to be the last to execute in the outer coroutine, since
launch
is concurrent relative to code following it, not sequential. It's okay to catch yourself making the same mistake over and over. That's how learning occurs naturally. Unlearning a habit isn't easy. You fail, fail, fail, but each time remind yourself of your goal to not fail. Over time, you fail less frequently, and then not at all.
❤️ 1
😍 1
🎉 1
s
Julian covered super well the reasons I always go with this approach as well, thanks for explaining it so well Julian. Can't go wrong with that approach. With the mild bonus of not introducing any new functions that your colleagues may be unfamiliar with.
💯 1
🙏 2
p
But it’s still kind of error prone and requires human attention. An approach like this simply would not compile.
s
Sometimes I may actually want to do something after collecting a flow completes. Even if it’s long running, there may be some logic that needs to be done after it’s finished. Then I’d need to fall back to the “normal” approach. Imo “forever” part should be an implementation detail, it may not actually be an infinite flow, and I don’t think we need to have to know that on the call site, but take care of both infinite and finite cases. But in any way, yeah maybe someone would like to use this convenience function. I’d probably not though. I might change my mind sometime who knows 😅
p
I won't as well, that was to solve the original issue
Copy code
suspend fun <T> Flow<T>.infiniteCollect(collector: FlowCollector<T>): Nothing {
  collect(collector)
  awaitCancellation()
}
This one will do it even better
💯 1
e
I would lean towards no extension. requiring that a flow never terminates for correct operation is as unsafe as requiring that a flow always terminates for correct operation.
👍 1
p
It doesn't require it, it's just a helper for writing correct code but does not change the behavior outcome. If it is finite it won't change anything
e
awaitCancellation()
does change behavior - the containing scope will never end unless cancelled
p
Ah that is something I did not have in mind, thanks!
👍 1
u
think of collect as a suspending for loop; then it starts to make sense