Marian Schubert
09/12/2024, 4:53 PM
val dbDispatcher = Dispatchers.IO.limitedParallelism(10)
val parsingDispatcher = Dispatchers.Default.limitedParallelism(4)
val limit = 1_000_000
val chunkSize = 5_000

runBlocking {
    // Fetching from DB - mostly IO, but also some CPU.
    // Stream the DB cursor into a channel so the next steps can process rows as they arrive;
    // capacity = 10 bounds how far the reader may run ahead of parsing (backpressure).
    val rowsChunks = produce(dbDispatcher, capacity = 10) {
        jdbcTemplate.queryForStream(
            """
            SELECT event_type, coalesce(event_data, event_data_extended), timestamp, seq, version
            FROM events
            ORDER BY seq LIMIT ?
            """.trimIndent(),
            { rs, _ ->
                StoredEvent(
                    eventType = rs.getString(1),
                    eventData = rs.getString(2),
                    timestamp = rs.getTimestamp(3).toInstant(),
                    seq = rs.getLong(4),
                    version = rs.getLong(5),
                )
            },
            limit
        ).use { stream ->
            stream.asSequence().chunked(chunkSize).forEach { chunk ->
                send(chunk)
            }
        }
    }

    // Parsing - CPU heavy - parse multiple chunks in parallel if possible.
    // Each chunk is parsed in its own async on parsingDispatcher (parallelism capped at 4);
    // sending the Deferred rather than the result keeps chunks in their original order.
    val parsedChunks = produce(parsingDispatcher, capacity = 3) {
        rowsChunks.consumeEach { chunk ->
            send(async {
                chunk.map { parseEvent(it) }
            })
        }
    }

    // Process fetched & parsed events in the current thread, awaiting chunks in order.
    val crc = CRC32()
    parsedChunks.consumeEach { chunk ->
        chunk.await().forEach { ev ->
            crc.update(ev.seq.toInt())
        }
    }
    println("CRC32: ${crc.value}")
}
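
For completeness: the snippet runs inside a class that already has a Spring jdbcTemplate, and it needs the usual kotlinx.coroutines imports plus java.util.zip.CRC32 and kotlin.streams.asSequence. A minimal sketch of the remaining declarations it assumes is below; StoredEvent's fields are taken from the row mapper above, while ParsedEvent and the parseEvent body are only hypothetical stand-ins for whatever the real CPU-heavy parser produces (the pipeline's final step only needs a seq for the CRC).

import java.time.Instant

data class StoredEvent(
    val eventType: String,
    val eventData: String,
    val timestamp: Instant,
    val seq: Long,
    val version: Long,
)

// Hypothetical result type; the real one would carry the fully parsed event payload.
data class ParsedEvent(val seq: Long)

// Hypothetical stand-in for the CPU-heavy parsing step referenced as parseEvent(...) above.
fun parseEvent(row: StoredEvent): ParsedEvent = ParsedEvent(seq = row.seq)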
Marian Schubert
09/12/2024, 4:57 PM

Jimmy Zhou
09/14/2024, 1:06 AM

dave08
09/15/2024, 2:44 PM