https://kotlinlang.org logo
#coroutines
Title
# coroutines
r

reactormonk

07/14/2022, 8:54 AM
Are there destructive primitives? I'm looking to write a parser.
first()
etc. don't advance the flow, though 😞 I'd also take a parser-combinator library if there's one around 😄
Copy code
// Reads one TLV record from `input`, dispatching on its type byte, and sends it downstream.
// NOTE(review): `input` appears to be a Flow; `first()` and `take(n)` each start a new
// collection, so successive calls here do not continue from a shared read position.
return channelFlow<TLV> {
        when(input.first()) {
            // 0x00: NULL TLV, carries no length or payload.
            (0x00.toByte()) -> send(TLVNULL)
            // 0x03: NDEF message TLV, length field followed by that many payload bytes.
            (0x03.toByte()) -> {
                val length = parseLength(input)
                val bytes = input.take(length).toList()
                send(TLVNDEF(NdefMessage(bytes.toByteArray())))
            }
            // 0xFD: proprietary TLV, same layout as the NDEF TLV.
            (0xFD.toByte()) -> {
                val length = parseLength(input)
                val bytes = input.take(length)
                send(TLVProp(bytes.toList().toByteArray()))
            }
            // 0xFE: terminator TLV. This branch previously repeated 0xFD, which made it
            // unreachable because the proprietary branch above always matched first.
            (0xFE.toByte()) -> {
                send(TLVTerminator)
                close()
            }
        }
    }
s

Sam

07/14/2022, 9:13 AM
You might want
input
to be a channel instead of a flow
r

reactormonk

07/14/2022, 9:16 AM
Can I convert that?
s

Sam

07/14/2022, 9:18 AM
Yes, using
produceIn(scope)
r

reactormonk

07/14/2022, 9:35 AM
I didn't really want to have a
scope
in there, is there a flow primitive that allows me to advance the flow?
s

Sam

07/14/2022, 9:38 AM
channelFlow
already creates a scope for you
You can’t “advance” the flow itself because it isn’t stateful — it’s the flow collector that provides the state.
Calling
produceIn
to create a channel is just one way to create a stateful collector; you could use another approach if you want. For example you can just create some variables to hold mutable state, and then call
collect
on the flow to loop over the items and update your state.
r

reactormonk

07/14/2022, 9:43 AM
I'd prefer it to be a transformation from
Flow
to
Flow
, for composition purposes. Looks like I gotta go with
transformWhile
?
s

Sam

07/14/2022, 9:46 AM
For stateful transformations it’s normally easiest to make a new flow that collects the original flow. The new flow can either be a
channelFlow
or just a
flow
depending on how much clever concurrency you need. For example:
Copy code
// Sketch of a stateful Flow-to-Flow transformation: a new flow collects `input`, feeds each
// item into mutable state owned by this collection, and emits whenever that state has a
// complete result. `state` and its methods are an imaginary API used for illustration.
val output = flow {
    val state = // create some mutable state
    input.collect {
        // Accumulate the incoming item.
        state.update(it)
        if (state.hasOutput()) {
            // A complete result is ready: emit it, then clear the state for the next one.
            emit(state.getOutput())
            state.reset()
        }
    }
}
☝️ imaginary API for
state
, but hopefully you get the idea
r

reactormonk

07/14/2022, 10:07 AM
Sounds like thinking. I think I'm more of a "wire things through so I don't have to state-transform my code" kinda guy.
Copy code
/**
 * Splits [input] into its first [count] elements and a flow of everything after them.
 *
 * The two halves are derived from [input] independently: [input] is collected here to build
 * the prefix list, and again whenever the returned flow is collected.
 *
 * @param input flow to split
 * @param count number of leading elements to take; zero yields an empty list and [input] itself
 * @return the remaining flow paired with the leading elements
 */
suspend fun <T> takePair(input: Flow<T>, count: Int): Pair<Flow<T>, List<T>> {
    // Flow.take rejects a non-positive count, but a zero-length read is a legitimate request
    // (e.g. a record with an empty payload), so answer it without calling take.
    if (count == 0) return Pair(input, emptyList())
    return Pair(input.drop(count), input.take(count).toList())
}

/**
 * Splits [input] into its first element and a flow of everything after it.
 *
 * The tail is built with `drop(1)` on [input] and the head is read with `first()`, so [input]
 * is collected here for the head and again whenever the returned flow is collected.
 *
 * @return the tail flow paired with the first element
 */
