Hi everyone, I try to call the run method of Kafka...
# javascript
p
Hi everyone, I am trying to call the run method of KafkaJS from Kotlin. For that I did the following steps:
1. Add the kotlinx-nodejs dependency
2. Run the task nodeJsGenerateExternals
3. Copy from externals to sources
4. Add @file:JsModule("kafkajs") at the top of index.module_kafkajs.kt
5. Move all typealiases (non-external) to another file (in my case index.module_kafkajs2.kt)
In plain JS I have to call it this way:
await Consumer.run({
   eachMessage: async ({ topic, partition, message }) => {
     console.log({
       value: message.value.toString(),
     })
   },
 });
In the created index.module_kafkajs.kt file I found that:
external interface Consumer {
    fun connect(): Promise<Unit>
    fun disconnect(): Promise<Unit>
    fun subscribe(topic: ConsumerSubscribeTopic): Promise<Unit>
    fun stop(): Promise<Unit>
    fun run(config: ConsumerRunConfig = definedExternally): Promise<Unit>
    fun commitOffsets(topicPartitions: Array<TopicPartitionOffsetAndMedata>): Promise<Unit>
    fun seek(topicPartition: `T$28`)
    fun describeGroup(): Promise<GroupDescription>
    fun pause(topics: Array<`T$29`>)
    fun paused(): Array<TopicPartitions>
    fun resume(topics: Array<`T$29`>)
    fun on(eventName: ValueOf<ConsumerEvents>, listener: (args: Any) -> Unit): RemoveInstrumentationEventListener<Any>
    fun logger(): Logger
    var events: ConsumerEvents
}

and that: 

external interface ConsumerRunConfig {
    var autoCommit: Boolean?
        get() = definedExternally
        set(value) = definedExternally
    var autoCommitInterval: Number?
        get() = definedExternally
        set(value) = definedExternally
    var autoCommitThreshold: Number?
        get() = definedExternally
        set(value) = definedExternally
    var eachBatchAutoResolve: Boolean?
        get() = definedExternally
        set(value) = definedExternally
    var partitionsConsumedConcurrently: Number?
        get() = definedExternally
        set(value) = definedExternally
    var eachBatch: ((payload: EachBatchPayload) -> Promise<Unit>)?
        get() = definedExternally
        set(value) = definedExternally
    var eachMessage: ((payload: EachMessagePayload) -> Promise<Unit>)?
        get() = definedExternally
        set(value) = definedExternally
}
Actually, I don't understand how to use that from Kotlin with eachMessage. I hope somebody can help me.
a
Can you post the external interface EachMessagePayload? Are you using this with kotlinx-coroutines?
Consumer.run(jsObject{
  eachMessage = { payload ->
    val topic = payload.topic
    val partition = payload.partition
    val message = payload.message
    console.log(js{
      value = message.value.toString()
    })
  }
}).then({res->
   console.log(res)
})
p
Here is the EachMessagePayload interface:
external interface EachMessagePayload {
    var topic: String
    var partition: Number
    var message: KafkaMessage
}
Ok I will try it.
Yeah I am using it with kotlinx-coroutines
a
Then you can do it this way
coroutineScope {
  val res = Consumer.run(jsObject{
    eachMessage = { payload ->
      val topic = payload.topic
      val partition = payload.partition
      val message = payload.message
      console.log(js{
        value = message.value.toString()
      })
    }
  }).await() // suspends here
}
p
That is not working. As far as I can see, I have to call run with a config object, like this: consumer.run(config). For that I have to define:
val config: ConsumerRunConfig = js("({})")
config.autoCommit = true
config.autoCommitInterval = null
config.autoCommitThreshold = null
config.eachBatchAutoResolve = true
config.partitionsConsumedConcurrently = 1
but then I can't call consumer.run(config{
a
for the config object, you should be able to do
val config = jsObject<ConsumerRunConfig> {
  autoCommit = true
  autoCommitInterval = null
  autoCommitThreshold = null
  eachBatchAutoResolve = true
  partitionsConsumedConcurrently = 1
}
How do you get/create an instance of the Consumer?
p
I create my consumer in this way.
private var consumer: dynamic
consumer = Kafka(kafkaOptions).consumer(consumerOptions)
Then I do that:
fun consume() {
    val consumerSubscription: ConsumerSubscribeTopic = js("({})")
    consumerSubscription.topic = getTopic()
    consumerSubscription.fromBeginning = fromBeginning

    // default values set
    val config: ConsumerRunConfig = js("({})")
    config.autoCommit = true
    config.autoCommitInterval = null
    config.autoCommitThreshold = null
    config.eachBatchAutoResolve = true
    config.partitionsConsumedConcurrently = 1

    consumer.connect()
    GlobalScope.launch ( context = Dispatchers.Default ) {
        delay(1000)
        consumer.subscribe(consumerSubscription)
        
        // Here comes my consumer.run(config) 
        
        //Here is the pain: 
         val res = consumer.run(config{
                eachMessage = { payload ->
                    val topic = payload.topic
                    val partition = payload.partition
                    val message = payload.message
                    console.log(js{
                        value = message.value.toString()
                    })
                }
            }).await() // suspends here
a
Try this
fun consume() {
    val consumerSubscription: ConsumerSubscribeTopic = js("({})")
    consumerSubscription.topic = getTopic()
    consumerSubscription.fromBeginning = fromBeginning
    // default values set
    val config: ConsumerRunConfig = js("({})")
    config.autoCommit = true
    config.autoCommitInterval = null
    config.autoCommitThreshold = null
    config.eachBatchAutoResolve = true
    config.partitionsConsumedConcurrently = 1
    consumer.connect()
    GlobalScope.launch ( context = Dispatchers.Default ) {
        delay(1000)
        consumer.subscribe(consumerSubscription)
        // Here comes my consumer.run(config) 
        //Here is the pain: 
        val res = consumer.run(config.apply {
            eachMessage = { payload ->
                val topic = payload.topic
                val partition = payload.partition
                val message = payload.message
                console.log(js {
                    value = message.value.toString()
                })
            }
        }).await() // suspends here
    }
}
p
Cool. But now for these three lines I get an error: Expected a value of type Promise<Unit>. Any idea how to solve that?
a
Sorry, which three lines?
p
Oh sorry.
val topic = payload.topic
val partition = payload.partition
val message = payload.message
a
try
val res = consumer.run(config.apply {
            eachMessage = { payload ->
               val topic = payload.topic
               val partition = payload.partition
               val message = payload.message
               console.log(js{
                  value = message.value.toString()
               })
               promise{ Unit }
            }
         }).await() // suspends here
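As background for the Promise<Unit> requirement: in JavaScript an async function always returns a Promise, even without an explicit return, which is presumably why the generated Kotlin type for eachMessage is (payload) -> Promise<Unit>. A minimal sketch in plain JS; the payload values are made up for illustration and no KafkaJS code is involved:

```javascript
// An async handler shaped like the KafkaJS eachMessage callback.
// The payload passed below is invented for illustration only.
const eachMessage = async ({ topic, partition, message }) => {
  console.log({ value: message.value.toString() });
  // no explicit return
};

const returned = eachMessage({ topic: "demo", partition: 0, message: { value: "hello" } });

// The call itself yields a Promise, which later resolves to undefined.
console.log(returned instanceof Promise); // true
returned.then((result) => console.log(result)); // undefined
```

A Kotlin lambda has no such implicit wrapping, so it has to return a Promise itself.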
p
OK, it worked for the first message, great. But I get some errors: await is not a function.
Do you know why it is only working with the first message but not with the other 40 messages?
So the problem could be with this line:
promise{ Unit } --> that breaks the consumption. But when I have more messages, it stops after the first message.
And JB mentioned: Unfortunately, we don’t able to work with js async/await.
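A possible explanation for "await is not a function", assuming consumer is still declared as dynamic as in the earlier snippet: calls on a dynamic receiver are compiled to plain JavaScript member calls, so .await() would be looked up on the Promise object itself instead of resolving to the kotlinx-coroutines extension function. The plain JS sketch below only reproduces that kind of error; it does not use KafkaJS:

```javascript
// What a dynamic call like `promise.await()` amounts to in plain JavaScript:
// a lookup of a member named "await" on the Promise object.
const promise = Promise.resolve();

let errorMessage = null;
try {
  promise.await(); // Promise objects have no "await" method
} catch (e) {
  errorMessage = `${e.name}: ${e.message}`;
}

// Logs the TypeError raised by the failed member call.
console.log(errorMessage);
```

If this is the cause, giving the value a static type before awaiting (for example declaring consumer as Consumer instead of dynamic) should let the extension function be resolved. This is an untested suggestion.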
a
I don't think that's a Kotlin/JS issue. What is Kafka's requirement for you to get multiple messages? Can you post the JS equivalent?
p
Here is the equivalent:
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      value: message.value.toString(),
    })
  },
});
Well I wanna consume 1 to n messages when new ones are produced.
The JS version is running; I get every message of the topic.
a
What is Kafka's requirement for you to get multiple messages? I have a feeling something needs to be returned, but I am not sure what. Try:
promise { undefined } // in the end
p
That is interesting. When I use:
promise { undefined } // in the end
The consumer throws a lot of errors. But when I use:
Promise { resolve, reject ->
}
then I get no errors. But also only the first message.
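One possible explanation for seeing only the first message with Promise { resolve, reject -> }: the executor never calls resolve, so the promise stays pending. Assuming the consumer waits for the promise returned by eachMessage before delivering the next message, it would then wait forever. The sketch below illustrates this in plain JS with a hypothetical deliverSequentially loop. It is not KafkaJS code, and the giveUpAfterMs timeout exists only so that the demo terminates:

```javascript
// Hypothetical stand-in for a consumer loop. This is NOT KafkaJS code; it only
// assumes that the handler's promise is awaited before the next delivery.
async function deliverSequentially(messages, eachMessage, giveUpAfterMs = 50) {
  const delivered = [];
  for (const message of messages) {
    delivered.push(message);
    let timer;
    const gaveUp = new Promise((resolve) => {
      timer = setTimeout(() => resolve(false), giveUpAfterMs);
    });
    const finished = Promise.resolve(eachMessage({ message })).then(() => true);
    const ok = await Promise.race([finished, gaveUp]);
    clearTimeout(timer);
    if (!ok) break; // handler never settled: a real loop would hang here
  }
  return delivered;
}

// Handler whose promise resolves (what an async function produces).
const resolving = async ({ message }) => {};

// Handler whose promise never settles: the executor calls neither callback.
const neverSettling = ({ message }) => new Promise((resolve, reject) => {});

async function demo() {
  const messages = ["m1", "m2", "m3"];
  console.log(await deliverSequentially(messages, resolving));     // [ 'm1', 'm2', 'm3' ]
  console.log(await deliverSequentially(messages, neverSettling)); // [ 'm1' ]
}
demo();
```

If this is what happens, calling resolve inside the executor, e.g. Promise<Unit> { resolve, _ -> resolve(Unit) }, should let the consumer move on to the next message.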