class SuspendingBatchingPublisherImpl<T, R>(
private val rabbitTemplate: RabbitTemplate,
private val eventBatchingProperties: EventBatchingProperties,
private val batchFunction: java.util.function.Function<List<T>, R>,
private val batchLength: java.util.function.Function<R, Int>
) : BatchingEventPublisher<T>, CoroutineScope {
companion object {
private val LOG = FluentLoggerFactory.getLogger(SuspendingBatchingPublisherImpl::class.java)
}
// Metrics
private val eventSentMeter = Metrics.counter(eventSentName)
private val eventErrorMeter = Metrics.counter(eventErrorsName)
private val eventQueueMeter = Metrics.counter(eventQueueName)
// Fields
private val executorService = Executors.newFixedThreadPool(4)
private val maxSize: Int = eventBatchingProperties.bufferSize
private val maxTime: Long = TimeUnit.SECONDS.toMillis(eventBatchingProperties.bufferSeconds.toLong())
private val channel = Channel<T>(eventBatchingProperties.overflowBufferSize)
private val ticker = ticker(maxTime)
private val job = Job()
override val coroutineContext: CoroutineContext
get() = executorService.asCoroutineDispatcher() + job
private val flow: Flow<List<T>> = flow {
val buffer = mutableListOf<T>()
suspend fun emitBuffer() {
if (buffer.isNotEmpty()) {
emit(buffer.toList())
buffer.clear()
}
}
try {
whileSelect {
channel.onReceiveOrNull { value ->
if (value != null) buffer.add(value)
if (value == null || buffer.size >= maxSize) emitBuffer()
value != null
}
ticker.onReceive {
emitBuffer()
true
}
}
} finally {
eventErrorMeter.increment()
LOG.warn().log("Canceling SuspendingBatchingPublisherImpl job")
job.cancel()
}
}
@PostConstruct
override fun initialize() {
launch {
flow.map { items ->
batchFunction.apply(items)
}.collect { eventList ->
val size = batchLength.apply(eventList)
try {
// Set optional message priority
if (eventBatchingProperties.priority > 0) {
rabbitTemplate.convertAndSend(eventList) { msg ->
msg.messageProperties.priority = eventBatchingProperties.priority
msg
}
} else {
rabbitTemplate.convertAndSend(eventList)
}
LOG.debug().log("Published list of {} event messages to Rabbit on exchange {}",
size, rabbitTemplate.exchange)
eventSentMeter.increment(size.toDouble())
} catch (e: AmqpException) {
LOG.error().log("Failed to publish list of {} events to Rabbit: {}", size, e.message)
eventErrorMeter.increment()
}
}
}
}
override fun publishEvent(event: T) {
launch {
LOG.trace().log("Publishing {}", event)
channel.send(event)
eventQueueMeter.increment()
}
}
override fun flush() {
}
override fun getOverflowSize(): Int {
return eventBatchingProperties.overflowBufferSize
}
override fun getBufferSize(): Int {
return bufferSize
}
override fun getBufferSeconds(): Int {
return eventBatchingProperties.bufferSize
}
@PreDestroy
fun complete() {
channel.cancel()
ticker.cancel()
job.complete()
}
}