Rodrigo Silva
01/06/2020, 12:21 PMsuspend inline fun <reified K : Any, reified V : Any> KafkaProducer<K, V>.dispatch(record: ProducerRecord<K, V>) =
suspendCoroutine<RecordMetadata> { continuation ->
val callback = Callback { metadata, exception ->
if (metadata == null) {
continuation.resumeWithException(exception!!)
} else {
continuation.resume(metadata)
}
}
this.send(record, callback)
}
and then:
fun main(args: Array<String>) {
val bootstrapServer = "localhost:9092"
val properties: Properties = Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
val record: ProducerRecord<String, String> = ProducerRecord("kafka-learning", "testing KAFKA COROUTINE")
val producer = KafkaProducer<String, String>(properties)
GlobalScope.async {
producer.dispatch(record)
}
}
But no message is sent. What am I doing wrong?tseisel
01/06/2020, 12:56 PMGlobalScope
are scheduled to be run on a pool of threads, thus the end of your main
function is reached before the code in async
is executed.Rodrigo Silva
01/06/2020, 12:58 PMtseisel
01/06/2020, 1:03 PMfun main(args: Array<String>) = runBlocking<Unit> {
[...]
producer.dispatch(record)
}
The runBlocking
function has been designed for this specific case, i.e. bridge the main function with the coroutine world.Rodrigo Silva
01/06/2020, 1:30 PMtseisel
01/06/2020, 1:49 PMrunBlocking
the thread that calls it is blocked, hence the name.Rodrigo Silva
01/06/2020, 2:17 PMDominaezzz
01/06/2020, 2:51 PMmain
exits.
For that specific snippet changing GlobalScope.async
to runBlocking
is the one of best ways (You could also do suspend main).