Luis Munoz
10/19/2020, 8:01 PMscope.launch {
while (isActive) {
launch {
val record = aKafkaConsumer.poll(Duration.ofMillis(200))
log.debug().log("[received records] - count={}", record.count())
record.forEach {
callback(KafkaMessage(it.key(), it.value()))
}
}
}
}
scope.launch {
while (isActive) {
val record = aKafkaConsumer.poll(Duration.ofMillis(200))
log.debug().log("[received records] - count={}", record.count())
record.map {
launch { callback(KafkaMessage(it.key(), it.value())) }
}.joinAll()
}
}
Zach Klippenstein (he/him) [MOD]
10/19/2020, 8:07 PMaKafkaConsumer.poll(Duration.ofMillis(200))
does, but if it blocks, you’re going to be blocking a lot of threads in a hurry.Zach Klippenstein (he/him) [MOD]
10/19/2020, 8:10 PMscope.launch {
while (isActive) {
val record = aKafkaConsumer.poll(Duration.ofMillis(200))
log.debug().log("[received records] - count={}", record.count())
// Create a sub-scope to wait until all callbacks are processed.
coroutineScope {
record.forEach {
launch { callback(KafkaMessage(it.key(), it.value())) }
}
}
}
}
Luis Munoz
10/19/2020, 8:15 PMLuis Munoz
10/19/2020, 8:17 PMZach Klippenstein (he/him) [MOD]
10/19/2020, 8:19 PMcoroutineScope{}
will effectively do that