reactormonk
04/23/2024, 5:25 PMclose()
a Channel
, do existing subscribers which haven't consumed all items in the channel still get all items which have been send()
to the channel in the past?reactormonk
04/23/2024, 5:44 PMClosing a channel after this function has suspended does not cause this suspended send invocation to abort, because closing a channel is conceptually like sending a special "close token" over this channel. All elements sent over the channel are delivered in first-in first-out order. The sent element will be delivered to receivers before the close token.
Casey Brooks
04/23/2024, 5:44 PMchannel.close()
essentially places an item into the channel marking the end point. From there, isClosedForSend
returns true, and once all elements have been drained from the channel, isClosedForReceive
will return true.reactormonk
04/23/2024, 5:46 PMobject CardControlUtils {
suspend fun waitForReconnect(flow: Flow<CardWithCommands?>, timeout: Duration): MutableStateFlow<CardWithCommands?> {
val outgoing = MutableStateFlow<CardWithCommands?>(null)
val delayedReaderDisconnected = SupervisorJob()
CoroutineScope(coroutineContext).launch {
var oldChannel: Channel<CardCommand>? = null
var newChannel: Channel<CardCommand>? = null
fun reset() {
outgoing.value = null
oldChannel?.close()
oldChannel = null
newChannel?.close()
newChannel = null
}
flow.collect { card ->
if (card != null) {
fun setupChannels() {
oldChannel = card.commandChannel
newChannel = Channel<CardCommand>(5)
outgoing.value = card.copy(
commandChannel = newChannel!!
)
}
if (outgoing.value == null) {
setupChannels()
} else {
delayedReaderDisconnected.cancelChildren()
if (card.message == outgoing.value?.message) {
Timber.d { "Reader here again, all fine." }
} else {
newChannel?.send(CardCommand.TagLeftField)
reset()
setupChannels()
}
}
CoroutineScope(coroutineContext).launch {
println("Starting to consume card command channel: $oldChannel")
if (oldChannel != null) {
oldChannel?.consumeAsFlow()?.collect { msg ->
println("Message: $msg")
when (msg) {
CardCommand.ReaderDisconnected -> {
Timber.d { "Reader disconnected, ignoring for now..." }
launch(delayedReaderDisconnected) {
delay(timeout)
Timber.d { "Sending ReaderDisconnected" }
newChannel?.send(msg)
reset()
}
}
CardCommand.RotatedCard -> newChannel?.send(msg)
CardCommand.SkippedCard -> newChannel?.send(msg)
CardCommand.TagLeftField -> {
newChannel?.send(msg)
reset()
}
}
}
}
println("Done consuming.")
}
} else {
reset()
}
}
}
return outgoing
}
}
Test:
class CardControlUtilsTest {
@Test
fun testStandardForward() {
val scope = CoroutineScope(Dispatchers.Default)
val flow = Channel<CardWithCommands?>(5)
scope.launch {
flow.send(null)
val channel = Channel<CardCommand>(5)
flow.send(
CardWithCommands(
<http://CardMessage.AI|CardMessage.AI>,
channel,
MutableStateFlow("uid"),
scope,
)
)
println("launched channel $channel")
delay(500.milliseconds.toJavaDuration())
channel.send(CardCommand.TagLeftField)
println("sent")
channel.close()
flow.send(null)
}
runBlocking {
val gotFlow = waitForReconnect(flow.consumeAsFlow(), 1.seconds)
gotFlow.test {
assertEquals(null, awaitItem())
val card = awaitItem()
println("got item: $card")
assertEquals(<http://CardMessage.AI|CardMessage.AI>, card?.message)
val commandFlow = card?.commandChannel?.receiveAsFlow()
scope.launch {
commandFlow?.collect {
println(it.toString())
}
}
assertNotNull(commandFlow)
commandFlow!!.test {
assertEquals(CardCommand.TagLeftField, awaitItem())
println("got left field")
awaitComplete()
}
println("got commandflow completed")
assertEquals(null, awaitItem())
}
}
}
}
Result:
launched channel capacity=5,data=[kotlinx.coroutines.channels.BufferedChannel$BufferedChannelIterator@3997e019]
got item: CardWithCommands(message=AI(),cardJob=SupervisorJobImpl{Active},uid=uid)
Starting to consume card command channel: capacity=5,data=[kotlinx.coroutines.channels.BufferedChannel$BufferedChannelIterator@3997e019]
sent
Done consuming.
Expected item but found Complete
I'd expect a
Message: ...
somewhere, but that item doesn't arrivereactormonk
04/23/2024, 7:54 PMCasey Brooks
04/23/2024, 8:04 PMCasey Brooks
04/23/2024, 8:07 PMflow.collect { card ->
, each of which is running in parallel to each other and to the main flow. However, thereβs no synchronization around the channels, so Iβd guess youβre running into race conditions, as this is not following the conventions of Flow processingreactormonk
04/23/2024, 8:17 PMβββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β ββββββββββββββββββββββββββββββββββ βββββββββββββββββββββββββββββββββ β
β βCard β βCard β β
β β β β β β
β β βββββββββββββ βββββββββββββ β β βββββββββββββ βββββββββββββββ β β
β β βCommand β βCommand β β β βCommand β βCommand β β β
β β β β β β β β β β β β β β
β β β β β β β β β β β β β β
β β β ββΊβ β β β β ββΊβ β β β
β β β β β β β β β β β β β β
β β β β β β βββΊβ β β β β β β
β β β β β β β β β β β β β β
β β βββββββββββββ βββββββββββββ β β βββββββββββββ βββββββββββββββ β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β ββββββββββββββββββββββββββββββββββ βββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
reactormonk
04/23/2024, 8:18 PMCard
object has some info. I'm currently representing that with nested flows/channels where I close the inner channel when the outer completes.reactormonk
04/23/2024, 8:18 PMCard
timelines into single ones.Casey Brooks
04/23/2024, 8:22 PMCard
, and you want to load the content for each Cardβs commands
in parallel? And do you want each Card to also be running in parallel?reactormonk
04/23/2024, 8:24 PMβ
β
ββββββββββββββββββββββββββββββββββββββ΄βββββββββββββββββββββββββββββββββββββ
β βΌ β
β ββββββββββββββββββββββββββββββββββ¬βββ¬ββββββββββββββββββββββββββββββββ β
β βCard β βCard β β
β β β β β β
β β βββββββββββββ βββββββββββββ β β βββββββββββββ βββββββββββββββ β β
β β βCommand β βCommand β β β βCommand β βCommand β β β
β β β β β β β β β β β β β β
β β β β β β β β β β β β β β
β β β ββΊβ β β β β ββΊβ β β β
β β β β β β β β β β β β β β
β β β β β β βββΊβ β β β β β β
β β β β β β β β β β β β β β
β β βββββββββββββ βββββββββββββ β β βββββββββββββ βββββββββββββββ β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β β β β β β
β ββββββββββββββββββββββββββββββββββ΄βββ΄ββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
reactormonk
04/23/2024, 8:25 PMreactormonk
04/23/2024, 8:27 PMreactormonk
04/23/2024, 8:27 PMCasey Brooks
04/23/2024, 8:30 PMCasey Brooks
04/23/2024, 8:32 PMflow.collect { }
is basically ignoring everything about the flow. Likewise, channels are used to share data between two different coroutines, so if youβre creating and closing them rapidly like this, the Channel instance probably isnβt shared between the launched coroutines like you expect.reactormonk
04/23/2024, 8:34 PMreactormonk
04/23/2024, 8:35 PMCasey Brooks
04/23/2024, 9:02 PMvar cardDisconnectedJob: Job? = null
card
.commandChannel
.filterNotNull()
.onEach { msg: CardCommand ->
// cancel the job that's waiting for the timeout, if it exists
cardDisconnectedJob?.cancel()
when (msg) {
CardCommand.ReaderDisconnected -> {
cardDisconnectedJob = launch(delayedReaderDisconnected) {
delay(timeout)
// we exceeded the timeout, continue cancelling the whole card processing
}
}
else -> {
outgoing.send(msg)
}
}
}
.launchIn(scope)
reactormonk
04/23/2024, 9:05 PMsuspend fun waitForReconnect(flow: Flow<CardWithCommands?>, delay: Duration): MutableStateFlow<CardWithCommands?> {
val outgoing = MutableStateFlow<CardWithCommands?>(null)
val delayedReaderDisconnected = SupervisorJob()
val topScope = CoroutineScope(coroutineContext)
topScope.launch {
var newChannel: Channel<CardCommand>? = null
fun reset() {
outgoing.value = null
newChannel?.close()
newChannel = null
}
suspend fun setupConsume(channel: Channel<CardCommand>, to: Channel<CardCommand>) {
channel.consumeEach { msg ->
println("Message: $msg")
when (msg) {
CardCommand.ReaderDisconnected -> {
Timber.d { "Reader disconnected, ignoring for now..." }
launch(delayedReaderDisconnected) {
delay(delay)
Timber.d { "Sending ReaderDisconnected" }
to.send(msg)
reset()
}
}
CardCommand.RotatedCard -> to.send(msg)
CardCommand.SkippedCard -> to.send(msg)
CardCommand.TagLeftField -> {
to.send(msg)
reset()
}
}
}
}
flow.collect { card ->
delayedReaderDisconnected.cancelChildren()
if (card != null) {
suspend fun setupChannels() {
newChannel = Channel<CardCommand>(5)
outgoing.value = card.copy(
commandChannel = newChannel!!
)
setupConsume(card.commandChannel, newChannel!!)
}
if (outgoing.value == null) {
setupChannels()
} else {
if (card.message == outgoing.value?.message) {
Timber.d { "Reader here again, all fine." }
setupConsume(card.commandChannel, newChannel!!)
} else {
newChannel?.send(CardCommand.TagLeftField)
reset()
setupChannels()
}
}
} else {
reset()
}
}
}
return outgoing
}
My main problem is still lack of the first message I send into the channel π€