Are there destructive primitives? I'm looking to w...
# coroutines
r
Are there destructive primitives? I'm looking to write a parser.
first()
etc. doesn't advance though 😞 I'd also take a parser-combinator libraries if there's one around 😄
Copy code
return channelFlow<TLV> {
        when(input.first()) {
            (0x00.toByte()) -> send(TLVNULL)
            (0x03.toByte()) -> {
                val length = parseLength(input)
                val bytes = input.take(length).toList()
                send(TLVNDEF(NdefMessage(bytes.toByteArray())))
            }
            (0xFD.toByte()) -> {
                val length = parseLength(input)
                val bytes = input.take(length)
                send(TLVProp(bytes.toList().toByteArray()))
            }
            (0xFD.toByte()) -> {
                send(TLVTerminator)
                close()
            }
        }
    }
s
You might want
input
to be a channel instead of a flow
r
Can I convert that?
s
Yes, using
produceIn(scope)
r
I didn't really want to have a
scope
in there, is there a flow primitive that allows me to advance the flow?
s
channelFlow
already creates a scope for you
You can’t “advance” the flow itself because it isn’t stateful — it’s the flow collector that provides the state.
Calling
produceIn
to create a channel is just one way to create a stateful collector; you could use another approach if you want. For example you can just create some variables to hold mutable state, and then call
collect
on the flow to loop over the items and update your state.
r
I'd prefer it to be a transformation from
Flow
to
Flow
, for composition purposes. Looks like I gotta go with
transformWhile
?
s
For stateful transformations it’s normally easiest to make a new flow that collects the original flow. The new flow can either be a
channelFlow
or just a
flow
depending on how much clever concurrency you need. For example:
Copy code
val output = flow {
    val state = // create some mutable state
    input.collect {
        state.update(it)
        if (state.hasOutput()) {
            emit(state.getOutput())
            state.reset()
        }
    }
}
☝️ imaginary API for
state
, but hopefully you get the idea
r
Sounds like thinking. I think I'm more of a "wire things through so I don't have to state-transform my code" kinda guy.
Copy code
suspend fun <T> takePair(input: Flow<T>, count: Int): Pair<Flow<T>, List<T>> {
    return Pair(input.drop(count), input.take(count).toList())
}

suspend fun <T> firstPair(input: Flow<T>): Pair<Flow<T>, T> {
    return Pair(input.drop(1), input.first())
}
No destructuring on the
val
side looks like 😞
s
Okay, but
input.drop(count)
is always going to give you a flow that starts again from the beginning and just skips values until it reaches the position you want. I doubt that’s the behaviour you’re looking for.
r
Gotta wire the resulting ones through, so stacking drops
Works, but probably gonna get me crucified somewhere:
Copy code
suspend fun <T> takePair(input: Flow<T>, count: Int): Pair<Flow<T>, List<T>> {
    return Pair(input.drop(count), input.take(count).toList())
}

suspend fun <T> firstPair(input: Flow<T>): Pair<Flow<T>, T> {
    return Pair(input.drop(1), input.first())
}

fun fromFlow(input: Flow<Byte>): Flow<TLV> {
    return channelFlow<TLV> {
        suspend fun run(f: Flow<Byte>) {
            val f1 = firstPair(f)
            Log.d("TLV", "type: ${f1.second}")
            when(f1.second) {
                (0x00.toByte()) -> {
                    send(TLVNULL)
                    run(f1.first)
                }
                (0x03.toByte()) -> {
                    val f2 = parseLength(f1.first)
                    Log.d("TLV", "length: ${f2.second}")
                    val f3 = takePair(f2.first, f2.second)
                    val bytes = f3.second.toList()
                    Log.d("TLV", "bytes: $bytes")
                    send(TLVNDEF(NdefMessage(bytes.toByteArray())))
                    run(f3.first)
                }
                (0xFD.toByte()) -> {
                    val f2 = parseLength(f1.first)
                    val f3 = takePair(f2.first, f2.second)
                    val bytes = f3.second.toList()
                    send(TLVProp(bytes.toList().toByteArray()))
                    run(f3.first)
                }
                (0xFE.toByte()) -> {
                    send(TLVTerminator)
                    close()
                }
                else -> {
                    throw IllegalStateException("Invalid type byte: ${f1.second}")
                }
            }
        }
        run(input)
    }
}

suspend fun parseLength(input: Flow<Byte>): Pair<Flow<Byte>, Int> {
    val f1 = firstPair(input)
    return when (f1.second) {
        (0xFF.toByte()) -> {
            val f2 = takePair(f1.first, 2)
            when (val i2 = f2.second.toList().toByteArray()) {
                (byteArrayOf(
                    0xFF.toByte(),
                    0xFF.toByte()
                )) -> throw IllegalStateException("0xFFFF is RFU")
                else ->
                    Pair(f2.first, (i2[0].toInt() shl 8) or (i2[1].toInt()))
            }
        }
        else -> Pair(f1.first, f1.second.toInt())
    }
}
🤔 nah, I don't think it quite likes the way I'm going about it... still requesting the data multiple times.
a
Something like okio is going to be a better fit for this use case https://square.github.io/okio/recipes/
👍 1
r
This works pretty well:
Copy code
suspend fun <T> ReceiveChannel<T>.receive(length: Int): List<T> =
    buildList {
        var count = 0
        while(count < length) {
            count++
            add(receive())
        }
    }

@OptIn(FlowPreview::class)
suspend fun fromFlow(flow: Flow<Byte>): Flow<TLV> {
    return channelFlow<TLV> {
        val input = flow.produceIn(this)
        when (val type = input.receive()) {
            (0x00.toByte()) -> send(TLVNULL)
            (0x03.toByte()) -> {
                val length = parseLength(input)
                val bytes = input.receive(length)
                send(TLVNDEF(NdefMessage(bytes.toByteArray())))
            }
            (0xFD.toByte()) -> {
                val length = parseLength(input)
                val bytes = input.receive(length)
                send(TLVProp(bytes.toByteArray()))
            }
            (0xFE.toByte()) -> {
                send(TLVTerminator)
                close()
            }
            else -> {
                throw IllegalStateException("Invalid type byte: $type")
            }
        }
    }
}

suspend fun parseLength(input: ReceiveChannel<Byte>): Int {
    return when (val l = input.receive()) {
        (0xFF.toByte()) -> {
            when (val i2 = input.receive(2).toByteArray()) {
                (byteArrayOf(
                    0xFF.toByte(),
                    0xFF.toByte()
                )) -> throw IllegalStateException("0xFFFF is RFU")
                else ->
                    (i2[0].toInt() shl 8) or (i2[1].toInt())
            }
        }
        else -> l.toInt()
    }
}
👍 1
s
👍 that code you posted with
produceIn
and
receive
is pretty much what I had in mind. I like the little extension function for receiving multiple values 👌
The Okio suggestion is worth considering too because passing around individual bytes as object references over a channel/flow could be adding a lot of overhead, depending on your use case
r
Yeah, but since I'm reading in 16 byte increments from an NFC card, the overhead can't matter.
FS2 in Scala does optimize this use case pretty well with
Chunk
😄