bj0
07/09/2019, 11:45 PMFlow of data that has separators in it, is there an easy way to split it into a flow of separated chunks? ie: "@some data@more data@ -> [some data, more data]jw
07/09/2019, 11:49 PMflatMapConcat(item -> split(item).toFlow())bj0
07/09/2019, 11:59 PMbj0
07/09/2019, 11:59 PMjw
07/09/2019, 11:59 PMbj0
07/10/2019, 12:00 AMjw
07/10/2019, 12:01 AMjw
07/10/2019, 12:01 AMbj0
07/10/2019, 12:02 AMbj0
07/10/2019, 12:02 AMbj0
07/10/2019, 12:02 AMbj0
07/10/2019, 12:04 AMfun Flow<ByteArray>.parse() = flow {
// message separator
val eom = 0x7e.toByte()
var data = flatMapConcat { it.asList().asFlow() }
while (true) {
data = data.dropWhile { it != eom }.drop(1)
val msg = data.takeWhile { it != eom }
.toList().toByteArray()
if (msg.isEmpty())
break
emit(msg)
data.drop(msg.size)
}
}bj0
07/10/2019, 12:05 AMmsg.isEmpty() is a problem, if there's an empty message it will stop early, but if it's not there the loop never endsZach Klippenstein (he/him) [MOD]
07/10/2019, 12:39 AMval eom = 0x7e.toByte()
fun Flow<ByteArray>.parse() = flow {
val data = flatMapConcat {
it.toList().asFlow()
}
val buffer = mutableListOf<Byte>()
data.collect { byte ->
if (byte == eom) {
emit(buffer.toList())
buffer.clear()
} else {
buffer += byte
}
}
// Emit the last buffer.
emit(buffer.toList())
}Zach Klippenstein (he/him) [MOD]
07/10/2019, 12:43 AMBuffer instead of a simple mutable list, since it’s probably more efficient, but same idea.ursus
07/10/2019, 3:28 AMbuffer?louiscad
07/10/2019, 6:42 AMursus
07/10/2019, 6:43 AMasync { }? then it would be needed right#louiscad
07/10/2019, 6:47 AMemit from inside them because the CoroutineContext would not be the same (different Job). Calls to emit must be called on the came CoroutineContext, it's specified in the documentation, and checked at runtime.ursus
07/10/2019, 6:51 AMlouiscad
07/10/2019, 6:52 AMursus
07/10/2019, 6:53 AMlouiscad
07/10/2019, 6:55 AMMutex or actor when you still need it.ursus
07/10/2019, 6:57 AMjw
07/10/2019, 11:09 AMlouiscad
07/10/2019, 11:42 AMDeferred into a List<Deferred> and call awaitAll() on it. You shouldn't need extra synchronization if you don't have shared mutable state as I said.bj0
07/10/2019, 2:59 PMZach Klippenstein (he/him) [MOD]
07/10/2019, 11:57 PMbj0
07/11/2019, 8:41 PM.split methodZach Klippenstein (he/him) [MOD]
07/11/2019, 9:16 PMsplit is probably going to use some mutable data structure or a mutable reference to a persistent immutable data structure under the hood anyway though.