suspend fun <T> firstPair(input: Flow<T>): Pair<Flow<T>, T> =
    input.drop(1) to input.first()
No destructuring on the
val
side looks like 😞
s

Sam

07/14/2022, 10:11 AM
Okay, but
input.drop(count)
is always going to give you a flow that starts again from the beginning and just skips values until it reaches the position you want. I doubt that’s the behaviour you’re looking for.
r

reactormonk

07/14/2022, 10:12 AM
Gotta wire the resulting ones through, so stacking drops
Works, but probably gonna get me crucified somewhere:
Copy code
/**
 * Returns the first [count] elements of [input] together with a flow of the elements that
 * follow them.
 *
 * [input] is collected once here to materialise the prefix; the returned flow skips the same
 * prefix by collecting [input] again from its start.
 *
 * @param input flow to split
 * @param count size of the prefix; zero yields an empty list and [input] itself
 * @return the remaining flow paired with the prefix
 */
suspend fun <T> takePair(input: Flow<T>, count: Int): Pair<Flow<T>, List<T>> {
    // A zero-length read must succeed, but Flow.take only accepts a positive count.
    if (count == 0) return Pair(input, emptyList())
    val rest = input.drop(count)
    val prefix = input.take(count).toList()
    return Pair(rest, prefix)
}

/**
 * Returns the first element of [input] together with a flow of the elements that follow it.
 *
 * The head is obtained by collecting [input] here; the returned tail skips one element by
 * collecting [input] again from its start.
 *
 * @return the tail flow paired with the head element
 */
suspend fun <T> firstPair(input: Flow<T>): Pair<Flow<T>, T> {
    val tail = input.drop(1)
    val head = input.first()
    return Pair(tail, head)
}

/**
 * Parses [input] as a sequence of TLV records and sends each one downstream.
 *
 * Every step hands back "the rest of the stream" as a new flow (see [firstPair], [takePair],
 * [parseLength]); that remainder is threaded through the loop until the terminator record
 * (type 0xFE) has been sent.
 *
 * @param input bytes to parse
 * @return a flow of the parsed records, ending with the terminator record
 * @throws IllegalStateException if a type byte is not one of 0x00, 0x03, 0xFD, 0xFE
 */
fun fromFlow(input: Flow<Byte>): Flow<TLV> = channelFlow<TLV> {
    // Unparsed remainder of the stream; set to null once the terminator has been handled.
    var remaining: Flow<Byte>? = input
    while (remaining != null) {
        val (afterType, type) = firstPair(remaining)
        Log.d("TLV", "type: $type")
        remaining = when (type) {
            0x00.toByte() -> {
                send(TLVNULL)
                afterType
            }
            0x03.toByte() -> {
                val (afterLength, length) = parseLength(afterType)
                Log.d("TLV", "length: $length")
                val (afterPayload, payload) = takePair(afterLength, length)
                Log.d("TLV", "bytes: $payload")
                send(TLVNDEF(NdefMessage(payload.toByteArray())))
                afterPayload
            }
            0xFD.toByte() -> {
                val (afterLength, length) = parseLength(afterType)
                val (afterPayload, payload) = takePair(afterLength, length)
                send(TLVProp(payload.toByteArray()))
                afterPayload
            }
            0xFE.toByte() -> {
                send(TLVTerminator)
                close()
                null
            }
            else -> throw IllegalStateException("Invalid type byte: $type")
        }
    }
}

/**
 * Reads a TLV length field from the start of [input].
 *
 * A first byte other than 0xFF is the length itself. A first byte of 0xFF means the next two
 * bytes hold the length, most significant byte first.
 *
 * @param input bytes positioned at the length field
 * @return the flow of bytes following the length field, paired with the decoded length
 * @throws IllegalStateException if the two-byte form encodes 0xFFFF
 */
