# coroutines
t
Hi all! Pipelining channels is a common pattern for concurrency. I'm trying to move some code from Channels to Flows, but I'm not sure what the best way is to pipeline Flows without ending up in callback hell, as below:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun main() = runBlocking {
    produceFlowNumbers()
        .collect { // First collect
            squareFlow(it)
                .collect { // Second collect inside First collect
                    println(it)
                }
        }
    println("Done!") // we are done
}

fun produceFlowNumbers() : Flow<Int> = flow {
    var x = 1
    while (x < 5) emit(x++) // finite stream: emits 1, 2, 3, 4, then completes
}

fun squareFlow(number: Int): Flow<Int> = flow {
    emit(number*number)
}
I have the feeling that I'm missing something here. Any suggestions, experts?
e
produceFlowNumbers()
    .flatMapConcat { squareFlow(it) }
    .collect { println(it) }
j
That works, but it's a bit strange to create many single-element flows with `squareFlow` just to flatten them right away. If you want to transform the elements of a flow, you could simply use `.map`:
produceFlowNumbers().map { it * it }.collect { ... }
If you want to extract this transformation to a function, you can either have a function that transforms a single element, or a function that also encapsulates the `map` call.
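A rough sketch of those two options, in case it helps. The names `square` and `squared` are made up for illustration (they are not from the snippets above), and `flowOf(1, 2, 3, 4)` stands in for `produceFlowNumbers()`:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Option 1 (hypothetical name): a plain function that transforms a single element.
fun square(number: Int): Int = number * number

// Option 2 (hypothetical name): an extension that encapsulates the map call itself.
fun Flow<Int>.squared(): Flow<Int> = map { it * it }

fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3, 4) // stand-in for produceFlowNumbers()

    // Option 1: the caller decides to use map and passes each element through.
    println(numbers.map { square(it) }.toList())

    // Option 2: the caller just chains the extension.
    println(numbers.squared().toList())
}
```

Both calls should yield the same squared values. The first keeps the transformation usable outside of flows, while the second reads more like a pipeline stage.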
e
And if you do want to emit multiple elements, `transform` may be more natural than `flatMap` (depending on the usage):
produceFlowNumbers()
    .transform {
        emit(it * it)
        emit(it * it * it)
    }
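The snippet above has no terminal operator, so nothing runs until the flow is collected. Here is a minimal runnable sketch of the same idea; `squaresAndCubes` is a hypothetical name, and `flowOf(1, 2, 3, 4)` is assumed as a stand-in for `produceFlowNumbers()`:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits the square and then the cube of every upstream value.
fun Flow<Int>.squaresAndCubes(): Flow<Int> = transform { value ->
    emit(value * value)
    emit(value * value * value)
}

fun main() = runBlocking {
    // toList() is the terminal operator that actually triggers collection.
    val result = flowOf(1, 2, 3, 4).squaresAndCubes().toList()
    println(result) // [1, 1, 4, 8, 9, 27, 16, 64]
}
```

Each upstream value produces its two outputs before the next value is processed, so the results stay in upstream order.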