Hello all, I am trying to use coroutines with kafk...
# announcements
r
Hello all, I am trying to use coroutines with kafka but to no avail. Following an article, I did:
Copy code
suspend inline fun <reified K : Any, reified V : Any> KafkaProducer<K, V>.dispatch(record: ProducerRecord<K, V>) =
    suspendCoroutine<RecordMetadata> { continuation ->
        val callback = Callback { metadata, exception ->
            if (metadata == null) {
                continuation.resumeWithException(exception!!)
            } else {
                continuation.resume(metadata)
            }
        }
        this.send(record, callback)
    }
and then:
Copy code
fun main(args: Array<String>) {
    val bootstrapServer = "localhost:9092"
    val properties: Properties = Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)

    val record: ProducerRecord<String, String> = ProducerRecord("kafka-learning", "testing KAFKA COROUTINE")

    val producer = KafkaProducer<String, String>(properties)

    GlobalScope.async {
        producer.dispatch(record)
    }
}
But no message is sent. What am I doing wrong?
t
This is likely because the JVM terminates before the producer could dispatch the message. Coroutines that are run with
GlobalScope
are scheduled to be run on a pool of threads, thus the end of your
main
function is reached before the code in
async
is executed.
r
And what should I do? Could you help?
t
I don't know much about how Kafka works, but if it works like a regular JVM program, then you could try the following:
Copy code
fun main(args: Array<String>) = runBlocking<Unit> {
    [...]
    producer.dispatch(record)
}
The
runBlocking
function has been designed for this specific case, i.e. bridge the main function with the coroutine world.
r
It worked. Thanks.
t
Note that when using
runBlocking
the thread that calls it is blocked, hence the name.
r
Is this the best way to use coroutine? Or is there another way to call asyn?
d
Do you actually want that call to be asynchronous? I thought you'd want it to finish before
main
exits. For that specific snippet changing
GlobalScope.async
to
runBlocking
is the one of best ways (You could also do suspend main).
100 Views