Drive Agent 设计
这是第一次设计这种比较完整的 Agent 系统并集成到实际应用中,因此记录一下实现 Google Drive Clone 的 Agent 部分的设计。
LLM 调用直接使用 URL fetch 访问 Deepseek API,并且全部未使用 Agent 框架、直接手搓实现,之后可能会用框架重构一下。
1. 总体架构
Agent 包含的模块和交互流程如下:
我们使用 TaskOrchestrator 这个独立组件来编排 Agent、处理 Agent 调用之类的。然后所有的调用执行统一通过 CapacityGateway 进行调用。然后每个 Agent 配备了 MemoryManager 等组件。之后还可插拔其他的组件。
然后前后端通过 SSE 进行事件传输。backend 在不同阶段发送对应 event、frontend 再通过 Hooks 将 event 渲染成 UI。
2. Agent 组件设计
负责功能执行的 MultiAgent
初版的实现是只用一个 Agent,通过 Prompt 设计让 Agent 自己根据具体任务来决定调用什么工具、怎么完成任务。但是效果很不好:比如在一个文档中让它查找一个写了某个内容的文档并做总结,它有时会用 search_file 工具而不是正确的 sematic_search_file、导致找到不对的文档,还会莫名其妙调用 create_file工具而不是编辑当前文件,导致很多乱七八糟的错误(如果要指明 Agent 的相关行为也可以,但是要使用很多 Prompt + 不一定有效)。
然后还打算提供 Task Plan 来指引 Agent 完成任务,于是引入了下面的完成具体任务的 Agent:
- Document Agent:负责文档编辑相关的 Agent。
- Drive Agent:负责文件创建、移动之类的 Agent.
- Search Agent:负责向量索引建立、语义搜索相关的。
和负责编排的 Task Plan Agent。还有些辅助决策的 Agent,比如 Router Agent 和决策是否使用 Task Plan 的 Agent。
引入专门的 Search Agent 来做这个就是因为前面说的问题,让 Drive Agent 自己完成搜索任务有时会跑去
search_file了,直接单独提取一个 Search Agent 了。
BaseAgent 抽象类
BaseAgent 抽象基类设计如下:
export abstract class BaseAgent {
protected memoryManager: MemoryManager;
constructor(
protected mcpClient: McpClientService,
protected gateway: CapabilityGateway,
) {
this.memoryManager = new MemoryManager();
}
abstract readonly agentType: AgentType;
abstract getSystemPrompt(context: AgentContext): string;
abstract getAllowedTools(): Set<string>;
abstract enrichContext(context: AgentContext): Promise<AgentContext>;
async run(
context: AgentContext,
messages: IMessage[],
conversationId: string,
options?: AgentRunOptions,
): Promise<AgentLoopResult>;
// Agent Loop
private async runAgentLoop(
messages: LlmMessage[],
tools: LlmTool[],
context: AgentContext,
conversationId: string,
onEvent?: AgentEventCallback,
signal?: AbortSignal,
): Promise<LoopResult>;
}
我们把 Agent 的运行 Loop 在抽象类中实现了,然后其他的让不同的处理具体事情的 Agent 自己实现,比如上下文填充这些的。
Agent Loop 如下(这里的 Gateway 组件后面会讲解它的设计):
上下文管理
下面简单讲讲 Document Agent 这些完成具体任务的 Agent 的一些实现细节。首先是上下文管理。
对于 Drive Agent,我们需要将 Agent 所处的环境作为上下文塞进去。这样才能处理类似“在当前目录中创建xxx文件并分享”这样的场景。在给 Agent 构建 System Prompt 时,我们就需要先获取一下当前环境的一些信息然后注入 Prompt。比如对于 Document Agent,我们获取当前文档内容以及 Drive 中与之相关的其他文档:
try {
const fileResult = await this.mcpClient.callTool("read_file", {
userId: context.userId,
fileId: context.fileId,
});
const fileData = fileResult.content.map((c) => c.text).join("\n");
try {
const parsed = JSON.parse(fileData);
enriched.documentContent = parsed.content || fileData;
enriched.documentName = parsed.file?.name || "Unknown document";
} catch {
enriched.documentContent = fileData;
enriched.documentName = "Unknown document";
}
} catch (error) {
logger.warn(
{ error, fileId: context.fileId },
"Failed to read document content for context enrichment",
);
enriched.documentContent = "(Could not load document content)";
}
try {
const searchQuery = enriched.documentName || "related documents";
const searchResult = await this.mcpClient.callTool("semantic_search_files", {
userId: context.userId,
query: searchQuery,
limit: 5,
});
const searchData = searchResult.content.map((c) => c.text).join("\n");
if (!searchResult.isError && searchData.length > 10) {
enriched.relatedContext = searchData;
}
} catch {
logger.debug("Semantic search unavailable for document context enrichment");
}
然后注入到 Prompt 中:
let documentSection = "";
if (context.documentContent) {
const MAX_DOC_CHARS = 30_000;
const truncated = context.documentContent.length > MAX_DOC_CHARS;
const content = truncated
? context.documentContent.slice(0, MAX_DOC_CHARS)
: context.documentContent;
documentSection = `\n\n## Current Document
**Name**: ${context.documentName || "Unknown"}
**File ID**: ${context.fileId}
**Content** (${truncated ? `first ${MAX_DOC_CHARS} chars of ${context.documentContent.length}` : `${content.length} chars`}):
\`\`\`
${content}
\`\`\``;
if (truncated) {
documentSection +=
"\n\n*Note: Document content was truncated in the preview. Use `read_file` for the full content, or `patch_file` for targeted edits.*";
}
}
let relatedSection = "";
if (context.relatedContext) {
relatedSection = `\n\n## Related Workspace Context
The following relevant content was found in the user's workspace (from semantic search):
\`\`\`json
${context.relatedContext.slice(0, 5000)}
\`\`\`
Use this context to write more informed, workspace-aware content when appropriate.`;
}
const result = `## Context
- User ID: ${context.userId}
- Current File ID: ${context.fileId || "(none)"}
${documentSection}
${relatedSection}`;
其他的也一样,比如 Drive Agent 需要知道当前的工作目录和目录内容快照。还有决定是否需要使用 Task Plan 和负责 Task Plan 的 Agent 也需要指明让 Agent 读取当前环境:
IMPORTANT CONTEXT: The user may be in one of two environments:
- **Document Editor**: Currently viewing/editing a specific file. Context will include "currentFileId".
- **Drive Browser**: Browsing files/folders. No currentFileId.
这部分的 Context 就是让前端来传、而不是后端提取了。因为是 Agent 的入口+不太需要(只需要判断下在哪个 Agent 中就行了)。
Memory 管理
然后就是 Memory 管理了。整个模块架构如下(直接抄代码中的注释了):
- 滑动窗口管理:保留最近 条原始消息
- 历史摘要生成:对超出窗口的消息生成 LLM 摘要
- 上下文组装:将摘要 + 滑动窗口组合为 LLM 可用的消息序列
- 任务计划集成:将活跃的 TaskPlan 注入上下文
首先是聊天记录管理:我们使用常用的长期记忆+短期记忆,长期的让 LLM 生成一个 Summary,短期的直接使用原始对话:
const totalCount = messages.length;
if (totalCount <= MEMORY_SUMMARY_THRESHOLD) {
return {
summaries: existingSummaries,
recentMessages: messages,
activePlan,
totalMessageCount: totalCount,
};
}
// 分离旧消息和新消息
const cutoff = totalCount - MEMORY_SLIDING_WINDOW;
const recentMessages = messages.slice(cutoff);
// 检查是否需要新的摘要
const lastSummarizedIdx =
existingSummaries.length > 0
? existingSummaries[existingSummaries.length - 1].messageRange.to
: 0;
const newSummaries = [...existingSummaries];
if (lastSummarizedIdx < cutoff) {
// 有新的未摘要消息
const unsummarized = messages.slice(lastSummarizedIdx, cutoff);
if (unsummarized.length > 0) {
const summaryText = await generateSummary(unsummarized);
if (summaryText) {
newSummaries.push({
summary: summaryText,
messageRange: { from: lastSummarizedIdx, to: cutoff },
createdAt: new Date(),
});
logger.info(
{
range: `${lastSummarizedIdx}-${cutoff}`,
summaryLength: summaryText.length,
},
"Generated conversation summary",
);
}
}
}
然后将 Memory 和 Task Plan 杂揉到 System Prompt 里面构建消息序列:
// 注入摘要上下文
if (memoryState.summaries.length > 0) {
const summaryBlock = memoryState.summaries
.map(
(s, i) =>
`[Summary ${i + 1} (msgs ${s.messageRange.from}-${s.messageRange.to})]: ${s.summary}`,
)
.join("\n\n");
messages.push({
role: "system",
content: `## Conversation History Summary\nThe following is a summary of earlier messages in this conversation:\n\n${summaryBlock}\n\n---\nRecent messages follow below.`,
});
}
// 注入任务计划
if (memoryState.activePlan && !memoryState.activePlan.isComplete) {
const planBlock = this.formatTaskPlan(memoryState.activePlan);
messages.push({
role: "system",
content: `## Active Task Plan\n${planBlock}`,
});
}
// 注入滑动窗口消息
for (const msg of memoryState.recentMessages) {
if (msg.role === "user") {
messages.push({ role: "user", content: msg.content });
} else if (msg.role === "assistant") {
messages.push({ role: "assistant", content: msg.content });
}
}
如果上下文超限了,用 LLM 压缩一下即可。
Agent 路由
引入 MultiAgent 后接下来就需要写 Router 来选择正确的 Agent 了,先直接用 “关键词匹配+LLM Router” 这种比较简单的想法实现了。具体的策略如下:
- 高置信度模式匹配: 使用正则表达式处理高频、明确的动作。
- 会话粘性: 保持对话连贯性,除非意图发生明显偏移。
- LLM Router 兜底: 当上述手段都无法判断时,才调用 LLM 进行意图分析。
LLM 让它限制一下返回格式、用 Prompt 让它返回 JSON。
Prompt 设计如下:
const LLM_ROUTER_PROMPT = `You are a routing classifier for a cloud drive application's AI agent system.
Your task is to determine which specialized agent should handle the user's request.
Available agents:
${AGENT_REGISTRY.map(
(a) =>
`- "${a.type}": ${a.description}\n Capabilities: ${a.capabilities.join(", ")}`,
).join("\n")}
Rules:
1. Respond ONLY with a valid JSON object, no extra text.
2. The "route_to" field must be one of: ${AGENT_REGISTRY.map((a) => `"${a.type}"`).join(", ")}.
3. "confidence" is a float between 0 and 1.
4. "reason" is a brief explanation in the user's language.
Output format:
{"route_to": "<agent_type>", "confidence": <float>, "reason": "<brief_reason>"}`;
Task Planner
Task Planner 使用 LLM 对复杂任务生成 json Task Plan 然后一步一步执行。在调试过程中发现的一些 Task Plan 生成的问题(比如在 Document Agent 环境中计划创建文件)直接写进 Prompt 中让 Agent 注意了(可能有更优雅的解决方式,不过当时直接用这种快方法修了 bug):
IMPORTANT CONTEXT RULES:
- If the user is currently in the Document Editor (context includes "currentFileId"), the "document" agent can ONLY edit THAT file. NEVER create a step that creates a new file via the document agent. If the user wants to write content, it goes into the CURRENT document.
- The "document" agent does NOT create, delete, move or share files. Only the "drive" agent does that.
- If the user asks to "write a summary" while in the Document Editor, the summary should be written INTO the current document (via "document" agent), NOT into a new file.
- When editing the current document, prefer fewer steps. A simple "edit current document" task is usually 1-2 steps, not 5.
然后定义基本的 Task 状态流传并实现即可:
export const TASK_STATUS = {
PENDING: "pending",
IN_PROGRESS: "in-progress",
COMPLETED: "completed",
FAILED: "failed",
SKIPPED: "skipped",
} as const;
export class TaskPlanTracker {
startCurrentStep(plan: TaskPlan): TaskPlan;
completeCurrentStep(plan: TaskPlan, result?: string): TaskPlan;
failCurrentStep(plan: TaskPlan, error: string): TaskPlan;
skipCurrentStep(plan: TaskPlan, reason?: string): TaskPlan;
}
其实这些东西可以引入任务队列这些的,之后重构好之后会记录一下。
3. Agent 编排
实现的一些细节
TaskOrchestrator 的每一步包括:
export interface StepResult {
step: TaskStep;
content: string;
toolCalls: IToolCall[];
pendingApprovals: AgentLoopResult["pendingApprovals"];
success: boolean;
error?: string;
}
每次完成一个阶段后,push stepResult 然后触发回调。举一个例子:
export interface AgentStreamEvent {
type: AgentEventType;
data: Record<string, unknown>;
}
// Agent 事件回调函数类型
export type AgentEventCallback = (event: AgentStreamEvent) => void;
stepResults.push({
step,
content: "",
toolCalls: [],
pendingApprovals: [],
success: false,
error: `No agent for type: ${agentType}`,
});
onEvent?.({
type: AGENT_EVENT_TYPE.TASK_STEP_UPDATE,
data: {
stepId: step.id,
status: TASK_STATUS.FAILED,
error: `No agent for type: ${agentType}`,
},
});
这样前后端就完成沟通了。
总体流程
整个 Agent 流程的 Event 如下:
Controller.chatStream
└── sendEvent (写入 HTTP Response)
↑
AgentService.chat(onEvent)
├── emit route_decision
├── emit task_plan
│
└── TaskOrchestrator.executePlan(onEvent)
├── emit task_step_update (in-progress)
│
└── BaseAgent.run(onEvent)
| ├── emit tool_call_start
| ├── emit approval_needed
| ├── emit approval_resolved
| ├── emit tool_call_end
| └── emit content
│
└── emit task_step_update (completed/failed)
流程图如下:
4. Capacity Gateway 组件
为了让 Agent 安全执行指令(不要莫名其妙执行一些危险指令,比如把 drive 文档全删光了),我们不能让 Agent 自己执行工具调用,而是需要判断一下当前是否能执行工具调用。
只需要管理工具调用就行,因为 Agent 只能通过工具调用来操作 Drive。
同时我们把 Approve 的逻辑也塞在这里(最开始是放在 Agent Orchestator 里面的,但是有点问题,而且这个应该也是 Gateway 的指责)。这个东西的关键是引入一个存储 resolve 的 Map:
export interface ApprovalResolution {
approved: boolean;
modifiedArgs?: Record<string, unknown>;
}
const approvalResolvers = new Map<
string,
{ resolve: (result: ApprovalResolution) => void }
>();
modifiedArgs是 Approve 中被选择的参数,比如 Share 可以自己配置参数。
这个是非常关键的。我们考虑 Approve 的流程:
- Agent 发起了
waitForApproval然后停留在原处,SSE 流在这里阻塞。同时后端给前端发送 JSON。 - 前端收到这个东西后弹出对话框让用户确认。
- 用户确认后,前端发送新的 POST HTTP 请求给后端。
但是此时 SSE 流仍然被阻塞着。怎么唤醒它呢?就需要使用我们的回调!我们从这个 Map 中取出 resolve 函数:
const resolver = approvalResolvers.get(approvalId);
然后进行相关的处理即可。比如 Approve 失败了就返回 failed:
resolver.resolve({ approved: false });
成功了就返回对应的结果:
resolver.resolve({ approved, modifiedArgs });
然后剩下的就简单了:对于等待 Approve 的调用,我们往 Map 中塞进它的 resolve(同时做一些 abort、超时处理之类的):
return new Promise<ApprovalResolution>((resolve) => {
// 已经被终止(SSE 断开)
if (signal?.aborted) {
resolve({ approved: false });
return;
}
// 监听 SSE 连接断开
const onAbort = () => {
approvalResolvers.delete(approvalId);
resolve({ approved: false });
};
signal?.addEventListener("abort", onAbort, { once: true });
// 注册 resolver — resolveApproval 调用时触发
approvalResolvers.set(approvalId, {
resolve: (result) => {
signal?.removeEventListener("abort", onAbort);
resolve(result);
},
});
// 超时保护(与 APPROVAL_TTL_SECONDS 对齐)
setTimeout(() => {
if (approvalResolvers.has(approvalId)) {
approvalResolvers.delete(approvalId);
signal?.removeEventListener("abort", onAbort);
resolve({ approved: false });
logger.debug({ approvalId }, "waitForApproval timed out");
}
}, APPROVAL_TTL_SECONDS * 1000);
});
然后在处理 Approve 的函数中调用 resolve 来取消阻塞:
const request = pendingApprovals.get(approvalId);
const elapsed = Date.now() - request.createdAt.getTime();
if (elapsed > request.ttlSeconds * 1000) {
request.status = APPROVAL_STATUS.EXPIRED;
request.resolvedAt = new Date();
pendingApprovals.delete(approvalId);
const resolver = approvalResolvers.get(approvalId);
if (resolver) {
resolver.resolve({ approved: false });
approvalResolvers.delete(approvalId);
}
return request;
}
request.status = approved ? APPROVAL_STATUS.APPROVED : APPROVAL_STATUS.REJECTED;
request.resolvedAt = new Date();
if (approved) {
this.incrementRateCounter(userId);
}
// 触发等待中的 Agent 循环继续执行
const resolver = approvalResolvers.get(approvalId);
if (resolver) {
resolver.resolve({ approved, modifiedArgs });
approvalResolvers.delete(approvalId);
}