When using a `callbackFlow` is it necessary to cal...
# coroutines
t
When using a `callbackFlow`, is it necessary to call `close(exception)` if an operation inside it throws? i.e.
callbackFlow {
   try {
      val result = somethingThatThrows() // Try-catch or just let it be?
      offer(result)
   } catch (exception: Exception) {
      close(exception)
   }
   
   awaitClose { ... } 
}
d
It... depends. Your example doesn't look like it requires `callbackFlow`. If the exception means that `offer`/`send` will stop being called, then yes, you must call `close`.
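For the one-shot pseudo-example, a plain `flow` builder is probably enough. The sketch below assumes a hypothetical `somethingThatThrows()` that always fails, plus helper names (`oneShot`, `collectOneShot`) invented for illustration. An exception thrown inside the builder fails the flow and reaches the collector, so there is no channel to close.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical one-shot operation standing in for `somethingThatThrows()`.
fun somethingThatThrows(): String = throw IllegalStateException("boom")

// Plain `flow` builder: no listener to register, so no `callbackFlow`,
// no `close(...)` and no `awaitClose` are needed.
fun oneShot(): Flow<String> = flow {
    emit(somethingThatThrows())
}

// Collects the flow and reports how collection ended.
fun collectOneShot(): String = runBlocking {
    runCatching { oneShot().toList() }
        .fold({ "completed with $it" }, { "failed: ${it.message}" })
}
```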
t
Gotcha. That was a pseudo-example. The real use case would look something like:
fun processedEvents(coordinator: EventCoordinator): Flow<Event> {
    return callbackFlow {
        val eventListener = object: EventListener {
            override fun onEvent(event: Event) {
                // Can throw
                val processedEvent = coordinator.processEvent(event)
                
                offer(processedEvent)
            }
        }

        coordinator.addEventListener(eventListener)

        awaitClose { coordinator.removeEventListener(eventListener) }
    }
}
If `processEvent(...)` throws in that example, the listener should be removed, i.e. the `awaitClose` block must run. So it sounds like the call needs to be surrounded with `try/catch` + `close` 🤔
d
Ah, in that case you're right, it needs to be surrounded. You might want to do this to make sure you're not catching an exception thrown from `offer`.
val r = runCatching { coordinator.processEvent(event) }
r.fold({ offer(it) }, { close(it) })
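Put together, the `runCatching` approach might look like the sketch below. `Event`, `EventListener`, `EventCoordinator`, its `dispatch` helper, and `guardedDemo` are hypothetical stand-ins, since the real types are not shown in the thread. It uses `trySend`, the replacement for `offer` in kotlinx.coroutines 1.5 and later, and `onSuccess`/`onFailure` instead of `fold`, which expresses the same idea.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-ins for the types used in the thread.
data class Event(val id: Int)

fun interface EventListener {
    fun onEvent(event: Event)
}

class EventCoordinator {
    private val listeners = mutableListOf<EventListener>()
    val listenerCount: Int get() = listeners.size
    fun addEventListener(listener: EventListener) { listeners += listener }
    fun removeEventListener(listener: EventListener) { listeners -= listener }

    // Assumption: throws for negative ids to simulate a failing `processEvent`.
    fun processEvent(event: Event): Event {
        require(event.id >= 0) { "negative id" }
        return Event(event.id * 10)
    }

    // Demo helper: delivers an event to every registered listener.
    fun dispatch(event: Event) = listeners.toList().forEach { it.onEvent(event) }
}

fun processedEvents(coordinator: EventCoordinator): Flow<Event> = callbackFlow {
    val listener = EventListener { event ->
        // Only `processEvent` is guarded, so a failure of the send itself
        // is not mistaken for a processing failure.
        runCatching { coordinator.processEvent(event) }
            .onSuccess { trySend(it) }
            .onFailure { close(it) } // closing resumes awaitClose and fails the collector
    }
    coordinator.addEventListener(listener)
    awaitClose { coordinator.removeEventListener(listener) }
}

// Collects in the background, dispatches one good and one bad event,
// then reports the collector's failure and the remaining listener count.
fun guardedDemo(): String = runBlocking {
    val coordinator = EventCoordinator()
    val outcome = async(start = CoroutineStart.UNDISPATCHED) {
        runCatching { processedEvents(coordinator).toList() }
    }
    yield() // let the producer register its listener
    coordinator.dispatch(Event(1))
    coordinator.dispatch(Event(-1))
    val failure = outcome.await().exceptionOrNull()
    "${failure?.message}, listeners=${coordinator.listenerCount}"
}
```

Closing with the exception has two effects here: the `awaitClose` block runs, so the listener is removed, and the collector fails with the same exception.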
Or even better, I think you should do this.
fun processedEvents(coordinator: EventCoordinator): Flow<Event> {
    return callbackFlow {
        val eventListener = object: EventListener {
            override fun onEvent(event: Event) {
                offer(event)
            }
        }
        coordinator.addEventListener(eventListener)
        awaitClose { coordinator.removeEventListener(eventListener) }
    }.map { coordinator.processEvent(it) }
}
🙏🏼 1
"better" is debatable, depending on how pure `coordinator.processEvent` is.
t
woah, using `map` is kinda nice. yeah, `coordinator.processEvent` internally calls a getter and maps the incoming argument to some new value. it does not have any side effects.
so it sounds like `awaitClose` is triggered only when `close`/`cancel` have been explicitly called. i.e., letting the `processEvent` exception propagate would not have caused `awaitClose` to be called
d
It would, because the channel is closed when the downstream collector (i.e. `map`) cancels/finishes collection (i.e. throws an exception).
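A small sketch of that behaviour, again with hypothetical stand-in types (`Event`, `EventListener`, `EventCoordinator`, `dispatch`, `mapDemo`) and `trySend` in place of `offer`: when `processEvent` throws inside `map`, collection fails, the upstream `callbackFlow` is cancelled, and its `awaitClose` block runs. Note that with this shape the processing happens in the collector's context rather than on the thread that invoked the listener.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-ins for the types used in the thread.
data class Event(val id: Int)

fun interface EventListener {
    fun onEvent(event: Event)
}

class EventCoordinator {
    private val listeners = mutableListOf<EventListener>()
    val listenerCount: Int get() = listeners.size
    fun addEventListener(listener: EventListener) { listeners += listener }
    fun removeEventListener(listener: EventListener) { listeners -= listener }

    // Assumption: throws for negative ids to simulate a failing `processEvent`.
    fun processEvent(event: Event): Event {
        require(event.id >= 0) { "negative id" }
        return Event(event.id * 10)
    }

    // Demo helper: delivers an event to every registered listener.
    fun dispatch(event: Event) = listeners.toList().forEach { it.onEvent(event) }
}

fun processedEvents(coordinator: EventCoordinator): Flow<Event> =
    callbackFlow {
        // The listener only forwards raw events; nothing here can throw.
        val listener = EventListener { event -> trySend(event) }
        coordinator.addEventListener(listener)
        awaitClose { coordinator.removeEventListener(listener) }
    }.map { coordinator.processEvent(it) } // a throw here cancels the upstream flow

// Collects in the background, dispatches one good and one bad event,
// then reports the collector's failure and the remaining listener count.
fun mapDemo(): String = runBlocking {
    val coordinator = EventCoordinator()
    val outcome = async(start = CoroutineStart.UNDISPATCHED) {
        runCatching { processedEvents(coordinator).toList() }
    }
    yield() // let the producer register its listener
    coordinator.dispatch(Event(1))
    coordinator.dispatch(Event(-1))
    val failure = outcome.await().exceptionOrNull()
    "${failure?.message}, listeners=${coordinator.listenerCount}"
}
```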
t
oh yes, gotcha. i meant, without either the `map` or `try/catch`, `awaitClose` would not get called if `processEvent` threw
this is great, thanks so much
👍🏼 1
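The last point in the thread can be checked with a sketch like the one below, using the same hypothetical stand-in types and `trySend` in place of `offer`. With neither `map` nor `try/catch`, the exception from `processEvent` goes back to whoever invoked `onEvent`. The flow stays open and the listener stays registered until the collector is cancelled.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-ins for the types used in the thread.
data class Event(val id: Int)

fun interface EventListener {
    fun onEvent(event: Event)
}

class EventCoordinator {
    private val listeners = mutableListOf<EventListener>()
    val listenerCount: Int get() = listeners.size
    fun addEventListener(listener: EventListener) { listeners += listener }
    fun removeEventListener(listener: EventListener) { listeners -= listener }

    // Assumption: throws for negative ids to simulate a failing `processEvent`.
    fun processEvent(event: Event): Event {
        require(event.id >= 0) { "negative id" }
        return Event(event.id * 10)
    }

    // Demo helper: delivers an event to every registered listener.
    fun dispatch(event: Event) = listeners.toList().forEach { it.onEvent(event) }
}

fun unguardedEvents(coordinator: EventCoordinator): Flow<Event> = callbackFlow {
    val listener = EventListener { event ->
        // A throw here propagates to the caller of onEvent, not into the flow.
        trySend(coordinator.processEvent(event))
    }
    coordinator.addEventListener(listener)
    awaitClose { coordinator.removeEventListener(listener) }
}

// Dispatches a bad event while a collector is active, then cancels the collector.
fun unguardedDemo(): String = runBlocking {
    val coordinator = EventCoordinator()
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        unguardedEvents(coordinator).toList()
    }
    yield() // let the producer register its listener
    val thrown = runCatching { coordinator.dispatch(Event(-1)) }.exceptionOrNull()
    val beforeCancel = coordinator.listenerCount
    collector.cancel() // cancelling the collector is what finally triggers awaitClose
    collector.join()
    "${thrown?.message}, before cancel=$beforeCancel, after=${coordinator.listenerCount}"
}
```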