Patrick Doering
10/20/2020, 9:25 AMkotlinx-nodejs
dependency
2. Run task nodeJsGenerateExternals
3. Copy from externals
to sources
4. Add to index.module_kafkajs.kt
on top @file:JsModule("kafkajs")
5. Move all typealiases (non external) to other file (in my case index.module_kafkajs2.kt
In plain JS I have to call it that way:
await Consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
});
In the created index.module_kafkajs.kt file I found that:
external interface Consumer {
fun connect(): Promise<Unit>
fun disconnect(): Promise<Unit>
fun subscribe(topic: ConsumerSubscribeTopic): Promise<Unit>
fun stop(): Promise<Unit>
fun run(config: ConsumerRunConfig = definedExternally): Promise<Unit>
fun commitOffsets(topicPartitions: Array<TopicPartitionOffsetAndMedata>): Promise<Unit>
fun seek(topicPartition: `T$28`)
fun describeGroup(): Promise<GroupDescription>
fun pause(topics: Array<`T$29`>)
fun paused(): Array<TopicPartitions>
fun resume(topics: Array<`T$29`>)
fun on(eventName: ValueOf<ConsumerEvents>, listener: (args: Any) -> Unit): RemoveInstrumentationEventListener<Any>
fun logger(): Logger
var events: ConsumerEvents
}
and that:
external interface ConsumerRunConfig {
var autoCommit: Boolean?
get() = definedExternally
set(value) = definedExternally
var autoCommitInterval: Number?
get() = definedExternally
set(value) = definedExternally
var autoCommitThreshold: Number?
get() = definedExternally
set(value) = definedExternally
var eachBatchAutoResolve: Boolean?
get() = definedExternally
set(value) = definedExternally
var partitionsConsumedConcurrently: Number?
get() = definedExternally
set(value) = definedExternally
var eachBatch: ((payload: EachBatchPayload) -> Promise<Unit>)?
get() = definedExternally
set(value) = definedExternally
var eachMessage: ((payload: EachMessagePayload) -> Promise<Unit>)?
get() = definedExternally
set(value) = definedExternally
}
Actual I don` understand how to use that from Kotlin with eachMessage. I hope somebody can help me.andylamax
10/20/2020, 9:37 AMexternal interface EachMessagePayload
Are you using this with kotlinx-coroutines
?andylamax
10/20/2020, 9:41 AMConsumer.run(jsObject{
eachMessage = { payload ->
val topic = payload.topic
val partition = payload.partion
val message = payload.message
console.log(js{
value = message.value.toString()
})
}
}).then({res->
console.log(res)
})
Patrick Doering
10/20/2020, 10:00 AMexternal interface EachMessagePayload {
var topic: String
var partition: Number
var message: KafkaMessage
}
Patrick Doering
10/20/2020, 10:00 AMPatrick Doering
10/20/2020, 10:51 AMandylamax
10/20/2020, 10:55 AMcoroutineScope {
val res = Consumer.run(jsObject{
eachMessage = { payload ->
val topic = payload.topic
val partition = payload.partion
val message = payload.message
console.log(js{
value = message.value.toString()
})
}
}).await() // suspends here
}
Patrick Doering
10/20/2020, 10:59 AMval config: ConsumerRunConfig = js("({})")
config.autoCommit = true
config.autoCommitInterval = null
config.autoCommitThreshold = null
config.eachBatchAutoResolve = true
config.partitionsConsumedConcurrently = 1
but then I cant call consumer.run(config{andylamax
10/20/2020, 11:01 AMval config = jsObject<ConsumerRunConfig> {
autoCommit = true
autoCommit = true
autoCommitInterval = null
autoCommitThreshold = null
eachBatchAutoResolve = true
partitionsConsumedConcurrently = 1
}
andylamax
10/20/2020, 11:01 AMConsumer
?Patrick Doering
10/20/2020, 11:24 AMprivate var consumer: dynamic
consumer = Kafka(kafkaOptions).consumer(consumerOptions)
Patrick Doering
10/20/2020, 11:30 AMfun consume() {
val consumerSubscription: ConsumerSubscribeTopic = js("({})")
consumerSubscription.topic = getTopic()
consumerSubscription.fromBeginning = fromBeginning
// default values set
val config: ConsumerRunConfig = js("({})")
config.autoCommit = true
config.autoCommitInterval = null
config.autoCommitThreshold = null
config.eachBatchAutoResolve = true
config.partitionsConsumedConcurrently = 1
consumer.connect()
GlobalScope.launch ( context = Dispatchers.Default ) {
delay(1000)
consumer.subscribe(consumerSubscription)
// Here comes my consumer.run(config)
//Here is the pain:
val res = consumer.run(config{
eachMessage = { payload ->
val topic = payload.topic
val partition = payload.partion
val message = payload.message
console.log(js{
value = message.value.toString()
})
}
}).await() // suspends here
andylamax
10/20/2020, 12:22 PMfun consume() {
val consumerSubscription: ConsumerSubscribeTopic = js("({})")
consumerSubscription.topic = getTopic()
consumerSubscription.fromBeginning = fromBeginning
// default values set
val config: ConsumerRunConfig = js("({})")
config.autoCommit = true
config.autoCommitInterval = null
config.autoCommitThreshold = null
config.eachBatchAutoResolve = true
config.partitionsConsumedConcurrently = 1
consumer.connect()
GlobalScope.launch ( context = Dispatchers.Default ) {
delay(1000)
consumer.subscribe(consumerSubscription)
// Here comes my consumer.run(config)
//Here is the pain:
val res = consumer.run(config.apply {
eachMessage = { payload ->
val topic = payload.topic
val partition = payload.partion
val message = payload.message
console.log(js{
value = message.value.toString()
})
}
}).await() // suspends here
Patrick Doering
10/20/2020, 12:28 PMandylamax
10/20/2020, 12:30 PMPatrick Doering
10/20/2020, 12:30 PMval topic = payload.topic
val partition = payload.partion
val message = payload.message
andylamax
10/20/2020, 12:32 PMval res = consumer.run(config.apply {
eachMessage = { payload ->
val topic = payload.topic
val partition = payload.partion
val message = payload.message
console.log(js{
value = message.value.toString()
})
promise{ Unit }
}
}).await() // suspends here
Patrick Doering
10/20/2020, 12:36 PMPatrick Doering
10/20/2020, 1:16 PMPatrick Doering
10/20/2020, 2:42 PMpromise{ Unit } --> that breaks the consumption. But when I have more messages, it stops after the first message.
And JB mentioned:
Unfortunately, we don’t able to work with js async/await.andylamax
10/20/2020, 3:50 PMPatrick Doering
10/20/2020, 4:15 PMawait consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
});
Well I wanna consume 1 to n messages when new ones are produced.Patrick Doering
10/20/2020, 4:15 PMandylamax
10/20/2020, 4:27 PMpromise { undefined } // in the end
Patrick Doering
10/20/2020, 4:42 PMpromise { undefined } // in the end
The consumer throws a lot of errors.
But when I use:
Promise { resolve, reject ->
}
then I get no errors. But also only the first message.