Tyler Kinkade
04/09/2024, 11:10 AM
When withSpark is invoked dynamically in a Dockerized Ktor web app, an UnsupportedFileSystemException is thrown.
GitHub issue is here
GitHub repo is here.
Broadcast.kt (from kotlin-spark-api example)
import org.jetbrains.kotlinx.spark.api.*
import java.io.Serializable

object Broadcast {
    data class SomeClass(val a: IntArray, val b: Int) : Serializable

    fun broadcast(): MutableList<Int> {
        lateinit var result: MutableList<Int>
        withSpark(master = "local") {
            // Broadcast a serializable value to all executors
            val broadcastVariable = spark.broadcast(SomeClass(a = intArrayOf(5, 6), b = 3))
            result = listOf(1, 2, 3, 4, 5)
                .toDS()
                .map {
                    val receivedBroadcast = broadcastVariable.value
                    it + receivedBroadcast.a.first()
                }
                .collectAsList()
            println(result)
        }
        return result
    }
}
Routing.kt
import io.ktor.server.application.*
import io.ktor.server.response.*
import io.ktor.server.routing.*

fun Application.configureRouting() {
    routing {
        get("/") {
            // Calling Broadcast.broadcast() here creates the Spark session
            // inside the Ktor request handler
            val list = Broadcast.broadcast()
            call.respondText(list.toString())
        }
    }
}
Dockerfile
# syntax=docker/dockerfile:1
FROM eclipse-temurin:11-jre-jammy AS jre-jammy-spark
RUN curl https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3-scala2.13.tgz -o spark.tgz && \
    tar -xf spark.tgz && \
    mv spark-3.3.2-bin-hadoop3-scala2.13 /opt/spark && \
    rm spark.tgz
ENV SPARK_HOME="/opt/spark"
ENV PATH="${PATH}:/opt/spark/bin:/opt/spark/sbin"
FROM gradle:8.4-jdk11 AS gradle-build
COPY --chown=gradle:gradle . /home/gradle/src
WORKDIR /home/gradle/src
RUN gradle buildFatJar --no-daemon
FROM jre-jammy-spark AS app
RUN mkdir -p /app
COPY --from=gradle-build /home/gradle/src/build/libs/*-all.jar /app/app.jar
ENTRYPOINT ["java","-jar","/app/app.jar"]
compose.yaml
services:
  app:
    build: .
    ports:
      - 8888:8888
In a shell, run:
$ docker compose up
Then, open http://localhost:8888 in a browser.
An org.apache.hadoop.fs.UnsupportedFileSystemException will be thrown:
app-1 | 2024-04-09 10:26:26.484 [main] INFO ktor.application - Autoreload is disabled because the development mode is off.
app-1 | 2024-04-09 10:26:26.720 [main] INFO ktor.application - Application started in 0.261 seconds.
app-1 | 2024-04-09 10:26:26.816 [DefaultDispatcher-worker-1] INFO ktor.application - Responding at http://0.0.0.0:8888
app-1 | WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
app-1 | Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
app-1 | 2024-04-09 10:27:07.885 [eventLoopGroupProxy-4-1] WARN o.a.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
app-1 | WARNING: An illegal reflective access operation has occurred
app-1 | WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/app/app.jar) to constructor java.nio.DirectByteBuffer(long,int)
app-1 | WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
app-1 | WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
app-1 | WARNING: All illegal access operations will be denied in a future release
app-1 | 2024-04-09 10:27:10.066 [eventLoopGroupProxy-4-1] WARN o.a.spark.sql.internal.SharedState - URL.setURLStreamHandlerFactory failed to set FsUrlStreamHandlerFactory
app-1 | 2024-04-09 10:27:10.071 [eventLoopGroupProxy-4-1] WARN o.a.spark.sql.internal.SharedState - Cannot qualify the warehouse path, leaving it unqualified.
app-1 | org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
Eric Rodriguez
04/09/2024, 11:35 AM
org.apache.spark.unsafe.Platform (file:/app/app.jar)
Have you tried with file:// or just /app/app.jar? It doesn't seem to be a kotlin-spark-api issue to me.
Eric Rodriguez
04/09/2024, 11:40 AM
ENTRYPOINT ["java","-jar","/app/app.jar"]
Jolan Rensen [JB]
04/09/2024, 2:16 PM
Jolan Rensen [JB]
04/09/2024, 2:33 PM
Jolan Rensen [JB]
04/09/2024, 2:51 PM
Eric Rodriguez
04/09/2024, 2:53 PM
Tyler Kinkade
04/09/2024, 10:39 PM
Eric Rodriguez
04/10/2024, 4:35 AM
Try adding the hadoop-client dependency in Gradle and test.
Tyler Kinkade
04/10/2024, 5:26 AM
I tried with implementation("org.apache.hadoop:hadoop-client:3.3.2") and with implementation("org.apache.hadoop:hadoop-client:3.3.6"), but the same exception is thrown either way. 🤔
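For reference, a sketch of where such dependencies sit in build.gradle.kts; the kotlin-spark-api and spark-sql coordinates and version numbers below are assumptions for a Spark 3.3.2 / Scala 2.13 setup, not taken from the repo:

dependencies {
    // kotlin-spark-api artifact built against Spark 3.3.2 / Scala 2.13 (assumed version)
    implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api_3.3.2_2.13:1.2.4")
    implementation("org.apache.spark:spark-sql_2.13:3.3.2")
    // hadoop-client matching the Hadoop bundled with the Spark 3.3.2 distribution
    implementation("org.apache.hadoop:hadoop-client:3.3.2")
}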
Eric Rodriguez
04/10/2024, 6:10 AM
Should master be local[*] or similar?
Tyler Kinkade
04/10/2024, 6:23 AM
local means one thread; local[*] means all available cores. What's your thinking on why that would lead to an UnsupportedFileSystemException, though?
Eric Rodriguez
04/10/2024, 6:41 AM
fs.impl
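A minimal sketch of acting on the fs.impl pointer by pinning the Hadoop FileSystem implementation for the "file" scheme through Spark's Hadoop configuration; assuming the props overload of withSpark from kotlin-spark-api, this is a workaround to try, not a confirmed fix:

import org.jetbrains.kotlinx.spark.api.*

fun main() {
    withSpark(
        // Register LocalFileSystem for the "file" scheme explicitly, in case the
        // service-loader registration is missing from the fat JAR at runtime.
        props = mapOf("spark.hadoop.fs.file.impl" to "org.apache.hadoop.fs.LocalFileSystem"),
        master = "local",
    ) {
        // If session initialization gets past qualifying the warehouse path,
        // this prints the Spark version.
        println(spark.version())
    }
}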
Eric Rodriguez
04/10/2024, 6:41 AM
Tyler Kinkade
04/10/2024, 7:03 AM
Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Just in case though, I tried running with master=local[*], but the same exception is thrown.
Eric Rodriguez
04/10/2024, 7:47 AM
Besides the local thing, the fs.impl. Why would you get a No FileSystem for scheme "file" on your broadcast function if you are not trying to explicitly read or write anything? Maybe spark.broadcast tries to write to a shared location where all workers can read data from... It would be useful to have a log before and after withSpark, to make sure whether it is the session initialization that throws the exception or spark.broadcast.
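A minimal sketch of the logging Eric suggests, using the same withSpark/broadcast calls as Broadcast.kt above; broadcastWithLogs is a hypothetical variant, not code from the repo:

import org.jetbrains.kotlinx.spark.api.*

// Logs around each step show whether the exception is reported during
// session initialization or only at the spark.broadcast call.
fun broadcastWithLogs() {
    println("before withSpark")
    withSpark(master = "local") {
        println("inside withSpark: session initialized")
        val bc = spark.broadcast(intArrayOf(5, 6))
        println("after spark.broadcast: $bc")
    }
    println("after withSpark")
}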
Tyler Kinkade
04/10/2024, 9:18 AM
kotlin-spark.
Jolan Rensen [JB]
04/10/2024, 10:19 AM
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3585)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3608)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3712)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3663)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.spark.sql.internal.SharedState$.qualifyWarehousePath(SharedState.scala:282)
    at org.apache.spark.sql.internal.SharedState.liftedTree1$1(SharedState.scala:80)
    at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:79)
    at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)
    at scala.Option.getOrElse(Option.scala:201)
    at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)
    at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)
    at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)
    at scala.Option.getOrElse(Option.scala:201)
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)
    at org.apache.spark.sql.SparkSession.$anonfun$new$3(SparkSession.scala:117)
    at scala.Option.map(Option.scala:242)
    at org.apache.spark.sql.SparkSession.$anonfun$new$1(SparkSession.scala:117)
    at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:230)
    at org.apache.spark.sql.catalyst.SerializerBuildHelper$.nullOnOverflow(SerializerBuildHelper.scala:29)
    at org.apache.spark.sql.catalyst.SerializerBuildHelper$.createSerializerForJavaBigDecimal(SerializerBuildHelper.scala:158)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:549)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:448)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:437)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:429)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55)
    at org.apache.spark.sql.Encoders$.DECIMAL(Encoders.scala:100)
    at org.apache.spark.sql.Encoders.DECIMAL(Encoders.scala)
    at org.jetbrains.kotlinx.spark.api.EncodingKt.<clinit>(Encoding.kt:87)
    at com.example.plugins.Broadcast.broadcast(Broadcast.kt:62)
    at com.example.plugins.RoutingKt$configureRouting$1$1.invokeSuspend(Routing.kt:10)
    at com.example.plugins.RoutingKt$configureRouting$1$1.invoke(Routing.kt)
    at com.example.plugins.RoutingKt$configureRouting$1$1.invoke(Routing.kt)
The file-reading exception print happens when the DECIMAL encoder is pre-loaded by the Kotlin Spark API Encoders file. Can you try to instantiate the same encoder in the non-Kotlin Spark project to see what happens?
In the Spark 3.4+ branch of the project, the encoding part is completely overhauled, so this issue won't be there anymore. But it's still a WIP.
I once again would like to add that your program still executes fine. It's a caught exception that's just logged to the output.
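A minimal sketch of that check: triggering the same DECIMAL encoder through plain Spark SQL, without kotlin-spark-api, to be run inside the same fat-JAR/Docker setup; the main function and app name are illustrative, not from the repo:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession

fun main() {
    val spark = SparkSession.builder()
        .master("local")
        .appName("decimal-encoder-repro")
        .getOrCreate()

    // Per the stack trace above, building this encoder reads SQLConf, which
    // initializes the SharedState and tries to qualify the warehouse path.
    val decimalEncoder = Encoders.DECIMAL()
    println(decimalEncoder)

    spark.stop()
}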
Tyler Kinkade
04/10/2024, 10:26 AM
app-1 | 2024-04-10 09:53:18.482 [eventLoopGroupProxy-4-1] WARN o.a.spark.sql.internal.SharedState - Cannot qualify the warehouse path, leaving it unqualified.
app-1 | org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
. . .
app-1 | 2024-04-10 09:53:19.148 [eventLoopGroupProxy-4-1] INFO o.a.spark.storage.memory.MemoryStore - Block broadcast_0 stored as values in memory (estimated size 72.0 B, free 2.1 GiB)
app-1 | 2024-04-10 09:53:19.180 [eventLoopGroupProxy-4-1] INFO o.a.spark.storage.memory.MemoryStore - Block broadcast_0_piece0 stored as bytes in memory (estimated size 150.0 B, free 2.1 GiB)
app-1 | 2024-04-10 09:53:19.182 [dispatcher-BlockManagerMaster] INFO o.a.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in memory on dcb91ee36ad3:36665 (size: 150.0 B, free: 2.1 GiB)
app-1 | 2024-04-10 09:53:19.186 [eventLoopGroupProxy-4-1] INFO org.apache.spark.SparkContext - Created broadcast 0 from broadcast at Broadcast.kt:61
The Spring Boot logs are easier to understand. It seems that Spark creates a local temporary directory as part of the preparation of the Spark session long before it invokes the broadcast function:
app-1 | 2024-04-10 09:00:43.393 INFO 1 --- [nio-8888-exec-1] o.a.s.SparkEnv : Registering BlockManagerMasterHeartbeat
app-1 | 2024-04-10 09:00:43.410 INFO 1 --- [nio-8888-exec-1] o.a.s.s.DiskBlockManager : Created local directory at /tmp/blockmgr-c9cef486-62f2-431a-8408-1e48b933da34
app-1 | 2024-04-10 09:00:43.436 INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore : MemoryStore started with capacity 2.1 GiB
. . .
app-1 | 2024-04-10 09:00:43.829 INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore : Block broadcast_0 stored as values in memory (estimated size 72.0 B, free 2.1 GiB)
app-1 | 2024-04-10 09:00:43.856 INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore : Block broadcast_0_piece0 stored as bytes in memory (estimated size 146.0 B, free 2.1 GiB)
app-1 | 2024-04-10 09:00:43.858 INFO 1 --- [ckManagerMaster] o.a.s.s.BlockManagerInfo : Added broadcast_0_piece0 in memory on 1d1b66d9e151:43605 (size: 146.0 B, free: 2.1 GiB)
app-1 | 2024-04-10 09:00:43.862 INFO 1 --- [nio-8888-exec-1] o.a.s.SparkContext : Created broadcast 0 from broadcast at SparkBroadcast.java:30
So, the interesting question is: why is Spark 3.3.2 able to handle this without throwing an exception when the call comes from Java, but not when it's called from Kotlin?
Eric Rodriguez
04/10/2024, 10:31 AM
I had issues with kotlin-spark-api using Hadoop 3.3.6 when my cluster had v3.3.2, so I'd wager it's something around that.
Eric Rodriguez
04/10/2024, 10:33 AM
Tyler Kinkade
04/10/2024, 11:19 AM