Hugo Costa
03/05/2025, 10:28 AMprivate val pollingScope = CoroutineScope(parentScope.coroutineContext + SupervisorJob())
fun startPolling() = pollingScope.launch {
while (isActive) {
// To allow us to cancel this
yield()
pollAndProcessMessages()
}
}
but in testing, it goes through the contents of pollAndProcessMessages once and then it just stays there forever, not allowing me to cancel it afterwards with the custom close() function I created
suspend fun close() {
log.info("Initiating graceful shutdown of SQS poller for queue $queueUrl")
pollingScope.coroutineContext.job.cancelAndJoin()
}
My ideal usage would be (in a Netty server)
// During server init
poller.start()
...
Runtime.getRuntime().addShutdownHook(
Thread {
close("SQS Poller") { poller.close() }
},
)
Youssef Shoaib [MOD]
03/05/2025, 10:38 AMCancellationException
Hugo Costa
03/05/2025, 10:47 AMZach Klippenstein (he/him) [MOD]
03/05/2025, 5:51 PMpollAndProcessMessages()
do? Does it make a blocking call?Zach Klippenstein (he/him) [MOD]
03/05/2025, 5:51 PMensureActive()
instead of yield()
if you just want to handle cancellation.Hugo Costa
03/05/2025, 5:52 PMHugo Costa
03/05/2025, 5:56 PMZach Klippenstein (he/him) [MOD]
03/05/2025, 5:57 PMrunBlocking
in your code anywhere? That can cause unexpected deadlocks in weird placesHugo Costa
03/05/2025, 5:58 PM@Test
fun `successfully processes and deletes messages`() = runTest {
val messageListener: SqsMessageListener<TestMessage> = mockk()
val message = Message {
messageId = "msg1"
body = """{"content":"test"}"""
receiptHandle = "receiptHandle1"
}
coEvery { sqsClient.receiveMessage(any()) } returnsMany listOf(
ReceiveMessageResponse { messages = listOf(message) },
ReceiveMessageResponse { messages = emptyList() }
)
coEvery { messageListener.onMessage(any()) } returns Unit
coEvery { sqsClient.deleteMessage(any()) } returns DeleteMessageResponse { }
val poller = SqsPoller(
queueUrl = "test-queue",
messageListener = messageListener,
messageClass = TestMessage::class.java,
sqsClient = sqsClient,
parentScope = this,
)
launch { poller.startPolling() }
advanceUntilIdle()
coVerify(exactly = 1) { messageListener.onMessage(any()) }
coVerify(exactly = 1) { sqsClient.deleteMessage(any()) }
poller.close()
advanceUntilIdle()
}
Youssef Shoaib [MOD]
03/05/2025, 5:59 PMpollAndProcessMessages
block forever? That's the only thing I can think of her lolZach Klippenstein (he/him) [MOD]
03/05/2025, 5:59 PMZach Klippenstein (he/him) [MOD]
03/05/2025, 6:00 PMsuspendCoroutine
instead of suspendCancellableCoroutine
tooYoussef Shoaib [MOD]
03/05/2025, 6:02 PMHugo Costa
03/05/2025, 6:04 PMprivate suspend fun pollAndProcessMessages() {
val response = sqsClient.receiveMessage {
queueUrl = this@SqsPoller.queueUrl
maxNumberOfMessages = MAX_MESSAGES_PER_POLL
waitTimeSeconds = WAIT_TIME_BETWEEN_POLLS.inWholeSeconds.toInt()
}
....
}
Zach Klippenstein (he/him) [MOD]
03/05/2025, 6:06 PMreceiveMessage
call? Might be something with that mock? I am not too familiar with mocking suspend callsHugo Costa
03/05/2025, 6:08 PMcoEvery { sqsClient.receiveMessage(any()) } returns ReceiveMessageResponse { messages = listOf(message) }
and now I at least manage to get myself inside the method 🙂Hugo Costa
03/05/2025, 6:09 PMZach Klippenstein (he/him) [MOD]
03/05/2025, 6:11 PMYoussef Shoaib [MOD]
03/05/2025, 6:12 PMHugo Costa
03/05/2025, 6:12 PMHugo Costa
03/05/2025, 6:16 PMHugo Costa
03/05/2025, 6:18 PMHugo Costa
03/05/2025, 6:18 PMHugo Costa
03/05/2025, 6:23 PMyield()
seems to work-ish. I'll probably just opt for that and issue a ticket on Github to try and see if the Coroutines team can find anything elseHugo Costa
03/05/2025, 6:23 PMYoussef Shoaib [MOD]
03/05/2025, 6:24 PMrunTest
, I'd love to see that!Hugo Costa
03/05/2025, 6:24 PMHugo Costa
03/05/2025, 6:24 PMHugo Costa
03/05/2025, 6:25 PMsuspend fun main() = coroutineScope {
to use suspend fun main() = runTest {
Hugo Costa
03/06/2025, 8:16 AMvar pollCount = 0
coEvery { sqsClient.receiveMessage(any()) } coAnswers {
pollCount++
when (pollCount) {
1 -> ReceiveMessageResponse { messages = listOf(message) }
else -> {
delay(5.seconds) // Simulate the actual polling delay
ReceiveMessageResponse { messages = emptyList() }
}
}
}
Hugo Costa
03/06/2025, 11:09 AM