How do you deal with multiple "chained" asynchrono...
# coroutines
d
How do you deal with multiple "chained" asynchronous operations when using flow? Example:
fun foo(): Flow<ResultA>
and each
ResultA
has a
fun getElements(): Flow<SubResult>
. The
getElements
is tied to the same execution context as the containing flow and has to be cancelled with it.
s
Look for
flatMapXXX
functions on Flow.
d
I'm not asking about how to consume such a flow, my question is how to write the
foo
and
getElements
methods. How can I ensure that someone calls
getElements
only while consuming the outer flow and not later?
s
Not sure what you mean. What does ‘later’ mean? Do you mean you’d like a restriction to be able to call resultA.getElements() only from within a flatMapXXX on a Flow<ResultA> (returned by fool())?
d
Yes, it should only be able to be called while the outer flow is being collected.
Basically I am asking how to create a "one time" flow, I guess.
s
Copy code
class Outer {
  ...
  fun foo(): Flow<ResultA> = ...
  ...
  fun ResultA.getElements(): Flow<SubResult> = ...
}

class ResultA {
  ...
  ...
  fun test(outer: Outer) {
    val flow<SubResult> = with (outer) {
      foo().flatMapConcat { it.getElements() } 
    }
    ...
  }
  ...
}
The above limits the scope (not the number of times) in which you can call them. In the above example, you limit it to the scope of instances of ‘Outer’.
d
Not effectively:
Copy code
class SubResult

class Outer {
    fun foo(): Flow<ResultA> = TODO()
    fun ResultA.getElements(): Flow<SubResult> = TODO()
}
class ResultA {
    suspend fun test(outer: Outer) {
        val results = outer.foo().toList()
        with(outer) {
            results.flatMap {
                /* this call to getElements is invalid, it happens after the outer flow is collected */
                it.getElements().toList() 
            }
        }
    }
}
s
Yup, my example only limits the scope, not the number of times it is called, nor the order. I don’t think there is an easy way of limiting the number of times and order. What is your use case that you want to limit it this way?
d
It's a database batch query. The outer flow gives me one item per batch, the inner flow is the individual rows per batch
s
fun batches() : Flow<Batch>
and
fun Batch.rows(): Flow<Row>
?
d
Yes. But how do I prevent someone from consuming
rows
twice on the same batch? Because thats not valid...
s
Yeah… you can do that at runtime, but I don’t think you can enforce this through the compiler…..
d
I know I can't. I am asking how to do it at runtime 😄
s
The
fun Batch.rows()
would have the returned
Flow<Row>
emit an error instead of a proper flow of rows when it is tried to get collected after it has completed.
Copy code
class Batch {
  ...
  var canBeOpenend: Boolean = true
  ...
  fun rows(): Flow<Row> = flow {
    if (!canBeOpenend) throw SomeException()
    canBeOpened = false
    ... find rows in the batch to emit ...
  }
  ...
}
(may need to make ‘canBeOpened’ an AtomicBoolean….)
d
Thanks, I'll do some testing.
b
You could also use a Mutex and never unlock it once someone has consumed it