Gustavo Cesário
07/23/2025, 9:00 PMclass RedisService(val paymentRouterService: PaymentRouterService) {
private val log = LoggerFactory.getLogger(this::class.java)
val queue = "payment_requests"
private val redisURI = RedisURI.Builder.redis("localhost", 6379)
.withTimeout(Duration.ofMinutes(10))
.build()
private val client = RedisClient.create(redisURI)
private val connection = client.connect()
private val commands: RedisAsyncCommands<String, String> = connection.async()
val json = Json
fun addPaymentRequestToQueue(paymentRequest: PaymentRequest) {
try {
<http://log.info|log.info>("Adding payment request to queue: $paymentRequest")
commands.lpush(queue, json.encodeToString(paymentRequest))
<http://log.info|log.info>("Payment request added to queue successfully")
} catch (e: Exception) {
log.error("Error adding payment request to queue: ${e.message}", e)
throw e
}
}
fun startRequestConsumer() {
CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>).launch {
while (true) {
try {
val result = commands.brpop(0, queue)
<http://log.info|log.info>("Request received from queue: $result")
if (result != null) {
val request = json.decodeFromString<PaymentRequest>(result.await().value)
paymentRouterService.processPayment(request)
}
} catch (e: Exception) {
log.error("Error processing payment request: ${e.message}", e)
delay(100)
}
}
}
}
fun closeConnection() {
try {
<http://log.info|log.info>("Closing Redis connection")
connection.close()
client.shutdown()
<http://log.info|log.info>("Redis connection closed successfully")
} catch (e: Exception) {
log.error("Error closing Redis connection: ${e.message}", e)
}
}
}
Jeff Hudson
07/23/2025, 9:05 PMbrpop
is a blocking command. Even though you're outsourcing your consumer to a loop on the IO dispatcher, the Redis connection itself is blocking. So when you call brpop
for the first time, it hangs and therefore lpush
can't runJeff Hudson
07/23/2025, 9:06 PMclient.connect()
twice. Once for a consumer connection and another for a producer connection).Jeff Hudson
07/23/2025, 9:08 PMdelay(100)
due to brpop
being blocking 🙂. and make sure you shut down the queue properly! it shouldn't be running on an orphaned coroutine scope in production code
environment.monitor.subscribe(ApplicationStopped) { queue.close() }
Gustavo Cesário
07/23/2025, 9:18 PMJeff Hudson
07/23/2025, 9:19 PM