I'm wrapping java.nio.channels.AsynchronousFileCha...
# coroutines
j
I'm wrapping java.nio.channels.AsynchronousFileChannel with a callbackFlow to convert an asynchronous File reader into a flow. Is there a better way of doing this or anything that I can improve on this implementation?
Copy code
class FileFlow(private val uri: String) {

	private data class FileBaton(
		val fileChannel: AsynchronousFileChannel,
		val flow: ProducerScope<ByteArray>,
		val buffer: ByteBuffer,
		val position: Long,
		val handleError: (message: String, e: Throwable?) -> Unit = { message, e ->
			flow.cancel(message, e)
			runBlocking(<http://Dispatchers.IO|Dispatchers.IO>) {
				fileChannel.close()
			}
		}
	)

	fun read(bufferSize: DataSize): Flow<ByteArray> = callbackFlow {

		val path = Path.of(uri)
		val baton = FileBaton(
			buffer = ByteBuffer.allocate(bufferSize.B),
			fileChannel = withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
				AsynchronousFileChannel.open(path, StandardOpenOption.READ)
			},
			flow = this,
			position = 0,
		)

		baton.fileChannel.read(
			baton.buffer,
			baton.position,
			baton,
			object : CompletionHandler<Int, FileBaton> {
				override fun completed(read: Int, baton: FileBaton) {
					if (read > 0) {
						trySendBlocking(
							baton.buffer.array().sliceArray(0 until read)
						).onFailure { e ->
							baton.handleError(e?.message ?: "", e)
						}

						baton.buffer.rewind()
						baton.fileChannel.read(
							baton.buffer,
							baton.position + read,
							baton.copy(
								position = baton.position + read
							),
							this
						)
					} else {
						baton.flow.channel.close()
						runBlocking(<http://Dispatchers.IO|Dispatchers.IO>) {
							baton.fileChannel.close()
						}
					}
				}

				override fun failed(e: Throwable?, baton: FileBaton) {
					baton.handleError(e?.message ?: "", e)
				}
			}
		)
		awaitClose {

		}
	}
}
Copy code
FileFlow("/home/.../blah.txt").read(250.kB).collect {
   print(String(it))
}
y
I'd be curious whether it's just simpler and more performant to use okio approach, call the synchronous IO methods from IO dispatcher. Do you start with a channel? Do you need a Flow or its just the way to achieve the read?
j
it will perform slightly better when the number of files you're reading concurrently is small. I'm reinventing the wheel here and building a framework from scratch that's as far as possible asynchronous all the way from the web server down to the database drivers for academic purposes. So in this scenario, AsynchronousFileChannel seems to be a good choice seeing as it's actually async. There's also an AsynchronousFileChannel::read that returns a future, not sure if that's a better way of implementing this over a CompletionHandler, it does seem simpler, but then you're doing while (!doneReading) { val future = channel.read() } and I'm assuming that while loop is blocking
j
runBlocking(<http://Dispatchers.IO|Dispatchers.IO>)
is a pretty bad idea if I may, because you'll be not only blocking the current thread but also one thread from the IO dispatcher. Why do you even need to run this
close()
in a coroutine? The completion handler will be called a the thread pool dedicated for IO already, so I don't think there is any point in switching threads there.
Also, what's the purpose of the
FileBaton
class? It doesn't seem to bring any value as opposed to just using local variables, but it adds extra allocations and indirections. Is it to materialize the capture of the
CompletionHandler
?
but then you're doing while (!doneReading) { val future = channel.read() } and I'm assuming that while loop is blocking
The while loop is not really blocking. You will likely use `kotlinx-coroutines-jdk8`'s
CompletionStage.await()
to just suspend for the value returned by the future, and send it to the callback flow, so there will be suspension points inside the loop which means it won't block the current thread. EDIT: sorry I was wrong here,
read
returns a
Future
not a
CompletableFuture
, so there is no way to await on this one directly. However you can wrap the callback-based API into a suspending function using
suspendCoroutine
Another point, if ultimately you use a
Path
I think it would be better to let users provide the
Path
directly. You can add helpers for passing strings if you want, but if some calling code already has a
Path
, it feels wrong to convert it to string and then back to a path again.
Here is an example on how to convert the callback-based version into a suspending function:
Copy code
private suspend fun AsynchronousFileChannel.readSuspending(buffer: ByteBuffer, position: Long) =
    suspendCoroutine<Int>{ continuation ->
        read(buffer, position, null, object : CompletionHandler<Int, Nothing?> {
            override fun completed(result: Int, attachment: Nothing?) {
                continuation.resume(result)
            }
            override fun failed(exc: Throwable, attachment: Nothing?) {
                continuation.resumeWithException(exc)
            }
        })
    }
🔥 1
K 1
With this helper in hand, I would write the flow the following way:
Copy code
fun Path.readBytesAsFlow(bufferSizeBytes: Int, startPosition: Long = 0): Flow<ByteArray> = flow {
    AsynchronousFileChannel.open(this@readBytesAsFlow, StandardOpenOption.READ).use { fileChannel ->

        val buffer = ByteBuffer.allocate(bufferSizeBytes)
        var positionInFile = startPosition

        // emit() calls are cancellable by default, the loops exits at the end of the file otherwise
        while (true) {
            val nRead = fileChannel.readSuspending(buffer, positionInFile)
            if (nRead <= 0) {
                break
            }
            emit(buffer.array().copyOf(nRead))
            buffer.rewind()
            positionInFile += nRead
        }
    }
}.flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
There is no need for
channelFlow
or
callbackFlow
anymore, since we're using suspending functions now and don't start anything concurrent in the flow's body.
🔥 1
K 1
j
wow, that's beautiful 😮 the suspendCoroutine looks like a great solution to wrap these callback APIs and the extension function is just brilliant! I'm going to go and try out similar things for AsynchronousServerSocketChannel.readBytesAsFlow and writesBytes Thanks a lot for the in depth reply and example Joffrey!
The Baton was meant as an Attachment that can be passed along between reads, but your solution only requires an Int to be passed along which can then be send around instead of the Baton. close is a blocking operation, at least it complains about it when running it inside a suspending function, but since you're using flowOn(Dispatchers.IO), it's already on the IO threadpool and the channel.open.use { } takes care of closing the channel automatically. You're also sending null into Nothing? instead of what I was doing with Unit. I was under the impression that runBlocking would only block that thread for a very short moment, but I'll remember for next time that if I feel the need to use runBlocking, then I'm probably doing something wrong, except for the main method Kotlin Poetry if there was such a thing, I'm learning a lot here, thank you!
j
Happy to help! Yes,
suspendCoroutine
is meant exactly for this purpose. Note that if the callback-based API you're wrapping supports cancellation, you should use
suspendCancellableCoroutine
instead. But it wasn't the case here AFAICT.
close is a blocking operation, at least it complains about it when running it inside a suspending function
Yeah, I think the inspection reports it as soon as the Java function throws
IOException
, even if it's not really blocking per se. In that case, it might be useful, but the code you had written was already being run on the thread pool of the file channel internals I believe, which means there is no need to dispatch it on
<http://Dispatchers.IO|Dispatchers.IO>
(also in the callback you were technically not in a suspend function, so if you saw the inspection warning, it must have been a bug)
114 Views