debop
04/26/2025, 11:06 PMQuery.fetchBatchResults
for Coroutines. returns Flow<List<ResultRow>>
It support global unique id (eg. snowflakeId) which is not auto inc
package io.bluetape4k.exposed.sql
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import org.jetbrains.exposed.dao.id.EntityID
import org.jetbrains.exposed.sql.Column
import org.jetbrains.exposed.sql.EntityIDColumnType
import org.jetbrains.exposed.sql.FieldSet
import org.jetbrains.exposed.sql.Op
import org.jetbrains.exposed.sql.Query
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.SortOrder
import org.jetbrains.exposed.sql.SqlExpressionBuilder.greater
import org.jetbrains.exposed.sql.SqlExpressionBuilder.less
import org.jetbrains.exposed.sql.and
/**
* [FieldSet] 에서 [SuspendedQuery.fetchBatchResultFlow] 메소드를 코루틴 환경에서 사용할 수 있도록 확장한 함수입니다.
*
* 이 함수를 사용하려면, 조회하는 첫번째 컬럼이 Int, Long 수형이어야 합니다.
*
*
* // 10개씩 배치로 읽어온다
* val batchedIds: List<List<Int>> = ProductTable
* .fetchBatchResultFlow(10)
* .buffer(capacity = 2)
* .map { rows -> rows.map { it[ProductTable.id].value } }
* .toList()
* */
fun FieldSet.fetchBatchResultFlow(batch: Int = 1000, sortOrder: SortOrder = SortOrder.ASC): Flow<List<ResultRow>> =
Query(this.source, null).fetchBatchResultFlow(batch, sortOrder)
/**
* [SuspendedQuery.fetchBatchResultFlow] 메소드를 코루틴 환경에서 사용할 수 있도록 확장한 함수입니다.
*
* 이 함수를 사용하려면, 조회하는 첫번째 컬럼이 Int, Long 수형이어야 합니다.
*
*
* // 10개씩 배치로 읽어온다
* val batchedIds: List<List<Int>> = ProductTable
* .select(ProductTable.id)
* .fetchBatchResultFlow(10)
* .buffer(capacity = 2)
* .map { rows -> rows.map { it[ProductTable.id].value } }
* .toList()
* */
fun Query.fetchBatchResultFlow(batch: Int = 1000, sortOrder: SortOrder = SortOrder.ASC): Flow<List<ResultRow>> =
SuspendedQuery(this@fetchBatchResultFlow.set, null).fetchBatchResultFlow(batch, sortOrder)
/**
* [Query.fetchBatchedResults] 메소드를 코루틴 환경에서 사용할 수 있도록 확장한 함수를 제공하는 클래스입니다.
*/
open class SuspendedQuery(set: FieldSet, where: Op<Boolean>? = null): Query(set, where) {
/**
* [Query.fetchBatchedResults] 메소드를 코루틴 환경에서 사용할 수 있도록 확장한 메소드입니다.
*
* 이 함수를 사용하려면, 조회하는 첫번째 컬럼이 Int, Long 수형이어야 합니다.
*
*
* // 10개씩 배치로 읽어온다
* val batchedIds: List<List<Int>> = ProductTable
* .select(ProductTable.id)
* .fetchBatchResultFlow(10)
* .buffer(capacity = 2)
* .map { rows -> rows.map { it[ProductTable.id].value } }
* .toList()
* */
fun fetchBatchResultFlow(batchSize: Int = 1000, sortOrder: SortOrder = SortOrder.ASC): Flow<List<ResultRow>> {
require(batchSize > 0) { "Batch size should be greater than 0." }
require(limit == null) { "A manual `LIMIT` clause should not be set. By default, `batchSize` will be used." }
require(orderByExpressions.isEmpty()) {
"A manual `ORDER BY` clause should not be set. By default, the auto-incrementing column will be used."
}
val comparatedColumn = try {
set.source.columns.first() // { it.columnType.isAutoInc } // snowflakeId 같은 Global Unique ID 도 지원하기 위해
} catch (_: NoSuchElementException) {
throw UnsupportedOperationException("Batched select only works on tables with an auto-incrementing column")
}
limit = batchSize
(orderByExpressions as MutableList).add(comparatedColumn to sortOrder)
val whereOp = where ?: Op.TRUE
val fetchInAscendingOrder =
sortOrder in listOf(SortOrder.ASC, SortOrder.ASC_NULLS_FIRST, SortOrder.ASC_NULLS_LAST)
fun toLong(autoIncVal: Any): Long = when (autoIncVal) {
is EntityID<*> -> toLong(autoIncVal.value)
is Int -> autoIncVal.toLong()
else -> autoIncVal as Long
}
return channelFlow {
var lastOffset = if (fetchInAscendingOrder) 0L else null
while (true) {
val query = this@SuspendedQuery.copy().adjustWhere {
lastOffset?.let { lastOffset ->
whereOp and if (fetchInAscendingOrder) {
when (comparatedColumn.columnType) {
is EntityIDColumnType<*> -> {
(comparatedColumn as? Column<EntityID<Long>>)?.let {
(it greater lastOffset)
} ?: (comparatedColumn as? Column<EntityID<Int>>)?.let {
(it greater lastOffset.toInt())
} ?: (comparatedColumn greater lastOffset)
}
else -> (comparatedColumn greater lastOffset)
}
} else {
when (comparatedColumn.columnType) {
is EntityIDColumnType<*> -> {
(comparatedColumn as? Column<EntityID<Long>>)?.let {
(it less lastOffset)
} ?: (comparatedColumn as? Column<EntityID<Int>>)?.let {
(it less lastOffset.toInt())
} ?: (comparatedColumn less lastOffset)
}
else -> (comparatedColumn less lastOffset)
}
}
} ?: whereOp
}
val results = query.iterator().asSequence().toList()
if (results.isNotEmpty()) {
send(results)
}
if (results.size < batchSize) break
lastOffset = toLong(results.last()[comparatedColumn]!!)
}
}
}
}