# ktor
j
Hey guys! What's up? Does anyone here have an example of using Ktor with Apache Kafka?
h
What is your use case? Kafka Streams? Kafka Consumer? Kafka Producer? How does it differ from normal Kafka library usage?
j
Kafka Consumer and producer
h
Quick and dirty Kafka Consumer example
server.kt
j
Thanks a lot, @hfhbd
h
That code won't compile on its own; it also needs this:
val inputTopic = "data-events"
val avgTopic = "avgPower"

fun kafkaProperties(port: Int) = Properties().apply {
    this["bootstrap.servers"] = "localhost:$port"
    this["acks"] = "all"
    this["retries"] = 0
    this["batch.size"] = 16384
    this["group.id"] = "test"
    this["linger.ms"] = 0
    this["buffer.memory"] = 33554432
    this["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    this["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    this["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    this["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
}
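The properties above mix producer and consumer keys in one object. Kafka clients will typically log a warning for configs they don't recognise (for example `group.id` on a producer), so here is a minimal sketch, assuming you'd rather hand each client only its own keys. `without`, `producerOnlyKeys` and `consumerOnlyKeys` are hypothetical names for illustration, not part of Kafka or the original project:

```kotlin
import java.util.Properties

// Hypothetical key sets, taken from the keys used in kafkaProperties() above.
val producerOnlyKeys = setOf(
    "acks", "retries", "batch.size", "linger.ms", "buffer.memory",
    "key.serializer", "value.serializer"
)
val consumerOnlyKeys = setOf("group.id", "key.deserializer", "value.deserializer")

// Returns a copy of these properties with the given keys left out.
// Iterates over entries directly so that non-String values (e.g. Int) are kept.
fun Properties.without(keys: Set<String>): Properties {
    val result = Properties()
    for ((key, value) in this) {
        if (key.toString() !in keys) result[key] = value
    }
    return result
}

fun main() {
    val all = Properties().apply {
        this["bootstrap.servers"] = "localhost:9092"
        this["acks"] = "all"
        this["retries"] = 0
        this["group.id"] = "test"
    }
    println(all.without(consumerOnlyKeys).keys) // keys a producer would receive
    println(all.without(producerOnlyKeys).keys) // keys a consumer would receive
}
```

With that in place, a producer could be built as `KafkaProducer<String, String>(kafkaProperties(port).without(consumerOnlyKeys))` and a consumer with `without(producerOnlyKeys)`.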
fun useTestContainer(): Pair<String, Int> {
    val kafkaServer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.1.1")).apply {
        start()
    }
    return kafkaServer.host to kafkaServer.firstMappedPort
}
j
Sensational
h
Topic creation:
val (host, port) = useTestContainer() // or "localhost" to 9092 for a local broker

val props = Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, "pg")
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "$host:$port")
    put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
    put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
}

val adminClient = AdminClient.create(props)
val input = NewTopic(inputTopic, 1, 1)
// Note: outputTopic is not declared in the snippets above; it needs a val like inputTopic and avgTopic.
val output = NewTopic(outputTopic, 1, 1).apply {
    // -1 disables time-based and size-based retention limits for this topic
    configs(mapOf("retention.ms" to "-1", "retention.bytes" to "-1"))
}
val avgPower = NewTopic(avgTopic, 1, 1)

val newTopics = listOf(input, output, avgPower)

adminClient.createTopics(newTopics)
adminClient.close()
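One thing to keep in mind: `createTopics` is asynchronous and returns a `CreateTopicsResult`, so a failed creation goes unnoticed unless the result is inspected. A minimal sketch, assuming you want to block until the broker confirms the topics; `createTopicsBlocking` is a hypothetical helper name, not something from the original project:

```kotlin
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import java.util.Properties

// Hypothetical helper: creates the topics and waits for the broker's answer.
// all().get() throws an ExecutionException if any topic could not be created
// (for example because it already exists).
fun createTopicsBlocking(bootstrapServers: String, topics: List<NewTopic>) {
    val props = Properties().apply {
        this["bootstrap.servers"] = bootstrapServers
    }
    // use {} closes the AdminClient even when get() throws
    AdminClient.create(props).use { admin ->
        admin.createTopics(topics).all().get()
    }
}
```

It could be called as `createTopicsBlocking("$host:$port", newTopics)` in place of the last two lines above.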
j
I can't thank you enough!
Do you have this project on GitHub?
h
Unfortunately, this is an internal project. If you want, I could publish it later this week.
j
I understand. If you can publish it, it will help me a lot.