# coroutines
m
Can I have a `channelFlow {}` that does not dispatch until after its `block` is started? Reason is I would like the caller to be able to control the buffer size while still being able to synchronously monitor collections (code in 🧵)
```kotlin
@Test
fun myTest() = runBlocking {
  println("1")
  launch(start = CoroutineStart.UNDISPATCHED) {
    val flow = channelFlow {
      println("3")
      withContext(Dispatchers.Default) {
        // Do some work
        println("5")
        send(Unit)
      }
    }

    println("2")
    flow.collect()
  }

  println("4")
}
```
I'd like that to print `1 2 3 4 5`. It currently prints `1 2 4 3 5`.
s
> I would like the caller to be able to control the buffer size while still being able to synchronously monitor collections
Can you give more of a concrete example of what this means? It's not obvious from the code you shared.
m
It's a function that returns a Flow
```kotlin
fun retrieveFromNetwork(): Flow<Response> = channelFlow { ...
```
And I'd like the caller to be able to control the buffering:
```kotlin
retrieveFromNetwork().buffer(CONFLATED)
retrieveFromNetwork().buffer(UNLIMITED)
// etc...
```
Depending on the use case, it might or might not be OK to drop items
The "synchronous" part is to interact with an Android `IdlingResource`. I want the `IdlingResource` to appear busy synchronously. If not, there's always a small race condition
s
Can't you just use `onStart`?
m
I'm afraid if I use `onStart`, fusing of the `Flow`s won't work
Because the return value of `onStart {}` is not a `FusibleFlow`
s
I think the same will be true of any version you could come up with that would solve this problem ๐Ÿ˜ž
m
Yea, this is more or less the conclusion I reached too but I fail to see the fundamental reason why this is not possible
s
I don't think it's fundamentally impossible, it's just that the implementation of `ChannelFlow` starts with a coroutine dispatch, and all the built-in fusible flows are based on that class. Making your own alternative implementation is theoretically possible but I wouldn't want to try 😄
m
I considered for a brief moment making my own implementation 😅 But in addition to all the potential footguns, I'm almost sure it can't be done without some internal API
Or maybe just `InternalCoroutinesApi` APIs, but that does sound scary
s
Since `FusibleFlow` itself is `@InternalCoroutinesApi`, I think you're right 🤦
Apparently we mere mortals are not worthy to implement it
m
Not that I really wanted to 🙈
But maybe a `channelFlow(start = CoroutineStart.UNDISPATCHED) {}`... But that probably has far-reaching implications as well
s
I tried it
```kotlin
@OptIn(ExperimentalTypeInference::class)
fun <T> customChannelFlow(
  start: CoroutineStart = CoroutineStart.ATOMIC,
  @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flow<T> = CustomChannelFlow(
  block,
  start,
  EmptyCoroutineContext,
  Channel.BUFFERED,
  BufferOverflow.SUSPEND
)

@OptIn(InternalCoroutinesApi::class)
class CustomChannelFlow<T>(
  private val block: suspend ProducerScope<T>.() -> Unit,
  private val start: CoroutineStart,
  context: CoroutineContext,
  capacity: Int,
  onBufferOverflow: BufferOverflow
) : FusibleFlow<T>, ChannelFlow<T>(context, capacity, onBufferOverflow) {
  override suspend fun collectTo(scope: ProducerScope<T>) = block(scope)

  override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
    return CustomChannelFlow(block, start, context, capacity, onBufferOverflow)
  }

  override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
    return scope.produce(context, capacity, start, null, block)
  }
}
```
I hate it
๐Ÿคฃ 1
m
```kotlin
flowOn(Dispatchers.Unconfined)
```
seems to work although I'm not 100% sure it really does 😅
s
Well anything that relies on the dispatch order of coroutines is probably a very bad idea ๐Ÿ˜„
nod 1
m
```kotlin
fun networkThings(): Flow<Unit> = channelFlow {
  println("3")
  withContext(Dispatchers.Default) {
    // Do some work
    Thread.sleep(100)
    println("5")
    send(Unit)
  }
}.flowOn(Dispatchers.Unconfined)
```
> Well anything that relies on the dispatch order of coroutines is probably a very bad idea
I agree in general but there's also a reason why `UNDISPATCHED` is public API
In my specific case, I think `IdlingResource` is the API that "taints" the rest of the code
It's kind of bridging blocking code to suspending code
s
Is there a way to instrument the thing behind the flow instead of the flow itself? Catch it before it ever becomes a coroutine
I'm assuming this is purely for testing
m
All other solutions I can see would require changing our public API, which I'm not very keen on doing.
It's a lib. If it were an app, I would probably do things differently indeed
s
Another question: does it absolutely have to be a channel flow?
m
I think it could be any kind of `Flow`
s
If it's just a regular flow the problem goes away, because it doesn't need to be fusible
You can add the `onStart` hook before adding the buffer
๐Ÿ’ก 1
m
Excellent point 👍, I can't remember why I started looking into `channelFlow {}`
It needs to change the dispatcher but that probably can be done without a channel 🤔
s
So long as you don't emit from inside the `withContext` block you're fine
```kotlin
emit(withContext(Dispatchers.IO) { getValue() }) // fine
withContext(Dispatchers.IO) { emit(getValue()) } // not fine
```
๐Ÿ‘€ 1
๐Ÿ‘ 1
m
This means you can't really collect a `Flow` without a channel, though, right? The thread hopping has to be done for each individual item, i.e. stuff like this doesn't work:
```kotlin
return flow {
  withContext(Dispatchers.Default) {
    flowOf(1, 2).collect {
      // breaking the Flow invariant
      emit(it)
    }
  }
}
```
In our case, it's probably too late to do that. There's a lot going on for each item: cache reading, parsing, network, cache writing, potentially suspending user code as well. Removing the channel would mean a lot of thread hopping + making sure everyone in the chain observes the "do work on a background thread" contract...
s
Only the coroutine that's collecting the flow is allowed to emit values. A channel flow uses a channel so that other coroutines can send their values to the collector coroutine, where the values are permitted to be emitted. But I'm curious what you were trying to show with your code exampleโ€ฆ I agree it doesn't work, but I don't think it would be useful even if it did.
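That mechanism can be sketched in a few lines (a minimal illustration, not the original code):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// channelFlow backs the flow with a channel, so coroutines other than
// the collector may send into it; a plain flow {} throws
// IllegalStateException for this pattern (the Flow invariant).
fun merged(): Flow<Int> = channelFlow {
    launch(Dispatchers.Default) { send(1) } // another coroutine is allowed to send
    launch(Dispatchers.Default) { send(2) }
}
```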
m
Yea agreed my example is not the most useful 😄. The thing is in our internals, everything is a `Flow`. We have this interceptor API for example. It's handy because you can modify the call easily:
```kotlin
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
  // Customize request here
  return chain.proceed(request).map {
    // Customize responses here
  }
}
```
If this interceptor runs on `Dispatchers.Main` it means you have to change the dispatcher in each interceptor for each non-trivial work:
```kotlin
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
  return chain.proceed(request).onStart {
    withContext(Dispatchers.IO) {
      // do some work
    }
  }
}
```
Which is fine if you have a couple of places but:
• requiring the `withContext(Dispatchers.IO)` would be breaking the current contract (probably OK if there's a good reason to do so)
• there are a bunch of interceptors. Right now I can think of at least 5-6. So that means each of them has to dispatch to a background thread and then come back to the main thread to emit before letting follow-up interceptors do the same. That sounds quite inefficient
But this kind of thing is hard so maybe it's not that inefficient, I don't know 🤷. It sounds risky though because it could be easy to run things on the main thread if not careful
s
What do you mean by "If this interceptor runs on `Dispatchers.Main`"? The `intercept` function itself isn't involved in the flow collection; it can run on whatever thread it likes.
The flow will run on whatever thread you collect it on, unless it has some upstream call to `flowOn`
m
Right, `intercept` will be called on main. The `map {}` in `chain.proceed(request).map {}` depends though
Same for any operator you would add there
If the flow is collected from the main thread (which is the default), every operator is called from the main thread
Unless there's a global `flowOn()` that changes the dispatcher that executes everything
s
`flowOn` affects everything upstream of it, unless of course it bumps into another `flowOn` further upstream
๐Ÿ‘ 1
m
Right ๐Ÿ‘
s
If you're dealing with the main thread I don't think you can escape all this completely. Either you have to collect the flow on the main thread and require all the interceptors to be main-safe, or you have to collect the flow on some other thread and require the interceptors to specify explicitly if they need to run on the main thread.
m
I think none of the interceptors should really run on the main thread
So by changing the dispatcher more "globally" (unless someone else changes it again), the contract is "you're free to call blocking code in interceptors". But if someone changes the dispatcher in an interceptor then it has side effects for some of the other ones...
s
Well, if they called `chain.proceed().flowOn(...)`, then yes, they could change the context for upstream interceptors
I'm not a big fan of "you're free to call blocking code in interceptors", but that's just personal opinion. Rather, suspending functions should always aim to be caller agnostic, and not rely on being called from any specific dispatcher.
m
yea that's a good point.
s
> Well, if they called `chain.proceed().flowOn(...)`, then yes, they could change the context for upstream interceptors
Thinking about it, you'd have exactly the same problem when using the interceptor pattern for a one-shot function. So it doesn't sound too awful.
๐Ÿ‘ 1
m
I still think there's some value in avoiding extra thread hops? Or am I overthinking this?
Some interceptors will have to offload work to other threads. If they're called from a dispatcher that can handle work, my mental model is that it would avoid a context switch
s
The implementation will mostly take care of avoiding unnecessary thread switches. Maybe a good approach would be to start from `Dispatchers.Default`, since that allows coroutines to switch to `Dispatchers.IO` without a thread-switch if they need to.
Have you got an example of a thread-switch scenario that you're worried about?
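A sketch of what that looks like; whether the worker thread is actually reused depends on the kotlinx.coroutines version and on load, since `Dispatchers.Default` and `Dispatchers.IO` share a common thread pool:

```kotlin
import kotlinx.coroutines.*

suspend fun fetch(): String = withContext(Dispatchers.Default) {
    // CPU-bound work here...
    withContext(Dispatchers.IO) {
        // Blocking I/O: because Default and IO share workers, this
        // context switch can often happen without moving threads.
        "result"
    }
}
```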
m
It's mostly `Dispatchers.Main` and `Dispatchers.Default`
I think it's fair to assume `getThings(): Flow<Thing>` is always called from the main thread
building the thing requires several interceptors that dispatch on `Dispatchers.Default` (or `IO`, both could share the thread pool)
But in between interceptors, I'd like to avoid going back to `Dispatchers.Main`, hence the global `flowOn(Dispatchers.Default)`
s
๐Ÿ‘ that makes sense
m
Concurrency is hard ๐Ÿ˜…
s
That's why they pay us lots of money for it
๐Ÿ˜„ 1
m
So now, if there is a global `flowOn(Dispatchers.Default)`, it means interceptors are free to do `withContext(Dispatchers.Default) { blabla }` but it's mostly a no-op
s
Yes, exactly.
m
So it'd feel weird to put in the KDoc to be prepared to be called from any dispatcher while it's in fact not the case
s
Well, if each interceptor can modify the upstream flow then you don't have much control over which dispatcher anything is called from 😄. But I think if people are using a coroutine-based library, it's fair to expect them to wrap blocking functions with `Dispatchers.IO`. That's really the only important thing.
๐Ÿ‘ 1
I think Ktor runs everything on `Dispatchers.IO` "just in case", but I don't have a source for that and I'm not sure how I feel about it
m
> you don't have much control over which dispatcher anything is called from
The dispatcher is deterministic by default. The user is free to change it from an interceptor but then it becomes their own problem (and I don't think many people will do this)
๐Ÿ‘ 1
I'll look at what Ktor is doing, that's a good example.
Thanks for the nice discussion 💙 🙏!
๐Ÿป 1
l
> while still being able to synchronously monitor collections
Can't you just use a middleman custom flow operator?
m
Don't think so because any operator would either run in a separate thread or transform the Flow to a non-fusible one
l
The fusible parts fuse as long as adjacent operators are fusible. It's a best-effort thing. The operator runs on the dispatcher of the downstream `flowOn`, or of the collector; it doesn't perform any thread switching itself, and I don't see why you would care about that.
```kotlin
fun <T> Flow<T>.monitor(): Flow<T> = flow {
    println("about to collect")
    try {
        withIndex().collect { (index, element) ->
            val number = index + 1
            println("Collecting element #$number")
            emit(element)
        }
    } finally {
        println("Done collecting")
    }
}
```
The goal of FusibleFlow is to avoid having unnecessary Channels, because (especially before the Channels rewrite, I think) those were slightly costly objects. It's to prevent them from adding up when you chain many operators that require an underlying Channel. Here, we have no Channel, so the worst it can do is split your Channel operators chain in two, which probably won't ever be noticeable to the end user, or to the resource usage and responsiveness of some server running this mostly suspending code.
m
How do you make it so that the `Flow` changes threads though?
l
Either have `withContext` at the collect/terminal-operator site, or use `flowOn` downstream (which will apply to all operators and flows applied before, until any other `flowOn` up the chain).
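The first option can be sketched like this, with a hypothetical `things` flow standing in for the real one:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Dispatch at the terminal-operator site: absent any upstream flowOn,
// the whole chain runs in the collector's context, here Default.
suspend fun collectThings(things: Flow<Int>) =
    withContext(Dispatchers.Default) {
        things.collect { println(it) }
    }
```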
m
> at the collect/terminal-operator site
The call site is out of my control
> use `flowOn` downstream
How would you do that?
l
Another possibility, already mentioned in this thread, is running the generation of values into a `withContext`, before calling `emit`. The only requirement is that `emit` is not called inside that `withContext` block.
m
> running the generation of values into a withContext, before calling emit
The counter argument to that is additional thread hopping
l
```kotlin
myCoolFlow.flowOn(Dispatchers.Whatever)
    .monitor()
```
If you switch between Dispatchers.IO and Dispatchers.Default, there's no thread hopping
m
It's between `Default` and `Main`
l
You can't have free lunch
๐Ÿ˜ญ 1
But is that cost prohibitive?
I really doubt it
m
```kotlin
myCoolFlow.flowOn(Dispatchers.Whatever).monitor()
```
That works if I control the call site; all I have is a function returning a `Flow`:
```kotlin
fun networkThings(): Flow<Thing>
```
> But is that cost prohibitive?
Would probably have to be measured
l
That code I showed doesn't need to control the collect site
m
But it returns a non-fusible `Flow`
l
That's fine
m
So 2 channels?
l
Possibly, if further operators need a channel, yes
That's okay. You won't overload the runtime because you had 2 channel objects
m
I agree in general but also this is how I end up with Slack creating OOMs 🙃
Am I prematurely optimizing things? Maybe?
l
I don't believe this would ever cause OOMs
m
Nah the point is that every abstraction has a hidden cost
l
Unless you buffer things with unlimited capacity, and never collect fast enough
m
Sure I can abstract my way out of everything (which I'm already doing by using Kotlin and GC instead of something like C)
The question is where to stop abstracting everything for convenience
Case in point, that issue about okio vs netty performance: https://github.com/grpc/grpc-java/issues/6696
l
The raison d'รชtre for FusibleFlow is when you have like 3 or 5 or more channel using operators, and that would be multiplied by the amount of concurrent collections happening in parallel. Most likely a server problem that client/app problem, when put in perspective
nod 1
m
Doing everything async is nice and all but at the end of the day, if you want raw performance, the low level details matters
l
Are you writing code for servers, or clients?
m
maybe both?
l
"maybe" ๐Ÿคฃ๐Ÿ˜…
m
๐Ÿ˜‚
I'd like people to be able to use it on both server & clients
Android, iOS, web, server, everywhere Kotlin runs
l
If `Dispatchers.Main` is among your concerns, it's definitely a client
m
True
The dispatcher would be something else on a server
Not sure what, BTW, probably `IO`
l
For servers, they will be using `Dispatchers.Default`, and you won't ever have thread switching if you happen to use `Dispatchers.IO`, unless too many things run there at once of course and those extra I/O threads are actually necessary.
๐Ÿ‘ 1
m
Good point ๐Ÿ‘ On the server I want something with virtual threads I guess... But that's a question for another day ๐Ÿ˜