# koog-agentic-framework
s
Looking at implementing my own `PersistencyStorageProvider`. Why was this designed so that every request that comes into my service needs its own instance of this object? Functions like `getCheckpoints()` and `getLatestCheckpoint()` should really be scoped per user, which means that if my service gets 100 concurrent requests, there will be 100 instances of my storage provider object in memory. That can't be very scalable. What's the best practice here?
I'd love to have a way to just say `getCheckpoints(userId: String)`, but that doesn't seem straightforward with the way `PersistencyStorageProvider` has been designed.
m
Well, we don't have any `user` entity inside the agent, so it's hard to introduce such a discriminator and stay general-purpose 🙂 So, what's your scenario? Do you create the agent per user request? That's the only case in which a new persistency provider would be created. And do you use continuous persistency? It's also possible to create and save a snapshot manually, and to get a snapshot by ID.
s
I'm not saying that the `PersistencyStorageProvider` interface should have functions that accept only a `userId`, but I think a good design would let people decide which filtering criteria they want, so it stays generic. Something like this:
```kotlin
public interface PersistencyStorageProvider {
    public suspend fun getCheckpoints(filters: Map<String, Any>? = null): List<AgentCheckpointData>
    public suspend fun saveCheckpoint(agentCheckpointData: AgentCheckpointData, filters: Map<String, Any>? = null)
    public suspend fun getLatestCheckpoint(filters: Map<String, Any>? = null): AgentCheckpointData?
}
```
```kotlin
class ExampleUsage(private val persistencyStorageProvider: PersistencyStorageProvider) {

    suspend fun demonstrateUsage() {
        // Example of calling getCheckpoints with filters
        val filtersForGet = mapOf(
            "date" to "2023-10-01",
            "status" to "completed",
            "priority" to 1
        )
        val checkpoints = persistencyStorageProvider.getCheckpoints(filtersForGet)
        println("Filtered Checkpoints: $checkpoints")

        // Example of calling saveCheckpoint with filters
        val agentCheckpointData = AgentCheckpointData(/* initialization */)
        val filtersForSave = mapOf("overwrite" to true)
        persistencyStorageProvider.saveCheckpoint(agentCheckpointData, filtersForSave)
        println("Checkpoint saved with filters: $filtersForSave")

        // Example of calling getLatestCheckpoint with filters
        val filtersForLatest = mapOf("agentId" to "12345")
        val latestCheckpoint = persistencyStorageProvider.getLatestCheckpoint(filtersForLatest)
        println("Latest Checkpoint: $latestCheckpoint")
    }
}
```
Because right now, if I want to filter on some criterion that can vary a lot per request, my only option is to create multiple instances of my `PersistencyStorageProvider` and pass that criterion into the class constructor so the functions can use it, which isn't ideal.
Or maybe, rather than a `Map<String, Any>`, it could just be `Any` or some other generic type.
That way, instances of classes that implement `PersistencyStorageProvider` can be singletons.
v
Side note: please keep in mind that the agent state (the LLM history with all prompts) and its performance (the latency of the model provider's responses) carry a much higher overhead than storing a single JVM object with a short user-ID string. If you have multiple agents running at the same time for multiple users (N users) for a while (with M as the average conversation length), the overall resource consumption would be:
1. N * M for keeping N histories at the same time
2. N short user-ID strings (plus their objects)
3. If you organise the HTTP and database clients the right way, one database connection per server (for all N persistent storage providers) and one LLM connection (for all N agent instances)

"1" is significantly more expensive than "2" and "3".
Even if you created a client/connection per user in "3", the overall cost would still be proportional to N*(M+1), which is asymptotically the same as N*M.
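To make that comparison concrete, here is a rough worked estimate. The symbols are introduced only for illustration: $s_h$ is the average memory per stored history message, $s_u$ is one user-ID string plus its per-user provider object, $s_c$ is one client/connection, and $C$ is the cost of the shared clients.

$$
\text{mem}(N, M) \approx \underbrace{N M s_h}_{\text{1: histories}} + \underbrace{N s_u}_{\text{2: user IDs + providers}} + \underbrace{C}_{\text{3: shared clients}}
$$

With one client per user, "3" becomes $N s_c$, and since $M \ge 1$:

$$
N M s_h \;\le\; N M s_h + N (s_u + s_c) \;\le\; N (M + 1)\,\max(s_h,\ s_u + s_c), \qquad N M \le N (M + 1) \le 2 N M,
$$

so the total is $\Theta(NM)$ either way. For example, $N = 100$ and $M = 50$ means 5,000 stored history messages against 100 small provider objects.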
Also, the most promising strategy for organising the connections in "3" is probably a fixed-size connection pool.
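A minimal sketch of the fixed-pool idea using only the JDK. `FixedPool` and `FakeConnection` are hypothetical names; in a real service you would more likely use an existing pool (for example HikariCP for JDBC) or the pooling built into your HTTP/DB client. The point is that the pool is created once per server and shared by every per-user provider, so the number of open connections stays bounded no matter how many agents exist.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for a real DB or HTTP connection.
class FakeConnection(val id: Int)

// Fixed-size pool: created once per server and shared by every per-user provider.
class FixedPool<C : Any>(size: Int, factory: (Int) -> C) {
    private val idle = ArrayBlockingQueue<C>(size)

    init {
        // Open all connections up front; the pool never grows beyond `size`.
        repeat(size) { idle.add(factory(it)) }
    }

    val available: Int get() = idle.size

    // Borrow a connection, run the block, and always return the connection to the pool.
    // poll() blocks the calling thread for up to timeoutMs when every connection is in use.
    fun <T> withConnection(timeoutMs: Long = 1_000, block: (C) -> T): T {
        val conn = idle.poll(timeoutMs, TimeUnit.MILLISECONDS)
            ?: throw IllegalStateException("No connection available within $timeoutMs ms")
        try {
            return block(conn)
        } finally {
            idle.add(conn)
        }
    }
}
```

Because `poll` blocks a thread while waiting, coroutine-based code would more naturally bound concurrency with a suspending primitive such as kotlinx.coroutines' `Semaphore`.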
And last but not least: the cost ($$$) of LLM inference is usually significantly (!) higher than the cost of the cloud compute resources required to run your server-side application. It differs depending on whether you self-host smaller models or pay for OpenAI/Anthropic/Gemini, but either way it comes with a cost.
s
Just to make sure I understand your point: are you suggesting that the intended pattern in Koog is a singleton `AIAgent` instance shared across all users? If so, that doesn't make sense, because each `AIAgent` takes an `AIAgentConfig` that provides a prompt, which will differ from user to user, so it can't be shared.
Not even the system message can be shared if it contains variables.
v
No, you can have a unique `AIAgent` instance per user, but feel free to share a single `promptExecutor` (which is what actually holds the HTTP client).
s
@Vadim Briliantov Okay, that makes sense, but how does that tie back to my issue with the `PersistencyStorageProvider`? For example, to write my own implementation of this interface, I have to override the `getCheckpoints()` function, which is used by the `Persistency` feature here:
```kotlin
/**
 * Retrieves a specific checkpoint by ID for the specified agent.
 *
 * @param checkpointId The ID of the checkpoint to retrieve
 * @return The checkpoint data with the specified ID, or null if not found
 */
public suspend fun getCheckpointById(checkpointId: String): AgentCheckpointData? =
    persistencyStorageProvider.getCheckpoints().firstOrNull { it.checkpointId == checkpointId }
```
That means my implementation of `persistencyStorageProvider.getCheckpoints()` has to return a list of checkpoints narrowed to some scope, such as a single user. It can't return all the checkpoints for all users, because then `.firstOrNull` would be scanning checkpoints we already know are irrelevant, since they belong to other users. The same goes for the `getLatestCheckpoint()` function I have to override, which is used here in the `Persistency` feature:
```kotlin
/**
 * Retrieves the latest checkpoint for the specified agent.
 *
 * @return The latest checkpoint data, or null if no checkpoint exists
 */
public suspend fun getLatestCheckpoint(): AgentCheckpointData? =
    persistencyStorageProvider.getLatestCheckpoint()
```
My implementation of `persistencyStorageProvider.getLatestCheckpoint()` NEEDS to be narrowed to a single user's scope; otherwise it will usually return a checkpoint that belongs to a different user entirely. I'm just trying to understand what you would recommend here.
In my opinion, my implementation of `PersistencyStorageProvider` should be a singleton, since it's essentially just a repository class. With the current design it can't be if I need to filter checkpoints on some criterion, because that criterion has to be passed into the constructor of my `PersistencyStorageProvider` implementation: the functions I need to override don't accept any arguments...
I apologise if I'm missing something glaring here; I'm just very confused about this, haha.
m
It seems like you’re trying to reuse the same agent instance for multiple users, but I don’t think that’s the best approach. The agent isn’t designed for parallel execution—trying to run it that way would throw an exception. It also uses a single shared context, which risks leaking data between users. A better approach would be to have a separate agent per user, keyed by a unique user ID. Your persistence provider could then use that ID to manage snapshots and implement the necessary read operations for each agent individually.
s
Nope, I'm not trying to do that.
I'm spawning a new one every time.
m
Okay, then you can provide each agent with its own persistency provider and user ID.
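A minimal sketch of that suggestion, under stated assumptions: `ScopedStorageProvider` is a simplified stand-in mirroring the argument-less shape of `PersistencyStorageProvider`, `CheckpointStub` stands in for `AgentCheckpointData` (keeping only the `checkpointId` and `createdAt` fields used in this thread), and `CheckpointRepository`/`UserScopedProvider` are hypothetical names, not Koog API. The expensive part (the store or DB client) stays a singleton; each agent gets a thin adapter holding only a reference to it plus the user ID.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Stand-in for Koog's AgentCheckpointData (only the fields used in this thread).
data class CheckpointStub(val checkpointId: String, val createdAt: Long)

// Stand-in mirroring the shape of PersistencyStorageProvider: reads take no arguments.
interface ScopedStorageProvider {
    suspend fun getCheckpoints(): List<CheckpointStub>
    suspend fun saveCheckpoint(agentCheckpointData: CheckpointStub)
    suspend fun getLatestCheckpoint(): CheckpointStub?
}

// Singleton repository: owns the storage (in-memory here; in practice a DB client or pool).
object CheckpointRepository {
    private val byUser = ConcurrentHashMap<String, List<CheckpointStub>>()

    // compute() is atomic per key, so saves for different users do not wait on each other.
    fun save(userId: String, checkpoint: CheckpointStub) {
        byUser.compute(userId) { _, existing -> (existing ?: emptyList()) + checkpoint }
    }

    fun getAll(userId: String): List<CheckpointStub> = byUser[userId] ?: emptyList()

    fun getLatest(userId: String): CheckpointStub? = getAll(userId).maxByOrNull { it.createdAt }
}

// Per-agent adapter: holds only the user ID and delegates everything to the shared repository.
class UserScopedProvider(private val userId: String) : ScopedStorageProvider {
    override suspend fun getCheckpoints(): List<CheckpointStub> = CheckpointRepository.getAll(userId)

    override suspend fun saveCheckpoint(agentCheckpointData: CheckpointStub) {
        CheckpointRepository.save(userId, agentCheckpointData)
    }

    override suspend fun getLatestCheckpoint(): CheckpointStub? = CheckpointRepository.getLatest(userId)
}

// Demo-only helper: runs a suspend block that completes without actually suspending.
// Real code would call the provider from a coroutine (e.g. kotlinx.coroutines' runBlocking).
fun <T> runNonSuspending(block: suspend () -> T): T {
    var result: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result = it })
    return checkNotNull(result) { "block suspended; use a real coroutine runner" }.getOrThrow()
}
```

With 100 concurrent agents this means 100 adapters, each little more than an object header and a `String` reference, while storage and connections stay shared. In a DI setup the repository would be injected into the adapter instead of being a Kotlin `object`; the `object` only keeps the sketch short.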
s
What I'm trying to understand is how my `PersistencyStorageProvider` can use an ID while remaining a singleton.
I don't want a separate instance of this class for each ID.
And I don't think that's possible,
because the functions accept no arguments.
Each `AIAgent` instance (let's say I have 100 in memory) should all use the same `PersistencyStorageProvider` instance,
because the class is essentially just a repository,
and repositories are typically singletons.
The only way I can think of to achieve this is something like the following, which is meh:
```kotlin
package ai.koog.agents.snapshot.providers

import ai.koog.agents.snapshot.feature.AgentCheckpointData
import com.hazelcast.core.HazelcastInstance
import com.hazelcast.map.IMap
import kotlinx.coroutines.asContextElement
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import javax.inject.Inject
import kotlin.coroutines.CoroutineContext

class HazelcastPersistencyStorageProvider @Inject constructor(
    private val hazelcastInstance: HazelcastInstance
) : PersistencyStorageProvider {

    // Serializes calls within this JVM only, not across Hazelcast cluster members.
    private val mutex = Mutex()
    private val userCheckpoints: IMap<String, MutableList<AgentCheckpointData>> =
        hazelcastInstance.getMap("userCheckpoints")

    // Simulate user context with a thread-local variable.
    // A bare ThreadLocal.set() is unreliable in suspend functions, because a coroutine can
    // resume on a different thread; asContextElement() carries the value with the coroutine.
    // It only reaches code running in coroutines that inherit the caller's context, e.g.
    // withContext(provider.userContext(userId)) { /* run the agent */ }
    private val currentUser = ThreadLocal<String>()

    fun userContext(userId: String): CoroutineContext = currentUser.asContextElement(userId)

    override suspend fun getCheckpoints(): List<AgentCheckpointData> {
        val userId = currentUser.get() ?: throw IllegalStateException("User ID not set")
        return mutex.withLock {
            userCheckpoints[userId]?.toList() ?: emptyList()
        }
    }

    override suspend fun saveCheckpoint(agentCheckpointData: AgentCheckpointData) {
        val userId = currentUser.get() ?: throw IllegalStateException("User ID not set")
        mutex.withLock {
            // IMap returns a deserialized copy, so the updated list must be written back.
            val checkpoints = userCheckpoints.computeIfAbsent(userId) { mutableListOf() }
            checkpoints.add(agentCheckpointData)
            userCheckpoints[userId] = checkpoints
        }
    }

    override suspend fun getLatestCheckpoint(): AgentCheckpointData? {
        val userId = currentUser.get() ?: throw IllegalStateException("User ID not set")
        return mutex.withLock {
            userCheckpoints[userId]?.maxByOrNull { it.createdAt }
        }
    }
}
```
Using class-level state for the user context in a concurrent environment can also create a bottleneck: every call has to acquire the mutex, so requests queue up and responses are delayed, especially when each call blocks on a database round trip.
m
> how can my PersistencyStorageProvider use an ID, while maintaining the PersistencyStorageProvider as a singleton
Well, that's not how it's designed: at the design stage we decided not to introduce a mandatory ID for a checkpoint or group of checkpoints, leaving it to the implementation to decide.
s
But can we leave it up to the implementation to decide, while also giving developers a way to pass arguments to these functions? The interface might look something like this:
```kotlin
package ai.koog.agents.snapshot.providers

import ai.koog.agents.snapshot.feature.AgentCheckpointData

public interface PersistencyStorageProvider {
    public suspend fun getCheckpoints(filter: Map<String, Any> = emptyMap()): List<AgentCheckpointData>
    public suspend fun saveCheckpoint(agentCheckpointData: AgentCheckpointData)
    public suspend fun getLatestCheckpoint(filter: Map<String, Any> = emptyMap()): AgentCheckpointData?
}
```
Maybe not exactly this, but you get the idea.
Something generic that still lets these functions accept arguments.
m
Yeah, I get the point. Could we convert this conversation into a proposal issue?
s
Yeah of course!
thanks for the help btw 🙂
Want me to create the issue on github?
m
If you could please, yes
s
Hopefully it explains the problem clearly, I can make any edits if needed 🙂
just edited it with a code example
m
yeah, it looks pretty clear
thank you!
s
Yeah, I think this looks better:
```kotlin
public interface PersistencyStorageProvider<F> {
    public suspend fun getCheckpoints(filter: F): List<AgentCheckpointData>
    public suspend fun saveCheckpoint(agentCheckpointData: AgentCheckpointData, filter: F)
    public suspend fun getLatestCheckpoint(filter: F): AgentCheckpointData?
}
```
Then we can have:
```kotlin
/**
 * In-memory implementation of [PersistencyStorageProvider].
 * This provider stores snapshots in a mutable map.
 */
public class InMemoryPersistencyStorageProvider(private val persistenceId: String) : PersistencyStorageProvider<CheckpointFilter> {
    private val mutex = Mutex()
    private val snapshotMap = mutableMapOf<String, List<AgentCheckpointData>>()

    override suspend fun getCheckpoints(filter: CheckpointFilter): List<AgentCheckpointData> {
        ....
    }

    override suspend fun saveCheckpoint(agentCheckpointData: AgentCheckpointData, filter: CheckpointFilter) {
        ....
    }

    override suspend fun getLatestCheckpoint(filter: CheckpointFilter): AgentCheckpointData? {
        ....
    }
}
```
Updated the issue description
However, this means developers HAVE to provide some class to use as a filter,
so it's definitely a design decision.
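A minimal runnable sketch of the generic-filter proposal, assuming it were adopted (it is not the current Koog API). `FilteredStorageProvider`, `CheckpointFilter`, `InMemoryFilteredProvider`, and `CheckpointStub` (a stand-in for `AgentCheckpointData`) are hypothetical names. Because the scope arrives with each call, a single provider instance can serve every agent.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Stand-in for Koog's AgentCheckpointData.
data class CheckpointStub(val checkpointId: String, val createdAt: Long)

// The generic shape proposed in this thread (hypothetical, not the current Koog interface).
interface FilteredStorageProvider<F> {
    suspend fun getCheckpoints(filter: F): List<CheckpointStub>
    suspend fun saveCheckpoint(agentCheckpointData: CheckpointStub, filter: F)
    suspend fun getLatestCheckpoint(filter: F): CheckpointStub?
}

// Hypothetical filter type chosen by the implementer.
data class CheckpointFilter(val userId: String)

// One instance can be shared by all agents; per-key compute() avoids a global lock.
class InMemoryFilteredProvider : FilteredStorageProvider<CheckpointFilter> {
    private val snapshots = ConcurrentHashMap<String, List<CheckpointStub>>()

    override suspend fun getCheckpoints(filter: CheckpointFilter): List<CheckpointStub> =
        snapshots[filter.userId] ?: emptyList()

    override suspend fun saveCheckpoint(agentCheckpointData: CheckpointStub, filter: CheckpointFilter) {
        snapshots.compute(filter.userId) { _, existing -> (existing ?: emptyList()) + agentCheckpointData }
    }

    override suspend fun getLatestCheckpoint(filter: CheckpointFilter): CheckpointStub? =
        getCheckpoints(filter).maxByOrNull { it.createdAt }
}

// Demo-only helper: runs a suspend block that completes without actually suspending.
fun <T> runNonSuspending(block: suspend () -> T): T {
    var result: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result = it })
    return checkNotNull(result) { "block suspended; use a real coroutine runner" }.getOrThrow()
}
```

The trade-off mentioned above shows up on the caller's side: since the `Persistency` feature is what calls these methods, it would need to know which `F` value to pass for each agent, so the per-agent state would move from the provider instance into the feature's configuration rather than disappear.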