# kotlin-spark
t
I can't figure out how to solve this exception: when withSpark is invoked dynamically in a Dockerized Ktor web app, an UnsupportedFileSystemException is thrown. GitHub issue is here. GitHub repo is here.
Broadcast.kt (from the kotlin-spark-api example)
import org.jetbrains.kotlinx.spark.api.map
import org.jetbrains.kotlinx.spark.api.withSpark
import java.io.Serializable

object Broadcast {
    data class SomeClass(val a: IntArray, val b: Int) : Serializable

    fun broadcast(): MutableList<Int> {
        lateinit var result: MutableList<Int>
        withSpark(master = "local") {
            val broadcastVariable = spark.broadcast(SomeClass(a = intArrayOf(5, 6), b = 3))
            result = listOf(1, 2, 3, 4, 5)
                .toDS()
                .map {
                    val receivedBroadcast = broadcastVariable.value
                    it + receivedBroadcast.a.first()
                }
                .collectAsList()
            println(result)
        }
        return result
    }
}
Routing.kt
fun Application.configureRouting() {
    routing {
        get("/") {
            val list = Broadcast.broadcast()
            call.respondText(list.toString())
        }
    }
}
Dockerfile
# syntax=docker/dockerfile:1
FROM eclipse-temurin:11-jre-jammy AS jre-jammy-spark
RUN curl https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3-scala2.13.tgz -o spark.tgz && \
    tar -xf spark.tgz && \
    mv spark-3.3.2-bin-hadoop3-scala2.13 /opt/spark && \
    rm spark.tgz
ENV SPARK_HOME="/opt/spark"
ENV PATH="${PATH}:/opt/spark/bin:/opt/spark/sbin"

FROM gradle:8.4-jdk11 AS gradle-build
COPY --chown=gradle:gradle . /home/gradle/src
WORKDIR /home/gradle/src
RUN gradle buildFatJar --no-daemon

FROM jre-jammy-spark AS app
RUN mkdir -p /app
COPY --from=gradle-build /home/gradle/src/build/libs/*-all.jar /app/app.jar
ENTRYPOINT ["java","-jar","/app/app.jar"]
compose.yaml
services:
  app:
    build: .
    ports:
      - 8888:8888
In a shell, run:
$ docker compose up
Then, open http://localhost:8888 in a browser. An org.apache.hadoop.fs.UnsupportedFileSystemException will be thrown:
app-1  | 2024-04-09 10:26:26.484 [main] INFO  ktor.application - Autoreload is disabled because the development mode is off.
app-1  | 2024-04-09 10:26:26.720 [main] INFO  ktor.application - Application started in 0.261 seconds.
app-1  | 2024-04-09 10:26:26.816 [DefaultDispatcher-worker-1] INFO  ktor.application - Responding at http://0.0.0.0:8888
app-1  | WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
app-1  | Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
app-1  | 2024-04-09 10:27:07.885 [eventLoopGroupProxy-4-1] WARN  o.a.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
app-1  | WARNING: An illegal reflective access operation has occurred
app-1  | WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/app/app.jar) to constructor java.nio.DirectByteBuffer(long,int)
app-1  | WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
app-1  | WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
app-1  | WARNING: All illegal access operations will be denied in a future release
app-1  | 2024-04-09 10:27:10.066 [eventLoopGroupProxy-4-1] WARN  o.a.spark.sql.internal.SharedState - URL.setURLStreamHandlerFactory failed to set FsUrlStreamHandlerFactory
app-1  | 2024-04-09 10:27:10.071 [eventLoopGroupProxy-4-1] WARN  o.a.spark.sql.internal.SharedState - Cannot qualify the warehouse path, leaving it unqualified.
app-1  | org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
e
org.apache.spark.unsafe.Platform (file:/app/app.jar)
have u tried with file:// or just /app/app.jar? Doesn't seem to be a kotlin-spark-api issue to me
oh, it's the entrypoint in docker
ENTRYPOINT ["java","-jar","/app/app.jar"]
j
I'll have a look 🙂
Actually, after running it with that fix I still get the org.apache.hadoop.fs.UnsupportedFileSystemException, so I'll investigate where that comes from. It runs correctly though
Hmm, it might be because of a mismatch in Hadoop versions (3.3.6 vs 3.3.2), but I'm not sure. On the other hand, it's just a warning, not a breaking exception. The output shows up correctly
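One way to check which Hadoop version actually ends up on the runtime classpath is hadoop-common's VersionInfo (a minimal, untested sketch):
import org.apache.hadoop.util.VersionInfo

fun main() {
    // prints the version of the hadoop-common jar that actually won on the classpath,
    // e.g. 3.3.2 vs 3.3.6
    println("Hadoop on classpath: " + VersionInfo.getVersion())
}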
e
better to declare the correct hadoop version ime
t
Thanks so much for taking a look! 🙂 @Jolan Rensen [JB] What fix are you referring to? Where is the Hadoop version specified? There's no Hadoop installation in the container. (From what I have read, there doesn't have to be.) @Eric Rodriguez Where would you declare the Hadoop version?
e
sometimes hadoop gets a bit crazy when it finds different versions of its jars, that might be the problem. You can try declaring the correct hadoop-client dependency on gradle and test
t
@Eric Rodriguez Thanks for the suggestion! 🙂 I just tried building with implementation("org.apache.hadoop:hadoop-client:3.3.2") and with implementation("org.apache.hadoop:hadoop-client:3.3.6"), but the same exception is thrown either way. 🤔
e
shouldn't master be local[*] or similar?
t
Either is acceptable: local means one thread; local[*] means all available cores. What's your thinking on why that would lead to an UnsupportedFileSystemException though?
e
Broadcast wants to write to HDFS by default, but u're on a standard file fs. You may need to change the default fs, some property similar to fs.impl. That's my theory
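something along these lines maybe (untested sketch; this assumes withSpark's props map is copied into the SparkConf and that spark.hadoop.* keys reach the Hadoop Configuration as usual):
// sketch: force the "file" scheme onto Hadoop's local filesystem implementation
withSpark(
    props = mapOf("spark.hadoop.fs.file.impl" to "org.apache.hadoop.fs.LocalFileSystem"),
    master = "local",
) {
    // same broadcast code as before
}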
t
Doesn't running locally imply that no Hadoop cluster is configured? Either way, when Spark discovers that there's no Hadoop cluster, it says: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable. Just in case though, I tried running with master=local[*], but the same exception is thrown.
e
no, not the local thing, the fs.impl. Why would you get a No FileSystem for scheme "file" on your broadcast function if you are not trying to explicitly read or write anything? Maybe spark.broadcast tries to write to a shared location where all workers can read data from... It would be useful to have a log before and after withSpark to make sure whether it's the session initialization that throws the exception or spark.broadcast
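something like this, for example (a sketch of the same broadcast() function from above, unchanged except for the added println calls):
fun broadcast(): MutableList<Int> {
    lateinit var result: MutableList<Int>
    println("before withSpark")                 // nothing Spark-related has run yet
    withSpark(master = "local") {
        println("session initialized")          // exception logged before this line => thrown during session init
        val broadcastVariable = spark.broadcast(SomeClass(a = intArrayOf(5, 6), b = 3))
        println("after spark.broadcast")        // exception logged between the two lines above => thrown by the broadcast
        result = listOf(1, 2, 3, 4, 5)
            .toDS()
            .map {
                val receivedBroadcast = broadcastVariable.value
                it + receivedBroadcast.a.first()
            }
            .collectAsList()
        println("collected: $result")
    }
    println("after withSpark")
    return result
}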
t
@Jolan Rensen [JB] For comparison, I created an equivalent dockerized Spring Boot app here. Notably, no exception is thrown. This seems to suggest that the issue lies within kotlin-spark.
j
Looking through the stacktrace:
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
2024-04-09T14:50:30.038904724Z 	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3585)
2024-04-09T14:50:30.038910866Z 	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3608)
2024-04-09T14:50:30.038916313Z 	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
2024-04-09T14:50:30.038921677Z 	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3712)
2024-04-09T14:50:30.038927222Z 	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3663)
2024-04-09T14:50:30.038932786Z 	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
2024-04-09T14:50:30.038938719Z 	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
2024-04-09T14:50:30.038944366Z 	at org.apache.spark.sql.internal.SharedState$.qualifyWarehousePath(SharedState.scala:282)
2024-04-09T14:50:30.038950650Z 	at org.apache.spark.sql.internal.SharedState.liftedTree1$1(SharedState.scala:80)
2024-04-09T14:50:30.038956369Z 	at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:79)
2024-04-09T14:50:30.038962214Z 	at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)
2024-04-09T14:50:30.038998400Z 	at scala.Option.getOrElse(Option.scala:201)
2024-04-09T14:50:30.039003449Z 	at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)
2024-04-09T14:50:30.039007516Z 	at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)
2024-04-09T14:50:30.039011752Z 	at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)
2024-04-09T14:50:30.039016094Z 	at scala.Option.getOrElse(Option.scala:201)
2024-04-09T14:50:30.039020178Z 	at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)
2024-04-09T14:50:30.039025828Z 	at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)
2024-04-09T14:50:30.039030360Z 	at org.apache.spark.sql.SparkSession.$anonfun$new$3(SparkSession.scala:117)
2024-04-09T14:50:30.039035095Z 	at scala.Option.map(Option.scala:242)
2024-04-09T14:50:30.039038922Z 	at org.apache.spark.sql.SparkSession.$anonfun$new$1(SparkSession.scala:117)
2024-04-09T14:50:30.039042933Z 	at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:230)
2024-04-09T14:50:30.039047411Z 	at org.apache.spark.sql.catalyst.SerializerBuildHelper$.nullOnOverflow(SerializerBuildHelper.scala:29)
2024-04-09T14:50:30.039052031Z 	at org.apache.spark.sql.catalyst.SerializerBuildHelper$.createSerializerForJavaBigDecimal(SerializerBuildHelper.scala:158)
2024-04-09T14:50:30.039058725Z 	at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:549)
2024-04-09T14:50:30.039063060Z 	at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
2024-04-09T14:50:30.039067192Z 	at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
2024-04-09T14:50:30.039071292Z 	at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
2024-04-09T14:50:30.039075940Z 	at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
2024-04-09T14:50:30.039080073Z 	at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:448)
2024-04-09T14:50:30.039084822Z 	at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:437)
2024-04-09T14:50:30.039089091Z 	at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
2024-04-09T14:50:30.039093262Z 	at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
2024-04-09T14:50:30.039097522Z 	at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
2024-04-09T14:50:30.039101625Z 	at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
2024-04-09T14:50:30.039105797Z 	at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:429)
2024-04-09T14:50:30.039116620Z 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55)
2024-04-09T14:50:30.039121198Z 	at org.apache.spark.sql.Encoders$.DECIMAL(Encoders.scala:100)
2024-04-09T14:50:30.039125443Z 	at org.apache.spark.sql.Encoders.DECIMAL(Encoders.scala)
2024-04-09T14:50:30.039129714Z 	at org.jetbrains.kotlinx.spark.api.EncodingKt.<clinit>(Encoding.kt:87)
2024-04-09T14:50:30.039134156Z 	at com.example.plugins.Broadcast.broadcast(Broadcast.kt:62)
2024-04-09T14:50:30.039138308Z 	at com.example.plugins.RoutingKt$configureRouting$1$1.invokeSuspend(Routing.kt:10)
2024-04-09T14:50:30.039142305Z 	at com.example.plugins.RoutingKt$configureRouting$1$1.invoke(Routing.kt)
2024-04-09T14:50:30.039146282Z 	at com.example.plugins.RoutingKt$configureRouting$1$1.invoke(Routing.kt)
The file-reading exception is printed when the DECIMAL encoder is pre-loaded by the Kotlin Spark API's Encoders file. Can you try to instantiate the same encoder in the non-Kotlin Spark project to see what happens? In the Spark 3.4+ branch of the project the encoding part is completely overhauled, so this issue won't be there anymore, but it's still a WIP. I'd once again like to add that your program still executes fine: it's a caught exception that's just logged to the output.
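Something like this should exercise the same code path in the plain-Spark project (a sketch in Kotlin; the Java translation is one-to-one, and Encoders.DECIMAL() is the call the stack trace shows Encoding.kt pre-loading):
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession

fun reproduceEncoderInit() {
    val spark = SparkSession.builder()
        .master("local")
        .appName("decimal-encoder-check")
        .getOrCreate()
    try {
        // Encoders.DECIMAL() is what EncodingKt's static initializer calls.
        // If the same UnsupportedFileSystemException shows up in the logs here,
        // the cause is in the Spark/Hadoop setup of the fat jar rather than in kotlin-spark-api.
        val decimalEncoder = Encoders.DECIMAL()
        println("DECIMAL encoder created, schema: " + decimalEncoder.schema())
    } finally {
        spark.stop()
    }
}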
t
@Eric Rodriguez I put a more detailed log for the Ktor-Spark app on the repo. If I'm reading it correctly, it seems that the exception is thrown before the broadcast is performed.
app-1  | 2024-04-10 09:53:18.482 [eventLoopGroupProxy-4-1] WARN  o.a.spark.sql.internal.SharedState - Cannot qualify the warehouse path, leaving it unqualified.
app-1  | org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
. . .
app-1  | 2024-04-10 09:53:19.148 [eventLoopGroupProxy-4-1] INFO  o.a.spark.storage.memory.MemoryStore - Block broadcast_0 stored as values in memory (estimated size 72.0 B, free 2.1 GiB)
app-1  | 2024-04-10 09:53:19.180 [eventLoopGroupProxy-4-1] INFO  o.a.spark.storage.memory.MemoryStore - Block broadcast_0_piece0 stored as bytes in memory (estimated size 150.0 B, free 2.1 GiB)
app-1  | 2024-04-10 09:53:19.182 [dispatcher-BlockManagerMaster] INFO  o.a.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in memory on dcb91ee36ad3:36665 (size: 150.0 B, free: 2.1 GiB)
app-1  | 2024-04-10 09:53:19.186 [eventLoopGroupProxy-4-1] INFO  org.apache.spark.SparkContext - Created broadcast 0 from broadcast at Broadcast.kt:61
The Spring Boot logs are easier to understand. It seems that Spark creates a local temporary directory as part of the preparation of the Spark session long before it invokes the broadcast function:
app-1  | 2024-04-10 09:00:43.393  INFO 1 --- [nio-8888-exec-1] o.a.s.SparkEnv                           : Registering BlockManagerMasterHeartbeat
app-1  | 2024-04-10 09:00:43.410  INFO 1 --- [nio-8888-exec-1] o.a.s.s.DiskBlockManager                 : Created local directory at /tmp/blockmgr-c9cef486-62f2-431a-8408-1e48b933da34
app-1  | 2024-04-10 09:00:43.436  INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore                    : MemoryStore started with capacity 2.1 GiB
. . .
app-1  | 2024-04-10 09:00:43.829  INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore                    : Block broadcast_0 stored as values in memory (estimated size 72.0 B, free 2.1 GiB)
app-1  | 2024-04-10 09:00:43.856  INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore                    : Block broadcast_0_piece0 stored as bytes in memory (estimated size 146.0 B, free 2.1 GiB)
app-1  | 2024-04-10 09:00:43.858  INFO 1 --- [ckManagerMaster] o.a.s.s.BlockManagerInfo                 : Added broadcast_0_piece0 in memory on 1d1b66d9e151:43605 (size: 146.0 B, free: 2.1 GiB)
app-1  | 2024-04-10 09:00:43.862  INFO 1 --- [nio-8888-exec-1] o.a.s.SparkContext                       : Created broadcast 0 from broadcast at SparkBroadcast.java:30
So, the interesting question is: why is Spark 3.3.2 able to handle this without throwing an exception when the call comes from Java, but not when it comes from Kotlin?
e
I had some issues (you can see them in this channel) and they were mainly due to kotlin-spark-api using Hadoop 3.3.6 when my cluster had v3.3.2, so I'd wager it's something around that
like @Jolan Rensen [JB] says, it's just a log and the app works fine, but if u really want it gone then my bet is u'd need to add some hadoop-* dependencies specifying your version (3.3.2 iiuc). Unfortunately I'm not sure which jar in particular is responsible, maybe hadoop-client + hadoop-common
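something along these lines in build.gradle.kts, maybe (untested sketch; the exact set of hadoop-* modules that matters may differ):
// pin Hadoop artifacts to the 3.3.2 that matches the spark-3.3.2-bin-hadoop3 distribution in the Dockerfile
dependencies {
    implementation("org.apache.hadoop:hadoop-client:3.3.2")
    implementation("org.apache.hadoop:hadoop-common:3.3.2")
}

// optionally force every transitive hadoop artifact onto the same version
configurations.all {
    resolutionStrategy.eachDependency {
        if (requested.group == "org.apache.hadoop") {
            useVersion("3.3.2")
        }
    }
}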
t
Thank you both for taking the time to help me understand what's happening here. It's a relief to know that the exception isn't happening because I'm doing something wrong on my end. Even though this is a caught exception, when you're trying to learn how to use Spark and Kotlin together, it can be difficult to know which exceptions are "expected" and which are user errors that need to be corrected. Have a great weekend!