Hi folks. I'm trying to upgrade from coroutines v1...
# coroutines
g
Hi folks. I'm trying to upgrade from coroutines v1.3.9 to v1.5 in a Kotlin-based JVM library, but I'm struggling because of a change introduced in coroutines v1.4.0 (which doesn't seem to be mentioned in the release notes). I have a method where I'm using `BroadcastChannel`, but when I upgrade to coroutines 1.4.0 (or 1.5.0), the consumer's lambda function doesn't get called (even though my debugging shows that `ackChannel.sendBlocking(...)` is indeed called). Here's how it's consumed:
ackChannel
    .asFlow()
    .onEach {
        logger.info("onEach called with $it")
        ackObserver.onNext(it.toCargoDeliveryAck())
    }
    .onCompletion { ackObserver.onCompleted() }
    .collect()
(Full method definition on GitHub) I've also tried replacing the `BroadcastChannel` with `MutableSharedFlow<String>(0, extraBufferCapacity = 1)` (and replacing calls to `ackChannel.sendBlocking()` with `mutableFlow.tryEmit()`), but the exact same issue persists. Any idea what I'm doing wrong? I created a PR where this issue is easily reproducible because it breaks a unit test.
z
Which lambda isn’t getting called (line number?)
g
145, based on my logging statement
ackObserver.onNext(it.toCargoDeliveryAck())
z
Does your client emit immediately (on line 142)? Any emissions that happen before the channel’s flow is collected will be dropped.
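To illustrate what "dropped" means here, a minimal sketch using the `MutableSharedFlow` configuration mentioned above. The function name `earlyEmissionDemo` and the string values are made up for illustration; this is not code from the library in question.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns whether the early tryEmit was "accepted",
// together with the first value a late collector actually sees.
fun earlyEmissionDemo(): Pair<Boolean, String> = runBlocking {
    val acks = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 1)

    // No subscribers yet: tryEmit reports success, but with replay = 0
    // the value is discarded rather than kept for future collectors.
    val accepted = acks.tryEmit("early")

    // Start collecting, then wait until the subscription is registered.
    val firstSeen = async { acks.first() }
    acks.subscriptionCount.first { it > 0 }

    // This emission happens after collection started, so it is delivered.
    acks.tryEmit("late")

    accepted to firstSeen.await()
}

fun main() {
    val (accepted, seen) = earlyEmissionDemo()
    println("early tryEmit accepted=$accepted, first value seen=$seen")
}
```

Note that `tryEmit` returning `true` only means the call did not need to suspend; it says nothing about whether anyone received the value.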
g
Good point. Yeah, I think that could certainly happen in the unit test (https://github.com/relaycorp/awala-cogrpc-jvm/blob/94b6205ae95e8db8e57a92b0c8da19ac5662a5e7/src/test/kotlin/tech/relaycorp/relaynet/cogrpc/client/CogRPCClientTest.kt#L180-L212) and my own manual testing. However, I just tested adding a `Thread.sleep(1_000)` right after `client.collectCargo` (line 189 of the test file, before the server sends a message to the client on line 195), and the issue persists.
z
yea i wouldn’t expect putting a sleep there to have any effect
g
I thought that'd ensure that the flow is collected before the server sends messages to the client, but then again I'm new to Kotlin/coroutines so I don't doubt I'm wrong 🙂 Any suggestions on how to do that?
z
Wait, nothing’s sending into `ackChannel`
You effectively have
ackChannel.collect { ackChannel.send() }
Which is never going to emit.
Unless the internal `client.collectCargo` on line 142 also causes things to emit? I’m not sure how that client works.
If it does, then what I suspect is happening is:
1. Your test calls `collectCargo` to get a flow back (test:189). Nothing actually happens, since that flow isn’t being collected yet.
2. Your test calls `onNext` (test:195), which somehow queues up an element to be emitted by your client the next time an observer is registered.
3. Your test collects the first item from the flow (test:198).
   a. `collectCargo`’s `callbackFlow` lambda runs.
   b. `collectCargo` registers an observer (client:142).
   c. The client emits the queued element, running the `onNext` function (client:117), before the internal `collectCargo` on line 142 returns.
   d. Because your collect hasn’t run yet, the element gets dropped.
   e. `ackChannel.….collect()` is executed (client:147), which means subsequent client emissions should get forwarded.
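A rough sketch of that ordering problem, under the assumption that registering an observer synchronously delivers whatever is queued. `FakeClient` is a hypothetical stand-in, not the generated gRPC client, and the shared flow stands in for `ackChannel`. The second function shows one possible ordering that avoids the drop: start collecting first, then register.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical client: registering an observer immediately calls onNext
// for every queued element, before collectCargo returns (step 3c).
class FakeClient(private val queued: List<String>) {
    fun collectCargo(onNext: (String) -> Unit) = queued.forEach(onNext)
}

// Steps 3b to 3e: register first, collect afterwards.
fun registerThenCollect(): List<String> = runBlocking {
    val acks = MutableSharedFlow<String>(extraBufferCapacity = 1)
    val client = FakeClient(listOf("ack-1"))

    // onNext fires here, while nothing is collecting, so the value is lost.
    client.collectCargo { acks.tryEmit(it) }

    // Collection starts too late; give up after a short timeout.
    withTimeoutOrNull(200L) { acks.take(1).toList() } ?: emptyList()
}

// Alternative ordering: make sure a collector is subscribed before registering.
fun collectThenRegister(): List<String> = runBlocking {
    val acks = MutableSharedFlow<String>(extraBufferCapacity = 1)
    val client = FakeClient(listOf("ack-1"))

    val seen = async { acks.take(1).toList() }
    acks.subscriptionCount.first { it > 0 } // wait for the collector to subscribe

    client.collectCargo { acks.tryEmit(it) }
    seen.await()
}

fun main() {
    println(registerThenCollect()) // the queued element never arrives
    println(collectThenRegister()) // the queued element is delivered
}
```

Whether the real client behaves like `FakeClient` is exactly the open question; the sketch only shows why the order of "register" and "collect" would matter if it does.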
g
Hmm, I think I get what you're saying, but I find it weird that this works reliably with v1.3.9 of the coroutines library. Also, the internal `client.collectCargo` function shouldn't be emitting anything because it's a regular Java class generated by the Protocol Buffers tool (so it shouldn't interfere with Kotlin or coroutines). I need to run now but I'll be back in a few hours and I'll try a few things based on the pointers you've given me -- thanks so much, btw!
z
i don’t see why `client.collectCargo` couldn’t call `onNext` because it’s java
(maybe my wording was confusing - i said “emit” to mean “call `onNext`”)