Asaf Peleg
09/18/2023, 7:50 PM
GlobalScope.launch(Dispatchers.IO) in my server and starting a Kafka message consumer that polls/processes in the job.
I'd like to subscribe multiple consumers (different groups) and I'm wondering if it's safe to execute multiple blocks of GlobalScope.launch(Dispatchers.IO) (one for each consumer).
Are there limits/concerns on that?

hfhbd
09/18/2023, 7:55 PM

Asaf Peleg
09/18/2023, 7:56 PM

Asaf Peleg
09/18/2023, 7:57 PM

Asaf Peleg
09/18/2023, 7:57 PM

simon.vergauwen
09/19/2023, 8:48 AM
CoroutineScope of the Application?
Otherwise, doesn't Spring offer a CoroutineScope that has the same lifespan as the server?

Hakon Grotte
09/19/2023, 9:26 AM
val kafkaConsumerContext = newSingleThreadContext("KafkaConsumer")
Then, similar to what Simon suggests, using the CoroutineScope of the Ktor Application, I launch a job with the thread context:
launch(kafkaConsumerContext) { /* consume kafka topic */ }
Finally, I close the thread context during the ApplicationStopPreparing event from Ktor:
environment.monitor.subscribe(ApplicationStopPreparing) {
    kafkaConsumerContext.close()
}
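A minimal sketch of the same pattern with plain kotlinx.coroutines, extended to two consumer groups. Assumptions: CoroutineScope(SupervisorJob()) stands in for Ktor's Application scope, and the hypothetical consume function stands in for a real KafkaConsumer poll loop. Each group gets its own single-thread context; the sketch cancels the scope and waits for the consumers before closing their threads.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a Kafka poll loop: a real one would create a KafkaConsumer
// for `group` here and call poll() in the loop. It records the thread it runs on.
suspend fun consume(group: String, seen: MutableMap<String, String>, started: CompletableDeferred<Unit>) {
    seen[group] = Thread.currentThread().name
    started.complete(Unit)
    while (true) delay(50) // "poll" until the surrounding scope is cancelled
}

// Starts one consumer per group, each on its own single-thread context, then shuts down.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun runConsumers(groups: List<String>): Map<String, String> = runBlocking {
    val seen = ConcurrentHashMap<String, String>()
    val appScope = CoroutineScope(SupervisorJob()) // stand-in for the Ktor Application scope
    val contexts = groups.associateWith { newSingleThreadContext("kafka-$it") }
    val started = groups.map { group ->
        CompletableDeferred<Unit>().also { ready ->
            appScope.launch(contexts.getValue(group)) { consume(group, seen, ready) }
        }
    }
    started.awaitAll()
    // "Server stop": cancel the scope, wait for the consumers, then release their threads.
    appScope.coroutineContext[Job]!!.cancelAndJoin()
    contexts.values.forEach { it.close() }
    seen
}
```

Calling runConsumers(listOf("GroupA", "GroupB")) returns, for each group, the name of the dedicated thread its consumer ran on (kafka-GroupA and kafka-GroupB, possibly with a coroutine-debug suffix).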
Asaf Peleg
09/19/2023, 4:17 PM
1. From the documentation it seems that this scope shares the same lifecycle as the Application (it won't shut down the coroutine unless the application also stops), but I just wanted to confirm. In other words, if the execution inside of launch { } does not return, it will run as long as the application is running?
2. What are the specific benefits of using the Application CoroutineScope vs. GlobalScope? I'm guessing they are both "global", but there is better control/predictability over the Application one since it's bound to the application itself?
3. Does it use Dispatchers.IO by default, i.e. is launch { } equivalent to launch(Dispatchers.IO) {}?

Asaf Peleg
09/19/2023, 4:31 PM
newSingleThreadContext accomplishes the "One Consumer Per Thread" approach recommended there. I just had some follow-up questions about it.
1. Just to be clear, I don't want to consume messages from the same topic & group at the same time. In other words, I don't want to launch KafkaConsumer(topic: MyTopic, group: GroupA) more than once, but rather something more like coroutine 1: KafkaConsumer(topic: MyTopic, group: GroupA), coroutine 2: KafkaConsumer(topic: MyTopic, group: GroupB) (two different groups). From the documentation it seems like the thread-safety concern involves spawning multiple identical consumers on the same thread (hence the ConcurrentModificationException from the docs). In my case, does that mean I don't need to worry about thread safety since each consumer won't step on the others?
2. Depending on the answer to the above question, if it's still advisable to use newSingleThreadContext, is that similar to creating a Thread in Java land? In other words, am I losing the basic benefits of coroutines vs. threads in terms of resource consumption? This seems to be the case according to the function's documentation. I know there might not be a clean answer for this, but do you have any general rule of thumb on how many of these you can launch per CPU?

simon.vergauwen
09/19/2023, 7:36 PM
> 1. From the documentation it seems that this scope shares the same lifecycle as the Application (it won't shut down the coroutine unless the application also stops), but I just wanted to confirm. In other words, if the execution inside of launch { } does not return, it will run as long as the application is running?
That is correct, the launch will run for as long as the server runs. The CoroutineScope gets cancelled when you call stop on the server.
> 2. What are the specific benefits of using the Application CoroutineScope vs. GlobalScope? I'm guessing they are both "global", but there is better control/predictability over the Application one since it's bound to the application itself?
Here is an article by Roman Elizarov on why you should not use GlobalScope: https://elizarov.medium.com/the-reason-to-avoid-globalscope-835337445abc. It has actually been "upgraded" to DelicateCoroutinesApi; Ktor uses it itself inside its codebase, yet the advice is still to avoid it and use the Application scope instead.
I personally rely on CoroutineScope and structured concurrency for graceful shutdown: https://github.com/arrow-kt/suspendapp#suspendapp-with-kafka. GlobalScope doesn't respect structured concurrency, so it's effectively launch-and-forget.
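To make the structured-concurrency point concrete, a minimal sketch with plain kotlinx.coroutines, where CoroutineScope(SupervisorJob()) stands in for the Application scope (an assumption, not Ktor's actual implementation). Cancelling that scope also cancels the coroutine launched in it, while the GlobalScope coroutine keeps running until someone cancels it by hand.

```kotlin
import kotlinx.coroutines.*

// Returns (scoped job still active, global job still active) after the scope is cancelled.
@OptIn(DelicateCoroutinesApi::class) // GlobalScope is marked @DelicateCoroutinesApi
fun compareScopes(): Pair<Boolean, Boolean> = runBlocking {
    val appScope = CoroutineScope(SupervisorJob()) // stand-in for the Application scope
    val scoped = appScope.launch { awaitCancellation() }    // child of appScope's Job
    val global = GlobalScope.launch { awaitCancellation() } // no parent Job at all
    appScope.cancel() // roughly what stopping the server does to the Application scope
    scoped.join()     // returns because cancellation reached the child
    val result = scoped.isActive to global.isActive
    global.cancelAndJoin() // GlobalScope work has to be cleaned up by hand
    result
}
```

compareScopes() yields (false, true): only the scoped coroutine was shut down together with its scope.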
> 3. Does it use Dispatchers.IO by default, i.e. is launch { } equivalent to launch(Dispatchers.IO) {}?
No, afaik it'll result in Dispatchers.Default.
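A quick way to check which dispatcher a coroutine ends up on, sketched for a scope whose context carries a Job but no dispatcher (an assumption). In that case kotlinx.coroutines falls back to Dispatchers.Default. async is used only to get the value back; it resolves its context the same way launch does. Whether Ktor's Application context already carries a dispatcher may depend on the engine and version, so running the same check inside your own server is the reliable answer.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor

// Returns the dispatchers picked with no explicit dispatcher and with Dispatchers.IO,
// in a scope that has a Job but no dispatcher of its own.
fun pickedDispatchers(): Pair<Any?, Any?> = runBlocking {
    val scope = CoroutineScope(Job())
    val plain = scope.async { coroutineContext[ContinuationInterceptor] }.await()
    val io = scope.async(Dispatchers.IO) { coroutineContext[ContinuationInterceptor] }.await()
    scope.cancel()
    plain to io
}
```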
To chime in on the other questions:
> 1. Just to be clear, I don't want to consume messages from the same topic & group at the same time. In other words, I don't want to launch KafkaConsumer(topic: MyTopic, group: GroupA) more than once, but rather something more like coroutine 1: KafkaConsumer(topic: MyTopic, group: GroupA), coroutine 2: KafkaConsumer(topic: MyTopic, group: GroupB) (two different groups). From the documentation it seems like the thread-safety concern involves spawning multiple identical consumers on the same thread (hence the ConcurrentModificationException from the docs). In my case, does that mean I don't need to worry about thread safety since each consumer won't step on the others?
I think @Hakon Grotte meant that a created consumer's methods need to be called from the same thread that initialised the consumer. So in your case you'd need two separate consumers, with two separate newSingleThreadContexts.
Yes, that's correct. The …

simon.vergauwen
09/19/2023, 7:39 PM
> 2. Depending on the answer to the above question, if it's still advisable to use newSingleThreadContext, is that similar to creating a Thread in Java land? In other words, am I losing the basic benefits of coroutines vs. threads in terms of resource consumption? This seems to be the case according to the function's documentation. I know there might not be a clean answer for this, but do you have any general rule of thumb on how many of these you can launch per CPU?
No, it's not the equivalent of a Thread. It's the equivalent of Executors.newSingleThreadExecutor, which means it's an ExecutorService backed by a single thread. You don't lose the benefits of coroutines. What you might want to do is process the records on a different pool in parallel.
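To illustrate the single-thread-executor point, a sketch that runs many coroutines on a dispatcher built from Executors.newSingleThreadExecutor() via asCoroutineDispatcher(). Because delay suspends instead of blocking, all of them share one thread; only blocking calls (such as a consumer's poll) tie that thread up. The thread name and the coroutine count are arbitrary choices for the example.

```kotlin
import kotlinx.coroutines.*
import java.util.Collections
import java.util.concurrent.Executors

// Launches `n` coroutines on a dispatcher backed by exactly one thread and
// returns how many distinct threads they actually ran on.
fun distinctThreads(n: Int): Int {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "single-consumer") }
    val threads = Collections.synchronizedSet(mutableSetOf<Thread>())
    // ExecutorCoroutineDispatcher is Closeable; use {} shuts the executor down afterwards.
    executor.asCoroutineDispatcher().use { dispatcher ->
        runBlocking {
            repeat(n) {
                launch(dispatcher) {
                    delay(10) // suspends, freeing the thread for the other coroutines
                    threads += Thread.currentThread()
                }
            }
        }
    }
    return threads.size
}
```

distinctThreads(10_000) reports a single thread: ten thousand coroutines were multiplexed onto it, which would have required ten thousand platform threads with a thread-per-task design.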
I personally like using libraries like reactor-kafka, or my kotlin-kafka port (which will reach 1.0 by the end of this year, but it's production ready), because they abstract away a lot of these concerns and let you process records in parallel more easily by leveraging Flow and removing the low-level details of the single-threaded consumers.

Asaf Peleg
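09/19/2023, 10:28 PM
Triggering multiple Kafka consumers at the same time in a Ktor server) would I simply be able to do launch { KafkaReceiver(settings)... } in the Application scope multiple times without having to concern myself with thread-safety, etc?

simon.vergauwen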
09/20/2023, 5:53 AM
> Triggering multiple Kafka consumers at the same time in a Ktor server) would I simply be able to do launch { KafkaReceiver(settings)... } in the Application scope multiple times without having to concern myself with thread-safety, etc?
Yes, that's exactly what I do.

Hakon Grotte
09/20/2023, 12:21 PM

simon.vergauwen
09/20/2023, 12:46 PM
KafkaReceiver uses KafkaConsumer, but all of that complexity is hidden away.

Asaf Peleg
09/20/2023, 2:37 PM

Asaf Peleg
09/22/2023, 4:11 AM
./bin/kafka-consumer-groups.sh to inspect the group, it shows that the Current Offset and Lag are both empty/null.
Sharing relevant code snippets below:

Asaf Peleg
09/22/2023, 4:15 AM

Asaf Peleg
09/22/2023, 4:20 AM
./bin/kafka-consumer-groups.sh --bootstrap-server $bootStrapServer --describe --group asaf-testing
outputs:
GROUP          TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
asaf-testing   myTopic   0          -               25              -

Asaf Peleg
09/22/2023, 7:34 PM
KafkaReceiver(settings).receive(myTopic).map { record ->
    record.offset.acknowledge()
    MyDataClass(record.value())
}.collect(::println)
I observed that the CURRENT-OFFSET and LAG got set appropriately.

Asaf Peleg
09/22/2023, 8:02 PM
MyDataClass object through another function and if it fails skip calling ACK.
I adjusted the code a bit more to test that and rearranged it to work like this:
KafkaReceiver(settings).receive(myTopic).map { record ->
    val tripRecord = MyDataClass(record.value())
    foo(tripRecord) // process stuff
    record
}.collect { it.offset.acknowledge() }
I'm new to flows and I'm wondering whether there is a better/more efficient way to ACK the record once the processing has succeeded.
I suppose my question boils down to whether I'm abusing the flow by doing IO-heavy work inside the map function?

Asaf Peleg
09/22/2023, 9:10 PM
io.github.nomisRev.kafka.receiver.AutoOffsetReset in my code
Warning:(6, 42) 'typealias AutoOffsetReset = io.github.nomisRev.kafka.AutoOffsetReset' uses 'AutoOffsetReset', which is deprecated. Use io.github.nomisRev.kafka.receiver.AutoOffsetReset instead
simon.vergauwen
09/25/2023, 7:02 AM

simon.vergauwen
09/25/2023, 7:12 AM
IO work, your best bet is probably to use flowOn(Dispatchers.IO) and stream on the IO dispatcher entirely.
KafkaReceiver(settings).receive(myTopic).map { record ->
    val tripRecord = MyDataClass(record.value())
    foo(tripRecord) // process stuff
    record
}.flowOn(Dispatchers.IO)
 .collect { it.offset.acknowledge() }
The acknowledge is done by the internals of kotlin-kafka and happens on the Kafka-specific threading anyway.
You could keep the code as is, or do:
KafkaReceiver(settings).receive(myTopic).map { record ->
    val tripRecord = MyDataClass(record.value())
    foo(tripRecord) // process stuff
        .also { record.offset.acknowledge() }
}.flowOn(Dispatchers.IO)
 .collect { /* result of foo */ }
This way, if foo returns any data you want to transform further or do something with, you can compose further Flow operators below.

Asaf Peleg
09/25/2023, 2:36 PM
flowOn(Dispatchers.IO) applies to the code in the collect, but it's actually making the code running in map more efficient.
Just to make sure I'm on the same page: I was implying that foo is what will be doing IO-bound stuff such as making API requests or mutating storage.

simon.vergauwen
09/25/2023, 3:48 PM
collect is called in the "current" context.
withContext(Dispatchers.Main) {
    val singleValue = intFlow // will be executed on IO if context wasn't specified before
        .map { ... } // Will be executed in IO
        .flowOn(Dispatchers.IO)
        .filter { ... } // Will be executed in Default
        .flowOn(Dispatchers.Default)
        .single() // Will be executed in the Main
}
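A runnable variant of that example, using runBlocking as the "current" context instead of Dispatchers.Main (which is not available on a plain JVM). It records the thread each stage runs on: the map upstream of flowOn(Dispatchers.IO) runs on a worker of the shared IO pool, while collect runs on the caller's thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.ConcurrentHashMap

// Records the name of the thread each stage runs on.
fun flowOnDemo(): Map<String, String> = runBlocking {
    val seen = ConcurrentHashMap<String, String>()
    fun mark(stage: String) { seen[stage] = Thread.currentThread().name }
    flowOf(1, 2, 3)
        .map { mark("map"); it * 2 } // upstream of flowOn: runs on a Dispatchers.IO worker
        .flowOn(Dispatchers.IO)
        .collect { mark("collect") } // downstream: runs in runBlocking's coroutine
    mark("caller")
    seen
}
```

In the returned map, "map" names a DefaultDispatcher-worker thread, and "collect" names the same thread as "caller".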
Asaf Peleg
09/25/2023, 4:42 PM

Asaf Peleg
09/25/2023, 4:43 PM

Asaf Peleg
09/25/2023, 4:52 PM
launch(Dispatchers.IO) {
    val settings = receiverSettings() // A. Uses IO because in context of launch
    KafkaReceiver(settings)
        .receive(myTopic).map { } // B. Uses IO b/c of flowOn right below
        .flowOn(Dispatchers.IO)
        .collect { } // C. Uses IO because in context of launch
}
I labeled the executing code sections above with A, B, and C to check whether I identified the dispatchers correctly.
If that is correct, what does it mean for a flow to use an additional coroutine/dispatcher inside of an existing coroutine? Is it as simple as creating a child coroutine inside of a parent coroutine?

simon.vergauwen
09/25/2023, 5:36 PM
IO. So basically collect creates a new coroutine inheriting the coroutineContext from where it's being called, and it runs the Flow on said newly created coroutine. flowOn is used to switch any upstream to a different context.
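A sketch that checks Asaf's A/B/C labels, with flowOf("record") standing in for KafkaReceiver(settings).receive(myTopic) (an assumption, since the real receiver needs a broker). It records coroutineContext[ContinuationInterceptor] at each point. All three report Dispatchers.IO, so in this setup the flowOn(Dispatchers.IO) does not move map to a different dispatcher: the collecting coroutine is already on IO.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.ContinuationInterceptor

// Returns the dispatcher observed at the A, B and C points of the labeled snippet.
fun abcDispatchers(): Map<String, Any?> = runBlocking {
    val seen = mutableMapOf<String, Any?>()
    launch(Dispatchers.IO) {
        seen["A"] = coroutineContext[ContinuationInterceptor] // body of launch
        flowOf("record") // stand-in for KafkaReceiver(settings).receive(myTopic)
            .map { seen["B"] = currentCoroutineContext()[ContinuationInterceptor]; it }
            .flowOn(Dispatchers.IO) // same dispatcher as the collecting coroutine
            .collect { seen["C"] = currentCoroutineContext()[ContinuationInterceptor] }
    }.join() // join makes the writes above visible here
    seen
}
```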
> If that is correct, what does it mean for a flow to use an additional coroutine/dispatcher inside of an existing coroutine? Is it as simple as creating a child coroutine inside of a parent coroutine?
I'm not entirely sure I understand your question 🤔 If you mean how flowOn works, the answer is that it depends on which optimisation it can apply. If the existing Flow can get away without creating a child coroutine, it will; otherwise it will indeed create a child coroutine of the coroutine that was already created for Flow#collect.
> this has all been super helpful, thank you again so much for all the help and the Kafka library.
My pleasure ☺️
Asaf Peleg
09/25/2023, 5:38 PM

Asaf Peleg
09/25/2023, 5:39 PM
flowOn since its parent dispatcher is IO, or does it still confer some improvement?

simon.vergauwen
09/25/2023, 5:40 PM
IO since the code is already running on IO.

simon.vergauwen
09/25/2023, 5:41 PM
IO thread. So KafkaReceiver polls it on a Kafka thread and then immediately hands it over to Dispatchers.IO for your processing.

Asaf Peleg
09/25/2023, 5:43 PM
IO is an appropriate dispatcher to wrap the KafkaReceiver in, but I guess what you're saying is that it's doing its own threading under the hood then.

simon.vergauwen
09/25/2023, 5:47 PM
KafkaConsumer needs to work on a single Thread, which is managed internally by KafkaReceiver.
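A minimal sketch of that single-thread arrangement, not kotlin-kafka's actual code. The hypothetical FakeConsumer refuses calls from any thread other than the one that created it (KafkaConsumer similarly rejects access from multiple threads), so it is created and polled inside a channelFlow whose producer is pinned to a single-thread dispatcher with flowOn. Records reach the collector through the channel, so downstream processing can run on any other dispatcher.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors

// Hypothetical non-thread-safe client standing in for KafkaConsumer:
// every call must come from the thread that created it.
class FakeConsumer(private val records: List<String>) {
    private val owner = Thread.currentThread()
    private var next = 0
    fun poll(): List<String> {
        check(Thread.currentThread() === owner) { "poll() called from a foreign thread" }
        return records.drop(next).take(2).also { next += it.size } // "batches" of 2
    }
}

// Creates and polls the consumer on one dedicated thread and hands the records
// to the Flow through channelFlow's channel.
fun receive(records: List<String>): Flow<String> {
    val pollThread = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    return channelFlow {
        val consumer = FakeConsumer(records) // created on pollThread
        while (true) {
            val batch = consumer.poll()
            if (batch.isEmpty()) break // a real loop would keep polling instead
            batch.forEach { send(it) }
        }
    }.flowOn(pollThread) // pins the producer (and every poll) to the single thread
        .onCompletion { pollThread.close() } // release the thread when collection ends
}
```

The dispatcher is created when receive() is called and closed in onCompletion, so a flow that is never collected would leak its thread; a fuller implementation would create it lazily on collection.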
TL;DR: KafkaReceiver creates a newSingleThreadContext and creates the KafkaConsumer on it. It calls all KafkaConsumer methods on it and sends the results to the Flow through a ChannelFlow.

Asaf Peleg
09/25/2023, 5:48 PM

Asaf Peleg
09/27/2023, 2:16 AM
ack on them to test retry logic.
I observed that with the sample code we have been using, it will not commit the offset.
From here there were two scenarios where I could use some advice:
1. If this is the latest message on the topic, the receiver does not retry that message. I'm only able to process the message again if I restart the server.
2. If another message is added to the topic, the receiver will poll it, and if I ACK that message it will push the offset past the one I did not ACK.
What's the correct approach? Should I explicitly cancel the flow and restart it in a tight while loop, or is there something I'm missing?

Asaf Peleg
09/27/2023, 3:40 AM
launch(Dispatchers.IO) {
    val settings = receiverSettings()
    while (true) { // If the flow exits, start again
        KafkaReceiver(settings).receive(myTopic).map { record ->
            val myDataRecord = MyDataClass(record.value())
            logger.info { myDataRecord }
            val result = foo(myDataRecord) // example processor, returns Boolean?
            result?.let {
                record.offset.acknowledge() // ACK the message if the processor returns non-null
            }
            result
        }.takeWhile { // Cancel the flow if the processor returns null
            it != null
        }.collect { logger.info { "Processed: $it" } }
        logger.info { "Will Retry Now" }
    }
}
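One common alternative to restarting the whole receiver (not something proposed in this thread) is to retry the failing record in place with a backoff and acknowledge it only after it succeeds, so a later ACK never commits past an unprocessed record. A minimal sketch using kotlinx.coroutines only: the hypothetical Rec type stands in for a received record, its ack function stands in for record.offset.acknowledge(), and maxAttempts and initialDelayMs are arbitrary values. As in the loop above, takeWhile stops the flow at the first record that still fails, leaving the decision of what to do next (alert, stop, park the record elsewhere) to the caller.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

// Hypothetical record type: `ack` stands in for record.offset.acknowledge().
class Rec(val value: String, val ack: () -> Unit)

// Calls `block` until it returns non-null, doubling the pause between attempts;
// returns null if all `maxAttempts` attempts returned null.
suspend fun <T : Any> retryUntilNotNull(maxAttempts: Int, initialDelayMs: Long, block: suspend () -> T?): T? {
    var pauseMs = initialDelayMs
    repeat(maxAttempts) { attempt ->
        block()?.let { return it }
        if (attempt < maxAttempts - 1) {
            delay(pauseMs)
            pauseMs *= 2
        }
    }
    return null
}

// Processes records strictly in order and ACKs each one only after it succeeded.
// takeWhile stops the flow at the first record that still fails, so no later
// record is acknowledged past it.
fun Flow<Rec>.processInOrder(process: suspend (String) -> Boolean?): Flow<Boolean?> =
    map { rec ->
        val result = retryUntilNotNull(maxAttempts = 3, initialDelayMs = 10) { process(rec.value) }
        if (result != null) rec.ack()
        result
    }.takeWhile { it != null }
```

Because a Flow processes one element at a time without buffering, the next record is not even mapped until the current one has either been acknowledged or stopped the flow.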