# coroutines
m

mbonnin

03/18/2024, 12:30 PM
Can I have a
channelFlow{}
that does not dispatch until after its
block
is started? Reason is I would like the caller to be able to control the buffer size while still being able to synchronously monitor collections (code in 🧵 )
Copy code
@Test
  fun myTest() = runBlocking {
    println("1")
    launch(start = CoroutineStart.UNDISPATCHED) {
      val flow = channelFlow {
        println("3")
        withContext(Dispatchers.Default) {
          // Do some work
          println("5")
          send(Unit)
        }
      }

      println("2")
      flow.collect()
    }

    println("4")
  }
I'd like that to print
1 2 3 4 5
. It currently prints
1 2 4 3 5
s

Sam

03/18/2024, 1:35 PM
I would like the caller to be able to control the buffer size while still being able to synchronously monitor collections
Can you give more of a concrete example of what this means? It's not obvious from the code you shared.
m

mbonnin

03/18/2024, 1:36 PM
It's a function that returns a Flow
Copy code
fun retrieveFromNetwork(): Flow<Response> = channelFlow { ...
And I'd like the caller to be able to control the buffering:
Copy code
retrieveFromNetwork().buffer(CONFLATED)
retrieveFromNetwork().buffer(UNLIMITED)
//etc...
Depending on the use case, it might or might not be OK to drop items
The "synchronous" part is to interact with an Android
IdlingResource
. I want the
IdlingResource
to appear busy synchronously. If not, there's always a small race condition
s

Sam

03/18/2024, 1:54 PM
Can't you just use
onStart
?
m

mbonnin

03/18/2024, 1:55 PM
I'm afraid if I use
onStart
, fusing of the
Flows
won't work
Because the return value of
onStart{}
is not a
FusibleFlow
s

Sam

03/18/2024, 1:56 PM
I think the same will be true of any version you could come up with that would solve this problem 😞
m

mbonnin

03/18/2024, 1:57 PM
Yea, this is more or less the conclusion I reached too but I fail to see the fundamental reason why this is not possible
s

Sam

03/18/2024, 2:02 PM
I don't think it's fundamentally impossible, it's just that the implementation of
ChannelFlow
starts with a coroutine dispatch, and all the built-in fusible flows are based on that class. Making your own alternative implementation is theoretically possible but I wouldn't want to try 😄
m

mbonnin

03/18/2024, 2:03 PM
I considered for a brief moment making my own implementation 😅 But in addition to all the potential footguns, I'm almost sure it can't be done without some internal API
Or maybe just
InternalCoroutinesApi
APIs but that does sound scary
s

Sam

03/18/2024, 2:05 PM
Since
FusibleFlow
itself is
@InternalCoroutinesApi
I think you're right 🤦
Apparently we mere mortals are not worthy to implement it
m

mbonnin

03/18/2024, 2:06 PM
Not that I really wanted to 🙈
But maybe a
channelFlow(start = CoroutineStart.UNDISPATCHED) {}
... But that probably has far-reaching implications as well
s

Sam

03/18/2024, 2:19 PM
I tried it
Copy code
@OptIn(ExperimentalTypeInference::class)
fun <T> customChannelFlow(
  start: CoroutineStart = CoroutineStart.ATOMIC,
  @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flow<T> = CustomChannelFlow(
  block,
  start,
  EmptyCoroutineContext,
  Channel.BUFFERED,
  BufferOverflow.SUSPEND
)

@OptIn(InternalCoroutinesApi::class)
class CustomChannelFlow<T>(
  private val block: suspend ProducerScope<T>.() -> Unit,
  private val start: CoroutineStart,
  context: CoroutineContext,
  capacity: Int,
  onBufferOverflow: BufferOverflow
) : FusibleFlow<T>, ChannelFlow<T>(context, capacity, onBufferOverflow) {
  override suspend fun collectTo(scope: ProducerScope<T>) = block(scope)

  override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
    return CustomChannelFlow(block, start, context, capacity, onBufferOverflow)
  }

  override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
    return scope.produce(context, capacity, start, null, block)
  }
}
I hate it
🤣 1
m

mbonnin

03/18/2024, 2:27 PM
Copy code
flowOn(Dispatchers.Unconfined)
seems to work although I'm not 100% sure it really does 😅
s

Sam

03/18/2024, 2:28 PM
Well anything that relies on the dispatch order of coroutines is probably a very bad idea 😄
nod 1
m

mbonnin

03/18/2024, 2:28 PM
Copy code
fun networkThings(): Flow<Unit> = channelFlow {
    println("3")
    withContext(Dispatchers.Default) {
      // Do some work
      Thread.sleep(100)
      println("5")
      send(Unit)
    }
  }.flowOn(Dispatchers.Unconfined)
Well anything that relies on the dispatch order of coroutines is probably a very bad idea
I agree in general but there's also a reason why
UNDISPATCHED
is public API
In my specific case, I think
IdlingResource
is the API that "taints" the rest of the code
It's kind of bridging blocking code to suspending code
s

Sam

03/18/2024, 2:43 PM
Is there a way to instrument the thing behind the flow instead of the flow itself? Catch it before it ever becomes a coroutine
I'm assuming this is purely for testing
m

mbonnin

03/18/2024, 2:44 PM
All other solutions I can see would require changing our public API, which I'm not very keen on doing.
It's a lib. If it were an app, I would probably do things differently indeed
s

Sam

03/18/2024, 2:47 PM
Another question: does it absolutely have to be a channel flow?
m

mbonnin

03/18/2024, 2:48 PM
I think it could be any kind of
Flow
s

Sam

03/18/2024, 2:48 PM
If it's just a regular flow the problem goes away, because it doesn't need to be fusible
You can add the
onStart
hook before adding the buffer
💡 1
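A minimal runnable sketch of that suggestion (`retrieveFromNetwork` and the `markBusy` callback are hypothetical stand-ins, not real API): with a regular `flow {}`, the `onStart` hook sits inside the library function, and the caller's `buffer()` simply applies downstream of it, so the monitor fires synchronously on collection.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical: a regular flow takes the onStart hook inside the library
// function; the caller's buffer() applies downstream without any fusing concern.
fun retrieveFromNetwork(markBusy: () -> Unit): Flow<Int> =
    flowOf(1, 2).onStart { markBusy() }

fun main() = runBlocking {
    val events = mutableListOf<String>()
    retrieveFromNetwork { events += "busy" }
        .buffer(16) // caller still controls buffering
        .collect { events += "item $it" }
    println(events) // [busy, item 1, item 2]
}
```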
m

mbonnin

03/18/2024, 2:49 PM
Excellent point 👍, I can't remember why I started looking into
channelFlow{}
It needs to change the dispatcher but that probably can be done without a channel 🤔
s

Sam

03/18/2024, 2:50 PM
So long as you don't emit from inside the
withContext
block you're fine
Copy code
emit(withContext(Dispatchers.IO) { getValue() }) // fine
withContext(Dispatchers.IO) { emit(getValue()) } // not fine
👀 1
👍 1
m

mbonnin

03/18/2024, 3:24 PM
This means you can't really collect a
Flow
without a channel, though, right? The thread hopping has to be done for each individual item, right? i.e. stuff like this doesn't work:
Copy code
return flow {
      withContext(Dispatchers.Default) {
        flowOf(1, 2).collect {
          // breaking the Flow invariant
          emit(it)
        }
      }
    }
In our case, it's probably too late to do that. There's a lot going on for each item: cache reading, parsing, network, cache writing, potentially suspending user code as well. Removing the channel would mean a lot of thread hopping + making sure everyone in the chain observes the "do work on a background thread" contract...
s

Sam

03/18/2024, 3:33 PM
Only the coroutine that's collecting the flow is allowed to emit values. A channel flow uses a channel so that other coroutines can send their values to the collector coroutine, where the values are permitted to be emitted. But I'm curious what you were trying to show with your code example… I agree it doesn't work, but I don't think it would be useful even if it did.
m

mbonnin

03/18/2024, 3:38 PM
Yeah, agreed, my example is not the most useful 😄. The thing is, in our internals, everything is a
Flow
. We have this interceptor API, for example. It's handy because you can modify the call easily:
Copy code
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
      // Customize request here
      return chain.proceed(request).map { 
        // Customize responses here
      }
    }
If this interceptor runs on
Dispatchers.Main
it means you have to change the dispatcher in each interceptor for each non-trivial work:
Copy code
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
      return chain.proceed(request).onStart {
        withContext(Dispatchers.IO) {
          // do some work 
        }
      }
    }
Which is fine if you have a couple of places, but:
• requiring the
withContext(Dispatchers.IO)
would be breaking the current contract (probably OK if there's a good reason to do so)
• there are a bunch of interceptors. Right now I can think of at least 5 or 6. So that means each of them has to dispatch to a background thread and then come back to the main thread to emit before letting follow-up interceptors do the same. That sounds quite inefficient
But this kind of thing is hard, so maybe it's not that inefficient, I don't know 🤷. It sounds risky though, because it could be easy to run things on the main thread if not careful
s

Sam

03/18/2024, 3:45 PM
What do you mean by "If this interceptor runs on `Dispatchers.Main`"? The
intercept
function itself isn't involved in the flow collection; it can run on whatever thread it likes.
The flow will run on whatever thread you collect it on, unless it has some upstream call to
flowOn
m

mbonnin

03/18/2024, 3:46 PM
Right
intercept
will be called on main. The
map{}
in
chain.proceed(request).map{}
depends though
Same for any operator you would add there
If the flow is collected from the main thread (which is the default), every operator is called from the main thread
Unless there's a global
flowOn()
that changes the dispatcher that executes everything
s

Sam

03/18/2024, 3:50 PM
flowOn
affects everything upstream of it, unless of course it bumps into another
flowOn
further upstream
👍 1
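A small sketch of that scoping, using named single-thread dispatchers so the effect is visible (the dispatcher names are arbitrary): each `flowOn` applies to everything upstream of it, until it bumps into another `flowOn` further up the chain.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() {
    val up = Executors.newSingleThreadExecutor { Thread(it, "upstream") }.asCoroutineDispatcher()
    val mid = Executors.newSingleThreadExecutor { Thread(it, "middle") }.asCoroutineDispatcher()
    try {
        runBlocking {
            flowOf(1)
                .onEach { println("emit on ${Thread.currentThread().name}") }     // "upstream": first flowOn below
                .flowOn(up)
                .onEach { println("map on ${Thread.currentThread().name}") }      // "middle": second flowOn below
                .flowOn(mid)
                .collect { println("collect on ${Thread.currentThread().name}") } // caller's own thread
        }
    } finally {
        up.close(); mid.close()
    }
}
```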
m

mbonnin

03/18/2024, 3:52 PM
Right 👍
s

Sam

03/18/2024, 3:52 PM
If you're dealing with the main thread I don't think you can escape all this completely. Either you have to collect the flow on the main thread and require all the interceptors to be main-safe, or you have to collect the flow on some other thread and require the interceptors to specify explicitly if they need to run on the main thread.
m

mbonnin

03/18/2024, 3:53 PM
I think none of the interceptors should really run on the main thread
So by changing the dispatcher more "globally" (unless someone else changes it again) the contract is "you're free to call blocking code in interceptors". But if someone changes the dispatchers in an interceptor then it has side effects for some of the other ones...
s

Sam

03/18/2024, 3:55 PM
Well, if they called
chain.proceed().flowOn(...)
, then yes, they could change the context for upstream interceptors
I'm not a big fan of "you're free to call blocking code in interceptors", but that's just personal opinion. Rather, suspending functions should always aim to be caller agnostic, and not rely on being called from any specific dispatcher.
m

mbonnin

03/18/2024, 3:58 PM
yea that's a good point.
s

Sam

03/18/2024, 4:06 PM
Well, if they called
chain.proceed().flowOn(...)
, then yes, they could change the context for upstream interceptors
Thinking about it, you'd have exactly the same problem when using the interceptor pattern for a one-shot function. So it doesn't sound too awful.
👍 1
m

mbonnin

03/18/2024, 4:07 PM
I still think there's some value in avoiding extra thread hops? Or am I overthinking this?
Some interceptors will have to offload work to other threads. If they're called from a dispatcher that can handle work, my mental model is that it would avoid a context switch
s

Sam

03/18/2024, 4:09 PM
The implementation will mostly take care of avoiding unnecessary thread switches. Maybe a good approach would be to start from
Dispatchers.Default
, since that allows coroutines to switch to
Dispatchers.IO
without a thread-switch if they need to.
Have you got an example of a thread-switch scenario that you're worried about?
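That Default → IO behavior is documented: the two dispatchers share a thread pool, so switching between them normally reuses the current thread rather than hopping to a new one. A quick sketch:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Dispatchers.IO shares threads with Dispatchers.Default, so this switch
// normally stays on the same thread instead of dispatching to a new one.
fun main() = runBlocking(Dispatchers.Default) {
    val before = Thread.currentThread()
    withContext(Dispatchers.IO) {
        println(before === Thread.currentThread())
    }
}
```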
m

mbonnin

03/18/2024, 4:09 PM
It's mostly
Dispatchers.Main
and
Dispatchers.Default
I think it's fair to assume
getThings(): Flow<Thing>
is always called from the main thread
building the thing requires several interceptors that dispatch on
Dispatchers.Default
(or
IO
, both could share the threadpool)
But in between interceptors, I'd like to avoid going back to
Dispatchers.Main
, hence the global
flowOn(Dispatchers.Default)
s

Sam

03/18/2024, 4:12 PM
👍 that makes sense
m

mbonnin

03/18/2024, 4:13 PM
Concurrency is hard 😅
s

Sam

03/18/2024, 4:14 PM
That's why they pay us lots of money for it
😄 1
m

mbonnin

03/18/2024, 4:15 PM
So now, if there is a global
flowOn(Dispatchers.Default)
, it means interceptors are free to do
withContext(Dispatchers.Default) { blabla }
but it's mostly a no-op
s

Sam

03/18/2024, 4:15 PM
Yes, exactly.
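A sketch of why it's mostly a no-op: when the target dispatcher is the one already in the coroutine's context, `withContext` skips the dispatch and runs the block on the current thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Same dispatcher as the surrounding context: withContext takes its
// fast path and runs the block in place, without re-dispatching.
fun main() = runBlocking(Dispatchers.Default) {
    val before = Thread.currentThread()
    withContext(Dispatchers.Default) {
        println(before === Thread.currentThread()) // true: no re-dispatch
    }
}
```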
m

mbonnin

03/18/2024, 4:16 PM
So it'd feel weird to say in the KDoc that you should be prepared to be called from any dispatcher while that's in fact not the case
s

Sam

03/18/2024, 4:21 PM
Well, if each interceptor can modify the upstream flow then you don't have much control over which dispatcher anything is called from 😄. But I think if people are using a coroutine-based library, it's fair to expect them to wrap blocking functions with
Dispatchers.IO
. That's really the only important thing.
👍 1
I think Ktor runs everything on
Dispatchers.IO
"just in case", but I don't have a source for that and I'm not sure how I feel about it
m

mbonnin

03/18/2024, 4:23 PM
you don't have much control over which dispatcher anything is called from
The dispatcher is deterministic by default. The user is free to change it from interceptor but then it becomes their own problem (and I don't think much people will do this)
👍 1
I'll look at what Ktor is doing, that's a good example.
Thanks for the nice discussion 💙 🙏!
🍻 1
l

louiscad

03/19/2024, 8:56 AM
while still being able to synchronously monitor collections
Can't you just use a middleman custom flow operator?
m

mbonnin

03/19/2024, 8:58 AM
Don't think so because any operator would either run in a separate thread or transform the Flow to a non-fusible one
l

louiscad

03/19/2024, 9:02 AM
The fusible parts fuse as long as adjacent operators are fusible; it's a best-effort thing. The operator runs on the dispatcher of the downstream flowOn, or of the collector; it doesn't perform any thread switching itself, and I don't see why you would care about that.
Copy code
fun <T> Flow<T>.monitor(): Flow<T> = flow {
    println("about to collect")
    try {
        withIndex().collect { (index, element) ->
            val number = index + 1
            println("Collecting element #$number")
            emit(element)
        }
    } finally {
        println("Done collecting")
    }
}
The goal of FusibleFlow is to avoid having unnecessary Channels, because (especially before the Channels rewrite, I think) those were slightly costly objects. It's to prevent them from adding up when you chain many operators that require an underlying Channel. Here, we have no Channel, so the worst it can do is split your chain of Channel-based operators in two, which probably won't ever be noticeable to the end user, or to the resource usage and responsiveness of some server running this mostly suspending code.
m

mbonnin

03/19/2024, 9:19 AM
How do you make it so that the
Flow
changes threads though?
l

louiscad

03/19/2024, 9:23 AM
Either have
withContext
at the collect/terminal-operator site, or use
flowOn
downstream (which will apply to all operators and flows applied before, until any other
flowOn
up the chain).
m

mbonnin

03/19/2024, 9:24 AM
at the collect/terminal-operator site
The call site is out of my control
use
flowOn
downstream
How would you do that?
l

louiscad

03/19/2024, 9:25 AM
Another possibility, already mentioned in this thread, is running the generation of values into a withContext, before calling emit. The only requirement is that emit is not called inside that withContext block.
m

mbonnin

03/19/2024, 9:25 AM
running the generation of values into a withContext, before calling emit
The counter-argument to that is additional thread hopping
l

louiscad

03/19/2024, 9:26 AM
Copy code
myCoolFlow.flowOn(Dispatchers.Whatever)
    .monitor()
If you switch between Dispatchers.IO and Dispatchers.Default, there's no thread hopping
m

mbonnin

03/19/2024, 9:27 AM
It's between
Default
and
Main
l

louiscad

03/19/2024, 9:27 AM
You can't have free lunch
😭 1
But is that cost prohibitive?
I really doubt so
m

mbonnin

03/19/2024, 9:27 AM
Copy code
myCoolFlow.flowOn(Dispatchers.Whatever).monitor()
That would work if I controlled the call site; all I have is a function returning a
Flow
Copy code
fun networkThings(): Flow<Thing>
> But is that cost prohibitive?
Would probably have to be measured
l

louiscad

03/19/2024, 9:28 AM
That code I showed doesn't need to control the collect site
m

mbonnin

03/19/2024, 9:28 AM
But it returns a non-fusible
Flow
l

louiscad

03/19/2024, 9:28 AM
That's fine
m

mbonnin

03/19/2024, 9:28 AM
So 2 channels?
l

louiscad

03/19/2024, 9:29 AM
Possibly, if further operators need a channel, yes
That's okay. You won't overload the runtime because you had 2 channel objects
m

mbonnin

03/19/2024, 9:30 AM
I agree in general, but also this is how I end up with Slack creating OOMs 🙃
Am I prematurely optimizing things? Maybe?
l

louiscad

03/19/2024, 9:30 AM
I don't believe this would ever cause OOMs
m

mbonnin

03/19/2024, 9:31 AM
Nah the point is that every abstraction has a hidden cost
l

louiscad

03/19/2024, 9:31 AM
Unless you buffer things with unlimited capacity, and never collect fast enough
m

mbonnin

03/19/2024, 9:31 AM
Sure I can abstract my way out of everything (which I'm already doing by using Kotlin and GC instead of something like C)
The question is where to stop abstracting everything for convenience
Case in point, that issue about okio vs netty performance: https://github.com/grpc/grpc-java/issues/6696
l

louiscad

03/19/2024, 9:32 AM
The raison d'être for FusibleFlow is when you have like 3 or 5 or more channel-using operators, and that would be multiplied by the amount of concurrent collections happening in parallel. Most likely a server problem rather than a client/app problem, when put in perspective
nod 1
m

mbonnin

03/19/2024, 9:33 AM
Doing everything async is nice and all but at the end of the day, if you want raw performance, the low level details matters
l

louiscad

03/19/2024, 9:34 AM
Are you writing code for servers, or clients?
m

mbonnin

03/19/2024, 9:34 AM
maybe both?
l

louiscad

03/19/2024, 9:34 AM
"maybe" 🤣😅
m

mbonnin

03/19/2024, 9:35 AM
😂
I'd like people to be able to use it on both server & clients
Android, iOS, web, server, everywhere Kotlin runs
l

louiscad

03/19/2024, 9:35 AM
If
Dispatchers.Main
is among your concerns, it's definitely a client
m

mbonnin

03/19/2024, 9:35 AM
True
The dispatcher would be something else on a server
Not sure what BTW, probably
IO
l

louiscad

03/19/2024, 9:36 AM
For servers, they will be using
Dispatchers.Default
, and you won't ever have thread switching if you happen to use
Dispatchers.IO
, unless too many things run there at once, of course, and those extra I/O threads are actually necessary.
👍 1
m

mbonnin

03/19/2024, 9:40 AM
Good point 👍 On the server I want something with virtual threads I guess... But that's a question for another day 😅