Anders Kirkeby
02/28/2023, 9:51 AMfun main() = SuspendApp {
// Entry point: inside a resourceScope, loads the settings and, only when they are a Right,
// declares a topology that reads topic "foo", prints every record, and forwards it to topic "bar".
resourceScope {
// Only the Right case is handled; a Left from streamsSettings() is ignored here.
streamsSettings().onRight { settings ->
// NOTE(review): the value produced by resource { } is not used anywhere in this block.
// Per the replies below, resource { } yields a lazy value, so presumably kafkaStreams(...)
// is never actually run from here — confirm.
resource {
kafkaStreams(settings) {
stream<String, String>("foo")
.peek { key, value -> println("Received record on 'foo': [key=$key] [value=$value]") }
.to("bar")
}
}
}
// Called inside resourceScope after the settings have been handled; presumably keeps the
// scope (and anything installed in it) open until the app is cancelled — confirm.
awaitCancellation()
}
}
/**
 * Installs a started [KafkaStreams] instance into this [ResourceScope].
 *
 * Acquire step: applies [builder] to a fresh [StreamsBuilder], builds the topology, creates the
 * [KafkaStreams] from it and `settings.properties()`, and calls `start()`.
 * Release step: calls `close` with a 10 second timeout; the exit case is not inspected.
 *
 * @param settings source of the Kafka Streams configuration properties.
 * @param builder topology definition applied to a new [StreamsBuilder].
 * @return the started [KafkaStreams] instance.
 */
suspend fun ResourceScope.kafkaStreams(settings: StreamsSettings, builder: StreamsBuilder.() -> Unit): KafkaStreams =
    install({
        val streamsBuilder = StreamsBuilder()
        streamsBuilder.builder()
        val streams = KafkaStreams(streamsBuilder.build(), settings.properties())
        streams.start()
        streams
    }) { streams, _ ->
        streams.close(Duration.ofSeconds(10))
    }
/**
 * Produces the hard-coded [StreamsSettings] for this sample inside an `either` block:
 * application id "kafka-sc", bootstrap server "localhost:9092", and String serdes for
 * both keys and values.
 */
private fun streamsSettings(): Either<String, StreamsSettings> {
    return either {
        val settings = StreamsSettings(
            applicationId = "kafka-sc",
            boostrapServer = "localhost:9092",
            defaultKeySerde = Serdes.String(),
            defaultValueSerde = Serdes.String(),
        )
        settings
    }
}
Any suggestions? Or is this "pointless" since KS is non-reactive? My (potentially wrong) assumption was that using structured concurrency to launch the KS-instance would still provide benefits wrt closing the stream-threads.simon.vergauwen
02/28/2023, 9:57 AMresource { }
block? 🤔 That turns it into a lazy value, and shouldn't be needed here.
I am not entirely sure how KS
works, does it automatically take care of committing offsets on close
? I know with the Java SDK it requires quite some machinery to do this correctly.
I've tried this pattern with regular Kafka Java SDK, and written a small wrapper around KotlinX Flow to facilitate this. https://github.com/nomisRev/kotlin-kafka
streamSettings()
unless you changed it to pseudo code here.Anders Kirkeby
02/28/2023, 10:02 AMsimon.vergauwen
02/28/2023, 10:10 AMclose
doesn't seem to commit any offsets though 🤔 I am not entirely sure how this is dealt with with KafkaStreams
, it's been some time since I looked/worked with it. I've been working mostly with other streaming services or Kafka Java SDK.Anders Kirkeby
02/28/2023, 10:10 AMsimon.vergauwen
02/28/2023, 10:14 AMAnders Kirkeby
02/28/2023, 10:15 AMfun main() = SuspendApp {
// Entry point: inside a resourceScope, loads the settings and, only when they are a Right,
// calls kafkaStreams(...) with a topology that reads topic "foo", prints every record,
// and forwards it to topic "bar".
resourceScope {
// Only the Right case is handled; a Left from streamsSettings() is ignored here.
streamsSettings().onRight { settings ->
kafkaStreams(settings) {
stream<String, String>("foo")
.peek { key, value -> println("Received record on 'foo': [key=$key] [value=$value]") }
.to("bar")
}
}
// Called inside resourceScope after kafkaStreams(...); presumably keeps the scope (and the
// installed streams instance) open until the app is cancelled — confirm.
awaitCancellation()
}
}
/**
 * Configuration values for a Kafka Streams application.
 *
 * @property applicationId value stored under `StreamsConfig.APPLICATION_ID_CONFIG`.
 * @property boostrapServer value stored under `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG`.
 * @property defaultKeySerde serde whose class is registered as the default key serde.
 * @property defaultValueSerde serde whose class is registered as the default value serde.
 * @property props optional extra properties, copied in before the explicit values above.
 */
