
Axel

04/26/2023, 2:05 PM
I have this class in Spring. If the block within `launch {}` throws an unexpected exception, the `populate()` method is never invoked again, even though it's `@Scheduled` to recur. Why is that?
@Component
class PrePopulatedStoreCache(
    private val contentManagerClient: ContentManagerClient,
): CoroutineScope, DisposableBean {
    private val cache:
            PrePopulatedCache<StoreByMerchantIdCacheKey, MinimalStore> =
        CaffeineBackedPrePopulatedCache.create(CacheConfiguration("all-stores"))

    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = job + Dispatchers.Default

    override fun destroy() = job.cancel()

    @Scheduled(fixedRate = 20, timeUnit = TimeUnit.SECONDS)
    private fun populate() {
        launch {
            log.info("Pre populating Store cache")
            contentManagerClient.getAllStoresV3()
                .onLeft { log.error("Unable to prepopulate Store Cache: $it") }
                .onRight { stores ->
                    cache.invalidateAll()
                    stores.forEach { cache.put(StoreByMerchantIdCacheKey(it.market, it.merchantExternalId), it) }
                    log.info("Populated cache with ${stores.size} stores in thread ${Thread.currentThread().name}")
                }
        }
    }

    suspend fun get(market: Market, merchantId: MerchantExternalId) =
        cache.get(StoreByMerchantIdCacheKey(market, merchantId))
}

data class StoreByMerchantIdCacheKey(val market: Market, val merchantExternalId: MerchantExternalId)

thanksforallthefish

04/27/2023, 6:35 AM
It’s a weird behavior in Spring. I cannot find any docs right now, but tl;dr: if a scheduled job fails and you don’t provide an error handler, Spring will suppress further execution. To solve this, we added another bean:
@Bean
  fun errorHandlerTaskSchedulerCustomizer() = TaskSchedulerCustomizer {
    it.setErrorHandler { ex -> logger.warn("Error while executing task. ", ex) }
  }

Axel

04/27/2023, 6:40 AM
How do you use this `TaskSchedulerCustomizer`?

thanksforallthefish

04/27/2023, 6:44 AM
It should be automatic if you use Spring Boot, as the customizer bean is picked up by the auto-configured task scheduler:
@Configuration
@EnableScheduling
class SchedulingConfig {
  @Bean
  fun errorHandlerTaskSchedulerCustomizer() = TaskSchedulerCustomizer {
    it.setErrorHandler { ex -> logger.warn("Error while executing task. ", ex) }
  }
}

Axel

04/27/2023, 6:45 AM
Aha, I understand. Alright, thank you!

thanksforallthefish

04/27/2023, 6:45 AM
wait to see if it works 😄 but welcome anyway

Szymon Jeziorski

04/27/2023, 7:46 AM
Your issue is actually more coroutine-related than Spring-related. Since you're using a plain `Job` as the parent job, its children follow the standard structured concurrency rules: when a direct child throws an unhandled exception, that child is cancelled and the cancellation also propagates to the parent. In your scenario, the first unhandled exception within the coroutine created by `launch` in `populate` cancels the parent job and therefore the whole scope. I'm pretty sure `populate` is actually still being invoked by Spring every 20 seconds after the first failure, but the coroutine created by `launch` is cancelled immediately with a `JobCancellationException` since its parent was cancelled, so `launch`'s body is not executed at all.

To fix this, you can use `SupervisorJob` as the parent job instead of a normal `Job`, as failures of children do not cancel the supervisor job itself.

Also, as a side note, it may be just me, but seeing the bean class implement `CoroutineScope` is kind of weird: you have to inspect the class itself to see how `launch` within `populate` works without any `scope` as an explicit receiver. In my opinion, declaring `scope` as a field, either in this class or somewhere else, and then using it as `scope.launch { }` instead of the implicit `launch` would improve readability and overall make more sense. `PrePopulatedStoreCache` is not logically a coroutine scope itself, it just uses one under the hood. If the class is injected anywhere, invocations such as `prePopulatedStoreCache.async { }` become technically possible, which firstly looks confusing and unintuitive, and secondly can be considered leaking internals.
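For anyone who wants to see the `Job` vs `SupervisorJob` difference in isolation, here is a minimal sketch, assuming `kotlinx-coroutines-core` is on the classpath. The helper `secondLaunchRuns` and the `CoroutineExceptionHandler` are made up for the illustration and are not part of the class discussed in this thread. The handler is only there so the failure is printed instead of going to the thread's uncaught exception handler.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: launches one failing coroutine, then a second one,
// and reports whether the body of the second one actually ran.
fun secondLaunchRuns(parent: Job): Boolean = runBlocking {
    // Prints the failure; it does not stop a plain Job from being cancelled.
    val handler = CoroutineExceptionHandler { _, e -> println("child failed: ${e.message}") }
    val scope = CoroutineScope(parent + Dispatchers.Default + handler)
    var ran = false

    // First child fails with an unhandled exception.
    scope.launch { throw IllegalStateException("boom") }.join()

    // Second child: if the parent was cancelled by the first failure,
    // this coroutine is cancelled before its body starts.
    scope.launch { ran = true }.join()
    ran
}

fun main() {
    println("Job:           second launch ran = ${secondLaunchRuns(Job())}")
    println("SupervisorJob: second launch ran = ${secondLaunchRuns(SupervisorJob())}")
}
```

With a plain `Job` the second body is skipped, which matches the symptom of `populate()` appearing to stop after the first failure. With `SupervisorJob` it runs.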

Axel

04/27/2023, 8:18 AM
Yes! That works much better
@Component
class PrePopulatedStoreCache(
    private val contentManagerClient: ContentManagerClient,
): DisposableBean {
    private val cache:
            PrePopulatedCache<StoreByMerchantIdCacheKey, MinimalStore> =
        CaffeineBackedPrePopulatedCache.create(CacheConfiguration("all-stores"))

    private val job = SupervisorJob()
    private val scope = CoroutineScope(job + Dispatchers.Default)
    override fun destroy() = job.cancel()

    @Scheduled(fixedRate = 12, timeUnit = TimeUnit.SECONDS)
    private fun populate() {
        scope.launch {
            log.info("Pre populating Store cache")
            contentManagerClient.getAllStoresV3()
                .onLeft { log.error("Unable to prepopulate Store Cache: $it") }
                .onRight { stores ->
                    cache.invalidateAll()
                    stores.forEach { cache.put(StoreByMerchantIdCacheKey(it.market, it.merchantExternalId), it) }
                    log.info("Populated cache with ${stores.size} stores")
                }
        }
    }

    suspend fun get(market: Market, merchantId: MerchantExternalId) =
        cache.get(StoreByMerchantIdCacheKey(market, merchantId))
}

data class StoreByMerchantIdCacheKey(val market: Market, val merchantExternalId: MerchantExternalId)
(The scheduled 12 seconds are just for testing this method)
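One thing the `SupervisorJob` version leaves open is where an unexpected exception from the `launch` body ends up. Without a handler in the scope's context it goes to the thread's uncaught exception handler. A possible way to log it instead is a `CoroutineExceptionHandler` on the scope. The sketch below assumes `kotlinx-coroutines-core` is available; `Refresher`, `load`, and the `errors` list are hypothetical stand-ins for the real cache, client call, and logger.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the cache component; `load` plays the role of the client call.
class Refresher(private val load: suspend () -> Int) {
    // Collects messages instead of calling a real logger (assumption for this sketch).
    val errors = CopyOnWriteArrayList<String>()

    // Receives exceptions that escape the body of a launched coroutine.
    private val handler = CoroutineExceptionHandler { _, e ->
        errors += "populate failed: ${e.message}"
    }
    private val job = SupervisorJob()
    private val scope = CoroutineScope(job + Dispatchers.Default + handler)

    // Returns the Job so callers (or tests) can wait for completion.
    fun populate(): Job = scope.launch { load() }

    fun destroy() = job.cancel()
}

fun main() = runBlocking {
    var calls = 0
    // Fails on the first call only, then succeeds.
    val refresher = Refresher {
        if (++calls == 1) throw IllegalStateException("backend down") else 42
    }
    refresher.populate().join() // failure is recorded by the handler
    refresher.populate().join() // still runs, because the SupervisorJob is not cancelled
    println(refresher.errors)
    refresher.destroy()
}
```

Wrapping the body of `launch` in `try`/`catch` would work as well. The handler just keeps the logging in one place for every coroutine started in that scope.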