Moses Mugisha
07/23/2021, 7:13 PMimport io.ktor.util.DispatcherWithShutdown
import io.ktor.util.InternalAPI
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch
class SQSPoller(private val numWorkers: Int = 10
) : CoroutineScope {
private val supervisorJob = SupervisorJob()
override val coroutineContext: CoroutineContext
get() = <http://Dispatchers.IO|Dispatchers.IO> + supervisorJob
private val stopRequest: CompletableJob = Job()
// this is limited to a pool size of 64 threads. or the number of cores(which ever is higher)
@OptIn(InternalAPI::class)
private val tasksDispatcher = DispatcherWithShutdown(<http://Dispatchers.IO|Dispatchers.IO>)
private lateinit var rootJob: Job
fun start() = launch(tasksDispatcher) {
println("Starting SQS Queue Poller")
println("starting")
val sqsMessageProducer = launchSQSPoller()
val workers = List(numWorkers) {
worker(it, sqsMessageProducer)
}
// distribute work among numWorkers
for (worker in workers) {
worker.join()
}
stopRequest.join()
}.apply {
rootJob = this
}
/** [fetchMessages] long polls the SQS queue for new messages*/
fun fetchMessages(): List<Int> {
return arrayListOf(1,3)
}
/** [launchSQSPoller] fetch sqs messages` .*/
private fun CoroutineScope.launchSQSPoller(): ReceiveChannel<Int> = produce {
loopForever {
val messages = fetchMessages()
messages.forEach {
send(it)
}
}
}
private fun processMessage(message: Int) {
println("sqs: worker ${Thread.currentThread()} processing messages")
println(message)
Thread.sleep(400)
}
/** [loopForever] triggers a function as long as the coroutine scope is active and the unleash flag is set */
private suspend fun loopForever(block: suspend () -> Unit) {
while (true) {
try {
block()
Thread.yield()
} catch (ex: Exception) {
println("coroutine on ${Thread.currentThread().name} cancelled")
} catch (ex: Exception) {
println("${Thread.currentThread().name} failed with {$ex}. Retrying...")
ex.printStackTrace()
}
}
println("coroutine on ${Thread.currentThread().name} exiting")
}
/** [worker] consumes the SQS messages */
private fun CoroutineScope.worker(id: Int, channel: ReceiveChannel<Int>) = launch(tasksDispatcher) {
println("sqs: worker $id processing messages")
// fanout messages
for (message in channel) {
launch { processMessage(message) }
}
}
}
I am implementing a SQS polling job, that polls the queue and does fan out to process the messages, but I keep running into performance issues. What I am I doing wrong?Justin
07/24/2021, 7:22 AMThread.sleep
with coroutines because coroutines share the thread(s) provided by the dispatcher (under the hood) and are much more efficient than threads, but if you sleep the thread in one coroutine, it will force any other coroutines on the same thread to also sleep. This is likely where you’re seeing a lot of your performance issues. Use delay()
or just loop for x millis if you want to simulate work without actually suspending
• Use suspending functions, not extension functions of CoroutineScope
• Avoid creating classes that extend CoroutineScope
as it is not recommended (see docs for explanation and alternative)
• Be careful with while(true)
inside a coroutine because you’ll want to make sure you stop iteration if the coroutine is cancelled. It’s safer to use while(isActive)
or while(true)
with ensureActive()
or yield()
if you need to control exactly when you check that the coroutine has been cancelled. In your code you’d probably be okay because you’re calling a block : suspend () -> Unit
on each iteration so that suspending block will ensure the coroutine is active. Basically, if all your logic on each iteration is non-suspending and long-running, it would likely loop indefinitely, even when you’ve called stop()
unless the thread its running on is stopped