# kotlin-spark
a
Hi folks! I'd posted in the `kotlin-spark-api` gitter a few days ago: https://gitter.im/JetBrains/kotlin-spark-api?at=636d86ea18f21c023ba7b373 There I'd noted that the default Spark classloader can have conflicts with kotlin-spark-api's replacement spark-sql Catalyst converters. The solution I'd initially tried was setting the `spark.driver.userClassPathFirst` config option, but in practice that causes a lot of other classpath issues with other supplied jars (things like S3 implementation jars). Another approach I tried was to build a shaded jar of my project, making sure I included shaded dependencies for `org.jetbrains.kotlinx.spark` and `org.jetbrains.kotlin`. This seems to work when I add my project with the `spark.jars` option and also point the `spark.driver.extraClassPath` option at my project's physical jar (the docs say the latter prepends entries to the driver classpath, which I'd suppose is why the replacement Catalyst converters get picked up by Spark first). That's all a little unwieldy, though. I'd prefer to use the `spark.jars.packages` option for my packaged project, but that doesn't seem to work (the spark-sql overloads seem to not get discovered by the Spark classloader before its own implementation). Is anyone else experiencing odd classloader issues between Spark and the kotlin-spark-api replacement of the Catalyst converters? Any ideas on how to resolve these classpath conflicts?
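For concreteness, the combination that does work for me amounts to something like the sketch below. It's expressed with Spark's `SparkLauncher` from Kotlin purely to make the config keys concrete; in my actual setup the same keys are passed as `--conf` options when the job is submitted, and the jar path, main class, and master are placeholders:

```kotlin
import org.apache.spark.launcher.SparkLauncher

fun main() {
    // Shaded jar bundling my project plus org.jetbrains.kotlinx.spark and org.jetbrains.kotlin.
    // The path is a placeholder.
    val shadedJar = "/opt/jobs/my-project-all.jar"

    val exitCode = SparkLauncher()
        .setMaster("yarn")
        .setAppResource(shadedJar)
        .setMainClass("com.myproject.MainKt") // placeholder entry point
        // Ship the jar to the executors...
        .setConf("spark.jars", shadedJar)
        // ...and prepend it to the driver classpath so the replacement
        // Catalyst converters are found before Spark's bundled ones.
        .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, shadedJar)
        // spark.driver.userClassPathFirst=true also flips the ordering,
        // but it tends to break other supplied jars (S3 impls, etc.).
        .launch()
        .waitFor()

    println("Spark job finished with exit code $exitCode")
}
```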
j
Unfortunately I don't know enough about class loaders in Spark to be of help. Maybe @Pasha Finkelshteyn has an idea?
p
Hi @Aleksander Eskilson! Thank you for your report! May I ask what you're trying to achieve? I don't think we do any classloader magic inside the project, but maybe I'm wrong?
As far as I can see, we don't do any work with classloaders in the code mentioned above.
a
@Pasha Finkelshteyn, sure, pulling the use-case context from the earlier gitter conversation: I've written some code that wraps a kotlin-spark-api UDF2 in Python bindings. A Python function delegates to the PySpark `SparkSession` JavaGateway to make calls against JVM classes and their methods. The UDF2 emits a Kotlin data class as its return type. I register the UDF2 using the inlined `UDFRegistration.register` function, and a Python function ultimately calls a method whose implementation is that `register` call via the JavaGateway. In pure Kotlin unit/integration tests, when I register `my_udf` with the `UDFRegistration.register` function, things are smooth sailing: Catalyst seems able to infer the return type and generate the execution plan to convert the UDF's returned data class instances to the appropriate `GenericInternalRow` instances. When I go to the PySpark world and load my project in with the `spark.jars` and `spark.driver.extraClassPath` config options (from the Spark UI, I can see my project is correctly on the classpath, and so is the kotlin-spark-api project), I can get my UDF2 properly registered, but the Catalyst plan seems to break down when an action is invoked over DataFrame operations calling the UDF:
```
Failed to execute user defined function (functions$$$Lambda$1366/0x0000000800d41840: (struct<foo:string,bar:double>, string) => struct<foo:string,bar:double>)
    at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
...
Caused by: java.lang.IllegalArgumentException: The value (MyDataClass(foo="str", bar=1.0)) of the type (com.myproject.MyDataClass) cannot be converted to struct<foo:string,bar:double>
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:274)
...
```
The failure line seems to indicate that PySpark's JVM classpath points to the original Scala implementation of `CatalystTypeConverters` rather than the kotlin-spark-api override (which would support generating a Catalyst plan for Kotlin data classes). This looks like a classloader issue: I think PySpark's JVM classpath is seeing the original Spark implementation of `CatalystTypeConverters` first and selecting it over the kotlin-spark-api implementation. As I said above, I can get my shaded jar to reliably execute my UDF when I use `spark.jars.packages` and also set `spark.driver.userClassPathFirst`, but that won't be workable in practice for my production workflows, and setting `spark.jars` and `spark.driver.extraClassPath` is pretty unwieldy and seems to not be 100% reliable.
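For reference, the Kotlin side of this is roughly the following trimmed-down sketch. The data class mirrors the struct<foo:string,bar:double> shape from the error above, and the register call stands in for the inline kotlin-spark-api `UDFRegistration.register` (whose exact overload may differ between API versions):

```kotlin
import org.jetbrains.kotlinx.spark.api.*

// Mirrors the struct<foo:string,bar:double> shape from the error above.
data class MyDataClass(val foo: String, val bar: Double)

fun main() = withSpark {
    // Register a two-argument UDF whose return type is a Kotlin data class.
    // kotlin-spark-api's replacement Catalyst converters are what should turn
    // the returned data class into a GenericInternalRow.
    spark.udf().register("my_udf") { value: MyDataClass, label: String ->
        MyDataClass(foo = "${value.foo}:$label", bar = value.bar)
    }

    // In pure Kotlin tests, calling my_udf from DataFrame/SQL operations works.
    // From PySpark, the same registered name is invoked through the JavaGateway,
    // which is where the CatalystTypeConverters mismatch above shows up.
}
```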
p
@Aleksander Eskilson I think you're right, we're just smartasses and trying to extend Spark in the place where it shouldn't be extended: we add a class with the same fully-qualified name as one inside Spark itself. It's actually the only way to force Spark to work with this class. The application will always choose our class, but when we're talking about integration with Python, things become very complicated. I'm not sure at all that we can lure PySpark into believing it should use our reimplementation. At this point I have to ask: are you absolutely sure that you wanna write your code in Python rather than in Kotlin? Because if the answer is "yes", we should probably create a feature request in Spark for them to allow us to redefine the Converters. And most probably they will decline to do so (because Kotlin isn't even an officially supported language), and even if they agree, it will take months to implement and release.
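By the way, one quick way to check which of the two same-named classes actually won at runtime is to ask the JVM where it loaded the class from. A small diagnostic sketch (my own illustration, not something from the project):

```kotlin
fun main() {
    // If this prints the stock spark-catalyst jar rather than the shaded /
    // kotlin-spark-api jar, Spark's original implementation shadowed ours.
    val clazz = Class.forName("org.apache.spark.sql.catalyst.CatalystTypeConverters")
    val location = clazz.protectionDomain?.codeSource?.location
    println("CatalystTypeConverters loaded from: $location")
}
```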
I, however, strongly believe that it's easier to write Spark code in Kotlin than in Python — that's why I created this API for myself 🙂
a
> [...] we're just smartasses and trying to extend Spark in the place where it shouldn't be extended [...]
lol, been there. It is frustrating that the Encoders/Converters APIs aren't more directly extensible. I admit, those are some of Spark's most complicated internals; I can understand they want to limit the number of consumers who'd be irate about any breaking changes there.
> [...] probably they will decline to do so (because Kotlin isn't even an officially supported language).
Yeah, I agree here, unfortunately 😕 I've seen a ticket open in Spark to integrate the support, but I can see it's stalled some. Nonetheless, I'm happy to make a plug on Spark's Jira board. I think you're right that once we give things up to PySpark, the classloader's behavior is less obvious and more difficult to manipulate. And I think things can get even stranger in the cloud IaaS world (e.g. AWS EMR), where the engineers don't assume any user would have libraries that deliberately bork something as core as the spark-sql classes. So an approach that works on a local cluster may not work at all on a hosted cluster (without perhaps a lot of non-obvious deep configuration).

In our case, the API of UDF functions we're writing has some interesting constraints:
1. Some of the most appropriate dependencies are exclusively in the JVM ecosystem, so some JVM implementation is required, and Kotlin is a good development experience.
2. The ultimate consumers of the API are data scientists performing exploratory workflows and authoring predictive pipelines, and Python/PySpark is what they speak best.

So to your question, I'd say we do have a perhaps unusual requirement to expose our Kotlin APIs via Python bindings.