Feat: Added private memory context management for SubAgent#8467
Feat: Added private memory context management for SubAgent#8467Hola-Gracias wants to merge 16 commits into
Conversation
Added a new dataclass _ToolLoopAgentRunResult to encapsulate LLM response and run context.
Added context persistence settings for SubAgent.
Refactor _DummyEvent class and add tests for FunctionToolExecutor.
Add assertion for context persistence in dashboard test
Refactor clear method to remove unlocked locks from _locks.
There was a problem hiding this comment.
Code Review
This pull request introduces context persistence for subagents, allowing them to maintain private memory across repeated handoffs within the same session. It implements a session manager and runner to handle message trimming, TTL expiration, and concurrency locking, alongside dashboard UI updates for configuration. The review feedback highlights a syntax error in the unit tests due to Chinese characters and full-width punctuation, and suggests several robustness and performance improvements, such as using Pydantic's model_copy for cloning messages and handling float strings more gracefully during configuration parsing.
| 现在=100.0, | ||
| ) | ||
|
|
||
| trimmed = manager.get_messages( | ||
| key, | ||
| ttl_seconds=3600, | ||
| config_fingerprint="fp", | ||
| 现在=101.0, |
There was a problem hiding this comment.
在测试用例中,参数名 现在 应该是 now,并且 现在=100.0, 末尾使用了中文全角逗号 ,,这会导致 Python 语法错误(SyntaxError)以及 TypeError。请将其修正为 now=100.0, 和 now=101.0,。
| 现在=100.0, | |
| ) | |
| trimmed = manager.get_messages( | |
| key, | |
| ttl_seconds=3600, | |
| config_fingerprint="fp", | |
| 现在=101.0, | |
| now=100.0, | |
| ) | |
| trimmed = manager.get_messages( | |
| key, | |
| ttl_seconds=3600, | |
| config_fingerprint="fp", | |
| now=101.0, |
| @staticmethod | ||
| def _clone_message(message: Message) -> Message: | ||
| return Message.model_validate(copy.deepcopy(message.model_dump())) |
There was a problem hiding this comment.
在 Pydantic v2 中,使用 message.model_copy(deep=True) 是克隆 Pydantic 模型最推荐且最高效的方式。当前使用的 Message.model_validate(copy.deepcopy(message.model_dump())) 会经历完整的序列化、深拷贝和重新校验过程,在消息较多或包含大文本/图片时会带来不必要的性能开销。
| @staticmethod | |
| def _clone_message(message: Message) -> Message: | |
| return Message.model_validate(copy.deepcopy(message.model_dump())) | |
| @staticmethod | |
| def _clone_message(message: Message) -> Message: | |
| return message.model_copy(deep=True) |
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- In
test_subagent_session_manager_matches_non_string_dict_tool_call_idthe keyword arguments现在=100.0/现在=101.0and the full-width comma will not parse as valid Python; these should use the existingnowkeyword with ASCII punctuation to keep the tests runnable. - The logic for normalizing
context_persistenceis now implemented separately insubagent_runner.py, the dashboard Vue code, and the dashboard route; consider centralizing this normalization in one place (or at least sharing defaults) to avoid subtle drift between frontend, API, and backend behavior over time.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `test_subagent_session_manager_matches_non_string_dict_tool_call_id` the keyword arguments `现在=100.0` / `现在=101.0` and the full-width comma will not parse as valid Python; these should use the existing `now` keyword with ASCII punctuation to keep the tests runnable.
- The logic for normalizing `context_persistence` is now implemented separately in `subagent_runner.py`, the dashboard Vue code, and the dashboard route; consider centralizing this normalization in one place (or at least sharing defaults) to avoid subtle drift between frontend, API, and backend behavior over time.
## Individual Comments
### Comment 1
<location path="tests/unit/test_subagent_orchestrator.py" line_range="231-240" />
<code_context>
+ return SimpleNamespace(
+ name="transfer_to_subagent",
+ provider_id=None,
+ context_persistence={
+ "enable": True,
+ "max_turns": 10,
</code_context>
<issue_to_address>
**issue (bug_risk):** Non-ASCII keyword argument and punctuation make this test invalid Python
Using the non-ASCII identifier `现在` and a full-width comma `,` in `set_messages` / `get_messages` makes this file invalid Python and prevents the test suite from running. Replace them with a valid identifier (e.g. `now=100.0` / `now=101.0`) and a standard comma so the module can be imported and executed.
</issue_to_address>
### Comment 2
<location path="astrbot/core/subagent_runner.py" line_range="68" />
<code_context>
+ config_fingerprint: str
+
+
+class SubAgentSessionManager:
+ def __init__(self) -> None:
+ self._records: dict[tuple[str, str, str], _SubAgentContextRecord] = {}
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the session manager and runner to avoid redundant cloning, centralize cleanup, and extract a shared stateless path to streamline the control flow and data handling.
You can simplify a few hotspots without changing behavior:
### 1. Avoid double cloning and JSON round-tripping
Right now messages are cloned in `_group_messages` **and** again in `get_messages`, and cloning goes through `model_dump`/`model_validate`. You can:
- Only clone once at the API boundary (when returning from `get_messages` or storing in `_records`), and
- Use `copy.deepcopy` directly.
Example:
```python
class SubAgentSessionManager:
...
def get_messages(... ) -> list[Message] | None:
...
if record.config_fingerprint != config_fingerprint:
self.clear(key)
return None
# clone once here, no need to clone inside _group_messages/_trim
return [copy.deepcopy(m) for m in record.messages]
def set_messages(... ) -> None:
trimmed = self._trim_messages(
messages,
max_turns=context_persistence["max_turns"],
)
# store deep copies to protect internal state
cloned = [copy.deepcopy(m) for m in trimmed]
self._records[key] = _SubAgentContextRecord(
messages=cloned,
last_used_at=time.monotonic() if now is None else now,
config_fingerprint=config_fingerprint,
)
@staticmethod
def _clone_message(message: Message) -> Message:
# if you still need it in a couple of places, keep it simple
return copy.deepcopy(message)
```
Then `_group_messages` can just operate on the passed-in list without cloning internally:
```python
def _group_messages(self, messages: list[Message]) -> list[list[Message]]:
groups: list[list[Message]] = []
index = 0
while index < len(messages):
message = messages[index]
if message.role in {"system", "_checkpoint"} or message.role == "tool":
index += 1
continue
group = [message] # no clone here
if message.role == "assistant" and message.tool_calls:
expected_ids = {
str(tool_call.get("id") if isinstance(tool_call, dict) else tool_call.id)
for tool_call in message.tool_calls
}
next_index = index + 1
while next_index < len(messages):
next_message = messages[next_index]
if (
next_message.role == "tool"
and next_message.tool_call_id in expected_ids
):
group.append(next_message) # no clone
next_index += 1
continue
break
index = next_index
else:
index += 1
groups.append(group)
return groups
```
This keeps the same semantics but removes JSON round-trips and redundant cloning.
---
### 2. Factor out the stateless run path
`SubAgentRunner.run` repeats the same `tool_loop_agent(...)` call in multiple branches. Extracting a helper makes the control flow easier to follow and reduces maintenance cost:
```python
class SubAgentRunner:
...
async def _run_stateless(
self,
*,
ctx: Any,
event: Any,
provider_id: str,
input_: str | None,
image_urls: list[str],
system_prompt: str,
tools: Any,
begin_contexts: list[Message] | None,
max_steps: int,
tool_call_timeout: int,
stream: bool,
) -> LLMResponse:
return await ctx.tool_loop_agent(
event=event,
chat_provider_id=provider_id,
prompt=input_,
image_urls=image_urls,
system_prompt=system_prompt,
tools=tools,
contexts=begin_contexts,
max_steps=max_steps,
tool_call_timeout=tool_call_timeout,
stream=stream,
)
async def run(... ) -> LLMResponse:
context_persistence = normalize_context_persistence(
getattr(tool, "context_persistence", None)
)
key = self._session_manager.build_key(run_context, tool.agent.name)
if not context_persistence["enable"]:
self._session_manager.clear(key)
return await self._run_stateless(
ctx=ctx,
event=event,
provider_id=provider_id,
input_=input_,
image_urls=image_urls,
system_prompt=system_prompt,
tools=tools,
begin_contexts=begin_contexts,
max_steps=max_steps,
tool_call_timeout=tool_call_timeout,
stream=stream,
)
internal_run = getattr(ctx, "_run_tool_loop_agent_internal", None)
if internal_run is None:
logger.debug(
"Context._run_tool_loop_agent_internal is unavailable; falling "
"back to stateless SubAgent handoff."
)
return await self._run_stateless(
ctx=ctx,
event=event,
provider_id=provider_id,
input_=input_,
image_urls=image_urls,
system_prompt=system_prompt,
tools=tools,
begin_contexts=begin_contexts,
max_steps=max_steps,
tool_call_timeout=tool_call_timeout,
stream=stream,
)
...
```
---
### 3. Centralize lock/record cleanup
You already have the same “remove record and possibly lock” logic in multiple places. A tiny helper reduces duplication and makes lifecycle clearer:
```python
class SubAgentSessionManager:
...
def _remove_record_and_lock(self, key: tuple[str, str, str]) -> None:
self._records.pop(key, None)
lock = self._locks.get(key)
if lock is not None and not lock.locked():
self._locks.pop(key, None)
def clear(self, key: tuple[str, str, str]) -> None:
self._remove_record_and_lock(key)
def clear_except_agents(self, agent_names: set[str]) -> None:
stale_keys = [key for key in self._records if key[2] not in agent_names]
for key in stale_keys:
self._remove_record_and_lock(key)
def get_messages(... ) -> list[Message] | None:
record = self._records.get(key)
if record is None:
return None
now = time.monotonic() if now is None else now
if ttl_seconds != -1 and now - record.last_used_at > ttl_seconds:
self._remove_record_and_lock(key)
return None
if record.config_fingerprint != config_fingerprint:
self._remove_record_and_lock(key)
return None
...
```
These focused changes reduce complexity and duplication, while preserving all existing behavior and the current feature set.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Add tests for context persistence and session manager behavior.
feat #8411
Modifications / 改动点
新增 SubAgent 私有内存上下文管理:按会话和 SubAgent 维度保存消息、per-key 锁、TTL、配置指纹失效、按 max_turns 裁剪。
Screenshots or Test Results / 运行截图或测试结果
Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Add private per-subagent context persistence for subagent handoffs, including backend session management and dashboard configuration UI.
New Features:
Enhancements:
Tests: