mbonnin
03/18/2024, 12:30 PM
Is there a channelFlow{} that does not dispatch until after its block is started? The reason is that I would like the caller to be able to control the buffer size while still being able to synchronously monitor collections (code in 🧵)

mbonnin
03/18/2024, 12:30 PM
@Test
fun myTest() = runBlocking {
    println("1")
    launch(start = CoroutineStart.UNDISPATCHED) {
        val flow = channelFlow {
            println("3")
            withContext(Dispatchers.Default) {
                // Do some work
                println("5")
                send(Unit)
            }
        }
        println("2")
        flow.collect()
    }
    println("4")
}

mbonnin
03/18/2024, 12:31 PM
I would like it to print 1 2 3 4 5. It currently prints 1 2 4 3 5.

Sam
03/18/2024, 1:35 PM
"I would like the caller to be able to control the buffer size while still being able to synchronously monitor collections": can you give a more concrete example of what this means? It's not obvious from the code you shared.
mbonnin
03/18/2024, 1:36 PM
fun retrieveFromNetwork(): Flow<Response> = channelFlow { ...

mbonnin
03/18/2024, 1:36 PM
retrieveFromNetwork().buffer(CONFLATED)
retrieveFromNetwork().buffer(UNLIMITED)
// etc...

mbonnin
03/18/2024, 1:36 PM

mbonnin
03/18/2024, 1:37 PM
This is for an IdlingResource. I want the IdlingResource to appear busy synchronously. If not, there's always a small race condition.

Sam
03/18/2024, 1:54 PM
onStart?

mbonnin
03/18/2024, 1:55 PM
With onStart, fusing of the Flows won't work.

mbonnin
03/18/2024, 1:55 PM
onStart{} is not a FusibleFlow.

Sam
03/18/2024, 1:56 PM

mbonnin
03/18/2024, 1:57 PM

Sam
03/18/2024, 2:02 PM
ChannelFlow starts with a coroutine dispatch, and all the built-in fusible flows are based on that class. Making your own alternative implementation is theoretically possible, but I wouldn't want to try.

mbonnin
03/18/2024, 2:03 PM

mbonnin
03/18/2024, 2:04 PM
I could use the InternalCoroutinesApi APIs, but that does sound scary.

Sam
03/18/2024, 2:05 PM
FusibleFlow itself is @InternalCoroutinesApi, I think you're right 🤦

Sam
03/18/2024, 2:06 PM

mbonnin
03/18/2024, 2:06 PM

mbonnin
03/18/2024, 2:07 PM
Maybe channelFlow(start = CoroutineStart.UNDISPATCHED) {}... but that probably has far-reaching implications as well.

Sam
03/18/2024, 2:19 PM

Sam
03/18/2024, 2:19 PM
@OptIn(ExperimentalTypeInference::class)
fun <T> customChannelFlow(
    start: CoroutineStart = CoroutineStart.ATOMIC,
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flow<T> = CustomChannelFlow(
    block,
    start,
    EmptyCoroutineContext,
    Channel.BUFFERED,
    BufferOverflow.SUSPEND
)

@OptIn(InternalCoroutinesApi::class)
class CustomChannelFlow<T>(
    private val block: suspend ProducerScope<T>.() -> Unit,
    private val start: CoroutineStart,
    context: CoroutineContext,
    capacity: Int,
    onBufferOverflow: BufferOverflow
) : FusibleFlow<T>, ChannelFlow<T>(context, capacity, onBufferOverflow) {

    override suspend fun collectTo(scope: ProducerScope<T>) = block(scope)

    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
        return CustomChannelFlow(block, start, context, capacity, onBufferOverflow)
    }

    override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
        return scope.produce(context, capacity, start, null, block)
    }
}

Sam
03/18/2024, 2:19 PM

mbonnin
03/18/2024, 2:27 PM
flowOn(Dispatchers.Unconfined) seems to work, although I'm not 100% sure it really does.
Sam
03/18/2024, 2:28 PM

mbonnin
03/18/2024, 2:28 PM
fun networkThings(): Flow<Unit> = channelFlow {
    println("3")
    withContext(Dispatchers.Default) {
        // Do some work
        Thread.sleep(100)
        println("5")
        send(Unit)
    }
}.flowOn(Dispatchers.Unconfined)

mbonnin
03/18/2024, 2:29 PM
"Well, anything that relies on the dispatch order of coroutines is probably a very bad idea": I agree in general, but there's also a reason why UNDISPATCHED is public API.

mbonnin
03/18/2024, 2:30 PM
IdlingResource is the API that "taints" the rest of the code.

mbonnin
03/18/2024, 2:31 PM

Sam
03/18/2024, 2:43 PM

Sam
03/18/2024, 2:43 PM

mbonnin
03/18/2024, 2:44 PM

mbonnin
03/18/2024, 2:45 PM

Sam
03/18/2024, 2:47 PM

mbonnin
03/18/2024, 2:48 PM
Flow

Sam
03/18/2024, 2:48 PM

Sam
03/18/2024, 2:48 PM
You could apply the onStart hook before adding the buffer.

mbonnin
03/18/2024, 2:49 PM
channelFlow{}

mbonnin
03/18/2024, 2:50 PM

Sam
03/18/2024, 2:50 PM
If you emit outside the withContext block, you're fine:
emit(withContext(Dispatchers.IO) { getValue() }) // fine
withContext(Dispatchers.IO) { emit(getValue()) } // not fine

mbonnin
03/18/2024, 3:24 PM
That only works for a Flow without a channel, though, right? The thread hopping has to be done for each individual item, right?
i.e. stuff like this doesn't work:
return flow {
    withContext(Dispatchers.Default) {
        flowOf(1, 2).collect {
            // breaking the Flow invariant
            emit(it)
        }
    }
}

mbonnin
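The "fine / not fine" distinction can be checked directly. A self-contained sketch (the function names brokenFlow and okFlow are mine, not from the thread): the broken variant trips the library's Flow invariant check, and flowOn is the supported replacement.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Violates the invariant: collection starts in the caller's context,
// but emission happens inside Dispatchers.Default.
fun brokenFlow(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        flowOf(1, 2).collect { emit(it) } // emit from the wrong context
    }
}

// The supported equivalent: emit stays in the builder, flowOn moves the work.
fun okFlow(): Flow<Int> = flowOf(1, 2)
    .map { it * 10 }              // runs on Dispatchers.Default
    .flowOn(Dispatchers.Default)

fun main() = runBlocking {
    val failure = runCatching { brokenFlow().collect() }.exceptionOrNull()
    println(failure is IllegalStateException) // true: "Flow invariant is violated"
    println(okFlow().toList())                // [10, 20]
}
```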
03/18/2024, 3:26 PM

Sam
03/18/2024, 3:33 PM

mbonnin
03/18/2024, 3:38 PM
Flow. We have this interceptor API, for example. It's handy because you can modify the call easily:
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
    // Customize request here
    return chain.proceed(request).map {
        // Customize responses here
        it
    }
}

mbonnin
03/18/2024, 3:39 PM
If everything runs on Dispatchers.Main, it means you have to change the dispatcher in each interceptor for each piece of non-trivial work:
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
    return chain.proceed(request).onStart {
        withContext(Dispatchers.IO) {
            // do some work
        }
    }
}

mbonnin
03/18/2024, 3:42 PM
• withContext(Dispatchers.IO) would be breaking the current contract (probably OK if there's a good reason to do so)
• there are a bunch of interceptors. Right now I can think of at least 5 or 6. So that means each of them has to dispatch to a background thread and then come back to the main thread to emit before letting follow-up interceptors do the same. That sounds quite inefficient.

mbonnin
03/18/2024, 3:44 PM

Sam
03/18/2024, 3:45 PM
The intercept function itself isn't involved in the flow collection; it can run on whatever thread it likes.

Sam
03/18/2024, 3:45 PM
flowOn

mbonnin
03/18/2024, 3:46 PM
intercept will be called on main. The map{} in chain.proceed(request).map{} depends, though.

mbonnin
03/18/2024, 3:46 PM

mbonnin
03/18/2024, 3:47 PM

mbonnin
03/18/2024, 3:49 PM
I could add a flowOn() that changes the dispatcher that executes everything.

Sam
03/18/2024, 3:50 PM
flowOn affects everything upstream of it, unless of course it bumps into another flowOn further upstream.

mbonnin
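The scoping rule that flowOn only affects operators upstream of it can be observed directly. A minimal sketch (names are mine): the stage above flowOn runs on a Dispatchers.Default worker, while the stage below it stays in the collector's context.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records which thread each stage of the pipeline runs on.
fun stageThreads(): Pair<String, String> {
    var upstream = ""
    var downstream = ""
    runBlocking {
        flow {
            upstream = Thread.currentThread().name // above flowOn
            emit(1)
        }
            .flowOn(Dispatchers.Default) // applies to everything upstream of this line
            .map {
                downstream = Thread.currentThread().name // below flowOn: collector's context
                it
            }
            .collect()
    }
    return upstream to downstream
}

fun main() {
    val (up, down) = stageThreads()
    println(up.startsWith("DefaultDispatcher-worker")) // true
    println(down == Thread.currentThread().name)       // true: runBlocking's own thread
}
```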
03/18/2024, 3:52 PM

Sam
03/18/2024, 3:52 PM

mbonnin
03/18/2024, 3:53 PM

mbonnin
03/18/2024, 3:55 PM

Sam
03/18/2024, 3:55 PM
Well, if they called chain.proceed().flowOn(...), then yes, they could change the context for upstream interceptors.

Sam
03/18/2024, 3:56 PM

mbonnin
03/18/2024, 3:58 PM

Sam
03/18/2024, 4:06 PM
"Well, if they called chain.proceed().flowOn(...), then yes, they could change the context for upstream interceptors": thinking about it, you'd have exactly the same problem when using the interceptor pattern for a one-shot function. So it doesn't sound too awful.
mbonnin
03/18/2024, 4:07 PM

mbonnin
03/18/2024, 4:08 PM

Sam
03/18/2024, 4:09 PM
I would use Dispatchers.Default, since that allows coroutines to switch to Dispatchers.IO without a thread-switch if they need to.

Sam
03/18/2024, 4:09 PM

mbonnin
03/18/2024, 4:09 PM
Dispatchers.Main and Dispatchers.Default

mbonnin
03/18/2024, 4:10 PM
getThings(): Flow<Thing> is always called from the main thread

mbonnin
03/18/2024, 4:11 PM
Dispatchers.Default (or IO, both could share the threadpool)

mbonnin
03/18/2024, 4:11 PM
Dispatchers.Main, hence the global flowOn(Dispatchers.Default)

Sam
03/18/2024, 4:12 PM

mbonnin
03/18/2024, 4:13 PM

Sam
03/18/2024, 4:14 PM

mbonnin
03/18/2024, 4:15 PM
With flowOn(Dispatchers.Default), it means interceptors are free to do withContext(Dispatchers.Default) { blabla }, but it's mostly a no-op.

Sam
03/18/2024, 4:15 PM

mbonnin
03/18/2024, 4:16 PM

Sam
03/18/2024, 4:21 PM
Blocking work should go on Dispatchers.IO. That's really the only important thing.

Sam
03/18/2024, 4:22 PM
I've seen it suggested to run everything on Dispatchers.IO "just in case", but I don't have a source for that and I'm not sure how I feel about it.

mbonnin
03/18/2024, 4:23 PM
"you don't have much control over which dispatcher anything is called from": the dispatcher is deterministic by default. The user is free to change it from an interceptor, but then it becomes their own problem (and I don't think many people will do this).
mbonnin
03/18/2024, 4:24 PM

mbonnin
03/18/2024, 4:24 PM

louiscad
03/19/2024, 8:56 AM
"while still being able to synchronously monitor collections": can't you just use a middleman custom flow operator?
mbonnin
03/19/2024, 8:58 AM

louiscad
03/19/2024, 9:02 AM

louiscad
03/19/2024, 9:05 AM
fun <T> Flow<T>.monitor(): Flow<T> = flow {
    println("about to collect")
    try {
        withIndex().collect { (index, element) ->
            val number = index + 1
            println("Collecting element #$number")
            emit(element)
        }
    } finally {
        println("Done collecting")
    }
}

louiscad
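For completeness, here is one way the operator above might be used; the operator is repeated verbatim so the snippet is self-contained, and the flowOf contents are made up.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// louiscad's middleman operator, repeated so this sketch compiles on its own.
fun <T> Flow<T>.monitor(): Flow<T> = flow {
    println("about to collect")
    try {
        withIndex().collect { (index, element) ->
            println("Collecting element #${index + 1}")
            emit(element)
        }
    } finally {
        println("Done collecting")
    }
}

fun main() = runBlocking {
    // Elements pass through unchanged; the monitoring is a side effect.
    val seen = flowOf("a", "b").monitor().toList()
    println(seen) // [a, b]
}
```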
03/19/2024, 9:09 AM

mbonnin
03/19/2024, 9:19 AM
What if the Flow changes threads, though?

louiscad
03/19/2024, 9:23 AM
You can use withContext at the collect/terminal-operator site, or use flowOn downstream (which will apply to all operators and flows applied before, until any other flowOn up the chain).

mbonnin
03/19/2024, 9:24 AM
"at the collect/terminal-operator site": the call site is out of my control.
mbonnin
03/19/2024, 9:24 AM
"use flowOn downstream": how would you do that?
louiscad
03/19/2024, 9:25 AM

mbonnin
03/19/2024, 9:25 AM
"running the generation of values into a withContext, before calling emit": the counter-argument to that is additional thread hopping.
louiscad
03/19/2024, 9:26 AM
myCoolFlow.flowOn(Dispatchers.Whatever)
    .monitor()

louiscad
03/19/2024, 9:26 AM

mbonnin
03/19/2024, 9:27 AM
Default and Main

louiscad
03/19/2024, 9:27 AM

louiscad
03/19/2024, 9:27 AM

louiscad
03/19/2024, 9:27 AM

mbonnin
03/19/2024, 9:27 AM
"myCoolFlow.flowOn(Dispatchers.Whatever).monitor()": that works if I control the call site. All I have is a function returning a Flow:
fun networkThings(): Flow<Thing>

mbonnin
03/19/2024, 9:28 AM

louiscad
03/19/2024, 9:28 AM

mbonnin
03/19/2024, 9:28 AM
Flow

louiscad
03/19/2024, 9:28 AM

mbonnin
03/19/2024, 9:28 AM

louiscad
03/19/2024, 9:29 AM

louiscad
03/19/2024, 9:29 AM

mbonnin
03/19/2024, 9:30 AM

mbonnin
03/19/2024, 9:30 AM

louiscad
03/19/2024, 9:30 AM

mbonnin
03/19/2024, 9:31 AM

louiscad
03/19/2024, 9:31 AM

mbonnin
03/19/2024, 9:31 AM

mbonnin
03/19/2024, 9:32 AM

mbonnin
03/19/2024, 9:32 AM

louiscad
03/19/2024, 9:32 AM

mbonnin
03/19/2024, 9:33 AM

louiscad
03/19/2024, 9:34 AM

mbonnin
03/19/2024, 9:34 AM

louiscad
03/19/2024, 9:34 AM

mbonnin
03/19/2024, 9:35 AM

mbonnin
03/19/2024, 9:35 AM

mbonnin
03/19/2024, 9:35 AM

louiscad
03/19/2024, 9:35 AM
If Dispatchers.Main is among your concerns, it's definitely a client.

mbonnin
03/19/2024, 9:35 AM

mbonnin
03/19/2024, 9:36 AM

mbonnin
03/19/2024, 9:36 AM
IO

louiscad
03/19/2024, 9:36 AM
Use Dispatchers.Default, and you won't ever have thread switching if you happen to use Dispatchers.IO, unless too many things run there at once, of course, and those extra I/O threads are actually necessary.

mbonnin
03/19/2024, 9:40 AM
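The thread-sharing claim above matches the documented behavior of Dispatchers.IO, which shares its thread pool with Dispatchers.Default so that hopping from Default to IO normally does not require an actual thread switch. A minimal sketch, assuming the IO pool is not saturated:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Dispatchers.IO shares worker threads with Dispatchers.Default, so a
// Default -> IO transition typically keeps the coroutine on the same thread
// (as long as the IO pool has capacity).
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        val before = Thread.currentThread()
        withContext(Dispatchers.IO) {
            val after = Thread.currentThread()
            println(before === after)
        }
    }
}
```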