https://kotlinlang.org logo
#coroutines
Title
# coroutines
w

william

10/21/2020, 11:27 PM
I have a
Channel(Channel.UNLIMITED)
in use to act as a queue between coroutines but am having trouble with my receiving coroutine not seeing any of what the producer is giving. could there be an issue from performing both of this in the scope of
<http://Dispatchers.IO|Dispatchers.IO>
(which i believe is multithreaded in java) ? should i be using a single threaded coroutine scope / dispatcher for this to work?
z

Zach Klippenstein (he/him) [MOD]

10/22/2020, 1:09 AM
That's what channels were designed for, definitely don't need a special dispatcher for it. Can you put some code?
w

william

10/22/2020, 12:04 PM
I can post some code, but the
ChannelIterator
interface does say `
Copy code
Instances of this interface are *not thread-safe* and shall not be used from concurrent coroutines.
Which certainly sounds like what i am doing
@Zach Klippenstein (he/him) [MOD] here is the relevant class.
sendingQueue
is the channel I'm talking about.
send
is what adds to it, and when
start
is called it starts iterating over the channel
Copy code
class PersistentWsClient(private val wsClient: WsClient, private val scope: CoroutineScope) {
    private var session: WsSession? = null
    private val _errors = Channel<Error>()
    val errors = _errors.consumeAsFlow()

    private val sendingQueue = Channel<Frame>(Channel.UNLIMITED)
    private val _incoming = BroadcastChannel<StreamUpdate>(CONFLATED) // TODO is conflated right?
    val incoming: Flow<StreamUpdate> = _incoming.asFlow()

    fun open(host: String, path: String, port: Int = 8080) {
        scope.launch {
            when (val open = wsClient.open(host, path, port)) {
                is Result.Success -> {
                    session = open.success
                    start()
                }
                is Result.Error -> {
                    this@PersistentWsClient.w("error opening connection: ${open.error.message}", open.error)
                    _errors.offer(Error.OpeningConnection(open.error))

                    // TODO back-off and try again
                }
            }
        }
    }

    fun send(payload: Frame) {
        // ignoring result bc Channel.UNLIMITED always returns true to `offer`
        sendingQueue.offer(payload)
        this.e("offerring $payload")
    }

    private fun start() {
        _incoming.offer(StreamUpdate.ConnectionChange.Connected)
        scope.launch {
            session?.let { session ->
                this.e("starting persistent client")
                launch {
                    for (frame in sendingQueue) {
                        this@PersistentWsClient.e("sending queue for $frame")
                        when (val send = session.send(frame)) {
                            UnitResult.Success -> {
                                this@PersistentWsClient.d("send frame")
                            }
                            is UnitResult.Error -> {
                                this@PersistentWsClient.e("error sending message: ${send.error.message}")
                                _errors.offer(Error.SendingFrame(frame))

                                // TODO check if disconnected!
                            }
                        }
                    }
                }

                launch {
                    session.incoming
                        .onEach { _incoming.offer(StreamUpdate.FrameWrapper(it)) }
                        .collect()
                }
            } ?: e("start called when session id is null!")
        }
    }

    suspend fun close() {
        scope.cancel()
        session?.close()
        session = null
    }

    sealed class Error {
        class OpeningConnection(val e: Exception): Error()
        class SendingFrame(val frame: Frame): Error()
    }

    sealed class StreamUpdate {
        class FrameWrapper(frame: Frame): StreamUpdate()

        sealed class ConnectionChange: StreamUpdate() {
            object ConnectionLost : ConnectionChange()
            object Connected : ConnectionChange()
        }
    }
}
the only log messages that i see are
Copy code
offering ...
starting persistent client
z

Zach Klippenstein (he/him) [MOD]

10/22/2020, 1:39 PM
I haven't had my morning coffee yet, but nothing in this code jumps out to me as a reason this wouldn't work. That restriction on ChannelIterator applies to the iterator, not the channel itself. Don't call channel.iterator() (which is what Kotlin's for loop does under the hood) and use that value across threads.
Have you attached a debugger and stepped through this at all? One thing to look out for are what is the state of your jobs at each point - is something cancelling them unexpectedly?
w

william

10/22/2020, 9:57 PM
i can look more into that, one thing i did check was that the coroutine with the for loop wasn't getting cancelled - i had another
launch
which printed something out every second and that was going fine
hmm
4 Views