# datascience
i
Hello, everyone. Quick help needed here. Recently I upgraded my AWS EMR instance to emr-6.6.0 and my jobs started failing. It is most likely some Kotlin Spark API incompatibility with Spark 3.2.0. More in the thread.
EMR 6.6.0 uses Spark 3.2.0 so I started getting:
```
User class threw exception: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.objects.Invoke$.apply$default$5()Z
at org.apache.spark.sql.KotlinReflection$.$anonfun$serializerFor$16(KotlinReflection.scala:754)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.KotlinReflection$.$anonfun$serializerFor$1(KotlinReflection.scala:748)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
at org.apache.spark.sql.KotlinReflection.cleanUpReflectionObjects(KotlinReflection.scala:1012)
at org.apache.spark.sql.KotlinReflection.cleanUpReflectionObjects$(KotlinReflection.scala:1011)
at org.apache.spark.sql.KotlinReflection$.cleanUpReflectionObjects(KotlinReflection.scala:47)
at org.apache.spark.sql.KotlinReflection$.serializerFor(KotlinReflection.scala:591)
at org.apache.spark.sql.KotlinReflection$.serializerFor(KotlinReflection.scala:578)
at org.apache.spark.sql.KotlinReflection.serializerFor(KotlinReflection.scala)
at org.jetbrains.kotlinx.spark.api.ApiV1Kt.kotlinClassEncoder(ApiV1.kt:180)
at org.jetbrains.kotlinx.spark.api.ApiV1Kt.generateEncoder(ApiV1.kt:167)
at movoto.leadsync.jobs.ExtractJob.<init>(ExtractJob.kt:218)
at movoto.leadsync.ExtractCommand.run(SparkJobMain.kt:104)
at com.github.ajalt.clikt.parsers.Parser.parse(Parser.kt:198)
at com.github.ajalt.clikt.parsers.Parser.parse(Parser.kt:211)
at com.github.ajalt.clikt.parsers.Parser.parse(Parser.kt:18)
at com.github.ajalt.clikt.core.CliktCommand.parse(CliktCommand.kt:395)
at com.github.ajalt.clikt.core.CliktCommand.parse$default(CliktCommand.kt:392)
at com.github.ajalt.clikt.core.CliktCommand.main(CliktCommand.kt:410)
at com.github.ajalt.clikt.core.CliktCommand.main(CliktCommand.kt:435)
at movoto.leadsync.SparkJobMainKt.main(SparkJobMain.kt:189)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:740)
Unmanaged Application:
```
Here it says there is no support for Spark 3.2.0: https://github.com/Kotlin/kotlin-spark-api#supported-versions-of-apache-spark Is there any tweak I can make to get this working, either in my Kotlin/Spark code or in my cluster configuration? For now we have rolled back our EMR instance to 6.5.0.
i
@Jolan Rensen [JetBrains]
j
We support the latest supported releases from Apache: https://spark.apache.org/downloads.html, I assumed that 3.2.0 would be compatible with 3.2.1 at least...
you are running Scala 2.12?
i
specifically, it's the signature of a Spark method that we are having issues with
Yes Scala 2.12
2.12.10
p
And which version of Kotlin API for Apache Spark do you use?
If you have a reproducible example, we would test it. There can be several reasons for the incompatibility, ranging from internal incompatibility within Spark itself to an inconsistency between the compile and provided dependencies in your build and in EMR
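For reference, the compile-vs-provided split Pasha mentions usually looks like this in a Gradle (Kotlin DSL) build. This is a sketch, not the poster's actual build file; the library coordinates come from this thread, and the Spark version shown is an assumption to be matched to the cluster's release:

```kotlin
// build.gradle.kts (sketch; match the Spark version to your EMR release)
dependencies {
    // The Kotlin API for Apache Spark ships inside the job jar
    implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api-3.2:1.1.0")

    // Spark itself is provided by the EMR cluster, so it must NOT be bundled;
    // compileOnly keeps it off the runtime classpath and avoids shadowing the
    // cluster's own Spark jars with a different version
    compileOnly("org.apache.spark:spark-sql_2.12:3.2.1")
}
```

If the bundled Spark version drifts from what the cluster provides, the job compiles fine but can fail at runtime exactly like the trace above.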
i
that was
kotlin-spark-api-3.0:1.0.2
but local testing produced the same issue with
kotlin-spark-api-3.2:1.1.0
on Spark 3.2.0 but not Spark 3.2.1
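To illustrate why this only blows up at runtime: a `NoSuchMethodError` means the JVM cannot resolve the exact method signature the jar was compiled against on the classpath it is actually running on. A stdlib-only Java sketch (purely illustrative, unrelated to Spark) of checking whether an exact signature exists:

```java
import java.lang.reflect.Method;

public class BinaryCompatCheck {
    // Returns true if the class on the runtime classpath has the exact
    // method (name + parameter types) a caller was compiled against.
    static boolean hasMethod(Class<?> cls, String name, Class<?>... params) {
        try {
            cls.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            // At link time the JVM surfaces this as NoSuchMethodError,
            // which is what the Spark job hit on the cluster
            return false;
        }
    }

    public static void main(String[] args) {
        // Exists in every JDK: String.substring(int)
        System.out.println(hasMethod(String.class, "substring", int.class));
        // A signature that does not exist on this runtime
        System.out.println(hasMethod(String.class, "substring", double.class));
    }
}
```

This is why the same jar can run on Spark 3.2.1 but fail on 3.2.0: the internal `Invoke.apply` signature differs between the two, and the mismatch is only discovered when the method is first linked.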
p
So it works with 3.2.1, but doesn't work with 3.2.0?
i
Correct
p
Looks like they have "fixed" something 🙂 But we still would prefer to have a reproducer. If you can't share it publicly — feel free to contact me or @Jolan Rensen [JetBrains] in DM.
j
yes please 🙂
debugging that part of the code is hard, so having something we can test would be awesome
i
I am working on this. I’ll make it as simple as possible… Gimme a moment, please
p
Thank you very much!
j
no haste 🙂 I don't think I have time before the weekend anyways, but I'll definitely check it out on Monday
i
That’s good to hear, actually. We reverted the change to emr-6.5.0 and things are back to business. I’m also in a hurry today, so let’s push it forward a day or two.
Actually here is a simple reproducible piece of code:
```kotlin
import io.kotest.core.spec.style.FunSpec
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SparkSession
import org.jetbrains.kotlinx.spark.api.dsOf
import org.jetbrains.kotlinx.spark.api.to

data class DowncastType(
    val intType: Int? = null,
)

class SparkDowncastTest : FunSpec({
    val spark = SparkSession.builder().master("local[*]").orCreate

    afterSpec {
        spark.stop()
    }

    test("test spark downcasting") {
        val dataset = spark.dsOf("intType" to 1)

        val casted: Dataset<DowncastType> = dataset.to()
    }
})
```
j
What are you trying to achieve with this example? Running it on 3.2.1 also gives:
```
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'intType' given input columns: [first, second]
```
Which makes sense, since you are trying to implicitly convert a `Dataset<Pair<String, Int>>` to a `Dataset<DowncastType>`, which is a completely different class
Or is it just to reproduce the error, because on 3.2.0 I can indeed reproduce it with this example 🙂
I'll use
```kotlin
data class Type(val intType: Int)
...
dsOf(Type(intType = 1))
...
```
instead 🙂
Okay, I've narrowed down the bug to the exact place where it's called in the code. We use an adapted version of a Spark file (KotlinReflection.scala instead of ScalaReflection.scala) to enable support for Kotlin data classes and whatnot. However, since this is essentially a replacement for the actual file, the Spark version must match exactly as well... We're still debating how to handle this in the future. I'll try to keep you up to date 🙂 For now, however, you can either wait until EMR updates to 3.2.1, or you can build the spark-3.2 branch adapted to 3.2.0 from source.
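For anyone taking the build-from-source route, a hedged sketch of the steps; the branch name comes from the message above, while the build commands are assumptions to be verified against the repository's own build files:

```shell
# Sketch only: build the Kotlin Spark API's spark-3.2 branch against
# Spark 3.2.0 instead of 3.2.1.
git clone https://github.com/Kotlin/kotlin-spark-api.git
cd kotlin-spark-api
git checkout spark-3.2

# Change the Spark dependency version from 3.2.1 to 3.2.0 in the build
# configuration, then install the artifacts to your local repository.
# Which build tool the branch uses should be checked at the repo root:
./mvnw install -DskipTests        # if the branch builds with Maven
# ./gradlew publishToMavenLocal   # if it builds with Gradle
```

The job's build then points at the locally published artifact instead of the released one, so the bundled `KotlinReflection` matches the cluster's Spark 3.2.0 exactly.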
i
Thank you so much @Jolan Rensen [JetBrains] and @Pasha Finkelshteyn. You are incredible. I’ll take a look at how to build that branch.