Parijat Shah
02/24/2023, 8:32 PM
subscription(someSubscription).toFlow().retryWhen { t, attempt ->
delay(attempt * attempt * 10000)
true
}
java.io.EOFException: null
at okio.RealBufferedSource.require(RealBufferedSource.kt:199)
at okio.RealBufferedSource.readByte(RealBufferedSource.kt:209)
at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.kt:119)
at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.kt:102)
at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.kt:293)
at okhttp3.internal.ws.RealWebSocket$connect$1.onResponse(RealWebSocket.kt:195)
at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:764)
com.apollographql.apollo3.exception.ApolloNetworkException: Network error while executing onDisplayTimeUpdated
at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$map$1$2.emit(Emitters.kt:246)
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:87)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:66)
at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invokeSuspend(WebSocketNetworkTransport.kt:271)
at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke
at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke
at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$1.emit(flows.kt:53)
at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$2.emit(flows.kt:38)
at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$filter$1$2.emit(Emitters.kt:223)
at kotlinx.coroutines.flow.SubscribedFlowCollector.emit
at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:383)
at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42)
at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
mbonnin
02/24/2023, 8:43 PM
Parijat Shah
02/24/2023, 8:55 PM
mbonnin
02/24/2023, 9:48 PMfun simple() {
// Test snippet: collects a CountSubscription through a retrying flow and
// asserts that the collected values are exactly 0, 1, 2, 3, 4.
// NOTE(review): the angle brackets inside the URL literal look like chat link
// markup that was pasted along with the address — confirm the real URL.
val apolloClient = ApolloClient.Builder()
.serverUrl("<http://localhost:61504/subscriptions>")
.build()
// runBlocking keeps the calling thread blocked until the collection below finishes.
runBlocking {
// NOTE(review): the meaning of the arguments (5, 10000) is not visible here;
// presumably an item count and an interval — confirm against the schema.
val list = apolloClient.subscription(CountSubscription(5, 10000))
.toFlow()
// On any upstream failure: log the cause, back off, then resubscribe.
.retryWhen { cause, attempt ->
println("retrying because")
cause.printStackTrace()
// Quadratic backoff in milliseconds; if `attempt` starts at 0 the first
// retry is immediate — TODO confirm against the retryWhen documentation.
delay(attempt * attempt * 10000)
// Returning true means the flow is always retried, with no attempt limit.
true
}
// `!!` throws a NullPointerException if a response carries no data.
.map { it.data!!.count }
.toList()
assertEquals(0.until(5).toList(), list)
}
}
Parijat Shah
02/24/2023, 11:03 PM
coroutineScope = CoroutineScope(Dispatchers.Main + SupervisorJob())
mbonnin
02/24/2023, 11:29 PM
Parijat Shah
02/24/2023, 11:57 PM
By "works", you mean it doesn't throw, right? In my unit test, I am throwing an exception and the behavior is as expected: the retry block starts the flow again.
retryWhen { t, attempt ->
delay(attempt * attempt * 10000)
true
}
webSocketReopenWhen
and don't see the exception anymore.
ApolloClient.Builder()
.webSocketReopenWhen { _, attempt ->
// some delay
true
}
mbonnin
03/03/2023, 5:19 PM