Júlio Santos
06/30/2021, 12:58 PM
hfhbd
06/30/2021, 1:00 PM
Júlio Santos
06/30/2021, 1:01 PM
hfhbd
06/30/2021, 1:09 PM
Júlio Santos
06/30/2021, 1:11 PM
hfhbd
06/30/2021, 1:13 PM
val inputTopic = "data-events"
/** Name of the `avgPower` topic. */
val avgTopic: String = "avgPower"
/**
 * Builds the Kafka client [Properties] for a broker listening on `localhost`.
 *
 * The returned set carries both serializer and deserializer entries (all String-based)
 * alongside the buffering/acknowledgement settings and the `test` group id.
 *
 * @param port port of the local broker; used to form `bootstrap.servers` as `localhost:<port>`.
 * @return a fresh [Properties] instance on every call.
 */
fun kafkaProperties(port: Int) = Properties().apply {
    this["bootstrap.servers"] = "localhost:$port"
    this["acks"] = "all"
    this["retries"] = 0
    this["batch.size"] = 16384
    this["group.id"] = "test"
    // The key was previously the chat-mangled "<http://linger.ms|linger.ms>",
    // which is not a Kafka config name, so the setting was never applied.
    this["linger.ms"] = 0
    this["buffer.memory"] = 33554432
    this["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    this["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    this["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    this["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
}
/**
 * Starts a `confluentinc/cp-kafka:6.1.1` container and reports where to reach it.
 *
 * The container is started here and is not stopped by this function.
 *
 * @return the container host paired with its first mapped port.
 */
fun useTestContainer(): Pair<String, Int> {
    val image = DockerImageName.parse("confluentinc/cp-kafka:6.1.1")
    val kafkaServer = KafkaContainer(image)
    kafkaServer.start()
    return Pair(kafkaServer.host, kafkaServer.firstMappedPort)
}
Júlio Santos
06/30/2021, 1:14 PM
hfhbd
06/30/2021, 1:15 PM
val (host, port) = useTestContainer() //"localhost" to 9092
// Client configuration pointing at the broker identified by host/port;
// String serdes are registered as the default key and value serdes.
val props = Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, "pg")
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "$host:$port")
    put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
    put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
}
val adminClient = AdminClient.create(props)
// Each topic is requested with 1 partition and replication factor 1.
val input = NewTopic(inputTopic, 1, 1)
val output = NewTopic(outputTopic, 1, 1).apply {
    // The key was previously the chat-mangled "<http://retention.ms|retention.ms>",
    // which is not a valid topic config name; the intended key is "retention.ms".
    configs(mapOf("retention.ms" to "-1", "retention.bytes" to "-1"))
}
val avgPower = NewTopic(avgTopic, 1, 1)
val newTopics = listOf(input, output, avgPower)
// `use` closes the admin client even if createTopics throws.
adminClient.use { client ->
    client.createTopics(newTopics)
}
Júlio Santos
06/30/2021, 1:15 PM
hfhbd
06/30/2021, 1:45 PM
Júlio Santos
06/30/2021, 1:48 PM