Eric Martori
06/07/2019, 3:00 PM
    internal class InvocableFlow<T>(private val scope: CoroutineScope) {
        val flow = flow {
            for (value in channel.openSubscription()) {
                emit(value)
            }
        }
        private val channel = BroadcastChannel<T>(BUFFERED)
        fun invoke(data: T) {
            scope.launch {
                channel.send(data)
            }
        }
    }
gildor
06/07/2019, 3:19 PM
`val flow get() = channel.asFlow()`
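A minimal sketch of how that suggestion might slot into the class above. It assumes a kotlinx.coroutines version in which `BroadcastChannel` and its `asFlow()` extension are still available (both were later deprecated in favor of `SharedFlow`). The explicit `Flow<T>` type and the `firstValueDemo` helper are additions for illustration and are not part of the original snippet.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

internal class InvocableFlow<T>(private val scope: CoroutineScope) {
    private val channel = BroadcastChannel<T>(Channel.BUFFERED)

    // asFlow() opens a fresh subscription for every collector,
    // replacing the hand-written flow { for (...) emit(...) } loop.
    val flow: Flow<T> get() = channel.asFlow()

    fun invoke(data: T) {
        scope.launch { channel.send(data) }
    }
}

// Hypothetical helper for illustration: subscribe first, then send one value.
internal fun firstValueDemo(): Int = runBlocking {
    val source = InvocableFlow<Int>(this)
    // UNDISPATCHED starts collecting right away, so the subscription
    // exists before invoke() sends anything.
    val received = async(start = CoroutineStart.UNDISPATCHED) { source.flow.first() }
    source.invoke(1)
    received.await()
}
```

A broadcast subscription only sees values sent after it was opened, which is why the sketch starts collecting before it calls `invoke`.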
Eric Martori
06/07/2019, 3:27 PM
Zach Klippenstein (he/him) [MOD]
06/07/2019, 4:02 PM
Using `launch` to start the coroutine to call `send` could be racy: it could look like data is being sent out of order. If `scope` has a dispatcher that is backed by a thread pool, and you call `invoke` multiple times in quick succession from the same thread, the actual order in which the `launch`ed coroutines invoke `send` could be non-deterministic (because they’re running on different threads). If you use `runBlocking` instead, the `invoke` call won’t return until the send is complete. It will also propagate backpressure, but will block the caller thread if the buffer is full (backpressure/blocking concerns depend on your use case).
gildor
06/07/2019, 4:11 PM
Zach Klippenstein (he/him) [MOD]
06/07/2019, 4:23 PM
… `send` are running on a thread pool, which might not be what the caller expects. `offer` isn’t necessarily appropriate, since it may drop items. Whether this behavior, `offer`, or `runBlocking` is appropriate depends on the use case.
gildor
06/07/2019, 4:37 PM
Zach Klippenstein (he/him) [MOD]
06/07/2019, 7:13 PM
Eric Martori
06/08/2019, 1:48 PM