# coroutines
p
I have a service that uses ktor + websockets + broadcast channel. After some hours it stops responding to http requests and
```kotlin
sayX()
GlobalScope.launch(Dispatchers.Default) { sayY() }
```
triggers X but not Y... it looks like the whole coroutines engine goes down. Is that possible?
d
Dispatchers.Default
uses a fixed number of threads, so if you call blocking APIs in it, then something like this might happen. Also if you have lots of long running cpu-intensive operations, this can happen too.
But based on the technologies you're using, I doubt either of those are in play here.
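A minimal, hypothetical sketch of that failure mode: saturate `Dispatchers.Default`'s fixed-size pool with blocking calls and check whether a latecomer coroutine gets scheduled (`latecomerRanDuringBlockade` is an illustrative name, not part of the original service):

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Dispatchers.Default has roughly one worker thread per CPU core (minimum two).
// If every worker is parked in a blocking call, nothing else runs on the dispatcher.
fun latecomerRanDuringBlockade(blockMs: Long): Boolean = runBlocking {
    val workers = maxOf(2, Runtime.getRuntime().availableProcessors())
    val latecomerRan = AtomicBoolean(false)
    val blockers = List(workers + 1) {
        launch(Dispatchers.Default) { Thread.sleep(blockMs) } // blocking call holds a worker
    }
    val latecomer = launch(Dispatchers.Default) { latecomerRan.set(true) }
    delay(blockMs / 2) // check well before the blockers wake up
    val ranDuring = latecomerRan.get()
    blockers.joinAll()
    latecomer.join()
    ranDuring
}

fun main() {
    // Typically prints false: the latecomer is starved until a blocker exits.
    println(latecomerRanDuringBlockade(1_000))
}
```

Moving the blocking call to `Dispatchers.IO` (or wrapping it in `withContext(Dispatchers.IO)`) avoids starving the CPU pool.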
p
right, no CPU intensive operations at all here
d
Do you do a lot of `launch`es?
p
broadcast channels are still experimental, maybe I found a bug there, but everything just stalls silently so it's hard to figure out what's happening
once every 10 min
and the application's coroutines stop working after ~10h
the HTTP API sends empty responses, the launches don't trigger their content and the websocket goes down
d
I reckon it's a ktor issue as opposed to broadcast channels.
Have you tried using a different dispatcher to see if the behaviour changes?
p
no, but I don't think ktor uses the default dispatcher for its features, and that one stalls as well
d
If you use `Dispatchers.IO` and the stalls happen less or don't happen, then there's an implementation bug. If nothing changes, then maybe the broadcast channel isn't being used correctly.
Are you able to reproduce this?
p
nope
thank you for all the help, though, I'll pay more attention to the broadcast channel
d
The only thing I can think of is: make sure you call `cancel` once you're done with `openSubscription()`. (Or just use `consume`.)
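A standalone sketch of both options against a kotlinx `BroadcastChannel` (the channel, values, and `collectBoth` name here are illustrative, not from the service):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Two equivalent ways to release a BroadcastChannel subscription.
suspend fun collectBoth(values: List<Int>): Pair<List<Int>, List<Int>> = coroutineScope {
    val broadcast = BroadcastChannel<Int>(16)

    // Option 1: cancel the subscription yourself, on every exit path.
    val sub = broadcast.openSubscription()
    val manual = async {
        val out = mutableListOf<Int>()
        try {
            for (x in sub) out.add(x)
        } finally {
            sub.cancel() // without this, the subscription leaks
        }
        out
    }

    // Option 2: consume cancels the subscription when the block exits.
    val scoped = async {
        broadcast.openSubscription().consume {
            val out = mutableListOf<Int>()
            for (x in this) out.add(x)
            out
        }
    }

    values.forEach { broadcast.send(it) }
    broadcast.close() // lets both iterations complete normally
    manual.await() to scoped.await()
}

fun main() = runBlocking {
    println(collectBoth(listOf(1, 2)))
}
```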
p
I iterate over it, which should be the same as `consume`
d
That's not the same. `consume` will close the channel when the coroutine dies, and `consume` can be the only receiver of a channel.
Iterating is just receiving items as they become available, and multiple coroutines can be doing this, until the corresponding `SendChannel` is closed.
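The difference can be seen with a plain `Channel` (a hypothetical standalone example; `fanOut` is an illustrative name): iteration fans elements out across receivers and ends only when the sender closes the channel, whereas `consume` would have cancelled the channel when its block exited.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.util.concurrent.ConcurrentLinkedQueue

// Two coroutines iterate one channel: each element goes to exactly one of
// them, and neither cancels the channel when its own loop finishes.
suspend fun fanOut(items: List<Int>): List<Int> = coroutineScope {
    val channel = Channel<Int>()
    val received = ConcurrentLinkedQueue<Int>()
    val consumers = List(2) {
        launch { for (item in channel) received.add(item) }
    }
    items.forEach { channel.send(it) }
    channel.close() // both for-loops complete normally here
    consumers.joinAll()
    received.toList()
}

fun main() = runBlocking {
    println(fanOut(listOf(1, 2, 3, 4)).sorted()) // each element consumed exactly once
}
```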
d
I'd suggest getting a thread dump and making sure nothing surprising is blocking, with jstack or something like YourKit.
u
Could be a deadlock. Do you have any circular event push? https://medium.com/@elizarov/deadlocks-in-non-hierarchical-csp-e5910d137cc
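The circular-wait pattern from the linked article can be reproduced in a few lines (illustrative names; `withTimeoutOrNull` is only there so the demo terminates):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Each coroutine waits to receive from the other before it can send:
// a circular wait on rendezvous channels, i.e. a classic CSP deadlock.
suspend fun deadlocks(timeoutMs: Long): Boolean = coroutineScope {
    val a = Channel<Int>()
    val b = Channel<Int>()
    val j1 = launch { a.send(b.receive()) } // blocked waiting on b
    val j2 = launch { b.send(a.receive()) } // blocked waiting on a
    val finished = withTimeoutOrNull(timeoutMs) { joinAll(j1, j2) } != null
    j1.cancel()
    j2.cancel()
    !finished
}

fun main() = runBlocking {
    println("deadlocked: ${deadlocks(200)}")
}
```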
p
```
Exception in thread "nioEventLoopGroup-3-2" io.ktor.util.cio.ChannelWriteException: Cannot write to a channel
        at io.ktor.server.netty.cio.NettyResponsePipeline.processCallFailed(NettyResponsePipeline.kt:144)
        at io.ktor.server.netty.cio.NettyResponsePipeline.access$processCallFailed(NettyResponsePipeline.kt:26)
        at io.ktor.server.netty.cio.NettyResponsePipeline.processJobs(NettyResponsePipeline.kt:447)
        at io.ktor.server.netty.cio.NettyResponsePipeline$processJobs$1.invokeSuspend(NettyResponsePipeline.kt)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:238)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:955)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:863)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1365)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:716)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:763)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789)
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:766)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789)
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:766)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789)
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:812)
        at io.ktor.server.netty.cio.NettyResponsePipeline$processBodyFlusher$2.invokeSuspend(NettyResponsePipeline.kt:312)
        ... 9 more
```
after using `Dispatchers.Unconfined` it stopped stalling, but that error keeps happening sometimes (very rarely, and only when deployed to acceptance)
the code is
```kotlin
webSocket("/notifications") {
    try {
        // Subscribe to the notifications channel
        val notificationChannel = notifierService.channel.openSubscription()
        for (notification in notificationChannel) {
            val text = gson.toJson(notification)
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedSendChannelException) {
        logger.info { "Local socket ${call.request.origin.remoteHost} closed: ${e.message}." }
    } catch (e: ClosedReceiveChannelException) {
        logger.info { "Remote socket ${call.request.origin.remoteHost} closed: ${e.message}" }
    } catch (e: Throwable) {
        logger.error(e) { "Error on web socket" }
    }
}
```
where `notifierService.channel` is a BroadcastChannel which receives events from another thread, totally unrelated to the ktor websockets/API, as what fires these events is the consumption of an external API
d
Looks like `notificationChannel` is never cancelled.
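A sketch of the fix this implies, assuming `notifierService.channel` is a kotlinx `BroadcastChannel` (an untested fragment in the handler's original shape): `consumeEach` opens a subscription and cancels it when the block exits, normally or with an exception.

```kotlin
webSocket("/notifications") {
    try {
        // consumeEach = openSubscription() + cancel() on every exit path,
        // so a closed socket no longer leaks a subscriber on the channel.
        notifierService.channel.consumeEach { notification ->
            outgoing.send(Frame.Text(gson.toJson(notification)))
        }
    } catch (e: ClosedSendChannelException) {
        logger.info { "Local socket ${call.request.origin.remoteHost} closed: ${e.message}." }
    }
}
```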
p
```kotlin
/**
 * Returns new iterator to receive elements from this channel using `for` loop.
 * Iteration completes normally when the channel is [isClosedForReceive] without cause and
 * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
 */
public operator fun iterator(): ChannelIterator<E>
```
d
Does the iteration ever complete?
p
this broadcast channel is a single one for the whole app, so it never completes, but when the websocket closes an exception is thrown
oh, thinking about it, in that case what closes is the socket but not the notification channel
thanks