I've implemented a class that extends CoroutineSc...
# coroutines
l
I've implemented a class that extends CoroutineScope and does batching with flows like timeoutbuffer in reactor and not sure if I am doing the error detection correctly. It is working but can someone please look at it provide feedback and or just their opinion on what is right and wrong about it? Actually I'm not sure how to handle errors or restart the flows. Advice is appreciated, code in the thread...
class SuspendingBatchingPublisherImpl<T, R>( private val rabbitTemplate: RabbitTemplate, private val eventBatchingProperties: EventBatchingProperties, private val batchFunction: java.util.function.Function<List<T>, R>, private val batchLength: java.util.function.Function<R, Int> ) : BatchingEventPublisher<T>, CoroutineScope { companion object { private val LOG = FluentLoggerFactory.getLogger(SuspendingBatchingPublisherImpl::class.java) } // Metrics private val eventSentMeter = Metrics.counter(eventSentName) private val eventErrorMeter = Metrics.counter(eventErrorsName) private val eventQueueMeter = Metrics.counter(eventQueueName) // Fields private val executorService = Executors.newFixedThreadPool(4) private val maxSize: Int = eventBatchingProperties.bufferSize private val maxTime: Long = TimeUnit.SECONDS.toMillis(eventBatchingProperties.bufferSeconds.toLong()) private val channel = Channel<T>(eventBatchingProperties.overflowBufferSize) private val ticker = ticker(maxTime) private val job = Job() override val coroutineContext: CoroutineContext get() = executorService.asCoroutineDispatcher() + job private val flow: Flow<List<T>> = flow { val buffer = mutableListOf<T>() suspend fun emitBuffer() { if (buffer.isNotEmpty()) { emit(buffer.toList()) buffer.clear() } } try { whileSelect { channel.onReceiveOrNull { value -> if (value != null) buffer.add(value) if (value == null || buffer.size >= maxSize) emitBuffer() value != null } ticker.onReceive { emitBuffer() true } } } finally { eventErrorMeter.increment() LOG.warn().log("Canceling SuspendingBatchingPublisherImpl job") job.cancel() } } @PostConstruct override fun initialize() { launch { flow.map { items -> batchFunction.apply(items) }.collect { eventList -> val size = batchLength.apply(eventList) try { // Set optional message priority if (eventBatchingProperties.priority > 0) { rabbitTemplate.convertAndSend(eventList) { msg -> msg.messageProperties.priority = eventBatchingProperties.priority msg } } else { rabbitTemplate.convertAndSend(eventList) } LOG.debug().log("Published list of {} event messages to Rabbit on exchange {}", size, rabbitTemplate.exchange) eventSentMeter.increment(size.toDouble()) } catch (e: AmqpException) { LOG.error().log("Failed to publish list of {} events to Rabbit: {}", size, e.message) eventErrorMeter.increment() } } } } override fun publishEvent(event: T) { launch { LOG.trace().log("Publishing {}", event) channel.send(event) eventQueueMeter.increment() } } override fun flush() { } override fun getOverflowSize(): Int { return eventBatchingProperties.overflowBufferSize } override fun getBufferSize(): Int { return bufferSize } override fun getBufferSeconds(): Int { return eventBatchingProperties.bufferSize } @PreDestroy fun complete() { channel.cancel() ticker.cancel() job.complete() } }