# ktor
g
Hello everyone! I'm not sure if this is the right place to ask for help, so apologies in advance. I'm new to Ktor and I'm trying to build an API that integrates with Redis. Basically, I want Redis to act as a message broker, so that I can push messages to it and immediately consume them from the same API. I'm using Lettuce, and I have an endpoint that pushes messages to the queue and a service that sets up a listener to consume them. The problem is that the message is never received when I push it from my endpoint, and I'm not sure why. However, if I push it via redis-cli, it is received properly. I suspect that it has something to do with how Netty works, but I don't know how to proceed, as this is my first time working with it. Does anyone have a clue?
class RedisService(val paymentRouterService: PaymentRouterService) {
    private val log = LoggerFactory.getLogger(this::class.java)

    val queue = "payment_requests"

    private val redisURI = RedisURI.Builder.redis("localhost", 6379)
        .withTimeout(Duration.ofMinutes(10))
        .build()
    private val client = RedisClient.create(redisURI)
    private val connection = client.connect()
    private val commands: RedisAsyncCommands<String, String> = connection.async()

    val json = Json

    fun addPaymentRequestToQueue(paymentRequest: PaymentRequest) {
        try {
            log.info("Adding payment request to queue: $paymentRequest")
            commands.lpush(queue, json.encodeToString(paymentRequest))
            log.info("Payment request added to queue successfully")
        } catch (e: Exception) {
            log.error("Error adding payment request to queue: ${e.message}", e)
            throw e
        }
    }

    fun startRequestConsumer() {
        CoroutineScope(Dispatchers.IO).launch {
            while (true) {
                try {
                    val result = commands.brpop(0, queue)
                    log.info("Request received from queue: $result")
                    if (result != null) {
                        val request = json.decodeFromString<PaymentRequest>(result.await().value)
                        paymentRouterService.processPayment(request)
                    }
                } catch (e: Exception) {
                    log.error("Error processing payment request: ${e.message}", e)
                    delay(100)
                }
            }
        }
    }

    fun closeConnection() {
        try {
            log.info("Closing Redis connection")
            connection.close()
            client.shutdown()
            log.info("Redis connection closed successfully")
        } catch (e: Exception) {
            log.error("Error closing Redis connection: ${e.message}", e)
        }
    }

}
j
I think your issue might be that `brpop` is a blocking command. Even though you're outsourcing your consumer to a loop on the IO dispatcher, the Redis connection itself is blocking. So when you call `brpop` for the first time, it hangs, and therefore `lpush` can't run.
You'll need at least 2 connections (call `client.connect()` twice: once for a consumer connection and once for a producer connection).
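A minimal sketch of the two-connection setup, assuming Lettuce and the kotlinx.coroutines `await` extension for `CompletionStage`. The class name `PaymentQueue` and the `push`/`pop` helpers are hypothetical and only illustrate the idea; they are not part of the original service.

```kotlin
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisURI
import io.lettuce.core.api.async.RedisAsyncCommands
import kotlinx.coroutines.future.await

// Hypothetical wrapper: one RedisClient, two separate connections.
class PaymentQueue(host: String = "localhost", port: Int = 6379) {
    private val queue = "payment_requests"
    private val client = RedisClient.create(RedisURI.create(host, port))

    // Producer connection: never issues blocking commands,
    // so LPUSH is not stuck behind a pending BRPOP.
    private val producerConnection = client.connect()
    private val producer: RedisAsyncCommands<String, String> = producerConnection.async()

    // Consumer connection: dedicated to BRPOP, which occupies
    // this connection until a message arrives.
    private val consumerConnection = client.connect()
    private val consumer: RedisAsyncCommands<String, String> = consumerConnection.async()

    // Pushes a payload and returns the new length of the list.
    suspend fun push(payload: String): Long = producer.lpush(queue, payload).await()

    // Suspends until a message is available (timeout 0 = wait indefinitely).
    suspend fun pop(): String = consumer.brpop(0, queue).await().value

    fun close() {
        consumerConnection.close()
        producerConnection.close()
        client.shutdown()
    }
}
```

With this split, the endpoint would only ever touch `push`, while the consumer loop only calls `pop`, so the blocking call cannot hold up the producer.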
You also don't need the `delay(100)` due to `brpop` being blocking 🙂. And make sure you shut down the queue properly! It shouldn't be running on an orphaned coroutine scope in production code:
environment.monitor.subscribe(ApplicationStopped) { queue.close() }
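One way to avoid the orphaned scope is to let the consumer own its `CoroutineScope` and cancel it in `close()`. This is only a sketch under assumptions: `QueueConsumer`, `queueName`, and the `handle` callback are hypothetical names, and error handling and retry policy are left out for brevity.

```kotlin
import io.lettuce.core.RedisClient
import io.lettuce.core.api.StatefulRedisConnection
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.future.await
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch

// Hypothetical consumer that owns both its scope and its Redis connection.
class QueueConsumer(
    client: RedisClient,
    private val queueName: String,
    private val handle: suspend (String) -> Unit, // hypothetical message handler
) {
    // The scope is a field, so it can be cancelled when the application stops.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    // Connection used only for the blocking BRPOP calls.
    private val connection: StatefulRedisConnection<String, String> = client.connect()

    fun start() {
        scope.launch {
            // Error handling is omitted here; a failure ends the loop.
            while (isActive) {
                val message = connection.async().brpop(0, queueName).await()
                handle(message.value)
            }
        }
    }

    fun close() {
        scope.cancel()     // stops the loop; the suspended await() is cancelled
        connection.close() // releases the consumer connection
    }
}
```

Calling this `close()` from the `ApplicationStopped` hook ties the consumer's lifetime to the server's.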
g
@Jeff Hudson That worked perfectly! I thought that only the coroutine could get blocked in this case, not the connection itself. Thank you very much!
j
happy to help! good luck with the project 🙂