Jakub Zalas
02/13/2025, 9:26 PMAndrew O'Hara
02/13/2025, 11:46 PMJakub Zalas
02/14/2025, 12:07 AMJakub Zalas
02/14/2025, 12:10 AMAndrew O'Hara
02/14/2025, 12:19 AMJakub Zalas
02/14/2025, 12:40 AMMIDI
02/14/2025, 8:43 AMJakub Zalas
02/14/2025, 8:46 AMMIDI
02/14/2025, 8:50 AMAndrew O'Hara
02/14/2025, 3:32 PMJames Yox
02/14/2025, 4:59 PMJames Yox
02/14/2025, 5:00 PMAndrew O'Hara
02/14/2025, 5:01 PMJames Yox
02/14/2025, 5:03 PMJakub Zalas
02/14/2025, 5:09 PMJakub Zalas
02/14/2025, 5:09 PMJakub Zalas
02/14/2025, 5:09 PMJames Yox
02/14/2025, 5:09 PMJakub Zalas
02/14/2025, 5:11 PMJakub Zalas
02/14/2025, 5:17 PMJames Yox
02/14/2025, 5:18 PM
val client = Mqtt5Client.builder()
.identifier(clientId)
.serverHost(broker)
.serverPort(port)
.automaticReconnectWithDefaultConfig()
.buildRx()
val publishes = consumer.filterIsInstance<HiveMqttPublish>().map { it.mqtt5Publish }
val subscribes = consumer.filterIsInstance<HiveMqttSubscribe>().map { it.mqtt5Subscribe }
val hiveFlow: HiveMqttFlow = channelFlow<Either<HiveMqttError, HiveMqttEvent>> {
launch {
client.publish(publishes.asFlowable()).asFlow()
.onStart { consumer.unplug() }
.onStart {
Either.catch {
client.connectWith()
.cleanStart(cleanStart)
.noSessionExpiry()
.applyConnect()
.await()
.let { send(HiveMqttConnAck(it).right()) }
}.mapLeft { send(HiveMqttUncaughtThrowable(it).left()) }
}
.collect { send(HiveMqttPublishResult(it).right()) }
}
launch {
subscribes.collect {
val subResult = client.subscribe(it).await()
send(HiveMqttSubAck(it, subResult).right())
}
}
launch {
client.publishes(mqttGlobalPublishFilter)
.asFlow()
.collect { send(HiveMqttPublish(it).right()) }
}
awaitClose { client.disconnect().blockingAwait() }
}.flowOn(Dispatchers.IO)
James Yox
02/14/2025, 5:18 PMJames Yox
02/14/2025, 5:19 PMJames Yox
02/14/2025, 5:19 PMMIDI
02/17/2025, 3:56 PMAndrew O'Hara
02/17/2025, 4:02 PMMIDI
02/17/2025, 4:07 PMAndrew O'Hara
02/17/2025, 4:27 PM