#ktor

Levi

12/04/2023, 3:21 PM
Looks like the following does work, but I've no idea whether this is the "correct" way:
fun main(args: Array<String>) = runBlocking {
    val consumer = setupConsumer()
    val topic = "topic"
    consumer.subscribe(listOf(topic))
    CoroutineScope(Dispatchers.Unconfined).launch {
        while (true) {
            val records = consumer.poll(Duration.ofMillis(3000))
            for (record in records) {
                logger.info("Received message: $record at offset ${record.offset()}")
            }
            delay(1000)
        }
    }
    Runtime.getRuntime().addShutdownHook(
        Thread {
            consumer.close()
        }
    )
}

AdamW

12/04/2023, 4:01 PM
launch eagerly returns a Job and won't wait for completion; that's probably what you're seeing with the error in your previous message. I recommend leaving the entrypoint as is and instead starting from your Application.module. As for Kafka in the suspending world, it's worth checking here.
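For reference, a minimal sketch of what starting the consumer from Application.module could look like, assuming Ktor 2.x with the Netty engine and the plain kafka-clients library. setupConsumer() here is a hypothetical stand-in for the function in the snippet above (the broker address, group id, and port are placeholders), and calling wakeup() on ApplicationStopping is one common way to break out of a blocking poll(), not something prescribed in this thread.

```kotlin
import io.ktor.server.application.*
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties

// Hypothetical stand-in for setupConsumer() from the original message.
// Broker address and group id are placeholders, not values from the thread.
fun setupConsumer(): KafkaConsumer<String, String> {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    }
    return KafkaConsumer(props)
}

fun Application.module() {
    val consumer = setupConsumer()
    consumer.subscribe(listOf("topic"))

    // Application is itself a CoroutineScope, so this job is tied to the
    // application's lifecycle. poll() blocks, so it runs on Dispatchers.IO.
    launch(Dispatchers.IO) {
        try {
            while (true) {
                val records = consumer.poll(Duration.ofMillis(3000))
                for (record in records) {
                    log.info("Received message: $record at offset ${record.offset()}")
                }
            }
        } catch (e: WakeupException) {
            // Expected during shutdown: wakeup() below interrupts the blocking poll().
        } finally {
            // Close on the polling thread; KafkaConsumer is not safe for concurrent use.
            consumer.close()
        }
    }

    // wakeup() is the one KafkaConsumer method that may be called from another thread.
    environment.monitor.subscribe(ApplicationStopping) {
        consumer.wakeup()
    }
}

fun main() {
    // Port 8080 is a placeholder. start(wait = true) keeps the JVM alive.
    embeddedServer(Netty, port = 8080, module = Application::module).start(wait = true)
}
```

Compared with CoroutineScope(Dispatchers.Unconfined).launch inside runBlocking, the polling job here belongs to the application's own scope, so nothing has to wait on the returned Job by hand, and the server's start(wait = true) is what keeps the process running.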

Levi

12/04/2023, 4:01 PM
Thanks a lot. Will check it out.
🤝 1