#kotlin-spark

Tyler Kinkade

03/05/2024, 3:26 AM
@Jolan Rensen [JB] Hi 🙂 I'm trying to use the Kotlin DataFrame interoperability code from the project wiki to convert a Kotlin dataframe to a Spark dataframe in a Kotlin notebook, but it doesn't compile (error below).
%use dataframe
%use spark
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.*
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.*
import org.apache.spark.unsafe.types.CalendarInterval
import org.jetbrains.kotlinx.dataframe.*
import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
import org.jetbrains.kotlinx.dataframe.api.*
import org.jetbrains.kotlinx.dataframe.columns.ColumnKind.*
import org.jetbrains.kotlinx.dataframe.schema.*
import org.jetbrains.kotlinx.spark.api.*
import java.math.*
import java.sql.*
import java.time.*
import kotlin.reflect.*

fun DataFrame<*>.toSpark(spark: SparkSession, sc: JavaSparkContext): Dataset<Row> {
    val rows = sc.toRDD(rows().map(DataRow<*>::toSpark))
    return spark.createDataFrame(rows, schema().toSpark())
}

fun DataRow<*>.toSpark(): Row =
    RowFactory.create(
        *values().map {
            when (it) {
                is DataRow<*> -> it.toSpark()
                else -> it
            }
        }.toTypedArray()
    )

fun DataFrameSchema.toSpark(): StructType =
    DataTypes.createStructType(
        columns.map { (name, schema) ->
            DataTypes.createStructField(name, schema.toSpark(), schema.nullable)
        }
    )

fun ColumnSchema.toSpark(): DataType =
    when (this) {
        is ColumnSchema.Value -> type.toSpark() ?: error("unknown data type: $type")
        is ColumnSchema.Group -> schema.toSpark()
        is ColumnSchema.Frame -> error("nested dataframes are not supported")
        else -> error("unknown column kind: $this")
    }

fun KType.toSpark(): DataType? = when(this) {
    typeOf<Byte>(), typeOf<Byte?>() -> DataTypes.ByteType
    typeOf<Short>(), typeOf<Short?>() -> DataTypes.ShortType
    typeOf<Int>(), typeOf<Int?>() -> DataTypes.IntegerType
    typeOf<Long>(), typeOf<Long?>() -> DataTypes.LongType
    typeOf<Boolean>(), typeOf<Boolean?>() -> DataTypes.BooleanType
    typeOf<Float>(), typeOf<Float?>() -> DataTypes.FloatType
    typeOf<Double>(), typeOf<Double?>() -> DataTypes.DoubleType
    typeOf<String>(), typeOf<String?>() -> DataTypes.StringType
    typeOf<LocalDate>(), typeOf<LocalDate?>() -> DataTypes.DateType
    typeOf<Date>(), typeOf<Date?>() -> DataTypes.DateType
    typeOf<Timestamp>(), typeOf<Timestamp?>() -> DataTypes.TimestampType
    typeOf<Instant>(), typeOf<Instant?>() -> DataTypes.TimestampType
    typeOf<ByteArray>(), typeOf<ByteArray?>() -> DataTypes.BinaryType
    typeOf<Decimal>(), typeOf<Decimal?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<BigDecimal>(), typeOf<BigDecimal?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<BigInteger>(), typeOf<BigInteger?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<CalendarInterval>(), typeOf<CalendarInterval?>() -> DataTypes.CalendarIntervalType
    else -> null
}
Error:
Line_31.jupyter.kts (20:48 - 55) 'toSpark' is a member and an extension at the same time. References to such elements are not allowed
How can I fix this?

Jolan Rensen [JB]

03/05/2024, 10:54 AM
Ah I see, this is probably because Jupyter notebooks behave a little differently from normal Kotlin. It complains about the line
val rows = sc.toRDD(rows().map(DataRow<*>::toSpark))
you can replace that with
val rows = sc.toRDD(rows().map { it.toSpark() })
Also, don't forget that in notebooks you don't need withSpark {} 🙂
I'll edit the line in the wiki so it works for both
Also, %use dataframe, spark should be in a separate cell 🙂

Tyler Kinkade

03/05/2024, 8:46 PM
Awesome! Thank you! 🙂 (Yeah, normal Kotlin also complained when I placed the code within a class.)
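
The behavior discussed above can be reproduced without Spark. The sketch below is a minimal illustration, not the wiki's code: the class Converters and the functions describe and convertAll are hypothetical names. It assumes the notebook error arises because declarations in a cell end up as members of a generated script class, which would match the report that placing the code inside a regular class triggers the same complaint. An extension function declared inside a class is both a member and an extension, so it cannot be used as a callable reference, but calling it from a lambda works.

```kotlin
class Converters {
    // An extension declared inside a class is also a member of that class.
    fun Int.describe(): String = "value=$this"

    // The callable-reference form does not compile here:
    // fun broken(xs: List<Int>) = xs.map(Int::describe)
    // error: 'describe' is a member and an extension at the same time.

    // The lambda form resolves the extension through the enclosing instance.
    fun convertAll(xs: List<Int>): List<String> = xs.map { it.describe() }
}

fun main() {
    println(Converters().convertAll(listOf(1, 2))) // prints [value=1, value=2]
}
```

The same substitution, map { it.toSpark() } instead of map(DataRow<*>::toSpark), is what resolves the error in the notebook.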