https://kotlinlang.org logo
#coroutines
Title
# coroutines
y

Yan Pujante

05/10/2021, 1:19 PM
I am a bit new to coroutines/flow and wondering if this is a good design or not. I have a server that can receive long running job to process. Each job must be run sequentially. So I have something like this:
Copy code
class JobsMgr {
    private val _jobsToProcess = Channel<Job>(Channel.UNLIMITED)
    private val _jobsProcessed: Flow<Job> = _jobsToProcess.consumeAsFlow().map { job -> /* processing */ }.flowOn(Dispatchers.Default)

  init {
       // I find this very ugly.. is there a better way?
        Executors.newSingleThreadExecutor().execute {
            runBlocking {
                _jobsProcessed.collect {
                    delay(10 * 1000)
                    _lock.withLock { _jobs.remove(it.jobId) }
                }
            }
        }
  }

fun enqueueJob(...) { 
//...
               runBlocking {
                _jobsToProcess.send(job)
            }
}

}
It is running in a spring boot application so I do not have control over the main loop. This is why I created a process to collect and remove the completed jobs. Is this how Channel/Flow is supposed to be used? And is there a better way to "start" the collection. That seems like ugly code to me but not sure how to do better.
m

marstran

05/10/2021, 4:37 PM
I would take a look at the
actor
coroutine builder. An actor is guaranteed to process its messages sequentially.
By the way, do you really need the
_jobs.remove(it.jobId)
stuff? Where is
_jobs
defined?
I think an implementation with
actor
could look something like this:
Copy code
class JobsMgr(val scope: CoroutineScope) {

  private val jobActor = scope.actor(Dispatchers.Default, Channel.UNLIMITED) {
    for (job in channel) {
      // Processing
    }
  }

  fun enqueueJob(job: Job) {
    jobActor.offer(job)
  }

}
u

uli

05/10/2021, 7:52 PM
i’d say actor does not address the underlying issue of OP. It seems to be more how to switch from non-coroutine world to coroutine world.
In init you can just launch a coroutine instead of blocking your own thread
Copy code
init {
       someScope.launch {... }
}
Where some scope is a scope you define for later teardown. See
structured concurrency
`offer`does not sound like a good idea here, as it will lose jobs on back pressure Well, I guess
offer
is fine, as you have unlimited buffering
And what was the intention of
delay(10 * 1000)
?
y

Yan Pujante

05/10/2021, 8:10 PM
Thank you for the answers. The
delay
/
remove
part is because the job (once completed) is stored in a map so you can still access it and get details of a recent job that was run. After a while (10s is for testing purposes...) the job is deleted from the map to avoid filling up the memory
How do you create
someScope
?
u

uli

05/10/2021, 8:14 PM
CoroutineScope(job + dispatcher)
where job is
Job()
or
SupervisorJob()
depending on expected behavior. And dispatcher is most likely one of
<http://Dispatchers.IO|Dispatchers.IO>
and
Dispatchers.Default
Please google
kotlin strctured concurrency
. Do not go any further until you understand this topic.
👍 1
y

Yan Pujante

05/10/2021, 8:20 PM
Awesome thanks!
3 Views