CLOVIS
08/12/2024, 4:27 PM
Flow.any {}? I assume it's because a flow may be infinite? It seems simple to build on top of Flow.transformWhile?
ross_a
08/12/2024, 4:38 PM
.firstOrNull{ // predicate } != null - if your flow can't contain nulls
CLOVIS
08/12/2024, 4:39 PM
`List.any` / `Sequence.any` do
CLOVIS
08/12/2024, 4:40 PM
ross_a
08/12/2024, 4:41 PM
ross_a
08/12/2024, 4:42 PM
CLOVIS
08/12/2024, 4:42 PM
CLOVIS
08/12/2024, 4:42 PM
CLOVIS
08/12/2024, 4:45 PM
Dmitry Khalanskiy [JB]
08/12/2024, 5:21 PM
CLOVIS
08/12/2024, 5:28 PM
parentObjects // List
    .flatMap { it.subObjectIds }
    .distinct()
    // from here on, I'm going to be dereferencing stuff.
    .asFlow()
    .map { externalSystem.get(it) }
    .any { validatesCondition(it) }
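For readers who want to try the pipeline above, here is a minimal runnable sketch. It assumes a hand-written helper, here called `anyMatch` (a hypothetical name, chosen so it cannot clash with any `any` operator the library itself may provide), built on `transformWhile` as suggested at the top of the thread. `ParentObject`, `SubObject`, `ExternalSystem` and `validatesCondition` are stand-ins invented for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Hypothetical helper, not part of the thread: a short-circuiting "any" for Flow.
suspend fun <T> Flow<T>.anyMatch(predicate: suspend (T) -> Boolean): Boolean =
    transformWhile<T, Boolean> { value ->
        val matched = predicate(value)
        if (matched) emit(true)   // report the first match downstream
        !matched                  // returning false stops the upstream collection
    }.firstOrNull() ?: false      // no emission means nothing matched

// Stand-ins for the types in the message (all assumptions).
data class ParentObject(val subObjectIds: List<Int>)
data class SubObject(val id: Int)

class ExternalSystem {
    val fetched = mutableListOf<Int>()   // records which ids were dereferenced
    suspend fun get(id: Int): SubObject {
        fetched += id
        return SubObject(id)
    }
}

fun validatesCondition(obj: SubObject): Boolean = obj.id == 2

suspend fun anySubObjectValid(
    parentObjects: List<ParentObject>,
    externalSystem: ExternalSystem,
): Boolean =
    parentObjects
        .flatMap { it.subObjectIds }
        .distinct()
        .asFlow()
        .map { externalSystem.get(it) }
        .anyMatch { validatesCondition(it) }

fun main() = runBlocking {
    val system = ExternalSystem()
    val parents = listOf(ParentObject(listOf(1, 2)), ParentObject(listOf(2, 3)))
    println(anySubObjectValid(parents, system)) // true
    println(system.fetched)                     // [1, 2]: id 3 is never fetched
}
```

Unlike the `firstOrNull { ... } != null` idiom, this variant does not depend on the flow being free of nulls.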
By using Flow, I can avoid dereferencing anything more as soon as short-circuiting is possible. Since the English sentence in the spec is literally “if any sub-objects...”, I'd like to write .any because that's more readable.
CLOVIS
08/12/2024, 5:29 PM
CLOVIS
08/12/2024, 5:30 PM
asSequence. Since everyone is already used to Sequence.any, it's much more readable if the same operation is available on Flow too.
Dmitry Khalanskiy [JB]
08/12/2024, 7:02 PM
Daniel Pitts
08/12/2024, 8:07 PM
suspend function, any reason not to just wrap that in a runBlocking in this context?
CLOVIS
08/13/2024, 7:27 AM
suspend in Sequence.map
Dmitry Khalanskiy [JB]
08/13/2024, 8:15 AM
map, sure, but you can do
sequence<Int> {
    things.forEach {
        yield(externalSystem.get(it))
    }
}
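One caveat when trying the builder above: the scope of `sequence { }` restricts suspension, so a `suspend` version of `externalSystem.get` cannot be called inside it directly. The sketch below assumes the call is bridged with `runBlocking`, which is only reasonable when the whole pipeline is called from blocking, non-suspending code. `ExternalSystem` and `anyLargeValue` are hypothetical stand-ins.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the external system in the thread.
object ExternalSystem {
    val fetched = mutableListOf<Int>()   // records which ids were fetched
    suspend fun get(id: Int): Int {
        delay(10)                        // pretend this is a remote call
        fetched += id
        return id * 10
    }
}

// Lazily fetches each id; runBlocking bridges from the restricted sequence scope
// into ordinary suspending code and blocks the calling thread while it waits.
fun fetchedValues(ids: List<Int>): Sequence<Int> = sequence {
    ids.forEach { id ->
        yield(runBlocking { ExternalSystem.get(id) })
    }
}

// Plain (non-suspend) function: Sequence.any stops pulling elements at the first match.
fun anyLargeValue(ids: List<Int>): Boolean =
    fetchedValues(ids).any { it >= 20 }

fun main() {
    println(anyLargeValue(listOf(1, 2, 3))) // true
    println(ExternalSystem.fetched)         // [1, 2]: id 3 is never fetched
}
```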
Maybe what you want is actually Sequence.mapSuspend or something like that?
CLOVIS
08/13/2024, 8:16 AM
Flow.any that didn't exist for Sequence.any and Iterable.any?
Dmitry Khalanskiy [JB]
08/13/2024, 8:24 AM
Flow is inherently about asynchronous computations, and what you're doing here just doesn't look asynchronous. Sequence is built for short-circuiting traversal of collections, which looks like a better choice here.
Either Sequence is a better fit for your use case and we should try to enhance that (so that the code isn't extremely unreadable), or Flow really is a good fit for your use case, in which case your request for any is perfectly reasonable.
Dmitry Khalanskiy [JB]
08/13/2024, 8:24 AM
runBlocking also work for you?
CLOVIS
08/13/2024, 8:28 AM
Flow is described in many places as "the equivalent of Sequence when you need `suspend`". This is exactly the usage I am having.
> what you're doing here just doesn't look asynchronous
How? I am writing a suspend function that calls other suspend functions to get some result. What's not asynchronous here?
> would runBlocking also work for you?
Everything I have learned in this Slack, in the discussions everywhere is to never use runBlocking within a suspend function, because it breaks structured concurrency, breaks stacktrace recovery, makes code single-threaded, etc. But you're recommending it now?
CLOVIS
08/13/2024, 8:31 AM
suspend function (apparently not asynchronous) and a Flow (apparently asynchronous). This seems contradictory with most of the documentation / community resources.
Dmitry Khalanskiy [JB]
08/13/2024, 8:36 AM
> I am writing a suspend function that calls other suspend functions to get some result. What's not asynchronous here?
Sure, let me try to rephrase this: from what you've described, it looks like you want your code to proceed like this:
• Take one element.
• Call externalSystem.get on it.
• Wait for externalSystem.get to finish.
• If it validatesCondition, bail. If not, go back to step 1.
If this is so, then the operation you're proposing is entirely linear, that is, not asynchronous.
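Written out by hand, the four steps above amount to a plain loop inside a `suspend` function. This is only a sketch: `fetch` and `validatesCondition` are hypothetical stand-ins for `externalSystem.get` and the real predicate.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for externalSystem.get.
suspend fun fetch(id: Int): String {
    delay(10)                  // pretend this is a remote call
    return "object-$id"
}

// Hypothetical stand-in for the real predicate.
fun validatesCondition(obj: String): Boolean = obj.endsWith("2")

suspend fun anyValid(ids: List<Int>): Boolean {
    for (id in ids) {                              // take one element
        val obj = fetch(id)                        // call the external system and wait for it
        if (validatesCondition(obj)) return true   // bail on the first match
    }                                              // otherwise go back to step 1
    return false
}

fun main() = runBlocking {
    println(anyValid(listOf(1, 2, 3))) // true
}
```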
> it breaks structured concurrency, breaks stacktrace recovery, makes code single-threaded, etc. But you're recommending it now?
If these factors you've mentioned are important in your case, yes, this is a deal-breaker. I'm not recommending `runBlocking`: I'm asking whether it's suitable. If it's not, it's not. For example, if the code in which you call any is itself just runBlocking { a.b.c.map { }.any { } }, you won't lose anything by putting runBlocking inside the map instead, but if you are running this in a suspend function, that's another story.
CLOVIS
08/13/2024, 8:44 AM
> If this is so, then the operation you're proposing is entirely linear, that is, not asynchronous.
By that definition, all Flow operators are synchronous. To my knowledge, all common Flow operators process elements one-by-one linearly (except maybe flatMapMerge).
> If these factors you've mentioned are important in your case, yes, this is a deal-breaker. I'm not recommending `runBlocking`: I'm asking whether it's suitable. If it's not, it's not. For example, if the code in which you call any is itself just runBlocking { a.b.c.map { }.any { } }, you won't lose anything by putting runBlocking inside the map instead, but if you are running this in a suspend function, that's another story.
Your comment said that runBlocking looks reasonable. Your comment did not say "in the very particular case of this pipeline being directly called in a runBlocking itself", which should be something extremely rare since it is discouraged basically everywhere. But now you say that it wouldn't be a good idea if it were in a suspend function, which should be almost all Flow usage all the time. If that is indeed the case, then I believe the previous comment was particularly harmful advice as most people would take your word for it (that runBlocking within a flow pipeline is reasonable).
Dmitry Khalanskiy [JB]
08/13/2024, 8:51 AM
> By that definition, all Flow operators are synchronous.
Not at all. If you have a non-trivial collect, there's asynchronous interaction between the production of elements and their consumption. You can have flowOn, buffer, and such things that also add their own asynchronous behavior. If your code needs any of this, then Sequence is entirely unsuitable and Flow is your only choice.
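As a rough illustration of the `flowOn` and `buffer` point above, the sketch below runs a slow producer on another dispatcher while a slow consumer collects. The delays and the `collectOverlapped` name are assumptions made for the example. Because production and consumption overlap, the total time should come out noticeably below the sum of all the delays, though the exact figure depends on the machine.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical example function: collects five slowly produced elements slowly.
suspend fun collectOverlapped(): List<Int> {
    val received = mutableListOf<Int>()
    flow {
        for (i in 1..5) {
            delay(100)               // producing an element takes about 100 ms
            emit(i)
        }
    }
        .flowOn(Dispatchers.Default) // the producer runs in its own coroutine on another dispatcher
        .buffer()                    // the producer may run ahead of the consumer
        .collect { value ->
            delay(100)               // consuming an element also takes about 100 ms
            received += value
        }
    return received
}

fun main() = runBlocking {
    val elapsed = measureTimeMillis {
        println(collectOverlapped()) // [1, 2, 3, 4, 5]
    }
    println("took about $elapsed ms")
}
```

A `Sequence` has no equivalent of this: its producer and consumer always run in lock-step on the same thread.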
> Your comment said that runBlocking looks reasonable.
It did not. It said that another comment looked reasonable, and I stand by that. That comment also didn't propose anything harmful, it asked you whether runBlocking was suitable.
> I believe the previous comment was particularly harmful advice as most people would take your word for it (that runBlocking within a flow pipeline is reasonable)
If my comment came off as endorsement of runBlocking in suspend contexts, I'm sorry. Thank you for highlighting this, and let me reiterate: calling runBlocking in a thread that you know is used for asynchronous work is an error.
runBlocking in a Flow pipeline is unreasonable. In a Sequence pipeline, it may be reasonable, depending on what your code does.
CLOVIS
08/13/2024, 8:58 AM
> Not at all. If you have a non-trivial collect, there's asynchronous interaction between the production of elements and their consumption. You can have flowOn, buffer, and such things that also add their own asynchronous behavior. If your code needs any of this, then Sequence is entirely unsuitable and Flow is your only choice.
But you did mention that, in the absence of this, you believe using Sequence is more appropriate than using Flow. I'm not saying there is anything wrong with that, but I do want to understand why, because I don't.
In my opinion, this is perfectly reasonable flow usage:
fun loadItems(date: Instant): Flow<Item> =
    repository.loadItemsAt(date)

val activeItems = loadItems(date)
    .filter { it.isActive }
    .toList()
But indeed, this entire thing could be rewritten with a Sequence and a bunch of embedded launch and awaits. Would you say this is incorrect flow usage, and Sequence should be used instead?
If this example is correct Flow usage, then I do not understand the fundamental difference with mine, which is more or less identical:
loadItems(…)
    .map { …suspending operation… }
    .any { …predicate… }
Dmitry Khalanskiy [JB]
08/13/2024, 9:08 AM
loadItemsAt, but I'd assume it is a good example of Flow usage, for one important reason: this function looks like it would benefit from structured concurrency. If the function where the activeItems list is accumulated gets cancelled, we want the stream of incoming values to stop. So, it has to support cancellation.
Dmitry Khalanskiy [JB]
08/13/2024, 9:10 AM
Likewise, if your pipeline needs to cancel externalSystem.get whenever the function calling any gets cancelled (if it can get cancelled), then Sequence is not a good fit, you need structured concurrency.
CLOVIS
08/13/2024, 9:12 AM
> Likewise, if your pipeline needs to cancel externalSystem.get whenever the function calling any gets cancelled (if it can get cancelled), then Sequence is not a good fit, you need structured concurrency.
Would you say that functions should be written by default without cancelling behavior and structured concurrency, and cancelling behavior be added later if it proves useful?
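The cancellation behaviour under discussion can be sketched as follows. The example assumes a slow, hypothetical `fetch` in place of `externalSystem.get`, and a caller that gives up through `withTimeoutOrNull`. Because the pipeline is a `Flow` collected inside a `suspend` function, cancelling the caller also stops the remaining fetches.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Records the ids whose (hypothetical) remote call ran to completion.
val completed = mutableListOf<Int>()

// Hypothetical stand-in for externalSystem.get.
suspend fun fetch(id: Int): Int {
    delay(100)       // cancellable suspension point
    completed += id
    return id
}

// Uses the firstOrNull idiom from earlier in the thread as a stand-in for `any`.
suspend fun anyNegative(ids: List<Int>): Boolean =
    ids.asFlow()
        .map { fetch(it) }
        .firstOrNull { it < 0 } != null

fun main() = runBlocking {
    // The caller only waits 250 ms; fetching all ten ids would take about a second.
    val result = withTimeoutOrNull(250) { anyNegative((1..10).toList()) }
    println(result)          // null: the caller was cancelled before the pipeline finished
    println(completed.size)  // only the fetches that finished before the timeout
}
```

An equivalent `Sequence` pipeline that blocks on each fetch would not notice the caller's cancellation in the same way.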
CLOVIS
08/13/2024, 9:14 AM
Sequence etc (thus no possible cancellation), knowing that it won't cancel if one day a caller expects it to?
Dmitry Khalanskiy [JB]
08/13/2024, 9:21 AM
Flow or a suspend fun, you're promising the clients of your code that this is well-behaved asynchronous code that appropriately reacts to cancellations and doesn't hog the thread; if you provide a Sequence or a fun, you don't give such guarantees. There are more requirements on Flow and suspend fun, but in turn, they are more flexible for their users.
CLOVIS
08/13/2024, 9:22 AM
suspend fun(…) -> Boolean.
Dmitry Khalanskiy [JB]
08/13/2024, 9:25 AM
Flow and not a Sequence, it must not use runBlocking, it needs to obey cancellation where appropriate and avoid doing heavy uninterruptible computations.
CLOVIS
08/13/2024, 9:26 AM
Flow.any, no? I don't see how I could validate all these rules without it.
Dmitry Khalanskiy [JB]
08/13/2024, 9:33 AM
kotlinx.coroutines developers are experts in kotlinx.coroutines, but are not experts in how people actually use it and what tasks people solve with it.
Dmitry Khalanskiy [JB]
08/13/2024, 9:41 AM
suspend only to satisfy the compiler and you don't intend for it to share the thread or be interruptible, you can make it a plain fun and use runBlocking with suspend as an implementation detail.
Dmitry Khalanskiy [JB]
08/13/2024, 9:42 AM
I'm saying this because I've seen a lot of code that doesn't actually need to be suspend but is.
Dmitry Khalanskiy [JB]
08/13/2024, 9:42 AM
Flow where it's not needed at all and just a suspend (or, again, even a non-suspend!) function would suffice.
CLOVIS
08/13/2024, 9:43 AM
> It's difficult to convey emotions and intent over text, so I think it's worth pointing out that this is not about your request being "legitimate" or not, it's about us trying to understand your constraints.
I'm sorry that this is how it came across, I was purely attempting to refer to:
> Either Sequence is a better fit for your use case and we should try to enhance that (so that the code isn't extremely unreadable), or Flow really is a good fit for your use case, in which case your request for any is perfectly reasonable.
There was no intent on my side to make this a battle. The only thing I am dissatisfied with in this discussion is that multiple proposed solutions would have been actively harmful to this codebase, and the only reason this came to light is because I pushed further to clarify the risks, mostly based on things I have heard before in this Slack. It seems to me that if someone that spends less time reading everything here had asked this question originally, the thread would have been over very quickly and they would have used one of these solutions thinking it was the recommended solution by the KotlinX.Coroutines team.
I do, however, really appreciate that you are spending the time to understand the problem and propose alternative solutions. I do, too, really want to find the best solution, whichever it may be. However, so far, I haven't really seen anything that convinces me that Flow.any isn't.
CLOVIS
08/13/2024, 9:44 AM
> I'm saying this because I've seen a lot of code that doesn't actually need to be suspend but is.
This thread is already quite long, so this is probably not the right place for it, but I would love to see a blog post or some other thread specifically about this 🙏
CLOVIS
08/13/2024, 9:45 AM
The reason all of this is suspend is because all the expensive/retrieval operations I mentioned are gRPC calls, and those are suspend in this codebase.
Dmitry Khalanskiy [JB]
08/13/2024, 9:48 AM
> The only thing I am dissatisfied with in this discussion is that multiple proposed solutions would have been actively harmful to this codebase
I'm a frequent reader of this Slack as well, and I've noticed you participating in many discussions here, so I assumed I could get straight to the point in this discussion and solve disagreements on the go. If you were a newcomer, I'd make sure to highlight the shared common wisdoms of this Slack channel. I don't think it's harmful to get to the point when both sides agree on that, because the kind of newcomers who would read this thread before asking their question are also the kind of newcomers that would carefully read many threads and eventually see the light.
Dmitry Khalanskiy [JB]
08/13/2024, 9:52 AM
> The reason all of this is suspend is because all the expensive/retrieval operations I mentioned are gRPC calls
Oh, that's interesting! Do they execute in Dispatchers.IO?
suspend fun ExternalSystem.get(value: Value) = withContext(Dispatchers.IO) { ... }
CLOVIS
08/13/2024, 12:31 PM
> Do they execute in Dispatchers.IO?
I do not know, that would be deep within the gRPC library. Does it make a difference here? I could try to find out if it does.
Dmitry Khalanskiy [JB]
08/13/2024, 12:33 PM
CLOVIS
08/13/2024, 12:34 PM
Dmitry Khalanskiy [JB]
08/13/2024, 12:34 PM
CLOVIS
08/13/2024, 12:35 PM
withContext calls anywhere between our code and that implementation, so if it's not in the library, I don't believe we have.