Sam
08/03/2025, 6:42 PM
`AIAgent` instance with this strategy graph. What is the best practice here to make sure we "pick up from where we left off" for that particular user? I need a way for a user to have a back-and-forth conversation, which means periodically storing their conversation state somewhere and retrieving it when a new user message comes through.
I was thinking of having a final node in my strategy graph that launches an `llm.readSession`, extracts the prompt, and saves it in my DB. Then, every time a user sends a message, the first node in the graph can retrieve their prompt from the DB (which includes all past messages and tool calls/results) and simply append their new message to it. But I'm thinking the Persistency feature might address these concerns for me and save me from reinventing the wheel.
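A minimal sketch of the load, append, save cycle described above, assuming a hypothetical `ConversationStore` keyed by user ID. An in-memory map stands in for the real DB, and `ChatTurn` stands in for the prompt's messages and tool calls/results. None of these names are Koog APIs; in an agent, the load would sit in the first node and the save in the final node.

```kotlin
// Hypothetical model of one turn; a real store would hold the prompt's
// messages, including tool calls and tool results.
data class ChatTurn(val role: String, val content: String)

// Hypothetical persistence boundary: one history per user ID.
interface ConversationStore {
    fun load(userId: String): List<ChatTurn>
    fun save(userId: String, history: List<ChatTurn>)
}

// In-memory stand-in for a DB table keyed by user ID.
class InMemoryConversationStore : ConversationStore {
    private val histories = mutableMapOf<String, List<ChatTurn>>()

    override fun load(userId: String): List<ChatTurn> = histories[userId].orEmpty()

    override fun save(userId: String, history: List<ChatTurn>) {
        histories[userId] = history.toList() // defensive copy
    }
}

// One request: load the history, append the new user message, call the model,
// then persist the history with the assistant's reply appended.
fun handleTurn(
    store: ConversationStore,
    userId: String,
    userText: String,
    callModel: (List<ChatTurn>) -> String, // stands in for the LLM call
): String {
    val history = store.load(userId) + ChatTurn("user", userText)
    val reply = callModel(history)
    store.save(userId, history + ChatTurn("assistant", reply))
    return reply
}
```

Because the whole history is the unit of storage, each request can be served by a fresh agent instance.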
Cheers!
Aria
08/03/2025, 6:54 PM
`rollbackToLatestCheckpoint` when executing your agent. You can save checkpoints for specific users by associating the checkpointing `agentId`s with your `userId`s (or their sessions).
Aria
08/03/2025, 7:01 PM
Sam
08/03/2025, 7:38 PM
```kotlin
suspend fun processMessage(
    company: String,
    brand: String,
    channelEvent: TextChannelEvent,
    channelInterface: ChannelInterface
): AgentUtterance {
    val awsAccessKeyId = parameterService.getOrThrow("bedrock/access-key-id")
    val awsSecretAccessKey = parameterService.getOrThrow("bedrock/secret-access-key")
    val agent = createAgent(
        company = company,
        brand = brand,
        channelEvent = channelEvent,
        toolFactory = toolFactory,
        promptExecutor = providePromptExecutor(awsAccessKeyId, awsSecretAccessKey)
    )
    try {
        val response = agent.run(channelEvent.userText)
        return AgentUtterance().addElement(TopicMessage("CONCIERGE_RESPONSE", response.body))
    } catch (e: Exception) {
        return AgentUtterance().addElement(TopicMessage("CONCIERGE_RESPONSE", "Sorry, something went wrong. Please try again later."))
    }
}
```
So I spawn a new `AIAgent` instance for each message that comes through.
Sam
08/03/2025, 7:39 PM
`userId` as you say, it should be fairly straightforward. Unless I'm getting the wrong end of the stick here, and I should give each user their own `AIAgent` instance, so that each user gets exactly the same one they used previously. So if a user's ID is `1234`, I'd create the `AIAgent` instance like this:
```kotlin
return AIAgent(
    promptExecutor = promptExecutor,
    strategy = createStrategy(),
    agentConfig = createAgentConfig(company, brand, channelEvent.channelContext.userId, conversationSessionManager, isNewUser),
    toolRegistry = toolRegistry,
    id = "1234"
)
```
Then when I call `persistency().rollbackToLatestCheckpoint`, it will (I think) retrieve the latest checkpoint for that user. I'm not sure how the `id` field is used for `AIAgent`s.
Sam
08/03/2025, 7:40 PM
```kotlin
install(Persistency) {
    enableAutomaticPersistency = true
}
```
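If, as Aria's suggestion implies, Persistency scopes checkpoints by the agent's `id`, then setting that `id` to the user's ID (as in `id = "1234"` above) turns "latest checkpoint for this agent" into "latest checkpoint for this user", even when a fresh `AIAgent` is created per request. A minimal sketch of that lookup, using a hypothetical record type and index rather than Koog's actual storage classes:

```kotlin
// Hypothetical checkpoint record; Koog's own checkpoint data has more fields.
data class CheckpointRecord(
    val agentId: String, // set to the user's ID, e.g. "1234"
    val checkpointId: String,
    val nodeId: String,
    val createdAtMillis: Long,
)

// Hypothetical index that groups checkpoints by agent ID, so a lookup for
// one user never has to read another user's checkpoints.
class CheckpointIndex {
    private val byAgent = mutableMapOf<String, MutableList<CheckpointRecord>>()

    fun save(record: CheckpointRecord) {
        byAgent.getOrPut(record.agentId) { mutableListOf() }.add(record)
    }

    // What a per-user "rollback to latest checkpoint" would restore.
    fun latestFor(agentId: String): CheckpointRecord? =
        byAgent[agentId]?.maxByOrNull { it.createdAtMillis }
}
```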
Sam
08/03/2025, 11:56 PM
Sam
08/03/2025, 11:59 PM
`persistency().getCheckpointById()` for after I've saved one manually:
```kotlin
/**
 * Retrieves a specific checkpoint by ID for the specified agent.
 *
 * @param checkpointId The ID of the checkpoint to retrieve
 * @return The checkpoint data with the specified ID, or null if not found
 */
public suspend fun getCheckpointById(checkpointId: String): AgentCheckpointData? =
    persistencyStorageProvider.getCheckpoints().firstOrNull { it.checkpointId == checkpointId }
```
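The quoted `getCheckpointById` loads every checkpoint and filters in memory, and without a scope it can also match another user's checkpoint if checkpoint IDs collide. A minimal sketch of the alternative, assuming a hypothetical store whose lookup key includes the scoping ID (user or agent), which a real DB could serve from a composite index on `(scope_id, checkpoint_id)`:

```kotlin
// Hypothetical stored checkpoint, tagged with the ID the store filters on.
data class StoredCheckpoint(val scopeId: String, val checkpointId: String, val payload: String)

// Mirrors the quoted default: load everything, then filter in memory.
fun findByScan(all: List<StoredCheckpoint>, checkpointId: String): StoredCheckpoint? =
    all.firstOrNull { it.checkpointId == checkpointId }

// Hypothetical indexed store: the key includes the scope, so a lookup is a
// single keyed read and can never return another scope's checkpoint.
class IndexedCheckpointStore {
    private val index = mutableMapOf<Pair<String, String>, StoredCheckpoint>()

    fun save(cp: StoredCheckpoint) {
        index[cp.scopeId to cp.checkpointId] = cp
    }

    fun getById(scopeId: String, checkpointId: String): StoredCheckpoint? =
        index[scopeId to checkpointId]
}
```

In the usage below, the unscoped scan returns user 1's checkpoint when user 2's was wanted, while the scoped lookup does not.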
That means my custom `StorageProvider` has to implement a function that returns the checkpoints for ALL users, since `persistencyStorageProvider.getCheckpoints()` doesn't accept any filtering parameters, such as a `userId`. A large system could hold millions of checkpoints across all of its users; does this function really return all of them and filter afterwards? That can't be right, unless I'm misunderstanding something here.
Sam
08/04/2025, 12:05 AM
`persistencyStorageProvider.getLatestCheckpoint()`
Why would this function ever be useful? Surely calling it could easily return a checkpoint that doesn't belong to the intended user at all?
Sam
08/04/2025, 12:12 AM
```kotlin
suspend fun processMessage(
    company: String,
    brand: String,
    channelEvent: TextChannelEvent
): AgentUtterance {
    val awsAccessKeyId = parameterService.getOrThrow("bedrock/access-key-id")
    val awsSecretAccessKey = parameterService.getOrThrow("bedrock/secret-access-key")
    val agent = createAgent(
        company = company,
        brand = brand,
        channelEvent = channelEvent,
        toolFactory = toolFactory,
        promptExecutor = providePromptExecutor(awsAccessKeyId, awsSecretAccessKey)
    )
    try {
        val response = agent.run(channelEvent.userText)
        return AgentUtterance().addElement(TopicMessage("CONCIERGE_RESPONSE", response.body))
    } catch (e: Exception) {
        return AgentUtterance().addElement(TopicMessage("CONCIERGE_RESPONSE", "Sorry, something went wrong. Please try again later."))
    }
}

private fun createAgent(
    company: String,
    brand: String,
    channelEvent: TextChannelEvent,
    toolFactory: ToolFactory,
    promptExecutor: PromptExecutor
): AIAgent<String, ConciergeStructuredResponse> {
    val (chatUser: ChatUser, isNewUser) = userService.getOrCreateUser(
        channelEvent.channelContext.userId,
        channelEvent.channelContext.channelType,
        company,
        brand,
        channelEvent
    )
    val toolRegistry = createToolRegistry(company, brand, channelEvent, chatUser, toolFactory)
    return AIAgent(
        promptExecutor = promptExecutor,
        strategy = createStrategy(),
        agentConfig = createAgentConfig(company, brand, channelEvent.channelContext.userId, conversationSessionManager, isNewUser),
        toolRegistry = toolRegistry
    ) {
        install(Persistency)
    }
}
```
Each time a user message comes through my system... So I'm creating a completely new agent every time.
Also, how does the `Prompt` tie into this? I noticed that each `Prompt` has an `id` field, which the KDoc describes as a "unique ID". In what way is this ID unique, and why does it need to be? Is this ID used anywhere else in Koog?
Aria
08/04/2025, 12:12 AM
`class InMemoryPersistencyStorageProvider(private val persistenceId: String) : PersistencyStorageProvider`) then the function implementations would use that ID to filter the DB.
Aria
08/04/2025, 12:14 AM
Sam
08/04/2025, 12:15 AM
Sam
08/04/2025, 12:18 AM
`InMemoryPersistencyStorageProvider` and how that uses `agentId`s for filtering, I guess you could always just create a class that implements `PersistencyStorageProvider` and accepts a `userId` argument instead, and then `getLatestCheckpoint` can use that for filtering. But then, as you say, that doesn't scale well, as you'll need a separate object for each user request.
Aria
08/04/2025, 2:11 AM
`AIAgentContext` to the persistency storage provider methods so you can access `context.agentId` or the rest of the context dynamically: https://github.com/JetBrains/koog/pull/536
Anastasiia Zarechneva
08/04/2025, 8:20 AM
Sam
08/04/2025, 9:12 AM
`persistency().getCheckpoints()`
That means the exact same `AIAgent` instance must have been invoked with `AIAgent.run()` multiple times for that function to be useful (unless it's used between the point where `AIAgent.run()` is called and the point where the strategy graph finishes), which just isn't a pattern I'm familiar with. On top of that, it must also have been invoked on behalf of a single user ONLY, or that function would return checkpoints for multiple users. If that's the case, why are we persisting this exact agent and its checkpoints? Why not persist all checkpoints regardless of agent or user, and then have a function that returns only the checkpoints belonging to a single user (or matching some other filter)? Then the specific `AIAgent` instance that was used wouldn't matter.
For example, each user has a user ID because a user object might contain mutable state that needs to be persisted in a database, so we can later retrieve and modify that user's state by referencing their ID.
What about agents? What state are we retrieving or referencing when we refer to an agent by its ID?
Unless I'm mistaken here and Koog accommodates multiple patterns. The one I'm using is: for each user request that comes through, we spawn a fresh `AIAgent` instance for that user. All we have to do then is modify the `Prompt` to include all the messages in the conversation history from previous sessions.
Aria
08/04/2025, 6:38 PM
Sam
08/05/2025, 8:34 AM
Vadim Briliantov
08/07/2025, 8:49 PM
`AgentMemory` feature: it actually doesn’t assume that the agent persists data for a single user. There are `MemorySubject`s that are serializable and can be stored and serialized however you want in your instance of `AgentMemoryProvider` (an interface you’ll likely implement for your favorite storage, e.g. S3 or, more likely, a real database):
```kotlin
public suspend fun save(fact: Fact, subject: MemorySubject, scope: MemoryScope)
```
Basically, if you have some `User` class (not the one in our examples, which is actually a singleton `User` object included for educational purposes to keep things simple, but a real one that would have a `userId: String`), for example:
```kotlin
@Serializable
data class User(val userId: String) : MemorySubject() {
    override val name: String = "user"
    override val promptDescription: String =
        "User's preferences, settings, and behavior patterns, expectations from the agent, preferred messaging style, etc."
    override val priorityLevel: Int = 2
}
```
Then you’ll implement `AgentMemoryProvider.save` so that it saves the information under the right keys in your database (e.g. `userId` + subject type -> fact).
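A minimal sketch of the key scheme suggested above (`userId` + subject type -> facts), using hypothetical in-memory types rather than Koog's `Fact`, `MemorySubject` and `MemoryScope`; for brevity it ignores the scope argument. In a real `AgentMemoryProvider.save`, the user ID would come from the `User` subject's `userId` field.

```kotlin
// Hypothetical, simplified fact; Koog's Fact type carries more information.
data class StoredFact(val concept: String, val value: String)

// Composite key: the user the fact is about plus the subject name (e.g. "user").
data class FactKey(val userId: String, val subject: String)

class InMemoryFactStore {
    private val facts = mutableMapOf<FactKey, MutableList<StoredFact>>()

    fun save(userId: String, subject: String, fact: StoredFact) {
        facts.getOrPut(FactKey(userId, subject)) { mutableListOf() }.add(fact)
    }

    // Only this user's facts for this subject are ever read.
    fun load(userId: String, subject: String): List<StoredFact> =
        facts[FactKey(userId, subject)].orEmpty()
}
```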
We were planning to implement out-of-the-box database support in the near future, but you can already do it yourself for your use case.
Mark Tkachenko
08/08/2025, 10:22 AM
Sam
08/08/2025, 10:37 AM
Mark Tkachenko
08/08/2025, 10:40 AM
> because we need to make sure we retrieve checkpoints that are compatible with the strategy graph we’re executing
Yes, of course: if you restore an agent from a snapshot, you want it to be based on the same graph.
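One way to enforce "same graph" on restore is to tag each checkpoint with a version of the strategy graph that produced it and skip anything that doesn't match, or whose node no longer exists. A minimal sketch, assuming a hypothetical `graphVersion` string that you bump whenever the graph changes (not a field this thread confirms Koog has):

```kotlin
// Hypothetical checkpoint tagged with the graph version that produced it.
data class VersionedCheckpoint(val checkpointId: String, val graphVersion: String, val nodeId: String)

// Candidates are ordered oldest to newest; returns the newest one that was
// produced by the current graph version and points at a node that still exists.
fun restorable(
    candidates: List<VersionedCheckpoint>,
    currentGraphVersion: String,
    graphNodeIds: Set<String>,
): VersionedCheckpoint? =
    candidates.lastOrNull { it.graphVersion == currentGraphVersion && it.nodeId in graphNodeIds }
```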
Sam
08/08/2025, 10:44 AM
`AIAgent` instance to handle that request.
So yeah, what I'm struggling with is implementing a way for the first node in my strategy graph to "load" where that particular user left off, and making sure that I'm updating this state at each node. I know you can set `enableAutomaticPersistency = true`, but I'd like to do it manually at first, just so I know what's going on behind the scenes.
Mark Tkachenko
08/08/2025, 10:46 AM
```kotlin
context.withPersistency(context) { ctx ->
    createCheckpoint(
        agentContext = ctx,
        nodeId = "current-node-id",
        lastInput = inputData,
        lastInputType = inputType,
        checkpointId = ctx.runId,
    )
}
```
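Conceptually, a checkpoint taken before a node needs just enough to re-enter the graph there: the node's ID and the input it received (`createCheckpoint` above also takes `lastInputType`, presumably because node inputs aren't all `String`s). A toy sketch of that idea, using a hypothetical linear pipeline of named `String -> String` steps rather than Koog's graph runtime; it also shows why node IDs must be unique when checkpoints are resolved by ID:

```kotlin
// Hypothetical snapshot: the node to resume at and the input it was given.
data class NodeCheckpoint(val nodeId: String, val lastInput: String)

// A toy linear "graph" of named steps, each transforming a String.
class ToyPipeline(private val nodes: List<Pair<String, (String) -> String>>) {
    init {
        // Checkpoints are resolved by node ID, so IDs must be unique.
        require(nodes.map { it.first }.toSet().size == nodes.size) { "node IDs must be unique" }
    }

    // Runs from startNodeId (or the first node), saving a checkpoint before each node.
    fun execute(input: String, startNodeId: String? = null, save: (NodeCheckpoint) -> Unit): String {
        var index = if (startNodeId == null) 0 else nodes.indexOfFirst { it.first == startNodeId }
        require(index >= 0) { "unknown node: $startNodeId" }
        var value = input
        while (index < nodes.size) {
            val (id, step) = nodes[index]
            save(NodeCheckpoint(id, value))
            value = step(value)
            index++
        }
        return value
    }

    // Re-enters the pipeline at the saved node with the saved input.
    fun resume(checkpoint: NodeCheckpoint, save: (NodeCheckpoint) -> Unit): String =
        execute(checkpoint.lastInput, checkpoint.nodeId, save)
}
```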
Mark Tkachenko
08/08/2025, 10:46 AM
Sam
08/08/2025, 10:52 AM
`nodeId` be just the same as the name of the node, or does it need to be globally unique? If it isn't, how come the node name isn't used here? Because it's optional?
Also, I'm assuming that `lastInput` and `lastInputType` are taken from the node's input argument? So in this case:
```kotlin
// Node that updates the prompt with the user's message, calls the LLM and adds the resulting response to the prompt
fun AIAgentSubgraphBuilderBase<*, *>.nodeUpdatePromptWithUserMessageAndCallLLM(
    name: String? = null,
    allowToolCalls: Boolean = true
): AIAgentNodeDelegate<String, Message.Response> =
    node(name) { userMessage ->
        llm.writeSession {
            updatePrompt {
                user(userMessage)
            }
            if (allowToolCalls) requestLLM()
            else requestLLMWithoutTools()
        }
    }
```
`lastInput` would be `userMessage`, and `lastInputType` would be `String`?
And finally, how can I access `context` within a node? For some reason it's not picking it up for me. Or does it need to be `context()` or `withContext()`, and what's the difference between these? When I do have it, where is `runId` set?
Sorry for all the questions 😅
Mark Tkachenko
08/08/2025, 10:58 AM
> does it need to be globally unique
Graph nodes should have unique names in order to use persistency.
> `lastInput` would be `userMessage`, and `lastInputType` would be `String`
Yes, that’s correct.
> And finally, how can I access `context` within a node
`context` is `this` inside the `node { }` builder.
Sam
08/08/2025, 11:12 AM
Mark Tkachenko
08/08/2025, 11:14 AM
Sam
08/08/2025, 11:19 AM
`"test-" + UUID.randomUUID().toString()` so that it's never the same twice? I.e. pretty much no hard-coded strings allowed?
Mark Tkachenko
08/08/2025, 11:23 AM
Sam
08/08/2025, 11:24 AM
Sam
08/08/2025, 11:24 AM
Sam
08/08/2025, 11:25 AM
`runId`. Does that have to be globally unique? And where do I set it? Or is it only set here, in `AIAgent.run()`:
```kotlin
override suspend fun run(agentInput: Input): Output {
    runningMutex.withLock {
        if (isRunning) {
            throw IllegalStateException("Agent is already running")
        }
        isRunning = true
    }
    pipeline.prepareFeatures()
    val sessionUuid = Uuid.random()
    val runId = sessionUuid.toString()
    // ... (rest of run() not shown)
```
Sam
08/08/2025, 11:44 AM
`checkpointId = ctx.runId` to a `createCheckpoint()` call lead to the case where multiple checkpoints have the same checkpoint ID?
Sam
08/08/2025, 11:44 AM
`nodeId` is also provided to `createCheckpoint()` as an argument, so we know which checkpoint to return when `getLatestCheckpoint()` is called?
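A minimal sketch of the collision concern, assuming a hypothetical store keyed only by checkpoint ID. Since the quoted `run()` creates one `runId` per run, every checkpoint in a run that reuses `runId` as its ID would overwrite the previous one in such a store, whereas a per-checkpoint suffix keeps them distinct. "Latest" is picked here by a monotonically increasing sequence number rather than by `nodeId`. Whether Koog's own providers overwrite, append, or reject duplicate IDs depends on the implementation.

```kotlin
import java.util.UUID

// Hypothetical snapshot: the run it belongs to, the node, and an ordering key.
data class Snapshot(val checkpointId: String, val runId: String, val nodeId: String, val seq: Long)

// Hypothetical store keyed only by checkpoint ID.
class SnapshotStore {
    private val byId = linkedMapOf<String, Snapshot>()

    fun save(s: Snapshot) {
        byId[s.checkpointId] = s // same ID => the earlier snapshot is overwritten
    }

    fun count(): Int = byId.size

    // "Latest" decided by the sequence number, not by nodeId.
    fun latest(): Snapshot? = byId.values.maxByOrNull { it.seq }
}

// One option: keep the run ID as a prefix for grouping, but make each checkpoint ID unique.
fun newCheckpointId(runId: String): String = "$runId-${UUID.randomUUID()}"
```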