# coroutines
s
I want to create a Flow that is backed by something like a channel so I can emit values to it from the outside, but I can't figure out what's right here. Maybe "channelFlow()" is the correct thing, but I can't find a tutorial for that. 😕 I want to send URLs to a flow while a user scrolls through a photo gallery to process thumbnails. I don't want to spawn multiple coroutines that read from disk in parallel, as this is a performance problem. Reading from disk should happen sequentially, but creating the thumbnails in memory afterwards can be done in parallel. What's a good way to achieve that?
z
That’s what MutableSharedFlow is for.
s
"all collectors get all emitted values" is not what I want 😕
z
Ah, missed that. In that case, ReceiveChannel.consumeAsFlow or .receiveAsFlow.
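For illustration, here is a minimal sketch of exposing a channel as a Flow with receiveAsFlow. The helper name collectQueued and the String URLs are made up for this example; an unlimited buffer is assumed so that sending never suspends.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: pushes URLs into a channel from the "outside",
// then reads them back through the Flow view of that channel.
fun collectQueued(urls: List<String>): List<String> = runBlocking {
    val queue = Channel<String>(Channel.UNLIMITED)
    val flow: Flow<String> = queue.receiveAsFlow()

    urls.forEach { queue.send(it) }
    queue.close() // closing the channel is what lets the flow complete

    // Each value is delivered to exactly one collector (fan-out, not broadcast).
    flow.toList()
}

fun main() {
    println(collectQueued(listOf("a.jpg", "b.jpg")))
}
```

Unlike a shared flow, values sent here are not replayed to every collector, which is the behaviour asked for above.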
s
But for that I would need to create a Channel and keep it open. I was thinking that Channels are something bad.
I solved my wish for sequential reading by using a SingleThreadCoroutineScope
z
Well, channels are “bad” for use cases where they’re not necessary 🙂
SingleThreadCoroutineScope is probably “bad” here because it’s defeating the whole point of coroutines (concurrency without threads).
s
Yes, but everything before and after the file IO part still runs in parallel. I'm not sure if having a lot of file handles open at the same time is a good thing.
z
You can get serial execution using coroutine-native tools, like Mutex and Channel, which is a perfectly valid use case for those things.
Seems like what you were trying to do initially is fine: a channel here to act as a work queue, a coroutine to read from it and read from disk serially, and then maybe fanning out to something to create thumbnails (CPU-bound work)
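As a rough sketch of the Mutex option: the disk read happens while holding a lock, and the thumbnail work happens outside it. readBytes and makeThumbnail are hypothetical stand-ins, not real APIs. Reads are serialized this way, though not necessarily in the order the URLs were requested.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Hypothetical stand-ins for the real work.
fun readBytes(url: String): ByteArray = url.toByteArray() // pretend blocking disk read
fun makeThumbnail(bytes: ByteArray): Int = bytes.size     // pretend CPU-bound work

private val diskLock = Mutex()

suspend fun thumbnails(urls: List<String>): List<Int> = coroutineScope {
    urls.map { url ->
        async(Dispatchers.Default) {
            // Only one coroutine at a time can be inside the disk read.
            val bytes = diskLock.withLock {
                withContext(Dispatchers.IO) { readBytes(url) }
            }
            // The lock is released here, so thumbnail creation can overlap.
            makeThumbnail(bytes)
        }
    }.awaitAll() // results come back in the same order as the input list
}

fun main(): Unit = runBlocking {
    println(thumbnails(listOf("a.jpg", "bb.jpg")))
}
```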
s
Yes, but instead of a Channel I try to do the same with a Flow, because it's cold and Channels are "bad" because they are like an open inputstream according to the KotlinConf19 talk.
z
So something like
val workQueue: Channel<URL> = Channel()
val imageLoader = launch(Dispatchers.IO) {
  for (url in workQueue) {
    // Load images serially.
    val bitmap = loadBitmap(url) // assuming this is blocking?

    // Thumbnail creation is CPU-bound, so fan out.
    launch(Dispatchers.Default) {
      createThumbnail(bitmap)
    }
  }
}
You’re correct insofar as you’d need to close the channel at some point. But how you do that depends on the rest of your code.
E.g. this could live in a repository, and then when your repository is shut down you could close the channel.
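A sketch of what such a repository might look like, with the channel kept private and cleaned up on shutdown. The class name ThumbnailRepository, the request and shutdown methods, and the two function parameters are all hypothetical; the unlimited buffer is an assumption so that UI code can enqueue without suspending.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

// Hypothetical repository; loadBitmap and createThumbnail are placeholders passed in by the caller.
class ThumbnailRepository(
    private val loadBitmap: (String) -> ByteArray,
    private val createThumbnail: (ByteArray) -> Unit,
) {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
    private val workQueue = Channel<String>(Channel.UNLIMITED)

    init {
        scope.launch {
            for (url in workQueue) {          // single reader, so disk reads stay serial
                val bitmap = loadBitmap(url)
                launch(Dispatchers.Default) { // CPU-bound part fans out
                    createThumbnail(bitmap)
                }
            }
        }
    }

    // Callable from non-suspending code; returns false once the queue is closed.
    fun request(url: String): Boolean = workQueue.trySend(url).isSuccess

    fun shutdown() {
        workQueue.close() // stop accepting new URLs
        scope.cancel()    // stop the reader and any in-flight thumbnail work
    }
}
```

The channel never leaves the class, so callers only see request and shutdown.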
s
So my misunderstanding is that Channels are still to be used and cannot be replaced with a Flow in those cases?
z
(You’d also want to cancel the scope that you’re using to launch all these coroutines at the same time)
This seems like a use case for channels, yes – a private implementation detail used for communicating from “push” code into a privately-owned coroutine.
Channels are effectively blocking queues – if you need a blocking queue, then use a blocking queue.
s
So there is no Flow-alternative for a Channel
z
That’s like asking if there’s a car-alternative for a bicycle. They are designed for different use cases.
Many flow operators use channels internally
s
Ok, I see. In the talk about Flow I got the feeling that Channels are an old thing and Flow the new way.
z
Flows are not blocking queues, they’re reactive streams.
What you saw might be referring to the fact that, before Flow existed, it was common to try to use channels as reactive streams, which they’re not, and that caused lots of problems.
s
Oh, ok. But why are there SharedFlows that allow emitting values like Channels? That confuses me.
z
They’re useful for emitting streams of events to an arbitrary number of subscribers.
I am pretty sure you could use MutableSharedFlow to implement this as well, but because you only ever have a single collector, which you also control, it’s not much different from using a channel, and the channel is probably simpler.
And even then you’d still need to do cleanup to at least cancel your coroutine scope.
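For comparison, a rough sketch of the MutableSharedFlow variant with a single collector. processAll is a hypothetical helper, the buffer size of 64 is arbitrary, and a non-empty list is assumed (take requires a positive count). Note the extra step of waiting for the collector to subscribe: with no replay, values emitted before that are simply dropped.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits URLs into a shared flow and returns what the single collector saw.
fun processAll(urls: List<String>): List<String> = runBlocking {
    val requests = MutableSharedFlow<String>(extraBufferCapacity = 64)
    val seen = mutableListOf<String>()

    // The one collector we control; it stops after urls.size values.
    val collector = launch {
        requests.take(urls.size).collect { seen += it }
    }

    // Without replay, emissions made before anyone subscribes are lost,
    // so wait until the collector is actually subscribed.
    requests.subscriptionCount.first { it > 0 }

    urls.forEach { requests.emit(it) }
    collector.join()
    seen
}

fun main() {
    println(processAll(listOf("a.jpg", "b.jpg")))
}
```

The subscription handshake is the kind of extra bookkeeping that makes the channel version simpler for a single, privately owned consumer.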
s
Ok, thanks a lot for helping me understand that. 🙂