#coroutines

Fleshgrinder

06/24/2022, 1:56 PM
Hey 😊 maybe someone knows a nicer solution to a simple problem. I have two flows that are resolving things, the first one from the Internet, the second one from the filesystem. Hence, they are working with different hardware and can run both concurrently and in parallel. Now I need to wait for both to collect everything from their sources, because I want to perform some logic with the results of both. What I have:
val result = awaitAll(
  async { networkFlow.toSet() },
  async { filesystemFlow.toList() },
)
val networkResult = result[0] as Set<NetworkResult>
val filesystemResult = result[1] as List<FilesystemResult>
This works exactly as intended, but the ceremony required to get it to work makes it seem even worse than threading and futures. I was hoping for at least …
val (networkResult, filesystemResult) = awaitAll(
  async { networkFlow.toSet() },
  async { filesystemFlow.toList() },
)
… but this does not exist. Any other way to make this nicer? 🤔

mkrussel

06/24/2022, 2:00 PM
You could manually call await on each async call. That way you at least don't need the casting. It will probably involve more variables.
val networkDeferred = async { networkFlow.toSet() }
val filesystemResult = filesystemFlow.toList()
val networkResult = networkDeferred.await()
👍 1

Fleshgrinder

06/24/2022, 2:03 PM
This would spare me the array, list, and casting. Definitely an improvement. 👍

yschimke

06/24/2022, 2:25 PM
@Fleshgrinder it's an extension method
val (network, filesystem) = listOf(
    async { networkFlow() },
    async { filesystemFlow() },
).awaitAll()
Although maybe I'm misunderstanding your awaitAll, since you already have it there.

Fleshgrinder

06/24/2022, 2:27 PM
There's also an awaitAll extension on Collection, yes, it just turns the receiver Collection into an array and calls the awaitAll I'm already calling. No need to create even more objects. 😉

yschimke

06/24/2022, 2:29 PM
Nice TIL
So what's wrong with this then?
val (network, filesystem) = awaitAll(
    async { networkFlow() },
    async { filesystemFlow() },
)
Ahhh, because different types?
I'm using List, List.
Makes sense.

Fleshgrinder

06/24/2022, 3:12 PM
Exactly, both flows contain different types and need to resolve to different containers.
👍🏻 1

stojan

06/24/2022, 6:05 PM
Looks like parZip from Arrow. You can check the implementation here; it's basically a generic version of your code: https://github.com/arrow-kt/arrow/blob/051847de3b1d3186fc9843864aaf642a2f4a854f/ar[…]-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/ParZip.kt

Fleshgrinder

06/24/2022, 8:09 PM
So it seems this is the way to go. I checked the individual await as suggested by @mkrussel but it would not properly work without copying all the stuff from the awaitAll impl. It's actually interesting to see how much machinery is necessary to make any of this work. I'm starting to question whether any of it is worth it. Reading even 100 files (small YAMLs) with parsing and validation is so fast that doing it sequentially is probably more efficient in the end, simply because it doesn't require all the overhead introduced by coroutines.

Tijl

06/25/2022, 7:56 PM
you can solve it using Flow and zip
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main() {    
    val network = flow { emit("foo") }
    val filesystem = flow { emit(42) }
    val (networkResult, filesystemResult) = network.zip(filesystem) { n, f -> n to f }.first()
    println("$networkResult and $filesystemResult")
}
https://pl.kotl.in/bcWP5F2Un
and you can probably make a zipToPair extension method, if it does not exist somewhere already

Fleshgrinder

06/26/2022, 6:44 AM
But this would mean that they block each other, because where the pair is created we would be calling toList or toSet on one of the two first.

uli

06/26/2022, 12:17 PM
"So it seems this is the way to go. I checked the individual await as suggested by @mkrussel but it would not properly work without copying all the stuff from the awaitAll impl."
What would go wrong with @mkrussel’s exact solution?
Or what would go wrong with this more symmetric approach?
val networkResultDeferred = async { networkFlow.toSet() }
val filesystemResultDeferred = async { filesystemFlow.toList() }

val networkResult = networkResultDeferred.await()
val filesystemResult = filesystemResultDeferred.await()

Tijl

06/26/2022, 12:24 PM
There is neither toList nor toSet in my example, but you can just do this inside the flow: flow { emit(whateverFlow.toList()) }
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main() {
    val networkF = flowOf("foo", "bar")
    val network = flow { emit(networkF.toList()) }
    val fileF = flowOf(4, 2)
    val filesystem = flow { emit(fileF.toSet()) }
    val (networkResult, filesystemResult) = network.zip(filesystem) { n, f -> n to f }.first()
    println("$networkResult and $filesystemResult")
}
https://pl.kotl.in/6j74wOg06
The point is that combining two async results is hard, while combining two flows is easy. You can shorten this even more, but I think the above example best demonstrates why this works.

Fleshgrinder

06/26/2022, 12:30 PM
@uli it seems that there are a lot of pitfalls when awaiting multiple asyncs, check out: https://github.com/Kotlin/kotlinx.coroutines/blob/5841effaf9cee6f0839b072bd8aa8e05d01e054e/kotlinx-coroutines-core/common/src/Await.kt#L72-L91
@Tijl I know that your example did not include any toList or toSet, but my question did. 😝 Since it’s more code than the original and also less clear, I prefer the original solution, especially if hidden behind something like the parZip of Arrow. However, I’d be curious which one performs better. 🤔

Tijl

06/26/2022, 12:55 PM
You’re kind of giving off dual requirements here: it can’t do anything “hidden” by calling a method (though awaitAll is exempted?), but it has to be as compact as possible. It doesn’t really even matter which approach you take, it’s pretty much the same solution (the work is mostly in reading a file or doing a network call in parallel, not in combining the result). Both can be made into val (one, two) = collectBothConcurrent(flow1, flow2) or something similar. Personally I avoid unneeded as hard casting like the plague (unless you don’t mind warnings, you’ll need a suppress, I think), but each to their own. I guess you like low line/symbol count more.
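
A helper along the lines of the collectBothConcurrent mentioned above could look roughly like this. It is only a sketch: the name comes from the message, but the signature and the choice to return two lists are assumptions, and the async-based body is one of the two approaches discussed in this thread, not a library function.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList

// Hypothetical helper (not part of kotlinx.coroutines): collects both flows
// concurrently and keeps the element type of each one, so no casts are needed.
suspend fun <A, B> collectBothConcurrent(
    first: Flow<A>,
    second: Flow<B>,
): Pair<List<A>, List<B>> = coroutineScope {
    val firstDeferred = async { first.toList() }   // starts collecting right away
    val secondDeferred = async { second.toList() } // collected concurrently with the first
    firstDeferred.await() to secondDeferred.await()
}

suspend fun main() {
    // The Pair can be destructured, and each component keeps its own type.
    val (one, two) = collectBothConcurrent(flowOf("foo", "bar"), flowOf(4, 2))
    println("$one and $two")
}
```

If one side needs a Set instead of a List, the caller can convert the result, or the helper could take the terminal operation as a parameter; either is a design choice this sketch leaves open.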

Fleshgrinder

06/26/2022, 1:17 PM
Your solution is great, not sure why you feel attacked because I arbitrarily chose the shorter one as more to my liking. I think that the flows are going to result in more overhead, simply because there is more machinery in place that needs to verify when and if a flow ended. I have no proof for it, but it seems logical. What's really needed are awaitAll overloads that accept 2 to 26 elements, preserve their types, and allow destructuring. It would make the Arrow impl obsolete and allow everyone to use it without worrying about weird side effects or adding unnecessary overhead.
The cast is safe in this situation, just because the compiler cannot know due to the array conversions doesn't make it unsafe. The suppression is only needed because of type erasure since I'm dealing with generic collections on all sides. But, again, safe, nothing else can ever come from either position in the result. The compiler does nothing more than we do here: keeping track of types and interpreting the bits in memory accordingly.
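
To make the idea of a type-preserving overload concrete, here is a minimal sketch for the two-element case. The name awaitBoth is hypothetical and nothing like it ships with kotlinx.coroutines as far as this thread establishes; the sketch simply delegates to the existing vararg awaitAll and then reads each result through its own typed Deferred.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

// Hypothetical two-element variant; not part of kotlinx.coroutines.
suspend fun <A, B> awaitBoth(first: Deferred<A>, second: Deferred<B>): Pair<A, B> {
    // Library call: suspends until both complete and fails as soon as either one fails.
    awaitAll(first, second)
    // Both are complete at this point, so these awaits return immediately
    // and keep their static types without any cast.
    return first.await() to second.await()
}

suspend fun main() = coroutineScope {
    val (text, numbers) = awaitBoth(
        async { "foo" },
        async { setOf(4, 2) },
    )
    println("$text and $numbers")
}
```

Extending this to three or more elements would need one overload per arity (and a Triple or custom result type), which is the duplication mentioned in the message above.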

Tijl

06/26/2022, 2:58 PM
I don’t feel attacked; as I said, both solutions do more or less the same and are usable. Your solution at the very least is very readable. Type safety is type safety though, and hard casts are not the same. Even if your solution is always safe at runtime, if you refactor the code, e.g. to make the toSet a toMap or something, the compiler will not warn you about your cast. If I’m honest, at my company you would not be hired if this was in your assessment and your answer as to why you did this was “it’s shorter” (or the explanation that the cast/suppress is needed due to type erasure, but let’s not get into that 😁). But that’s the bias my company and I have; I’m sure there are others that prefer your solution and might find mine not terse enough or too contrived. It’s your code, so you should go with what you are comfortable with. Likewise, in my opinion asynchronous completion of multiple events of a different type is a flow, and it’s covered by coroutines currently. But maybe someone at JetBrains will push or allow for a destructurable awaitAll (or one day static analysis will be good enough to allow the current construct without force casting).

uli

06/26/2022, 3:24 PM
@Fleshgrinder I'd say the awaitAll code you posted is complex because of optimizations that you probably don't need. You might want to put a coroutineScope {} around the code I posted to get common, local error handling, and be done with it, unless you see any issue with it. Just because other code with other requirements is complex does not mean the simple code is wrong.
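
A rough sketch of what wrapping the two awaits in coroutineScope {} buys: a failure in either async surfaces at the call site, where an ordinary try/catch can handle it. The flows below are stand-ins invented for illustration, and java.io.IOException assumes a JVM target.

```kotlin
import java.io.IOException
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet

suspend fun main() {
    // Stand-ins for the real flows (assumptions for illustration only).
    val networkFlow = flow<String> { throw IOException("network down") }
    val filesystemFlow = flowOf(1, 2, 3)

    try {
        val (networkResult, filesystemResult) = coroutineScope {
            val network = async { networkFlow.toSet() }
            val filesystem = async { filesystemFlow.toList() }
            network.await() to filesystem.await()
        }
        println("$networkResult and $filesystemResult")
    } catch (e: IOException) {
        // A failing child cancels the scope (and with it the sibling), and
        // coroutineScope rethrows the original exception here.
        println("failed locally: ${e.message}")
    }
}
```

Without the surrounding coroutineScope, the asyncs would be children of whatever outer scope is in use, and a failure would cancel that outer scope instead of being contained at this call site.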

Fleshgrinder

06/26/2022, 5:12 PM
@Tijl that means we have to fire all Arrow employees, and pretty much anyone who does safe casts. I get your point regarding type safety, and that's why I would love to see a type-safe function, as explained. But creating it would involve copying a lot of code and escaping runtime optimization (JVM), or bloating the binary and thrashing registers if combos are used often. Probably not an issue, still.
@uli it's a suspendable function, some scope is around it anyway. I don't know why awaitAll is as complex inside as it is. If all of the ceremony is for nothing, then the two awaits are clearly the nicest solution.

uli

06/26/2022, 6:43 PM
I think the scope might be needed to catch potential exceptions locally

Fleshgrinder

06/26/2022, 6:56 PM
It also handles lazy asyncs, counts, ... none of this is really necessary. Local exception handling is possible with @mkrussel's solution, it's just that the inner call first needs to complete before we get the exception of the outer await. With awaitAll we get an exception immediately, because it tracks them all and aborts all upon the first exception. This would mean in my case that the network call should be inner and the filesystem call should be awaited, simply because the network has more unhappy paths. It's the only gotcha I see, and for me this is still clearly superior to everything else, since it ticks all boxes:
• Short
• Fast
• Lightweight
• Typesafe
• No need for additional objects that aren't strictly necessary
• No need for additional types just to capture types
• No need for destructuring
• ...

Tijl

06/26/2022, 10:30 PM
Arrow’s code is actually typesafe for refactoring due to it being generic, so there’s no comparing there. But you did mention putting it in a similar function; indeed nothing wrong with that approach. And agreed, it must be faster than anything Channel based, if you care about that.
👍 1

Fleshgrinder

06/27/2022, 6:14 AM
That's the reason for the question. I wanted to avoid writing library code (separate function) and was convinced that there must be a simpler solution. We have one, and now there's no reason for a function. 😎