JoeHegarty
08/21/2018, 9:26 PMelizarov
08/22/2018, 7:07 AMgroostav
08/22/2018, 9:31 PMkotlinx.coroutines.channels.*
expose a bunch of backpressure controls which all default to pay it forward. The rendezvous channel for example will suspend `send`ers (producers) after the first one of too many elements has been produced.
I don't know enough about reactor to know how you send things to it, but, assuming it exposes some kind of non-blocking callback-based API to let you know when its accepted a message, you could write your own adapter:
val inputReactorPipelineObject: Reactor.PipelineEntrance<T> = //...
val inputChannel = actor<T> {
consumeEach { message: T ->
suspendCoroutineOrReturn<Unit> { continuation ->
inputReactorPipelineObject.offer(message, onMessageProcessingStarted = { continuation.resume(Unit) })
COROUTINE_SUSPENDED;
}
}
}
groostav
08/22/2018, 9:34 PMoffer
method that blocks until the message processing starts, then it presumably has an executor it lets you configure, if thats the case...
val reactorExecutor: Executor = //...
val inpuReactorPipelineObject: Reactor.PipelineEntrance<T> = ...
val inputChannel: SendChannel<T> = actor<T>(context = reactorExecutor.asCoroutineDispatcher()) {
consumeEach { message ->
//call the blocking API and block, but since we're on the executor used by the message, its as good as we're gonna get.
inpuReactorPipelineObject.offer(message)
}
}
groostav
08/22/2018, 9:35 PMoffer
throws some kind of exception, that exception will get thrown at the caller of inputChannel.send()
groostav
08/22/2018, 9:35 PMClosedChannelException
... I cant remember...JoeHegarty
08/22/2018, 11:11 PM