https://kotlinlang.org logo
#ktor
Title
# ktor
j

Júlio Santos

06/30/2021, 12:58 PM
Hey guys! What's up? Does anyone here have an example of using Ktor with Apache Kafka?
h

hfhbd

06/30/2021, 1:00 PM
What is your use-case? Kafka Streams? Kafka Consumer? Kafka Producer? What is the difference to normal Kafka library usage?
j

Júlio Santos

06/30/2021, 1:01 PM
Kafka Consumer and producer
h

hfhbd

06/30/2021, 1:09 PM
Quick and dirty Kafka Consumer example
server.kt
j

Júlio Santos

06/30/2021, 1:11 PM
Thanks a lot, @hfhbd
h

hfhbd

06/30/2021, 1:13 PM
This code won't compile, this code is needed too:
Copy code
val inputTopic = "data-events"
val avgTopic = "avgPower"

fun kafkaProperties(port: Int) = Properties().apply {
    this["bootstrap.servers"] = "localhost:$port"
    this["acks"] = "all"
    this["retries"] = 0
    this["batch.size"] = 16384
    this["group.id"] = "test"
    this["<http://linger.ms|linger.ms>"] = 0
    this["buffer.memory"] = 33554432
    this["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    this["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    this["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    this["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
}
Copy code
fun useTestContainer(): Pair<String, Int> {
    val kafkaServer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.1.1")).apply {
        start()
    }
    return kafkaServer.host to kafkaServer.firstMappedPort
}
j

Júlio Santos

06/30/2021, 1:14 PM
Sensational
h

hfhbd

06/30/2021, 1:15 PM
Topic creation:
Copy code
val (host, port) = useTestContainer() //"localhost" to 9092

        val props = Properties().apply {
            put(StreamsConfig.APPLICATION_ID_CONFIG, "pg")
            put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "$host:$port")
            put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
            put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
        }

        val adminClient = AdminClient.create(props)
        val input = NewTopic(inputTopic, 1, 1)
        val output = NewTopic(outputTopic, 1, 1).apply {
            configs(mapOf("<http://retention.ms|retention.ms>" to "-1", "retention.bytes" to "-1"))
        }
        val avgPower = NewTopic(avgTopic, 1, 1)

        val newTopics = listOf(input, output, avgPower)

        adminClient.createTopics(newTopics)
        adminClient.close()
j

Júlio Santos

06/30/2021, 1:15 PM
I can't thank you enough
Do you have this project on github?
h

hfhbd

06/30/2021, 1:45 PM
Unfortunately, this is an internal project. If you want, I could publish it later this week
j

Júlio Santos

06/30/2021, 1:48 PM
I understand, if you can publish it it will help me a lot
2 Views