#coroutines

Dias

11/06/2018, 4:26 PM
Has anyone successfully used Kotlin coroutines with Kafka consumers? We currently spin up a thread to consume from each topic, and we are at 30-ish threads so far, which are not too busy most of the time. I want to convert this to coroutines, but when you consume from a topic you need to commit offsets from the same thread, and I think commits should be done in order, which will be a problem if I just send them to be executed in a thread pool. What approach can I take?
🤔 2

codyoss

11/06/2018, 4:32 PM
I could be wrong, but I don’t think you can. Consuming is not super resource-intensive though. If you ack your messages right away, you could just throw everything into a coroutine pool to process after that?

Vsevolod Tolstopyatov [JB]

11/06/2018, 4:40 PM
I’m afraid coroutines can’t help a lot here. The Kafka consumer API is poll-based, so you don’t have a lot of options except to subscribe to literally every topic in a single thread and organize some kind of event loop with suspensions on top of `OffsetCommitCallback`
👍 1
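
A rough sketch of the "every topic in a single thread" idea, assuming kafka-clients 2.0+ (for `poll(Duration)`). The helper name `pollAndRoute` and the `handlers` map are hypothetical, made up for illustration; only `poll`, `records(topic)` and `count()` are real consumer API.

```kotlin
import java.time.Duration
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical helper: one consumer, one thread, many topics.
// Polls once and routes each record to the handler registered for its topic.
fun <K, V> pollAndRoute(
    consumer: Consumer<K, V>,
    handlers: Map<String, (ConsumerRecord<K, V>) -> Unit>
): Int {
    val records = consumer.poll(Duration.ofMillis(500))
    for ((topic, handler) in handlers) {
        // records(topic) yields only the records that belong to that topic.
        for (record in records.records(topic)) handler(record)
    }
    // Total number of records returned by this poll, across all topics.
    return records.count()
}
```

The caller would do `consumer.subscribe(handlers.keys)` once and then call `pollAndRoute` in a loop from the same thread.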

Dias

11/06/2018, 4:43 PM
you mean: read event, commit offset and then do something with that event in a callback after the commit?

Vsevolod Tolstopyatov [JB]

11/06/2018, 4:53 PM
it depends on how and when you are using commits and whether you are pausing the consumer when offsets are not committed. You can have a special kind of event loop (aka dispatcher) which most of the time polls all topics, but also have an actor per topic which commits offsets using suspension over `OffsetCommitCallback`. The rest is an implementation detail which depends only on how you want it to behave in different situations
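
One way "suspension over `OffsetCommitCallback`" could look, as a minimal sketch and not necessarily what was meant above. `commitAwait` is a hypothetical extension (it is not part of kafka-clients or kotlinx.coroutines); `commitAsync(offsets, callback)` and `OffsetCommitCallback` are the real API. With a real `KafkaConsumer` the callback is run by the consumer itself inside later calls such as `poll()`, so the event loop has to keep polling for the coroutine to resume.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: suspends until the async commit completes.
// Must be called from the thread that owns the consumer.
suspend fun <K, V> Consumer<K, V>.commitAwait(
    offsets: Map<TopicPartition, OffsetAndMetadata>
): Unit = suspendCoroutine { cont ->
    commitAsync(offsets, OffsetCommitCallback { _, exception ->
        // The callback reports failure through a non-null exception.
        if (exception != null) cont.resumeWithException(exception)
        else cont.resume(Unit)
    })
}
```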

stevecstian

11/06/2018, 4:59 PM
You could let another thread process your records, but your consumer thread still needs to keep calling poll for the heartbeat, and you probably need to use the pause/resume API, handle rebalances in a rebalance listener, and adjust the configuration for your usage. So it really depends on your usage. You should be able to find some discussions on the Kafka users mailing list.
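
A small sketch of the pause/keep-polling/resume pattern, assuming the records are being processed by a coroutine `Job` running elsewhere. `keepPollingUntilDone` is a hypothetical name. Rebalance handling is deliberately left out: partitions assigned during the wait would not be paused, so a real version would also need a `ConsumerRebalanceListener`.

```kotlin
import java.time.Duration
import kotlinx.coroutines.Job
import org.apache.kafka.clients.consumer.Consumer

// Hypothetical helper: keeps the consumer polling while `work` runs elsewhere.
// Call it from the thread that owns the consumer.
fun <K, V> keepPollingUntilDone(consumer: Consumer<K, V>, work: Job) {
    // Paused partitions return no records, but poll() still keeps the consumer active.
    consumer.pause(consumer.assignment())
    try {
        while (!work.isCompleted) {
            consumer.poll(Duration.ofMillis(100))
        }
    } finally {
        // Resume whatever is still paused, even if polling failed.
        consumer.resume(consumer.paused())
    }
}
```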

Dias

11/06/2018, 5:01 PM
@Vsevolod Tolstopyatov [JB] we poll records -> do some work with all fetched records -> do a blocking offset commit -> repeat. I am not sure how you propose to use `OffsetCommitCallback`, because that's the callback that runs after the commit

stevecstian

11/06/2018, 5:07 PM
So you want to let another thread process the polled records, then call `commitSync` once all records are processed?
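
A minimal sketch of that flow with coroutines, assuming kotlinx-coroutines and kafka-clients 2.0+ on the classpath. `pollProcessCommit` and `handle` are hypothetical names. Records are handled concurrently on `Dispatchers.Default`, so per-partition processing order is not preserved, but the consumer itself is only touched from the calling thread.

```kotlin
import java.time.Duration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical helper: polls once, processes the batch concurrently,
// and commits only after every record in the batch has been handled.
fun <K, V> pollProcessCommit(
    consumer: Consumer<K, V>,
    handle: suspend (ConsumerRecord<K, V>) -> Unit
): Int = runBlocking {
    val records = consumer.poll(Duration.ofMillis(500))
    // coroutineScope returns only when all launched children have finished;
    // if any handler throws, the exception propagates and the commit is skipped.
    coroutineScope {
        for (record in records) {
            launch(Dispatchers.Default) { handle(record) }
        }
    }
    // Back on the calling (consumer) thread: safe to touch the consumer again.
    if (!records.isEmpty()) consumer.commitSync()
    records.count()
}
```

The whole batch still has to finish within `max.poll.interval.ms`, otherwise the group coordinator treats the consumer as failed and rebalances.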

Dias

11/06/2018, 5:15 PM
@stevecstian yeah that's what I was thinking initially, if I understand you correctly
maybe actually the thing that I want to do is simple, and I just confused you all 😛

stevecstian

11/06/2018, 5:24 PM
Unless your record processing time is very long or hard to predict, I'd recommend you just use the single-thread model.

Dias

11/06/2018, 5:31 PM
that's a fair point, I don't remember why we went with the thread-per-topic approach, I will ask about it

stevecstian

11/06/2018, 5:36 PM
Kafka's consumer is not thread-safe and is designed to be used from a single thread...
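
For reference, `wakeup()` is the one consumer method that is documented as safe to call from another thread, which makes it the usual way to stop a single-threaded poll loop. A minimal sketch, where `runUntilWakeup` and `onBatch` are hypothetical names:

```kotlin
import java.time.Duration
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.errors.WakeupException

// Hypothetical helper: runs the poll loop on the calling thread
// until some other thread calls consumer.wakeup().
fun <K, V> runUntilWakeup(consumer: Consumer<K, V>, onBatch: (Int) -> Unit) {
    try {
        while (true) {
            val records = consumer.poll(Duration.ofMillis(500))
            onBatch(records.count())
        }
    } catch (e: WakeupException) {
        // Expected: another thread asked the loop to stop via wakeup().
    } finally {
        // Closing happens on the owning thread as well.
        consumer.close()
    }
}
```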