Looks like the following does work, but I've no idea whether this is the "correct" way:
fun main(args: Array<String>) = runBlocking {
    val consumer = setupConsumer()
    val topic = "topic"
    consumer.subscribe(listOf(topic))
    CoroutineScope(Dispatchers.Unconfined).launch {
        while (true) {
            val records = consumer.poll(Duration.ofMillis(3000))
            for (record in records) {
                logger.info("Received message: $record at offset ${record.offset()}")
            }
            delay(1000)
        }
    }
    Runtime.getRuntime()
        .addShutdownHook(
            Thread {
                consumer.close()
            }
        )
}
AdamW
12/04/2023, 4:01 PM
launch eagerly returns a Job and it won’t wait for completion; that’s probably what you’re seeing with the error in your previous message. I recommend leaving the entrypoint as is and instead starting from your Application.module.
As for Kafka in the suspending world, it’s worth checking here.
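To make the point about launch concrete, here is a minimal sketch, assuming only kotlinx.coroutines on the classpath. The function name launchAndObserve and the 200 ms delay are made up for illustration; there is no Kafka or Ktor involved. It launches a coroutine in a separate scope, as the snippet above does, and records whether the Job has completed right after launch returns and again after an explicit join.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (completed right after launch, completed after join).
suspend fun launchAndObserve(): Pair<Boolean, Boolean> {
    // A standalone scope: the launched coroutine is NOT a child of the caller,
    // so nothing waits for it unless we do so explicitly.
    val scope = CoroutineScope(Dispatchers.Default)

    // launch hands back a Job immediately; the body keeps running in the background.
    val job: Job = scope.launch {
        delay(200) // stand-in for real work such as a poll loop iteration
    }
    val completedRightAfterLaunch = job.isCompleted

    // join suspends until the coroutine has finished.
    job.join()
    val completedAfterJoin = job.isCompleted

    return completedRightAfterLaunch to completedAfterJoin
}

fun main() = runBlocking {
    val (before, after) = launchAndObserve()
    println("completed right after launch: $before")
    println("completed after join: $after")
}
```

Without the join, the caller carries on (and may return) while the coroutine is still running, which matches the behaviour described above. A long-running consumer loop never completes, so in practice you would tie it to a scope whose lifetime you control rather than joining on it.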