# coroutines
d
I am writing a suspend-based wrapper around Caffeine's AsyncLoadingCache, which natively understands cache-loading functions that return CompletableFutures. My class has a `get()` method which launches a coroutine using `CoroutineScope.future {}`. That coroutine should not be scoped to the `get()` method's lifetime, because if `get()` is called concurrently with the same key, both `get()` calls will end up waiting on the same CompletableFuture, so cancelling the first coroutine shouldn't cause the second coroutine to fail. So I'm currently passing a `CoroutineScope` into the constructor of the cache and using that to launch the `future`s. However, with my current implementation, if any of the calls inside `future {}` fails, the long-lived CoroutineScope associated with the cache is cancelled, which I don't want. What I want is basically a supervisor scope nested inside the passed-in CoroutineScope, created at cache construction time. But `supervisorScope` doesn't seem to be what I want — I can't call that at cache construction time because I don't want to actually wait before returning, the way a `supervisorScope` does! Is there some other API I should be using here? I feel like maybe the answer is `SupervisorJob()`, but I am still kinda confused about the distinction between Job and CoroutineScope — I can make a `SupervisorJob(coroutineScope.coroutineContext[Job])`, but I'm confused about how to extract a CoroutineScope from it that I can use for launching coroutines.
I guess I can do something like

```kotlin
SupervisorJob(cs.coroutineContext[Job]).let { job ->
  object : CoroutineScope {
    override val coroutineContext = cs.coroutineContext + job
  }
}
```

but surely there's a simpler way? Also, this doesn't seem to work.
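For what it's worth, a shorter way to build that nested scope, assuming the snippet above captures the intent, is the `CoroutineScope()` factory function from kotlinx.coroutines:

```kotlin
// Equivalent to the object expression above: a scope whose Job is a SupervisorJob
// parented to the passed-in scope's Job.
val nestedScope = CoroutineScope(cs.coroutineContext + SupervisorJob(cs.coroutineContext[Job]))
```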
s
Your cache (AsyncLoadingCache) seems to need its own lifecycle, completing its work, even if the caller is cancelled, correct?
That would allow it to report a returned value to any other suspended callers….
d
Yes, which is why I'm passing a CoroutineScope to its constructor, so that when I'm entirely done with the cache I can cancel everything.
But I don't want that scope to get cancelled just because one of the spawned coroutines throws (which is normal operation; the exception will get captured and processed inside the future)
s
That seems like a good way: create your cache and tie it to its own scope using a SupervisorJob. Let the callers call the cache in their own scope
d
Right, that's what I'm doing, but I'm having trouble getting SupervisorJob to work. I had the cache working correctly, except that it broke if one of the loading functions threw. I switched to something like the above and now it seems to hang.
```kotlin
class InMemAsyncCache<K : Any, V>(
    coroutineScope: CoroutineScope,
    loader: suspend (K) -> V
) {
    class Value<V>(val value: V)

    private val job = SupervisorJob(coroutineScope.coroutineContext[Job])
    private val nestedScope = object : CoroutineScope {
        override val coroutineContext = coroutineScope.coroutineContext + job
    }

    private val under = Caffeine.newBuilder().let { builder ->
        builder.buildAsync<K, Value<V>> { k, _ -> nestedScope.future { Value(loader(k)) } }
    }

    suspend fun get(key: K): V = under.get(key).await().value
}
```
s
```kotlin
class MyAsyncLoadingCache {
    private val scope = CoroutineScope(SupervisorJob() + someDispatcher)
    ....
    suspend fun getData(key: Key): Data {
        val data: Data = scope.async { ..... }.await()
        return data
    }
    ...

    fun destroy() { scope.cancel() }
}
```
When the caller calls `getData(someKey)` in its own scope and that scope gets cancelled, the `getData` call will be cancelled, but the code inside `scope.async { .... }` won't be.
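A rough, self-contained illustration of that point (the delay stands in for a real load; the names are made up):

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val cacheScope = CoroutineScope(SupervisorJob())
    val work = cacheScope.async {
        delay(1_000)          // the "load"
        "loaded"
    }
    val caller = launch {
        println(work.await()) // never printed: this caller is cancelled below
    }
    delay(100)
    caller.cancel()           // cancels only the caller...
    println(work.await())     // ...the shared load still completes and prints "loaded"
    cacheScope.cancel()
}
```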
d
I don't need to pass in a parent to SupervisorJob()?
s
No…
You can, if you want to tie it up to some over-arching lifecycle….
d
You're better off with an explicit `close` method. Like ktor's `HttpClient`.
s
@Dominaezzz Is the `fun destroy` not enough? It can be expanded not only to cancel the scope, but to do some additional cleanup as well.
d
I mean I do want any outstanding coroutines to be cancelled if the overall CoroutineScope is cancelled. But you're saying that the mechanism I should use for that is "explicit close(), don't pass a CoroutineScope to the constructor, make a new CS without a parent" and not "pass in a CoroutineScope to the constructor and use its cancellation to cancel"?
s
What is the ‘overall’ CoroutineScope?
d
Oops, didn't see that. Yeah, it's enough.
d
I guess I've been operating under the assumption that structured concurrency means "any piece of code that spawns coroutines that outlive a single suspend function needs to take a CoroutineScope as an argument", when I guess more generally it can have lifecycle methods
and that creating an "unrooted" CS like that feels like using GlobalScope... but it's OK if I'm managing lifecycle myself?
s
If you cancel the `MyAsyncLoadingCache.scope` by calling `MyAsyncLoadingCache.destroy()`, any caller that is awaiting the result of `scope.async` will get a `CancellationException`, which will cause the cancellation of the caller's Job as well (how that is handled exactly depends on the caller's CoroutineScope).
You can't cancel `GlobalScope` (it doesn't have a `Job`).
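A matching sketch of that direction, cancelling the cache's scope while a caller is suspended in `await()` (illustrative names, not code from the thread):

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val cacheScope = CoroutineScope(SupervisorJob())
    val pending = cacheScope.async { awaitCancellation() } // stands in for a load that never finishes
    val caller = launch {
        try {
            pending.await()
        } catch (e: CancellationException) {
            println("caller sees: $e") // the caller's own Job is cancelled along with it
            throw e
        }
    }
    delay(100)
    cacheScope.cancel() // like MyAsyncLoadingCache.destroy()
    caller.join()
}
```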
d
CoroutineScope should only be passed in to coroutine builders, functions that simply spawn coroutines for the caller. This is not the case here.
d
But this is an object that calls a coroutine builder in one of its methods? Is that different enough?
d
It calls a builder internally. Not for the caller.
s
You switch scopes here, from the caller's scope to the scope of the cache itself. That is done by `scope.async { .... }.await()`.
d
OK, I do get why the thing I was trying didn't work — if the SupervisorJob ends up in a job hierarchy, then something has to cause it to complete in order for the CoroutineScope it's nested inside to complete, which doesn't happen, which is why my test hangs.
OK. So it sounds like what you're saying is "provide CoroutineScope to code for them to launch coroutines, not as an awkward form of lifecycle management"
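A stripped-down illustration of that hang, assuming the test's outer scope was something like `runBlocking`:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = SupervisorJob(coroutineContext[Job]) // becomes a child of runBlocking's Job
    CoroutineScope(coroutineContext + job).launch { }.join()
    // A Job never completes on its own, so runBlocking would wait for it forever
    // unless the job is explicitly completed (or cancelled):
    job.complete()
    println("done")
}
```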
s
Yes. Your cache has its own lifecycle, that may be different from the callers’.
d
OK. The rule I had written for our internal "how to think about coroutines" docs was:
No function should create coroutines that outlive the function call’s lifetime, unless explicitly provided with a CoroutineScope defining the created coroutine’s lifetime.
But it sounds like it should just be:
No function should create coroutines that outlive the function call's lifetime, unless explicitly provided with a CoroutineScope defining the created coroutine's lifetime, or the function is a method on an object with a `close()`-style method that will cancel all spawned coroutines.
s
```kotlin
class InMemAsyncCache<K : Any, V>(
    loader: suspend (K) -> V,
    dispatcher: CoroutineContext,
) {
    class Value<V>(val value: V)

    private val scope = CoroutineScope(SupervisorJob() + dispatcher)
    private val under = Caffeine.newBuilder().let { builder ->
        builder.buildAsync<K, Value<V>> { ... }
    }

    suspend fun get(key: K): V = scope.async { under.get(key) }.await().value

    fun destroy() { scope.cancel() }
}
```
(I’m not sure what the ‘buildAsync’ with the `.future` call does….)
d
that's just part of the Caffeine API
s
The lambda of the `buildAsync` function is a regular blocking (non-suspending) lambda, correct?
And `under.get(key)` is a regular blocking (non-suspending) function as well?
d
yes, which returns CompletableFuture, and yes
(the `Value` thing is just about dealing w/ the fact that Caffeine doesn't store null values and we want to)
s
Then it seems that your `loader` parameter does not need to be `suspend`, and `nestedScope.future` is not necessary either….. It can all be blocking
d
is it reasonable for the declaration to be `dispatcher: CoroutineContext = EmptyCoroutineContext`, which I believe means that Dispatchers.Default will get used?
s
I like to provide the dispatcher, for my unit-tests 🙂
d
no, the point is to allow the user of this feature to write a suspend function for its loader, which gets converted into the Future-based API that Caffeine understands
(this is a cache where on cache miss, a loader gets automatically called.)
re dispatcher, wasn't proposing dropping the argument, just wondering if having a default argument is reasonable
s
Ah… I see, then yes, keep the `scope.future { .... }`
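Putting the thread's conclusions together, a version along these lines might look roughly like the sketch below. It is only a sketch: the constructor shape, the default `EmptyCoroutineContext`, and the `close()` name are assumptions, not a tested implementation.

```kotlin
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.*
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

class InMemAsyncCache<K : Any, V>(
    dispatcher: CoroutineContext = EmptyCoroutineContext, // Dispatchers.Default ends up being used when none is given
    loader: suspend (K) -> V,
) {
    class Value<V>(val value: V) // lets us cache null results, which Caffeine won't store directly

    // The cache owns its own supervisor scope: a failing load fails only that load.
    private val scope = CoroutineScope(SupervisorJob() + dispatcher)

    private val under = Caffeine.newBuilder()
        .buildAsync<K, Value<V>> { key, _ ->
            // Bridge the suspend loader into the CompletableFuture that Caffeine expects.
            scope.future { Value(loader(key)) }
        }

    suspend fun get(key: K): V = under.get(key).await().value

    // Explicit lifecycle instead of a passed-in parent scope.
    fun close() { scope.cancel() }
}
```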
d
Thanks! This is really helpful. It's good to learn that I oversimplified the lessons of structured concurrency into "CoroutineScopes need to be passed around obsessively and everything needs to end up on a single CoroutineScope tree"
s
That is the default most common use-case, a single CoroutineScope. But scope-switching may be necessary, like in your example/use-case
Funny timing, I’ve been in the process of writing a blog-post about this: CoroutineScopes for regular suspend funs, Channels, Flows and switching between scopes…. 🙂
d
the one nice thing about passing CoroutineScope into my caches was that, for an object with a bunch of cache fields on it, I just had to cancel the scope and it cancelled all the caches. now I need to make that object's `fun close()` go through and do `cache1.close(); cache2.close(); cache3.close()` etc. (but actually with appropriate try/finally's so they all close even if one throws). this isn't a coroutines question any more, but is there a Kotlin idiom I don't know about for "this `Closeable` contains a bunch of other `Closeable`s"?
i'd be happy to give feedback on the post if you want!
s
About passing CoroutineScopes: you hardly ever should do that. The only time you should do it is when calling a function from a regular (non-suspending) function to launch something suspending in parallel with your regular code. And then you use an extension function in the form of `fun CoroutineScope.doSomething(...)`
d
Which will return some primitive you can await on.
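For example, a minimal sketch of that convention (the function and its body are made up for illustration):

```kotlin
import kotlinx.coroutines.*

// The caller's scope is the receiver; the function just starts work on the
// caller's behalf and hands back a Deferred to await on.
fun CoroutineScope.loadGreetingAsync(name: String): Deferred<String> = async {
    delay(100) // stands in for real suspending work
    "Hello, $name"
}
```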
s
You could create a new class called CachesManager that manages all your caches (cache1, cache2, etc), and that manager has a lifecycle with a top-most scope representing it
s
If you’d like to use AutoCloseable, you could do something like this:

```kotlin
interface ClosableCoroutineScope : CoroutineScope, AutoCloseable {
    override fun close() = cancel()

    companion object {
        operator fun invoke(context: CoroutineContext): ClosableCoroutineScope =
            object : ClosableCoroutineScope {
                override val coroutineContext = context
            }
    }
}

class MyCache(context: CoroutineContext) : ClosableCoroutineScope by ClosableCoroutineScope(context + SupervisorJob()) {
    ...
    suspend fun getValue(key: Key): Data = async { ... }.await()
    ...
}
```
d
Ah, that's not my question, it's just - if I have an object with like 9 fields all of which need to be closed, is there a standard idiom for calling close on them properly even if some close calls throw etc. Seems like no
s
I see… I don’t think there is…. maybe an auto-closeable collection containing auto-closeable items 🙂
d
`use`, but I don't think it helps much.
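One hand-rolled possibility, just a sketch (`closeAll` is not a standard-library function): close everything, keep going on failure, and rethrow the first exception with the rest attached as suppressed.

```kotlin
// Hypothetical helper for a Closeable that owns several Closeables.
fun closeAll(vararg closeables: AutoCloseable) {
    var first: Throwable? = null
    for (c in closeables) {
        try {
            c.close()
        } catch (t: Throwable) {
            if (first == null) first = t else first.addSuppressed(t)
        }
    }
    if (first != null) throw first
}

// e.g. override fun close() = closeAll(cache1, cache2, cache3)
```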
s
Are you going to publish this?
s
@streetsofboston Looks good. I will read that.. but I was actually asking about the caffeine wrapper 😉