william
10/21/2020, 11:27 PM
I have a `Channel(Channel.UNLIMITED)` in use to act as a queue between coroutines, but I am having trouble with my receiving coroutine not seeing anything the producer is giving it. Could there be an issue from performing both of these in the scope of `Dispatchers.IO` (which I believe is multithreaded in Java)? Should I be using a single-threaded coroutine scope / dispatcher for this to work?
Zach Klippenstein (he/him) [MOD]
10/22/2020, 1:09 AM
william
10/22/2020, 12:04 PM
The `ChannelIterator` interface does say "Instances of this interface are *not thread-safe* and shall not be used from concurrent coroutines."
Which certainly sounds like what I am doing. `sendingQueue` is the channel I'm talking about. `send` is what adds to it, and when `start` is called it starts iterating over the channel.
class PersistentWsClient(private val wsClient: WsClient, private val scope: CoroutineScope) {
    private var session: WsSession? = null
    private val _errors = Channel<Error>()
    val errors = _errors.consumeAsFlow()
    private val sendingQueue = Channel<Frame>(Channel.UNLIMITED)
    private val _incoming = BroadcastChannel<StreamUpdate>(CONFLATED) // TODO is conflated right?
    val incoming: Flow<StreamUpdate> = _incoming.asFlow()

    fun open(host: String, path: String, port: Int = 8080) {
        scope.launch {
            when (val open = wsClient.open(host, path, port)) {
                is Result.Success -> {
                    session = open.success
                    start()
                }
                is Result.Error -> {
                    this@PersistentWsClient.w("error opening connection: ${open.error.message}", open.error)
                    _errors.offer(Error.OpeningConnection(open.error))
                    // TODO back-off and try again
                }
            }
        }
    }

    fun send(payload: Frame) {
        // ignoring result bc Channel.UNLIMITED always returns true to `offer`
        sendingQueue.offer(payload)
        this.e("offering $payload")
    }

    private fun start() {
        _incoming.offer(StreamUpdate.ConnectionChange.Connected)
        scope.launch {
            session?.let { session ->
                this.e("starting persistent client")
                launch {
                    for (frame in sendingQueue) {
                        this@PersistentWsClient.e("sending queue for $frame")
                        when (val send = session.send(frame)) {
                            UnitResult.Success -> {
                                this@PersistentWsClient.d("send frame")
                            }
                            is UnitResult.Error -> {
                                this@PersistentWsClient.e("error sending message: ${send.error.message}")
                                _errors.offer(Error.SendingFrame(frame))
                                // TODO check if disconnected!
                            }
                        }
                    }
                }
                launch {
                    session.incoming
                        .onEach { _incoming.offer(StreamUpdate.FrameWrapper(it)) }
                        .collect()
                }
            } ?: e("start called when session id is null!")
        }
    }

    suspend fun close() {
        scope.cancel()
        session?.close()
        session = null
    }

    sealed class Error {
        class OpeningConnection(val e: Exception): Error()
        class SendingFrame(val frame: Frame): Error()
    }

    sealed class StreamUpdate {
        class FrameWrapper(frame: Frame): StreamUpdate()
        sealed class ConnectionChange: StreamUpdate() {
            object ConnectionLost : ConnectionChange()
            object Connected : ConnectionChange()
        }
    }
}
offering ...
starting persistent client
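For reference, a minimal sketch of the pattern being asked about: one producer and one consumer sharing a `Channel(Channel.UNLIMITED)` across multithreaded dispatchers. It is not the code from this thread. `drainQueueOnIo` is a hypothetical helper, `String` stands in for `Frame`, and `trySend` is used where the thread's code calls `offer` (an assumption that a kotlinx.coroutines version of 1.5 or later is on the classpath). The channel itself is safe to share between coroutines; the `ChannelIterator` note applies to sharing one iterator instance, and each `for` loop obtains its own.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: pushes payloads through an unlimited channel and
// returns what a single consumer running on Dispatchers.IO received.
fun drainQueueOnIo(payloads: List<String>): List<String> = runBlocking {
    val sendingQueue = Channel<String>(Channel.UNLIMITED)
    val received = mutableListOf<String>()

    // Single consumer; the for loop creates its own ChannelIterator,
    // so no iterator instance is shared between coroutines.
    val consumer = launch(Dispatchers.IO) {
        for (frame in sendingQueue) {
            received += frame
        }
    }

    // Producer on a different multithreaded dispatcher.
    // trySend never suspends; on an unlimited, open channel it succeeds.
    withContext(Dispatchers.Default) {
        payloads.forEach { sendingQueue.trySend(it) }
    }

    sendingQueue.close() // lets the for loop finish once the queue is drained
    consumer.join()      // after join, reading `received` here is safe
    received
}

fun main() {
    println(drainQueueOnIo(listOf("a", "b", "c")))
}
```

If a sketch like this delivers every element, the dispatcher choice alone is unlikely to explain a consumer that never sees anything, and the cause is more likely to sit in whatever runs inside the consuming loop or in how the scope is set up.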
Zach Klippenstein (he/him) [MOD]
10/22/2020, 1:39 PM
william
10/22/2020, 9:57 PM
`launch` which printed something out every second and that was going fine