#coroutines

JoeHegarty

08/21/2018, 9:26 PM
Kind of an open-ended question, but how are folks handling backpressure in relation to coroutines? Say I have an existing pipeline in Project Reactor: messages are pumped in one side, queued, fed into a ParallelFlux, then transformed and ultimately consumed by the app. The queue and the "rails" in the ParallelFlux effectively give you backpressure, so only X messages are in flight in the pipeline concurrently, and as one message on a rail finishes, the rail picks up the next from the queue. That seems like it'd be pretty hard to model as easily with coroutines. Am I missing something?

elizarov

08/22/2018, 7:07 AM
It is “given” with coroutines “automagically”. You don’t even have to think about backpressure with coroutines. All coroutine communication primitives (channels) automatically suspend producers when consumers don’t keep up.
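As a rough illustration of the point above, here is a minimal sketch of the queue-plus-rails setup from the question, assuming a bounded `Channel` as the queue and a fixed number of worker coroutines as the rails. The names `runPipeline` and `PipelineStats` are made up for this example, and `delay(5)` stands in for the real transformation.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical result type, used only to report what the sketch observed.
data class PipelineStats(val processed: Int, val maxInFlight: Int)

// Pumps `messageCount` messages through `rails` worker coroutines fed by a bounded queue.
fun runPipeline(messageCount: Int, rails: Int, queueSize: Int): PipelineStats = runBlocking {
    val queue = Channel<Int>(capacity = queueSize) // send() suspends once the buffer is full
    val inFlight = AtomicInteger(0)
    val maxInFlight = AtomicInteger(0)
    val processed = AtomicInteger(0)

    coroutineScope {
        // Each worker plays the role of one "rail": it takes the next message
        // from the queue only after it has finished the previous one.
        repeat(rails) {
            launch {
                for (message in queue) {
                    val now = inFlight.incrementAndGet()
                    maxInFlight.updateAndGet { maxOf(it, now) }
                    delay(5) // stand-in for the real transformation
                    inFlight.decrementAndGet()
                    processed.incrementAndGet()
                }
            }
        }
        // Producer: suspends inside send() whenever the workers fall behind.
        launch {
            repeat(messageCount) { queue.send(it) }
            queue.close() // lets the workers' for-loops finish
        }
    }
    PipelineStats(processed.get(), maxInFlight.get())
}
```

Calling `runPipeline(messageCount = 20, rails = 4, queueSize = 2)` should never report more than 4 messages in flight, since a message only counts as in flight while one of the 4 workers holds it.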

groostav

08/22/2018, 9:31 PM
I don't think that's true. `kotlinx.coroutines.channels.*` exposes a bunch of backpressure controls, which all default to pay it forward. The rendezvous channel, for example, will suspend `send`ers (producers) after the first one of too many elements has been produced. I don't know enough about Reactor to know how you send things to it, but assuming it exposes some kind of non-blocking, callback-based API to let you know when it's accepted a message, you could write your own adapter:
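```kotlin
// Reactor.PipelineEntrance is a placeholder type, not a real Reactor API.
val inputReactorPipelineObject: Reactor.PipelineEntrance<T> = //...

val inputChannel = actor<T> {
  consumeEach { message: T ->
    // Suspend until the pipeline reports that it has started processing the message.
    // suspendCoroutine also copes with a callback that fires before offer() returns.
    suspendCoroutine<Unit> { continuation ->
      inputReactorPipelineObject.offer(message, onMessageProcessingStarted = { continuation.resume(Unit) })
    }
  }
}
```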
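For anyone who wants to try the callback idea in isolation, here is a small self-contained sketch. `CallbackPipeline`, `offerSuspending` and `feed` are hypothetical names invented for this example (they are not Reactor APIs), and the pipeline just simulates accepting each message on another thread after a short delay.

```kotlin
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Hypothetical stand-in for a callback-based pipeline entrance; not a Reactor API.
class CallbackPipeline<T> {
    val accepted = CopyOnWriteArrayList<T>()

    fun offer(message: T, onMessageProcessingStarted: () -> Unit) {
        // Simulate the pipeline accepting the message a little later, on another thread.
        thread {
            Thread.sleep(10)
            accepted.add(message)
            onMessageProcessingStarted()
        }
    }
}

// Suspends the caller until the pipeline reports that it has started on the message.
suspend fun <T> CallbackPipeline<T>.offerSuspending(message: T) {
    suspendCoroutine<Unit> { continuation ->
        offer(message) { continuation.resume(Unit) }
    }
}

// Feeds the messages one at a time; the next offer only happens after the
// previous message has been accepted.
fun feed(messages: List<String>): List<String> = runBlocking {
    val pipeline = CallbackPipeline<String>()
    for (m in messages) pipeline.offerSuspending(m)
    pipeline.accepted.toList()
}
```

Because each call suspends until the callback fires, the messages are accepted in the order they were offered, which is the backpressure behaviour the adapter is meant to provide.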
If not, and it simply has an `offer` method that blocks until the message processing starts, then it presumably has an executor it lets you configure. If that's the case...
```kotlin
val reactorExecutor: Executor = //...
val inputReactorPipelineObject: Reactor.PipelineEntrance<T> = //...

val inputChannel: SendChannel<T> = actor<T>(context = reactorExecutor.asCoroutineDispatcher()) {
  consumeEach { message ->
    // Call the blocking API and block; since we're on the executor used by the
    // pipeline, it's as good as we're going to get.
    inputReactorPipelineObject.offer(message)
  }
}
```
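A runnable version of the blocking variant might look roughly like this. It is only a sketch: `BlockingPipeline` and `feedBlocking` are hypothetical names, the executor is a plain single-thread executor rather than anything Reactor provides, and it uses a rendezvous `Channel` with a launched consumer instead of `actor`.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors

// Hypothetical blocking entrance: offer() returns only once processing has started.
class BlockingPipeline<T> {
    val accepted = CopyOnWriteArrayList<T>()

    fun offer(message: T) {
        Thread.sleep(10) // simulate blocking until the pipeline picks the message up
        accepted.add(message)
    }
}

fun feedBlocking(messages: List<Int>): List<Int> {
    val pipeline = BlockingPipeline<Int>()
    val executor = Executors.newSingleThreadExecutor()
    try {
        runBlocking {
            val input = Channel<Int>() // rendezvous: send() waits for the consumer
            // The consumer runs on the executor's thread, so the blocking call
            // ties up that thread rather than the producer's.
            val consumer = launch(executor.asCoroutineDispatcher()) {
                for (message in input) pipeline.offer(message)
            }
            for (m in messages) input.send(m)
            input.close()
            consumer.join()
        }
    } finally {
        executor.shutdown()
    }
    return pipeline.accepted.toList()
}
```

With a rendezvous channel the producer can run at most one message ahead of the blocking `offer` call, so the blocking API still ends up pacing the sender.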
A key question then becomes one of exception handling, and that's handled by channel failure states, which act as you might expect: if `offer` throws some kind of exception, that exception will get thrown at the caller of `inputChannel.send()`... it might be wrapped in a `ClosedChannelException`... I can't remember...
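One way to get a predictable failure state, rather than depending on exactly what `actor` does when its body throws, is to close the channel with the cause yourself. The sketch below assumes a plain rendezvous `Channel` and a hand-written consumer; `sendAfterFailure` is a made-up name, and the `"bad"` message stands in for an `offer` call that throws.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns the exception seen by a sender after the consumer has failed the channel,
// or null if the send unexpectedly succeeded.
fun sendAfterFailure(): Throwable? = runBlocking {
    val input = Channel<String>()
    val consumer = launch {
        try {
            for (message in input) {
                // Stand-in for a downstream offer() that throws.
                if (message == "bad") throw IllegalStateException("offer failed")
            }
        } catch (e: IllegalStateException) {
            input.close(e) // marks the channel as failed with this cause
        }
    }
    input.send("ok")
    input.send("bad")
    consumer.join() // wait until the consumer has failed the channel
    try {
        input.send("next")
        null
    } catch (e: Throwable) {
        e
    }
}
```

Sending on a channel that was closed with a cause throws that cause, so the sender here sees an `IllegalStateException` rather than a generic closed-channel exception.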

JoeHegarty

08/22/2018, 11:11 PM
Thanks. I'll take a closer look around these areas now I have an idea of where to start and see if I can model what I already have in reactor.