David Kubecka
02/05/2023, 8:23 PM@Service
class AsyncExecutionManager(val taskExecutor: AsyncTaskExecutor) {
private val jobs = ConcurrentHashMap<String, JobStatus>()
fun <T> startJob(jobId: String, job: () -> T) {
val previousStatus = jobs.put(jobId, JobStatus.RUNNING)
if (previousStatus != JobStatus.RUNNING) {
taskExecutor.submit {
job()
}
}
}
Namely, I want to check that the same job cannot be run twice at the same time. To do that, I mock AsyncTaskExecutor#submit
(from Spring) by starting a thread that just runs the given job
lambda, catches exceptions, and stores them to a list which is then checked in the test. The mock looks as follows:
private val asyncManager = AsyncExecutionManager(
mockk {
val slot = slot<Runnable>()
every { submit(capture(slot)) } answers {
thread {
try {
slot.captured.run()
} catch (e: Exception) {
errors.add(e)
}
}
CompletableFuture.completedFuture(Unit)
}
},
)
And the test itself:
@Test
fun `starting the same task concurrently`() {
// asyncManager.startJob(job1) { error("should not happen") }
asyncManager.startJob(job1) { Thread.sleep(100) }
asyncManager.startJob(job1) { error("should not happen") }
expectThat(asyncManager.getJobStatus(job1)).isEqualTo(JobStatus.RUNNING)
Thread.sleep(100)
expectThat(errors).isEmpty()
}
The test passes on my machine but I'm not happy about two of its aspects:
• The last sleep which must be there to "wait" for the error-throwing threads to catch and handle the exceptions. Without the sleep, if I uncomment the first thread, the test passes even if it shouldn't.
• Using threads instead of coroutines. I wonder if there's a way how to rewrite my test to start a coroutine instead of a thread in the mocked AsyncTaskExecutor#submit
. The problem is that the method doesn't provide a coroutine scope and I don't know how to provide one. I tried making job
suspending and using runBlocking
inside the real submit
lambda but then the jobs in my test don't run concurrently anymore.
So basically there are two questions:
• Can coroutines be used in my setup of mocking AsyncTaskExecutor#submit
?
• Is there a more robust way to test my code's correctness, other than collecting errors and waiting "sufficient" time? This question is probably not Kotlin-specific but I hope it might be related to the first question, or even there might be special Kotlin tooling for these kinds of situations.ephemient
02/06/2023, 4:20 AM@Test
fun test() = runTest {
val dispatcher = coroutineContext[CoroutineDispatcher]!!
val taskExecutor = ConcurrentTaskExecutor(dispatcher.asExecutor())
val asyncManager = AsyncExecutionManager(taskExecutor)
David Kubecka
02/06/2023, 10:08 AMAsyncTaskExecutor#submit
, right?ephemient
02/06/2023, 8:32 PMrunCurrent()
will advance all coroutines to the current time. actually the test dispatcher is single threaded, for consistent behavior across runs, so your jobs should be run anywayDavid Kubecka
02/07/2023, 3:22 PMephemient
02/07/2023, 5:33 PMDavid Kubecka
02/08/2023, 12:15 PMephemient
02/08/2023, 12:58 PMjava.util.concurrent
utilities such as CyclicBarrier
or Phaser
to observe and control what each runnable is doing, instead of relying on thread sleepsDavid Kubecka
02/08/2023, 1:07 PMerror
) and its handler in the mock to finish so that I can check whether there were any errors.