I’m getting a “No transaction in context.” excepti...
# exposed
p
I’m getting a “No transaction in context.” exception with the simplest code ever. What’s strange is that the first `suspendTransaction` works fine, but the second one fails. A short snippet is below (the full code is in the comments):
Copy code
kotlin

// First transaction: creates the schema for UsersTable. Reported to complete without error.
suspendTransaction(db = db) {
    SchemaUtils.create(UsersTable)
}

// Second transaction against the same `db`: reported to throw "No transaction in context."
suspendTransaction(db = db) {
    // ===> Fails here
    UsersTable.insert {
        it[name] = "Alice"
    }
}
Any ideas on what might be going wrong?
Copy code
package net.yaranga.framework

import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.OPTIONS
import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactoryOptions.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.test.setMain
import org.jetbrains.exposed.v1.core.dao.id.LongIdTable
import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabase
import org.jetbrains.exposed.v1.r2dbc.SchemaUtils
import org.jetbrains.exposed.v1.r2dbc.insert
import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInfo
import org.testcontainers.containers.PostgreSQLContainer
import org.testcontainers.images.PullPolicy
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils
import reactor.core.publisher.Mono
import java.util.*


/**
 * Reproduction test for the "No transaction in context." exception reported when a second
 * `suspendTransaction` runs an insert against an Exposed R2DBC database.
 *
 * Each test creates its own uniquely named database inside a shared PostgreSQL container
 * ([setUp]) and then connects Exposed to that database ([createDB]).
 */
@Testcontainers
class Test1() {
    companion object {
        @Container
        val postgresContainer = PostgreSQLContainer("postgres:17-alpine")
            .withImagePullPolicy(PullPolicy.alwaysPull())
            .withReuse(true)
            .withUrlParam("TC_REUSABLE", "true")
    }

    // Name of the per-test database created in setUp and used by createDB.
    private lateinit var testDBName: String

    // Administrative connection to the container's default database. The Mono is cached in
    // setUp so that every subscriber (including tearDown) sees the SAME connection.
    lateinit var connection: Mono<Connection>

    /**
     * Installs a test Main dispatcher, derives a unique database name from the test class,
     * test method and a random suffix, and creates that database through a raw R2DBC connection.
     */
    @OptIn(ExperimentalCoroutinesApi::class)
    @BeforeEach
    fun setUp(testInfo: TestInfo) {
        Dispatchers.setMain(StandardTestDispatcher())

        val randomSuffix = RandomStringUtils.randomAlphanumeric(6).lowercase(Locale.ENGLISH)
        testDBName = "${testInfo.testClass.get().simpleName}-${testInfo.testMethod.get().name}-$randomSuffix"

        val connectionFactory = ConnectionFactories.get(
            builder()
                .option(DRIVER, "postgresql")
                .option(HOST, postgresContainer.host)
                .option(PORT, postgresContainer.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT))
                .option(USER, postgresContainer.username)
                .option(PASSWORD, postgresContainer.password)
                .option(DATABASE, postgresContainer.databaseName) // optional
                .option(OPTIONS, lockTimeoutOptions()) // optional
                .build()
        )

        // connectionFactory.create() is a cold publisher: without cache() every subscription
        // would open a brand-new connection, so tearDown would close a different connection
        // than the one used here and this one would leak.
        connection = Mono.from(connectionFactory.create()).cache()

        connection.flatMapMany { c ->
            c.createStatement("CREATE DATABASE \"${testDBName}\" ENCODING 'utf8'")
                .execute()
        }.blockLast() // Block until the database creation is complete
    }

    /**
     * Closes the connection opened in [setUp] (if it got that far) and restores the original
     * Main dispatcher so the test dispatcher does not leak into other tests.
     */
    @OptIn(ExperimentalCoroutinesApi::class)
    @AfterEach
    fun tearDown() {
        try {
            // Guard against setUp failing before the property was assigned.
            if (::connection.isInitialized) {
                connection.flatMapMany { c -> c.close() }.blockLast() // Block until connection is closed
            }
        } finally {
            Dispatchers.resetMain()
        }
    }

    object UsersTable : LongIdTable("users") {
        val name = varchar("name", 50)
    }

    /**
     * Runs two consecutive transactions on the same database: the first creates the table,
     * the second inserts a row. The second one is where the reported exception is thrown.
     */
    @Test
    fun testNested() = runTest {
        val db = createDB()

        suspendTransaction(db = db) {
            SchemaUtils.create(UsersTable)
        }

        suspendTransaction(db = db) {
            // ===> HERE: reported to fail with IllegalStateException("No transaction in context.")
            UsersTable.insert {
                it[name] = "Alice"
            }
        }
    }

    /** Driver options shared by the administrative connection and the Exposed database. */
    private fun lockTimeoutOptions(): MutableMap<String?, String?> =
        mutableMapOf("lock_timeout" to "10s")

    /**
     * Connects Exposed to the per-test database [testDBName] in the container, with nested
     * transactions enabled and a single attempt per transaction.
     */
    private fun createDB(): R2dbcDatabase {
        val options = lockTimeoutOptions()

        return R2dbcDatabase.connect {
            useNestedTransactions = true
            defaultMaxAttempts = 1

            connectionFactoryOptions {
                option(DRIVER, "postgresql")
                option(HOST, postgresContainer.host)
                option(PORT, postgresContainer.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT))
                option(USER, postgresContainer.username)
                option(PASSWORD, postgresContainer.password)
                option(DATABASE, testDBName)
                option(OPTIONS, options)
            }
        }
    }
}
👀 1
Exception:
Copy code
No transaction in context.
java.lang.IllegalStateException: No transaction in context.
	at org.jetbrains.exposed.v1.core.transactions.CoreTransactionManager.currentTransaction(TransactionManagerApi.kt:140)
	at org.jetbrains.exposed.v1.core.vendors.DatabaseDialectKt.getCurrentDialect(DatabaseDialect.kt:162)
	at org.jetbrains.exposed.v1.r2dbc.statements.InsertSuspendExecutable.returnedColumns(InsertSuspendExecutable.kt:266)
	at org.jetbrains.exposed.v1.r2dbc.statements.InsertSuspendExecutable.returnedValues$lambda$14(InsertSuspendExecutable.kt:161)
	at org.jetbrains.exposed.v1.r2dbc.statements.api.R2dbcResult$mapSegments$1.invokeSuspend$lambda$2$lambda$0(R2dbcResult.kt:52)
	at io.r2dbc.postgresql.PostgresqlSegmentResult.lambda$flatMap$8(PostgresqlSegmentResult.java:195)
	at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:183)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:194)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:91)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:878)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:803)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161)
	at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:696)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:948)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:822)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:728)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:425)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)
@e5l Could you please take a look? The exceptions seem to occur after the database has already returned results, which feels unusual. Any insights? Thanks!
e
Sure, @Oleg Babichev could you check?
o
Thank you for submitting the issue. I'll try to reproduce it inside the Exposed repository.
👍 1