Is there a functioning Kafka lib that works well w...
# coroutines
j
Is there a functioning Kafka lib that works well with coroutines? Nothing fancy, just need to send some messages to my notifications service
s
Hey @João Gabriel Zó, I've been experimenting with building a Kafka library, and recently I wrote an Alpakka-inspired event loop to stream messages from Kafka using KotlinX Coroutines Flow. The producer code is not 100% finished, but I hope to put some more love into this project soon-ish. You can pull in the library and use it; I've used it in some smaller projects. Or you can check the internals to see how I used the Kafka SDK 😉 https://github.com/nomisRev/kotlin-kafka
j
Oh I have seen your lib, I just wasn't sure if it was production ready. The flow aspect of it will come in handy. I'll spend a little more time with the code and present it to my team. Thanks!
s
Some of the APIs might change a little bit. I would say that everything under the `io.github.nomisrev.kafka.receiver` package is stable, but the API of the `publisher` might still change slightly. I am also planning to organise it similarly to the `receiver` package, with interfaces and a bit more structure.
Besides that, it is production ready. If you have any feedback or remarks, please share them in a GitHub ticket 🙏
h
I created a similar wrapper for internal use. The biggest problem was not the wrapper itself: as soon as you use Flows, you are doing operations such as windowing in memory, whereas Kafka uses a persistent database instead. Also, I used SharedFlows to make it clear that the stream never returns normally, but each `map` operation returns a normal Flow, so you lose this information. But this is a normal coroutine issue.
s
Yes, streaming Kafka is more complex than it originally looks. You need to take backpressure into account and pause partitions, or Kafka might consider your consumer dead, etc. That's why Reactor built dedicated support, and the same goes for Akka, or FS2 in Scala. This project aims to do the same for KotlinX Coroutines Flow, so you can stream Kafka messages with a lot of guarantees out of the box.
j
Simon, do you think there is a way to use an Avro/GenericRecord serializer with the producer for your lib?
s
Yes, I am actually doing that. I'm using Avro4k. Disclaimer: I ran into some issues when a different service was using KafkaStreams and Confluent deserializers, and it tripped over the magic bytes 😅 It can be fixed by writing a small intermediate deserializer, but I'd need to look it up.
If you're not using KafkaStreams, or Confluent serializer (which breaks the Avro spec with the magic bytes), you can just use this. https://github.com/avro-kotlin/avro4k With this:
Or well, you wouldn't even need this. You would just need to use `Avro.toGenericRecord` or `toRecord` (not 100% sure of the syntax off the top of my head).
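For readers unfamiliar with the "magic bytes" mentioned above: the Confluent wire format prefixes each Avro payload with one magic byte (`0x00`) and a 4-byte big-endian schema ID. The following is only a minimal sketch of that framing, not the intermediate deserializer referred to in this thread; the helper names `addConfluentHeader` and `stripConfluentHeader` are hypothetical and are not part of Avro4k or kotlin-kafka.

```kotlin
import java.nio.ByteBuffer

// Confluent wire format: [magic byte 0x00][4-byte big-endian schema ID][Avro binary payload]
const val MAGIC_BYTE: Byte = 0x0
const val HEADER_SIZE = 5

/** Hypothetical helper: prepends the Confluent header to a plain Avro payload. */
fun addConfluentHeader(schemaId: Int, avroPayload: ByteArray): ByteArray =
    ByteBuffer.allocate(HEADER_SIZE + avroPayload.size) // ByteBuffer is big-endian by default
        .put(MAGIC_BYTE)
        .putInt(schemaId)
        .put(avroPayload)
        .array()

/** Hypothetical helper: splits a Confluent-framed message into (schemaId, payload). */
fun stripConfluentHeader(message: ByteArray): Pair<Int, ByteArray> {
    require(message.size >= HEADER_SIZE) { "Message too short for a Confluent header" }
    val buffer = ByteBuffer.wrap(message)
    require(buffer.get() == MAGIC_BYTE) { "Unknown magic byte" }
    val schemaId = buffer.getInt()
    // Everything after the 5-byte header is the plain Avro payload.
    val payload = ByteArray(buffer.remaining())
    buffer.get(payload)
    return schemaId to payload
}
```

A consumer that only understands plain Avro could call `stripConfluentHeader` before decoding, and a producer could call `addConfluentHeader` after encoding, assuming the schema ID is already known from the schema registry.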
j
I'm trying to use it with the Confluent serializer because for some reason the people at my company built everything around those magic bytes and it breaks if we don't send them 🙃 I'm running into some trouble while trying to pass the `KafkaAvroSerializer` to the ProducerSettings tho
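For comparison, this is roughly how the Confluent serializer is wired into a plain Kafka producer through configuration properties. It is a sketch under assumptions: the function name `confluentAvroProducerProperties` is hypothetical, and how (or whether) these properties map onto kotlin-kafka's ProducerSettings is not confirmed anywhere in this thread.

```kotlin
import java.util.Properties

/**
 * Hypothetical helper: builds standard Kafka producer properties that select
 * Confluent's KafkaAvroSerializer for values. The serializer is referenced by
 * class name, so this snippet compiles without the Kafka or Confluent jars.
 */
fun confluentAvroProducerProperties(
    bootstrapServers: String,
    schemaRegistryUrl: String,
): Properties =
    Properties().apply {
        put("bootstrap.servers", bootstrapServers)
        put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
        // The Confluent serializer needs the schema registry to look up or register schema IDs.
        put("schema.registry.url", schemaRegistryUrl)
    }
```

With the Kafka and Confluent dependencies on the classpath, these properties could be handed to a regular `KafkaProducer`; the serializer then writes the magic byte and schema ID itself.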
s
I don't have the solution on hand, but I can share it on Monday when I get back to work.
Hoping to contribute it to Avro4k soon
Hey @João Gabriel Zó, this is the solution we are using for this. Hope that helps.