Trying to write a wrapper around R2DBC using Effec...
# arrow
k
Trying to write a wrapper around R2DBC using Effect and context receivers. select returns an Effect<Err, Flow<T>> and execute returns an Effect<Err, Long> with the number of rows affected. This is my current API. I'd really appreciate it if anyone could take a look and tell me whether it looks sane 🙂
sealed interface R2DBCError {
    object Error: R2DBCError
    object TransactionError : R2DBCError
}

context (ConnectionFactory)
object R2DBC {
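    inline fun <T>transaction(crossinline handler: context(Connection) (Handle) -> Effect<R2DBCError, T>): Effect<R2DBCError, T> = effect {
        val connection = create().awaitFirst()
        val handle = Handle
        try {
            // R2DBC returns Publisher<Void> here; nothing happens unless it is awaited
            // (awaitFirstOrNull lives in kotlinx.coroutines.reactive, next to awaitFirst)
            connection.beginTransaction().awaitFirstOrNull()
            // Run the effect before committing, so a shift or exception reaches the rollback below
            val value = handler(connection, handle).bind()
            connection.commitTransaction().awaitFirstOrNull()
            value
        } catch (e: Exception) {
            connection.rollbackTransaction().awaitFirstOrNull()
            if (e is kotlinx.coroutines.CancellationException) {
                throw e
            }
            shift(R2DBCError.TransactionError)
        }
    }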
}

object Handle {

    /**
     * [execute] executes an insert/update/delete and returns the amount of affected rows
     */
    context (Connection)
    fun execute(sql: String, vararg params: Any) = effect<R2DBCError, Long> {
        val stmt = createStatement(sql)
        params.forEachIndexed { idx, param ->
            stmt.bind(idx, param)
        }

        stmt.add().execute().awaitFirst()
            .rowsUpdated
            .awaitFirst()
    }

    /**
     * [select] Executes a select statement and maps the result into a Flow<T> with a given mapper
     */
    context (Connection)
    inline fun <reified T: Any>select(sql: String, vararg params: Any, noinline r2dbcMapper: R2DBCMapper<T>) = effect<R2DBCError, Flow<T>> {
        val stmt = createStatement(sql)

        params.forEachIndexed { idx, param ->
            stmt.bind(idx, param)
        }

        stmt.add().execute().awaitFirst().map(r2dbcMapper).asFlow()
    }
}
I think the Handle object is probably redundant and I could just change its members to top-level functions, so that
inline fun <T>transaction(crossinline handler: context(Connection) (Handle) -> Effect<R2DBCError, T>)
becomes
inline fun <T>transaction(crossinline handler: context(Connection) () -> Effect<R2DBCError, T>)
I think the design is quite poor right now, because the caller has to care about R2DBCError without really being able to get any information out of it.
s
I would define the errors as:
sealed interface R2DBCError {
    object Error: R2DBCError
    data class TransactionError(val cause: Exception) : R2DBCError
}
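As a rough illustration of what carrying the cause buys the caller, here is a minimal sketch that uses only the standard library. The describe function is a hypothetical helper, not part of the API above; the point is only that a caller can now read something out of the error.

```kotlin
// Error type as suggested above: TransactionError now carries its cause.
sealed interface R2DBCError {
    object Error : R2DBCError
    data class TransactionError(val cause: Exception) : R2DBCError
}

// Hypothetical caller-side helper: an exhaustive `when` over the sealed type
// that can report why the transaction failed.
fun describe(error: R2DBCError): String = when (error) {
    R2DBCError.Error -> "unknown R2DBC error"
    is R2DBCError.TransactionError -> "transaction failed: ${error.cause.message}"
}
```

With the original object-only TransactionError, the second branch could only report that something went wrong, not what.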
You're not using R2DBCError.Error anywhere? 🤔
Also I think you can design this DSL without context receivers if you want.
Untitled.cpp
k
That seems pretty smart, yes 🙂 I think what I'm having trouble with is deciding whether transaction errors are something I want typed out to the caller, or whether I just want it to fail with some exception e.
s
In Scala, and I think also in Haskell, they always just use an exception in IO. Even ZIO-sql just uses Exception and Throwable in its error channel. I've been thinking about writing a super small JDBC wrapper (for Postgres) that translates SQLState into a typed error, but even then, no one is probably going to when over 100 different errors.
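For what the SQLState translation could look like, here is a small sketch that types only a handful of states and leaves everything else in a catch-all, so nobody has to match on a hundred cases. DbError and toDbError are hypothetical names. The codes are standard Postgres SQLSTATE values: 23505 is unique_violation, 40001 is serialization_failure, and class 08 covers connection exceptions.

```kotlin
import java.sql.SQLException

// Hypothetical typed error: only the states a caller might realistically recover from.
sealed interface DbError {
    data class UniqueViolation(val message: String?) : DbError
    data class SerializationFailure(val message: String?) : DbError
    data class ConnectionFailure(val message: String?) : DbError
    // Catch-all keeps the raw SQLState for logging instead of modelling every code.
    data class Other(val sqlState: String?, val message: String?) : DbError
}

// Translate an SQLException into the typed error based on its SQLState.
fun SQLException.toDbError(): DbError {
    val state = sqlState
    return when {
        state == "23505" -> DbError.UniqueViolation(message)
        state == "40001" -> DbError.SerializationFailure(message)
        state != null && state.startsWith("08") -> DbError.ConnectionFailure(message)
        else -> DbError.Other(state, message)
    }
}
```

Which states deserve their own case is a judgment call; the three above are only examples of failures a caller might retry or report differently.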
k
If you have compile time checks on your queries then exceptions should be fine right?
If I want to type it out, I'm guessing I have to go over every exception that R2DBC can throw and map it either to an exception, for things that shouldn't happen such as an incorrect SQL query, or to some sort of typed error, for failures we can recover from.
I was thinking that something like this might be an okay compromise
object R2DBC {
    inline fun <E, T>ConnectionFactory.transaction(
        crossinline handler: ConnectionHandle<E>.() -> Effect<E, T>): Effect<E, T> = effect {
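        val connection = create().awaitFirst()
        try {
            // R2DBC returns Publisher<Void> here; nothing happens unless it is awaited
            connection.beginTransaction().awaitFirstOrNull()
            val value = handler(ConnectionHandle(connection, this))
            value.fold({
                // Typed failure: roll back, then re-raise it in the outer effect
                connection.rollbackTransaction().awaitFirstOrNull()
                shift(it)
            }, {
                // Commit only after the effect has run successfully
                connection.commitTransaction().awaitFirstOrNull()
                it
            })
        } catch (e: Exception) {
            connection.rollbackTransaction().awaitFirstOrNull()
            throw e
        }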
    }
}

class ConnectionHandle<E>(connection: Connection, raise: EffectScope<E>): Connection by connection, EffectScope<E> by raise {
    /**
     * [select] Executes a select statement and maps the result into a Flow<T> with a given mapper
     */
    inline fun <T: Any>select(
        noinline r2dbcMapper: R2DBCMapper<T>,
        sql: String, 
        vararg params: Any,
        crossinline failWith: (Exception) -> E,
    ) = effect<E, Flow<T>> {
        try {
            val stmt = createStatement(sql)
            params.forEachIndexed { idx, param ->
                stmt.bind(idx, param)
            }
            stmt.add().execute().awaitFirst()
                .map(r2dbcMapper)
                .asFlow()
        } catch (e: Exception) {
            e.nonFatalOrThrow()
            shift(failWith(e))
        }
    }

    /**
     * [execute] executes an insert/update/delete and returns the amount of affected rows
     */
    inline fun execute(
        sql: String,
        vararg params: Any,
        crossinline failWith: (Exception) -> E,
    ) = effect<E, Long> {
        try {

            val stmt = createStatement(sql)
            params.forEachIndexed { idx, param ->
                stmt.bind(idx, param)
            }

            stmt.add().execute().awaitFirst()
                .rowsUpdated
                .awaitFirst()
        } catch (e: Exception) {
            e.nonFatalOrThrow()
            shift(failWith(e))
        }
    }
}
Now I can just add some error handling in the failWith lambda at the call site.
Thanks for the feedback 🙂
s
Adding such an error handler is always useful, and you can still provide a default translation to R2DBCError if the caller doesn't supply a handler. That way you can enforce error handling and get the best of both worlds, even without code duplication 😉
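A rough sketch of that idea, kept independent of Arrow and R2DBC so it runs on the standard library alone. Outcome is a hypothetical stand-in for Arrow's Either, and runQuery and QueryError are made-up names. The overload without a handler delegates to the one with a handler, which is where the "no code duplication" comes from.

```kotlin
sealed interface R2DBCError {
    data class QueryError(val cause: Exception) : R2DBCError
}

// Minimal stand-in for Arrow's Either / Effect result (assumption, not Arrow's API).
sealed interface Outcome<out E, out A> {
    data class Failure<E>(val error: E) : Outcome<E, Nothing>
    data class Success<A>(val value: A) : Outcome<Nothing, A>
}

// Caller supplies failWith and chooses the error type E.
// A suspending version should rethrow CancellationException instead of mapping it.
inline fun <E, A> runQuery(failWith: (Exception) -> E, query: () -> A): Outcome<E, A> =
    try {
        Outcome.Success(query())
    } catch (e: Exception) {
        Outcome.Failure(failWith(e))
    }

// No handler given: fall back to the library's own R2DBCError translation.
inline fun <A> runQuery(query: () -> A): Outcome<R2DBCError, A> =
    runQuery({ R2DBCError.QueryError(it) }, query)
```

A caller who does not care gets R2DBCError by writing runQuery { ... }, while a caller with a domain error type passes their own mapping as the first argument.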
k
Smart 🙂 Thanks 👍