# coroutines
c
Hi, my use case is as follows: I have a table in Postgres which contains events. I need to fetch batches of these events and process them in the application; processing is transactional. There is a draft implementation using scheduled thread pools, but it's quite convoluted, hard to follow, and plagued by race conditions and inconsistency. My task is to reimplement it. Are there any good ways of doing this with the coroutines lib? Basically there is a pool of 40 threads, and each thread can process one event at a time. There is also a JPA repository that should be used in the producer (in reactive streams terminology) of those events. I need some kind of backpressure setup where the 40-thread pool is maximally utilized but never overwhelmed. What are my options with coroutines? I looked at actors, but the relevant channel API seems to be obsolete, and consumeEach assumes that the actor is temporary and will be closed after each message is processed, while in my case the actor should probably persist even if there are no new events at the moment. Another problem with an actor is that by making it unbounded I can run into OOM if there is a spike in the event count. This is where I need backpressure: I cannot just read events from the repository at a constant rate.
t
Flow
👍 1
c
Hm... when you put it like that... Thanks 🙂
t
Sorry for the lack of detail, but with Flow, backpressure is more or less a standard feature. I'd play with it a bit; then maybe you'll have more detailed questions for your design.
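To illustrate what "standard feature" means here, a minimal sketch (the function name `backpressureDemo` and the log strings are made up for the example): in a plain Flow, `emit` suspends until the collector has finished with the previous value, so a slow consumer automatically slows the producer down.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical demo: records the order in which emissions and completions happen.
fun backpressureDemo(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        flow {
            for (i in 1..3) {
                log += "emit $i"
                emit(i) // suspends until the collector body below has returned
            }
        }.collect { value ->
            delay(10) // simulate slow processing
            log += "done $value"
        }
    }
    return log
}
```

The producer never runs ahead of the consumer here; operators such as `buffer` relax that on purpose, up to a bounded capacity.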
d
If the processing is not CPU-intensive, use async libs (a CompletableFuture, Publisher or Flowable can be converted to a suspend fun or a Flow using the integrations provided in the coroutines lib). For the ones that take longer (like db requests to graft extra data), you can use the jasync-sql lib, which has suspend funs for requests. For blocking stuff, you need withContext to turn it into a suspend fun. Then receive the events as a Flow and do all the processing with those wrappers in some of its operators. That way Flow handles backpressure out of the box... but there's of course more to it, like parallel processing with flatMapMerge for example... but that's the general direction.
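A rough sketch of that direction, under loud assumptions: `Event`, `EventSource`, `eventFlow` and `processAll` are hypothetical names standing in for the real entity, the JPA repository and the processing code, and `fetchBatch` is assumed to claim the rows it returns so the same event is not handed out twice. It is not a drop-in implementation, only the shape of a polling producer plus bounded parallel processing via flatMapMerge.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.withContext

// Hypothetical stand-ins for the real entity and the JPA repository.
data class Event(val id: Long)

interface EventSource {
    // Blocking call. Assumed to return up to `limit` events and to claim them,
    // so that a later call does not return the same events again.
    fun fetchBatch(limit: Int): List<Event>
}

// Long-lived producer: polls forever and waits a bit when the table is empty.
// emit() suspends while downstream is busy, so polling slows down under load.
fun eventFlow(source: EventSource, batchSize: Int, idleDelayMs: Long): Flow<Event> = flow {
    while (true) {
        val batch = source.fetchBatch(batchSize)
        if (batch.isEmpty()) delay(idleDelayMs) else batch.forEach { emit(it) }
    }
}.flowOn(Dispatchers.IO) // run the blocking repository call off the caller's thread

// Processes events with at most `parallelism` handlers running at once.
// `handle` is the blocking, transactional processing of a single event.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun processAll(
    events: Flow<Event>,
    parallelism: Int,
    dispatcher: CoroutineDispatcher,
    handle: (Event) -> Unit,
) {
    events
        .flatMapMerge(concurrency = parallelism) { event ->
            flow<Unit> {
                // withContext turns the blocking call into a suspending one
                withContext(dispatcher) { handle(event) }
                emit(Unit)
            }
        }
        .collect()
}
```

For the 40-thread setup from the question, `parallelism` would be 40 and `dispatcher` could be something like `Executors.newFixedThreadPool(40).asCoroutineDispatcher()`. Note that `flowOn` places a bounded buffer between the producer and the workers, so a few fetched events may wait in memory, but the amount stays capped.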
c
I've ended up leaving it as is, just fixed the issues (removed half of the code while effectively keeping the approach the same)
👍 2