# arrow
g
Hi, can I achieve this with arrow-fx? This article got me curious: https://arrow-kt.io/docs/apidocs/arrow-fx-coroutines/arrow.fx.coroutines/-schedule/ Is there an example that uses persistence to suspend and resume a thread/job?
s
Hey @Gopal S Akshintala,
Is there an example that uses persistence to suspend and resume a thread/job?
Do you mean like a persisted cronjob using Scheduler for Kotlin? Something like that doesn't exist, but it should be possible to build something like this.
g
You got it exactly right @simon.vergauwen. We have a high volume of data (300k) to be batch-processed to generate Invoices (it’s a Billing application) in multiple steps. We use Spring Batch, but it feels like a force fit for our use-case, so I'm hunting for better solutions, and this article in arrow-kt caught my eye. We schedule it with a cron, which uses mq to fire this job instance. If Continuation can be serialized and stored in db to start, restart, retry and resume the job, this solves my use-case.
s
If Continuation can be serialized and stored in db to start, restart, retry and resume the job, this solves my use-case.
This should be doable, but given this issue I doubt it's possible atm 😞 https://github.com/Kotlin/kotlinx.coroutines/issues/76
s
Hey. This is an interesting thread. At the same time I feel a bit lost, either because I don't understand the content here or because of my lack of knowledge. Can you please elaborate on what you mean by serializing the continuation so that it can be stored in db to start, restart etc. the job?
@Gopal S Akshintala @simon.vergauwen
s
Hey @Satyam Agarwal, Yes. For example, let's say you have some service that keeps a JVM instance alive (K8S or OpenShift), and inside it you're running the code below. Now let's say you've been waiting for 6 days and the instance dies and restarts: it will attempt to wait for another 7 days. If that happens every 6th day, the actual crawling might never happen. So instead you want to "serialise" the current state, and restart it when the JVM starts again.
fun main(): Unit = runBlocking(Dispatchers.Default) {
  Schedule.spaced(7.days).repeat {
    crawlWebsitesAndStore()
  }
}
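A minimal sketch of one way around the "waits another 7 days after every restart" problem, without serialising any coroutine state: persist the time of the last run and, on startup, wait only for whatever is left of the period. `LastRunStore`, `InMemoryLastRunStore` and `remainingDelay` are hypothetical names made up for this illustration; a real store would presumably be a DB row or a file.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical persistence boundary; a real one would be backed by a DB or a file.
interface LastRunStore {
    fun load(): Instant?
    fun save(at: Instant)
}

// In-memory stand-in so the sketch runs on its own.
class InMemoryLastRunStore(private var lastRun: Instant? = null) : LastRunStore {
    override fun load(): Instant? = lastRun
    override fun save(at: Instant) { lastRun = at }
}

// How long to wait before the next run, given the persisted last-run time.
// Returns zero if the job never ran or the period has already elapsed.
fun remainingDelay(store: LastRunStore, period: Duration, now: Instant): Duration {
    val lastRun = store.load() ?: return Duration.ZERO
    val remaining = period.minus(Duration.between(lastRun, now))
    return if (remaining.isNegative) Duration.ZERO else remaining
}

fun main() {
    val period = Duration.ofDays(7)
    val lastRun = Instant.parse("2021-01-01T00:00:00Z")
    val store = InMemoryLastRunStore(lastRun)

    // The instance restarts 6 days after the last crawl: only 1 day is left to wait.
    val restartedAt = lastRun.plus(Duration.ofDays(6))
    println(remainingDelay(store, period, restartedAt)) // PT24H
}
```

On startup the job would sleep for `remainingDelay(...)`, run, call `save(now)`, and only then fall back to the regular 7-day spacing.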
s
So instead you want to “serialise” the current state, and restart it when the JVM starts again.
By serialising the current state, do we actually mean writing/persisting the current state of the job somewhere when the instance dies, so that it can be picked up the next time a new/old instance starts?
s
Yes, exactly. And the "state" refers to the inner state of the `Continuation` / Coroutine implementations that are found in the Kotlin Std and KotlinX Coroutines. I.e. if it can correctly serialise the state of `suspend fun delay(x: Long)`, it should even be able to keep track of how long it slept, and continue from there.
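To make "the inner state" a bit more concrete, here is a hand-written stand-in for what a serialised continuation would have to capture: which step comes next, and how much of a delay has already elapsed. This is only an illustration under assumptions; `JobState`, `toBytes` and `fromBytes` are hypothetical names, and real `Continuation` objects cannot be persisted like this today (see the issue linked above).

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical explicit job state: the next step to run and how long we have already slept.
data class JobState(val nextStep: Int, val sleptMillis: Long) : Serializable

// Serialise the state to bytes, e.g. to store it in a DB column.
fun toBytes(state: JobState): ByteArray =
    ByteArrayOutputStream().use { bytes ->
        ObjectOutputStream(bytes).use { it.writeObject(state) }
        bytes.toByteArray()
    }

// Restore the state after a restart.
fun fromBytes(data: ByteArray): JobState =
    ObjectInputStream(ByteArrayInputStream(data)).use { it.readObject() as JobState }

fun main() {
    val dayMillis = 24L * 60 * 60 * 1000
    val before = JobState(nextStep = 2, sleptMillis = 6 * dayMillis)

    // Simulate "instance dies, new instance starts" with a round trip through bytes.
    val restored = fromBytes(toBytes(before))

    println(restored == before)                   // true
    println(7 * dayMillis - restored.sleptMillis) // 86400000
}
```

A serialisable continuation would do this bookkeeping automatically; here the step index and elapsed time are tracked by hand.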
s
Hmm, I understand. But isn’t this a bit too much work that doesn’t provide much certainty? I usually run my scheduled jobs in a kind of stateless fashion. I persist actions and last updated until in the db, so that the scheduled job can get killed anytime, and run 100 times because the application is starting up 100 times in a row, but the job should ensure it ignores, retries, or processes based on what is there in the db.
In most cases, keeping track of how long a job has run doesn’t make sense in my experience. I just fire a job, it runs until its operations are finished, and then the job kills itself.
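A rough sketch of that stateless style, assuming each unit of work (say, an invoice) has a status row in the db: every run looks only at what is not yet done, so the job can be killed and restarted any number of times. `Status`, `JobStateStore` and `runJob` are hypothetical names, and the in-memory map stands in for a real table.

```kotlin
enum class Status { PENDING, DONE, FAILED }

// Hypothetical in-memory stand-in for a DB table keyed by item id.
class JobStateStore(ids: List<String>) {
    private val state = ids.associateWith { Status.PENDING }.toMutableMap()

    fun statusOf(id: String): Status = state.getValue(id)
    fun mark(id: String, status: Status) { state[id] = status }

    // Everything that still needs work: never attempted, or failed last time.
    fun unfinished(): List<String> = state.filterValues { it != Status.DONE }.keys.toList()
}

// Processes only unfinished items and returns how many were attempted in this run.
fun runJob(store: JobStateStore, process: (String) -> Unit): Int {
    val todo = store.unfinished()
    for (id in todo) {
        try {
            process(id)
            store.mark(id, Status.DONE)
        } catch (e: Exception) {
            store.mark(id, Status.FAILED) // picked up again on the next run
        }
    }
    return todo.size
}

fun main() {
    val store = JobStateStore(listOf("inv-1", "inv-2", "inv-3"))

    val first = runJob(store) { id -> if (id == "inv-2") error("boom") } // inv-2 fails
    val second = runJob(store) { }  // retries only inv-2
    val third = runJob(store) { }   // nothing left to do

    println("$first $second $third") // 3 1 0
}
```

Because the decision to ignore, retry or process comes from the stored status, starting the job 100 times in a row costs nothing beyond the lookups.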
g
@Satyam Agarwal do you have an example you can share, please?
s
It has a locking mechanism too, so that I can have multiple replicas and still ensure that my job runs on just one.
You can also use the Kubernetes CronJob feature to avoid all this.
However, with this library, you can describe job names, initial delay and scheduling.
Then I just call this function on application startup:
suspend fun scheduleRequestCacheEviction() {
    val jobToExecute: Runnable = Runnable {
        shedLockExecutor.executeWithLock(
            Runnable { runBlocking { myService.evictFromRequestCache() } },
            LockConfiguration(Instant.now(), "RequestCacheEviction", durationLockMax, durationLockMin)
        )
    }

    scheduledExecutorService
        .scheduleAtFixedRate(jobToExecute, jobDelayBeforeFirstRunInSeconds(), Duration.ofHours(6).seconds, SECONDS)
}
val cacheEvictionJob: CacheEvictionJob = object : CacheEvictionJob {
    override val myService: MyService = myService
    override val coroutineContext: CoroutineContext = IO
    override val scheduledExecutorService: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
    override val shedLockExecutor: LockingTaskExecutor = DefaultLockingTaskExecutor(lockProvider())
}
and I call this on app startup:
cacheEvictionJob.scheduleRequestCacheEviction()
`CacheEvictionJob` extends `CoroutineScope`.
Since it's running on a dedicated thread, I don’t need to worry about its startup or completion. This thread will never be killed, and ShedLock will ensure the jobs wake up on this thread.
And this library does support a vanilla implementation too! As I hate Spring 😄
Also, ignore my vanilla DI approach 😛 It's way too old, and everyone hates their old code.
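For anyone wondering what the locking buys you with multiple replicas, here is a simplified sketch of the idea only. It is not the library's API: `LockTable` and `runIfLocked` are hypothetical, and the in-memory map stands in for a lock table shared by all replicas (typically a DB table). The assumption is that a lock is held until its `lockAtMostFor` expiry and never released early.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical stand-in for a shared lock table: lock name -> locked until.
class LockTable {
    private val lockedUntil = HashMap<String, Instant>()

    // Acquires the named lock if it is free or has expired; otherwise returns false.
    @Synchronized
    fun tryLock(name: String, now: Instant, lockAtMostFor: Duration): Boolean {
        val until = lockedUntil[name]
        if (until != null && now.isBefore(until)) return false
        lockedUntil[name] = now.plus(lockAtMostFor)
        return true
    }
}

// Runs the task only if this caller won the lock; returns whether it ran.
fun runIfLocked(
    table: LockTable,
    name: String,
    now: Instant,
    lockAtMostFor: Duration,
    task: () -> Unit
): Boolean {
    if (!table.tryLock(name, now, lockAtMostFor)) return false
    task()
    return true
}

fun main() {
    val table = LockTable()
    val now = Instant.parse("2021-01-01T00:00:00Z")
    val atMost = Duration.ofMinutes(10)

    // Two replicas fire the same scheduled job at the same moment.
    val replicaA = runIfLocked(table, "RequestCacheEviction", now, atMost) { println("evicting on A") }
    val replicaB = runIfLocked(table, "RequestCacheEviction", now, atMost) { println("evicting on B") }

    println("$replicaA $replicaB") // true false
}
```

Every replica can schedule the job on its own timer; whichever one grabs the lock first does the work and the others skip that round.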
g
Thanks a lot, @Satyam Agarwal. Do you have any public repo I can clone and run locally to see this in action?
s
My public contributions are next to nothing. It's really hard to make time to write code after my job hours. Sorry 😅
I can try to set something up over the weekend, but I can’t promise you.
s
Hey @Satyam Agarwal, That is certainly a super valid solution! What we were discussing solves the same issue in a slightly different way. There are many solutions for this. I am personally also using K8S cronjob.
s
Totally! What intrigued me was serializing the continuation. So I thought Continuation implements Serializable from Java, and then the whole object gets persisted somewhere to use it later. I wasn't able to comprehend it. It got me so excited 😅
g
It's really hard to make time to write code after my job hours
Thanks @Satyam Agarwal I can totally relate 😄 and so I feel so grateful to OSS authors
I can try to set something up over the weekend
A simple prototype can help me get started quickly!
s
So I thought Continuation implements Serializable from Java, and then the whole object gets persisted somewhere to use it later.
I wasn't able to comprehend it. It got me so excited 😅
That is actually exactly what it does, and the goal of the linked ticket. It allows for building some extremely powerful things. I remember watching a talk about this on the very first KotlinConf but I cannot seem to recall which talk it was and cannot spot it on YouTube.
s
Damn! That would be interesting to read/see video of