/**
 * Drains [dataBufferFlux] and emits all of its bytes, in arrival order, as one [ByteArray].
 *
 * The flux is subscribed on [Schedulers.boundedElastic]. An empty flux yields an empty
 * array, and an upstream error is propagated through the returned [Mono].
 *
 * The aggregation is expressed as an operator chain rather than a manual `subscribe`
 * wrapped in `Mono.create`, so that cancelling the returned [Mono] also cancels the
 * subscription to [dataBufferFlux]. Each buffer is released as soon as its bytes have
 * been copied, so pooled buffers are not leaked.
 *
 * Coroutine usage: if kotlinx-coroutines-reactor is on the classpath, a suspending caller
 * can obtain the bytes with `processFluxDataBuffer(flux).awaitSingle()`; no separate
 * coroutine-based implementation is needed.
 *
 * @param dataBufferFlux source of buffers; every emitted buffer is read and then released here,
 *   so callers must not reuse the buffers afterwards.
 * @return a [Mono] that emits the concatenated bytes once [dataBufferFlux] completes.
 */
fun processFluxDataBuffer(dataBufferFlux: Flux<DataBuffer>): Mono<ByteArray> =
    dataBufferFlux
        .subscribeOn(Schedulers.boundedElastic())
        // A fresh stream is created per subscriber, so re-subscribing never shares state.
        .collect({ ByteArrayOutputStream() }) { collected: ByteArrayOutputStream, dataBuffer: DataBuffer ->
            try {
                val chunk = ByteArray(dataBuffer.readableByteCount())
                dataBuffer.read(chunk)
                collected.write(chunk)
            } finally {
                // Release even when the copy fails; this is a no-op for non-pooled buffers.
                org.springframework.core.io.buffer.DataBufferUtils.release(dataBuffer)
            }
        }
        .map { collected -> collected.toByteArray() }