Hello all, I am trying to use coroutines with kafk...
# announcements
r
Hello all, I am trying to use coroutines with kafka but to no avail. Following an article, I did:
Copy code
suspend inline fun <reified K : Any, reified V : Any> KafkaProducer<K, V>.dispatch(record: ProducerRecord<K, V>) =
    suspendCoroutine<RecordMetadata> { continuation ->
        val callback = Callback { metadata, exception ->
            if (metadata == null) {
                continuation.resumeWithException(exception!!)
            } else {
                continuation.resume(metadata)
            }
        }
        this.send(record, callback)
    }
and then:
Copy code
fun main(args: Array<String>) {
    val bootstrapServer = "localhost:9092"
    val properties: Properties = Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)

    val record: ProducerRecord<String, String> = ProducerRecord("kafka-learning", "testing KAFKA COROUTINE")

    val producer = KafkaProducer<String, String>(properties)

    GlobalScope.async {
        producer.dispatch(record)
    }
}
But no message is sent. What am I doing wrong?
t
This is likely because the JVM terminates before the producer could dispatch the message. Coroutines that are run with
GlobalScope
are scheduled to be run on a pool of threads, thus the end of your
main
function is reached before the code in
async
is executed.
r
And what should I do? Could you help?
t
I don't know much about how Kafka works, but if it works like a regular JVM program, then you could try the following:
Copy code
fun main(args: Array<String>) = runBlocking<Unit> {
    [...]
    producer.dispatch(record)
}
The
runBlocking
function has been designed for this specific case, i.e. bridge the main function with the coroutine world.
r
It worked. Thanks.
t
Note that when using
runBlocking
the thread that calls it is blocked, hence the name.
r
Is this the best way to use coroutine? Or is there another way to call asyn?
d
Do you actually want that call to be asynchronous? I thought you'd want it to finish before
main
exits. For that specific snippet changing
GlobalScope.async
to
runBlocking
is the one of best ways (You could also do suspend main).