What's the best (most stable, feature complete) mq...
# server
j
What's the best (most stable, feature complete) mqtt client library to use in Kotlin?
a
j
Should I be worried docs point to a 404 page? πŸ™‚
I discarded paho for this reason earlier. I tried a few other libs and there's always something off (examples from DitchOoM/mqtt won't build, while davidepianca98/KMQTT doesn't support reconnections).
a
It's pretty much abandoned, but it's also pretty much your only option. I use it all the time
j
Thanks. I just tried it and it seems to be pretty stable. Not really Kotlinish, but stable πŸ™‚
m
When we chose MQTT client, it didn't seem like the Paho client supported subscribing dynamically to topics at runtime, which was a requirement for us. We went with the MQTT client that is part of AWS' IoT SDK. It has some good features, but we also ran into a memory leak when we tried closing connections a lot. Keeping them open and only subscribing/resubscribing solved the problem for us.
j
Paho has subscribe/unsubscribe methods now I think.
m
Oh it looks like it does! πŸ‘ Would have looked into using it if it wasn't for our system in question is scheduled for replacement during the year πŸ˜‰
πŸ‘ 1
a
Yes, we use paho mqtt for AWS IOT and make frequent use of dynamic subscriptions. What do you use @MIDI?
j
I've used hive mqtt in some personal projects https://github.com/hivemq/hivemq-mqtt-client
πŸ‘€ 1
πŸ‘ 1
Jvm only, obviously.
a
I wish I had found hive before. πŸ€”
j
Shameless plug for this ktor feature https://youtrack.jetbrains.com/issue/KTOR-3613
j
Thanks @James Yox! The hive client looks the most modern and feature rich. Nice documentation too.
I think we have a winner :)
πŸŽ‰ 1
Have you tried using it with coroutines?
j
Yeah it's pretty nice. Can't say I've had any issues with it. The rxJava support was easy to convert to a flow. Happy to share code if you need.
j
Thanks. If that's something you can share I'll be happy to take a look.
It looks like it supports three types of client: blocking, async and reactive. Async is just CompletableFuture that can be used with coroutines. If reactive can be converted to flow that might work for me too.
j
Yeah I used the RxJava support. This is toy project code btw. Nothing production ready haha just some hacking I did for fun.
Copy code
val client = Mqtt5Client.builder()
            .identifier(clientId)
            .serverHost(broker)
            .serverPort(port)
            .automaticReconnectWithDefaultConfig()
            .buildRx()

        val publishes = consumer.filterIsInstance<HiveMqttPublish>().map { it.mqtt5Publish }
        val subscribes = consumer.filterIsInstance<HiveMqttSubscribe>().map { it.mqtt5Subscribe }

        val hiveFlow: HiveMqttFlow = channelFlow<Either<HiveMqttError, HiveMqttEvent>> {
            launch {
                client.publish(publishes.asFlowable()).asFlow()
                    .onStart { consumer.unplug() }
                    .onStart {
                        Either.catch {
                            client.connectWith()
                                .cleanStart(cleanStart)
                                .noSessionExpiry()
                                .applyConnect()
                                .await()
                                .let { send(HiveMqttConnAck(it).right()) }
                        }.mapLeft { send(HiveMqttUncaughtThrowable(it).left()) }
                    }
                    .collect { send(HiveMqttPublishResult(it).right()) }
            }

            launch {
                subscribes.collect {
                    val subResult = client.subscribe(it).await()
                    send(HiveMqttSubAck(it, subResult).right())
                }
            }

            launch {
                client.publishes(mqttGlobalPublishFilter)
                    .asFlow()
                    .collect { send(HiveMqttPublish(it).right()) }
            }

            awaitClose { client.disconnect().blockingAwait() }
        }.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
πŸ‘ 1
I am using arrow and stuff in there too
The idea was to create a cold flow that when you start collecting goes ahead and connects to the broker and starts sending off events
πŸ™Œ 1
It sends a lot more info too. You could definitely get away with something simpler I am sure
m
@Andrew O'Hara, we are using the MQTT client that is part of the IoT SDK for Java from AWS. I realize I probably wasn't very clear with "subscribing dynamically to topics at runtime": What made me choose the AWS client that I didn't find in the alternatives at the time was constructing topic addresses at runtime and subscribing/unsubscribing to them dynamically, responding to incoming requests. But looking at the Paho client I can see that it's possible right in the "Getting started" example, so I'm starting to suspect I just missed it when I did my initial round of research... πŸ˜„
πŸ˜… 1
a
I didn't actually know that the AWS IoT SDK came with an mqtt client. Good to know about the memory leak.
m
No it isn't that heavy marketed, I get the feeling it's not the most prioritized of AWS features πŸ˜‰ Btw, thanks for your talk at KotlinConf last year, very interesting and thoroughly researched! πŸ™‚ πŸ‘
a
Glad you liked it! Sadly, I wasn't selected to present this year.
134 Views