```import io.ktor.util.DispatcherWithShutdown impo...
# coroutines
m
Copy code
import io.ktor.util.DispatcherWithShutdown
import io.ktor.util.InternalAPI
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch


class SQSPoller(private val numWorkers: Int = 10
) : CoroutineScope {

    private val supervisorJob = SupervisorJob()
    override val coroutineContext: CoroutineContext
        get() = <http://Dispatchers.IO|Dispatchers.IO> + supervisorJob

    private val stopRequest: CompletableJob = Job()

    // this is limited to a pool size of 64 threads. or the number of cores(which ever is higher)
    @OptIn(InternalAPI::class)
    private val tasksDispatcher = DispatcherWithShutdown(<http://Dispatchers.IO|Dispatchers.IO>)

    private lateinit var rootJob: Job

    fun start() = launch(tasksDispatcher) {
        println("Starting SQS Queue Poller")
        println("starting")
        val sqsMessageProducer = launchSQSPoller()
        val workers = List(numWorkers) {
            worker(it, sqsMessageProducer)
        }
        // distribute work among numWorkers
        for (worker in workers) {
            worker.join()
        }
        stopRequest.join()
    }.apply {
        rootJob = this
    }

    /** [fetchMessages] long polls the SQS queue for new messages*/
    fun fetchMessages(): List<Int> {

        return arrayListOf(1,3)
    }

    /** [launchSQSPoller] fetch sqs messages` .*/
    private fun CoroutineScope.launchSQSPoller(): ReceiveChannel<Int> = produce {
        loopForever {
                val messages = fetchMessages()
                messages.forEach {
                    send(it)
                }
        }
    }

    private fun processMessage(message: Int) {
        println("sqs: worker ${Thread.currentThread()}  processing messages")
        println(message)
        Thread.sleep(400)
    }

    /** [loopForever] triggers a function as long as the coroutine scope is active and the unleash flag is set */
    private suspend fun loopForever(block: suspend () -> Unit) {
        while (true) {

                try {
                    block()
                    Thread.yield()
                } catch (ex: Exception) {
                    println("coroutine on ${Thread.currentThread().name} cancelled")
                } catch (ex: Exception) {
                    println("${Thread.currentThread().name} failed with {$ex}. Retrying...")
                    ex.printStackTrace()
                }
            }

        println("coroutine on ${Thread.currentThread().name} exiting")
    }

    /** [worker] consumes the SQS messages */
    private fun CoroutineScope.worker(id: Int, channel: ReceiveChannel<Int>) = launch(tasksDispatcher) {
        println("sqs: worker $id processing messages")
        // fanout messages
        for (message in channel) {
            launch { processMessage(message) }
        }
    }
}
I am implementing a SQS polling job, that polls the queue and does fan out to process the messages, but I keep running into performance issues. What I am I doing wrong?
j
@Moses Mugisha If you don’t want to just drop coroutines, there are a few improvements you can make: • Don’t use
Thread.sleep
with coroutines because coroutines share the thread(s) provided by the dispatcher (under the hood) and are much more efficient than threads, but if you sleep the thread in one coroutine, it will force any other coroutines on the same thread to also sleep. This is likely where you’re seeing a lot of your performance issues. Use
delay()
or just loop for x millis if you want to simulate work without actually suspending • Use suspending functions, not extension functions of
CoroutineScope
• Avoid creating classes that extend
CoroutineScope
as it is not recommended (see docs for explanation and alternative) • Be careful with
while(true)
inside a coroutine because you’ll want to make sure you stop iteration if the coroutine is cancelled. It’s safer to use
while(isActive)
or
while(true)
with
ensureActive()
or
yield()
if you need to control exactly when you check that the coroutine has been cancelled. In your code you’d probably be okay because you’re calling a
block : suspend () -> Unit
on each iteration so that suspending block will ensure the coroutine is active. Basically, if all your logic on each iteration is non-suspending and long-running, it would likely loop indefinitely, even when you’ve called
stop()
unless the thread its running on is stopped