Icyrockton
05/03/2022, 2:10 PM
I'm using mapWithState but get a "functional interface not serializable" exception:
java.io.NotSerializableException: streaming.VisitStatisticsKt$$Lambda$1181/1794893473
Icyrockton
05/03/2022, 2:12 PM
fun main() {
    withSpark {
        val resetTime = {
            val now = Date()
            val date = Date(now.year, now.month, now.date + 1)
            date.time - now.time
        }
        val javaSparkContext = JavaSparkContext(spark.sparkContext())
        val jssc = JavaStreamingContext(javaSparkContext, Durations.seconds(10))
        jssc.checkpoint("./checkPoint")
        val stream = KafkaUtils.createDirectStream(
            jssc, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe<String, String>(
                listOf("test"), kafkaConfig()
            )
        )
        stream.map { Json.decodeFromString<UserLog>(it.value()) }
            .filter { it.action == "visit_station" }
            .mapToPair { Tuple2(it.data["gasStationId"] as Int, 1) }
            .mapWithState(StateSpec.function(Function3 { /* <----- not serializable interface */
                stationId: Int, v2: Option<Int>, v3: State<Int> ->
                val accSum = v2.getOrElse { 0 } + v3.option.getOrElse { 0 }
                v3.update(accSum)
                c(stationId, accSum)
            })).print()
        jssc.start()
        jssc.awaitTerminationOrTimeout(resetTime())
    }
}
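For context on the error above: the lambda passed to StateSpec.function is SAM-converted to a Function3, and the synthetic class Kotlin generates for it does not implement java.io.Serializable, which is exactly what Spark's checkpointing serializer requires. A minimal, Spark-free sketch of that behaviour (MappingFn and trySerialize are illustrative stand-ins, not code from this thread):

import java.io.ByteArrayOutputStream
import java.io.NotSerializableException
import java.io.ObjectOutputStream
import java.io.Serializable

// Stand-in for a SAM interface like Scala's Function3 (illustrative only).
fun interface MappingFn {
    fun apply(key: Int, value: Int, state: Int): Int
}

// Roughly what Spark does when checkpointing: Java-serialize the mapping function.
fun trySerialize(obj: Any): Boolean = try {
    ObjectOutputStream(ByteArrayOutputStream()).use { it.writeObject(obj) }
    true
} catch (e: NotSerializableException) {
    false
}

fun main() {
    // SAM-converted Kotlin lambda: its generated class does not implement Serializable.
    val asLambda = MappingFn { _, v, s -> v + s }
    println(trySerialize(asLambda))  // false, the NotSerializableException case above

    // Anonymous object that also implements Serializable: passes the same check.
    val asObject = object : MappingFn, Serializable {
        override fun apply(key: Int, value: Int, state: Int) = value + state
    }
    println(trySerialize(asObject))  // true
}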
altavir
05/03/2022, 2:17 PM
Pasha Finkelshteyn
05/03/2022, 2:18 PM
Jolan Rensen [JetBrains]
05/05/2022, 10:38 AM
Jolan Rensen [JetBrains]
05/05/2022, 12:03 PM
Exception in thread "main" java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.jetbrains.kotlinx.spark.examples.TestKt$main$1$$Lambda$876/0x0000000800753440
Jolan Rensen [JetBrains]
05/05/2022, 12:03 PM
Jolan Rensen [JetBrains]
05/05/2022, 12:03 PM
Jolan Rensen [JetBrains]
05/05/2022, 12:39 PM
Jolan Rensen [JetBrains]
05/05/2022, 12:39 PM
Pasha Finkelshteyn
05/05/2022, 12:41 PM
Jolan Rensen [JetBrains]
05/05/2022, 12:55 PM
.mapWithState(
    StateSpec.function(
        object : Function3<Int, Option<Int>, State<Int>, Tuple2<Int, Int>>, java.io.Serializable {
            override fun invoke(stationId: Int, v2: Option<Int>, v3: State<Int>): Tuple2<Int, Int> {
                val accSum = v2.getOrElse(0) + v3.getOrElse(0)
                v3.update(accSum)
                return t(stationId, accSum)
            }
        }
    )
)
Jolan Rensen [JetBrains]
05/05/2022, 12:55 PM
Pasha Finkelshteyn
05/05/2022, 12:56 PM
Pasha Finkelshteyn
05/05/2022, 12:56 PM
serializableFunction { /* code of invoke here */ }
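A sketch of what such a serializableFunction helper could look like (the helper is hypothetical, named after Pasha's suggestion, and not part of the kotlin-spark-api): it wraps a plain Kotlin lambda in an object that is both a Function3 and java.io.Serializable, matching the shape of the anonymous object in the previous message.

import java.io.Serializable

// Hypothetical helper, not from the library: wraps a Kotlin lambda in an object
// that implements both kotlin.Function3 and java.io.Serializable.
// Assumes the wrapped lambda itself is serializable (Kotlin lambdas typically
// compile to classes that implement Serializable).
fun <T1, T2, T3, R> serializableFunction(
    block: (T1, T2, T3) -> R,
): Function3<T1, T2, T3, R> =
    object : Function3<T1, T2, T3, R>, Serializable {
        override fun invoke(p1: T1, p2: T2, p3: T3): R = block(p1, p2, p3)
    }

Call sites could then look something like StateSpec.function(serializableFunction { stationId, v2, v3 -> ... }), assuming an overload or conversion that accepts the Kotlin function type.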
Jolan Rensen [JetBrains]
05/05/2022, 12:57 PM
Jolan Rensen [JetBrains]
05/05/2022, 12:57 PM
Pasha Finkelshteyn
05/05/2022, 12:57 PM
Jolan Rensen [JetBrains]
05/05/2022, 12:57 PM
Jolan Rensen [JetBrains]
05/05/2022, 1:44 PM
Use Optional (the Java variant) instead of Option (the Scala variant). That might actually do the trick as well.
Jolan Rensen [JetBrains]
05/05/2022, 2:05 PM
.mapWithState<Int, Int, Int, Tuple2<Int, Int>>(
    StateSpec.function { stationId: Int, v2: Optional<Int>, v3: State<Int> ->
        val accSum = v2.getOrElse(0) + v3.getOrElse(0)
        v3.update(accSum)
        t(stationId, accSum)
    }
)
(In theory the first explicit types are not needed, but the Scala integration of the Kotlin compiler is a bit iffy.)
Jolan Rensen [JetBrains]
05/05/2022, 2:09 PM
fun main() {
    val resetTime = {
        val now = Date()
        val date = Date(now.year, now.month, now.date + 1)
        date.time - now.time
    }
    withSparkStreaming(
        batchDuration = Durations.seconds(10),
        timeout = resetTime(),
        checkpointPath = "./checkPoint",
    ) {
        val stream = KafkaUtils.createDirectStream(
            ssc, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe<String, String>(
                listOf("test"), kafkaConfig()
            )
        )
        stream
            .map { Json.decodeFromString<UserLog>(it.value()) }
            .filter { it.action == "visit_station" }
            .map { t(it.data["gasStationId"] as Int, 1) }
            .mapWithState(
                StateSpec.function { stationId: Int, v2: Optional<Int>, v3: State<Int> ->
                    val accSum = v2.getOrElse(0) + v3.getOrElse(0)
                    v3.update(accSum)
                    t(stationId, accSum)
                }
            )
            .print()
    }
}
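As a side note, the resetTime helper above computes the milliseconds until the next midnight using the deprecated java.util.Date constructors; a java.time sketch of the same calculation (not from this thread, purely a suggestion) could be:

import java.time.Duration
import java.time.LocalDate
import java.time.LocalDateTime

// Milliseconds until the next local midnight, the same value resetTime produces,
// using java.time instead of the deprecated Date(year, month, date) constructor.
fun millisUntilNextMidnight(): Long {
    val now = LocalDateTime.now()
    val nextMidnight = LocalDate.now().plusDays(1).atStartOfDay()
    return Duration.between(now, nextMidnight).toMillis()
}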
Icyrockton
05/05/2022, 3:14 PM