Gus
07/07/2021, 10:45 AMBroadcastChannel
, but when I upgrade to coroutines 1.4.0 (or 1.5.0), the consumer's lambda function doesn't get called (even though my debugging shows that ackChannel.sendBlocking(...)
is indeed called). Here's how it's consumed:
ackChannel
.asFlow()
.onEach {
<http://logger.info|logger.info>("onEach called with $it")
ackObserver.onNext(it.toCargoDeliveryAck())
}
.onCompletion { ackObserver.onCompleted() }
.collect()
(Full method definition on GitHub)
I've also tried replacing the BroadcastChannel
with MutableSharedFlow<String>(0, extraBufferCapacity = 1)
(and replacing calls to ackChannel.sendBlocking()
with mutableFlow.tryEmit()
), but the exact same issue persists.
Any idea what I'm doing wrong? I created a PR where this issue is easily reproducible because it breaks a unit test.Zach Klippenstein (he/him) [MOD]
07/07/2021, 4:33 PMGus
07/07/2021, 4:34 PMackObserver.onNext(it.toCargoDeliveryAck())
Zach Klippenstein (he/him) [MOD]
07/07/2021, 4:43 PMGus
07/07/2021, 4:54 PMThread.sleep(1_000)
right after client.collectCargo
(line 189 of the test file, before the server sends a message to the client on line 195), and the issue persists.Zach Klippenstein (he/him) [MOD]
07/07/2021, 5:01 PMGus
07/07/2021, 5:06 PMZach Klippenstein (he/him) [MOD]
07/07/2021, 5:14 PMackChannel
ackChannel.collect { ackChannel.send() }
Which is never going to emit.client.collectCargo
on line 142 also causes things to emit? I’m not sure how that client works.collectCargo
to get a flow back (test:189). Nothing actually happens, since that flow isn’t being collected yet.
2. Your test calls onNext
(test:195), which somehow queues up an element to be emitted by your client the next time an observer is registered.
3. Your test collects the first item from the flow (test:198).
a. `collectCargo`’s callbackFlow
lambda runs.
b. collectCargo
registers an observer (client:142).
c. The client emits the queued element, running the onNext
function (client:117), before the internal collectCargo
on line 142 returns.
d. Because your collect hasn’t ran yet, it gets dropped.
e. ackChannel.….collect()
is executed (client:147) which means subsequent client emissions should get forwarded.Gus
07/07/2021, 5:37 PMclient.collectCargo
function shouldn't be emitting anything because it's a regular Java class generated by the Protocol Buffers tool (so it shouldn't interfere with Kotlin or coroutines).
I need to run now but I'll be back in a few hours and I'll try a few things based on the pointers you've given me -- thanks so much, btw!Zach Klippenstein (he/him) [MOD]
07/07/2021, 5:52 PMclient.collectCargo
couldn’t call onNext
because it’s java