# apollo-kotlin

Parijat Shah

02/24/2023, 8:32 PM
I start a subscription as shown in the code snippet below. My understanding is that retryWhen will catch any exception and restart the flow, but in reality my app is crashing with the exception below.
subscription(someSubscription).toFlow().retryWhen { t, attempt ->
    delay(attempt * attempt * 10000)
    true
}
java.io.EOFException: null
    at okio.RealBufferedSource.require(RealBufferedSource.kt:199)
    at okio.RealBufferedSource.readByte(RealBufferedSource.kt:209)
    at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.kt:119)
    at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.kt:102)
    at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.kt:293)
    at okhttp3.internal.ws.RealWebSocket$connect$1.onResponse(RealWebSocket.kt:195)
    at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.lang.Thread.run(Thread.java:764)
com.apollographql.apollo3.exception.ApolloNetworkException: Network error while executing onDisplayTimeUpdated
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$map$1$2.emit(Emitters.kt:246)
    at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
    at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
    at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:87)
    at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:66)
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invokeSuspend(WebSocketNetworkTransport.kt:271)
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke
    at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$1.emit(flows.kt:53)
    at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$2.emit(flows.kt:38)
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$filter$1$2.emit(Emitters.kt:223)
    at kotlinx.coroutines.flow.SubscribedFlowCollector.emit
    at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:383)
    at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42)
    at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)

mbonnin

02/24/2023, 8:43 PM
Mmm that's unexpected, let me check if I can reproduce. What version are you on?

Parijat Shah

02/24/2023, 8:55 PM
3.7.4

mbonnin

02/24/2023, 9:48 PM
Can't reproduce using the integration tests 😞
I'm doing this:
fun simple() {
    val apolloClient = ApolloClient.Builder()
        .serverUrl("http://localhost:61504/subscriptions")
        .build()

    runBlocking {
        val list = apolloClient.subscription(CountSubscription(5, 10000))
            .toFlow()
            .retryWhen { cause, attempt ->
                println("retrying because")
                cause.printStackTrace()
                delay(attempt * attempt * 10000)
                true
            }
            .map { it.data!!.count }
            .toList()
        assertEquals(0.until(5).toList(), list)
    }
}
And killing the server to simulate a network error
What scope/context are you running your Flow in? I know error handling in Flows can be quite complicated, so maybe it's related to that?

Parijat Shah

02/24/2023, 11:03 PM
Sorry about the late response, I was in a meeting. I am using a custom scope with the Main dispatcher.
coroutineScope = CoroutineScope(Dispatchers.Main + SupervisorJob())
My unit test works as well.

mbonnin

02/24/2023, 11:29 PM
by "works", you mean it doesn't throw, right?
Does it say anything else in the stack trace? It's weird that there are 2 stacktraces but no "caused by"
I have to call it a day for tonight, but if you can set up a reproducer, I'll look into it on Monday.

Parijat Shah

02/24/2023, 11:57 PM
by "works", you mean it doesn't throw, right?
In my unit test, I throw an exception and the behavior is as expected: the retry block starts the flow again.
retryWhen { t, attempt ->
    delay(attempt * attempt * 10000)
    true
}
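A minimal sketch of that kind of unit test, assuming kotlinx-coroutines is on the classpath. The names flakyFlow and collectWithRetry are hypothetical stand-ins for the real subscription flow, and the delay is shortened from 10000 ms so the example finishes quickly. It only shows that retryWhen re-collects the upstream flow after an exception; it does not reproduce the crash reported above.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Hypothetical stand-in for the subscription flow:
// the first two collections fail, the third one emits two items.
fun flakyFlow(): Flow<Int> {
    var collections = 0
    return flow {
        collections++
        if (collections <= 2) throw IOException("simulated network error #$collections")
        emit(1)
        emit(2)
    }
}

// Collects flakyFlow() with the same retry shape as in the thread and
// records the attempt numbers passed to the retryWhen predicate.
fun collectWithRetry(): Pair<List<Int>, List<Long>> = runBlocking {
    val attempts = mutableListOf<Long>()
    val items = flakyFlow()
        .retryWhen { _, attempt ->
            attempts += attempt
            delay(attempt * attempt * 10) // shortened from 10000 ms for the sketch
            true // always retry
        }
        .toList()
    items to attempts
}

fun main() {
    val (items, attempts) = collectWithRetry()
    println(items)    // [1, 2]
    println(attempts) // [0, 1]
}
```

Note that the attempt counter starts at zero, so with this formula the first retry happens without any delay.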
Just an update: I released a patch that enables the webSocketReopenWhen option, and I don't see the exception anymore.
ApolloClient.Builder()
    .webSocketReopenWhen { _, attempt ->
        // some delay
        true
    }
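One possible way to fill in the "some delay" placeholder is a capped quadratic backoff, sketched below in plain Kotlin. The helper reopenDelayMillis and its default values are hypothetical and not part of Apollo Kotlin; the sketch also assumes the attempt counter passed to the lambda counts up from zero. The cap keeps the wait bounded, unlike attempt * attempt * 10000, which grows without limit.

```kotlin
// Hypothetical helper (not an Apollo API): quadratic backoff with an upper bound.
fun reopenDelayMillis(
    attempt: Long,
    baseMillis: Long = 1_000L,
    maxMillis: Long = 60_000L,
): Long {
    require(attempt >= 0) { "attempt must not be negative" }
    // Bound the attempt first so the multiplication cannot overflow
    // for the default base value.
    val bounded = attempt.coerceAtMost(1_000L)
    return (bounded * bounded * baseMillis).coerceAtMost(maxMillis)
}

fun main() {
    // Print the delay schedule for the first ten attempts.
    for (attempt in 0L..9L) {
        println("attempt $attempt -> ${reopenDelayMillis(attempt)} ms")
    }
}

// Wiring it in would look roughly like this (needs the Apollo Kotlin and
// kotlinx-coroutines dependencies, so it is left as a comment here):
//
// ApolloClient.Builder()
//     .webSocketReopenWhen { _, attempt ->
//         delay(reopenDelayMillis(attempt))
//         true
//     }
```

With the defaults, the delay grows as 0, 1000, 4000, 9000 ms and so on, and stays at 60000 ms from attempt 8 onward.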

mbonnin

03/03/2023, 5:19 PM
Nice, thanks for the update!