Yan Pujante
05/10/2021, 1:19 PMclass JobsMgr {
private val _jobsToProcess = Channel<Job>(Channel.UNLIMITED)
private val _jobsProcessed: Flow<Job> = _jobsToProcess.consumeAsFlow().map { job -> /* processing */ }.flowOn(Dispatchers.Default)
init {
// I find this very ugly.. is there a better way?
Executors.newSingleThreadExecutor().execute {
runBlocking {
_jobsProcessed.collect {
delay(10 * 1000)
_lock.withLock { _jobs.remove(it.jobId) }
}
}
}
}
fun enqueueJob(...) {
//...
runBlocking {
_jobsToProcess.send(job)
}
}
}
It is running in a spring boot application so I do not have control over the main loop. This is why I created a process to collect and remove the completed jobs. Is this how Channel/Flow is supposed to be used? And is there a better way to "start" the collection. That seems like ugly code to me but not sure how to do better.marstran
05/10/2021, 4:37 PMactor
coroutine builder. An actor is guaranteed to process its messages sequentially._jobs.remove(it.jobId)
stuff? Where is _jobs
defined?actor
could look something like this:
class JobsMgr(val scope: CoroutineScope) {
private val jobActor = scope.actor(Dispatchers.Default, Channel.UNLIMITED) {
for (job in channel) {
// Processing
}
}
fun enqueueJob(job: Job) {
jobActor.offer(job)
}
}
uli
05/10/2021, 7:52 PMinit {
someScope.launch {... }
}
Where some scope is a scope you define for later teardown. See structured concurrency
offer
is fine, as you have unlimited bufferingdelay(10 * 1000)
?Yan Pujante
05/10/2021, 8:10 PMdelay
/ remove
part is because the job (once completed) is stored in a map so you can still access it and get details of a recent job that was run. After a while (10s is for testing purposes...) the job is deleted from the map to avoid filling up the memorysomeScope
?uli
05/10/2021, 8:14 PMCoroutineScope(job + dispatcher)
where job is Job()
or SupervisorJob()
depending on expected behavior. And dispatcher is most likely one of <http://Dispatchers.IO|Dispatchers.IO>
and Dispatchers.Default
kotlin strctured concurrency
. Do not go any further until you understand this topic.Yan Pujante
05/10/2021, 8:20 PM