Levi
12/04/2023, 3:21 PMfun main(args: Array<String>) = runBlocking {
val consumer = setupConsumer()
val topic = "topic"
consumer.subscribe(listOf(topic))
CoroutineScope(Dispatchers.Unconfined).launch {
while (true) {
val records = consumer.poll(Duration.ofMillis(3000))
for (record in records) {
<http://logger.info|logger.info>("Received message: $record at offset ${record.offset()}")
}
delay(1000)
}
}
Runtime.getRuntime()
.addShutdownHook(
Thread {
consumer.close()
}
)
}
AdamW
12/04/2023, 4:01 PMlaunch
eagerly returns a Job
and it won’t wait for completion, that’s probably what you’re seeing with the error in your previous message. I recommend leaving the entrypoint as is and instead starting from your Application.module
.
As for Kafka in the suspending world, it’s worth checking here.Levi
12/04/2023, 4:01 PM