#coroutines

Tucker Barbour

12/04/2019, 3:44 PM
Is there a recommended pattern for multiple producers and a single consumer? For example:
runBlocking {
  val channel = Channel<Int>(bufferSize)
  val producers = (0 until producerCount).map { launch { produceValues(channel) } }

  while (producers.any { it.isActive }) {
    // Time-of-check/time-of-use race here: if a producer was still active at the time of the check
    // but had already sent its last message, receive() waits for a message that will never arrive.
    val message = channel.receive()
    doSomethingWithMessage(message)
  }

  coroutineContext.cancelChildren()
}
If I were using Java I would probably use a CountDownLatch to coordinate the completion of the producers with the consumer. Is there a similar pattern for coroutines?

Adam Powell

12/04/2019, 4:38 PM
Close the channel when all the producers are done.
val channel = Channel<Int>(bufferSize)
launch {
  coroutineScope {
    repeat(producerCount) {
      launch { produceValues(channel) }
    }
  } // this block exits when producers join
  channel.close()
}

for (message in channel) {
  doSomethingWithMessage(message)
}
Or use the `produce` builder, which basically does the above for you:
val producerChannel = produce<Int>(capacity = bufferSize) {
  repeat(producerCount) {
    launch { produceValues(channel) }
  }
}

for (message in producerChannel) {
  doSomethingWithMessage(message)
}
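
For reference, the close-when-all-producers-are-done approach can be sketched as a self-contained program. This is only a minimal illustration: `produceValues` here is a hypothetical stand-in that takes an extra `id` parameter and sends three values, and `collectAll` is a helper name invented for the example, not something from the thread.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical producer: sends three values derived from its id.
suspend fun produceValues(id: Int, channel: SendChannel<Int>) {
    repeat(3) { i -> channel.send(id * 10 + i) }
}

// Runs producerCount producers and one consumer; returns everything the consumer received.
fun collectAll(producerCount: Int, bufferSize: Int): List<Int> = runBlocking {
    val channel = Channel<Int>(bufferSize)
    launch {
        coroutineScope {
            repeat(producerCount) { id -> launch { produceValues(id, channel) } }
        } // returns only after every producer has completed
        channel.close() // lets the consumer loop end once buffered items are drained
    }
    val received = mutableListOf<Int>()
    for (message in channel) received += message // single consumer
    received
}

fun main() {
    // Arrival order depends on scheduling, so sort before printing.
    println(collectAll(producerCount = 4, bufferSize = 8).sorted())
    // [0, 1, 2, 10, 11, 12, 20, 21, 22, 30, 31, 32]
}
```

The consumer never inspects producer state, so the check-then-receive race from the original question does not arise: the loop ends only when the channel is closed and empty.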

Tucker Barbour

12/04/2019, 5:38 PM
Interesting. I'll try that out. Thank you for the response!
This works as expected, thank you for the help. Out of curiosity, what's the purpose/advantage of the `coroutineScope` inside the first `launch`? My vague understanding is that this structures all the producers under a single "supervisor", providing the join semantics on all producers. Is that a correct understanding?

Adam Powell

12/05/2019, 1:51 PM
"Supervisor" means something a bit different, related to error propagation from child coroutines, so I wouldn't describe it that way.
`coroutineScope` creates a new scope that the inner launched jobs become children of. At the end of the block, all children must join before moving on. Exiting the block with an exception will cancel all children. Either way, if control flow leaves that block you're guaranteed that the child jobs are no longer running. This is an important part of structured concurrency.
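
Both guarantees described above can be observed in a small sketch. The function names are made up for this illustration, and the delays are only a stand-in for real work.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Returns how many children had finished by the time coroutineScope returned.
fun finishedWhenScopeReturns(childCount: Int): Int = runBlocking {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(childCount) { i ->
            launch {
                delay(10L * (i + 1)) // stand-in for real work
                finished.incrementAndGet()
            }
        }
        // The block body ends here, but coroutineScope suspends until all children complete.
    }
    finished.get()
}

// Returns true if a child was cancelled because the block itself threw.
fun childCancelledWhenBlockThrows(): Boolean = runBlocking {
    var child: Job? = null
    try {
        coroutineScope {
            child = launch { delay(Long.MAX_VALUE) } // would otherwise never finish
            throw IllegalStateException("block failed")
        }
    } catch (e: IllegalStateException) {
        // Expected: coroutineScope rethrows after its children are cancelled.
    }
    child?.isCancelled == true
}

fun main() {
    println(finishedWhenScopeReturns(3))      // 3
    println(childCancelledWhenBlockThrows())  // true
}
```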

Tucker Barbour

12/05/2019, 3:17 PM
That makes sense. Thank you for the clarification. Does this sort of implementation change with the `Flow` API?

Adam Powell

12/05/2019, 3:37 PM
It can, yes. You can make this a cold, repeatable operation by doing it like this:
val producerFlow = channelFlow<Int> {
  repeat(producerCount) {
    launch { produceValues(channel) }
  }
}

producerFlow.collect { message ->
  doSomethingWithMessage(message)
}

// Do it again with a fresh set of producers backing it
producerFlow.collect { message ->
  doSomethingElseWithMessage(message)
}
In this example the producers don't start until someone `collect`s the flow, and a new set starts for each `collect`.
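
The cold behaviour can be checked with a short sketch. The `starts` counter, the `producerFlow` function, and the `collectTwice` helper are assumptions added for illustration; each producer here simply sends two values.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Counts how often the channelFlow block has been started (illustration only).
val starts = AtomicInteger(0)

fun producerFlow(producerCount: Int): Flow<Int> = channelFlow {
    starts.incrementAndGet()
    repeat(producerCount) { id ->
        launch { repeat(2) { i -> send(id * 10 + i) } } // hypothetical producer body
    }
    // channelFlow waits for the launched children before completing the flow.
}

// Collects the same flow twice; each collection runs a fresh set of producers.
fun collectTwice(producerCount: Int): Pair<List<Int>, List<Int>> = runBlocking {
    val flow = producerFlow(producerCount) // nothing has started yet
    flow.toList().sorted() to flow.toList().sorted()
}

fun main() {
    val (first, second) = collectTwice(3)
    println(first)        // [0, 1, 10, 11, 20, 21]
    println(second)       // [0, 1, 10, 11, 20, 21]
    println(starts.get()) // 2: the block ran once per collection
}
```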

Tucker Barbour

12/05/2019, 4:17 PM
Thank you! This was all very helpful.