https://kotlinlang.org logo
Title
m

Matej Drobnič

08/27/2019, 9:49 AM
Is there any elegant way to bind multiple channels to the single scope? I'm trying to make an actor that receives and processes data from multiple channels:
launch {
    val channelA = openChannelA()
    val channelB = openChannelB()
    val channelC = openChannelC()

    while (isActive) {
        select {
            channelA.onReceive {
                ...
            }
            channelB.onReceive {
                ...
            }
            channelC.onReceive {
                ...
            }
        }
    }
}
However, I also want all of those channels closed if the actor is terminated for any reason
From what I can see,
consume
does that, but it is very ugly:
launch {
    val channelA = openChannelA()
    val channelB = openChannelB()
    val channelC = openChannelC()

    channelA.consume {
        channelB.consume {
            channelC.consume {
                while (isActive) {
                    select {
                        channelA.onReceive {
                            ...
                        }
                        channelB.onReceive {
                            ...
                        }
                        channelC.onReceive {
                            ...
                        }
                    }
                }
            }
        }
    }
}
g

gildor

08/27/2019, 10:18 AM
Implementation with consume is incorrent, it will essentially consume them one by one
select is right way to do that
you also can just set select using forEach on list of channels to avoid duplication
also you can use whileSelect
m

Matej Drobnič

08/27/2019, 10:56 AM
right but if I just use select, channels will not close when coroutine cancels
m

Marko Mitic

08/27/2019, 11:29 AM
Channels don't have scopes but coroutines sending the data using channel do,
So, if
openChannelA/B/C()
launches a coroutine in same scope as actor, all those will be canceled with the actor and your channels will be garbage collected eventually
g

gildor

08/27/2019, 11:32 AM
just close channels in context[Job]!!.invokeOnCompletion inside your launch
but even better open channel using produce on
launch
scope as Marko suggesting
m

Matej Drobnič

08/27/2019, 11:37 AM
openChannelA()
do not perform any launchers
for example
Flowable.openSubscription()
from rx integration is not scope-aware
m

Marko Mitic

08/27/2019, 11:55 AM
try/finally should work, not sure how elegant it is
g

gildor

08/27/2019, 1:22 PM
for example
Flowable.openSubscription()
from rx integration is not scope-aware
Do not use it, instead use Flowable.asFlow, that it will be perfectly lifecycle-safe and scope-aware Channel is actually bad wrapper for Flowable
m

Matej Drobnič

08/27/2019, 1:26 PM
does flow support select though?
g

gildor

08/27/2019, 1:30 PM
There is no select for Flow, there are much better operators, like merge/zip/concat depending on your use case
m

Matej Drobnič

08/27/2019, 1:31 PM
not sure any of those fits my use case of creating an actor
g

gildor

08/27/2019, 1:33 PM
if you have N Flowable, it’s very easy to create N Flow from them and listen all events and receive them when first of them available (like with single listener for multiple selects). Again, you do not show example how you use it in actor, if you just do the same kind operatio is just
flattenMerge()
operator
m

Matej Drobnič

08/27/2019, 1:48 PM
all flows are different
every one of them has different type and different logic
so I can't just merge them
g

gildor

08/28/2019, 12:33 AM
Okay, do not merge them, just collect all of them separately, or just move your logic to onEach operator of each flow and than use launchIn(myScope), where myScope is scope of your actor, everything will be correctly connected by structured concurrency
m

Matej Drobnič

08/28/2019, 5:37 AM
But wouldn't that process them in parallel instead of single sequence like the actor example above?
g

gildor

08/28/2019, 5:44 AM
depends on how you use them, it will run in parallel, but as I said, you can merge them (with some wrapper for every type)
👍 1