Hi, my use case is as follows: I have a table in postgres which contians events. I need to get batches of these events and process them in the application, processing is transactional.
There is a draft implementation using scheduled thread pools, but it's quite convoluted, hard to follow and is plagued by race conditions and inconsistency. My task is to reimplement it.
I wonder if there are any good methods of doing it with coroutines lib?
Basically there is a 40 thread pool, each thread can process one event at a time. And there is a JPA repository that should be used in the producer (in reactive streams terminology) of those events. I need some kind of back-pressure setup, where the 40 thread pool is maximally utilized, but never overwhelmed. What are my options with coroutines?
I looked at actors, but the relevant channel API seems to be obsoleted, and
consumeEach
assumes that the actor is temporary and will be closed after each message is processed, while in my case, the actor should probably persist, even if at the moment there are no new events.
Another problem with actor is, that by making it unbounded I can run into OOM if there is a spike in events count, this is where I need backpressure and cannot just read events from the repository with constant rate.