Axel
04/26/2023, 2:05 PM
When something inside `launch {}` throws an unexpected exception, the `populate()` method is never invoked again, even though it's `@Scheduled` to recur. Why is that?
@Component
class PrePopulatedStoreCache(
    private val contentManagerClient: ContentManagerClient,
) : CoroutineScope, DisposableBean {
    private val cache: PrePopulatedCache<StoreByMerchantIdCacheKey, MinimalStore> =
        CaffeineBackedPrePopulatedCache.create(CacheConfiguration("all-stores"))
    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = job + Dispatchers.Default

    override fun destroy() = job.cancel()

    @Scheduled(fixedRate = 20, timeUnit = TimeUnit.SECONDS)
    private fun populate() {
        launch {
            log.info("Pre populating Store cache")
            contentManagerClient.getAllStoresV3()
                .onLeft { log.error("Unable to prepopulate Store Cache: $it") }
                .onRight { stores ->
                    cache.invalidateAll()
                    stores.forEach { cache.put(StoreByMerchantIdCacheKey(it.market, it.merchantExternalId), it) }
                    log.info("Populated cache with ${stores.size} stores in thread ${Thread.currentThread().name}")
                }
        }
    }

    suspend fun get(market: Market, merchantId: MerchantExternalId) =
        cache.get(StoreByMerchantIdCacheKey(market, merchantId))
}

data class StoreByMerchantIdCacheKey(val market: Market, val merchantExternalId: MerchantExternalId)
thanksforallthefish
04/27/2023, 6:35 AM
@Bean
fun errorHandlerTaskSchedulerCustomizer() = TaskSchedulerCustomizer {
    it.setErrorHandler { ex -> logger.warn("Error while executing task. ", ex) }
}
Axel
04/27/2023, 6:40 AM
`TaskSchedulerCustomizer`?
thanksforallthefish
04/27/2023, 6:44 AM
@Configuration
@EnableScheduling
class SchedulingConfig {
    @Bean
    fun errorHandlerTaskSchedulerCustomizer() = TaskSchedulerCustomizer {
        it.setErrorHandler { ex -> logger.warn("Error while executing task. ", ex) }
    }
}
Axel
04/27/2023, 6:45 AM
thanksforallthefish
04/27/2023, 6:45 AM
Szymon Jeziorski
04/27/2023, 7:46 AM
When you use a plain `Job` as the parent, its children follow the standard structured concurrency rules. This means that when a direct child of the scope throws an unhandled exception, that child is cancelled and the cancellation is also propagated to the parent.
In your scenario, the first unhandled exception within the coroutine created by `launch` in `populate` would cancel the parent's job and therefore the whole scope.
I'm pretty sure the `populate` method is actually still being invoked by Spring every 20 seconds after the first failure, but `launch` returns immediately with an already-cancelled job (a `JobCancellationException` under the hood) since its parent was cancelled, so `launch`'s body is not executed at all.
To fix this, you can use `SupervisorJob` as the parent job instead of a normal `Job`, as failures of children would not cancel the supervisor job itself.
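To make the difference concrete, here is a minimal sketch, assuming `kotlinx-coroutines-core` is on the classpath. `secondLaunchRuns` is a hypothetical helper written only for this illustration, not something from the code above. It fails one coroutine in a scope and then checks whether a second `launch` in the same scope still runs its body.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: launches a failing coroutine in a scope built on `parent`,
// then reports whether a second coroutine launched in the same scope runs its body.
fun secondLaunchRuns(parent: Job): Boolean = runBlocking {
    // The handler only keeps the uncaught exception from being dumped to stderr.
    val handler = CoroutineExceptionHandler { _, e -> println("child failed: ${e.message}") }
    val scope = CoroutineScope(parent + Dispatchers.Default + handler)

    // First child fails with an unhandled exception.
    scope.launch { throw IllegalStateException("boom") }.join()

    // Second child: its body only runs if the scope is still active.
    val ran = AtomicBoolean(false)
    scope.launch { ran.set(true) }.join()

    scope.cancel() // clean up; a no-op if the scope is already cancelled
    ran.get()
}

fun main() {
    println("Job: second launch runs = ${secondLaunchRuns(Job())}")
    println("SupervisorJob: second launch runs = ${secondLaunchRuns(SupervisorJob())}")
}
```

With a plain `Job` the helper should report that the second `launch` did not run, which mirrors what happens to `populate` after the first failure. With `SupervisorJob` the second `launch` should run normally.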
Also, as a side note, it may be just me, but seeing the bean class implement `CoroutineScope` is kind of weird: you have to inspect the class itself to see how `launch` within `populate` works without any scope as an explicit receiver.
In my opinion, declaring `scope` as a field, either in this class or somewhere else, and then calling `scope.launch { }` instead of an implicit `launch` would improve readability and make more sense overall. `PrePopulatedStoreCache` is not logically a coroutine scope itself, it just uses one under the hood. If the class is injected anywhere, invocations such as `prePopulatedStoreCache.async { }` become technically possible, which looks confusing and unintuitive and can also be considered leaking internals.
Axel
04/27/2023, 8:18 AM
@Component
class PrePopulatedStoreCache(
    private val contentManagerClient: ContentManagerClient,
) : DisposableBean {
    private val cache: PrePopulatedCache<StoreByMerchantIdCacheKey, MinimalStore> =
        CaffeineBackedPrePopulatedCache.create(CacheConfiguration("all-stores"))
    private val job = SupervisorJob()
    private val scope = CoroutineScope(job + Dispatchers.Default)

    override fun destroy() = job.cancel()

    @Scheduled(fixedRate = 12, timeUnit = TimeUnit.SECONDS)
    private fun populate() {
        scope.launch {
            log.info("Pre populating Store cache")
            contentManagerClient.getAllStoresV3()
                .onLeft { log.error("Unable to prepopulate Store Cache: $it") }
                .onRight { stores ->
                    cache.invalidateAll()
                    stores.forEach { cache.put(StoreByMerchantIdCacheKey(it.market, it.merchantExternalId), it) }
                    log.info("Populated cache with ${stores.size} stores")
                }
        }
    }

    suspend fun get(market: Market, merchantId: MerchantExternalId) =
        cache.get(StoreByMerchantIdCacheKey(market, merchantId))
}

data class StoreByMerchantIdCacheKey(val market: Market, val merchantExternalId: MerchantExternalId)
(The scheduled 12 seconds are just for testing this method)