planerist
08/07/2025, 2:19 PM
The first `suspendTransaction` works fine, but the second one fails.
Short code snippet below (the full code is in the comments):
kotlin
// First transaction: creates the table. This one is reported to work.
suspendTransaction(db = db) {
    SchemaUtils.create(UsersTable)
}

// Second transaction against the same database handle.
suspendTransaction(db = db) {
    // ===> The insert below is where the failure is raised
    UsersTable.insert { row ->
        row[name] = "Alice"
    }
}
Any ideas on what might be going wrong?
planerist
08/07/2025, 2:19 PM
package net.yaranga.framework
import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.OPTIONS
import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactoryOptions.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.test.setMain
import org.jetbrains.exposed.v1.core.dao.id.LongIdTable
import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabase
import org.jetbrains.exposed.v1.r2dbc.SchemaUtils
import org.jetbrains.exposed.v1.r2dbc.insert
import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInfo
import org.testcontainers.containers.PostgreSQLContainer
import org.testcontainers.images.PullPolicy
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils
import reactor.core.publisher.Mono
import java.util.*
/**
 * Reproduction test: a table is created in one `suspendTransaction` and a row is
 * inserted in a second one; the insert is reported to fail with
 * `IllegalStateException: No transaction in context.`
 *
 * Each test method gets its own freshly created PostgreSQL database inside a shared
 * Testcontainers container.
 *
 * NOTE(review): the reported stack trace shows the exception being raised from
 * `InsertSuspendExecutable.returnedColumns` while running on a Netty event-loop
 * thread; presumably the transaction is not visible from that thread. Confirm
 * against the Exposed version in use.
 */
@OptIn(ExperimentalCoroutinesApi::class)
@Testcontainers
class Test1 {
    /** Name of the database created for the currently running test method. */
    private lateinit var testDBName: String

    /**
     * Administrative connection used to create the per-test database.
     *
     * The Mono is cached, so every subscriber receives the same [Connection]; without
     * caching, each subscription would ask the factory for a new connection.
     */
    lateinit var connection: Mono<Connection>

    /**
     * Installs a test Main dispatcher and creates a uniquely named database for the test.
     *
     * @param testInfo JUnit metadata; the test class and method names become part of the database name.
     */
    @BeforeEach
    fun setUp(testInfo: TestInfo) {
        Dispatchers.setMain(StandardTestDispatcher())

        // <class>-<method>-<6 random lowercase alphanumerics> keeps names unique across runs.
        testDBName = testInfo.testClass.get().simpleName +
            "-" +
            testInfo.testMethod.get().name +
            "-" +
            RandomStringUtils.randomAlphanumeric(6).lowercase(Locale.ENGLISH)

        val connectionFactory = ConnectionFactories.get(
            builder()
                .option(DRIVER, "postgresql")
                .option(HOST, postgresContainer.host)
                .option(PORT, postgresContainer.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT))
                .option(USER, postgresContainer.username)
                .option(PASSWORD, postgresContainer.password)
                .option(DATABASE, postgresContainer.databaseName) // optional
                .option(OPTIONS, driverOptions()) // optional
                .build()
        )

        // cache() makes tearDown() close this very connection instead of opening
        // (and closing) a second one while the first is left open.
        connection = Mono.from<Connection>(connectionFactory.create()).cache()

        connection
            .flatMapMany { c ->
                c.createStatement("CREATE DATABASE \"${testDBName}\" ENCODING 'utf8'").execute()
            }
            // Drain the Result so the statement is fully executed and a failure is
            // signalled here rather than going unnoticed.
            .flatMap { result -> result.rowsUpdated }
            .blockLast() // Block until the database creation is complete
    }

    /**
     * Closes the administrative connection and restores the original Main dispatcher.
     *
     * NOTE(review): the per-test database is not dropped; with a reusable container
     * these databases presumably accumulate across runs. Confirm whether cleanup is wanted.
     */
    @AfterEach
    fun tearDown() {
        try {
            // setUp() may have failed before assigning the property.
            if (::connection.isInitialized) {
                connection.flatMapMany { c -> c.close() }.blockLast() // Block until connection is closed
            }
        } finally {
            // Undo Dispatchers.setMain() from setUp() so the override does not leak into other tests.
            Dispatchers.resetMain()
        }
    }

    /** Minimal table used by the reproduction: a long id plus a `name` column. */
    object UsersTable : LongIdTable("users") {
        val name = varchar("name", 50)
    }

    /** Creates the table in one transaction, then inserts a row in a second one. */
    @Test
    fun testNested() = runTest {
        val db = createDB()

        suspendTransaction(db = db) {
            SchemaUtils.create(UsersTable)
        }

        suspendTransaction(db = db) {
            // ===> HERE (reported failure point)
            UsersTable.insert {
                it[name] = "Alice"
            }
        }
    }

    /** Connects Exposed to the per-test database created in [setUp]. */
    private fun createDB(): R2dbcDatabase =
        R2dbcDatabase.connect {
            useNestedTransactions = true
            defaultMaxAttempts = 1
            connectionFactoryOptions {
                option(DRIVER, "postgresql")
                option(HOST, postgresContainer.host)
                option(PORT, postgresContainer.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT))
                option(USER, postgresContainer.username)
                option(PASSWORD, postgresContainer.password)
                option(DATABASE, testDBName)
                option(OPTIONS, driverOptions())
            }
        }

    /** Driver options shared by the administrative connection and the Exposed database. */
    private fun driverOptions(): Map<String, String> = mapOf("lock_timeout" to "10s")

    companion object {
        /**
         * PostgreSQL container shared by the tests in this class.
         *
         * NOTE(review): `withUrlParam` presumably only affects the container's JDBC URL,
         * which this class does not use (it connects through R2DBC options) — confirm.
         */
        @Container
        val postgresContainer = PostgreSQLContainer("postgres:17-alpine")
            .withImagePullPolicy(PullPolicy.alwaysPull())
            .withReuse(true)
            .withUrlParam("TC_REUSABLE", "true")
    }
}
planerist
08/07/2025, 2:20 PM
No transaction in context.
java.lang.IllegalStateException: No transaction in context.
at org.jetbrains.exposed.v1.core.transactions.CoreTransactionManager.currentTransaction(TransactionManagerApi.kt:140)
at org.jetbrains.exposed.v1.core.vendors.DatabaseDialectKt.getCurrentDialect(DatabaseDialect.kt:162)
at org.jetbrains.exposed.v1.r2dbc.statements.InsertSuspendExecutable.returnedColumns(InsertSuspendExecutable.kt:266)
at org.jetbrains.exposed.v1.r2dbc.statements.InsertSuspendExecutable.returnedValues$lambda$14(InsertSuspendExecutable.kt:161)
at org.jetbrains.exposed.v1.r2dbc.statements.api.R2dbcResult$mapSegments$1.invokeSuspend$lambda$2$lambda$0(R2dbcResult.kt:52)
at io.r2dbc.postgresql.PostgresqlSegmentResult.lambda$flatMap$8(PostgresqlSegmentResult.java:195)
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:183)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:194)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:91)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:878)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:803)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:696)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:948)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:822)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:728)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:425)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
planerist
08/08/2025, 8:51 AM
e5l
08/08/2025, 8:52 AM
Oleg Babichev
08/08/2025, 9:57 AM