suspend fun parseLength(input: Flow<Byte>): Pair<Flow<Byte>, Int> {
    val (afterFirst, first) = firstPair(input)
    if (first != 0xFF.toByte()) {
        // Mask to read the byte as unsigned; a plain toInt() turns 0x80..0xFE negative.
        return Pair(afterFirst, first.toInt() and 0xFF)
    }
    val (afterLength, lengthBytes) = takePair(afterFirst, 2)
    val high = lengthBytes[0].toInt() and 0xFF
    val low = lengthBytes[1].toInt() and 0xFF
    val length = (high shl 8) or low
    // Compare the decoded value: matching a ByteArray in `when` uses reference equality,
    // so a freshly built array never equals the bytes that were read.
    check(length != 0xFFFF) { "0xFFFF is RFU" }
    return Pair(afterLength, length)
}
🤔 nah, I don't think it quite likes the way I'm going about it... still requesting the data multiple times.
a

Adam Powell

07/14/2022, 1:34 PM
Something like okio is going to be a better fit for this use case https://square.github.io/okio/recipes/
👍 1
r

reactormonk

07/14/2022, 1:43 PM
This works pretty well:
Copy code
/**
 * Receives [length] elements from this channel, one after another, and returns them in the
 * order they arrived. A [length] of zero or less returns an empty list without receiving.
 */
suspend fun <T> ReceiveChannel<T>.receive(length: Int): List<T> = buildList {
    repeat(length) {
        add(this@receive.receive())
    }
}

/**
 * Parses [flow] as a sequence of TLV records and sends each one downstream.
 *
 * The bytes are pulled through a channel created with `produceIn` in the `channelFlow` scope,
 * so each `receive` continues from where the previous one stopped. Parsing repeats until the
 * terminator record (type 0xFE) has been sent or the input ends at a record boundary;
 * previously only the first record was parsed.
 *
 * @param flow bytes to parse
 * @return a flow of the parsed records
 * @throws IllegalStateException if a type byte is not one of 0x00, 0x03, 0xFD, 0xFE
 */
@OptIn(FlowPreview::class)
suspend fun fromFlow(flow: Flow<Byte>): Flow<TLV> {
    return channelFlow<TLV> {
        val input = flow.produceIn(this)
        try {
            while (true) {
                // End of input between records is a normal stop, not an error.
                val next = input.receiveCatching()
                if (next.isClosed) {
                    next.exceptionOrNull()?.let { throw it }
                    break
                }
                when (val type = next.getOrThrow()) {
                    0x00.toByte() -> send(TLVNULL)
                    0x03.toByte() -> {
                        val length = parseLength(input)
                        val bytes = input.receive(length)
                        send(TLVNDEF(NdefMessage(bytes.toByteArray())))
                    }
                    0xFD.toByte() -> {
                        val length = parseLength(input)
                        val bytes = input.receive(length)
                        send(TLVProp(bytes.toByteArray()))
                    }
                    0xFE.toByte() -> {
                        send(TLVTerminator)
                        break
                    }
                    else -> throw IllegalStateException("Invalid type byte: $type")
                }
            }
        } finally {
            // Stop the upstream producer so bytes left after the terminator cannot keep
            // this flow from completing.
            input.cancel()
        }
    }
}

/**
 * Reads a TLV length field from [input].
 *
 * A first byte other than 0xFF is the length itself. A first byte of 0xFF means the next two
 * bytes hold the length, most significant byte first.
 *
 * @param input channel positioned at the length field
 * @return the decoded length
 * @throws IllegalStateException if the two-byte form encodes 0xFFFF
 */
suspend fun parseLength(input: ReceiveChannel<Byte>): Int {
    // Mask each byte to read it as unsigned; a plain toInt() turns 0x80..0xFF negative.
    val first = input.receive().toInt() and 0xFF
    if (first != 0xFF) return first

    val high = input.receive().toInt() and 0xFF
    val low = input.receive().toInt() and 0xFF
    val length = (high shl 8) or low
    // Compare the decoded value: matching a ByteArray in `when` uses reference equality,
    // so a freshly built array never equals the bytes that were read.
    check(length != 0xFFFF) { "0xFFFF is RFU" }
    return length
}
👍 1
s

Sam

07/14/2022, 1:47 PM
👍 that code you posted with
produceIn
and
receive
is pretty much what I had in mind. I like the little extension function for receiving multiple values 👌
The Okio suggestion is worth considering too because passing around individual bytes as object references over a channel/flow could be adding a lot of overhead, depending on your use case
r

reactormonk

07/14/2022, 1:49 PM
Yeah, but since I'm reading in 16 byte increments from an NFC card, the overhead can't matter.
FS2 in Scala does optimize this use case pretty well with
Chunk
😄
3 Views