https://kotlinlang.org logo
Title
g

groostav

07/03/2019, 6:46 PM
is there a set of extension functions for
Sequence
that work for
suspend
contexts? IE i have a sequence and I want to
map
on it in a
suspend
caller
b

bloder

07/03/2019, 7:08 PM
I'm not sure but actually
Flow
comes to supply this need, no?
g

groostav

07/03/2019, 7:57 PM
Yeah I also thought of this, but im not in a version of coroutines that has Flow, also its overbuilt for my needs. I was just looking for a cheesy way to get a sequence of
map
calls to happen lazily instead of eagerly
a little bit of reformatting and an old fashioned for-each loop will do the trick
b

bdawg.io

07/03/2019, 8:28 PM
map
operations on a Sequence will still happen lazily. They just will be blocking on each element that's transformed.
b

bloder

07/03/2019, 9:37 PM
Well, to let it concurrently without using Flow I think you can create some simple extension to Sequence solution, if you just want to make map concurrently maybe this can help:
class ConcurrentSequence<T>(val sequence: Sequence<T>) : CoroutineScope by CoroutineScope(Dispatchers.Unconfined)

suspend fun <T, R> ConcurrentSequence<T>.map(transform: (T) -> R): ConcurrentSequence<R> = withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
    ConcurrentSequence(sequence.map(transform))
}

suspend fun <T> ConcurrentSequence<T>.first(): T = sequence.first()

fun <T> concurrentSequenceOf(vararg elements: T): ConcurrentSequence<T> = ConcurrentSequence(elements.asSequence())
just a coroutine scope composing a sequence and applying some operators above it concurrently from external jobs, not a so extensible solution as only extension funcitons but I think it works for your case.
b

bdawg.io

07/03/2019, 10:21 PM
your
withContext(<http://Dispatchers.IO|Dispatchers.IO>)
block returns immediately though. There's no reason to have the
map
function itself suspend. It's an intermediate and a stateless function that doesn't cause the sequence to be evaluated
Also, a sequence only evaluates a single element at a time. So there's still nothing concurrent going on
b

bloder

07/03/2019, 10:28 PM
Oh I understood wrongly the case here, to solve that probably a solution with composing sequences concurrently evaluated would be better
b

bdawg.io

07/03/2019, 10:30 PM
Your terminal operator (ie,
toList
or iterating over the sequence) needs to be suspending. Sequence piggy-backs off of the JVM Iterator which doesn't support
suspend
, making it blocking to iterate over the sequence using the iterator. Since a sequence operator directly uses the Iterator provided by the previous Sequence in the chain, you'll only have blocking evaluation of sequences. For example,
sequence.filter { ... }.map { ... }
, the
map
operator uses the Iterator of elements provided by the
filter
operator. As such, even writing a different
map
extension function that accepts a suspending
transform
method will still be blocking because of the iterator from
filter
1
That's why
Flow
is a completely separate API from
Sequence
, because to have concurrent evaluation, you have to no longer depend on the core mechanism that sequences were built on (JVM iterators that are only blocking)
b

bloder

07/03/2019, 10:52 PM
Yeah but multiple sequences that each one is mapping some value and blocking, being composed concurrently would work in this case, no? Of course it's not a workaround that I would like to implement in my code, I'd prefer to use Flow, but I think it works (as a workaround).
b

bdawg.io

07/04/2019, 1:11 PM
You could also map the sequence into a
Sequence<Deferred<T>>
and then do a terminal operator that fires of all of the coroutines and then perform an await.
coroutineScope {
    sequence.map { value ->
        async { doSomethingSuspend(value) }
    }.toList().awaitAll()
}
It wouldn't fit the "potentially infinite" aspect of sequences.