Or a more general question on Channels: if I `clos...
# coroutines
r
Or a more general question on Channels: if I
close()
a
Channel
, do existing subscribers which haven't consumed all items in the channel still get all items which have been
send()
to the channel in the past?
πŸ‘Œ 1
Closing a channel after this function has suspended does not cause this suspended send invocation to abort, because closing a channel is conceptually like sending a special "close token" over this channel. All elements sent over the channel are delivered in first-in first-out order. The sent element will be delivered to receivers before the close token.
c
Yes. Per the docs, calling
channel.close()
essentially places an item into the channel marking the end point. From there,
isClosedForSend
returns true, and once all elements have been drained from the channel,
isClosedForReceive
will return true.
r
Trying to debug this clusterfuck:
Copy code
object CardControlUtils {
    suspend fun waitForReconnect(flow: Flow<CardWithCommands?>, timeout: Duration): MutableStateFlow<CardWithCommands?> {
        val outgoing = MutableStateFlow<CardWithCommands?>(null)

        val delayedReaderDisconnected = SupervisorJob()

        CoroutineScope(coroutineContext).launch {
            var oldChannel: Channel<CardCommand>? = null
            var newChannel: Channel<CardCommand>? = null
            fun reset() {
                outgoing.value = null
                oldChannel?.close()
                oldChannel = null
                newChannel?.close()
                newChannel = null
            }
            flow.collect { card ->
                if (card != null) {
                    fun setupChannels() {
                        oldChannel = card.commandChannel
                        newChannel = Channel<CardCommand>(5)
                        outgoing.value = card.copy(
                            commandChannel = newChannel!!
                        )
                    }
                    if (outgoing.value == null) {
                        setupChannels()
                    } else {
                        delayedReaderDisconnected.cancelChildren()
                        if (card.message == outgoing.value?.message) {
                            Timber.d { "Reader here again, all fine." }
                        } else {
                            newChannel?.send(CardCommand.TagLeftField)
                            reset()
                            setupChannels()
                        }
                    }
                    CoroutineScope(coroutineContext).launch {
                        println("Starting to consume card command channel: $oldChannel")
                        if (oldChannel != null) {
                            oldChannel?.consumeAsFlow()?.collect { msg ->
                                println("Message: $msg")
                                when (msg) {
                                    CardCommand.ReaderDisconnected -> {
                                        Timber.d { "Reader disconnected, ignoring for now..." }
                                        launch(delayedReaderDisconnected) {
                                            delay(timeout)
                                            Timber.d { "Sending ReaderDisconnected" }
                                            newChannel?.send(msg)
                                            reset()
                                        }
                                    }

                                    CardCommand.RotatedCard -> newChannel?.send(msg)
                                    CardCommand.SkippedCard -> newChannel?.send(msg)
                                    CardCommand.TagLeftField -> {
                                        newChannel?.send(msg)
                                        reset()
                                    }
                                }
                            }
                        }
                        println("Done consuming.")
                    }
                } else {
                    reset()
                }
            }
        }

        return outgoing
    }
}
Test:
Copy code
class CardControlUtilsTest {
    @Test
    fun testStandardForward() {
        val scope = CoroutineScope(Dispatchers.Default)
        val flow = Channel<CardWithCommands?>(5)
        scope.launch {
            flow.send(null)
            val channel = Channel<CardCommand>(5)
            flow.send(
                CardWithCommands(
                    <http://CardMessage.AI|CardMessage.AI>,
                    channel,
                    MutableStateFlow("uid"),
                    scope,
                )
            )
            println("launched channel $channel")
            delay(500.milliseconds.toJavaDuration())
            channel.send(CardCommand.TagLeftField)
            println("sent")
            channel.close()
            flow.send(null)
        }
        runBlocking {
            val gotFlow = waitForReconnect(flow.consumeAsFlow(), 1.seconds)
            gotFlow.test {
                assertEquals(null, awaitItem())
                val card = awaitItem()
                println("got item: $card")
                assertEquals(<http://CardMessage.AI|CardMessage.AI>, card?.message)
                val commandFlow = card?.commandChannel?.receiveAsFlow()
                scope.launch {
                    commandFlow?.collect {
                        println(it.toString())
                    }
                }
                assertNotNull(commandFlow)
                commandFlow!!.test {
                    assertEquals(CardCommand.TagLeftField, awaitItem())
                    println("got left field")
                    awaitComplete()
                }
                println("got commandflow completed")
                assertEquals(null, awaitItem())
            }
        }
    }
}
Result:
Copy code
launched channel capacity=5,data=[kotlinx.coroutines.channels.BufferedChannel$BufferedChannelIterator@3997e019]
got item: CardWithCommands(message=AI(),cardJob=SupervisorJobImpl{Active},uid=uid)
Starting to consume card command channel: capacity=5,data=[kotlinx.coroutines.channels.BufferedChannel$BufferedChannelIterator@3997e019]
sent
Done consuming.

Expected item but found Complete
I'd expect a
Copy code
Message: ...
somewhere, but that item doesn't arrive
Tested some more, the first message doesn't arrive, but if I send a second one, it goes through... WTF?
c
The example is pretty long and convoluted, it’s hard to know what exactly you’re trying to do. What’s the actual use-case here?
But one thing I notice: you’re launching a job within
flow.collect { card ->
, each of which is running in parallel to each other and to the main flow. However, there’s no synchronization around the channels, so I’d guess you’re running into race conditions, as this is not following the conventions of Flow processing
r
I'm not really sure how to do nested flows exactly, the current setup is
Copy code
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                                                                         β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚ β”‚Card                            β”‚  β”‚Card                           β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚   β”‚
β”‚ β”‚ β”‚Command    β”‚ β”‚Command    β”‚    β”‚  β”‚ β”‚Command    β”‚ β”‚Command      β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚ β”‚           β”‚    β”‚  β”‚ β”‚           β”‚ β”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚ β”‚           β”‚    β”‚  β”‚ β”‚           β”‚ β”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚β–Ίβ”‚           β”‚    β”‚  β”‚ β”‚           β”‚β–Ίβ”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚ β”‚           β”‚    β”‚  β”‚ β”‚           β”‚ β”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚ β”‚           β”‚    β”œβ”€β–Ίβ”‚ β”‚           β”‚ β”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚ β”‚           β”‚    β”‚  β”‚ β”‚           β”‚ β”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                         β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
So I have a nested channel structure, where the
Card
object has some info. I'm currently representing that with nested flows/channels where I close the inner channel when the outer completes.
... now I'm writing a function which merges some of the
Card
timelines into single ones.
c
So you have a
Card
, and you want to load the content for each Card’s
commands
in parallel? And do you want each Card to also be running in parallel?
r
No, I want to merge two cards, like so:
Copy code
β”‚                                     
                                     β”‚                                     
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                                    β–Ό                                    β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚ β”‚Card                            β”‚  β”‚Card                           β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚   β”‚
β”‚ β”‚ β”‚Command    β”‚ β”‚Command    β”‚    β”‚  β”‚ β”‚Command    β”‚ β”‚Command      β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚ β”‚           β”‚    β”‚  β”‚ β”‚           β”‚ β”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚ β”‚           β”‚    β”‚  β”‚ β”‚           β”‚ β”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚β–Ίβ”‚           β”‚    β”‚  β”‚ β”‚           β”‚β–Ίβ”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚ β”‚           β”‚    β”‚  β”‚ β”‚           β”‚ β”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚ β”‚           β”‚    β”œβ”€β–Ίβ”‚ β”‚           β”‚ β”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β”‚           β”‚ β”‚           β”‚    β”‚  β”‚ β”‚           β”‚ β”‚             β”‚ β”‚   β”‚
β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β”‚                                β”‚  β”‚                               β”‚   β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                         β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
So I'm sticking together the two command channels one after another
Nope, removed the fork/launch, same result still, first message is getting swallowed πŸ€”
... conceptually, the forking doesn't make sense either, the channel of the first card will be closed before the second card arrives
c
Ignoring the channels for a moment, because there might be standard Flow operators that do what you want. And I also don’t really understand the diagrams, so can you please explain the use-case just in terms of high-level data flow? Is it that you have a card reader which is issuing commands from multiple cards in parallel, but you want to process all commands from one card before starting to process the next?
In general, Flows process data sequentially, so doing all the work of launching other coroutines inside
flow.collect { }
is basically ignoring everything about the flow. Likewise, channels are used to share data between two different coroutines, so if you’re creating and closing them rapidly like this, the Channel instance probably isn’t shared between the launched coroutines like you expect.
r
The card reader is issuing commands in sequence, like card on field. Then it sends a "Card" message along. Each card also contains a channel for commands, e.g. "tag left field", aka the card left the card reader, or "reader disconnected". Now I wanna add a delay on the "reader disconnected" logic, so it doesn't get send forward immediately, instead it should wait a few seconds to forward it, and if the reader reappears, act as if nothing happened.
The channels are the same, at least according to the object ids
c
I think you need to change the logic so it’s managing the cancellation of Jobs, not Channels. Perhaps something like this will better handle the kind of logic you need:
Copy code
var cardDisconnectedJob: Job? = null
card
    .commandChannel
    .filterNotNull()
    .onEach { msg: CardCommand ->
        // cancel the job that's waiting for the timeout, if it exists
        cardDisconnectedJob?.cancel()
        when (msg) {
            CardCommand.ReaderDisconnected -> {
                cardDisconnectedJob = launch(delayedReaderDisconnected) {
                    delay(timeout)
                    // we exceeded the timeout, continue cancelling the whole card processing
                }
            }

            else -> { 
                outgoing.send(msg)
            }
        }

    }
    .launchIn(scope)
r
Split it up a bit, now it should be easier to understand:
Copy code
suspend fun waitForReconnect(flow: Flow<CardWithCommands?>, delay: Duration): MutableStateFlow<CardWithCommands?> {
        val outgoing = MutableStateFlow<CardWithCommands?>(null)

        val delayedReaderDisconnected = SupervisorJob()
        val topScope = CoroutineScope(coroutineContext)

        topScope.launch {
            var newChannel: Channel<CardCommand>? = null
            fun reset() {
                outgoing.value = null
                newChannel?.close()
                newChannel = null
            }
            suspend fun setupConsume(channel: Channel<CardCommand>, to: Channel<CardCommand>) {
                channel.consumeEach { msg ->
                    println("Message: $msg")
                    when (msg) {
                        CardCommand.ReaderDisconnected -> {
                            Timber.d { "Reader disconnected, ignoring for now..." }
                            launch(delayedReaderDisconnected) {
                                delay(delay)
                                Timber.d { "Sending ReaderDisconnected" }
                                to.send(msg)
                                reset()
                            }
                        }

                        CardCommand.RotatedCard -> to.send(msg)
                        CardCommand.SkippedCard -> to.send(msg)
                        CardCommand.TagLeftField -> {
                            to.send(msg)
                            reset()
                        }
                    }
                }
            }
            flow.collect { card ->
                delayedReaderDisconnected.cancelChildren()
                if (card != null) {
                    suspend fun setupChannels() {
                        newChannel = Channel<CardCommand>(5)
                        outgoing.value = card.copy(
                            commandChannel = newChannel!!
                        )
                        setupConsume(card.commandChannel, newChannel!!)
                    }
                    if (outgoing.value == null) {
                        setupChannels()
                    } else {
                        if (card.message == outgoing.value?.message) {
                            Timber.d { "Reader here again, all fine." }
                            setupConsume(card.commandChannel, newChannel!!)
                        } else {
                            newChannel?.send(CardCommand.TagLeftField)
                            reset()
                            setupChannels()
                        }
                    }
                } else {
                    reset()
                }
            }
        }

        return outgoing
    }
My main problem is still lack of the first message I send into the channel πŸ€”