# arrow
a
Hey. I'm trying to launch kafka streams from SuspendApp. Most importantly I want to register the shutdown hooks so that the stream threads aren't closed before committing any offsets. I (naively) tried wrapping KafkaStreams as a Resource, but this is closed immediately (my assumption here is that this is because it isn't closeable 🤷). Is there a way to wrap KafkaStreams as a long-running resource and allow it to close on SIGINT in a proper manner? This is my test-code:
```kotlin
fun main() = SuspendApp {
    resourceScope {
        streamsSettings().onRight { settings ->
            resource {
                kafkaStreams(settings) {
                    stream<String, String>("foo")
                        .peek { key, value -> println("Received record on 'foo': [key=$key] [value=$value]") }
                        .to("bar")
                }
            }
        }
        awaitCancellation()
    }
}

suspend fun ResourceScope.kafkaStreams(settings: StreamsSettings, builder: StreamsBuilder.() -> Unit): KafkaStreams =
    install({
        val topology: Topology = StreamsBuilder().also { builder(it) }.build()
        KafkaStreams(topology, settings.properties()).also { it.start() }
    }) { k, _ ->
        k.close(Duration.ofMillis(10_000))
    }

private fun streamsSettings(): Either<String, StreamsSettings> = either {
    StreamsSettings(
        applicationId = "kafka-sc",
        boostrapServer = "localhost:9092",
        defaultKeySerde = Serdes.String(),
        defaultValueSerde = Serdes.String(),
    )
}
```
Any suggestions? Or is this "pointless" since KS is non-reactive? My (potentially wrong) assumption was that using structured concurrency to launch the KS-instance would still provide benefits wrt closing the stream-threads.
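For context on how SIGINT/SIGTERM can trigger cleanup at all: the JVM mechanism underneath is a shutdown hook. A rough sketch of that mechanism (illustration only; SuspendApp's actual implementation additionally bridges the hook into coroutine cancellation and waits for the main job to finish):

```kotlin
// Rough sketch of the JVM mechanism that graceful-shutdown libraries build on:
// a shutdown hook runs on SIGINT/SIGTERM (and normal exit) before the JVM dies.
// `installShutdownHook` is a made-up helper name, not a SuspendApp API.
fun installShutdownHook(cleanup: () -> Unit): Thread {
    val hook = Thread(cleanup, "app-shutdown")
    Runtime.getRuntime().addShutdownHook(hook)
    return hook
}
```

Anything registered this way gets a chance to run its finalizers (e.g. `KafkaStreams.close`) before the process exits, which is the behaviour structured concurrency is meant to give you for free.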
s
Hey @Anders Kirkeby, Why are you wrapping in an additional `resource { }` block? 🤔 That turns it into a lazy value, and shouldn't be needed here. I am not entirely sure how `KS` works, does it automatically take care of committing offsets on `close`? I know with the Java SDK it requires quite some machinery to do this correctly. I've tried this pattern with the regular Kafka Java SDK, and written a small wrapper around KotlinX Flow to facilitate this. https://github.com/nomisRev/kotlin-kafka
Also, it seems you're not raising any errors in `streamsSettings()` unless you changed it to pseudo code here.
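To illustrate why an extra wrapper layer can release early, here is a toy model of scoped acquire/release (`Scope`, `scoped`, and `install` below are made-up stand-ins, not Arrow's real API): a value installed into a short-lived inner scope is finalized as soon as that scope exits, while one installed directly into the outer scope lives until the app ends.

```kotlin
// Toy model of resource scoping (illustration only, not Arrow's implementation):
// a scope collects finalizers and runs them in reverse order when it closes.
class Scope {
    private val finalizers = mutableListOf<() -> Unit>()
    fun <A> install(acquire: () -> A, release: (A) -> Unit): A {
        val a = acquire()
        finalizers += { release(a) }
        return a
    }
    fun close() = finalizers.asReversed().forEach { it() }
}

fun <A> scoped(block: Scope.() -> A): A {
    val scope = Scope()
    return try { scope.block() } finally { scope.close() }
}

val events = mutableListOf<String>()

fun demo() {
    scoped {
        // Installed directly into the outer scope: released when the app ends.
        install({ events += "acquire outer"; "streams" }, { events += "release outer" })
        // Installed into a nested scope (like an extra wrapper layer run eagerly):
        // its finalizer fires as soon as the inner block returns, long before
        // the outer scope ends -- matching the "closed immediately" symptom.
        scoped {
            install({ events += "acquire inner"; "streams" }, { events += "release inner" })
        }
        events += "app running"
    }
}
```

Running `demo()` shows the inner resource being released before the "app" has even finished its work, while the outer one is released last.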
a
Ah, good catch. The extra resource wrapper might not be needed. Yeah, I had a look at the consumer/producer wrappers you wrote, but since my intention was to use kafka-streams I didn't get very far with it. Yeah, the stream-settings is a WIP implementation (the intention is to load this from a YAML file, so it's not quite there yet).
s
`close` doesn't seem to commit any offsets though 🤔 I am not entirely sure how this is dealt with in `KafkaStreams`, it's been some time since I looked at/worked with it. I've been working mostly with other streaming services or the Kafka Java SDK.
a
Removing the resource seems to have fixed the issue where the exit callback would be invoked immediately. However, closing the application with SIGINT/SIGTERM doesn't run the close method.
My understanding is that calling the close method sets the stream threads to "pending shutdown", which will finish processing any in-flight records and commit the relevant offsets 🤔
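That "pending shutdown" behaviour can be sketched with a toy worker (a made-up class, not the real KafkaStreams internals): `close()` flips a flag, the thread drains the records it already has in flight, commits once, and only then exits.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit

// Toy model of graceful stream-thread shutdown (illustration only, not the
// real KafkaStreams internals): close() marks the thread as shutting down,
// the worker drains in-flight records, commits, and only then exits.
class ToyStreamThread {
    private val inFlight = ArrayBlockingQueue<String>(100)
    @Volatile private var shuttingDown = false
    val log = mutableListOf<String>()

    private val worker = Thread {
        while (!shuttingDown || inFlight.isNotEmpty()) {
            val record = inFlight.poll(10, TimeUnit.MILLISECONDS) ?: continue
            log += "processed $record"
        }
        log += "committed offsets" // commit after draining, before exiting
    }

    fun start() = worker.start()
    fun send(record: String) = inFlight.put(record)

    fun close(timeoutMillis: Long) {
        shuttingDown = true        // "pending shutdown"
        worker.join(timeoutMillis) // wait for drain + commit, with a deadline
    }
}
```

This is also why the finalizer matters: if the JVM dies without `close` being called, the drain-then-commit step never happens.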
s
Alright, that is not clear to me from the API docs. Hmm, it seems unlikely that the finalizer is not being called. Can you share your updated snippet? Can you see the stream starting and printing values?
a
Here it is
```kotlin
fun main() = SuspendApp {
    resourceScope {
        streamsSettings().onRight { settings ->
            kafkaStreams(settings) {
                stream<String, String>("foo")
                    .peek { key, value -> println("Received record on 'foo': [key=$key] [value=$value]") }
                    .to("bar")
            }
        }
        awaitCancellation()
    }
}

data class StreamsSettings(
    val applicationId: String,
    val boostrapServer: String,
    val defaultKeySerde: Serde<*> = Serdes.String(),
    val defaultValueSerde: Serde<*> = Serdes.String(),
    private val props: Properties? = null
) {
    companion object {
        fun streamsSettings(): Either<String, StreamsSettings> = either {
            StreamsSettings(
                applicationId = "elhub-kafka-poc-sc",
                boostrapServer = "localhost:9092",
                defaultKeySerde = Serdes.String(),
                defaultValueSerde = Serdes.String(),
            )
        }
    }

    fun properties(): Properties = Properties().apply {
        props?.let { putAll(it) }
        put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServer)
        put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, defaultKeySerde::class.java)
        put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, defaultValueSerde::class.java)
    }
}

suspend fun ResourceScope.kafkaStreams(settings: StreamsSettings, builder: StreamsBuilder.() -> Unit): KafkaStreams =
    install({
        val topology: Topology = StreamsBuilder().also { builder(it) }.build()
        KafkaStreams(topology, settings.properties()).also {
            println("starting kafka streams")
            it.start()
        }.also {
            println("stream started")
        }
    }) { k, _ ->
        println("Closing Kafka Streams - will close with force after 30 seconds")
        k.close(Duration.ofMillis(30_000))
        println("Closed Kafka Streams - successfully")
    }
```
added some printlns as well 🤪
this was the output
s
Can you put a `println` before `awaitCancellation`?
Maybe we need to call `start` on a different thread than `main`. Hmm, no, that shouldn't be necessary. 😕
a
Prints 'hello cancellation' on startup so it seems to be in order
s
If you could push this to a repo, I'd be happy to take a look
a
Gladly - need to run some errands first, but will let you know. Thanks!
Not sure what I did differently tbh, but it seems to work now - might just be that logging was borked with the previous one? 🤔 https://github.com/andeki92/kafka-streams-poc-sc Might refine it a bit more later, but it seems to be working for now at least 🧐
With the context receiver, however, the `this` scope is rather polluted, so I might look at some better compositional APIs later
s
TODO: Figure out a way to inline the resourceScope and loggingContext
```kotlin
class MyCtx(
    private val scope: ResourceScope,
    private val logging: LoggingContext
) : ResourceScope by scope, LoggingContext by logging

suspend fun <A> MyCtx(block: suspend MyCtx.() -> A): A =
    resourceScope {
        block(MyCtx(this, loggingContext))
    }

suspend fun <A> MyCtx.blabla(): A
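The delegation trick can be tried standalone without Arrow; `Logging` and `Counting` below are made-up stand-in capabilities, not real library interfaces:

```kotlin
// Self-contained sketch of composing two "contexts" via interface delegation,
// the same trick as MyCtx above (Logging/Counting are made-up stand-ins).
interface Logging { fun log(msg: String) }
interface Counting { fun next(): Int }

class AppCtx(
    logging: Logging,
    counting: Counting
) : Logging by logging, Counting by counting

fun demoCtx(): List<String> {
    val lines = mutableListOf<String>()
    var n = 0
    val ctx = AppCtx(
        logging = object : Logging { override fun log(msg: String) { lines += msg } },
        counting = object : Counting { override fun next(): Int = ++n }
    )
    // With AppCtx as the receiver, both capabilities are in scope at once.
    fun AppCtx.work() {
        log("tick ${next()}")
        log("tick ${next()}")
    }
    ctx.work()
    return lines
}
```

Each delegated interface stays a single, swappable implementation, which keeps the combined `this` scope from growing unbounded.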
Happy to help with any other questions you might have ☺️
a
Haha, something like that, yeah 😅 Thanks for helping out!
s
My pleasure!