Sam
08/03/2025, 6:42 PM
AIAgent instance with this strategy graph. What is the best-practice way to make sure we "pick up where we left off" for that particular user? I need a user to be able to have a back-and-forth conversation, which means storing their conversation state somewhere along the way and retrieving it when a new user message comes through.
I was considering a final node in my strategy graph that opens an llm.readSession, extracts the prompt, and saves it in my DB. Then, every time a user sends a message, the first node in the graph could retrieve that user's prompt (including all past messages and tool calls/results) from the DB and simply append the new message to it. But I'm thinking the Persistency feature might already address these concerns and save me from reinventing the wheel.
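The manual approach described above can be sketched without any Koog types: keep each user's message history in a store keyed by user ID, load it at the start of a turn, append the new message, and write everything back at the end. `ChatMessage`, `ConversationStore`, `InMemoryConversationStore` and `handleTurn` are hypothetical names standing in for Koog's prompt messages and your DB; in a real strategy graph the load/append would live in the first node and the save in the last one.

```kotlin
// Hypothetical stand-in for a prompt message (role + text); not a Koog type.
data class ChatMessage(val role: String, val text: String)

// Hypothetical persistence boundary; a real implementation would use your DB.
interface ConversationStore {
    fun load(userId: String): List<ChatMessage>
    fun save(userId: String, history: List<ChatMessage>)
}

class InMemoryConversationStore : ConversationStore {
    private val byUser = mutableMapOf<String, List<ChatMessage>>()

    override fun load(userId: String): List<ChatMessage> = byUser[userId] ?: emptyList()

    override fun save(userId: String, history: List<ChatMessage>) {
        byUser[userId] = history.toList() // store a defensive copy
    }
}

// One conversational turn: restore history, append the user's message,
// get a reply from the supplied `callLlm` function, then persist both.
fun handleTurn(
    store: ConversationStore,
    userId: String,
    userText: String,
    callLlm: (List<ChatMessage>) -> String
): String {
    val history = store.load(userId) + ChatMessage("user", userText)
    val reply = callLlm(history)
    store.save(userId, history + ChatMessage("assistant", reply))
    return reply
}
```

Because every turn starts with a lookup by user ID, it does not matter whether the agent object itself is created fresh for each message.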
Cheers!
Aria
08/03/2025, 6:54 PM
rollbackToLatestCheckpoint when executing your agent. You can save checkpoints for specific users by associating the checkpoint agentIds with your userIds (or their sessions).
Aria
08/03/2025, 7:01 PM
Sam
08/03/2025, 7:38 PM
```kotlin
suspend fun processMessage(
    company: String,
    brand: String,
    channelEvent: TextChannelEvent,
    channelInterface: ChannelInterface
): AgentUtterance {
    val awsAccessKeyId = parameterService.getOrThrow("bedrock/access-key-id")
    val awsSecretAccessKey = parameterService.getOrThrow("bedrock/secret-access-key")
    // A brand-new AIAgent is built for every incoming message
    val agent = createAgent(
        company = company,
        brand = brand,
        channelEvent = channelEvent,
        toolFactory = toolFactory,
        promptExecutor = providePromptExecutor(awsAccessKeyId, awsSecretAccessKey)
    )
    return try {
        val response = agent.run(channelEvent.userText)
        AgentUtterance().addElement(TopicMessage("CONCIERGE_RESPONSE", response.body))
    } catch (e: kotlin.coroutines.cancellation.CancellationException) {
        throw e // rethrow so coroutine cancellation is not swallowed
    } catch (e: Exception) {
        AgentUtterance().addElement(TopicMessage("CONCIERGE_RESPONSE", "Sorry, something went wrong. Please try again later."))
    }
}
```
So I spawn a new AIAgent instance for each message that comes through.
Sam
08/03/2025, 7:39 PM
userId as you say, it should be fairly straightforward. Unless I've got the wrong end of the stick and I should give each user their own AIAgent instance, so that each user gets exactly the same one every time. So if a user's ID is 1234, I'd create the AIAgent instance like this:
```kotlin
return AIAgent(
    promptExecutor = promptExecutor,
    strategy = createStrategy(),
    agentConfig = createAgentConfig(company, brand, channelEvent.channelContext.userId, conversationSessionManager, isNewUser),
    toolRegistry = toolRegistry,
    id = "1234"
)
```
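Rather than hard-coding "1234", the id could be derived from identifiers already available in processMessage, so that every request for the same user (or session) yields the same agent ID. This assumes, as in Aria's suggestion, that your storage provider filters checkpoints by that ID. `agentIdFor` and its company/brand/user scheme are hypothetical.

```kotlin
// Hypothetical helper: build a stable, per-user agent ID from request data.
// The same inputs always produce the same ID, unlike a random UUID.
fun agentIdFor(company: String, brand: String, userId: String, sessionId: String? = null): String {
    require(userId.isNotBlank()) { "userId must not be blank" }
    val parts = listOfNotNull(company, brand, userId, sessionId)
    // Escape backslashes and the ':' separator so ("a:b", "c") and ("a", "b:c") cannot collide.
    return parts.joinToString(":") { it.replace("\\", "\\\\").replace(":", "\\:") }
}
```

In createAgent this would amount to something like `id = agentIdFor(company, brand, channelEvent.channelContext.userId)`.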
Then, when I call persistency().rollbackToLatestCheckpoint, it will (I think) retrieve the latest checkpoint for that user. I'm not sure how the id field is used by `AIAgent`s.
Sam
08/03/2025, 7:40 PM
```kotlin
install(Persistency) {
    enableAutomaticPersistency = true
}
```
Sam
08/03/2025, 11:56 PM
Sam
08/03/2025, 11:59 PM
persistency().getCheckpointById() for after I've saved one manually:
```kotlin
/**
 * Retrieves a specific checkpoint by ID for the specified agent.
 *
 * @param checkpointId The ID of the checkpoint to retrieve
 * @return The checkpoint data with the specified ID, or null if not found
 */
public suspend fun getCheckpointById(checkpointId: String): AgentCheckpointData? =
    persistencyStorageProvider.getCheckpoints().firstOrNull { it.checkpointId == checkpointId }
```
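For contrast with the scan-then-filter getCheckpointById above, a storage layer can index checkpoints by owner (agent ID, user ID or session), so that lookups by ID or "latest for this owner" never touch other users' data. This is a plain in-memory model: `CheckpointRecord` and `IndexedCheckpointStore` are hypothetical and are not Koog's `AgentCheckpointData` or `PersistencyStorageProvider`. In a real database the equivalent would be a primary key on checkpointId plus an index on (owner, createdAt).

```kotlin
import java.time.Instant

// Hypothetical checkpoint shape; Koog's AgentCheckpointData has its own fields.
data class CheckpointRecord(
    val checkpointId: String,
    val ownerId: String, // e.g. agentId or userId
    val nodeId: String,
    val createdAt: Instant
)

class IndexedCheckpointStore {
    private val byId = HashMap<String, CheckpointRecord>()
    private val byOwner = HashMap<String, MutableList<CheckpointRecord>>()

    fun save(record: CheckpointRecord) {
        require(record.checkpointId !in byId) { "duplicate checkpointId ${record.checkpointId}" }
        byId[record.checkpointId] = record
        byOwner.getOrPut(record.ownerId) { mutableListOf() }.add(record)
    }

    // Direct hash lookup instead of scanning every checkpoint in the system,
    // and a checkpoint belonging to another owner is never returned.
    fun getById(ownerId: String, checkpointId: String): CheckpointRecord? =
        byId[checkpointId]?.takeIf { it.ownerId == ownerId }

    // Only this owner's checkpoints are considered.
    fun getLatest(ownerId: String): CheckpointRecord? =
        byOwner[ownerId]?.maxByOrNull { it.createdAt }

    fun getAll(ownerId: String): List<CheckpointRecord> = byOwner[ownerId].orEmpty().toList()
}
```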
That means my custom StorageProvider has to implement a function that returns the checkpoints for ALL users, because persistencyStorageProvider.getCheckpoints() doesn't accept any filtering parameters, such as a user ID. In a large system there could be millions of checkpoints across all users, and this function returns all of them and only then filters? That can't be right, unless I'm misunderstanding something here.
Sam
08/04/2025, 12:05 AM
persistencyStorageProvider.getLatestCheckpoint(): why would this function ever be useful? Surely calling it could easily return a checkpoint that doesn't belong to the intended user at all?
Sam
08/04/2025, 12:12 AM
```kotlin
suspend fun processMessage(
    company: String,
    brand: String,
    channelEvent: TextChannelEvent
): AgentUtterance {
    val awsAccessKeyId = parameterService.getOrThrow("bedrock/access-key-id")
    val awsSecretAccessKey = parameterService.getOrThrow("bedrock/secret-access-key")
    val agent = createAgent(
        company = company,
        brand = brand,
        channelEvent = channelEvent,
        toolFactory = toolFactory,
        promptExecutor = providePromptExecutor(awsAccessKeyId, awsSecretAccessKey)
    )
    return try {
        val response = agent.run(channelEvent.userText)
        AgentUtterance().addElement(TopicMessage("CONCIERGE_RESPONSE", response.body))
    } catch (e: kotlin.coroutines.cancellation.CancellationException) {
        throw e // rethrow so coroutine cancellation is not swallowed
    } catch (e: Exception) {
        AgentUtterance().addElement(TopicMessage("CONCIERGE_RESPONSE", "Sorry, something went wrong. Please try again later."))
    }
}

private fun createAgent(
    company: String,
    brand: String,
    channelEvent: TextChannelEvent,
    toolFactory: ToolFactory,
    promptExecutor: PromptExecutor
): AIAgent<String, ConciergeStructuredResponse> {
    val (chatUser: ChatUser, isNewUser) = userService.getOrCreateUser(
        channelEvent.channelContext.userId,
        channelEvent.channelContext.channelType,
        company,
        brand,
        channelEvent
    )
    val toolRegistry = createToolRegistry(company, brand, channelEvent, chatUser, toolFactory)
    // A fresh AIAgent (with Persistency installed) is created for every incoming message
    return AIAgent(
        promptExecutor = promptExecutor,
        strategy = createStrategy(),
        agentConfig = createAgentConfig(company, brand, channelEvent.channelContext.userId, conversationSessionManager, isNewUser),
        toolRegistry = toolRegistry
    ) {
        install(Persistency)
    }
}
```
Each time a user message comes through my system... So I'm creating a completely new agent every time.
Also, how does the Prompt tie into this? I noticed that each Prompt has an id field, which the KDoc describes as a "unique ID". In what sense is this ID unique, and why does it need to be? Is it used anywhere else in Koog?
Aria
08/04/2025, 12:12 AM
class InMemoryPersistencyStorageProvider(private val persistenceId: String) : PersistencyStorageProvider), then the function implementations would use that ID to filter the DB.
Aria
08/04/2025, 12:14 AM
Sam
08/04/2025, 12:15 AM
Sam
08/04/2025, 12:18 AM
InMemoryPersistencyStorageProvider and how it uses agentIds for filtering, I guess you could always create a class that implements PersistencyStorageProvider and accepts a userId argument instead, so that getLatestCheckpoint can use it for filtering. But then, as you say, that doesn't scale well, since you'd need a separate object for each user request.
Aria
08/04/2025, 2:11 AM
AIAgentContext to the persistency storage provider methods so you can access context.agentId (or the rest of the context) dynamically: https://github.com/JetBrains/koog/pull/536
Anastasiia Zarechneva
08/04/2025, 8:20 AM
Sam
08/04/2025, 9:12 AM
persistency().getCheckpoints()
That means the same AIAgent instance must have been invoked with AIAgent.run() multiple times for that function to be useful (unless it's called between the moment AIAgent.run() starts and the moment the strategy graph finishes), which just isn't a pattern I'm familiar with. Not only that, it would also have to have been invoked on behalf of a single user ONLY, otherwise the function would return checkpoints for multiple users. If that's the case, why are we persisting this exact agent and its checkpoints? Why don't we just persist all checkpoints regardless of agent or user, and have a function that returns only the checkpoints belonging to a single user (or matching some other filter)? Then the specific AIAgent instance that was used wouldn't matter.
Take user IDs, for example: each user has one because a user object may hold mutable state that needs to be persisted in a database, so we can later retrieve and modify that state by referencing their ID.
What about agents? What state are we retrieving or referencing when we refer to an agent by its ID?
Unless I'm mistaken and Koog accommodates multiple patterns. The one I'm using is: for each incoming user request, we spawn a fresh AIAgent instance for that user. All we then have to do is modify the Prompt to include all the messages from the conversation history of previous sessions.
Aria
08/04/2025, 6:38 PM
Sam
08/05/2025, 8:34 AM
Vadim Briliantov
08/07/2025, 8:49 PM
AgentMemory feature: it actually doesn't assume that the agent persists data for a single user. There are `MemorySubject`s, which are serializable and can be stored however you like in your implementation of AgentMemoryProvider (an interface you'll likely implement for your favorite storage, e.g. S3 or, more likely, a real database):
```kotlin
public suspend fun save(fact: Fact, subject: MemorySubject, scope: MemoryScope)
```
Basically, suppose you have a User class. Not the one in our examples (that one is a singleton User object, there purely for educational purposes and to keep things simple), but a real one that has, for example, a userId: String:
```kotlin
@Serializable
data class User(val userId: String) : MemorySubject() {
    override val name: String = "user"
    override val promptDescription: String =
        "User's preferences, settings, and behavior patterns, expectations from the agent, preferred messaging style, etc."
    override val priorityLevel: Int = 2
}
```
Then you'd implement AgentMemoryProvider.save so that it stores the information under the right keys in your database (e.g. userId + subject type -> fact).
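The keying scheme described here (userId + subject type -> fact) can be modelled independently of Koog. In this sketch `Subject`, `MemoryFact`, `MemoryKey` and `KeyedMemoryStore` are hypothetical simplifications, not Koog's `MemorySubject`, `Fact` or `AgentMemoryProvider`; a real provider would implement Koog's interface and write these keys to your database. Replacing an older fact with a newer one for the same concept is one possible policy, chosen here for illustration.

```kotlin
// Hypothetical, simplified subject: only its name is used for keying here.
data class Subject(val name: String)

// Hypothetical fact: what it is about (concept) and its value.
data class MemoryFact(val concept: String, val value: String)

// Composite key: which user, and which kind of subject.
data class MemoryKey(val userId: String, val subject: String)

class KeyedMemoryStore {
    private val facts = mutableMapOf<MemoryKey, MutableList<MemoryFact>>()

    fun save(userId: String, subject: Subject, fact: MemoryFact) {
        val list = facts.getOrPut(MemoryKey(userId, subject.name)) { mutableListOf() }
        // Keep one value per concept: a newer fact replaces an older one.
        list.removeAll { it.concept == fact.concept }
        list.add(fact)
    }

    fun load(userId: String, subject: Subject): List<MemoryFact> =
        facts[MemoryKey(userId, subject.name)].orEmpty().toList()
}
```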
We're planning to add out-of-the-box database support in the near future, but you can already implement it yourself for your use case.
Mark Tkachenko
08/08/2025, 10:22 AM
Sam
08/08/2025, 10:37 AM
Mark Tkachenko
08/08/2025, 10:40 AM
> because we need to make sure we retrieve checkpoints that are compatible with the strategy graph we're executing
Yes, of course: if you restore an agent from a snapshot, you want it to be based on the same graph.
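One application-level way to enforce "restore only into the same graph" is to save an identifier of the strategy graph alongside each checkpoint and skip checkpoints that don't match. `StoredCheckpoint`, `graphVersion` and `latestCompatible` are hypothetical; whether Koog records anything equivalent itself isn't covered in this thread. The version could be any string you bump whenever the graph changes.

```kotlin
// Hypothetical checkpoint record that remembers which graph version produced it.
data class StoredCheckpoint(
    val checkpointId: String,
    val userId: String,
    val nodeId: String,
    val graphVersion: String,
    val sequence: Long // increases with every checkpoint written
)

// Newest checkpoint for this user written by the graph version we are about to run.
// Checkpoints from other graph versions are ignored rather than restored.
fun latestCompatible(
    checkpoints: List<StoredCheckpoint>,
    userId: String,
    currentGraphVersion: String
): StoredCheckpoint? =
    checkpoints
        .filter { it.userId == userId && it.graphVersion == currentGraphVersion }
        .maxByOrNull { it.sequence }
```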
Sam
08/08/2025, 10:44 AM
AIAgent instance to handle that request.
So what I'm struggling with is implementing a way for the first node in my strategy graph to "load" where that particular user left off, and making sure this state is updated at each node. I know you can set enableAutomaticPersistency = true, but I'd like to do it manually at first so I understand what's going on behind the scenes.
Mark Tkachenko
08/08/2025, 10:46 AM
```kotlin
context.withPersistency(context) { ctx ->
    createCheckpoint(
        agentContext = ctx,
        nodeId = "current-node-id",
        lastInput = inputData,
        lastInputType = inputType,
        checkpointId = ctx.runId,
    )
}
```
Mark Tkachenko
08/08/2025, 10:46 AM
Sam
08/08/2025, 10:52 AM
nodeId be just the same as the name of the node, or does it need to be globally unique? If it's just the node name, why isn't the name used automatically here? Because the name is optional?
Also, I'm assuming lastInput and lastInputType come from the node's input argument? So in this case:
```kotlin
// Node that updates the prompt with the user's message, calls the LLM and adds the resulting response to the prompt
fun AIAgentSubgraphBuilderBase<*, *>.nodeUpdatePromptWithUserMessageAndCallLLM(
    name: String? = null,
    allowToolCalls: Boolean = true
): AIAgentNodeDelegate<String, Message.Response> =
    node(name) { userMessage ->
        llm.writeSession {
            updatePrompt {
                user(userMessage)
            }
            if (allowToolCalls) requestLLM()
            else requestLLMWithoutTools()
        }
    }
```
lastInput would be userMessage, and lastInputType would be String?
And finally, how can I access context within a node? For some reason it's not being picked up for me. Does it need to be context() or withContext(), and what's the difference between them? And once I do have it, where is runId set?
Sorry for all the questions 😅
Mark Tkachenko
08/08/2025, 10:58 AM
> does it need to be globally unique
Graph nodes should have unique names in order to use persistency.
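Assuming a restore resolves the checkpointed node by name in a freshly built graph, names need to be unique within the graph and also stable across builds; that reasoning is an assumption, not something stated in the thread. A small check at graph-construction time can catch duplicates early. `duplicateNodeNames` and `validateNodeNames` are hypothetical helpers that operate on whatever list of names your strategy builder uses.

```kotlin
// Hypothetical check: returns every node name that appears more than once.
fun duplicateNodeNames(names: List<String>): Set<String> =
    names.groupingBy { it }.eachCount().filterValues { it > 1 }.keys

// Fails fast on blank or duplicated names before the agent ever runs.
fun validateNodeNames(names: List<String>) {
    require(names.none { it.isBlank() }) { "Node names must not be blank" }
    val dupes = duplicateNodeNames(names)
    require(dupes.isEmpty()) { "Duplicate node names: ${dupes.sorted()}" }
}
```

Under the same assumption, a name like `"test-" + UUID.randomUUID()` generated on every graph build would differ between the run that saved a checkpoint and the run that restores it.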
> lastInput would be userMessage, and lastInputType would be String
Yes, that's correct.
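Recording the input together with its static type is what allows a restored run to hand the value back to a node with the right signature. A sketch using a reified type parameter and `typeOf` from `kotlin.reflect`; `CheckpointInput` and `captureInput` are hypothetical, and the assumption that Koog's `lastInputType` is a `KType` should be checked against the `createCheckpoint` signature in your Koog version.

```kotlin
import kotlin.reflect.KType
import kotlin.reflect.typeOf

// Hypothetical pair: what a node received and the declared type of that value.
data class CheckpointInput(val lastInput: Any?, val lastInputType: KType)

// The reified T captures the call site's declared type (e.g. String) at compile time.
inline fun <reified T> captureInput(value: T): CheckpointInput =
    CheckpointInput(value, typeOf<T>())
```

Inside the node from the question, `captureInput(userMessage)` would record String as the type.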
> And finally, how can I access context within a node
context is this inside the node { } builder.
Sam
08/08/2025, 11:12 AM
Mark Tkachenko
08/08/2025, 11:14 AM
Sam
08/08/2025, 11:19 AM
"test-" + UUID.randomUUID().toString(), so that it's never the same twice? i.e. pretty much no hard-coded strings allowed?
Mark Tkachenko
08/08/2025, 11:23 AM
Sam
08/08/2025, 11:24 AM
Sam
08/08/2025, 11:24 AM
Sam
08/08/2025, 11:25 AM
runId. Does that have to be globally unique? And where do I set it? Or is it only set here, in `AIAgent.run()`:
```kotlin
override suspend fun run(agentInput: Input): Output {
    runningMutex.withLock {
        if (isRunning) {
            throw IllegalStateException("Agent is already running")
        }
        isRunning = true
    }
    pipeline.prepareFeatures()
    val sessionUuid = Uuid.random()
    val runId = sessionUuid.toString()
    // (excerpt: the rest of run() is not shown)
```
Sam
08/08/2025, 11:44 AM
checkpointId = ctx.runId to a createCheckpoint() call lead to multiple checkpoints having the same checkpoint ID?
Sam
08/08/2025, 11:44 AM
nodeId is also passed to createCheckpoint() as an argument, so we know which checkpoint to return when getLatestCheckpoint() is called?
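If runId is generated once per run (as in the run() excerpt above), then passing it as checkpointId for every node in that run would give all of that run's checkpoints the same ID. One way around this, sketched under that assumption, is to keep runId as a grouping field, give each checkpoint its own ID, and decide "latest" by write order, with nodeId only saying where to resume. `RunCheckpoint` and `RunCheckpointLog` are hypothetical names, not Koog's.

```kotlin
import java.util.UUID

// Hypothetical checkpoint metadata: one runId can own many checkpoints.
data class RunCheckpoint(
    val checkpointId: String,
    val runId: String,
    val nodeId: String,
    val sequence: Long
)

class RunCheckpointLog {
    private val entries = mutableListOf<RunCheckpoint>()
    private var nextSequence = 0L

    // Each save gets a fresh checkpointId, even when runId repeats.
    fun record(runId: String, nodeId: String): RunCheckpoint {
        val cp = RunCheckpoint(UUID.randomUUID().toString(), runId, nodeId, nextSequence++)
        entries += cp
        return cp
    }

    // "Latest" is decided by write order; nodeId tells you where to resume.
    fun latest(runId: String): RunCheckpoint? =
        entries.filter { it.runId == runId }.maxByOrNull { it.sequence }
}
```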