tieskedh
11/12/2020, 5:45 PM
suspend inline fun ByteReadChannel.toFlow(): Flow<ByteArray> {
    return flow {
        do {
            read { source, start, endExclusive ->
                val array = when {
                    endExclusive > start -> {
                        val mem = source.slice(start, endExclusive - start)
                        val byteArray = ByteArray(mem.size.toInt())
                        mem.copyTo(byteArray, 0, mem.size.toInt())
                        byteArray
                    }
                    else -> byteArrayOf()
                }
                emit(array)
                255
            }
            if (isClosedForRead) break
        } while (currentCoroutineContext().isActive)
    }
}
Rustam Siniukov
11/12/2020, 6:57 PM
1. no need to check currentCoroutineContext().isActive, reading will be cancelled automatically if the reader coroutine was cancelled
2. from the read block you need to return the actual number of bytes read
3. no need to slice the memory, you can copy source directly into the array
In the end, your code should look like:
return flow {
    while (!channel.isClosedForRead) {
        channel.read { source, start, endExclusive ->
            val size = (endExclusive - start).toInt()
            val byteArray = ByteArray(size)
            // copy straight from the source memory, no slice needed
            source.copyTo(byteArray, start, size)
            emit(byteArray)
            // report how many bytes were actually consumed
            size
        }
    }
}
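
For comparison, here is a minimal sketch of the same idea built on `readAvailable` rather than the low-level `read { source, start, endExclusive -> ... }` block. This is a swapped-in technique, not the code from the thread. It assumes a ktor-io version where `readAvailable(ByteArray, Int, Int)` on `ByteReadChannel` returns the number of bytes read, or -1 once the channel is closed. The `bufferSize` parameter is hypothetical and only there for illustration.

```kotlin
import io.ktor.utils.io.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Sketch only: bufferSize is a hypothetical parameter, not part of the thread's code.
fun ByteReadChannel.toFlow(bufferSize: Int = 4096): Flow<ByteArray> = flow {
    val buffer = ByteArray(bufferSize)
    while (true) {
        // Suspends until some bytes are available. No explicit isActive check:
        // emit() below checks for cancellation of the collecting coroutine.
        val read = readAvailable(buffer, 0, buffer.size)
        if (read == -1) break // assumed contract: -1 means the channel is closed
        // Emit a copy of the filled part so the shared buffer can be reused.
        if (read > 0) emit(buffer.copyOf(read))
    }
}
```

It would be collected like any other flow, for example `channel.toFlow().collect { bytes -> println(bytes.size) }`. Copying into a fresh array on each emit costs an allocation, but it keeps downstream collectors from seeing the buffer being overwritten by the next read.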
tieskedh
11/12/2020, 8:07 PM