# datascience
k
Is anyone using Kotlin with Spark? We're starting to, and I ran into an issue with some helper functions not returning Serializable implementations. Thread starts here: https://kotlinlang.slack.com/archives/C1H43FDRB/p1521752154000105
t
Have you looked at this yet? https://github.com/khud/sparklin
c
I tried using Kotlin with Spark about a year ago and the speed of autocompletion was terrible compared to Java. It seemed that Kotlin and Scala just didn't work very well together. Has that been fixed since then?
@kz
t
You guys are using the Java API, right?
k
That's right. Java API. IJ support seems just as good as ever.
I haven't checked out sparklin yet
Ah, it's a REPL?
t
@kz yes, although I was hoping it would have a few extension functions. Have you looked at using Kryo for serialization? I've had success with that, as demonstrated here: http://tomstechnicalblog.blogspot.com/2016/11/using-kotlin-language-with-spark.html
Kryo is a much safer and more robust serialization solution anyway, and requires no Serializable interfaces. I'll play with your example later to see if there is indeed something weird going on with the lambda itself, and not so much with the objects it's manipulating.
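(For reference, a minimal sketch of the Kryo setup described in that post; the app name and the registered class below are placeholders, not from the thread.)
```kotlin
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

// Switch data serialization to Kryo; registering classes up front is optional
// but makes Kryo faster and stricter. "KryoExample" and the registered class
// are illustrative only.
val conf = SparkConf()
    .setAppName("KryoExample")
    .setMaster("local[*]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .registerKryoClasses(arrayOf<Class<*>>(Pair::class.java))

val sc = JavaSparkContext(conf)
```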
k
I will check out your blog, but I'm not sure that's the case. I know Kryo can be configured so that your data doesn't need to be Serializable. But based on my code-dive into SparkEnv and ClosureCleaner, your closure does have to be Serializable, even when Kryo is set as the serializer.
I've convinced myself that this makes sense: Spark needs to serialize your Java/Scala/Kotlin code to send it to the executors, so it makes sense to rely on the language's own mechanism for that rather than letting the user override it à la Kryo.
I would be interested in knowing what you think though.
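(A local way to see this point without a cluster: per the SparkEnv/ClosureCleaner reading above, Spark's closure check amounts to Java serialization, so serializing the closure by hand reproduces the failure. The helper below is a sketch, not Spark API.)
```kotlin
import java.io.ByteArrayOutputStream
import java.io.NotSerializableException
import java.io.ObjectOutputStream

// Spark's closure check boils down to Java serialization (the closure
// serializer is a JavaSerializer regardless of spark.serializer), so this
// local probe fails for the same reason a job would.
fun isClosureSerializable(closure: Any): Boolean =
    try {
        ObjectOutputStream(ByteArrayOutputStream()).use { it.writeObject(closure) }
        true
    } catch (e: NotSerializableException) {
        false
    }

fun main() {
    // The stdlib comparator builder fails the check, matching the thread.
    println(isClosureSerializable(compareBy<Pair<String, Int>> { it.first }))
}
```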
t
@kz eww... why is Kotlin not mapping it to whatever higher-order type it needs, then? Weird. I'll take a look when I'm at a computer.
k
The situation where I ran into this was something along the lines of rdd.takeOrdered(10, compareBy { it.first }).
Let me dig up a reproducible example. The "solution" for us was to make the returned Comparator Serializable, which meant (at least until we find some better fix) that we had to stop using those utility builders.
t
My guess is compareBy is returning an object that isn't serializable. Maybe you can reimplement it yourself so it is.
k
That's basically what we're doing now. It's a shame, though, because the standard library already provides all these nice utilities.
The factory methods on java.util.Comparator do this more or less correctly:
```java
public static <T, U extends Comparable<? super U>> Comparator<T> comparing(
        Function<? super T, ? extends U> keyExtractor)
{
    Objects.requireNonNull(keyExtractor);
    return (Comparator<T> & Serializable)
        (c1, c2) -> keyExtractor.apply(c1).compareTo(keyExtractor.apply(c2));
}
```
If the keyExtractor is Serializable, the comparator will be too. Although you have to jump through some hoops to make that lambda serializable (at least in my attempts):
```java
Comparator.comparing((Serializable & Function<Pair<String, Integer>, Integer>) Pair::getSecond)
```
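(Kotlin has no intersection-type cast, so the equivalent hoop from Kotlin would be an object expression implementing both interfaces; bySecond and cmp below are illustrative names.)
```kotlin
import java.io.Serializable
import java.util.Comparator
import java.util.function.Function

// A key extractor that is both a java.util.function.Function and Serializable,
// so the comparator built by Comparator.comparing serializes cleanly.
val bySecond = object : Function<Pair<String, Int>, Int>, Serializable {
    override fun apply(t: Pair<String, Int>): Int = t.second
}

val cmp: Comparator<Pair<String, Int>> = Comparator.comparing(bySecond)
```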
t
I'm convinced you can do something much cleaner and easier to use. You should be able to write it in Kotlin too. I'll take a look later rather than reviewing on my phone.
And you can hopefully package the fixed operator as an extension function on the RDD. Might be a good contribution to Sparklin.
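(A sketch of what such a helper might look like; serializableCompareBy and takeOrderedBy are made-up names, and it assumes the selector itself serializes, which Kotlin lambdas compiled as classes do, since kotlin.jvm.internal.Lambda implements Serializable.)
```kotlin
import java.io.Serializable
import org.apache.spark.api.java.JavaRDD

// compareBy-style builder whose result also implements Serializable,
// so Spark's closure serializer can ship it to executors.
// NOTE: the captured selector must be serializable too.
fun <T> serializableCompareBy(selector: (T) -> Comparable<*>?): Comparator<T> =
    object : Comparator<T>, Serializable {
        override fun compare(a: T, b: T): Int = compareValuesBy(a, b, selector)
    }

// RDD extension wrapping the builder, in the spirit of a Sparklin helper.
// Usage: rdd.takeOrderedBy(10) { it.second }
fun <T> JavaRDD<T>.takeOrderedBy(num: Int, selector: (T) -> Comparable<*>?): List<T> =
    takeOrdered(num, serializableCompareBy(selector))
```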
k
Thanks. Here is a small use case that's failing:
```kotlin
val conf = SparkConf().setAppName("Example")
conf.setMaster("local[1]")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = JavaSparkContext(conf)

val rdd = sc.emptyRDD<Pair<String, Int>>()
rdd.takeOrdered(10, compareByDescending { it.second })
```
The last line makes the task not serializable.
Here is a super hacky workaround. It relies on the comparator having an accessible default constructor: we only close over the non-serializable comparator's class, then lazily instantiate it inside our serializable comparator object expression and delegate to it.
```kotlin
import java.io.Serializable
import org.apache.spark.api.java.JavaRDD

fun <T> JavaRDD<T>.takeOrderedKz(num: Int, delegate: Comparator<T>): MutableList<T> {
    // Capture only the delegate's Class, never the (non-serializable) instance.
    val comparatorClass = delegate.javaClass

    return takeOrdered(num, object : Comparator<T>, Serializable {

        // Re-created lazily on the executor via the default constructor.
        private var impl: Comparator<T>? = null
        private val comparator: Comparator<T>
            get() {
                if (impl == null) {
                    impl = comparatorClass.newInstance()
                }
                return impl!!
            }

        override fun compare(o1: T, o2: T): Int = comparator.compare(o1, o2)
    })
}
```
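(To make the default-constructor requirement concrete: a hypothetical call site needs a named comparator class with a no-arg constructor rather than a compareByDescending lambda, reusing the rdd from the failing example above.)
```kotlin
// A comparator class with an accessible no-arg constructor, so the workaround
// can re-instantiate it reflectively on the executor side. The serialized
// closure carries only its Class object (which is Serializable), not the
// comparator instance itself.
class BySecondDescending : Comparator<Pair<String, Int>> {
    override fun compare(a: Pair<String, Int>, b: Pair<String, Int>): Int =
        b.second.compareTo(a.second)
}

rdd.takeOrderedKz(10, BySecondDescending())
```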
e
You are welcome to submit an issue to http://kotl.in/issue about serializability of utility comparators. IMHO, you have quite a compelling use case.
👍 1
t
@kz ☝️ share the link if you do file an issue. We will upvote it.
k
Thanks guys. I'll do it within a few hours.