louiscad
05/09/2019, 3:57 PM
kevin.cianfarini
05/09/2019, 4:46 PM
receive the element from that channel. Each time you receive, send to this channel
Dominaezzz
05/09/2019, 5:15 PM
select.
Paul Woitaschek
05/09/2019, 6:24 PM
bj0
05/09/2019, 8:45 PM
select in a produce, but I haven't looked at any of the new Flow stuff
louiscad
05/09/2019, 10:24 PM
gaetan
05/09/2019, 10:44 PM
bohsen
05/10/2019, 5:53 AM
bj0
05/10/2019, 5:57 AM
select and produce
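For reference, here is a minimal sketch of how select inside produce could merge the latest values of two channels, assuming that is the goal (as the combineLatest snippet later in this thread suggests). combineLatestSketch is a hypothetical name, the type parameters are restricted to non-null types to keep the example short, and receiveCatching / onReceiveCatching assume kotlinx.coroutines 1.5 or later, which is newer than this thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.select

// Hypothetical helper, not from the thread: emits transform(latestFirst, latestSecond)
// each time either input delivers a value, and stops once either input is closed.
@OptIn(ExperimentalCoroutinesApi::class) // produce is still marked experimental
fun <A : Any, B : Any, R> CoroutineScope.combineLatestSketch(
    first: ReceiveChannel<A>,
    second: ReceiveChannel<B>,
    transform: (A, B) -> R
): ReceiveChannel<R> = produce(capacity = Channel.CONFLATED) {
    try {
        // Wait for one value from each input; finish quietly if either closes first.
        var latestFirst: A = first.receiveCatching().getOrNull() ?: return@produce
        var latestSecond: B = second.receiveCatching().getOrNull() ?: return@produce
        send(transform(latestFirst, latestSecond))
        while (true) {
            // Each clause returns false when its channel reports "closed".
            val stillOpen = select<Boolean> {
                first.onReceiveCatching { result ->
                    result.getOrNull()?.let { latestFirst = it; true } ?: false
                }
                second.onReceiveCatching { result ->
                    result.getOrNull()?.let { latestSecond = it; true } ?: false
                }
            }
            if (!stillOpen) break
            send(transform(latestFirst, latestSecond))
        }
    } finally {
        // Runs on normal completion and when the consumer cancels the output.
        first.cancel()
        second.cancel()
    }
}

fun main() = runBlocking {
    val numbers = Channel<Int>(Channel.UNLIMITED)
    val letters = Channel<String>(Channel.UNLIMITED)
    numbers.send(1); letters.send("a")
    numbers.close(); letters.close()
    combineLatestSketch(numbers, letters) { n, s -> "$n$s" }
        .consumeEach { println(it) }
}
```

Because produce closes its output when the block returns, the consumer's loop ends on its own. Stopping as soon as either input closes is a simplification; a real version might keep going with the remaining input, and this one also ignores the close cause of a failed input.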
louiscad
05/10/2019, 7:23 AM
ConflatedBroadcastChannel.
I wish I could have a shorter solution that could scale better, that's why I asked.
Also, I have no clue how I can do this with select and produce.
Dominaezzz
05/10/2019, 8:20 AM
louiscad
05/10/2019, 10:22 AM
Dominaezzz
05/10/2019, 10:26 AM
DataClass(channel1.receive(), channel2.receive(), channel3.receive(), channel4.receive())?
louiscad
05/10/2019, 10:28 AM
Dominaezzz
05/10/2019, 10:30 AM
bj0
05/11/2019, 7:26 AM
= produce {
louiscad
05/13/2019, 7:43 AM
fun <E1, E2, R> CoroutineScope.combineLatest(
    one: ReceiveChannel<E1>,
    two: ReceiveChannel<E2>,
    transform: (one: E1, two: E2) -> R
): ReceiveChannel<R> {
    val output = Channel<R>(capacity = Channel.CONFLATED)
    output.invokeOnClose {
        one.cancel()
        two.cancel()
    }
    launch {
        var latestOne: E1 = one.receive()
        var latestTwo: E2 = two.receive()
        output.send(transform(latestOne, latestTwo))
        while (true) {
            select<Unit> {
                one.onReceive { latestOne = it }
                two.onReceive { latestTwo = it }
            }
            output.send(transform(latestOne, latestTwo))
        }
    }.invokeOnCompletion {
        one.cancel()
        two.cancel()
    }
    return output
}
(Edited to return ReceiveChannel<T>.)
Do you think it's correct, especially regarding closing handling?
Paulius Ruminas
05/13/2019, 7:51 AM
var latestOne: E1 = one.receive()
var latestTwo: E2 = two.receive()
select<Unit> {
    one.onReceive { latestOne = it }
    two.onReceive { latestTwo = it }
}
then these statements will throw CancellationException. Is this the behaviour you want?
louiscad
05/13/2019, 8:11 AM
Yes, that shouldn't be a problem since it'll be eaten by launch. I should cancel the two other channels though.
I edited the snippet to return ReceiveChannel<T> BTW.
Paulius Ruminas
05/13/2019, 8:21 AM
Yes, that shouldn't be a problem since it'll be eaten by launch
But it will cancel the whole scope if it is not Supervised.
runBlocking {
    val a = produce {
        while (true) {
            delay(1_000)
            send(1)
        }
    }
    val b = produce {
        while (true) {
            delay(2_000)
            send(2)
        }
    }
    coroutineScope {
        launch {
            while (true) {
                println("Test")
                delay(1_000)
            }
        }
        launch {
            combineLatest(a, b) { a, b -> Pair(a, b) }.consumeEach {
                println(it)
            }
        }
        delay(10_000)
        a.cancel()
    }
}
When a.cancel() is called it will throw an Exception and cancel the coroutine that prints "Test".
louiscad
05/13/2019, 8:25 AM
runBlocking {
    launch {}.cancel()
}
doesn't throw because cancellation is not a crash, so it doesn't propagate to the parent scope when the coroutine is concurrent (as with launch or async).
So I don't see a problem here.
Dico
05/13/2019, 3:57 PM
onReceive behaviour.
Dominaezzz
05/13/2019, 6:05 PM
louiscad
05/13/2019, 6:18 PM
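A small sketch that may help settle the cancel-versus-crash question discussed above. It runs a receiver inside launch and ends the input channel in two ways: with cancel(), where receive() throws a CancellationException that stays inside the launched coroutine, and with close(), where receive() throws ClosedReceiveChannelException, which is an ordinary failure and does propagate to the enclosing scope. scopeOutcome is a hypothetical helper written for this illustration, not something from the thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: returns the exception that escaped the scope, or null
// if the scope completed normally.
suspend fun scopeOutcome(endInput: (Channel<Int>) -> Unit): ClosedReceiveChannelException? =
    try {
        coroutineScope {
            val input = Channel<Int>()
            // Child coroutine that waits for an element that never arrives.
            launch { input.receive() }
            // End the channel either by cancelling or by closing it.
            endInput(input)
        }
        null
    } catch (e: ClosedReceiveChannelException) {
        // Only reached when the child failed and took the scope down with it.
        e
    }

fun main() = runBlocking {
    println("cancel -> ${scopeOutcome { it.cancel() }}")
    println("close  -> ${scopeOutcome { it.close() }}")
}
```

Under these assumptions the cancel() case should report null (siblings in the scope keep running), while the close() case reports the exception. That suggests the riskier path in combineLatest is an input that closes normally rather than one that is cancelled.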