Hi guys! Huge work on publishing R2DBC support! I ...
# exposed
g
Hi guys! Huge work on publishing R2DBC support! I have a problem: my table contains a
kotlinx.datetime.LocalDateTime
column, and I'm trying to make queries to it. As long as the final
ResultRow
doesn't contain this column, everything works fine, but if Exposed needs to deserialize it, an exception occurs. Is it possible that there is some issue with
TransactionCoroutineElement
being detached when it shouldn't, or something like that? Stacktrace and some code snippets in thread.
Stacktrace:
Copy code
No transaction in context.
java.lang.IllegalStateException: No transaction in context.
	at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt:42)
	at org.jetbrains.exposed.v1.r2dbc.statements.api.R2dbcResult$mapRows$1.invokeSuspend(R2dbcResult.kt:147)
	at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:226)
	at org.jetbrains.exposed.v1.r2dbc.Query.collect$suspendImpl(Query.kt:323)
	at kotlinx.coroutines.flow.FlowKt__ReduceKt.firstOrNull(Reduce.kt:179)
	at ru.meproject.launcher.backend.app.ApplicationTest$testRoot$1$count$1.invokeSuspend(ApplicationTest.kt:43)
	at org.jetbrains.exposed.v1.r2dbc.transactions.TransactionManagerKt$suspendedTransactionAsyncInternal$1.invokeSuspend(TransactionManager.kt:426)
	at io.ktor.server.testing.TestApplicationKt.runTestApplication(TestApplication.kt:464)
	at io.ktor.server.testing.TestApplicationKt$testApplication$1.invokeSuspend(TestApplication.kt:447)
	at io.ktor.test.dispatcher.TestCommonKt$runTestWithRealTime$1.invokeSuspend(TestCommon.kt:40)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTest$2$1$1.invokeSuspend(TestBuilders.kt:317)
Caused by: java.lang.IllegalStateException: No transaction in context.
	at org.jetbrains.exposed.v1.core.transactions.CoreTransactionManager.currentTransaction(TransactionManagerApi.kt:110)
	at org.jetbrains.exposed.v1.core.vendors.DatabaseDialectKt.getCurrentDialect(DatabaseDialect.kt:160)
	at org.jetbrains.exposed.v1.datetime.KotlinLocalDateTimeColumnType.readObject(KotlinDateColumnType.kt:295)
	at org.jetbrains.exposed.v1.core.ResultRow$Companion.create(ResultRow.kt:146)
	at org.jetbrains.exposed.v1.r2dbc.Query.collect$lambda$20(Query.kt:320)
	at org.jetbrains.exposed.v1.r2dbc.statements.api.R2dbcResult$mapRows$1.invokeSuspend$lambda$2$lambda$0(R2dbcResult.kt:46)
	at org.mariadb.r2dbc.client.MariadbResult.lambda$map$2(MariadbResult.java:211)
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:179)
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:670)
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:748)
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.onNext(FluxWindowPredicate.java:790)
	at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onNext(FluxWindowPredicate.java:268)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
	at org.mariadb.r2dbc.client.Exchange.emit(Exchange.java:66)
	at org.mariadb.r2dbc.client.SimpleClient$ServerMessageSubscriber.onNext(SimpleClient.java:770)
	at org.mariadb.r2dbc.client.SimpleClient$ServerMessageSubscriber.onNext(SimpleClient.java:712)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:425)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
My table (simplified):
Copy code
/** Simplified Exposed table definition mapped to the "auth" table. */
object AuthTable : Table(name = "auth") {
    // "login" varchar column declared with length argument 16; used as the primary key below.
    val login = varchar("login", 16)
    // "last_login" datetime column whose default expression is CurrentDateTime.
    val lastLogin = datetime("last_login").defaultExpression(CurrentDateTime)

    // Primary key made up of the login column only.
    override val primaryKey = PrimaryKey(login)
}
This query fails:
Copy code
// Failing case: selectAll() requests every column of AuthTable, so the row that is read includes last_login.
val anyRow = db.transaction { AuthTable.selectAll().limit(1).firstOrNull() }
While these 2 don't:
Copy code
// Working case: only a row count is requested.
val count = db.transaction { AuthTable.selectAll().count() }
// Working case: only the login column is selected, so last_login is not part of the result row.
val anyLogin = db.transaction { AuthTable.select(AuthTable.login).limit(1).firstOrNull() }
(and this is my R2dbcDatabase::transaction extension which simply wraps
suspendTransaction
):
Copy code
/**
 * Runs [statement] through `suspendTransaction`, passing this database as `db`.
 *
 * @param statement suspending block executed with an [R2dbcTransaction] receiver.
 * @return whatever `suspendTransaction` returns for [statement].
 */
suspend fun <T> R2dbcDatabase.transaction(
    statement: suspend R2dbcTransaction.() -> T,
): T = suspendTransaction(db = this, statement = statement)
Some more digging:
R2dbcResult::mapRows
maps rows in Reactor's own thread, which probably doesn't receive any
CoroutineContext.Element
(because it doesn't know anything about coroutines)
e
@Oleg Babichev
d
Hi guys! When I use the R2DBC suspendTransaction with
Dispatchers.IO
, or
Dispatchers.Default
, it raises the exception
No transaction in context.
SamplesSQL.kt
Copy code
@Suppress("LongMethod")
fun main() {
    Assume.assumeTrue(TestDB.H2_V2 in TestDB.enabledDialects())
    R2dbcDatabase.connect("r2dbc:h2:mem:///test;USER=root;")

    runBlocking(Dispatchers.IO) { // Specified context.
        org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction {
            addLogger(StdOutSqlLogger)

            SchemaUtils.create(Cities, Users)

            val saintPetersburgId = Cities.insert {
                it[name] = "St. Petersburg"
            } get Cities.id
When I use Dispatchers.IO in
runBlocking
, it raises the exception
No transaction in context
. Is it not possible to use a specified dispatcher?
s
A new version
beta-2
was published this morning with a fix for this issue. Could you try it and let us know if the problem is also solved for you? It's been solved for my use-cases on
beta-2
.
👍 1
👍🏾 1
d
@simon.vergauwen Thanks,
beta-2
is working ^^
kodee loving 1