Pere Casafont
08/29/2019, 2:04 PM
sayX()
GlobalScope.launch(Dispatchers.Default) { sayY() }
triggers X but not Y... it looks like the whole coroutines engine goes down. Is that possible?
Dominaezzz
08/29/2019, 2:22 PM
Dispatchers.Default uses a fixed number of threads, so if you call blocking APIs in it, something like this might happen. It can also happen if you have lots of long-running CPU-intensive operations.
Pere Casafont
08/29/2019, 2:26 PM
Dominaezzz
08/29/2019, 2:26 PM
Pere Casafont
08/29/2019, 2:26 PM
Dominaezzz
08/29/2019, 2:29 PM
Pere Casafont
08/29/2019, 2:31 PM
Dominaezzz
08/29/2019, 2:37 PM
Dispatchers.IO and the stalls happen less or don't happen, then there's an implementation bug. If nothing changes, then maybe the broadcast channel isn't being used correctly.
Pere Casafont
08/29/2019, 2:49 PM
Dominaezzz
08/29/2019, 2:53 PM
cancel once you're done with openSubscription(). (Or just use consume.)
Pere Casafont
08/29/2019, 6:39 PM
consume
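A minimal sketch of the consume suggestion, assuming kotlinx.coroutines 1.3.x (current at the time of this thread; BroadcastChannel was experimental then and has since been deprecated in favor of SharedFlow). The channel, its capacity of 1, and the helper sendsComplete are made up for illustration. The idea is that consume cancels the subscription when its block exits, even with an exception, so the broadcast channel is not left waiting on a dead subscriber.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: true if five sends complete within half a second,
// false if the sender is still suspended when the timeout fires.
suspend fun sendsComplete(channel: BroadcastChannel<Int>): Boolean =
    withTimeoutOrNull(500) {
        repeat(5) { channel.send(it) }
        true
    } ?: false

fun main() = runBlocking<Unit> {
    // Illustrative channel with a one-element buffer (an assumption, not the thread's code).
    val broadcast = BroadcastChannel<Int>(1)
    val subscription = broadcast.openSubscription()
    broadcast.send(0)

    try {
        // consume cancels the subscription when the block exits, normally or by exception.
        subscription.consume {
            val first = receive()
            throw IllegalStateException("handler failed after element $first")
        }
    } catch (e: IllegalStateException) {
        println("handler exited: ${e.message}")
    }

    // With the subscription cancelled, no slow subscriber is left to hold the sender back.
    println("sends complete: ${sendsComplete(broadcast)}")
}
```

If the subscription was cancelled as expected, the last line should report that the sends completed rather than timing out.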
Dico
08/29/2019, 8:56 PM
consume will close the channel when the coroutine dies
David Glasser
08/30/2019, 1:15 AM
uli
08/30/2019, 4:06 AM
Pere Casafont
08/30/2019, 9:13 AM
Exception in thread "nioEventLoopGroup-3-2" io.ktor.util.cio.ChannelWriteException: Cannot write to a channel
    at io.ktor.server.netty.cio.NettyResponsePipeline.processCallFailed(NettyResponsePipeline.kt:144)
    at io.ktor.server.netty.cio.NettyResponsePipeline.access$processCallFailed(NettyResponsePipeline.kt:26)
    at io.ktor.server.netty.cio.NettyResponsePipeline.processJobs(NettyResponsePipeline.kt:447)
    at io.ktor.server.netty.cio.NettyResponsePipeline$processJobs$1.invokeSuspend(NettyResponsePipeline.kt)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:238)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:955)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:863)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1365)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:716)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:763)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:766)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:766)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:812)
    at io.ktor.server.netty.cio.NettyResponsePipeline$processBodyFlusher$2.invokeSuspend(NettyResponsePipeline.kt:312)
    ... 9 more
Dispatchers.Unconfined it stopped stalling, but that error keeps happening sometimes (very rarely and only when deployed to acceptance)
webSocket("/notifications") {
    try {
        // Subscribe to the notifications channel
        val notificationChannel = notifierService.channel.openSubscription()
        for (notification in notificationChannel) {
            val text = gson.toJson(notification)
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedSendChannelException) {
        logger.info { "Local socket ${call.request.origin.remoteHost} closed: ${e.message}." }
    } catch (e: ClosedReceiveChannelException) {
        logger.info { "Remote socket ${call.request.origin.remoteHost} closed: ${e.message}" }
    } catch (e: Throwable) {
        logger.error(e) { "Error on web socket" }
    }
}
where notifierService.channel is a BroadcastChannel which receives events from another thread. That thread is totally unrelated to the ktor websockets/API: what fires these events is the consumption of an external API.
Dominaezzz
08/30/2019, 9:56 AM
notificationChannel is never cancelled.
Pere Casafont
08/30/2019, 10:26 AM
/**
* Returns new iterator to receive elements from this channels using `for` loop.
* Iteration completes normally when the channel is [isClosedForReceive] without cause and
* throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public operator fun iterator(): ChannelIterator<E>
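The iterator documented here only iterates: nothing in it cancels the subscription if the loop body throws. A rough sketch of what that can mean for a buffered BroadcastChannel, assuming kotlinx.coroutines 1.3.x (BroadcastChannel was experimental then and has since been deprecated in favor of SharedFlow). The capacity of 1, the Int elements, and the helper sendsComplete are hypothetical, and this is not confirmed as the cause of the stalls in this thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: true if five sends complete within half a second,
// false if the sender is still suspended when the timeout fires.
suspend fun sendsComplete(channel: BroadcastChannel<Int>): Boolean =
    withTimeoutOrNull(500) {
        repeat(5) { channel.send(it) }
        true
    } ?: false

fun main() = runBlocking<Unit> {
    // Illustrative channel with a one-element buffer (an assumption).
    val broadcast = BroadcastChannel<Int>(1)
    val subscription = broadcast.openSubscription()
    broadcast.send(0)

    try {
        // Same shape as the websocket handler: a bare for loop that exits with an exception.
        for (n in subscription) {
            throw IllegalStateException("socket closed while handling $n")
        }
    } catch (e: IllegalStateException) {
        println("handler exited: ${e.message}")
    }

    // The subscription is still registered, so the sender suspends once the buffer is full.
    println("sends complete with leaked subscription: ${sendsComplete(broadcast)}")

    // Cancelling the subscription unregisters it and lets the sender continue.
    subscription.cancel()
    println("sends complete after cancel(): ${sendsComplete(broadcast)}")
}
```

In the handler itself, the same effect could be had by calling cancel() on the subscription in a finally block, or by wrapping the loop in consume.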
Dominaezzz
08/30/2019, 10:33 AM
Pere Casafont
08/30/2019, 3:06 PM