fun processFluxDataBuffer(dataBufferFlux: Flux<...
# coroutines
l
fun processFluxDataBuffer(dataBufferFlux: Flux<DataBuffer>): Mono<ByteArray> {
    return Mono.create { sink: MonoSink<ByteArray> ->
        val byteArrayOutputStream = ByteArrayOutputStream()
        dataBufferFlux.subscribeOn(Schedulers.boundedElastic())
            .subscribe(
                { dataBuffer: DataBuffer ->
                    val bytes = ByteArray(dataBuffer.readableByteCount())
                    dataBuffer.read(bytes)
                    byteArrayOutputStream.write(bytes)
                },
                { error: Throwable -> sink.error(error) },
                {
                    val byteArray = byteArrayOutputStream.toByteArray()
                    sink.success(byteArray)
                }
            )
    }
}
Is there a better way to do this with Kotlin coroutines?
d
What's your goal? To concatenate all of the DataBuffers into a single ByteArray?
If so, something like:
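import kotlinx.coroutines.flow.Flow
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

suspend fun join(flow: Flow<ByteBuffer>): ByteArray {
    val output = ByteArrayOutputStream()
    // array() throws on direct/read-only buffers and ignores position/limit, so copy the remaining bytes instead
    flow.collect { buf -> output.write(ByteArray(buf.remaining()).also { buf.duplicate().get(it) }) }
    return output.toByteArray()
}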
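
Since the question starts from a Flux<DataBuffer> rather than a Flow<ByteBuffer>, here is a rough sketch of the same idea applied directly to the Flux. It assumes Spring's DataBufferUtils and the kotlinx-coroutines-reactor module are on the classpath; the function name readAllBytes is made up for illustration. Like the original, it holds the whole payload in memory, so it only suits streams of bounded size.

```kotlin
import kotlinx.coroutines.reactor.awaitSingleOrNull
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DataBufferUtils
import reactor.core.publisher.Flux

// Hypothetical helper: suspends until the Flux completes, then returns all bytes.
suspend fun readAllBytes(dataBufferFlux: Flux<DataBuffer>): ByteArray {
    // DataBufferUtils.join aggregates every buffer in the stream into one DataBuffer.
    // The resulting Mono is empty when the Flux emits nothing, hence awaitSingleOrNull.
    val joined = DataBufferUtils.join(dataBufferFlux).awaitSingleOrNull()
        ?: return ByteArray(0)
    try {
        val bytes = ByteArray(joined.readableByteCount())
        joined.read(bytes)
        return bytes
    } finally {
        // Release the joined buffer so pooled (e.g. Netty) buffers are not leaked.
        DataBufferUtils.release(joined)
    }
}
```

If the caller still needs a Mono<ByteArray>, wrapping the call in the mono { ... } builder from kotlinx-coroutines-reactor should work, e.g. mono { readAllBytes(flux) }.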