There's no `Flow.any {}` ? I assume it's because f...
# coroutines
c
There's no `Flow.any {}`? I assume it's because a flow may be infinite? It seems simple to build on top of `Flow.transformWhile`?
r
yeah, or `.firstOrNull { /* predicate */ } != null`, if your flow can't contain nulls
c
But that doesn't shortcut, `List.any`/`Sequence.any` do
Maybe this?
r
firstOrNull does shortcut? it calls collectWhile
but your impl works too, and with nulls
c
Oh I'm dumb
Thanks
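For reference, a minimal sketch of the `transformWhile` approach mentioned at the top of the thread. The name `anyMatch` is hypothetical (chosen so it cannot clash with any library operator), and this is not necessarily the implementation c posted; it only illustrates that such an operator can stop collecting at the first match and still work for flows that contain nulls.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.transformWhile

// Hypothetical helper: returns true as soon as one element satisfies the predicate.
suspend fun <T> Flow<T>.anyMatch(predicate: suspend (T) -> Boolean): Boolean {
    var found = false
    transformWhile<T, Unit> { value ->
        found = predicate(value)
        !found // returning false tells transformWhile to stop collecting upstream
    }.collect()
    return found
}
```

Called as `flowOf(1, 2, 3).anyMatch { it == 2 }`, the upstream flow is not asked for the third element.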
d
Could you share your use case? It's still a mystery for us what application of Flow would benefit from this operator.
c
I have a business object that has many sub-objects. I have a business rule that says that something must happen if a condition is true for any of the sub-objects. I don't particularly care about latency for this particular event, so I want to avoid as much stress on the other systems (so they can deal with more latency-sensitive stuff). Therefore, I have a code that looks like:
parentObjects // List
  .flatMap { it.subObjectIds }
  .distinct()
  // from here on, I'm going to be dereferencing stuff.
  .asFlow()
  .map { externalSystem.get(it) }
  .any { validatesCondition(it) }
By using Flow, I can avoid dereferencing anything more as soon as short-circuiting is possible. Since the English sentence in the spec is literally “if any sub-objects...”, I'd like to write .any because that's more readable.
If I weren't suspending, I would write the exact same code with `asSequence`. Since everyone is already used to `Sequence.any`, it's much more readable if the same operation is available on `Flow` too.
d
d
Is this because `externalSystem.get(it)` is a `suspend` function? Any reason not to just wrap that in a `runBlocking` in this context?
c
@Dmitry Khalanskiy [JB] I can't `suspend` in `Sequence.map`
d
In `map`, sure, but you can do
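sequence<Int> {
    things.forEach {
        // `sequence { }` is a restricted suspension scope, so a suspending
        // `get` would still need wrapping here (e.g. in runBlocking).
        yield(externalSystem.get(it))
    }
}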
Maybe what you want is actually `Sequence.mapSuspend` or something like that?
c
Isn't that extremely unreadable? I don't understand, what's the downside of having `Flow.any` that didn't exist for `Sequence.any` and `Iterable.any`?
d
I don't know if there is a downside. I'm trying to learn more about your use case. `Flow` is inherently about asynchronous computations, and what you're doing here just doesn't look asynchronous. `Sequence` is built for short-circuiting traversal of collections, which looks like a better choice here. Either `Sequence` is a better fit for your use case and we should try to enhance that (so that the code isn't extremely unreadable), or `Flow` really is a good fit for your use case, in which case your request for `any` is perfectly reasonable.
The suggestion by @Daniel Pitts also looks reasonable: would `runBlocking` also work for you?
c
Please clarify: `Flow` is described in many places as "the equivalent of `Sequence` when you need `suspend`". This is exactly my use case.
> what you're doing here just doesn't look asynchronous
How? I am writing a `suspend` function that calls other `suspend` functions to get some result. What's not asynchronous here?
> would `runBlocking` also work for you?
Everything I have learned in this Slack, and in discussions everywhere, is to never use `runBlocking` within a `suspend` function, because it breaks structured concurrency, breaks stacktrace recovery, makes code single-threaded, etc. But you're recommending it now?
Your answer seems to imply that there is a fundamental difference between a `suspend` function (apparently not asynchronous) and a `Flow` (apparently asynchronous). This seems to contradict most of the documentation / community resources.
d
> How? I am writing a
`suspend` function that calls other `suspend` functions to get some result. What's not asynchronous here?
Sure, let me try to rephrase this: from what you've described, it looks like you want your code to proceed like this:
• Take one element.
• Call `externalSystem.get` on it.
• Wait for `externalSystem.get` to finish.
• If it `validatesCondition`, bail. If not, go back to step 1.
If this is so, then the operation you're proposing is entirely linear, that is, not asynchronous.
> it breaks structured concurrency, breaks stacktrace recovery, makes code single-threaded, etc. But you're recommending it now?
If these factors you've mentioned are important in your case, yes, this is a deal-breaker. I'm not recommending `runBlocking`: I'm asking whether it's suitable. If it's not, it's not. For example, if the code in which you call `any` is itself just `runBlocking { a.b.c.map { }.any { } }`, you won't lose anything by putting `runBlocking` inside the `map` instead, but if you are running this in a `suspend` function, that's another story.
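The steps listed in this message can be sketched as a plain suspending loop. This is only an illustration of the "linear" reading of the pipeline: `ExternalSystem` and `anySubObjectMatches` are hypothetical stand-ins for the names used in the thread, not code from c's codebase.

```kotlin
// Hypothetical stand-in for the external system discussed in the thread.
interface ExternalSystem<K, V> {
    suspend fun get(id: K): V
}

// Hypothetical helper: walks the ids one by one and stops at the first match.
suspend fun <K, V> anySubObjectMatches(
    ids: Iterable<K>,
    externalSystem: ExternalSystem<K, V>,
    validatesCondition: (V) -> Boolean,
): Boolean {
    for (id in ids) {                               // take one element
        val value = externalSystem.get(id)          // call get, suspend until it finishes
        if (validatesCondition(value)) return true  // bail on the first match
    }
    return false                                    // nothing matched
}
```

Because the loop lives in a `suspend` function, it stays cancellable at each `get` call (assuming `get` itself cooperates with cancellation) and does not need `runBlocking`.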
c
> If this is so, then the operation you're proposing is entirely linear, that is, not asynchronous.
By that definition, all `Flow` operators are synchronous. To my knowledge, all common `Flow` operators process elements one-by-one linearly (except maybe `flatMapMerge`).
> If these factors you've mentioned are important in your case, yes, this is a deal-breaker. I'm not recommending `runBlocking`: I'm asking whether it's suitable. If it's not, it's not. For example, if the code in which you call `any` is itself just `runBlocking { a.b.c.map { }.any { } }`, you won't lose anything by putting `runBlocking` inside the `map` instead, but if you are running this in a `suspend` function, that's another story.
Your comment said that `runBlocking` looks reasonable. Your comment did not say "in the very particular case of this pipeline being directly called in a `runBlocking` itself", which should be something extremely rare since it is discouraged basically everywhere. But now you say that it wouldn't be a good idea if it were in a `suspend` function, which should be almost all `Flow` usage all the time. If that is indeed the case, then I believe the previous comment was particularly harmful advice as most people would take your word for it (that `runBlocking` within a flow pipeline is reasonable).
d
> By that definition, all `Flow` operators are synchronous.
Not at all. If you have a non-trivial `collect`, there's asynchronous interaction between the production of elements and their consumption. You can have `flowOn`, `buffer`, and such things that also add their own asynchronous behavior. If your code needs any of this, then `Sequence` is entirely unsuitable and `Flow` is your only choice.
> Your comment said that `runBlocking` looks reasonable.
It did not. It said that another comment looked reasonable, and I stand by that. That comment also didn't propose anything harmful, it asked you whether `runBlocking` was suitable.
> I believe the previous comment was particularly harmful advice as most people would take your word for it (that `runBlocking` within a flow pipeline is reasonable)
If my comment came off as an endorsement of `runBlocking` in `suspend` contexts, I'm sorry. Thank you for highlighting this, and let me reiterate: calling `runBlocking` in a thread that you know is used for asynchronous work is an error. `runBlocking` in a `Flow` pipeline is unreasonable. In a `Sequence` pipeline, it may be reasonable, depending on what your code does.
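As an illustration of the `flowOn` and `buffer` operators mentioned earlier in this message, here is a small sketch in which production and consumption of elements overlap. The `slowIds` producer and the delays are made up for the example; the point is only that the upstream body runs in a different context and can work on the next element while the collector is still busy with the previous one, which a `Sequence` cannot express.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical producer: emits ids slowly, as if paging through a remote system.
fun slowIds(): Flow<Int> = flow {
    for (id in 1..3) {
        delay(100) // simulated latency on the producing side
        emit(id)
    }
}

suspend fun collectConcurrently(): List<Int> =
    slowIds()
        .flowOn(Dispatchers.Default) // the upstream flow body runs on Dispatchers.Default
        .buffer()                    // lets the producer run ahead of the collector
        .map { id ->
            delay(100)               // simulated work on the consuming side
            id * 2
        }
        .toList()
```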
c
> Not at all. If you have a non-trivial `collect`, there's asynchronous interaction between the production of elements and their consumption. You can have `flowOn`, `buffer`, and such things that also add their own asynchronous behavior. If your code needs any of this, then `Sequence` is entirely unsuitable and `Flow` is your only choice.
But you did mention that, in the absence of this, you believe using `Sequence` is more appropriate than using `Flow`. I'm not saying there is anything wrong with that, but I do want to understand why, because I don't. In my opinion, this is perfectly reasonable flow usage:
fun loadItems(date: Instant): Flow<Item> =
    repository.loadItemsAt(date)

val activeItems = loadItems(date)
    .filter { it.isActive }
    .toList()
But indeed, this entire thing could be rewritten with a `Sequence` and a bunch of embedded `launch` and `await`s. Would you say this is incorrect flow usage, and `Sequence` should be used instead? If this example is correct `Flow` usage, then I do not understand the fundamental difference with mine, which is more or less identical:
loadItems(…)
    .map { …suspending operation… }
    .any { …predicate… }
d
That's a tricky question: I don't know what's going on in `loadItemsAt`, but I'd assume it is a good example of `Flow` usage, for one important reason: this function looks like it would benefit from structured concurrency. If the function where the `activeItems` list is accumulated gets cancelled, we want the stream of incoming values to stop. So, it has to support cancellation.
Likewise, if your pipeline needs to cancel `externalSystem.get` whenever the function calling `any` gets cancelled (if it can get cancelled), then `Sequence` is not a good fit, you need structured concurrency.
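A small sketch of what that cancellation behaviour looks like in practice. Everything here is illustrative: the `delay(100)` stands in for a suspending `externalSystem.get`, and the timeout stands in for a caller that gets cancelled. When the enclosing scope is cancelled, the in-flight lookup is cancelled with it and no further elements are requested.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: returns how many simulated lookups completed before cancellation.
suspend fun countFetchedBeforeTimeout(): Int {
    var fetched = 0
    withTimeoutOrNull(250) {       // the caller gives up after 250 ms
        (1..100).asFlow()
            .map { id ->
                delay(100)         // stand-in for a suspending externalSystem.get(id)
                fetched++
                id
            }
            .firstOrNull { it == 100 } // would need all 100 lookups to finish
    }
    return fetched
}
```

With these timings, only the first few lookups complete; the remaining ones are never started because the timeout cancels the collection.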
c
> Likewise, if your pipeline needs to cancel `externalSystem.get` whenever the function calling `any` gets cancelled (if it can get cancelled), then `Sequence` is not a good fit, you need structured concurrency.
Would you say that functions should be written by default without cancelling behavior and structured concurrency, and cancelling behavior be added later if it proves useful?
In this particular example, external code cannot cancel this function, but I only know this because I know the entire system and the only place it is currently used will always run it to completion. I have no guarantees that this will remain true in the future. Should I write this function using `Sequence` etc. (thus no possible cancellation), knowing that it won't cancel if one day a caller expects it to?
d
When designing any code, you should decide whether it's providing a synchronous interface or an asynchronous one. If you provide a `Flow` or a `suspend fun`, you're promising the clients of your code that this is well-behaved asynchronous code that appropriately reacts to cancellations and doesn't hog the thread; if you provide a `Sequence` or a `fun`, you don't give such guarantees. There are more requirements on `Flow` and `suspend fun`, but in turn, they are more flexible for their users.
c
This is in a `suspend fun(…) -> Boolean`.
d
Then, by contract, the code must use a `Flow` and not a `Sequence`, it must not use `runBlocking`, it needs to obey cancellation where appropriate and avoid doing heavy uninterruptible computations.
c
Then it is legitimate for me to need `Flow.any`, no? I don't see how I could satisfy all these rules without it.
d
It's difficult to convey emotions and intent over text, so I think it's worth pointing out that this is not about your request being "legitimate" or not, it's about us trying to understand your constraints. It's not supposed to be a battle where you convince us of your need, it's a discussion about how best to structure our API to suit you. `kotlinx.coroutines` developers are experts in `kotlinx.coroutines`, but are not experts in how people actually use it and what tasks people solve with it.
You can validate all these rules by making your function non-suspend. If it is `suspend` only to satisfy the compiler and you don't intend for it to share the thread or be interruptible, you can make it a plain `fun` and use `runBlocking` with `suspend` as an implementation detail.
I'm saying this because I've seen a lot of code that doesn't actually need to be `suspend` but is.
For that matter, I've seen a lot of code where people use `Flow` where it's not needed at all and just a `suspend` (or, again, even a non-`suspend`!) function would suffice.
c
> It's difficult to convey emotions and intent over text, so I think it's worth pointing out that this is not about your request being "legitimate" or not, it's about us trying to understand your constraints.
I'm sorry that this is how it came across, I was purely attempting to refer to:
> Either `Sequence` is a better fit for your use case and we should try to enhance that (so that the code isn't extremely unreadable), or `Flow` really is a good fit for your use case, in which case your request for `any` is perfectly reasonable.
There was no intent on my side to make this a battle. The only thing I am dissatisfied with in this discussion is that multiple proposed solutions would have been actively harmful to this codebase, and the only reason this came to light is that I pushed further to clarify the risks, mostly based on things I have heard before in this Slack. It seems to me that if someone who spends less time reading everything here had asked this question originally, the thread would have been over very quickly and they would have used one of these solutions thinking it was the solution recommended by the KotlinX.Coroutines team. I do, however, really appreciate that you are spending the time to understand the problem and propose alternative solutions. I, too, really want to find the best solution, whichever it may be. However, so far, I haven't really seen anything that convinces me that `Flow.any` isn't.
> I'm saying this because I've seen a lot of code that doesn't actually need to be `suspend` but is.
This thread is already quite long, so this is probably not the right place for it, but I would love to see a blog post or some other thread specifically about this 🙏
The reason all of this is `suspend` is that all the expensive/retrieval operations I mentioned are gRPC calls, and those are `suspend` in this codebase.
d
> The only thing I am dissatisfied with in this discussion is that multiple proposed solutions would have been actively harmful to this codebase
I'm a frequent reader of this Slack as well, and I've noticed you participating in many discussions here, so I assumed I could get straight to the point in this discussion and solve disagreements on the go. If you were a newcomer, I'd make sure to highlight the shared common wisdoms of this Slack channel. I don't think it's harmful to get to the point when both sides agree on that, because the kind of newcomers who would read this thread before asking their question are also the kind of newcomers that would carefully read many threads and eventually see the light.
> The reason all of this is `suspend` is because all the expensive/retrieval operations I mentioned are gRPC calls
Oh, that's interesting! Do they execute in `Dispatchers.IO`?
suspend fun ExternalSystem.get(value: Value) = withContext(Dispatchers.IO) { ... }
c
> Do they execute in
`Dispatchers.IO`?
I do not know, that would be deep within the gRPC library. Does it make a difference here? I could try to find out if it does.
d
If it's an open-source library, it would help if you drop a link. Otherwise, no big deal.
d
I think I'll reply on GitHub today with some thoughts.
c
I don't see any `withContext` calls anywhere between our code and that implementation, so if it's not in the library, I don't believe we have any.