# datascience
i
Hi, I'm using Spark Streaming in Kotlin with mapWithState, but I get a functional-interface-not-serializable exception:
java.io.NotSerializableException: streaming.VisitStatisticsKt$$Lambda$1181/1794893473
fun main() {

    withSpark {

        val resetTime = {
            val now = Date()
            val date = Date(now.year, now.month, now.date + 1)
            date.time - now.time
        }
        val javaSparkContext = JavaSparkContext(spark.sparkContext())
        val jssc = JavaStreamingContext(javaSparkContext, Durations.seconds(10))
        jssc.checkpoint("./checkPoint")
        val stream = KafkaUtils.createDirectStream(
            jssc, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe<String, String>(
                listOf("test"), kafkaConfig()
            )
        )


        stream.map { Json.decodeFromString<UserLog>(it.value()) }.filter {
            it.action == "visit_station"
        }.mapToPair { Tuple2(it.data["gasStationId"] as Int, 1) }
            .mapWithState(StateSpec.function(Function3 { /* <----- not-serializable interface */
                    stationId: Int, v2: Option<Int>, v3: State<Int> ->
                val accSum = v2.getOrElse { 0 } + v3.option.getOrElse { 0 }
                v3.update(accSum)
                c(stationId, accSum)
            })).print()

        jssc.start()
        jssc.awaitTerminationOrTimeout(resetTime())
    }
}
a
cc @Pasha Finkelshteyn
p
@Jolan Rensen [JB] works on it right now!
j
Yup! I'll test whether it works in the latest version 🙂
More information is in the stacktrace:
Exception in thread "main" java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.jetbrains.kotlinx.spark.examples.TestKt$main$1$$Lambda$876/0x0000000800753440
I think this is a Spark limitation, not a limitation of the Kotlin Spark library. I'll see if this works (or not) in Java or Scala
Same error happens in Java; it's indeed a Spark limitation
It should work without checkpointing enabled
p
Yeah, things should be serializable 🙂 @Icyrockton could you provide us with a working Java/Scala example please?
j
Okay, it does seem to work if we make an explicitly Serializable lambda, like:
.mapWithState(
    StateSpec.function(
        object : Function3<Int, Option<Int>, State<Int>, Tuple2<Int, Int>>, java.io.Serializable {
            override fun invoke(stationId: Int, v2: Option<Int>, v3: State<Int>): Tuple2<Int, Int> {
                val accSum = v2.getOrElse(0) + v3.getOrElse(0)
                v3.update(accSum)

                return t(stationId, accSum)
            }
        }
    )
)
It's not pretty though, I'll see what I can do to make this better 🙂
p
It looks like we need Serializable function derivatives…
serializableFunction { /* code of invoke here */ }
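e.g. a minimal sketch of such a helper (hypothetical, not part of any released API; it relies on the wrapped Kotlin lambda itself being serializable, which Kotlin lambda objects are via kotlin.jvm.internal.Lambda):

```kotlin
import java.io.Serializable

// Hypothetical helper: wraps a plain (A, B, C) -> R lambda in an object that
// also implements java.io.Serializable, so Spark's serializability check on
// checkpointed DStream functions can pass.
fun <A, B, C, R> serializableFunction(f: (A, B, C) -> R): (A, B, C) -> R =
    object : (A, B, C) -> R, Serializable {
        override fun invoke(p1: A, p2: B, p3: C): R = f(p1, p2, p3)
    }
```

That would let the mapWithState call site stay a plain lambda instead of an explicit anonymous object.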
j
yup... Something like that should do the trick
But it's not required everywhere
p
@Icyrockton looks like you can use the workaround described by @Jolan Rensen [JB]
j
so maybe an alternative StateSpec class could also be an option
@Icyrockton Oh actually, could you try the variant of StateSpec.function with Optional (the Java variant) instead of Option (the Scala variant)? That might actually do the trick as well
for me, it compiles like:
.mapWithState<Int, Int, Int, Tuple2<Int, Int>>(
    StateSpec.function { stationId: Int, v2: Optional<Int>, v3: State<Int> ->
        val accSum = v2.getOrElse(0) + v3.getOrElse(0)
        v3.update(accSum)

        t(stationId, accSum)
    }
)
(In theory the first explicit types are not needed, but the Scala integration of the Kotlin compiler is a bit iffy.)
This turns the whole example (for when the next release hits) into:
fun main() {

    val resetTime = {
        val now = Date()
        val date = Date(now.year, now.month, now.date + 1)
        date.time - now.time
    }
    withSparkStreaming(
        batchDuration = Durations.seconds(10),
        timeout = resetTime(),
        checkpointPath = "./checkPoint",
    ) {
        val stream = KafkaUtils.createDirectStream(
            ssc, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe<String, String>(
                listOf("test"), kafkaConfig()
            )
        )

        stream
            .map { Json.decodeFromString<UserLog>(it.value()) }
            .filter { it.action == "visit_station" }
            .map { t(it.data["gasStationId"] as Int, 1) }
            .mapWithState(
                StateSpec.function { stationId: Int, v2: Optional<Int>, v3: State<Int> ->
                    val accSum = v2.getOrElse(0) + v3.getOrElse(0)
                    v3.update(accSum)

                    t(stationId, accSum)
                }
            )
            .print()
    }
}
i
Thanks, I will try it. It's very helpful to me 😄