Has anyone successfully used Kotlin coroutines with...
# coroutines
d
Has anyone successfully used Kotlin coroutines with Kafka consumers? We currently spin up a thread per topic to consume from it; we are at 30ish threads so far, and they are not too busy most of the time. I want to convert this to coroutines, but when you consume from a topic you need to commit offsets from the same thread, and commits should be done in order, I think, which will be a problem if I just send the work off to a thread pool. What approach can I take?
🤔 2
c
I could be wrong, but I don't think you can. Consuming is not super resource-intensive though. If you ack your messages right away, you could just throw everything into a coroutine pool to process after that?
v
I'm afraid coroutines can't help much here. The Kafka consumer API is poll-based, so you don't have a lot of options except to subscribe to literally every topic in a single thread and organize some kind of event loop with suspensions on top of `OffsetCommitCallback`
👍 1
d
you mean: read event, commit offset and then do something with that event in a callback after the commit?
v
it depends on how and when you are using commits, and whether you are pausing the consumer while offsets are not committed. You can have a special kind of event loop (aka dispatcher) which polls all topics most of the time, plus an actor per topic which commits offsets using suspension over `OffsetCommitCallback`
the rest is an implementation detail which depends only on how you want it to behave in different situations
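A minimal sketch of what "suspension over `OffsetCommitCallback`" could look like, assuming the `kafka-clients` library is on the classpath. `commitAwait` is a hypothetical name, not part of the Kafka API; it only wraps the existing `commitAsync(offsets, callback)` call. As far as I know, Kafka invokes the callback on the consumer's own thread during a later `poll` or commit call, so the event loop has to keep polling while the coroutine is suspended, and `commitAwait` itself must be called from the consumer thread.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition

// Hypothetical helper (not a Kafka API): suspends the calling coroutine until
// the asynchronous commit completes, then returns the committed offsets or
// rethrows the commit failure.
suspend fun <K, V> Consumer<K, V>.commitAwait(
    offsets: Map<TopicPartition, OffsetAndMetadata>
): Map<TopicPartition, OffsetAndMetadata> = suspendCoroutine { cont ->
    commitAsync(offsets, OffsetCommitCallback { committed, exception ->
        if (exception != null) cont.resumeWithException(exception)
        else cont.resume(committed)
    })
}
```

An actor per topic could then call `consumer.commitAwait(...)` and simply continue after the call, instead of handling the result inside a callback.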
s
You could let another thread process your records, but your consumer thread still needs to keep calling poll for the heartbeat, and you probably need to use the pause/resume API, handle rebalances in a rebalance listener, and adjust the configuration for your usage. So it really depends on your usage. You should be able to find some discussions on the Kafka users mailing list.
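A rough sketch of the pause/resume pattern described above, assuming `kafka-clients` is available. `runPausingLoop`, `keepRunning` and `handle` are hypothetical names introduced only for illustration. The consumer thread hands each batch to a worker, pauses its partitions, keeps calling `poll` (which returns nothing for paused partitions), and commits and resumes once the batch is done.

```kotlin
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: must run entirely on the one thread that owns `consumer`.
fun <K, V> runPausingLoop(
    consumer: Consumer<K, V>,
    topics: Collection<String>,
    workers: ExecutorService,
    keepRunning: () -> Boolean,
    handle: (ConsumerRecords<K, V>) -> Unit,
) {
    var inFlight: CompletableFuture<Void>? = null

    consumer.subscribe(topics, object : ConsumerRebalanceListener {
        override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) {
            // Nothing to do in this sketch; work already done for revoked
            // partitions may be redelivered to another consumer.
        }

        override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
            // Keep newly assigned partitions quiet while a batch is in progress.
            if (inFlight != null) consumer.pause(partitions)
        }
    })

    while (keepRunning()) {
        // Polling continues even while paused, so the consumer stays in the group.
        val records = consumer.poll(Duration.ofMillis(100))
        if (inFlight == null && !records.isEmpty()) {
            consumer.pause(consumer.assignment())
            inFlight = CompletableFuture.runAsync({ handle(records) }, workers)
        }
        val current = inFlight
        if (current != null && current.isDone) {
            current.join()                    // rethrows a processing failure
            consumer.commitSync()             // commit from the consumer thread
            consumer.resume(consumer.paused())
            inFlight = null
        }
    }
}
```

All consumer calls stay on one thread; only `handle` runs on the worker pool. A failure in `handle` ends the loop before anything is committed, so that batch would be redelivered.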
d
@Vsevolod Tolstopyatov [JB] we poll records -> do some work with all fetched records -> do a blocking offset commit -> repeat. I am not sure how you propose to use `OffsetCommitCallback`, because that's the callback that is invoked after the commit
s
So you want to let another thread process the polled records, and call `commitSync` once all records are processed?
d
@stevecstian yeah that's what I was thinking initially, if I understand you correctly
maybe actually the thing that I want to do is simple, and I just confused you all 😛
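The flow discussed above (poll, process the fetched records on other threads, `commitSync` once everything is processed, repeat) might be sketched like this, assuming `kafka-clients` and `kotlinx-coroutines-core` are available. `pollProcessCommit`, `keepRunning` and `handle` are hypothetical names. One coroutine per partition is an assumption made here to preserve per-partition ordering; it is not something the thread settled on.

```kotlin
import java.time.Duration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical helper: a single thread owns the consumer, which may be
// subscribed to many topics; only record handling fans out to coroutines.
fun <K, V> pollProcessCommit(
    consumer: Consumer<K, V>,
    keepRunning: () -> Boolean,
    handle: suspend (ConsumerRecord<K, V>) -> Unit,
) {
    while (keepRunning()) {
        val records = consumer.poll(Duration.ofMillis(500))
        if (records.isEmpty()) continue

        // Block the consumer thread until every partition's batch is handled.
        runBlocking {
            records.partitions().map { partition ->
                async(Dispatchers.Default) {
                    // Records of one partition are handled sequentially, in order.
                    for (record in records.records(partition)) handle(record)
                }
            }.awaitAll()
        }

        // Still on the consumer thread: commit the positions of the last poll.
        consumer.commitSync()
    }
}
```

With this shape, one consumer subscribed to all topics could replace the thread-per-topic setup, as long as a batch is reliably processed within `max.poll.interval.ms`. If `handle` throws, the loop exits before committing and the batch is redelivered later.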
s
Unless your record processing time is very long or hard to predict, I'd recommend just using the single-thread model.
d
that's a fair point, I don't remember why we went with the thread-per-topic approach, I will ask about it
s
Kafka's consumer is not thread-safe and is designed to be used with a single thread...