CLOVIS
08/12/2024, 4:27 PM
`Flow.any {}`? I assume it's because flow may be infinite? It seems simple to build on top of `Flow.transformWhile`?
ross_a
08/12/2024, 4:38 PM
`.firstOrNull{ // predicate } != null` - if your flow can't contain nulls
CLOVIS
08/12/2024, 4:39 PM
`List.any`/`Sequence.any` do
CLOVIS
08/12/2024, 4:40 PMross_a
08/12/2024, 4:41 PMross_a
08/12/2024, 4:42 PMCLOVIS
08/12/2024, 4:42 PMCLOVIS
08/12/2024, 4:42 PMCLOVIS
08/12/2024, 4:45 PMDmitry Khalanskiy [JB]
08/12/2024, 5:21 PMCLOVIS
08/12/2024, 5:28 PM
parentObjects // List
    .flatMap { it.subObjectIds }
    .distinct()
    // from here on, I'm going to be dereferencing stuff.
    .asFlow()
    .map { externalSystem.get(it) }
    .any { validatesCondition(it) }
By using Flow, I can avoid dereferencing anything more as soon as short-circuiting is possible. Since the English sentence in the spec is literally “if any sub-objects...”, I'd like to write .any because that's more readable.
CLOVIS
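A minimal sketch of the short-circuiting operator discussed above, assuming kotlinx.coroutines is on the classpath. The name `anyMatch` is hypothetical (chosen so it cannot clash with an operator the library itself may provide), and the id list with the `id * 10` mapping is only a stand-in for `externalSystem.get`. It builds on `transformWhile`, as suggested at the start of the thread, so it also works when the flow contains nulls.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Hypothetical helper: true as soon as one element satisfies the predicate.
// Upstream collection stops at the first match.
suspend fun <T> Flow<T>.anyMatch(predicate: suspend (T) -> Boolean): Boolean =
    transformWhile<T, Unit> { value ->
        val matched = predicate(value)
        if (matched) emit(Unit) // signal the match downstream
        !matched                // keep collecting only while nothing matched
    }.firstOrNull() != null

fun main(): Unit = runBlocking {
    val fetched = mutableListOf<Int>()
    val found = listOf(1, 2, 3, 4).asFlow()
        .map { id ->
            fetched += id // records which ids were "dereferenced"
            id * 10       // stand-in for externalSystem.get(id)
        }
        .anyMatch { it >= 20 }
    println(found)   // true
    println(fetched) // [1, 2]
}
```

The `fetched` list shows that ids after the first match are never dereferenced, which is the short-circuiting behavior the message asks for.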
08/12/2024, 5:29 PMCLOVIS
08/12/2024, 5:30 PM
`asSequence`. Since everyone is already used to `Sequence.any`, it's much more readable if the same operation is available on `Flow` too.
Dmitry Khalanskiy [JB]
08/12/2024, 7:02 PMDaniel Pitts
08/12/2024, 8:07 PM
`suspend` function, any reason not to just wrap that in a `runBlocking` in this context?
CLOVIS
08/13/2024, 7:27 AM
`suspend` in `Sequence.map`
Dmitry Khalanskiy [JB]
08/13/2024, 8:15 AM
`map`, sure, but you can do
sequence<Int> {
    things.forEach {
        yield(externalSystem.get(it))
    }
}
Maybe what you want is actually `Sequence.mapSuspend` or something like that?
CLOVIS
08/13/2024, 8:16 AM
`Flow.any` that didn't exist for `Sequence.any` and `Iterable.any`?
Dmitry Khalanskiy [JB]
08/13/2024, 8:24 AM
`Flow` is inherently about asynchronous computations, and what you're doing here just doesn't look asynchronous. `Sequence` is built for short-circuiting traversal of collections, which looks like a better choice here.
Either `Sequence` is a better fit for your use case and we should try to enhance that (so that the code isn't extremely unreadable), or `Flow` really is a good fit for your use case, in which case your request for `any` is perfectly reasonable.
Dmitry Khalanskiy [JB]
08/13/2024, 8:24 AM
would `runBlocking` also work for you?
CLOVIS
08/13/2024, 8:28 AM
`Flow` is described in many places as "the equivalent of `Sequence` when you need `suspend`". This is exactly the usage I am having.
> what you're doing here just doesn't look asynchronous
How? I am writing a `suspend` function that calls other `suspend` functions to get some result. What's not asynchronous here?
> would `runBlocking` also work for you?
Everything I have learned in this Slack, in the discussions everywhere is to never use `runBlocking` within a `suspend` function, because it breaks structured concurrency, breaks stacktrace recovery, makes code single-threaded, etc. But you're recommending it now?
CLOVIS
08/13/2024, 8:31 AM
`suspend` function (apparently not asynchronous) and a `Flow` (apparently asynchronous). This seems contradictory with most of the documentation / community resources.
Dmitry Khalanskiy [JB]
08/13/2024, 8:36 AM
> `suspend` function that calls other `suspend` functions to get some result. What's not asynchronous here?
Sure, let me try to rephrase this: from what you've described, it looks like you want your code to proceed like this:
• Take one element.
• Call `externalSystem.get` on it.
• Wait for `externalSystem.get` to finish.
• If it `validatesCondition`, bail. If not, go back to step 1.
If this is so, then the operation you're proposing is entirely linear, that is, not asynchronous.
> it breaks structured concurrency, breaks stacktrace recovery, makes code single-threaded, etc. But you're recommending it now?
If these factors you've mentioned are important in your case, yes, this is a deal-breaker. I'm not recommending `runBlocking`: I'm asking whether it's suitable. If it's not, it's not. For example, if the code in which you call `any` is itself just `runBlocking { a.b.c.map { }.any { } }`, you won't lose anything by putting `runBlocking` inside the `map` instead, but if you are running this in a `suspend` function, that's another story.
CLOVIS
08/13/2024, 8:44 AM
> If this is so, then the operation you're proposing is entirely linear, that is, not asynchronous.
By that definition, all `Flow` operators are synchronous. To my knowledge, all common `Flow` operators process elements one-by-one linearly (except maybe `flatMapMerge`).
> If these factors you've mentioned are important in your case, yes, this is a deal-breaker. I'm not recommending `runBlocking`: I'm asking whether it's suitable. If it's not, it's not. For example, if the code in which you call `any` is itself just `runBlocking { a.b.c.map { }.any { } }`, you won't lose anything by putting `runBlocking` inside the `map` instead, but if you are running this in a `suspend` function, that's another story.
Your comment said that `runBlocking` looks reasonable. Your comment did not say "in the very particular case of this pipeline being directly called in a `runBlocking` itself", which should be something extremely rare since it is discouraged basically everywhere. But now you say that it wouldn't be a good idea if it were in a `suspend` function, which should be almost all `Flow` usage all the time. If that is indeed the case, then I believe the previous comment was particularly harmful advice as most people would take your word for it (that `runBlocking` within a flow pipeline is reasonable).
Dmitry Khalanskiy [JB]
08/13/2024, 8:51 AM
> By that definition, all `Flow` operators are synchronous.
Not at all. If you have a non-trivial `collect`, there's asynchronous interaction between the production of elements and their consumption. You can have `flowOn`, `buffer`, and such things that also add their own asynchronous behavior. If your code needs any of this, then `Sequence` is entirely unsuitable and `Flow` is your only choice.
> Your comment said that `runBlocking` looks reasonable.
It did not. It said that another comment looked reasonable, and I stand by that. That comment also didn't propose anything harmful, it asked you whether `runBlocking` was suitable.
> I believe the previous comment was particularly harmful advice as most people would take your word for it (that `runBlocking` within a flow pipeline is reasonable)
If my comment came off as endorsement of `runBlocking` in `suspend` contexts, I'm sorry. Thank you for highlighting this, and let me reiterate: calling `runBlocking` in a thread that you know is used for asynchronous work is an error.
`runBlocking` in a `Flow` pipeline is unreasonable. In a `Sequence` pipeline, it may be reasonable, depending on what your code does.
CLOVIS
08/13/2024, 8:58 AM
> Not at all. If you have a non-trivial `collect`, there's asynchronous interaction between the production of elements and their consumption. You can have `flowOn`, `buffer`, and such things that also add their own asynchronous behavior. If your code needs any of this, then `Sequence` is entirely unsuitable and `Flow` is your only choice.
But you did mention that, in the absence of this, you believe using `Sequence` is more appropriate than using `Flow`. I'm not saying there is anything wrong with that, but I do want to understand why, because I don't.
In my opinion, this is perfectly reasonable flow usage:
fun loadItems(date: Instant): Flow<Item> =
repository.loadItemsAt(date)
val activeItems = loadItems(date)
.filter { it.isActive }
.toList()
But indeed, this entire thing could be rewritten with a `Sequence` and a bunch of embedded `launch` and `await`s. Would you say this is incorrect flow usage, and `Sequence` should be used instead?
If this example is correct `Flow` usage, then I do not understand the fundamental difference with mine, which is more or less identical:
loadItems(…)
.map { …suspending operation… }
.any { …predicate… }
Dmitry Khalanskiy [JB]
08/13/2024, 9:08 AM
`loadItemsAt`, but I'd assume it is a good example of `Flow` usage, for one important reason: this function looks like it would benefit from structured concurrency. If the function where the `activeItems` list is accumulated gets cancelled, we want the stream of incoming values to stop. So, it has to support cancellation.
Dmitry Khalanskiy [JB]
08/13/2024, 9:10 AM
Likewise, if your pipeline needs to cancel `externalSystem.get` whenever the function calling `any` gets cancelled (if it can get cancelled), then `Sequence` is not a good fit, you need structured concurrency.
CLOVIS
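To illustrate the cancellation point just made, here is a hedged sketch, assuming kotlinx.coroutines is available. `slowGet` is a hypothetical stand-in for `externalSystem.get`, and the 100 ms delay and 250 ms timeout are arbitrary illustration values. When the caller is cancelled (here by a timeout), the `Flow` pipeline stops issuing further calls.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for externalSystem.get: a slow, cancellable suspend call.
suspend fun slowGet(id: Int, started: MutableList<Int>): Int {
    started += id // record that this call was started
    delay(100)    // suspension point where cancellation is observed
    return id
}

fun main(): Unit = runBlocking {
    val started = mutableListOf<Int>()
    // The caller gives up after 250 ms; the timeout cancels the whole pipeline.
    val result = withTimeoutOrNull(250) {
        (1..100).asFlow()
            .map { slowGet(it, started) }
            .firstOrNull { it > 50 } != null
    }
    println(result)       // null: the timeout fired before any element matched
    println(started.size) // only the calls started before cancellation, far fewer than 100
}
```

A `Sequence` pipeline that wraps each call in `runBlocking` has no equivalent hook: cancelling the caller would not interrupt the call that is currently blocking the thread.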
08/13/2024, 9:12 AM
> Likewise, if your pipeline needs to cancel `externalSystem.get` whenever the function calling `any` gets cancelled (if it can get cancelled), then `Sequence` is not a good fit, you need structured concurrency.
Would you say that functions should be written by default without cancelling behavior and structured concurrency, and cancelling behavior be added later if it proves useful?
CLOVIS
08/13/2024, 9:14 AM
`Sequence` etc (thus no possible cancellation), knowing that it won't cancel if one day a caller expects it to?
Dmitry Khalanskiy [JB]
08/13/2024, 9:21 AM
`Flow` or a `suspend fun`, you're promising the clients of your code that this is well-behaved asynchronous code that appropriately reacts to cancellations and doesn't hog the thread; if you provide a `Sequence` or a `fun`, you don't give such guarantees. There are more requirements on `Flow` and `suspend fun`, but in turn, they are more flexible for their users.
CLOVIS
08/13/2024, 9:22 AM
`suspend fun(…) -> Boolean`.
Dmitry Khalanskiy [JB]
08/13/2024, 9:25 AM
`Flow` and not a `Sequence`, it must not use `runBlocking`, it needs to obey cancellation where appropriate and avoid doing heavy uninterruptible computations.
CLOVIS
08/13/2024, 9:26 AM
`Flow.any`, no? I don't see how I could validate all these rules without it.
Dmitry Khalanskiy [JB]
08/13/2024, 9:33 AM
`kotlinx.coroutines` developers are experts in `kotlinx.coroutines`, but are not experts in how people actually use it and what tasks people solve with it.
Dmitry Khalanskiy [JB]
08/13/2024, 9:41 AM
`suspend` only to satisfy the compiler and you don't intend for it to share the thread or be interruptible, you can make it a plain `fun` and use `runBlocking` with `suspend` as an implementation detail.
Dmitry Khalanskiy [JB]
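A sketch of what "a plain `fun` with `runBlocking` as an implementation detail" could look like, assuming kotlinx.coroutines is available. `fetchScore` and `anyScoreAtLeast` are hypothetical names invented for illustration. As stated elsewhere in the thread, this shape is only acceptable when the function is never called from a thread used for asynchronous work, so it should not be called from a `suspend` function.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical suspend API that can only be called from a coroutine.
suspend fun fetchScore(id: Int): Int {
    delay(10) // stand-in for a remote call
    return id * 10
}

// A plain, blocking function: it makes no promise about cancellation or
// thread sharing, and hides the suspend call behind runBlocking.
fun anyScoreAtLeast(ids: List<Int>, threshold: Int): Boolean =
    ids.asSequence()
        .map { id -> runBlocking { fetchScore(id) } } // blocks the calling thread per element
        .any { it >= threshold }                      // Sequence.any short-circuits

fun main() {
    println(anyScoreAtLeast(listOf(1, 2, 3), 20)) // true
}
```

The trade-off is the one described in the thread: the signature is honest about being blocking, but callers lose cancellation and structured concurrency.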
08/13/2024, 9:42 AM
I'm saying this because I've seen a lot of code that doesn't actually need to be `suspend` but is.
Dmitry Khalanskiy [JB]
08/13/2024, 9:42 AM
`Flow` where it's not needed at all and just a `suspend` (or, again, even a non-`suspend`!) function would suffice.
CLOVIS
08/13/2024, 9:43 AM
> It's difficult to convey emotions and intent over text, so I think it's worth pointing out that this is not about your request being "legitimate" or not, it's about us trying to understand your constraints.
I'm sorry that this is how it came across, I was purely attempting to refer to:
> Either `Sequence` is a better fit for your use case and we should try to enhance that (so that the code isn't extremely unreadable), or `Flow` really is a good fit for your use case, in which case your request for `any` is perfectly reasonable.
There was no intent on my side to make this a battle. The only thing I am dissatisfied with in this discussion is that multiple proposed solutions would have been actively harmful to this codebase, and the only reason this came to light is because I pushed further to clarify the risks, mostly based on things I have heard before in this Slack. It seems to me that if someone that spends less time reading everything here had asked this question originally, the thread would have been over very quickly and they would have used one of these solutions thinking it was the recommended solution by the KotlinX.Coroutines team. I do, however, really appreciate that you are spending the time to understand the problem and propose alternative solutions. I do, too, really want to find the best solution, whichever it may be. However, so far, I haven't really seen anything that convinces me that `Flow.any` isn't.
CLOVIS
08/13/2024, 9:44 AM
> I'm saying this because I've seen a lot of code that doesn't actually need to be `suspend` but is.
This thread is already quite long, so this is probably not the right place for it, but I would love to see a blog post or some other thread specifically about this 🙏
CLOVIS
08/13/2024, 9:45 AM
The reason all of this is `suspend` is because all the expensive/retrieval operations I mentioned are gRPC calls, and those are `suspend` in this codebase.
Dmitry Khalanskiy [JB]
08/13/2024, 9:48 AM
> The only thing I am dissatisfied with in this discussion is that multiple proposed solutions would have been actively harmful to this codebase
I'm a frequent reader of this Slack as well, and I've noticed you participating in many discussions here, so I assumed I could get straight to the point in this discussion and solve disagreements on the go. If you were a newcomer, I'd make sure to highlight the shared common wisdoms of this Slack channel. I don't think it's harmful to get to the point when both sides agree on that, because the kind of newcomers who would read this thread before asking their question are also the kind of newcomers that would carefully read many threads and eventually see the light.
Dmitry Khalanskiy [JB]
08/13/2024, 9:52 AM
> The reason all of this is `suspend` is because all the expensive/retrieval operations I mentioned are gRPC calls
Oh, that's interesting! Do they execute in `Dispatchers.IO`?
suspend fun ExternalSystem.get(value: Value) = withContext(Dispatchers.IO) { ... }
CLOVIS
08/13/2024, 12:31 PM
> Do they execute in `Dispatchers.IO`?
I do not know, that would be deep within the gRPC library. Does it make a difference here? I could try to find out if it does.
Dmitry Khalanskiy [JB]
08/13/2024, 12:33 PMCLOVIS
08/13/2024, 12:34 PMDmitry Khalanskiy [JB]
08/13/2024, 12:34 PMCLOVIS
08/13/2024, 12:35 PM
`withContext` calls anywhere between our code and that implementation, so if it's not in the library, I don't believe we have.