is there an idiomatic way to process an infinite s...
# announcements
a
is there an idiomatic way to process an infinite sequence in parallel?
g
Sequences are sequential by definition, so there is no way to make them parallel. For parallel sequence of events you can use Flow
a
No what I am saying is something along the lines of
{numbers from sequence} -> {split into subsequeces} -> {do computation in parallel on each sub sequence}
g
Just split into subsequeces, run computation on own thread
a
well I guess what I am asking is if there is an idiomatic way to split the sequence
would be cool if there were something like a grouping that could be used for an inf sequence
g
You can use
Sequence.chunked
, which emits list of required size
not exactly subsequence. but also sequence is not really needed, because anyway you should buffer some amount of items to process
a
buffering seems like it would complicate the code
I am simply trying to have integers
[2, inf)
and split and see if the number is prime concurrently
g
One call of
chunked(SIZE)
doesn’t look complicated
a
ah ok fair
g
but I still think that it’s pretty limited solution, essentially, there is no way to return result, it’s fire and forget
a
hmmm
g
I believe Flow (or any other Reactive framework) is much better choice
a
Copy code
private fun primes() {
    val threads = 6
    val infSequence = generateSequence(2) { it + 1 }
    runBlocking {
        for (i in 0 until threads) {
            launch {
                infSequence.filter { it % threads == i }
                    .filter { isPrime(it) }
                    .forEach { println(it) }
            }
        }
    }
}
eh
yeah probably
g
isPrime always blocking, cannot be used in parallerl
it will block sequence
a
it is my own implementation
even tho probably less efficient I’m guessing
g
You already have coroutines, why not use flow that gives everything out of the box
and much more idiomatic
a
how would you idiomatically do this with a flow
g
just make
isPrime
suspend function that runs on particular dispatcher with target parallelism
a
I am so confused
why would
isPrime
be suspending… it doesn’t need to suspend?…
g
To run on target dispatcher
because it’s your bottleneck
a
ah I see I think
g
there are a few ways actually, one more is just blocking
isPrime
, but you split the list and use flatMapConcat
Actually, for this particular task with prime number, Java’s Fork/Join framework probably the best solution, just because it’s optimized for CPU bound tasks and have good job stealing algorithm, but to make it performant you have to batch work to threads by splitting sequence by chunks
d
Take the time to carefully read this ... https://kotlinlang.org/docs/reference/coroutines/flow.html Flow is what you need.
g
Honestly not sure that pure Flow will be faster for this use case with prime number ir you launch coroutine per each item, with chunked/windowed it possible to optimise it much more, but those operators are not available in 1.3 yet
m
for deeper support try asking also on #coroutines
👍🏼 1