Is there any elegant way to bind multiple channels...
# coroutines
m
Is there any elegant way to bind multiple channels to the single scope? I'm trying to make an actor that receives and processes data from multiple channels:
Copy code
launch {
    val channelA = openChannelA()
    val channelB = openChannelB()
    val channelC = openChannelC()

    while (isActive) {
        select {
            channelA.onReceive {
                ...
            }
            channelB.onReceive {
                ...
            }
            channelC.onReceive {
                ...
            }
        }
    }
}
However, I also want all of those channels closed if the actor is terminated for any reason
From what I can see,
consume
does that, but it is very ugly:
Copy code
launch {
    val channelA = openChannelA()
    val channelB = openChannelB()
    val channelC = openChannelC()

    channelA.consume {
        channelB.consume {
            channelC.consume {
                while (isActive) {
                    select {
                        channelA.onReceive {
                            ...
                        }
                        channelB.onReceive {
                            ...
                        }
                        channelC.onReceive {
                            ...
                        }
                    }
                }
            }
        }
    }
}
g
Implementation with consume is incorrent, it will essentially consume them one by one
select is right way to do that
you also can just set select using forEach on list of channels to avoid duplication
also you can use whileSelect
m
right but if I just use select, channels will not close when coroutine cancels
m
Channels don't have scopes but coroutines sending the data using channel do,
So, if
openChannelA/B/C()
launches a coroutine in same scope as actor, all those will be canceled with the actor and your channels will be garbage collected eventually
g
just close channels in context[Job]!!.invokeOnCompletion inside your launch
but even better open channel using produce on
launch
scope as Marko suggesting
m
openChannelA()
do not perform any launchers
for example
Flowable.openSubscription()
from rx integration is not scope-aware
m
try/finally should work, not sure how elegant it is
g
for example
Flowable.openSubscription()
from rx integration is not scope-aware
Do not use it, instead use Flowable.asFlow, that it will be perfectly lifecycle-safe and scope-aware Channel is actually bad wrapper for Flowable
m
does flow support select though?
g
There is no select for Flow, there are much better operators, like merge/zip/concat depending on your use case
m
not sure any of those fits my use case of creating an actor
g
if you have N Flowable, it’s very easy to create N Flow from them and listen all events and receive them when first of them available (like with single listener for multiple selects). Again, you do not show example how you use it in actor, if you just do the same kind operatio is just
flattenMerge()
operator
m
all flows are different
every one of them has different type and different logic
so I can't just merge them
g
Okay, do not merge them, just collect all of them separately, or just move your logic to onEach operator of each flow and than use launchIn(myScope), where myScope is scope of your actor, everything will be correctly connected by structured concurrency
m
But wouldn't that process them in parallel instead of single sequence like the actor example above?
g
depends on how you use them, it will run in parallel, but as I said, you can merge them (with some wrapper for every type)
👍 1