mbonnin
03/18/2024, 12:30 PMchannelFlow{}
that does not dispatch until after its block
is started? Reason is I would like the caller to be able to control the buffer size while still being able to synchronously monitor collections (code in ๐งต )mbonnin
03/18/2024, 12:30 PM@Test
fun myTest() = runBlocking {
println("1")
launch(start = CoroutineStart.UNDISPATCHED) {
val flow = channelFlow {
println("3")
withContext(Dispatchers.Default) {
// Do some work
println("5")
send(Unit)
}
}
println("2")
flow.collect()
}
println("4")
}
mbonnin
03/18/2024, 12:31 PM1 2 3 4 5
. It currently prints 1 2 4 3 5
Sam
03/18/2024, 1:35 PMI would like the caller to be able to control the buffer size while still being able to synchronously monitor collectionsCan you give more of a concrete example of what this means? It's not obvious from the code you shared.
mbonnin
03/18/2024, 1:36 PMfun retrieveFromNetwork(): Flow<Response> = channelFlow { ...
mbonnin
03/18/2024, 1:36 PMretrieveFromNetwork().buffer(CONFLATED)
retrieveFromNetwork().buffer(UNLIMITED)
//etc...
mbonnin
03/18/2024, 1:36 PMmbonnin
03/18/2024, 1:37 PMIdlingResource
. I want the IdlingResource
to appear busy synchronously. If not, there's always a small race conditionSam
03/18/2024, 1:54 PMonStart
?mbonnin
03/18/2024, 1:55 PMonStart
, fusing of the Flows
won't workmbonnin
03/18/2024, 1:55 PMonStart{}
is not a FusibleFlow
Sam
03/18/2024, 1:56 PMmbonnin
03/18/2024, 1:57 PMSam
03/18/2024, 2:02 PMChannelFlow
starts with a coroutine dispatch, and all the built-in fusible flows are based on that class. Making your own alternative implementation is theoretically possible but I wouldn't want to try ๐mbonnin
03/18/2024, 2:03 PMmbonnin
03/18/2024, 2:04 PMInternalCoroutinesApi
APIs but that does sound scarySam
03/18/2024, 2:05 PMFusibleFlow
itself is @InternalCoroutinesApi
I think you're right ๐คฆSam
03/18/2024, 2:06 PMmbonnin
03/18/2024, 2:06 PMmbonnin
03/18/2024, 2:07 PMchannelFlow(start = CoroutineStart.UNDISPATCHED) {}
... But that probably have far reached implications as wellSam
03/18/2024, 2:19 PMSam
03/18/2024, 2:19 PM@OptIn(ExperimentalTypeInference::class)
fun <T> customChannelFlow(
start: CoroutineStart = CoroutineStart.ATOMIC,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flow<T> = CustomChannelFlow(
block,
start,
EmptyCoroutineContext,
Channel.BUFFERED,
BufferOverflow.SUSPEND
)
@OptIn(InternalCoroutinesApi::class)
class CustomChannelFlow<T>(
private val block: suspend ProducerScope<T>.() -> Unit,
private val start: CoroutineStart,
context: CoroutineContext,
capacity: Int,
onBufferOverflow: BufferOverflow
) : FusibleFlow<T>, ChannelFlow<T>(context, capacity, onBufferOverflow) {
override suspend fun collectTo(scope: ProducerScope<T>) = block(scope)
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
return CustomChannelFlow(block, start, context, capacity, onBufferOverflow)
}
override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
return scope.produce(context, capacity, start, null, block)
}
}
Sam
03/18/2024, 2:19 PMmbonnin
03/18/2024, 2:27 PMflowOn(Dispatchers.Unconfined)
seems to work although I'm not 100% sure it really does ๐
Sam
03/18/2024, 2:28 PMmbonnin
03/18/2024, 2:28 PMfun networkThings(): Flow<Unit> = channelFlow {
println("3")
withContext(Dispatchers.Default) {
// Do some work
Thread.sleep(100)
println("5")
send(Unit)
}
}.flowOn(Dispatchers.Unconfined)
mbonnin
03/18/2024, 2:29 PMWell anything that relies on the dispatch order of coroutines is probably a very bad ideaI agree in general but there's also a reason why
UNDISPATCHED
is public APImbonnin
03/18/2024, 2:30 PMIdlingResource
is the API that "taints" rest of the codembonnin
03/18/2024, 2:31 PMSam
03/18/2024, 2:43 PMSam
03/18/2024, 2:43 PMmbonnin
03/18/2024, 2:44 PMmbonnin
03/18/2024, 2:45 PMSam
03/18/2024, 2:47 PMmbonnin
03/18/2024, 2:48 PMFLow
Sam
03/18/2024, 2:48 PMSam
03/18/2024, 2:48 PMonStart
hook before adding the buffermbonnin
03/18/2024, 2:49 PMchannelFlow{}
mbonnin
03/18/2024, 2:50 PMSam
03/18/2024, 2:50 PMwithContext
block you're fineSam
03/18/2024, 2:51 PMemit(withContext(Dispatchers.IO) { getValue() }) // fine
withContext(Dispatchers.IO) { emit(getValue()) } // not fine
mbonnin
03/18/2024, 3:24 PMFlow
without a channel, though, right? The thread hopping has to be done for each individual item, right?
i.e. stuff like this doesn't work:
return flow {
withContext(Dispatchers.Default) {
flowOf(1, 2).collect {
// breaking the Flow invariant
emit(it)
}
}
}
mbonnin
03/18/2024, 3:26 PMSam
03/18/2024, 3:33 PMmbonnin
03/18/2024, 3:38 PMFlow
. We have this interceptor API for an example. It's. handy cause you can modify the call easily:
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
// Customize request here
return chain.proceed(request).map {
// Customize responses here
}
}
mbonnin
03/18/2024, 3:39 PMDispatchers.Main
it means you have to change the dispatcher in each interceptor for each non-trivial work:
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
return chain.proceed(request).onStart {
withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
// do some work
}
}
}
mbonnin
03/18/2024, 3:42 PMwithContext(<http://Dispatchers.IO|Dispatchers.IO>)
would be breaking the current contract (probably OK if there's a good reason to do so)
โข there are a bunch of interceptors. Right now I can think of at least 5-6. So that means each of them has to dispatch to a background thread and then come back to the main thread to emit before letting follow up interceptors do the same. That sounds quite inefficientmbonnin
03/18/2024, 3:44 PMSam
03/18/2024, 3:45 PMintercept
function itself isn't involved in the flow collection; it can run on whatever thread it likes.Sam
03/18/2024, 3:45 PMflowOn
mbonnin
03/18/2024, 3:46 PMintercept
will be called on main. The map{}
in chain.proceed(request).map{}
depends thoughmbonnin
03/18/2024, 3:46 PMmbonnin
03/18/2024, 3:47 PMmbonnin
03/18/2024, 3:49 PMflowOn()
that changes the dispatcher that executes everythingSam
03/18/2024, 3:50 PMflowOn
affects everything upstream of it, unless of course it bumps into another flowOn
further upstreammbonnin
03/18/2024, 3:52 PMSam
03/18/2024, 3:52 PMmbonnin
03/18/2024, 3:53 PMmbonnin
03/18/2024, 3:55 PMSam
03/18/2024, 3:55 PMchain.proceed().flowOn(...)
, then yes, they could change the context for upstream interceptorsSam
03/18/2024, 3:56 PMmbonnin
03/18/2024, 3:58 PMSam
03/18/2024, 4:06 PMWell, if they calledThinking about it, you'd have exactly the same problem when using the interceptor pattern for a one-shot function. So it doesn't sound too awful., then yes, they could change the context for upstream interceptorschain.proceed().flowOn(...)
mbonnin
03/18/2024, 4:07 PMmbonnin
03/18/2024, 4:08 PMSam
03/18/2024, 4:09 PMDispatchers.Default
, since that allows coroutines to switch to <http://Dispatchers.IO|Dispatchers.IO>
without a thread-switch if they need to.Sam
03/18/2024, 4:09 PMmbonnin
03/18/2024, 4:09 PMDispatchers.Main
and Dispatchers.Default
mbonnin
03/18/2024, 4:10 PMgetThings(): Flow<Thing>
is always called from the main threadmbonnin
03/18/2024, 4:11 PMDispatchers.Default
(or IO
, both could share the threadpool)mbonnin
03/18/2024, 4:11 PMDispatchers.Main
, hence the global flowOn(Dispatchers.Default)
Sam
03/18/2024, 4:12 PMmbonnin
03/18/2024, 4:13 PMSam
03/18/2024, 4:14 PMmbonnin
03/18/2024, 4:15 PMflowOn(Dispatchers.Default)
, it means interceptors are free to do withContext(Dispatchers.Default) { blabla }
but it's mostly a no-opSam
03/18/2024, 4:15 PMmbonnin
03/18/2024, 4:16 PMSam
03/18/2024, 4:21 PM<http://Dispatchers.IO|Dispatchers.IO>
. That's really the only important thing.Sam
03/18/2024, 4:22 PM<http://Dispatchers.IO|Dispatchers.IO>
"just in case", but I don't have a source for that and I'm not sure how I feel about itmbonnin
03/18/2024, 4:23 PMyou don't have much control over which dispatcher anything is called fromThe dispatcher is deterministic by default. The user is free to change it from interceptor but then it becomes their own problem (and I don't think much people will do this)
mbonnin
03/18/2024, 4:24 PMmbonnin
03/18/2024, 4:24 PMlouiscad
03/19/2024, 8:56 AMwhile still being able to synchronously monitor collections
Can't you just use a middleman custom flow operator?
mbonnin
03/19/2024, 8:58 AMlouiscad
03/19/2024, 9:02 AMlouiscad
03/19/2024, 9:05 AMfun <T> Flow<T>.monitor(): Flow<T> = flow {
println("about to collect")
try {
withIndex().collect { (index, element) ->
val number = index + 1
println("Collecting element #$number")
emit(element)
}
} finally {
println("Done collecting")
}
}
louiscad
03/19/2024, 9:09 AMmbonnin
03/19/2024, 9:19 AMFlow
changes threads though?louiscad
03/19/2024, 9:23 AMwithContext
at the collect/terminal-operator site, or use flowOn
downstream (which will apply to all operators and flows applied before, until any other flowOn
up the chain.mbonnin
03/19/2024, 9:24 AMat the collect/terminal-operator siteThe call site is out of my control
mbonnin
03/19/2024, 9:24 AMuseHow would you do that?downstreamflowOn
louiscad
03/19/2024, 9:25 AMmbonnin
03/19/2024, 9:25 AMrunning the generation of values into a withContext, before calling emitThe counter argument to that is additional thread hoping
louiscad
03/19/2024, 9:26 AMmyCoolFlow.flowOn(Dispatchers.Whatever)
.monitor()
louiscad
03/19/2024, 9:26 AMmbonnin
03/19/2024, 9:27 AMDefault
and Main
louiscad
03/19/2024, 9:27 AMlouiscad
03/19/2024, 9:27 AMlouiscad
03/19/2024, 9:27 AMmbonnin
03/19/2024, 9:27 AMThat works if I control the call site, all I have is a function return aCopy codemyCoolFlow.flowOn(Dispatchers.Whatever).monitor()
Flow
fun networkThings(): Flow<Thing>
mbonnin
03/19/2024, 9:28 AMlouiscad
03/19/2024, 9:28 AMmbonnin
03/19/2024, 9:28 AMFlow
louiscad
03/19/2024, 9:28 AMmbonnin
03/19/2024, 9:28 AMlouiscad
03/19/2024, 9:29 AMlouiscad
03/19/2024, 9:29 AMmbonnin
03/19/2024, 9:30 AMmbonnin
03/19/2024, 9:30 AMlouiscad
03/19/2024, 9:30 AMmbonnin
03/19/2024, 9:31 AMlouiscad
03/19/2024, 9:31 AMmbonnin
03/19/2024, 9:31 AMmbonnin
03/19/2024, 9:32 AMmbonnin
03/19/2024, 9:32 AMlouiscad
03/19/2024, 9:32 AMmbonnin
03/19/2024, 9:33 AMlouiscad
03/19/2024, 9:34 AMmbonnin
03/19/2024, 9:34 AMlouiscad
03/19/2024, 9:34 AMmbonnin
03/19/2024, 9:35 AMmbonnin
03/19/2024, 9:35 AMmbonnin
03/19/2024, 9:35 AMlouiscad
03/19/2024, 9:35 AMDispatchers.Main
is among your concerns, it's definitely a clientmbonnin
03/19/2024, 9:35 AMmbonnin
03/19/2024, 9:36 AMmbonnin
03/19/2024, 9:36 AMIO
louiscad
03/19/2024, 9:36 AMDispatchers.Default
, and you won't ever have thread switching if you happen to use <http://Dispatchers.IO|Dispatchers.IO>
, unless too many things run there at once of course, and that those extra I/O threads are actually necessary.mbonnin
03/19/2024, 9:40 AM