Sergei Grishchenko
06/25/2022, 12:11 PM
private fun generateWork() = channelFlow {
    for (i in 1..10) {
        val page = "page$i"
        println("Generator sent $page")
        send(page)
    }
    close()
    println("Generator is closed")
}

private fun CoroutineScope.doWork(id: Int, flow: Flow<String>) = launch {
    flow.collect {
        println("Worker $id processed $it")
    }
    println("Worker $id finished")
}

suspend fun performWork() {
    try {
        coroutineScope {
            val workFlow: Flow<String?> = generateWork()
            val sharedWorkFlow = workFlow
                .onCompletion { cause -> if (cause == null) emit(null) }
                .shareIn(this, WhileSubscribed())
                .takeWhile { it != null }
                .filterNotNull()
            val workersCount = 10
            List(workersCount) { id ->
                val workPartFlow = sharedWorkFlow
                    .withIndex()
                    .filter { (index, _) -> index % workersCount == id }
                    .map { (_, value) -> value }
                doWork(id, workPartFlow)
            }.joinAll()
            cancel()
        }
    } catch (e: CancellationException) {
        println("Work is performed")
    }
}
So my questions are:
1. Is there a simpler way to implement this?
2. Am I using SharedFlow correctly?
3. Are there ways to unsubscribe from a SharedFlow other than cancelling the coroutineScope?
4. Is there a way to make a SharedFlow finite other than emitting a special value from onCompletion (null in my case) and using takeWhile to track it?
Thank you
Nick Allen
06/25/2022, 8:01 PM
1. Use a Channel (not wrapped in a Flow). Then each item goes to only one worker, no item is missed, and completion is supported.
2. Because you don't actually want to "share" the items, this isn't quite an applicable use case. Also, with shared flows you need to be really careful that you have subscribed to the Flow before it spits out the items you want. The WhileSubscribed means it won't start until you have one listener ... but nothing in your code actually stops it from emitting everything to that first subscriber (which will throw away all but one item) before the other workers subscribe, and then there's nothing left. If you ever do have a bunch of subscribers to set up first, then you need to explicitly delay emission until they are ready. You could do something like onStart { ready.await() } before you shareIn. But only worry about that if you want multiple subscribers handling the same emissions.
3. Nope
4. Nope
Sergei Grishchenko
06/25/2022, 9:58 PM
1. As far as I know, Channel is a hot stream, but I want to keep my generateWork cold, so I don't want to request any resources until someone subscribes to it.
2. You are definitely right, I don't want to share items. I want to share a common resource (the flow) between a few receivers to split up the work, so if SharedFlow is not for that, what is it for? And to be honest, I didn't understand the point about "emitting everything to that first subscriber". As far as I know, subscribers can be created only by terminal operators like collect, or by special operators that turn a cold stream into a hot one, like shareIn. You said that "nothing in your code actually stops it from emitting everything to that first subscriber", but if I don't use such a terminal or special operator after creating the shared flow, and I don't pass sharedWorkFlow anywhere without preparation (filtering by index), then sharedWorkFlow stays my private variable and nobody can just subscribe to sharedWorkFlow and read whatever they want. Maybe I missed something...
Nick Allen
06/26/2022, 10:17 PM
> 1. As far as I know, Channel is a hot stream, but I want to keep my generateWork cold, so I don't want to request any resources until someone subscribes to it.
As soon as you create it, you subscribe to it. So what's the point?
> You are definitely right, I don't want to share items. I want to share a common resource (the flow) between a few receivers to split up the work
Share a Channel instead, as in pass the single Channel to all your workers.
> so if SharedFlow is not for that, what is it for?
It's for when you want all the listeners to get all the events. Like distributing notifications. Think of any API that has addListener/removeListener type of methods. There's not really an "end" to the notifications. Listeners say when they want to start and when they are done. Just like a SharedFlow listener says it wants to start by calling collect and says when it's done, generally by cancelling the coroutine running collect.
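The listener analogy above can be sketched as follows. This is a minimal illustration, not code from the thread: the helper name broadcastDemo and its parameters are hypothetical, and it assumes it is called from runBlocking so that everything runs on a single thread. Each listener "adds" itself by calling collect and "removes" itself when its coroutine is cancelled, and every listener sees every event.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: broadcasts `events` to `listenerCount` listeners
// and returns what each listener received.
// Assumption: called from runBlocking (single-threaded event loop).
suspend fun broadcastDemo(
    listenerCount: Int,
    events: List<String>
): List<List<String>> = coroutineScope {
    // No replay and no extra buffer: emit suspends until every current
    // subscriber has taken the value.
    val notifications = MutableSharedFlow<String>()
    val received = List(listenerCount) { mutableListOf<String>() }

    // "addListener": each listener starts by calling collect.
    val listeners = List(listenerCount) { id ->
        launch { notifications.collect { received[id] += it } }
    }

    // Wait until all listeners are subscribed before emitting anything,
    // otherwise early events would be missed.
    notifications.subscriptionCount.first { it == listenerCount }

    events.forEach { notifications.emit(it) }

    // "removeListener": a listener is done when its coroutine is cancelled.
    listeners.forEach { it.cancelAndJoin() }
    received
}
```

Note that the flow itself never completes here; the only way a listener stops is by being cancelled, which matches the "no end to the notifications" description.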
> And to be honest, I didn't understand the point about "emitting everything to that first subscriber"
Consider this ordering of events:
1. doWork is called for id=0.
2. generateWork now has a subscriber and sends its 10 items to its one listener, and doWork processes all the items.
3. doWork is called for id=1.
4. The onCompletion lambda sends out null.
5. doWork for id=0 and id=1 both get the null and exit.
Page 2 (index 1) is never processed.
> nobody can just subscribe to sharedWorkFlow and read whatever they want.
This doesn't sound like your sample code. Is your real scenario more complex?
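For comparison, the "pass a single Channel to all your workers" suggestion could look roughly like the sketch below. It is an illustration under assumptions, not the original poster's code: the names doChannelWork and performChannelWork are hypothetical, and the workers return the pages they processed only so the result can be inspected. Each page is received by exactly one worker, and closing the channel ends every worker's loop, so no null sentinel, index filtering, or scope cancellation is needed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical worker: takes pages from the shared channel until it is closed.
fun CoroutineScope.doChannelWork(
    id: Int,
    pages: ReceiveChannel<String>
): Deferred<List<String>> = async {
    val processed = mutableListOf<String>()
    for (page in pages) { // the loop ends when the channel is closed and drained
        println("Worker $id processed $page")
        processed += page
    }
    println("Worker $id finished")
    processed
}

// Hypothetical entry point: returns the pages handled by each worker.
suspend fun performChannelWork(workersCount: Int = 3): List<List<String>> =
    coroutineScope {
        val pages = Channel<String>() // rendezvous channel shared by all workers

        // Generator: nothing is produced until this function is called.
        launch {
            for (i in 1..10) pages.send("page$i")
            pages.close() // signals completion to every worker
        }

        List(workersCount) { id -> doChannelWork(id, pages) }.awaitAll()
    }
```

Which worker gets which page is not fixed, but the ordering problem described above cannot occur: a page sent before a worker has started simply waits in send until some worker receives it.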
Sergei Grishchenko
06/27/2022, 7:35 AM
sharedWorkFlow is hot, so it is possible that by the time some subscriber subscribes to it, it is already done. Reasonable.
To be honest, I don't have a real scenario. I was exploring the coroutines API and noticed that the ecosystem around Channels is covered with @Deprecated and @Obsolete annotations, so I thought: if we have cold streams as Flow and hot streams as StateFlow/SharedFlow, maybe we don't need Channels anymore. But it seems my assumption was incorrect.
The first reactive framework I met was RxJS. There the abstraction Observable represents any stream and is usually cold (like Flow in coroutines), and there are different implementations of Subject (BehaviorSubject for state, ReplaySubject for the last few events on subscription) that implement Observable and represent hot streams. But there is no analog for Channel, so I am trying to figure out the purpose and role of an entity like Channel