Eric Martori
06/07/2019, 3:00 PM
internal class InvocableFlow<T>(private val scope: CoroutineScope) {
    val flow = flow {
        for (value in channel.openSubscription()) {
            emit(value)
        }
    }
    private val channel = BroadcastChannel<T>(BUFFERED)
    fun invoke(data: T) {
        scope.launch {
            channel.send(data)
        }
    }
}
gildor
06/07/2019, 3:19 PM
val flow get() = channel.asFlow()
Eric Martori
06/07/2019, 3:27 PM
Zach Klippenstein (he/him) [MOD]
06/07/2019, 4:02 PM
Using `launch` to start the coroutine that calls `send` could be racy: data could appear to be sent out of order.
If `scope` has a dispatcher that is backed by a thread pool, and you call `invoke` multiple times in quick succession from the same thread, the order in which the `launch`ed coroutines call `send` could be non-deterministic (because they’re running on different threads). If you use `runBlocking` instead, the `invoke` call won’t return until the `send` is complete. It will also propagate backpressure, but will block the caller thread if the buffer is full (backpressure/blocking concerns depend on your use case).
gildor
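As a rough illustration of the ordering point in Zach's 4:02 PM message above, here is a minimal sketch that compares the two ways of calling `send`. The helper names `sendWithLaunch` and `sendWithRunBlocking` are hypothetical. A plain `Channel` with `Channel.UNLIMITED` capacity stands in for the thread's `BroadcastChannel(BUFFERED)`, an assumption made so that the blocking variant never stalls on a full buffer, and `Dispatchers.Default` stands in for a thread-pool-backed `scope`.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: one launched coroutine per value, like `invoke` in the thread.
// The coroutines run on a thread pool, so the order of the sends is not guaranteed.
fun sendWithLaunch(count: Int): List<Int> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    coroutineScope {
        for (i in 1..count) {
            launch(Dispatchers.Default) { channel.send(i) }
        }
    } // coroutineScope returns once every launched send has finished
    channel.close()
    val received = mutableListOf<Int>()
    for (value in channel) received += value
    received
}

// Hypothetical helper: each send completes before the next one starts,
// so values arrive in the order they were submitted.
fun sendWithRunBlocking(count: Int): List<Int> {
    val channel = Channel<Int>(Channel.UNLIMITED)
    for (i in 1..count) {
        runBlocking { channel.send(i) } // blocks the calling thread until sent
    }
    channel.close()
    return runBlocking {
        val received = mutableListOf<Int>()
        for (value in channel) received += value
        received
    }
}
```

`sendWithRunBlocking(100)` always yields 1 through 100 in order. `sendWithLaunch(100)` yields the same values, but their order may differ from run to run.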
06/07/2019, 4:11 PM
gildor
06/07/2019, 4:13 PM
gildor
06/07/2019, 4:14 PM
Zach Klippenstein (he/him) [MOD]
06/07/2019, 4:23 PM
`send` are running on a threadpool, which might not be what the caller expects.
`offer` isn’t necessarily appropriate, since it may drop items. Whether this behavior, `offer`, or `runBlocking` is appropriate depends on the use case.
gildor
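To make the "may drop items" remark in Zach's 4:23 PM message concrete, here is a small sketch under stated assumptions. It uses `trySend`, which replaced `offer` in later kotlinx.coroutines releases, on a plain buffered `Channel` with no consumer rather than on the thread's `BroadcastChannel`. The helper name `offerUntilFull` is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: attempts non-suspending sends into a buffered channel
// that nobody is receiving from, and records which attempts were accepted.
fun offerUntilFull(capacity: Int, attempts: Int): List<Boolean> {
    val channel = Channel<Int>(capacity)
    // trySend never suspends: once the buffer is full it reports failure,
    // and the value is lost unless the caller handles that result.
    return (1..attempts).map { channel.trySend(it).isSuccess }
}
```

With a capacity of 2 and four attempts, the first two sends are accepted and the last two are rejected, so a caller that ignores the result silently loses those values.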
06/07/2019, 4:37 PM
gildor
06/07/2019, 4:37 PM
gildor
06/07/2019, 4:38 PM
gildor
06/07/2019, 4:39 PM
Zach Klippenstein (he/him) [MOD]
06/07/2019, 7:13 PM
Eric Martori
06/08/2019, 1:48 PM