
Sergei Grishchenko

06/25/2022, 12:11 PM
Hi, I am experimenting with coroutines and trying to implement a broadcast flow that splits work among a few workers. Here is what I have so far:
private fun generateWork() = channelFlow {
    for (i in 1..10) {
        val page = "page$i"
        println("Generator sent $page")
        send(page)
    }
    close()
    println("Generator is closed")
}

private fun CoroutineScope.doWork(id: Int, flow: Flow<String>) = launch {
    flow.collect {
        println("Worker $id processed $it")
    }
    println("Worker $id finished")
}

suspend fun performWork() {
    try {
        coroutineScope {
            val workFlow: Flow<String?> = generateWork()

            val sharedWorkFlow = workFlow
                .onCompletion { cause -> if (cause == null) emit(null) }
                .shareIn(this, WhileSubscribed())
                .takeWhile { it != null }
                .filterNotNull()

            val workersCount = 10

            List(workersCount) { id ->
                val workPartFlow = sharedWorkFlow
                    .withIndex()
                    .filter { (index, _) -> index % workersCount == id }
                    .map { (_, value) -> value }

                doWork(id, workPartFlow)
            }.joinAll()

            cancel()
        }
    } catch (e: CancellationException) {
        println("Work is performed")
    }
}
So my questions are: 1. Is it possible to implement this more simply? 2. Am I using SharedFlow correctly? 3. Are there ways to unsubscribe from a SharedFlow other than cancelling the
coroutineScope
? 4. Is there a way to make a SharedFlow finite other than emitting a special value from
onCompletion
(null in my case) and using
takeWhile
to detect it? Thank you.

Nick Allen

06/25/2022, 8:01 PM
1. Yes, just use
Channel
(not wrapped in a
Flow
). Then each item goes to exactly one worker, no item is missed, and completion is supported. 2. Because you don't actually want to "share" the items, this isn't really an applicable use case. Also, with shared flows you need to be really careful that you have subscribed to the flow before it emits the items you want. The
WhileSubscribed
means it won't start until there is at least one subscriber, but nothing in your code stops it from emitting everything to that first subscriber (which will throw away all but one item) before the other workers subscribe, and then there's nothing left. If you ever do have a bunch of subscribers to set up first, then you need to explicitly delay emission until they are ready. You could do something like
onStart { ready.await() }
before you
shareIn
. But only worry about that if you want multiple subscribers handling the same emissions. 3. Nope 4. Nope
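A minimal sketch of the `onStart { ready.await() }` idea from point 2, for the case where every subscriber should see every item. The names `collectAfterReady`, `ready`, `subscribed`, `sharingJob` and `runGateDemo` are hypothetical, and counting subscribers with `onSubscription` is only one assumed way of deciding when they are ready:

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.plus
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns, per subscriber, the list of items it received.
suspend fun collectAfterReady(items: List<String>, subscribersCount: Int): List<List<String>> =
    coroutineScope {
        val ready = CompletableDeferred<Unit>()      // completed once everyone has subscribed
        val subscribed = AtomicInteger(0)
        val sharingJob = Job(coroutineContext[Job])  // lets us stop the sharing coroutine at the end

        val source: Flow<String?> = items.asFlow()
        val shared = source
            .onCompletion { cause -> if (cause == null) emit(null) } // null marks the end, as in the question
            .onStart { ready.await() }                               // hold back emission until the gate opens
            .shareIn(this + sharingJob, SharingStarted.WhileSubscribed())

        val results = List(subscribersCount) { _ ->
            async {
                shared
                    // Runs after this subscriber is registered; the last one opens the gate.
                    .onSubscription {
                        if (subscribed.incrementAndGet() == subscribersCount) ready.complete(Unit)
                    }
                    .takeWhile { it != null }
                    .filterNotNull()
                    .toList()
            }
        }.awaitAll()

        sharingJob.cancel() // the shared flow never completes on its own
        results
    }

// Blocking wrapper so the sketch can be called from ordinary code.
fun runGateDemo(): List<List<String>> =
    runBlocking { collectAfterReady(listOf("page1", "page2", "page3"), subscribersCount = 3) }
```

With the gate in place, each of the three subscribers should collect all three pages, because nothing is emitted before the last subscriber has registered.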

Sergei Grishchenko

06/25/2022, 9:58 PM
Thanks a lot, but I have follow-up questions on your answers: 1. As far as I know,
Channel
is a hot stream, but I want to keep my
generateWork
cold, so that it doesn't request any resources until someone subscribes to it. 2. You are definitely right, I don't want to share items. I want to share a common resource (the flow) among a few receivers to split the work, so if
SharedFlow
is not meant for that, what is it for? And to be honest, I didn't understand the point about "emitting everything to that first subscriber". As far as I know, subscribers can be created only by terminal operators like
collect
or by special operators that turn a cold stream into a hot one, like
shareIn
. So you said that "nothing in your code actually stops it from emitting everything to that first subscriber", but I don't use any such terminal or special operator after creating the shared flow, and I don't pass
sharedWorkFlow
anywhere without preparation (filtering by index).
sharedWorkFlow
stays my private variable, and nobody can just subscribe to
sharedWorkFlow
and read whatever they want. Maybe I missed something...

Nick Allen

06/26/2022, 10:17 PM
1. As far as I know,
Channel
is a hot stream, but I want to keep my
generateWork
cold, so that it doesn't request any resources until someone subscribes to it.
As soon as you create it, you subscribe to it. So what's the point?
You are definitely right, I don't want to share items. I want to share a common resource (the flow) among a few receivers to split the work
Share a
Channel
instead, as-in pass the single
Channel
to all your workers.
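Passing a single `Channel` to all workers could look roughly like the sketch below. It is not the original poster's code: `performWorkWithChannel`, `runChannelDemo` and the returned map are hypothetical and exist only to make the result observable. Note that the channel is created inside the function, so nothing is allocated or sent until the function is called:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: returns page -> id of the worker that processed it.
suspend fun performWorkWithChannel(pages: Int = 10, workersCount: Int = 3): Map<String, Int> {
    val processedBy = ConcurrentHashMap<String, Int>()
    coroutineScope {
        val work = Channel<String>() // rendezvous channel shared by all workers

        // Generator: sends every page once, then closes the channel.
        launch {
            for (i in 1..pages) work.send("page$i")
            work.close() // closing is what lets the workers' loops finish
        }

        // Workers: each received page is delivered to exactly one of them.
        repeat(workersCount) { id ->
            launch {
                for (page in work) {
                    processedBy[page] = id
                }
            }
        }
    } // coroutineScope returns once the generator and all workers are done
    return processedBy
}

// Blocking wrapper so the sketch can be called from ordinary code.
fun runChannelDemo(): Map<String, Int> = runBlocking { performWorkWithChannel() }
```

Which worker ends up with which page is not deterministic, but every page should appear exactly once in the result, and no sentinel value or scope cancellation is needed to finish.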
so if
SharedFlow
is not meant for that, what is it for?
It's for when you want all the listeners to get all the events. Like distributing notifications. Think of any API that has
addListener/removeListener
type of methods. There's not really an "end" to the notifications. Listeners say when they want to start and when they are done. Just like a
SharedFlow
listener says it wants to start by calling
collect
and when it's done, generally by cancelling the coroutine running
collect
.
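To illustrate the listener analogy, here is a small sketch in which every subscriber of a `MutableSharedFlow` receives every event. The names `broadcast` and `runBroadcastDemo` are hypothetical, and `take(n)` stands in for a listener that decides it is done; waiting on `subscriptionCount` is an assumed way to make sure nobody misses an event:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns, per listener, the events it saw.
// Assumes `events` is non-empty, because take() requires a positive count.
suspend fun broadcast(events: List<String>, listeners: Int): List<List<String>> =
    coroutineScope {
        val notifications = MutableSharedFlow<String>() // no replay: late subscribers miss past events

        // "addListener": each listener starts by collecting.
        val received = List(listeners) { _ ->
            async { notifications.take(events.size).toList() } // "removeListener" after enough events
        }

        // Wait until every listener is actually subscribed before emitting.
        notifications.subscriptionCount.first { it == listeners }

        events.forEach { notifications.emit(it) } // each event is delivered to all listeners
        received.awaitAll()
    }

// Blocking wrapper so the sketch can be called from ordinary code.
fun runBroadcastDemo(): List<List<String>> =
    runBlocking { broadcast(listOf("created", "updated"), listeners = 2) }
```

Unlike a `Channel`, where an item is consumed by a single receiver, both listeners here should end up with the full list of events.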
And to be honest, I didn't understand the point about "emitting everything to that first subscriber"
Consider this ordering of events: 1. doWork is called for id=0. 2.
generateWork
now has a subscriber, sends its 10 items to its one listener, and doWork processes all the items. 3. doWork is called for id=1. 4. The onCompletion lambda sends out null. 5. doWork for id=0 and id=1 both get the null and exit. Page 2 (index 1) is never processed.
nobody can just subscribe to
sharedWorkFlow
and read whatever they want.
This doesn't sound like your sample code. Is your real scenario more complex?

Sergei Grishchenko

06/27/2022, 7:35 AM
Thanks so much once again. I think I understand your point now: because
sharedWorkFlow
is hot, it may already have finished emitting by the time a given subscriber subscribes to it. That sounds reasonable. To be honest, I don't have a real scenario. I was exploring the coroutines API and noticed that the ecosystem around Channels is covered in
@Deprecated
and
@Obsolete
annotations, so I thought: if we have cold streams as Flow and hot streams as StateFlow/SharedFlow, maybe we don't need Channels anymore. It seems my assumption was incorrect. The first reactive framework I encountered was
RxJS
. There, the abstraction
Observable
represents any stream and is usually cold (like
Flow
in coroutines), and there are different implementations of
Subject
(
BehaviorSubject
for state,
ReplaySubject
for replaying the last few events on subscription) that implement
Observable
and represent hot streams. But there is no analog of Channel, so I am trying to figure out the purpose and role of an entity like Channel.