#coroutines

Pere Casafont

08/29/2019, 2:04 PM
I have a service that uses Ktor + WebSockets + a broadcast channel. After some hours it stops responding to HTTP requests, and

```kotlin
sayX()
GlobalScope.launch(Dispatchers.Default) { sayY() }
```

triggers X but not Y... it looks like the whole coroutines engine goes down. Is that possible?

Dominaezzz

08/29/2019, 2:22 PM
`Dispatchers.Default` uses a fixed number of threads, so if you call blocking APIs in it, something like this might happen. Lots of long-running CPU-intensive operations can cause it too.
But based on the technologies you're using, I doubt either of those are in play here.
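A minimal sketch of the starvation described above, assuming kotlinx-coroutines-core is on the classpath. It uses a small fixed thread pool as a stand-in for `Dispatchers.Default` (whose parallelism is tied to the number of CPU cores, with a minimum of two), occupies every thread with a blocking wait, and checks whether the equivalent of `launch { sayY() }` still gets to run. The function name `launchRunsWhilePoolIsBlocked` is hypothetical.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns true if a freshly launched coroutine gets a thread
// while every worker of a fixed-size pool is stuck in a blocking (non-suspending) call.
fun launchRunsWhilePoolIsBlocked(threads: Int): Boolean {
    val dispatcher = Executors.newFixedThreadPool(threads).asCoroutineDispatcher()
    val release = CountDownLatch(1)   // keeps the "blocking API" stuck until released
    val ySaid = CountDownLatch(1)     // counted down by the "sayY()" coroutine
    try {
        val scope = CoroutineScope(dispatcher)
        // Occupy every worker thread with a blocking call.
        repeat(threads) { scope.launch { release.await() } }
        // Stand-in for `launch(Dispatchers.Default) { sayY() }`.
        scope.launch { ySaid.countDown() }
        // If the pool is starved, this coroutine never gets a thread in time.
        return ySaid.await(500, TimeUnit.MILLISECONDS)
    } finally {
        release.countDown()   // unblock the workers
        dispatcher.close()    // shuts down the underlying executor
    }
}
```

Called as `launchRunsWhilePoolIsBlocked(2)` it returns `false`: the new coroutine waits in the pool's queue until one of the blocking calls returns, which from the outside looks exactly like "X but not Y".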

Pere Casafont

08/29/2019, 2:26 PM
right, no CPU intensive operations at all here

Dominaezzz

08/29/2019, 2:26 PM
Do you do a lot of `launch`es?

Pere Casafont

08/29/2019, 2:26 PM
broadcast channels are still experimental, maybe I found a bug there, but everything just stalls silently so it's hard to figure out what's happening
once every 10 min
and the application's coroutines stop working after ~10h
the HTTP API sends empty responses, the launched blocks never run, and the websocket goes down

Dominaezzz

08/29/2019, 2:29 PM
I reckon it's a Ktor issue as opposed to a broadcast channel one.
Have you tried using a different dispatcher to see if the behaviour changes?

Pere Casafont

08/29/2019, 2:31 PM
no, but I don't think the Ktor API uses the default dispatcher for its features, and it stalls as well
d

Dominaezzz

08/29/2019, 2:37 PM
If you use `Dispatchers.IO` and the stalls happen less often or not at all, then there's an implementation bug. If nothing changes, then maybe the broadcast channel isn't being used correctly.
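The experiment suggested here works because `Dispatchers.IO` is intended for blocking calls. A sketch, assuming kotlinx-coroutines-core and using a small fixed thread pool as a hypothetical stand-in for `Dispatchers.Default`: each blocking wait is wrapped in `withContext(Dispatchers.IO)`, so the pool's own threads are released while the wait is in progress and an extra launch can still run. `launchRunsWhenBlockingIsOffloaded` is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical helper: same setup as a starved pool, but every blocking call is
// moved to Dispatchers.IO, so the small pool's threads stay available.
fun launchRunsWhenBlockingIsOffloaded(threads: Int): Boolean {
    val dispatcher = Executors.newFixedThreadPool(threads).asCoroutineDispatcher()
    val release = CountDownLatch(1)   // the "blocking API"
    val ySaid = CountDownLatch(1)     // counted down by the "sayY()" coroutine
    try {
        val scope = CoroutineScope(dispatcher)
        repeat(threads) {
            scope.launch {
                // The coroutine suspends here; the blocking wait runs on an IO thread.
                withContext(Dispatchers.IO) { release.await() }
            }
        }
        scope.launch { ySaid.countDown() }
        return ySaid.await(500, TimeUnit.MILLISECONDS)
    } finally {
        release.countDown()
        dispatcher.close()
    }
}
```

With the blocking work offloaded, `launchRunsWhenBlockingIsOffloaded(2)` returns `true`.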
Are you able to reproduce this?

Pere Casafont

08/29/2019, 2:49 PM
nope
thank you for all the help, though, I'll pay more attention to the broadcast channel

Dominaezzz

08/29/2019, 2:53 PM
The only thing I can think of is: make sure you call `cancel` once you're done with `openSubscription()` (or just use `consume`).
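Both options can be sketched as follows, assuming kotlinx-coroutines-core. `firstEvents` and `firstEventsConsume` are hypothetical helpers that read a few events from a `BroadcastChannel` subscription and then release it: once with an explicit `cancel()` in `finally`, once with `consume`. Note that `BroadcastChannel` was experimental at the time of this thread and is deprecated in recent kotlinx.coroutines releases in favour of `SharedFlow`, so expect compiler warnings.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Explicit variant: always cancel the subscription when done with it.
suspend fun <E> firstEvents(source: BroadcastChannel<E>, n: Int): List<E> {
    val subscription = source.openSubscription()
    try {
        val result = mutableListOf<E>()
        for (event in subscription) {
            result += event
            if (result.size == n) break
        }
        return result
    } finally {
        // Without this, the subscription stays registered on `source`.
        subscription.cancel()
    }
}

// `consume` variant: the subscription is cancelled when the block exits,
// whether it returns normally or throws.
suspend fun <E> firstEventsConsume(source: BroadcastChannel<E>, n: Int): List<E> =
    source.openSubscription().consume {
        val result = mutableListOf<E>()
        for (event in this) {
            result += event
            if (result.size == n) break
        }
        result
    }
```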

Pere Casafont

08/29/2019, 6:39 PM
I iterate over it, which should be the same as `consume`

Dico

08/29/2019, 8:56 PM
That's not the same.
`consume` will cancel the channel when its block finishes, whether normally or with an exception.
`consume` is meant for the only receiver of a channel.
Iterating is just receiving items as they become available, and multiple coroutines can be doing this until the corresponding `SendChannel` is closed.
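The difference can be checked with a small sketch, assuming kotlinx-coroutines-core (the helper names are hypothetical). Both functions take one element and stop, but only the `consume` version cancels the channel on the way out, so another receiver can keep reading from the first channel but not from the second.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Plain iteration: leaving the loop early does not touch the channel.
suspend fun takeOneIterating(channel: ReceiveChannel<Int>): Int {
    for (x in channel) return x
    error("channel was closed before any element arrived")
}

// Iteration inside `consume`: leaving the block cancels the channel for everyone.
suspend fun takeOneConsuming(channel: ReceiveChannel<Int>): Int =
    channel.consume {
        for (x in this) return@consume x
        error("channel was closed before any element arrived")
    }
```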

David Glasser

08/30/2019, 1:15 AM
I'd suggest getting a thread dump and making sure nothing surprising is blocking, with `jstack` or something like YourKit.
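When attaching `jstack <pid>` is not convenient, a rough equivalent can be produced from inside the JVM with `Thread.getAllStackTraces()`. The `threadDump` helper below is a hypothetical sketch, not part of any library. Threads of the shared Default/IO coroutine pool are named `DefaultDispatcher-worker-N`; if those sit in `BLOCKED` or `WAITING` inside application code, that points at a blocking call, whereas suspended coroutines do not occupy a thread and will not appear at all.

```kotlin
// Hypothetical helper: a jstack-like summary of every live thread,
// printing the name, the state and the top `maxFrames` stack frames.
fun threadDump(maxFrames: Int = 8): String = buildString {
    for ((thread, frames) in Thread.getAllStackTraces()) {
        append("\"${thread.name}\" state=${thread.state}\n")
        frames.take(maxFrames).forEach { append("    at $it\n") }
        append("\n")
    }
}
```

It could, for example, be logged from a debug-only HTTP route or a periodic timer while the service is stalled.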

uli

08/30/2019, 4:06 AM
Could be a deadlock. Do you have any circular event push? https://medium.com/@elizarov/deadlocks-in-non-hierarchical-csp-e5910d137cc
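A minimal illustration of a circular push, assuming kotlinx-coroutines-core; it is not taken from the linked article. Two hypothetical actors each send to the other's inbox before reading their own. With rendezvous channels (capacity 0) each `send` waits for a receiver that is itself stuck in `send`, so neither proceeds; `withTimeoutOrNull` is used only to detect the hang and then cancel it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical check: do two actors that push to each other before receiving finish?
suspend fun circularPushCompletes(capacity: Int): Boolean = coroutineScope {
    val inboxA = Channel<String>(capacity)
    val inboxB = Channel<String>(capacity)
    val a = launch { inboxB.send("from A"); inboxA.receive() }
    val b = launch { inboxA.send("from B"); inboxB.receive() }
    val finished = withTimeoutOrNull(500) { joinAll(a, b); true } ?: false
    // Break the deadlock so that coroutineScope can complete.
    if (!finished) { a.cancel(); b.cancel() }
    finished
}
```

`circularPushCompletes(0)` returns `false` and `circularPushCompletes(1)` returns `true`. A buffer only postpones this kind of deadlock until it fills up; it does not remove the cycle.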

Pere Casafont

08/30/2019, 9:13 AM
```
Exception in thread "nioEventLoopGroup-3-2" io.ktor.util.cio.ChannelWriteException: Cannot write to a channel
        at io.ktor.server.netty.cio.NettyResponsePipeline.processCallFailed(NettyResponsePipeline.kt:144)
        at io.ktor.server.netty.cio.NettyResponsePipeline.access$processCallFailed(NettyResponsePipeline.kt:26)
        at io.ktor.server.netty.cio.NettyResponsePipeline.processJobs(NettyResponsePipeline.kt:447)
        at io.ktor.server.netty.cio.NettyResponsePipeline$processJobs$1.invokeSuspend(NettyResponsePipeline.kt)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:238)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:955)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:863)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1365)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:716)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:763)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789)
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:766)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789)
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:766)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789)
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:812)
        at io.ktor.server.netty.cio.NettyResponsePipeline$processBodyFlusher$2.invokeSuspend(NettyResponsePipeline.kt:312)
        ... 9 more
```
After using `Dispatchers.Unconfined` it stopped stalling, but that error keeps happening sometimes (very rarely, and only when deployed to acceptance).
the code is
```kotlin
webSocket("/notifications") {
    try {
        // Subscribe to the notifications channel
        val notificationChannel = notifierService.channel.openSubscription()
        for (notification in notificationChannel) {
            val text = gson.toJson(notification)
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedSendChannelException) {
        logger.info { "Local socket ${call.request.origin.remoteHost} closed: ${e.message}." }
    } catch (e: ClosedReceiveChannelException) {
        logger.info { "Remote socket ${call.request.origin.remoteHost} closed: ${e.message}" }
    } catch (e: Throwable) {
        logger.error(e) { "Error on web socket" }
    }
}
```
where `notifierService.channel` is a `BroadcastChannel` that receives events from another thread. It's totally unrelated to the Ktor WebSockets/API, since what fires these events is the consumption of an external API.

Dominaezzz

08/30/2019, 9:56 AM
Looks like `notificationChannel` is never cancelled.
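One way to make sure the subscription is released, sketched under assumptions: kotlinx-coroutines-core is available, and the hypothetical `forwardNotifications` helper stands in for the body of the `webSocket` block, with `outgoing` playing the socket's outgoing channel. `consumeEach` cancels the subscription however the loop ends, including when `outgoing.send` throws `ClosedSendChannelException` because the socket is gone.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: forwards every element of `subscription` to `outgoing`.
// consumeEach cancels `subscription` on normal completion, on an exception
// (e.g. ClosedSendChannelException from a closed socket) and on cancellation.
suspend fun <E, T> forwardNotifications(
    subscription: ReceiveChannel<E>,
    outgoing: SendChannel<T>,
    render: (E) -> T
) {
    subscription.consumeEach { outgoing.send(render(it)) }
}
```

Inside the handler it would be called as `forwardNotifications(notifierService.channel.openSubscription(), outgoing) { Frame.Text(gson.toJson(it)) }`, keeping the existing `catch` clauses around it. An explicit `try`/`finally { notificationChannel.cancel() }` around the original `for` loop would have the same effect.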

Pere Casafont

08/30/2019, 10:26 AM
```kotlin
/**
 * Returns new iterator to receive elements from this channels using `for` loop.
 * Iteration completes normally when the channel is [isClosedForReceive] without cause and
 * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
 */
public operator fun iterator(): ChannelIterator<E>
```
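The KDoc above implies that a `for` loop over a channel ends only when the channel is closed. A small sketch, assuming kotlinx-coroutines-core (`collectUntilClosed` is a hypothetical name), makes this observable: a normal `close()` ends the loop, and `close(cause)` makes it throw the cause after the already-sent elements have been received.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: collects everything a `for` loop receives.
// The loop returns only once the sender closes the channel.
suspend fun collectUntilClosed(channel: ReceiveChannel<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    for (x in channel) seen += x
    return seen
}
```

For a channel that is never closed, such as a single app-wide broadcast, the call never returns; wrapping it in `withTimeoutOrNull` yields `null` after the timeout instead of a list.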

Dominaezzz

08/30/2019, 10:33 AM
Does the iteration ever complete?

Pere Casafont

08/30/2019, 3:06 PM
this broadcast channel is a single one for the whole app, so it never completes, but when the websocket closes an exception is thrown
oh, thinking about it, in that case what closes is the socket but not the notification channel
thanks