# coroutines
l
Hi, I have the following suspending function:
override suspend fun connect(
        scope: CoroutineScope,
        device: BluetoothDevice,
        uuid: UUID,
        secure: Boolean
    ) {
        dataSource.connectToSocket(device, uuid, secure) { socket ->
            scope.launch(Dispatchers.IO) {
                try {
                    dataSource.listenToSocket(socket)
                } catch (e: Throwable) {
                    Timber.tag(TAG).e("Error: ${e.message}.")
                    throw e
                }

            }
            startCommunication()
        }
    }
How can I pass the exception to the caller of `connect` so that the caller can catch it? My approach with a nested coroutine might be wrong. I need another coroutine here because `dataSource.listenToSocket(socket)` is a suspending function with a `while (true) { ... }` loop and therefore never returns, but I have to call `startCommunication()` directly after calling `dataSource.listenToSocket(socket)`.
z
With this code, the caller can already handle the exception. The caller decides what scope to pass in, so it can set up error handling for that scope. There are a number of ways to do this, e.g. a SupervisorJob with a custom exception handler, or try/finally around a coroutineScope block.
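To illustrate the first option, here is a minimal sketch of a caller-owned scope with a `SupervisorJob` and a `CoroutineExceptionHandler`. The names `listenToSocketStub`, `connectStub` and `runHandlerDemo` are hypothetical stand-ins and not part of the code in this thread.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a listener that fails after the connection is up.
suspend fun listenToSocketStub(): Nothing = throw IllegalStateException("socket closed")

// Hypothetical stand-in for connect(): starts the listener in the caller's scope and returns at once.
fun connectStub(scope: CoroutineScope) {
    scope.launch { listenToSocketStub() }
}

// Returns the message of the exception that reached the scope's handler.
fun runHandlerDemo(): String? = runBlocking {
    val seen = CompletableDeferred<Throwable>()
    // The caller owns the scope, so the caller decides what happens to uncaught failures.
    val handler = CoroutineExceptionHandler { _, e -> seen.complete(e) }
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default + handler)
    connectStub(scope)
    val message = seen.await().message
    scope.cancel()
    message
}

fun main() {
    println(runHandlerDemo())
}
```

Note that the handler only observes the failure after `connectStub` has returned, so it cannot turn the failure into a return value of `connect`.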
l
@Zach Klippenstein (he/him) [MOD] The `coroutineScope` block will suspend the caller forever, but I need `connect` to return because I treat a normal return as success:
VM
suspend fun connect(composableScope: CoroutineScope, device: BluetoothDevice) {
        when (val result = connectBluetoothDevicesUseCase.connect(composableScope, device)) {
            is Result.Success -> ...
            is Result.Failure -> ...
        }
    }
`connect` is called from a composable (`LaunchedEffect`).
Use Case
suspend fun connect(scope: CoroutineScope, device: BluetoothDevice): Result<Unit> {
        return try {
            manager.connect(scope, device, uuid, false)
            Result.Success(Unit)
        } catch (e: Throwable) {
            Result.Failure(e)
        }
    }
Manager
override suspend fun connect(
        scope: CoroutineScope,
        device: BluetoothDevice,
        uuid: UUID,
        secure: Boolean
    ) {
        dataSource.connectToSocket(device, uuid, secure) { socket ->
            scope.launch(Dispatchers.IO) {
                dataSource.listenToSocket(socket)
            }
            startCommunication()
        }
    }
u
Convert `connectToSocket` to a suspend function with `suspendCancellableCoroutine`. Then use the suspend function and everything will work out in a straightforward way.
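As a rough sketch of that suggestion, a callback-style API can be wrapped like this. `FakeConnector` and its callbacks are hypothetical and only stand in for a connect call that reports its result through callbacks.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical callback-style API (not from this thread).
class FakeConnector {
    fun connect(address: String, onSuccess: (String) -> Unit, onError: (Throwable) -> Unit) {
        if (address.isNotEmpty()) onSuccess("socket:$address")
        else onError(IllegalArgumentException("empty address"))
    }

    fun cancel() { /* nothing to release in this stub */ }
}

// Suspends until one of the callbacks fires; a failure is rethrown at the call site.
suspend fun connectSuspending(connector: FakeConnector, address: String): String =
    suspendCancellableCoroutine { cont ->
        // Abort the pending attempt if the waiting coroutine is cancelled.
        cont.invokeOnCancellation { connector.cancel() }
        connector.connect(
            address,
            onSuccess = { socket -> cont.resume(socket) },
            onError = { e -> cont.resumeWithException(e) }
        )
    }

// The caller can now use plain try/catch around the suspending call.
fun connectOrMessage(address: String): String = runBlocking {
    try {
        connectSuspending(FakeConnector(), address)
    } catch (e: IllegalArgumentException) {
        "failed: ${e.message}"
    }
}
```

This only covers errors raised while connecting; it does not help with failures that occur after the function has already returned.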
l
@uli I don't understand how this would help 😕
suspend fun connectToSocket(
        device: BluetoothDevice,
        uuid: UUID,
        secure: Boolean,
        onConnected: suspend (BluetoothSocket) -> Unit
    ) {
        onConnected(blueFlow.connectAsClientAsync(device, uuid, secure).await()) // 3rd party lib
    }
u
Sorry, I thought `connectToSocket` would throw, but you are talking about this one, right?
Timber.tag(TAG).e("Error: ${e.message}.")
throw e
l
yes
u
Now I get it. Your dilemma: you have two states: 1) immediately: successfully connected, 2) then later, after a potentially unlimited time: exception during listening. What exactly do you want the caller to do if the listener throws after a random amount of time? It cannot revoke the previously reported success.
Would a flow with an enum/sealed class help here? Emit `connected`, then later emit `failure(e)` if an exception `e` is caught?
l
"Now I get it. Your dilemma: you have two states: 1) immediately: successfully connected, 2) then later, after a potentially unlimited time: exception during listening."
Exactly!
"What exactly do you want the caller to do if the listener throws after a random amount of time? It cannot revoke the previously reported success."
Damn, you figured out another issue --> flows would solve this problem.
But there is still the problem that an exception from `dataSource.listenToSocket(socket)` won't be catchable on the caller side / in the use case.
u
Right, because it happens after `connect` returns and you do not want to wait until it fails.
If you wait for the exception, you will wait forever in case of success. If you don't wait, you lose it.
So your only chance is to stay synchronous all the way. So the question still is: what is the expected behavior if `connectToSocket` succeeds and listening later fails?
l
Give me a moment. I have to do a refactor regardless of this issue.
Until now I hid an important detail: once connected, another use case should be called, which will return a map or fail.
"So your only chance is to stay synchronous all the way."
Unfortunately yes. The Bluetooth device I'm connecting to works sequentially. The data flow is:
client -> device: send request to communicate
client <- device: send Ack
client -> device: send request to return a data packet
client <- device: send response packet
client -> device: send Ack
So the first thing I'm doing is connecting to the device. It's Bluetooth Classic, so the result is a `BluetoothSocket` (see the `connectToSocket` function). At the same time I listen to the socket, but in another coroutine (see the `connect` function in Manager). I need this socket to send requests from all my use cases, so this coroutine has to live until the device is disconnected.
"What is the expected behavior if connectToSocket succeeds and listen later fails?"
When `connectToSocket` succeeds, we can expect that the device is connected, so a request to communicate is sent implicitly (`startCommunication` in the `connect` function) and the use case that requests a data packet is called explicitly (see VM). When the `listenToSocket` function throws an error, the expected behavior would be to disconnect and display the error as a `Toast`. When the `connectToSocket` function throws an error, just display the error as a `Toast`. To be honest, the only reason I can think of right now why `listenToSocket` might fail is when I disconnect the device manually and in turn close the socket.
My tentative implementation to solve the issue "because it happens after connect returns and you do not want to wait until it fails":
VM
suspend fun connect(composableScope: CoroutineScope, device: BluetoothDevice) {
        connectBluetoothDevicesUseCase.connect(composableScope, device).collect {
            when (it) {
                is ConnectBluetoothDeviceUseCase.BluetoothConnectState.Success -> {
                    when (val result = getStatusParameterUseCase.getStatusParameter()) {
                        is Result.Success -> {
                            Timber.w("Data packet collected.")
                        }
                        is Result.Failure -> {
                            Timber.w("Error while collecting data packet: ${result.cause.message}.")
                        }
                    }
                }
                is ConnectBluetoothDeviceUseCase.BluetoothConnectState.Failure -> {
                    Timber.w("Error while connecting: ${it.cause.message}.")
                }
            }
        }
    }
ConnectUseCase
suspend fun connect(
        scope: CoroutineScope,
        device: BluetoothDevice
    ): Flow<BluetoothConnectState> = flow {
        try {
            manager.connect(scope, device, uuid, false)
            emit(BluetoothConnectState.Success)
        } catch (e: Throwable) {
            emit(BluetoothConnectState.Failure(e))
        }
    }
GetDataPacketUseCase
suspend fun getStatusParameter(): Result<Map<WebKeys, ByteArray>> =
        withContext(Dispatchers.IO) {
            try {
                Result.Success(manager.getStatusParameter())
            } catch (e: Throwable) {
                Result.Failure(e)
            }
        }
The VM looks a bit odd?! (Correct me if I'm wrong.) I could move the error handling from both use cases to the VM, but I'm not sure this is a good idea with the Single Responsibility Principle in mind. Another approach would be to consume `getStatusParameterUseCase` in `connectBluetoothDevicesUseCase`, but then the latter is coupled to the former...
u
How about going all the way with a flow:
Manager
override suspend fun connect(
        scope: CoroutineScope,
        device: BluetoothDevice,
        uuid: UUID,
        secure: Boolean
    ): Flow<BluetoothConnectState> = flow {
        dataSource.connectToSocket(device, uuid, secure) { socket ->
            scope.launch(Dispatchers.IO) {
                try {
                    dataSource.listenToSocket(socket)
                } catch (e: Exception) {
                    emit(BluetoothConnectState.Failed(e))
                }
            }
            startCommunication()
        }
        emit(BluetoothConnectState.Connected)
    }
Regarding your VM, it looks fine to me. For readability, you could factor this code into its own fun:
when (val result = getStatusParameterUseCase.getStatusParameter()) {
    is Result.Success -> {
        Timber.w("Data packet collected.")
    }
    is Result.Failure -> {
        Timber.w("Error while collecting data packet: ${result.cause.message}.")
    }
}
l
And use case might look like:
suspend fun connect(
        scope: CoroutineScope,
        device: BluetoothDevice
    ): Flow<BluetoothConnectState> = flow {
        try {
            manager.connect(scope, device, uuid, false).collect { connectState ->
                when (connectState) {
                    is BluetoothConnectState.Success -> emit(BluetoothConnectState.Success)
                    is BluetoothConnectState.Failure -> emit(BluetoothConnectState.Failure(
                        connectState.cause)
                    )
                }
            }
        } catch (e: Throwable) {
            emit(BluetoothConnectState.Failure(e))
        }
    }
right?
The only problem I have: in the manager, when `emit(BluetoothConnectState.Failure(e))` is called, I get:
java.lang.IllegalStateException: Flow invariant is violated:
Emission from another coroutine is detected.
Child of StandaloneCoroutine{Active}@14cdbe3, expected child of ProducerCoroutine{Completed}@d4af9e0.
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
It's late here, so I will investigate it later. Do I have to use `channelFlow`, or can this be done with `withContext`?
u
I guess `channelFlow`. It's about coroutines, not about dispatchers.
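A small sketch of the difference, under the assumption that plain strings are enough to stand in for the connect states: inside `channelFlow`, `send` may be called from a coroutine other than the one running the builder block, which the plain `flow` builder rejects for `emit`. The function names here are illustrative only.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// channelFlow gives the block a ProducerScope; its send() is safe to call
// from other coroutines, unlike emit() in the plain flow builder.
fun statesFromChildCoroutine(): Flow<String> = channelFlow {
    send("connected")
    // This child is launched in the ProducerScope, so the flow waits for it before completing.
    launch {
        send("listener failed")
    }
}

fun collectStates(): List<String> = runBlocking { statesFromChildCoroutine().toList() }
```

Switching dispatchers with `withContext` would not change which coroutine performs the emission, which is why it does not address the invariant violation.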
l
@uli Another problem occurred. It seems that after the emission of `Success` the flow is closed successfully (`onCompletion` is triggered with null). This results in a `Channel is closed` crash when the `IllegalStateException` is caught:
override suspend fun connect(
        scope: CoroutineScope,
        device: BluetoothDevice,
        uuid: UUID,
        secure: Boolean
    ): Flow<ConnectBluetoothDeviceUseCase.BluetoothConnectState> = channelFlow {
        dataSource.connectToSocket(device, uuid, secure) { socket ->
            scope.launch(Dispatchers.IO) {
                try {
                    dataSource.listenToSocket(socket)
                } catch (e: IllegalStateException) {
                    send(ConnectBluetoothDeviceUseCase.BluetoothConnectState.Failure(e))
                }
            }
            startCommunication()
        }
        send(ConnectBluetoothDeviceUseCase.BluetoothConnectState.Success)
    }.flowOn(Dispatchers.IO)
Edit: `awaitClose()` below `Success` did the trick :)
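For reference, a minimal sketch of how the pieces might fit together, assuming a listener launched in an external scope. `ConnectState`, `listenStub` and `connectStates` are hypothetical names, and the stub throws an `IOException` rather than `IllegalStateException` so that the catch block does not also catch `CancellationException`, which is a subclass of `IllegalStateException` on the JVM.

```kotlin
import java.io.IOException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

sealed class ConnectState {
    object Success : ConnectState()
    data class Failure(val cause: Throwable) : ConnectState()
}

// Hypothetical listener that fails some time after the connection was reported.
suspend fun listenStub() {
    delay(50)
    throw IOException("socket closed")
}

fun connectStates(scope: CoroutineScope): Flow<ConnectState> = channelFlow {
    // The listener lives in the external scope, so channelFlow does not wait for it on its own.
    val listener = scope.launch {
        try {
            listenStub()
        } catch (e: IOException) {
            send(ConnectState.Failure(e))
            close() // nothing more to report, so let the collector finish
        }
    }
    send(ConnectState.Success)
    // Keeps the channel open after Success; stops the listener when the flow is closed or cancelled.
    awaitClose { listener.cancel() }
}

fun collectConnectStates(): List<ConnectState> = runBlocking {
    connectStates(this).toList()
}
```

Without `awaitClose`, the builder block returns right after `Success` is sent and the channel closes, so the later `send` from the listener fails.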