So I think I've tied myself into a bit of a knot r...
# coroutines
r
So I think I've tied myself into a bit of a knot retarding scopes and jobs... I've got this code, which is supposed to set up the connection to our rabbitmq broker. However, when the connection is closed, the whole app crashes because of an
IOException
, but I'm not sure where it bubbles out of this code. Shouldn't it be caught by the try/catch on the join?
Copy code
while (true) {
                val setupDone = Job(lifecycleScope.coroutineContext.job)
                val connectionScope = CoroutineScope(lifecycleScope.coroutineContext + SupervisorJob())
                try {
                    isOnline.first() { it }
                    val connection = factory.newConnection()
                    val recoveryListener = object: RecoveryListener {
                        override fun handleRecoveryStarted(recoverable: Recoverable?) {
                            Timber.d { "RabbitMQ recovery started" }
                            lifecycleScope.launch {
                                try {
                                    connectionScope.cancel()
                                    connection.close()
                                } catch (e: AlreadyClosedException) {
                                    // ignore
                                }
                                Timber.i { "Killed RabbitMQ connection, starting a new one..." }
                                setup(<mailto:this@BrokerConnection.info|this@BrokerConnection.info>, true)
                            }
                        }

                        override fun handleRecovery(recoverable: Recoverable?) {
                            Timber.d { "RabbitMQ recovery completed" }
                        }
                    }

                    (connection as Recoverable).addRecoveryListener(recoveryListener)
                    setupNetworkTopic(connection, setupDone, connectionScope)
                    Timber.d { "Setting up broker connection." }
                    setupDone.join()
                    Timber.v { "Done setting up broker connection." }
                    return@launch
                } catch (ex: SocketException) {
                    Timber.i(ex)
                    connectionScope.cancel()
                } catch (ex: Exception) {
                    Timber.e(ex)
                    connectionScope.cancel()
                } finally {
                    setupDone.cancel()
                }
                delay(30.seconds)
            }
Copy code
private fun setupNetworkTopic(connection: Connection, job: Job, connectionScope: CoroutineScope) {
        connectionScope.launch(job + <http://Dispatchers.IO|Dispatchers.IO>) {
            connection.confirmChannel { // IOException gets thrown here. ----------------
                declareExchange(ExchangeSpecification(ExchangeNames.NETWORK, ExchangeType.FANOUT))
                declareQueue(QueueSpecification(QueueNames.NETWORK))
                bindQueue(BindQueueSpecification(QueueNames.NETWORK, ExchangeNames.NETWORK))

                publish {
                    suspend fun send(ns: NetworkStates) {
                        try {
                            val response = publishWithConfirm(
                                OutboundMessage(ExchangeNames.NETWORK, "", AMQP.BasicProperties(), Json.encodeToByteArray(ns))
                            )
                            if (!response) {
                                Timber.e { "Failed to publish network info!" }
                            }
                        } catch (e: IOException) {
                            Timber.w(e) { "Trying again in a few" }
                        } catch (e: ShutdownSignalException) {
                            Timber.e(e) { "Trying again in a few" }
                        } catch (e: AlreadyClosedException) {
                            Timber.i(e) { "Waiting for broker restart" }
                        }
                    }
                    lifecycleScope.launch(<http://Dispatchers.IO|Dispatchers.IO>) {
                        networkState.collect { send(NetworkStates(it)) }
                    }
                }
            }
        }
    }
a
I dont see how you launch anything with connectionScope, though you call cancel in two places for it
CoroutineScope also accepts an error handler which catches anything happening in the coroutine
r
Inside of
setupNetworkTopic
, I'm using it
Thanks for the hint on the coroutine exception handler, lemme try