
Anders Kirkeby

02/28/2023, 9:51 AM
Hey. I'm trying to launch Kafka Streams from SuspendApp. Most importantly I want to register the shutdown hooks so that the stream threads aren't closed before committing any offsets. I (naively) tried wrapping KafkaStreams as a Resource, but this is closed immediately (my assumption here is that this is because it isn't closeable 🤷). Is there a way to wrap KafkaStreams as a long-running resource and allow it to close on SIGINT in a proper manner? This is my test code:
fun main() = SuspendApp {
    resourceScope {
        streamsSettings().onRight { settings ->
            resource {
                kafkaStreams(settings) {
                    stream<String, String>("foo")
                        .peek { key, value -> println("Received record on 'foo': [key=$key] [value=$value]") }
                        .to("bar")
                }
            }
        }
        awaitCancellation()
    }
}


suspend fun ResourceScope.kafkaStreams(settings: StreamsSettings, builder: StreamsBuilder.() -> Unit): KafkaStreams =
    install({
        val topology: Topology = StreamsBuilder().also { builder(it) }.build()
        KafkaStreams(topology, settings.properties()).also { it.start() }
    }) { k, _ ->
        k.close(Duration.ofMillis(10_000))
    }

private fun streamsSettings(): Either<String, StreamsSettings> = either {
    StreamsSettings(
        applicationId = "kafka-sc",
        boostrapServer = "localhost:9092",
        defaultKeySerde = Serdes.String(),
        defaultValueSerde = Serdes.String(),
    )
}
Any suggestions? Or is this "pointless" since KS is non-reactive? My (potentially wrong) assumption was that using structured concurrency to launch the KS-instance would still provide benefits wrt closing the stream-threads.

simon.vergauwen

02/28/2023, 9:57 AM
Hey @Anders Kirkeby, why are you wrapping in an additional resource { } block? 🤔 That turns it into a lazy value, and shouldn't be needed here. I am not entirely sure how KS works, does it automatically take care of committing offsets on close? I know with the Java SDK it requires quite some machinery to do this correctly. I've tried this pattern with the regular Kafka Java SDK, and written a small wrapper around KotlinX Flow to facilitate this. https://github.com/nomisRev/kotlin-kafka
Also it seems you're not raising any errors in streamsSettings(), unless you changed it to pseudo code here.
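A hand-rolled sketch of that point (plain Kotlin, no Arrow; MiniScope is a made-up stand-in for ResourceScope, not Arrow's API): install acquires immediately and registers a finalizer, while anything wrapped in an extra lambda is lazy and never runs unless it is actually invoked/bound.

```kotlin
// Stand-in for a resource scope: install() acquires now and registers
// a finalizer; finalizers run in reverse order when the scope exits.
class MiniScope {
    private val finalizers = ArrayDeque<() -> Unit>()

    fun <A> install(acquire: () -> A, release: (A) -> Unit): A {
        val a = acquire()
        finalizers.addFirst { release(a) } // reverse acquisition order
        return a
    }

    fun close() = finalizers.forEach { it() }
}

fun <A> miniScope(block: MiniScope.() -> A): A {
    val scope = MiniScope()
    return try { scope.block() } finally { scope.close() }
}

val events = mutableListOf<String>()

fun main() {
    miniScope {
        install({ events += "acquire"; "streams" }, { events += "release" })
        // The extra wrapper: a lazy value that is never invoked,
        // so neither its acquire nor its release ever runs.
        val wrapped = { install({ events += "lazy acquire" }, { }) }
        events += "running"
    }
    println(events) // [acquire, running, release] - the wrapped block never ran
}
```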

Anders Kirkeby

02/28/2023, 10:02 AM
Ah, good catch. The extra resource wrapper might not be needed. Yeah, I had a look at the consumer/producer wrappers you wrote, but since my intention was to use Kafka Streams I didn't get very far with it. And yes, the stream settings are a WIP implementation (the intention is to load this from a YAML file, so not quite there yet).

simon.vergauwen

02/28/2023, 10:10 AM
close doesn't seem to commit any offsets though 🤔 I am not entirely sure how this is handled in KafkaStreams, it's been some time since I looked at/worked with it. I've been working mostly with other streaming services or the Kafka Java SDK.

Anders Kirkeby

02/28/2023, 10:10 AM
Removing the resource seems to have fixed the issue where the exit callback would be invoked immediately. However, closing the application with SIGINT/SIGTERM doesn't run the close method.
My understanding is that running the close method sets the stream threads to "pending shutdown", which will finish processing any in-flight records and commit the relevant offsets 🤔
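A toy model of that close semantics (plain Kotlin threads, not Kafka code), under the assumption that "pending shutdown" means: stop accepting new work, drain what's already in flight, commit, then exit:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit

// ToyStreamThread is a made-up stand-in for a Kafka Streams thread:
// close(timeout) flips a "pending shutdown" flag, the worker loop drains
// the in-flight queue, commits each offset, and only then exits.
class ToyStreamThread {
    private val inFlight = ArrayBlockingQueue<String>(16)
    @Volatile private var pendingShutdown = false
    var committedOffset = 0
        private set

    private val worker = Thread {
        while (!pendingShutdown || inFlight.isNotEmpty()) {
            val record = inFlight.poll(10, TimeUnit.MILLISECONDS) ?: continue
            committedOffset++ // "process" the record, then commit its offset
        }
    }

    fun start() = worker.start()
    fun send(record: String) = inFlight.put(record)

    // Mirrors the close(timeout) shape: request shutdown, wait for the drain.
    fun close(timeoutMs: Long): Boolean {
        pendingShutdown = true
        worker.join(timeoutMs)
        return !worker.isAlive
    }
}

fun main() {
    val t = ToyStreamThread()
    t.start()
    repeat(5) { t.send("record-$it") }
    check(t.close(5_000)) { "did not shut down cleanly" }
    println("committed ${t.committedOffset} offsets") // all 5 in-flight records
}
```

This is only the drain-then-exit shape; whether KafkaStreams actually commits offsets inside close is exactly the open question in this thread.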

simon.vergauwen

02/28/2023, 10:14 AM
Alright, that is not clear to me from the API docs. Hmm, that seems unlikely that the finalizer is not being called. Can you share your updated snippet? Can you see the stream starting and printing values?

Anders Kirkeby

02/28/2023, 10:15 AM
Here it is
fun main() = SuspendApp {
    resourceScope {
        streamsSettings().onRight { settings ->
            kafkaStreams(settings) {
                stream<String, String>("foo")
                    .peek { key, value -> println("Received record on 'foo': [key=$key] [value=$value]") }
                    .to("bar")
            }
        }
        awaitCancellation()
    }
}

data class StreamsSettings(
    val applicationId: String,
    val boostrapServer: String,
    val defaultKeySerde: Serde<*> = Serdes.String(),
    val defaultValueSerde: Serde<*> = Serdes.String(),
    private val props: Properties? = null
) {
    companion object {
        fun streamsSettings(): Either<String, StreamsSettings> = either {
            StreamsSettings(
                applicationId = "elhub-kafka-poc-sc",
                boostrapServer = "localhost:9092",
                defaultKeySerde = Serdes.String(),
                defaultValueSerde = Serdes.String(),
            )
        }
    }

    fun properties(): Properties = Properties().apply {
        props?.let { putAll(it) }
        put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServer)
        put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, defaultKeySerde::class.java)
        put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, defaultValueSerde::class.java)
    }
}

suspend fun ResourceScope.kafkaStreams(settings: StreamsSettings, builder: StreamsBuilder.() -> Unit): KafkaStreams =
    install({
        val topology: Topology = StreamsBuilder().also { builder(it) }.build()
        KafkaStreams(topology, settings.properties()).also {
            println("starting kafka streams")
            it.start()
        }.also {
            println("stream started")
        }
    }) { k, _ ->
        println("Closing Kafka Streams - will close with force after 30 seconds")
        k.close(Duration.ofMillis(30_000))
        println("Closed Kafka Streams successfully")
    }
added some printlns as well 🤪
this was the output

simon.vergauwen

02/28/2023, 10:17 AM
Can you put a println before awaitCancellation?
Maybe we need to call start on a different thread than main. Hmm, no, that shouldn't be necessary. 😕

Anders Kirkeby

02/28/2023, 10:18 AM
Prints 'hello cancellation' on startup so it seems to be in order

simon.vergauwen

02/28/2023, 10:18 AM
If you could push this to a repo, I'd be happy to take a look

Anders Kirkeby

02/28/2023, 10:19 AM
Gladly - need to run some errands first, but will let you know. Thanks!
Not sure what I did differently tbh, but it seems to work now - might just be that logging was borked with the previous one? 🤔 https://github.com/andeki92/kafka-streams-poc-sc Might refine it a bit more later, but seems to be working for now at least 🧐
With the context receiver, however, the "this" scope is rather polluted, so I might look at some better compositional APIs later

simon.vergauwen

02/28/2023, 2:01 PM
TODO: Figure out a way to inline the resourceScope and loggingContext
class MyCtx(
  private val scope: ResourceScope,
  private val logging: LoggingContext
): ResourceScope by scope, LoggingContext by logging

suspend fun <A> MyCtx(block: suspend MyCtx.() -> A): A =
  resourceScope {
    block(MyCtx(this, loggingContext))
  }
suspend fun <A> MyCtx.blabla(): A
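That delegation trick can be sketched runnably in plain Kotlin (no Arrow; Closer and Logger are made-up stand-ins for ResourceScope and LoggingContext):

```kotlin
// Two independent capabilities, each its own interface.
interface Closer { fun onClose(action: () -> Unit) }
interface Logger { fun log(msg: String) }

class CloserImpl : Closer {
    val actions = mutableListOf<() -> Unit>()
    override fun onClose(action: () -> Unit) { actions += action }
}

class LoggerImpl : Logger {
    val lines = mutableListOf<String>()
    override fun log(msg: String) { lines += msg }
}

// One receiver exposing both capabilities via interface delegation,
// the same shape as the MyCtx sketch above.
class MyCtx(
    private val closer: Closer,
    private val logger: Logger,
) : Closer by closer, Logger by logger

fun <A> myCtx(block: MyCtx.() -> A): A {
    val closer = CloserImpl()
    val logger = LoggerImpl()
    return try {
        MyCtx(closer, logger).block()
    } finally {
        // Run registered finalizers in reverse order on scope exit.
        closer.actions.asReversed().forEach { it() }
    }
}

fun main() {
    val result = myCtx {
        log("starting")              // Logger capability
        onClose { log("finalizer") } // Closer capability
        42
    }
    println(result) // 42
}
```

The combined receiver is what pollutes "this" with both scopes at once, which is the trade-off mentioned above.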
Happy to help with any other questions you might have ☺️

Anders Kirkeby

02/28/2023, 2:03 PM
Hha, something like that ye 😅 Thanks for helping out!

simon.vergauwen

02/28/2023, 2:03 PM
My pleasure!