Aleksander Eskilson
11/15/2022, 3:54 PM
I asked about this in the kotlin-spark-api gitter a few days ago, https://gitter.im/JetBrains/kotlin-spark-api?at=636d86ea18f21c023ba7b373
There I'd noted that the default Spark classloader can conflict with kotlin-spark-api's replacement spark-sql Catalyst converters. The first workaround I tried was setting the spark.driver.userClassPathFirst config option, but in practice that causes a lot of other classpath issues with other supplied jars (things like s3 impl jars).
Another approach I tried was building a shaded jar of my project, making sure it included shaded dependencies for org.jetbrains.kotlinx.spark and org.jetbrains.kotlin. This seems to work when I add my project with the spark.jars option and also spark.driver.extraClassPath (which is documented to prepend entries to the driver classpath, which I'd suppose is why the replacement Catalyst converters get picked up by Spark first), both pointing to my project's physical jar.
That's all a little unwieldy, though. I'd prefer to use the spark.jars.packages option for my packaged project, but that doesn't seem to work (the spark-sql overloads seem not to get discovered by the Spark classloader before its own implementation).
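For reference, the combinations I've been juggling look roughly like this, as a minimal sketch with hypothetical paths and coordinates (in a PySpark-driven deployment these would normally be passed at submit time via spark-submit --conf or spark-defaults.conf rather than in builder code):

```kotlin
import org.apache.spark.sql.SparkSession

// Sketch of the classpath configurations described above.
// Paths and Maven coordinates are placeholders, not my real project's.
val spark = SparkSession.builder()
    .appName("kotlin-udf-classpath-test")
    // Works, but unwieldy: ship the shaded jar explicitly and prepend it to
    // the driver classpath so the kotlin-spark-api Catalyst converter
    // replacements win over Spark's own copies.
    .config("spark.jars", "/path/to/my-project-shaded.jar")
    .config("spark.driver.extraClassPath", "/path/to/my-project-shaded.jar")
    // Preferred, but the overloads don't seem to get picked up this way:
    // .config("spark.jars.packages", "com.myproject:my-project:0.1.0")
    // Resolves the override, but breaks other supplied jars (s3 impls, etc.):
    // .config("spark.driver.userClassPathFirst", "true")
    .getOrCreate()
```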
Is anyone else experiencing odd classloader issues between Spark and the kotlin-spark-api replacement of the Catalyst converters? Ideas on how to resolve these classpath conflicts?

Jolan Rensen [JB]
11/17/2022, 12:10 PM

Pasha Finkelshteyn
11/17/2022, 12:52 PM

Pasha Finkelshteyn
11/17/2022, 12:53 PM

Aleksander Eskilson
11/17/2022, 10:27 PM
In PySpark, the SparkSession uses a JavaGateway to make calls against JVM classes and their methods.
The UDF2 emits a Kotlin data class as its return type.
I register the UDF2 using the inlined UDFRegistration.register function, and a Python function ultimately calls a JVM method (whose implementation is that register call) through the JavaGateway.
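To make the shape of that concrete, here's a rough sketch of the Kotlin side. The names (MyDataClass, MyUdf, UdfRegistrar, my_udf) are illustrative stand-ins, and the exact inlined register overload depends on the kotlin-spark-api version:

```kotlin
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.api.java.UDF2
import org.jetbrains.kotlinx.spark.api.*  // inlined UDFRegistration.register extensions

// Illustrative stand-in for the real return type.
data class MyDataClass(val foo: String, val bar: Double)

// UDF2 that takes a struct<foo:string,bar:double> plus a string and
// emits a Kotlin data class.
class MyUdf : UDF2<MyDataClass, String, MyDataClass> {
    override fun call(input: MyDataClass, label: String): MyDataClass =
        MyDataClass(foo = label, bar = input.bar)
}

// Entry point the Python wrapper reaches through the JavaGateway, e.g.
// spark._jvm.com.myproject.UdfRegistrar.registerAll(spark._jsparkSession).
object UdfRegistrar {
    @JvmStatic
    fun registerAll(spark: SparkSession) {
        // The inlined kotlin-spark-api extension infers the
        // struct<foo:string,bar:double> return schema from MyDataClass
        // instead of requiring an explicit DataType.
        spark.udf().register("my_udf", MyUdf())
    }
}
```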
In pure Kotlin unit/integration tests, when I register my_udf with the UDFRegistration.register function, things are smooth sailing: Catalyst is able to infer the return type and generate an execution plan that converts the UDF's returned data class instances into the appropriate GenericInternalRow instances.
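A scratch test along these lines (using the illustrative names from the sketch above and the withSpark/dsOf helpers from the kotlin-spark-api README) runs cleanly for me:

```kotlin
import org.jetbrains.kotlinx.spark.api.*

fun main() = withSpark {
    UdfRegistrar.registerAll(spark)

    // Build a one-row DataFrame with a struct column matching MyDataClass,
    // then invoke the UDF; in pure Kotlin the returned data class is
    // converted to a GenericInternalRow without complaint.
    dsOf(MyDataClass("str", 1.0))
        .toDF()
        .selectExpr("struct(foo, bar) AS input")
        .selectExpr("my_udf(input, 'label') AS out")
        .show(false)
}
```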
When I move to the PySpark world and load my project with the spark.jars and spark.driver.extraClassPath config options (from the Spark UI I can see that both my project and the kotlin-spark-api jar are correctly on the classpath), I can get my UDF2 properly registered, but the Catalyst plan seems to break down when an action is invoked over DataFrame operations that call the UDF:
Failed to execute user defined function (functions$$$Lambda$1366/0x0000000800d41840: (struct<foo:string,bar:double>, string) => struct<foo:string,bar:double>)
at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
...
Caused by: java.lang.IllegalArgumentException: The value (MyDataClass(foo="str", bar=1.0)) of the type (com.myproject.MyDataClass) cannot be converted to struct<foo:string,bar:double>
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:274)
...
The failure line seems to indicate that PySpark's JVM classpath resolves the original Scala implementation of CatalystTypeConverters rather than the kotlin-spark-api override (which would support generating a Catalyst plan for a Kotlin data class). This looks like a classloader issue: PySpark's JVM classpath sees the original Spark implementation of CatalystTypeConverters first and selects it over the kotlin-spark-api implementation.
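One cheap way to confirm which copy actually won the race (a hypothetical diagnostic; run it on the driver JVM of the failing job, e.g. through the same JavaGateway):

```kotlin
// Prints the jar the driver classloader actually resolved
// CatalystTypeConverters from. If it points at the stock spark-sql jar
// rather than the shaded project jar, the kotlin-spark-api override never
// took effect in this JVM.
val source = Class
    .forName("org.apache.spark.sql.catalyst.CatalystTypeConverters")
    .protectionDomain
    .codeSource
println(source?.location)
```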
As I said above, I can get my shaded jar to reliably execute my UDF when I use spark.jars.packages and also set spark.driver.userClassPathFirst, but for my production workflows that won't be workable in practice, and setting spark.jars and spark.driver.extraClassPath is pretty unwieldy and seems not to be 100% reliable.

Pasha Finkelshteyn
11/18/2022, 10:34 AM

Pasha Finkelshteyn
11/18/2022, 10:35 AM

Aleksander Eskilson
11/18/2022, 3:57 PM
> [...] we're just smartasses and trying to extend Spark in the place where it shouldn't be extended [...]

lol, been there. It is frustrating that the Encoders/Converters APIs aren't more easily directly extensible. I admit, those are their most complicated internals; I can understand they want to limit the number of consumers who'd be irate about any breaking changes there.

> [...] probably they will decline to do so (because Kotlin isn't even an officially supported language).

Yeah, I agree here, unfortunately 😕 I've seen a ticket open in Spark to integrate the support, but I can see it's stalled some. Nonetheless, I'm happy to make a plug on Spark's Jira board.

I think you're right that once we hand things over to PySpark, the classloader's behavior is less obvious and harder to manipulate. And I think things can get even stranger in the cloud IaaS world (e.g. AWS EMR), where the engineers don't assume any user would have libraries that deliberately bork something as core as the spark-sql classes. So an approach that works on a local cluster may not work at all on a hosted cluster (without, perhaps, a lot of non-obvious deep configuration).

In our case, the UDF API we're writing has some interesting constraints:
1. Some of the most appropriate dependencies are exclusively in the JVM ecosystem, so some JVM implementation is required, and Kotlin is a good development experience.
2. The ultimate consumers of the API are data scientists performing exploratory workflows and authoring predictive pipelines, and Python/PySpark is what they speak best.

So to your question, I'd say we do have a perhaps unusual requirement to expose our Kotlin APIs via Python bindings.