reactormonk
01/07/2025, 8:25 PMIOException
, but I'm not sure where it bubbles out of this code. Shouldn't it be caught by the try/catch on the join?reactormonk
01/07/2025, 9:32 PMwhile (true) {
val setupDone = Job(lifecycleScope.coroutineContext.job)
val connectionScope = CoroutineScope(lifecycleScope.coroutineContext + SupervisorJob())
try {
isOnline.first() { it }
val connection = factory.newConnection()
val recoveryListener = object: RecoveryListener {
override fun handleRecoveryStarted(recoverable: Recoverable?) {
Timber.d { "RabbitMQ recovery started" }
lifecycleScope.launch {
try {
connectionScope.cancel()
connection.close()
} catch (e: AlreadyClosedException) {
// ignore
}
Timber.i { "Killed RabbitMQ connection, starting a new one..." }
setup(<mailto:this@BrokerConnection.info|this@BrokerConnection.info>, true)
}
}
override fun handleRecovery(recoverable: Recoverable?) {
Timber.d { "RabbitMQ recovery completed" }
}
}
(connection as Recoverable).addRecoveryListener(recoveryListener)
setupNetworkTopic(connection, setupDone, connectionScope)
Timber.d { "Setting up broker connection." }
setupDone.join()
Timber.v { "Done setting up broker connection." }
return@launch
} catch (ex: SocketException) {
Timber.i(ex)
connectionScope.cancel()
} catch (ex: Exception) {
Timber.e(ex)
connectionScope.cancel()
} finally {
setupDone.cancel()
}
delay(30.seconds)
}
private fun setupNetworkTopic(connection: Connection, job: Job, connectionScope: CoroutineScope) {
connectionScope.launch(job + <http://Dispatchers.IO|Dispatchers.IO>) {
connection.confirmChannel { // IOException gets thrown here. ----------------
declareExchange(ExchangeSpecification(ExchangeNames.NETWORK, ExchangeType.FANOUT))
declareQueue(QueueSpecification(QueueNames.NETWORK))
bindQueue(BindQueueSpecification(QueueNames.NETWORK, ExchangeNames.NETWORK))
publish {
suspend fun send(ns: NetworkStates) {
try {
val response = publishWithConfirm(
OutboundMessage(ExchangeNames.NETWORK, "", AMQP.BasicProperties(), Json.encodeToByteArray(ns))
)
if (!response) {
Timber.e { "Failed to publish network info!" }
}
} catch (e: IOException) {
Timber.w(e) { "Trying again in a few" }
} catch (e: ShutdownSignalException) {
Timber.e(e) { "Trying again in a few" }
} catch (e: AlreadyClosedException) {
Timber.i(e) { "Waiting for broker restart" }
}
}
lifecycleScope.launch(<http://Dispatchers.IO|Dispatchers.IO>) {
networkState.collect { send(NetworkStates(it)) }
}
}
}
}
}
Alexandru Caraus
01/07/2025, 10:36 PMAlexandru Caraus
01/07/2025, 10:38 PMreactormonk
01/07/2025, 10:45 PMsetupNetworkTopic
, I'm using itreactormonk
01/07/2025, 10:50 PM