# coroutines
m
🚀 coroutine I need to fetch & process rows from a DB (Postgres via JDBC, tens/hundreds of millions of rows) as fast as possible. It's both IO- and CPU-intensive, so I need things to run in parallel - e.g. fetching from the DB needs to run in a different thread than parsing of row data (JSON & stuff), and that needs to run in parallel with the final processing. My question is whether I should use coroutines (well, I already did and it's faaast & I like how the code looks - but I'm a coroutines n00b, so I'm not sure about the downsides/pitfalls). Sample code is in comments...
```kotlin
val dbDispatcher = Dispatchers.IO.limitedParallelism(10)
val parsingDispatcher = Dispatchers.Default.limitedParallelism(4)

val limit = 1_000_000
val chunkSize = 5_000

runBlocking {
    // Fetching from DB - mostly IO, but also some CPU.
    // Stream the DB cursor into a channel so the next stages can consume it.
    val rowsChunks = produce(dbDispatcher, capacity = 10) {
        jdbcTemplate.queryForStream(
            """
                SELECT event_type, coalesce(event_data, event_data_extended), timestamp, seq, version
                FROM events
                ORDER BY seq LIMIT ?
            """.trimIndent(),
            { rs, _ ->
                StoredEvent(
                    eventType = rs.getString(1),
                    eventData = rs.getString(2),
                    timestamp = rs.getTimestamp(3).toInstant(),
                    seq = rs.getLong(4),
                    version = rs.getLong(5),
                )
            },
            limit
        ).use { stream ->
            stream.asSequence().chunked(chunkSize).forEach { chunk ->
                send(chunk)
            }
        }
    }

    // Parsing - CPU heavy - parse multiple chunks in parallel if possible
    val parsedChunks = produce(parsingDispatcher, capacity = 3) {
        rowsChunks.consumeEach { chunk ->
            send(async {
                chunk.map { parseEvent(it) }
            })
        }
    }

    // Process fetched & parsed events in the current thread
    val crc = CRC32()
    parsedChunks.consumeEach { chunk ->
        chunk.await().forEach { ev ->
            crc.update(ev.seq.toInt())
        }
    }
    println("CRC32: ${crc.value}")
}
```
BTW I'm pretty surprised that the first part (rowsChunks) actually works, as it's using a Java Stream (which wraps a Postgres cursor) from multiple threads. 🤷‍♂️
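FWIW that part is less surprising than it looks: `limitedParallelism(10)` may resume the producer on different IO threads, but only that one coroutine ever reads the cursor, and each resumption happens-after the previous suspension, so the Stream is never accessed concurrently. Here's a minimal, runnable sketch of the same three-stage pipeline with the DB fetch and JSON parsing replaced by stand-ins (`pipelineSum` and the doubling "parse" step are made up for illustration):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

@OptIn(ExperimentalCoroutinesApi::class)
fun pipelineSum(totalChunks: Int = 20, chunkSize: Int = 100): Long = runBlocking {
    val dbDispatcher = Dispatchers.IO.limitedParallelism(10)
    val parsingDispatcher = Dispatchers.Default.limitedParallelism(4)

    // Stage 1: a single coroutine "fetches" chunks. Only this coroutine
    // touches the source, and each resumption happens-after the previous
    // suspension, so access stays sequential even though the dispatcher
    // may resume it on different IO threads.
    val rawChunks = produce(dbDispatcher, capacity = 10) {
        var seq = 0L
        repeat(totalChunks) {
            send(List(chunkSize) { seq++ }) // stand-in for one cursor chunk
        }
    }

    // Stage 2: parse chunks in parallel. async starts right away, so
    // several parse jobs run concurrently; the channel capacity bounds
    // how far ahead this stage can get.
    val parsedChunks = produce<Deferred<List<Long>>>(parsingDispatcher, capacity = 3) {
        rawChunks.consumeEach { chunk ->
            send(async { chunk.map { it * 2 } }) // stand-in for JSON parsing
        }
    }

    // Stage 3: await in submission order, so results come back ordered.
    var sum = 0L
    parsedChunks.consumeEach { d -> d.await().forEach { sum += it } }
    sum
}
```

One pitfall worth knowing: if any stage throws, structured concurrency cancels the whole `runBlocking` scope, which also cancels the producer and closes the JDBC stream via `use` - that's usually what you want, but it means a consumer-side bug kills the fetch too.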
j
why not use r2dbc for non-blocking reads?
d
There's also #CCGG2R64Q and Vert.x, which are non-blocking and might be more performant?