Tucker Barbour
12/04/2019, 3:44 PMrunBlocking {
val channel = Channel<Int>(bufferSize)
val producers = (0..producerCount).map { launch { produceValues(channel) } }
while (producers.any { it.isActive }) {
// time-of-check time-of-use race condition here. If there were any producers active at the time of check but had finished sending messages, we end up waiting on messages that we'll never receive
val message = channel.receive()
doSomethingWithMessage(message)
}
coroutineContext.cancelChildren()
}
If I were using Java I would probably use a CountdownLatch to coordinate the completion of the Producers with the Consumers. Is there a similar pattern for coroutines?Adam Powell
12/04/2019, 4:38 PMval channel = Channel<Int>(bufferSize)
launch {
coroutineScope {
repeat(producerCount) {
launch { produceValues(channel} }
}
} // this block exits when producers join
channel.close()
}
for (message in channel) {
doSomethingWithMessage(message)
}
or use the produce
builder that basically does the above for you:
val producerChannel = produce<Int>(capacity = bufferSize) {
repeat(producerCount) {
launch { produceValues(channel) }
}
}
for (message in producerChannel) {
doSomethingWithMessage(message)
}
Tucker Barbour
12/04/2019, 5:38 PMTucker Barbour
12/05/2019, 11:33 AMcoroutineScope
inside the first launch
? My vague understanding is that this structures all the producers under a single “supervisor” providing the join semantics on all producers. Is that a correct understanding?Adam Powell
12/05/2019, 1:51 PMcoroutineScope
creates a new scope that the inner launched jobs become children of. At the end of the block, all children must join before moving on. Exiting the block with an exception will cancel all children. Either way, if control flow leaves that block you're guaranteed that the child jobs are no longer running. This is an important part of structured concurrency.Tucker Barbour
12/05/2019, 3:17 PMFlow
API?Adam Powell
12/05/2019, 3:37 PMval producerFlow = channelFlow<Int> {
repeat(producerCount) {
launch { produceValues(channel) }
}
}
producerFlow.collect { message ->
doSomethingWithMessage(message)
}
// Do it again with a fresh set of producers backing it
producerFlow.collect { message ->
doSomethingElseWithMessage(message)
}
Adam Powell
12/05/2019, 3:37 PMcollect
Tucker Barbour
12/05/2019, 4:17 PM