janvladimirmostert
01/06/2022, 10:42 PM
class FileFlow(private val uri: String) {
    private data class FileBaton(
        val fileChannel: AsynchronousFileChannel,
        val flow: ProducerScope<ByteArray>,
        val buffer: ByteBuffer,
        val position: Long,
        val handleError: (message: String, e: Throwable?) -> Unit = { message, e ->
            flow.cancel(message, e)
            runBlocking(Dispatchers.IO) {
                fileChannel.close()
            }
        }
    )

    fun read(bufferSize: DataSize): Flow<ByteArray> = callbackFlow {
        val path = Path.of(uri)
        val baton = FileBaton(
            buffer = ByteBuffer.allocate(bufferSize.B),
            fileChannel = withContext(Dispatchers.IO) {
                AsynchronousFileChannel.open(path, StandardOpenOption.READ)
            },
            flow = this,
            position = 0,
        )
        baton.fileChannel.read(
            baton.buffer,
            baton.position,
            baton,
            object : CompletionHandler<Int, FileBaton> {
                override fun completed(read: Int, baton: FileBaton) {
                    if (read > 0) {
                        trySendBlocking(
                            baton.buffer.array().sliceArray(0 until read)
                        ).onFailure { e ->
                            baton.handleError(e?.message ?: "", e)
                        }
                        baton.buffer.rewind()
                        baton.fileChannel.read(
                            baton.buffer,
                            baton.position + read,
                            baton.copy(
                                position = baton.position + read
                            ),
                            this
                        )
                    } else {
                        baton.flow.channel.close()
                        runBlocking(Dispatchers.IO) {
                            baton.fileChannel.close()
                        }
                    }
                }

                override fun failed(e: Throwable?, baton: FileBaton) {
                    baton.handleError(e?.message ?: "", e)
                }
            }
        )
        awaitClose {
        }
    }
}

FileFlow("/home/.../blah.txt").read(250.kB).collect {
    print(String(it))
}
yschimke
01/07/2022, 7:07 AM
janvladimirmostert
01/07/2022, 8:08 AM
Joffrey
01/07/2022, 12:35 PM
`runBlocking(Dispatchers.IO)` is a pretty bad idea if I may, because you'll be blocking not only the current thread but also one thread from the IO dispatcher. Why do you even need to run this `close()` in a coroutine?
The completion handler will already be called on the thread pool dedicated to IO, so I don't think there is any point in switching threads there.
Joffrey
01/07/2022, 12:40 PM
`FileBaton` class? It doesn't seem to bring any value as opposed to just using local variables, but it adds extra allocations and indirections. Is it to materialize the capture of the `CompletionHandler`?
Joffrey
01/07/2022, 12:43 PM
> but then you're doing while (!doneReading) { val future = channel.read() } and I'm assuming that while loop is blocking
The while loop is not really blocking. You will likely use `kotlinx-coroutines-jdk8`'s `CompletionStage.await()` to just suspend for the value returned by the future, and send it to the callback flow, so there will be suspension points inside the loop, which means it won't block the current thread.
EDIT: sorry, I was wrong here, `read` returns a `Future`, not a `CompletableFuture`, so there is no way to await on this one directly. However, you can wrap the callback-based API into a suspending function using `suspendCoroutine`.
Joffrey
01/07/2022, 12:44 PM
`Path`
I think it would be better to let users provide the `Path` directly. You can add helpers for passing strings if you want, but if some calling code already has a `Path`, it feels wrong to convert it to a string and then back to a path again.
Joffrey
01/07/2022, 1:06 PM
private suspend fun AsynchronousFileChannel.readSuspending(buffer: ByteBuffer, position: Long) =
    suspendCoroutine<Int> { continuation ->
        read(buffer, position, null, object : CompletionHandler<Int, Nothing?> {
            override fun completed(result: Int, attachment: Nothing?) {
                continuation.resume(result)
            }

            override fun failed(exc: Throwable, attachment: Nothing?) {
                continuation.resumeWithException(exc)
            }
        })
    }
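A minimal, self-contained sketch of how such a wrapper might be exercised, assuming `kotlinx-coroutines-core` is on the classpath. The `readAll` helper and the temp file are hypothetical and only there for illustration, and the wrapper is repeated with a `Unit` attachment (instead of `null`) so the snippet compiles on its own:

```kotlin
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.CompletionHandler
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.runBlocking

// Same idea as the wrapper above; a Unit attachment is used here as a stand-in for null.
suspend fun AsynchronousFileChannel.readSuspending(buffer: ByteBuffer, position: Long): Int =
    suspendCoroutine { continuation ->
        read(buffer, position, Unit, object : CompletionHandler<Int, Unit> {
            override fun completed(result: Int, attachment: Unit) {
                continuation.resume(result)
            }

            override fun failed(exc: Throwable, attachment: Unit) {
                continuation.resumeWithException(exc)
            }
        })
    }

// Hypothetical helper: reads the whole file chunk by chunk, suspending on each read.
suspend fun readAll(path: Path, bufferSizeBytes: Int): ByteArray {
    val out = ByteArrayOutputStream()
    AsynchronousFileChannel.open(path, StandardOpenOption.READ).use { channel ->
        val buffer = ByteBuffer.allocate(bufferSizeBytes)
        var position = 0L
        while (true) {
            val nRead = channel.readSuspending(buffer, position)
            if (nRead <= 0) break // read() reports -1 at end of file
            out.write(buffer.array(), 0, nRead)
            buffer.clear() // reset position so the next read starts at index 0
            position += nRead
        }
    }
    return out.toByteArray()
}

fun main(): Unit = runBlocking {
    val file = Files.createTempFile("read-suspending", ".txt")
    try {
        Files.writeString(file, "hello from an async channel")
        println(String(readAll(file, bufferSizeBytes = 8)))
    } finally {
        Files.deleteIfExists(file)
    }
}
```

The loop suspends on each read instead of blocking a thread, which is the point made earlier about the while loop.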
Joffrey
01/07/2022, 1:48 PM
fun Path.readBytesAsFlow(bufferSizeBytes: Int, startPosition: Long = 0): Flow<ByteArray> = flow {
    AsynchronousFileChannel.open(this@readBytesAsFlow, StandardOpenOption.READ).use { fileChannel ->
        val buffer = ByteBuffer.allocate(bufferSizeBytes)
        var positionInFile = startPosition
        // emit() calls are cancellable by default; otherwise the loop exits at the end of the file
        while (true) {
            val nRead = fileChannel.readSuspending(buffer, positionInFile)
            if (nRead <= 0) {
                break
            }
            emit(buffer.array().copyOf(nRead))
            buffer.rewind()
            positionInFile += nRead
        }
    }
}.flowOn(Dispatchers.IO)
There is no need for `channelFlow` or `callbackFlow` anymore, since we're using suspending functions now and don't start anything concurrent in the flow's body.
janvladimirmostert
01/07/2022, 8:48 PM
janvladimirmostert
01/07/2022, 9:11 PM
Joffrey
01/08/2022, 12:05 PM
`suspendCoroutine` is meant exactly for this purpose. Note that if the callback-based API you're wrapping supports cancellation, you should use `suspendCancellableCoroutine` instead. But it wasn't the case here AFAICT.
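For contrast, here is a rough sketch of the `suspendCancellableCoroutine` variant, using an unrelated JDK API that does support cancellation (`ScheduledExecutorService`, whose `schedule` call returns a cancellable future). The `delaySuspending` name is made up for this illustration, and `kotlinx-coroutines-core` is assumed to be available:

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical wrapper: suspends until the scheduled task fires.
suspend fun ScheduledExecutorService.delaySuspending(millis: Long): Unit =
    suspendCancellableCoroutine { continuation ->
        // Explicit Runnable avoids ambiguity with the Callable overload of schedule().
        val future = schedule(Runnable { continuation.resume(Unit) }, millis, TimeUnit.MILLISECONDS)
        // Propagate coroutine cancellation to the underlying scheduled task.
        continuation.invokeOnCancellation { future.cancel(false) }
    }

fun main(): Unit = runBlocking {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    try {
        // Completes normally once the 50 ms task has run.
        scheduler.delaySuspending(50)
        // The timeout cancels the coroutine, which in turn cancels the 10 s task.
        val result = withTimeoutOrNull(100) { scheduler.delaySuspending(10_000) }
        println(result) // null, because the timeout wins
    } finally {
        scheduler.shutdownNow()
    }
}
```

With plain `suspendCoroutine`, the second call would keep the coroutine suspended for the full ten seconds, since nothing would tell the scheduler to drop the task.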
> close is a blocking operation, at least it complains about it when running it inside a suspending function
Yeah, I think the inspection reports it as soon as the Java function throws `IOException`, even if it's not really blocking per se. In that case, it might be useful, but the code you had written was already being run on the thread pool of the file channel internals I believe, which means there is no need to dispatch it on Dispatchers.IO (also, in the callback you were technically not in a suspend function, so if you saw the inspection warning, it must have been a bug).
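To make that concrete, here is a sketch of what the callback-based version could look like without any `runBlocking(Dispatchers.IO)`: the channel is closed directly, once, in `awaitClose`. This is only one possible shape under stated assumptions; the `readBytesWithCallbacks` name is invented for the example, the file position is passed as the attachment instead of a baton object, and `kotlinx-coroutines-core` is assumed.

```kotlin
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.CompletionHandler
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical name; the attachment carries the current position in the file.
fun Path.readBytesWithCallbacks(bufferSizeBytes: Int): Flow<ByteArray> = callbackFlow {
    val fileChannel = AsynchronousFileChannel.open(this@readBytesWithCallbacks, StandardOpenOption.READ)
    val buffer = ByteBuffer.allocate(bufferSizeBytes)

    val handler = object : CompletionHandler<Int, Long> {
        override fun completed(read: Int, position: Long) {
            if (read <= 0) {
                close() // end of file: completes the flow normally
                return
            }
            // Copy the bytes before the buffer is reused by the next read.
            val result = trySendBlocking(buffer.array().copyOf(read))
            if (result.isFailure) {
                close(result.exceptionOrNull()) // collector is gone or the channel failed
                return
            }
            buffer.clear()
            fileChannel.read(buffer, position + read, position + read, this)
        }

        override fun failed(e: Throwable, position: Long) {
            close(e) // surfaces the error to the collector
        }
    }

    fileChannel.read(buffer, 0L, 0L, handler)
    // Runs on completion, failure, or cancellation; no dispatcher switch needed.
    awaitClose { fileChannel.close() }
}

fun main(): Unit = runBlocking {
    val file = Files.createTempFile("callback-flow", ".txt")
    try {
        Files.writeString(file, "0123456789")
        val chunks = file.readBytesWithCallbacks(bufferSizeBytes = 4).toList()
        println(chunks.joinToString("") { String(it) })
    } finally {
        Files.deleteIfExists(file)
    }
}
```

The suspending version with `flow { }` shown earlier is still simpler; this only illustrates that the callback version does not need to hop to another dispatcher to close the file.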