# ktor
Asaf Peleg
I'm using `GlobalScope.launch(Dispatchers.IO)` in my server to start a Kafka message consumer that polls/processes inside the job. I'd like to subscribe multiple consumers (different groups), and I'm wondering if it's safe to execute multiple `GlobalScope.launch(Dispatchers.IO)` blocks (one for each consumer). Are there limits/concerns with that?
Hakon Grotte
What do you mean by safe? It will work, but using `GlobalScope` is an anti-pattern, and you should use it rarely.
Asaf Peleg
That's why I was wondering if it's safe: I see threads/documentation saying to be very careful with it, which I'm trying to appreciate 🙂
I feel as though my use case is appropriate for it. My server's purpose is to consume and process Kafka messages from the moment it starts to the moment it shuts down.
But I'm totally open to alternative approaches.
simon.vergauwen
Are you using Ktor? Why not use the `CoroutineScope` of the `Application`? Otherwise, doesn't Spring offer a `CoroutineScope` that has the same lifespan as the server?
Hakon Grotte
My current approach is creating a dedicated single-thread context for the Kafka consumer (as it is not thread-safe):
`val kafkaConsumerContext = newSingleThreadContext("KafkaConsumer")`
Then, similar to what Simon suggests, I use the `CoroutineScope` of the Ktor `Application` to launch a job with that thread context: `launch(kafkaConsumerContext) { /* consume kafka topic */ }`. Finally, I close the thread context during the `ApplicationStopPreparing` event from Ktor:
```kotlin
environment.monitor.subscribe(ApplicationStopPreparing) {
    kafkaConsumerContext.close()
}
```
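Putting that together, here's a minimal sketch of the whole approach as a Ktor module (assuming Ktor 2.x; the consume loop is a placeholder, and the opt-in markers for `newSingleThreadContext` may vary by kotlinx.coroutines version):
```kotlin
import io.ktor.server.application.*
import kotlinx.coroutines.*

// Sketch of the approach above: a dedicated single-thread context for the
// (non-thread-safe) KafkaConsumer, a job tied to the Application scope,
// and cleanup when the server prepares to stop.
@OptIn(DelicateCoroutinesApi::class, ObsoleteCoroutinesApi::class)
fun Application.kafkaModule() {
    val kafkaConsumerContext = newSingleThreadContext("KafkaConsumer")

    // Application is itself a CoroutineScope, so this job is cancelled
    // when the server stops.
    launch(kafkaConsumerContext) {
        // consume kafka topic here (placeholder)
    }

    environment.monitor.subscribe(ApplicationStopPreparing) {
        kafkaConsumerContext.close()
    }
}
```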
Asaf Peleg
Thank you both! I am using Ktor. A couple of questions about the `CoroutineScope` of the `Application`:
1. From the documentation it seems that this scope shares the same lifecycle as the `Application` (it won't shut down the coroutine unless the application also stops), but I just wanted to confirm. In other words, if the execution inside of `launch { }` does not return, will it run as long as the application is running?
2. What are the specific benefits of using the `Application` `CoroutineScope` vs. `GlobalScope`? I'm guessing they are both "global", but there is better control/predictability over the `Application` one since it's bound to the application itself?
3. Does it use `Dispatchers.IO` by default, i.e. is `launch { }` equivalent to `launch(Dispatchers.IO) {}`?
@Hakon Grotte thanks for pointing out the thread-safety issue of the Kafka consumer. I was reading through the documentation, and I guess using `newSingleThreadContext` accomplishes the "One Consumer Per Thread" approach recommended there. I had some follow-up questions about it:
1. Just to be clear, I don't want to consume messages from the same topic & group at the same time. In other words, I don't want to launch `KafkaConsumer(topic: MyTopic, group: GroupA)` more than once, but rather something like coroutine 1: `KafkaConsumer(topic: MyTopic, group: GroupA)`, coroutine 2: `KafkaConsumer(topic: MyTopic, group: GroupB)` (two different groups). From the documentation it seems like the thread-safety issue involves spawning multiple identical consumers on the same thread (hence the `ConcurrentModificationException` from the docs). In my case, does that mean I don't need to worry about thread safety since each consumer won't step on the others?
2. Depending on the answer above: if it's still advisable to use `newSingleThreadContext`, is that similar to creating a `Thread` in Java land? In other words, am I losing the basic benefits of coroutines vs. threads in terms of resource consumption? This seems to be the case according to the function's documentation. I know there might not be a clean answer, but do you have a general rule of thumb on how many of these you can launch per CPU?
simon.vergauwen
> 1. From the documentation it seems that this scope shares the same lifecycle as the `Application` (it won't shut down the coroutine unless the application also stops), but I just wanted to confirm. In other words, if the execution inside of `launch { }` does not return, will it run as long as the application is running?

That is correct; the `launch` will run for as long as the server runs. The `CoroutineScope` gets cancelled when you call `stop` on the server.
> 2. What are the specific benefits of using the `Application` `CoroutineScope` vs. `GlobalScope`? I'm guessing they are both "global", but there is better control/predictability over the `Application` one since it's bound to the application itself?

Here is an article by Roman Elizarov on why you should not use `GlobalScope`: https://elizarov.medium.com/the-reason-to-avoid-globalscope-835337445abc. It has actually been "upgraded" to `DelicateCoroutinesApi`, and Ktor itself uses it inside its codebase. Still, I'd say avoid it and use the `Application` scope instead. I personally rely on `CoroutineScope` and structured concurrency for graceful shutdown: https://github.com/arrow-kt/suspendapp#suspendapp-with-kafka. `GlobalScope` doesn't respect structured concurrency, so it's effectively launch-and-forget.
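To make the difference concrete, a minimal sketch (names are illustrative): cancelling a scope cancels the jobs launched in it, while a `GlobalScope` job keeps running until the process exits.
```kotlin
import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
    val appScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Tied to appScope: cancelled when the scope is cancelled.
    val scoped = appScope.launch {
        while (isActive) delay(100) // stand-in for a poll/process loop
    }

    // Fire-and-forget: no parent owns this job, so cancelling appScope
    // does not reach it.
    val global = GlobalScope.launch {
        while (isActive) delay(100)
    }

    delay(300)
    appScope.cancel()        // stops `scoped`...
    scoped.join()
    println(global.isActive) // ...but `global` is still active: prints true
    global.cancel()          // has to be cleaned up manually
}
```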
> 3. Does it use `Dispatchers.IO` by default, i.e. is `launch { }` equivalent to `launch(Dispatchers.IO) {}`?

No, afaik it'll result in `Dispatchers.Default`. To chime in on the other questions:
> 1. Just to be clear, I don't want to consume messages from the same topic & group at the same time. In other words, I don't want to launch `KafkaConsumer(topic: MyTopic, group: GroupA)` more than once, but rather something like coroutine 1: `KafkaConsumer(topic: MyTopic, group: GroupA)`, coroutine 2: `KafkaConsumer(topic: MyTopic, group: GroupB)` (two different groups). From the documentation it seems like the thread-safety issue involves spawning multiple identical consumers on the same thread (hence the `ConcurrentModificationException` from the docs). In my case, does that mean I don't need to worry about thread safety since each consumer won't step on the others?

I think @Hakon Grotte meant that a created consumer's methods need to be called from the same thread that initialised the consumer. So in your case you'd need two separate consumers, each with its own `newSingleThreadContext` (a sketch follows below). But yes, that's correct.
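A sketch of that setup, assuming the same Ktor `Application` scope as above (the poll loops and group names are placeholders):
```kotlin
import io.ktor.server.application.*
import kotlinx.coroutines.*

// One dedicated thread per consumer group; each KafkaConsumer is created
// and used only on its own thread, satisfying "One Consumer Per Thread".
@OptIn(DelicateCoroutinesApi::class, ObsoleteCoroutinesApi::class)
fun Application.twoConsumers() {
    val contextA = newSingleThreadContext("KafkaConsumer-GroupA")
    val contextB = newSingleThreadContext("KafkaConsumer-GroupB")

    launch(contextA) { /* create + poll the GroupA consumer here */ }
    launch(contextB) { /* create + poll the GroupB consumer here */ }

    environment.monitor.subscribe(ApplicationStopPreparing) {
        contextA.close()
        contextB.close()
    }
}
```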
> 2. Depending on the answer above: if it's still advisable to use `newSingleThreadContext`, is that similar to creating a `Thread` in Java land? In other words, am I losing the basic benefits of coroutines vs. threads in terms of resource consumption? This seems to be the case according to the function's documentation. I know there might not be a clean answer, but do you have a general rule of thumb on how many of these you can launch per CPU?

No, it's not the equivalent of `Thread`. It's the equivalent of `Executors.newSingleThreadExecutor`, which means it's an `ExecutorService` backed by a single thread (see the equivalence sketch below). You don't lose the benefits of coroutines. What you might want to do is process the records on a different pool in parallel. I personally like using libraries like reactor-kafka, or my kotlin-kafka port (which will reach 1.0 by the end of this year, but is production-ready), because they abstract away a lot of these concerns and let you process records in parallel more easily by leveraging `Flow`, removing the low-level details of the single-threaded consumers.
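For reference, a small equivalence sketch: `newSingleThreadContext` is essentially a coroutine dispatcher over a single-threaded executor, so it parks one thread, but everything launched on it is still a coroutine.
```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Two roughly equivalent single-thread dispatchers; both must be closed
// when no longer needed, or the backing thread leaks.
@OptIn(DelicateCoroutinesApi::class, ObsoleteCoroutinesApi::class)
fun singleThreadDispatchers() {
    val viaCoroutines = newSingleThreadContext("KafkaConsumer")
    val viaExecutor = Executors.newSingleThreadExecutor().asCoroutineDispatcher()

    viaCoroutines.close()
    viaExecutor.close()
}
```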
Asaf Peleg
Thank you Simon, I greatly appreciate your patience and time in answering my questions. I'll dig into the resources you shared to get a better understanding of your answers. From first glance, your kotlin-kafka Flow library seems interesting if it helps with the concerns I've raised. Looking at the example in the README (given the high-level goal I've shared: triggering multiple Kafka consumers at the same time in a Ktor server), would I simply be able to do `launch { KafkaReceiver(settings)... }` in the `Application` scope multiple times without having to concern myself with thread safety, etc.?
simon.vergauwen
You're very welcome! Happy to help.
> Triggering multiple Kafka consumers at the same time in a Ktor server: would I simply be able to do `launch { KafkaReceiver(settings)... }` in the `Application` scope multiple times without having to concern myself with thread safety, etc.?

Yes, that's exactly what I do.
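For example, a sketch based on the README-style usage above (`receiverSettingsFor(group)` and `myTopic` are hypothetical placeholders for your own settings and topic):
```kotlin
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.ktor.server.application.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

// Sketch: one KafkaReceiver per consumer group, each collected in its own
// coroutine on the Application scope. receiverSettingsFor(...) is a
// hypothetical helper that builds ReceiverSettings for the given group id.
fun Application.kafkaConsumers() {
    launch { KafkaReceiver(receiverSettingsFor("GroupA")).receive(myTopic).collect(::println) }
    launch { KafkaReceiver(receiverSettingsFor("GroupB")).receive(myTopic).collect(::println) }
}
```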
Hakon Grotte
Great answers @simon.vergauwen 🙌 Does that mean that `KafkaReceiver` from reactor-kafka does not share the same "One Consumer Per Thread" requirement as the "blocking" `KafkaConsumer`?
simon.vergauwen
@Hakon Grotte, it does. Under the hood, `KafkaReceiver` uses `KafkaConsumer`, but all of that complexity is hidden away.
Asaf Peleg
Awesome, I'm going to start testing it out.
Hi Simon, I've been playing around with your library today and I think I've got my POC mostly working. I had some fun configuring my deserializer and making it work with the type-safe settings object, since I'm doing some off-beat stuff: AWS MSK, IAM authentication, Avro Registry using AWS Glue 🤪 I see the messages being consumed if I drop one into my subscribed topic, but I'm not sure it's committing them: when I run `./bin/kafka-consumer-groups.sh` to inspect the group, it shows that the current offset and lag are both empty/null. Sharing relevant code snippets below (attached: test-kotlin-kafka.kt).
`./bin/kafka-consumer-groups.sh --bootstrap-server $bootStrapServer --describe --group asaf-testing` outputs:
```
GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
asaf-testing         myTopic         0          -               25              -
```
Okay, this is my mistake, coming from using the low-level `KafkaConsumer` class, which I guess was implicitly auto-acknowledging messages on my behalf. When I altered my code to this...
```kotlin
KafkaReceiver(settings).receive(myTopic).map { record ->
    record.offset.acknowledge()
    MyDataClass(record.value())
}.collect(::println)
```
...I observed that the CURRENT-OFFSET and LAG got set appropriately.
The code example above was just a simple test, but I'd also like to process the `MyDataClass` object through another function and, if it fails, skip calling ACK. I adjusted the code a bit to test that and rearranged it to work like this:
```kotlin
KafkaReceiver(settings).receive(myTopic).map { record ->
    val tripRecord = MyDataClass(record.value())
    foo(tripRecord) // process stuff
    record
}.collect { it.offset.acknowledge() }
```
I'm new to flows, and I'm wondering: is there a better/more efficient way to ACK the record once processing completes properly? I suppose my question boils down to whether I'm potentially abusing the flow by doing IO-heavy work inside the `map` function.
One last thing that you might be aware of: the type alias for `AutoOffsetReset` still causes deprecation warnings even though I'm importing `io.github.nomisRev.kafka.receiver.AutoOffsetReset` in my code:
```
Warning:(6, 42) 'typealias AutoOffsetReset = io.github.nomisRev.kafka.AutoOffsetReset' uses 'AutoOffsetReset', which is deprecated. Use io.github.nomisRev.kafka.receiver.AutoOffsetReset instead
```
simon.vergauwen
Thanks @Asaf Peleg for the reminder. That was indeed incorrect; it's not actually deprecated, it will just be changing packages.
To answer your other question: if you're doing a lot of IO work, your best bet is probably to use `flowOn(Dispatchers.IO)` and run the stream on the IO dispatcher entirely.
```kotlin
KafkaReceiver(settings).receive(myTopic).map { record ->
    val tripRecord = MyDataClass(record.value())
    foo(tripRecord) // process stuff
    record
}.flowOn(Dispatchers.IO)
 .collect { it.offset.acknowledge() }
```
The `acknowledge` is done by the internals of kotlin-kafka and happens on the Kafka-specific threading anyway. You could keep the code as is, or do:
```kotlin
KafkaReceiver(settings).receive(myTopic).map { record ->
    val tripRecord = MyDataClass(record.value())
    foo(tripRecord) // process stuff
        .also { record.offset.acknowledge() }
}.flowOn(Dispatchers.IO)
 .collect { /* result of foo */ }
```
This way, if `foo` returns any data you want to further transform or act on, you can compose further `Flow` operators below.
Asaf Peleg
Ah, good advice. What's a little unintuitive from that example (especially the first one, so correct me if I'm wrong) is that you would think the `flowOn(Dispatchers.IO)` applies to the code in `collect`, but it actually applies to the code running in `map`. Just to make sure we're on the same page: I was implying that `foo` is what does the IO-bound work, such as making API requests or mutating storage.
simon.vergauwen
Yes, it might be a bit unintuitive at first, but it works upstream, i.e. in the upward direction of the chain, while `collect` runs in the "current" context.
```kotlin
withContext(Dispatchers.Main) {
    val singleValue = intFlow // will be executed on IO if context wasn't specified before
        .map { ... } // Will be executed in IO
        .flowOn(Dispatchers.IO)
        .filter { ... } // Will be executed in Default
        .flowOn(Dispatchers.Default)
        .single() // Will be executed in Main
}
```
Asaf Peleg
Gotcha, thanks!
This has all been super helpful; thank you again so much for all the help and the Kafka library.
I just implemented it, and I wanted to clarify your examples above using the Ktor default app context:
```kotlin
launch(Dispatchers.IO) {
    val settings = receiverSettings() // A. Uses IO because in context of launch
    KafkaReceiver(settings)
        .receive(myTopic).map { } // B. Uses IO b/c of flowOn right below
        .flowOn(Dispatchers.IO)
        .collect { } // C. Uses IO because in context of launch
}
```
I labeled the executing code sections above with A, B, and C to check whether I identified the correct dispatcher for each. If that is correct, what does it mean for a flow to use an additional coroutine/dispatcher inside of an existing coroutine? Is it as simple as creating a child coroutine inside a parent coroutine?
simon.vergauwen
That is correct, although here everything would run on IO anyway. So basically, `collect` creates a new coroutine inheriting the `coroutineContext` from where it's being called, and runs the `Flow` on that newly created coroutine. `flowOn` is used to switch any upstream to a different context.
> If that is correct, what does it mean for a flow to use an additional coroutine/dispatcher inside of an existing coroutine? Is it as simple as creating a child coroutine inside a parent coroutine?

I'm not entirely sure I understand your question 🤔 If you mean how `flowOn` works, the answer is: it depends on which optimisation it can apply. If the existing `Flow` can get away without creating a child coroutine, it will; otherwise it will indeed create a child coroutine of the coroutine that was already created for `Flow#collect`.
> This has all been super helpful; thank you again so much for all the help and the Kafka library.

My pleasure ☺️
Asaf Peleg
Yeah, I'm probably overthinking that question. I think it makes sense in the same way that code running in coroutines can trigger other asynchronous work.
In my example, is there any benefit to using `flowOn` since its parent dispatcher is IO, or does it still confer some improvement?
simon.vergauwen
No, in this case it doesn't benefit from adding another `flowOn(Dispatchers.IO)`, since the code is already running on IO.
So each record is already being processed on an IO thread: `KafkaReceiver` polls it on a Kafka thread and then immediately hands it over to `Dispatchers.IO` for your processing.
Asaf Peleg
I assumed that IO is an appropriate dispatcher to wrap the `KafkaReceiver` in, but I guess what you're saying is that it does its own threading stuff under the hood anyway.
simon.vergauwen
Yes, it has to, since Java's `KafkaConsumer` needs to work on a single `Thread`, which is managed internally by `KafkaReceiver`. TL;DR: `KafkaReceiver` creates a `newSingleThreadContext` and creates the `KafkaConsumer` on it. It calls all `KafkaConsumer` methods on that thread and sends the records to the `Flow` through a `ChannelFlow`.
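Roughly, the pattern looks something like this (a simplified sketch of the idea only, not the actual kotlin-kafka internals; it skips commits, rebalancing, and backpressure):
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.Properties

// Simplified sketch: create the KafkaConsumer on a dedicated thread, call
// all its methods there, and push records into the Flow via channelFlow.
@OptIn(DelicateCoroutinesApi::class, ObsoleteCoroutinesApi::class)
fun <K, V> consumerFlow(props: Properties, topic: String): Flow<ConsumerRecord<K, V>> =
    channelFlow {
        newSingleThreadContext("kafka-consumer").use { kafkaThread ->
            withContext(kafkaThread) {
                KafkaConsumer<K, V>(props).use { consumer ->
                    consumer.subscribe(listOf(topic))
                    while (isActive) {
                        // poll on the consumer's thread, hand off to collectors
                        for (record in consumer.poll(Duration.ofMillis(100))) send(record)
                    }
                }
            }
        }
    }
```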
Asaf Peleg
Understood.
Hi again 🙂 I was experimenting with fake messages and purposely skipping the `ack` call on them to test retry logic. I observed that, with the sample code we've been using, it will not commit the offset. From here there were two scenarios where I could use some advice:
1. If it is the latest message on the topic, the receiver does not retry that message. I'm only able to process the message again if I restart the server.
2. If another message is added to the topic, the receiver will poll it, and if I ACK that message it pushes the offset past the one I did not ACK.
What's the correct approach? Should I explicitly cancel the flow and restart it in a tight while loop, or is there something I'm missing?
Just to clarify this potential approach:
```kotlin
launch(Dispatchers.IO) {
    val settings = receiverSettings()
    while (true) { // If the flow exits, start again
        KafkaReceiver(settings).receive(myTopic).map { record ->
            val myDataRecord = MyDataClass(record.value())
            logger.info { myDataRecord }
            val result = foo(myDataRecord) // example processor, returns Boolean?
            result?.let {
                record.offset.acknowledge() // ACK message if processor returns non-null
            }
            result
        }.takeWhile { // Cancel flow if processor returns null
            it != null
        }.collect { logger.info { "Processed: $it" } }
        logger.info { "Will Retry Now" }
    }
}
```
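An alternative sketch to the manual `while (true)` loop would be Flow's built-in `retry` operator, assuming the skip is surfaced as an exception (`ProcessingFailed` is illustrative; the other names come from the snippet above):
```kotlin
// Illustrative exception type for "processor said skip the ACK".
class ProcessingFailed : RuntimeException("processor returned null")

// Runs in the same Application scope as the snippet above; receiverSettings,
// myTopic, foo, and logger are the same placeholders as before.
launch(Dispatchers.IO) {
    KafkaReceiver(receiverSettings()).receive(myTopic)
        .map { record ->
            val myDataRecord = MyDataClass(record.value())
            val result = foo(myDataRecord) ?: throw ProcessingFailed()
            record.offset.acknowledge() // ACK only after successful processing
            result
        }
        // Re-collect the upstream (i.e. resubscribe the receiver) on failure.
        // Note: without a delay/backoff this can hot-loop on the same record.
        .retry { it is ProcessingFailed }
        .collect { logger.info { "Processed: $it" } }
}
```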