# coroutines
h
Hello Coroutine enthusiasts, I'm having quite a bit of trouble with polling. I'm working on an AWS SQS Poller that should be constantly pinging AWS to ask for new tasks. I created a class with a start function like
private val pollingScope = CoroutineScope(parentScope.coroutineContext + SupervisorJob())

fun startPolling() = pollingScope.launch {
    while (isActive) {
        // To allow us to cancel this
        yield()
        pollAndProcessMessages()
    }
}
but in testing, it goes through the contents of pollAndProcessMessages once and then it just stays there forever, not allowing me to cancel it afterwards with the custom close() function I created
suspend fun close() {
    log.info("Initiating graceful shutdown of SQS poller for queue $queueUrl")

    pollingScope.coroutineContext.job.cancelAndJoin()
}
My ideal usage would be (in a Netty server)
// During server init
poller.start()

...

Runtime.getRuntime().addShutdownHook(
        Thread {
            close("SQS Poller") { poller.close() }
        },
    )
y
You shouldn't catch all exceptions. At the very least, rethrow if it is a `CancellationException`
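For reference, a minimal sketch of that advice. `pollSafely` and its `poll` parameter are hypothetical names, not taken from the poller in this thread; the point is only that the `CancellationException` branch comes first and rethrows.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.*

// Hypothetical wrapper: runs one poll and reports whether it succeeded,
// without swallowing cancellation.
suspend fun pollSafely(poll: suspend () -> Unit): Boolean =
    try {
        poll()
        true
    } catch (e: CancellationException) {
        // Rethrow so the surrounding loop actually stops when the job is cancelled.
        throw e
    } catch (e: Exception) {
        // Any other failure is logged and the loop may try again.
        println("poll failed: ${e.message}")
        false
    }
```

If the first `catch` is dropped, a cancelled poll is treated as an ordinary failure and the loop only stops if something else checks for cancellation.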
h
Good point, I actually prefer if we catch them within the actual function, modified the code above - the issue is still happening
z
What does `pollAndProcessMessages()` do? Does it make a blocking call?
Also you can call `ensureActive()` instead of `yield()` if you just want to handle cancellation.
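A small sketch of the `ensureActive()` variant, assuming a hypothetical `launchPolling` helper where `work` stands in for `pollAndProcessMessages()`:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs `work` repeatedly until the job is cancelled.
fun CoroutineScope.launchPolling(work: suspend () -> Unit): Job = launch {
    while (true) {
        // Throws CancellationException once this job has been cancelled.
        ensureActive()
        work()
    }
}
```

One difference worth keeping in mind: `ensureActive()` only checks the job state, while `yield()` also gives other coroutines on the same dispatcher a turn. If `work` never really suspends and the dispatcher is single-threaded, a loop that only calls `ensureActive()` never lets the code that would cancel it run.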
h
It doesn't - just calls to the AWS Kotlin SDK (which are suspended)
Even the debugger just effectively gets stuck here - sometimes I find a pattern where it goes inside, sometimes it just gets stuck in this one only
z
Do you have any `runBlocking` in your code anywhere? That can cause unexpected deadlocks in weird places
h
None, new code base, and the failure is happening within the test
@Test
    fun `successfully processes and deletes messages`() = runTest {
        val messageListener: SqsMessageListener<TestMessage> = mockk()
        val message = Message {
            messageId = "msg1"
            body = """{"content":"test"}"""
            receiptHandle = "receiptHandle1"
        }

        coEvery { sqsClient.receiveMessage(any()) } returnsMany listOf(
            ReceiveMessageResponse { messages = listOf(message) },
            ReceiveMessageResponse { messages = emptyList() }
        )

        coEvery { messageListener.onMessage(any()) } returns Unit
        coEvery { sqsClient.deleteMessage(any()) } returns DeleteMessageResponse { }

        val poller = SqsPoller(
            queueUrl = "test-queue",
            messageListener = messageListener,
            messageClass = TestMessage::class.java,
            sqsClient = sqsClient,
            parentScope = this,
        )

        launch { poller.startPolling() }
        advanceUntilIdle()

        coVerify(exactly = 1) { messageListener.onMessage(any()) }
        coVerify(exactly = 1) { sqsClient.deleteMessage(any()) }

        poller.close()
        advanceUntilIdle()
    }
y
Does `pollAndProcessMessages` block forever? That's the only thing I can think of here lol
z
Already asked, it sounds like if it does it's a bug in the AWS SDK
it might be using `suspendCoroutine` instead of `suspendCancellableCoroutine` too
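To illustrate the difference, here is a sketch that wraps a callback API with `suspendCancellableCoroutine`. `FakeRequest` and its methods are made up for the example and say nothing about how the AWS SDK is implemented.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.resume

// Hypothetical callback-style request, standing in for any non-coroutine API.
class FakeRequest {
    @Volatile var aborted = false
    private var callback: ((String) -> Unit)? = null
    fun onResult(cb: (String) -> Unit) { callback = cb }
    fun complete(value: String) { callback?.invoke(value) }
    fun abort() { aborted = true }
}

// Cancellable bridge: cancelling the caller resumes it with
// CancellationException and also aborts the underlying request.
suspend fun FakeRequest.await(): String = suspendCancellableCoroutine { cont ->
    onResult { value -> cont.resume(value) }
    cont.invokeOnCancellation { abort() }
}
```

With plain `suspendCoroutine` the caller would stay suspended until the callback fires, even after its job is cancelled, so `cancelAndJoin()` on it would wait for the request to finish.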
y
Oh yeah, that's a big gotcha. If it has its own termination/cancellation inside, then you need to register with that and cancel any continuation you're capturing
h
Well actually, now that you mention it, in some of my tries, I saw that the debugger wouldn't go beyond the call to the SDK
private suspend fun pollAndProcessMessages() {
        val response = sqsClient.receiveMessage {
            queueUrl = this@SqsPoller.queueUrl
            maxNumberOfMessages = MAX_MESSAGES_PER_POLL
            waitTimeSeconds = WAIT_TIME_BETWEEN_POLLS.inWholeSeconds.toInt()
        }

        ....
    }
z
wait this is only in a test, and you're mocking that `receiveMessage` call? Might be something with that mock? I am not too familiar with mocking suspend calls
h
ok, the mock was actually probably not helping - simplified it to
coEvery { sqsClient.receiveMessage(any()) } returns ReceiveMessageResponse { messages = listOf(message) }
and now I at least manage to get myself inside the method 🙂
Still get stuck running the whole thing (within the while true statement), it's like this is single threaded and doesn't even consider going anywhere else
z
Sounds like there's a deadlock happening somewhere in your code or test code, nothing obviously wrong is jumping out at me
y
A reproducer could be useful here, especially since you're mocking the server so credentials are unnecessary
h
Yeah this is pure code, let me make this into a Kotlin Playground example
Of course if I ask an LLM to simplify it enough to put it into a Playground, it just works™ - https://pl.kotl.in/O_754lAsH 💀
Ok, I managed to break it - if you change it from "coroutineScope" on the main function to "runTest", it does reproduce my issue
There's something going on tied to how the TestDispatcher works it seems
running the test with runBlocking and with `yield()` seems to work-ish. I'll probably just opt for that and issue a ticket on Github to try and see if the Coroutines team can find anything else
This should do for now, thank you very much for the support!
y
If you have the reproducer for the hanging in Kotlin Playground with `runTest`, I'd love to see that!
h
Yeah the one above breaks, sec
literally just modified the original code from
suspend fun main() = coroutineScope {
to use
suspend fun main() = runTest {
I managed to make it run for both by making sure that the sqs call had a delay in the mocked response. It still doesn't answer why the original code (in the Kotlin playground) works with runBlocking and not in runTest
var pollCount = 0
        coEvery { sqsClient.receiveMessage(any()) } coAnswers {
            pollCount++
            when (pollCount) {
                1 -> ReceiveMessageResponse { messages = listOf(message) }
                else -> {
                    delay(5.seconds) // Simulate the actual polling delay
                    ReceiveMessageResponse { messages = emptyList() }
                }
            }
        }
fyi, raised this in the Coroutines github and Dima answered that this is expected, with a few suggestions on how to better do this pattern - https://github.com/Kotlin/kotlinx.coroutines/issues/4373#issuecomment-2703528726 🙂
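This is not a summary of the linked answer, just a sketch of one way to test an infinite polling loop under `runTest`. It assumes kotlinx-coroutines-test 1.7 or newer (for `backgroundScope`), and `observePollCounts` is a hypothetical stand-in for the poller, not the real class. The loop lives in `backgroundScope`, has a real suspension point (`delay`), and the test steps virtual time explicitly instead of calling `advanceUntilIdle()`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.test.*

// Hypothetical stand-in for the poller loop: each "poll" returns at once,
// then the loop waits 5 s of virtual time before polling again.
@OptIn(ExperimentalCoroutinesApi::class)
fun observePollCounts(): List<Int> {
    val observed = mutableListOf<Int>()
    runTest {
        var polls = 0
        backgroundScope.launch {
            while (isActive) {
                polls++      // pretend receiveMessage was called here
                delay(5_000) // real suspension point; virtual time under runTest
            }
        }
        runCurrent()         // runs the first iteration up to delay()
        observed += polls
        advanceTimeBy(5_000) // moves virtual time forward to the next poll
        runCurrent()         // runs the task that is now due
        observed += polls
    }                        // backgroundScope is cancelled once the test body ends
    return observed
}
```

The reasoning behind the shape: the test dispatcher runs everything on one thread, so a loop that only calls `yield()` around a mock that returns immediately always has another task queued, and `advanceUntilIdle()` never finds the scheduler idle. Stepping with `runCurrent()` and `advanceTimeBy()` keeps the test in control of how many iterations run.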