data class StreamsSettings(
    val applicationId: String,
    val boostrapServer: String,
    val defaultKeySerde: Serde<*> = Serdes.String(),
    val defaultValueSerde: Serde<*> = Serdes.String(),
    private val props: Properties? = null,
) {
    /**
     * Builds a new [Properties] object for Kafka Streams.
     *
     * Entries from [props] (when present) are copied first, so the four explicit settings
     * written afterwards replace any entries with the same keys. Only the *class* of each
     * serde is stored, not the serde instance itself.
     */
    fun properties(): Properties {
        val merged = Properties()
        if (props != null) {
            merged.putAll(props)
        }
        merged[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
        merged[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = boostrapServer
        merged[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = defaultKeySerde::class.java
        merged[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = defaultValueSerde::class.java
        return merged
    }

    companion object {
        /**
         * Produces the hard-coded settings for this sample inside an `either` block:
         * application id "elhub-kafka-poc-sc", bootstrap server "localhost:9092",
         * String serdes for keys and values.
         */
        fun streamsSettings(): Either<String, StreamsSettings> = either {
            val settings = StreamsSettings(
                applicationId = "elhub-kafka-poc-sc",
                boostrapServer = "localhost:9092",
                defaultKeySerde = Serdes.String(),
                defaultValueSerde = Serdes.String(),
            )
            settings
        }
    }
}
/**
 * Installs a started [KafkaStreams] instance into this [ResourceScope].
 *
 * Acquire step: applies [builder] to a fresh [StreamsBuilder], builds the topology, creates the
 * [KafkaStreams] from it and `settings.properties()`, and calls `start()`, printing a line
 * before and after the start call.
 * Release step: calls `close` with a 30 second timeout and reports whether the shutdown
 * completed within that time; the exit case is not inspected.
 *
 * @param settings source of the Kafka Streams configuration properties.
 * @param builder topology definition applied to a new [StreamsBuilder].
 * @return the started [KafkaStreams] instance.
 */
suspend fun ResourceScope.kafkaStreams(settings: StreamsSettings, builder: StreamsBuilder.() -> Unit): KafkaStreams =
    install({
        val topology: Topology = StreamsBuilder().also { builder(it) }.build()
        KafkaStreams(topology, settings.properties()).also {
            println("starting kafka streams")
            it.start()
            println("stream started")
        }
    }) { k, _ ->
        // Single source of truth for the timeout so the log line cannot drift from the value used.
        val closeTimeout = Duration.ofSeconds(30)
        println("Closing Kafka Streams - waiting up to ${closeTimeout.seconds} seconds")
        // close(Duration) returns false when the timeout elapsed before all stream threads stopped,
        // so success is only reported when it returns true.
        if (k.close(closeTimeout)) {
            println("Closed Kafka Streams - successfully")
        } else {
            println("Kafka Streams did not stop within ${closeTimeout.seconds} seconds")
        }
    }
added some printlines as well 🤪simon.vergauwen
02/28/2023, 10:17 AMprintln
before awaitCancellation
?start
on a different thread than main
Anders Kirkeby
02/28/2023, 10:18 AMsimon.vergauwen
02/28/2023, 10:18 AMAnders Kirkeby
02/28/2023, 10:19 AMsimon.vergauwen
02/28/2023, 2:01 PMTODO: Figure out a way to inline the resourceScope and loggingContext
/**
 * Combined context that exposes both [ResourceScope] and [LoggingContext] by delegating
 * each interface to the corresponding constructor argument.
 *
 * @param scope delegate that implements [ResourceScope].
 * @param logging delegate that implements [LoggingContext].
 */
class MyCtx(
    private val scope: ResourceScope,
    private val logging: LoggingContext,
) : ResourceScope by scope, LoggingContext by logging
/**
 * Runs [block] with a [MyCtx] receiver inside a `resourceScope`.
 *
 * The context is built from the `resourceScope` receiver and `loggingContext`
 * (which is not defined in this snippet).
 *
 * @param block suspending computation to run with the combined context.
 * @return whatever [block] returns.
 */
suspend fun <A> MyCtx(block: suspend MyCtx.() -> A): A =
    resourceScope {
        val ctx = MyCtx(this, loggingContext)
        ctx.block()
    }
// Placeholder signature without a body: sketches a suspending extension function that takes
// MyCtx as its receiver. The return type A is not declared in this snippet.
suspend fun MyCtx.blabla(): A
Anders Kirkeby
02/28/2023, 2:03 PMsimon.vergauwen
02/28/2023, 2:03 PM