winteryoung
08/03/2018, 2:09 AMelizarov
08/03/2018, 8:43 AMwinteryoung
08/03/2018, 4:38 PMprivate fun <T> asyncProduceConsume(
consumerConcurrencyLimit: Int,
producer: Producer<T>,
consumer: Consumer<T>
) {
runBlocking {
actor<T> {
arrayListOf<Job>().let { jobs ->
CoroutineSemaphore(consumerConcurrencyLimit).let { semaphore ->
for (product in this) {
launch {
semaphore.lock {
consumer(product)
}
}.let { job ->
jobs.add(job)
}
}
}
for (job in jobs) {
job.join()
}
}
}.let { consumerActor ->
async {
producer(consumerActor)
}.also {
try {
it.await()
} finally {
consumerActor.close()
(consumerActor as Job).join()
}
}
}
}
}
winteryoung
08/03/2018, 4:39 PM