reactormonk
07/14/2022, 8:54 AMfirst()
etc. doesn't advance though 😞
I'd also take a parser-combinator libraries if there's one around 😄
return channelFlow<TLV> {
when(input.first()) {
(0x00.toByte()) -> send(TLVNULL)
(0x03.toByte()) -> {
val length = parseLength(input)
val bytes = input.take(length).toList()
send(TLVNDEF(NdefMessage(bytes.toByteArray())))
}
(0xFD.toByte()) -> {
val length = parseLength(input)
val bytes = input.take(length)
send(TLVProp(bytes.toList().toByteArray()))
}
(0xFD.toByte()) -> {
send(TLVTerminator)
close()
}
}
}
Sam
07/14/2022, 9:13 AMinput
to be a channel instead of a flowreactormonk
07/14/2022, 9:16 AMSam
07/14/2022, 9:18 AMproduceIn(scope)
reactormonk
07/14/2022, 9:35 AMscope
in there, is there a flow primitive that allows me to advance the flow?Sam
07/14/2022, 9:38 AMchannelFlow
already creates a scope for youSam
07/14/2022, 9:39 AMSam
07/14/2022, 9:42 AMproduceIn
to create a channel is just one way to create a stateful collector; you could use another approach if you want. For example you can just create some variables to hold mutable state, and then call collect
on the flow to loop over the items and update your state.reactormonk
07/14/2022, 9:43 AMFlow
to Flow
, for composition purposes. Looks like I gotta go with transformWhile
?Sam
07/14/2022, 9:46 AMchannelFlow
or just a flow
depending on how much clever concurrency you need. For example:
val output = flow {
val state = // create some mutable state
input.collect {
state.update(it)
if (state.hasOutput()) {
emit(state.getOutput())
state.reset()
}
}
}
Sam
07/14/2022, 9:47 AMstate
, but hopefully you get the ideareactormonk
07/14/2022, 10:07 AMsuspend fun <T> takePair(input: Flow<T>, count: Int): Pair<Flow<T>, List<T>> {
return Pair(input.drop(count), input.take(count).toList())
}
suspend fun <T> firstPair(input: Flow<T>): Pair<Flow<T>, T> {
return Pair(input.drop(1), input.first())
}
reactormonk
07/14/2022, 10:08 AMval
side looks like 😞Sam
07/14/2022, 10:11 AMinput.drop(count)
is always going to give you a flow that starts again from the beginning and just skips values until it reaches the position you want. I doubt that’s the behaviour you’re looking for.reactormonk
07/14/2022, 10:12 AMreactormonk
07/14/2022, 10:19 AMsuspend fun <T> takePair(input: Flow<T>, count: Int): Pair<Flow<T>, List<T>> {
return Pair(input.drop(count), input.take(count).toList())
}
suspend fun <T> firstPair(input: Flow<T>): Pair<Flow<T>, T> {
return Pair(input.drop(1), input.first())
}
fun fromFlow(input: Flow<Byte>): Flow<TLV> {
return channelFlow<TLV> {
suspend fun run(f: Flow<Byte>) {
val f1 = firstPair(f)
Log.d("TLV", "type: ${f1.second}")
when(f1.second) {
(0x00.toByte()) -> {
send(TLVNULL)
run(f1.first)
}
(0x03.toByte()) -> {
val f2 = parseLength(f1.first)
Log.d("TLV", "length: ${f2.second}")
val f3 = takePair(f2.first, f2.second)
val bytes = f3.second.toList()
Log.d("TLV", "bytes: $bytes")
send(TLVNDEF(NdefMessage(bytes.toByteArray())))
run(f3.first)
}
(0xFD.toByte()) -> {
val f2 = parseLength(f1.first)
val f3 = takePair(f2.first, f2.second)
val bytes = f3.second.toList()
send(TLVProp(bytes.toList().toByteArray()))
run(f3.first)
}
(0xFE.toByte()) -> {
send(TLVTerminator)
close()
}
else -> {
throw IllegalStateException("Invalid type byte: ${f1.second}")
}
}
}
run(input)
}
}
suspend fun parseLength(input: Flow<Byte>): Pair<Flow<Byte>, Int> {
val f1 = firstPair(input)
return when (f1.second) {
(0xFF.toByte()) -> {
val f2 = takePair(f1.first, 2)
when (val i2 = f2.second.toList().toByteArray()) {
(byteArrayOf(
0xFF.toByte(),
0xFF.toByte()
)) -> throw IllegalStateException("0xFFFF is RFU")
else ->
Pair(f2.first, (i2[0].toInt() shl 8) or (i2[1].toInt()))
}
}
else -> Pair(f1.first, f1.second.toInt())
}
}
reactormonk
07/14/2022, 12:54 PMAdam Powell
07/14/2022, 1:34 PMreactormonk
07/14/2022, 1:43 PMsuspend fun <T> ReceiveChannel<T>.receive(length: Int): List<T> =
buildList {
var count = 0
while(count < length) {
count++
add(receive())
}
}
@OptIn(FlowPreview::class)
suspend fun fromFlow(flow: Flow<Byte>): Flow<TLV> {
return channelFlow<TLV> {
val input = flow.produceIn(this)
when (val type = input.receive()) {
(0x00.toByte()) -> send(TLVNULL)
(0x03.toByte()) -> {
val length = parseLength(input)
val bytes = input.receive(length)
send(TLVNDEF(NdefMessage(bytes.toByteArray())))
}
(0xFD.toByte()) -> {
val length = parseLength(input)
val bytes = input.receive(length)
send(TLVProp(bytes.toByteArray()))
}
(0xFE.toByte()) -> {
send(TLVTerminator)
close()
}
else -> {
throw IllegalStateException("Invalid type byte: $type")
}
}
}
}
suspend fun parseLength(input: ReceiveChannel<Byte>): Int {
return when (val l = input.receive()) {
(0xFF.toByte()) -> {
when (val i2 = input.receive(2).toByteArray()) {
(byteArrayOf(
0xFF.toByte(),
0xFF.toByte()
)) -> throw IllegalStateException("0xFFFF is RFU")
else ->
(i2[0].toInt() shl 8) or (i2[1].toInt())
}
}
else -> l.toInt()
}
}
reactormonk
07/14/2022, 1:44 PMSam
07/14/2022, 1:47 PMproduceIn
and receive
is pretty much what I had in mind. I like the little extension function for receiving multiple values 👌Sam
07/14/2022, 1:48 PMreactormonk
07/14/2022, 1:49 PMreactormonk
07/14/2022, 1:49 PMChunk
😄