Tucker Barbour
12/04/2019, 3:44 PM
runBlocking {
    val channel = Channel<Int>(bufferSize)
    val producers = (0 until producerCount).map { launch { produceValues(channel) } }
    while (producers.any { it.isActive }) {
        // Time-of-check/time-of-use race: a producer that was still active at the check may already have sent its last message, in which case receive() waits for a message that never arrives.
        val message = channel.receive()
        doSomethingWithMessage(message)
    }
    coroutineContext.cancelChildren()
}
If I were using Java I would probably use a CountDownLatch to coordinate the completion of the producers with the consumers. Is there a similar pattern for coroutines?
Adam Powell
12/04/2019, 4:38 PM
val channel = Channel<Int>(bufferSize)
launch {
    coroutineScope {
        repeat(producerCount) {
            launch { produceValues(channel) }
        }
    } // this block exits when producers join
    channel.close()
}
for (message in channel) {
    doSomethingWithMessage(message)
}
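A minimal, self-contained sketch of the pattern above, in case it helps to run it. The `produceValues` body, its `id`/`count` parameters, and the `fanIn` wrapper are hypothetical stand-ins, not anything from the original code; it assumes `kotlinx-coroutines-core` is on the classpath.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical producer: sends `count` values tagged with its id.
suspend fun produceValues(id: Int, count: Int, channel: SendChannel<Int>) {
    repeat(count) { i -> channel.send(id * 100 + i) }
}

// Hypothetical wrapper: starts the producers, closes the channel once all of
// them have finished, and returns every message the consumer loop received.
fun fanIn(producerCount: Int, perProducer: Int, bufferSize: Int): List<Int> = runBlocking {
    val channel = Channel<Int>(bufferSize)
    launch {
        coroutineScope {
            repeat(producerCount) { id ->
                launch { produceValues(id, perProducer, channel) }
            }
        } // returns only after every producer has completed
        channel.close() // lets the consumer loop end once the buffer is drained
    }
    val received = mutableListOf<Int>()
    for (message in channel) { // terminates when the channel is closed and empty
        received += message
    }
    received
}

fun main() {
    val messages = fanIn(producerCount = 3, perProducer = 4, bufferSize = 2)
    println(messages.size) // 12
}
```

Closing the channel takes the place of the `isActive` polling: the consumer never has to ask whether producers are still running, because the `for` loop ends by itself once the channel is closed and everything already sent has been received.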
or use the produce builder that basically does the above for you:
val producerChannel = produce<Int>(capacity = bufferSize) {
    repeat(producerCount) {
        launch { produceValues(channel) }
    }
}
for (message in producerChannel) {
    doSomethingWithMessage(message)
}
Tucker Barbour
12/04/2019, 5:38 PM
Tucker Barbour
12/05/2019, 11:33 AM
coroutineScope inside the first launch? My vague understanding is that this structures all the producers under a single “supervisor” providing the join semantics on all producers. Is that a correct understanding?
Adam Powell
12/05/2019, 1:51 PM
coroutineScope creates a new scope that the inner launched jobs become children of. At the end of the block, all children must join before moving on. Exiting the block with an exception will cancel all children. Either way, if control flow leaves that block you're guaranteed that the child jobs are no longer running. This is an important part of structured concurrency.
Tucker Barbour
12/05/2019, 3:17 PM
Flow API?
Adam Powell
12/05/2019, 3:37 PM
val producerFlow = channelFlow<Int> {
    repeat(producerCount) {
        launch { produceValues(channel) }
    }
}
producerFlow.collect { message ->
    doSomethingWithMessage(message)
}
// Do it again with a fresh set of producers backing it
producerFlow.collect { message ->
    doSomethingElseWithMessage(message)
}
Adam Powell
12/05/2019, 3:37 PM
collect
Tucker Barbour
12/05/2019, 4:17 PM
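For reference, a small runnable sketch of the `channelFlow` variant discussed above, showing that each collection starts a fresh set of producers. The `produceValues` body, the `startedProducers` counter, and the `producerFlow` function are hypothetical and exist only to make the behaviour observable; it assumes `kotlinx-coroutines-core` is on the classpath.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical counter: records how many producer coroutines were started.
val startedProducers = AtomicInteger(0)

// Hypothetical producer: sends three values and then finishes.
suspend fun produceValues(channel: SendChannel<Int>) {
    startedProducers.incrementAndGet()
    repeat(3) { channel.send(it) }
}

// The flow is cold: the block runs anew for every collector and the flow
// completes once all coroutines launched inside the block have finished.
fun producerFlow(producerCount: Int): Flow<Int> = channelFlow {
    repeat(producerCount) {
        launch { produceValues(channel) }
    }
}

fun main() = runBlocking {
    val flow = producerFlow(producerCount = 2)
    val first = flow.toList()  // first collection: 2 producers started
    val second = flow.toList() // second collection: 2 more producers started
    println(first.size)             // 6
    println(second.size)            // 6
    println(startedProducers.get()) // 4
}
```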