Dark Dwarf Blog background

Drive Agent 设计

Drive Agent 设计

这是第一次设计这种比较完整的 Agent 系统并集成到实际应用中,因此记录一下实现 Google Drive Clone 的 Agent 部分的设计。

LLM 调用直接使用 URL fetch 访问 Deepseek API,并且全部未使用 Agent 框架、直接手搓实现,之后可能会用框架重构一下。

1. 总体架构

Agent 包含的模块和交互流程如下:

需要规划

直接执行

BaseAgent.run

Agent循环

CapabilityGateway

权限 + 审批

MCP Tool Call

实际工具执行

用户消息

AgentController

Express HTTP/SSE

AgentService

编排中枢

shouldPlanTask

复杂度判断

TaskPlanner

任务分解

AgentRouter

混合路由

MemoryManager

记忆构建

TaskOrchestrator

多Agent编排

单Agent执行

Response

SSE事件流 + 任务进度

我们使用 TaskOrchestrator 这个独立组件来编排 Agent、处理 Agent 调用之类的。然后所有的调用执行统一通过 CapacityGateway 进行调用。然后每个 Agent 配备了 MemoryManager 等组件。之后还可插拔其他的组件。

然后前后端通过 SSE 进行事件传输。backend 在不同阶段发送对应 event、frontend 再通过 Hooks 将 event 渲染成 UI。

2. Agent 组件设计

a.a. 负责功能执行的 MultiAgent

初版的实现是只用一个 Agent,通过 Prompt 设计让 Agent 自己根据具体任务来决定调用什么工具、怎么完成任务。但是效果很不好:比如在一个文档中让它查找一个写了某个内容的文档并做总结,它有时会用 search_file 工具而不是正确的 sematic_search_file、导致找到不对的文档,还会莫名其妙调用 create_file工具而不是编辑当前文件,导致很多乱七八糟的错误(如果要指明 Agent 的相关行为也可以,但是要使用很多 Prompt + 不一定有效)。

然后还打算提供 Task Plan 来指引 Agent 完成任务,于是引入了下面的完成具体任务的 Agent:

  • Document Agent:负责文档编辑相关的 Agent。
  • Drive Agent:负责文件创建、移动之类的 Agent.
  • Search Agent:负责向量索引建立、语义搜索相关的。

和负责编排的 Task Plan Agent。还有些辅助决策的 Agent,比如 Router Agent 和决策是否使用 Task Plan 的 Agent。

引入专门的 Search Agent 来做这个就是因为前面说的问题,让 Drive Agent 自己完成搜索任务有时会跑去 search_file 了,直接单独提取一个 Search Agent 了。

b.b. BaseAgent 抽象类

BaseAgent 抽象基类设计如下:

export abstract class BaseAgent {
  protected memoryManager: MemoryManager;

  constructor(
    protected mcpClient: McpClientService,
    protected gateway: CapabilityGateway,
  ) {
    this.memoryManager = new MemoryManager();
  }

  abstract readonly agentType: AgentType;

  abstract getSystemPrompt(context: AgentContext): string;

  abstract getAllowedTools(): Set<string>;

  abstract enrichContext(context: AgentContext): Promise<AgentContext>;

  async run(
    context: AgentContext,
    messages: IMessage[],
    conversationId: string,
    options?: AgentRunOptions,
  ): Promise<AgentLoopResult>;

  // Agent Loop
  private async runAgentLoop(
    messages: LlmMessage[],
    tools: LlmTool[],
    context: AgentContext,
    conversationId: string,
    onEvent?: AgentEventCallback,
    signal?: AbortSignal,
  ): Promise<LoopResult>;
}

我们把 Agent 的运行 Loop 在抽象类中实现了,然后其他的让不同的处理具体事情的 Agent 自己实现,比如上下文填充这些的。

Agent Loop 如下(这里的 Gateway 组件后面会讲解它的设计):

空响应

有响应

不允许且需审批

不允许

允许

开始Agent循环

迭代次数 < MAX_TOOL_CALLS?

返回达到最大操作次数

内存压缩

调用LLM

获取响应

返回错误信息

有tool calls?

返回最终内容

循环处理每个tool call

解析参数并添加userId

发送TOOL_CALL_START事件

Gateway检查权限

权限判断

发送APPROVAL_NEEDED事件

等待用户审批

审批通过?

执行工具

返回拒绝信息

返回阻断信息

工具执行

处理结果长度限制

记录tool call

发送TOOL_CALL_END事件

将结果添加到messages

还有下一个tool?

b.b. 上下文管理

下面简单讲讲 Document Agent 这些完成具体任务的 Agent 的一些实现细节。首先是上下文管理。

对于 Drive Agent,我们需要将 Agent 所处的环境作为上下文塞进去。这样才能处理类似“在当前目录中创建xxx文件并分享”这样的场景。在给 Agent 构建 System Prompt 时,我们就需要先获取一下当前环境的一些信息然后注入 Prompt。比如对于 Document Agent,我们获取当前文档内容以及 Drive 中与之相关的其他文档:

try {
  const fileResult = await this.mcpClient.callTool("read_file", {
    userId: context.userId,
    fileId: context.fileId,
  });

  const fileData = fileResult.content.map((c) => c.text).join("\n");

  try {
    const parsed = JSON.parse(fileData);
    enriched.documentContent = parsed.content || fileData;
    enriched.documentName = parsed.file?.name || "Unknown document";
  } catch {
    enriched.documentContent = fileData;
    enriched.documentName = "Unknown document";
  }
} catch (error) {
  logger.warn(
    { error, fileId: context.fileId },
    "Failed to read document content for context enrichment",
  );
  enriched.documentContent = "(Could not load document content)";
}

try {
  const searchQuery = enriched.documentName || "related documents";
  const searchResult = await this.mcpClient.callTool("semantic_search_files", {
    userId: context.userId,
    query: searchQuery,
    limit: 5,
  });

  const searchData = searchResult.content.map((c) => c.text).join("\n");
  if (!searchResult.isError && searchData.length > 10) {
    enriched.relatedContext = searchData;
  }
} catch {
  logger.debug("Semantic search unavailable for document context enrichment");
}

然后注入到 Prompt 中:

let documentSection = "";
if (context.documentContent) {
  const MAX_DOC_CHARS = 30_000;
  const truncated = context.documentContent.length > MAX_DOC_CHARS;
  const content = truncated
    ? context.documentContent.slice(0, MAX_DOC_CHARS)
    : context.documentContent;

  documentSection = `\n\n## Current Document
**Name**: ${context.documentName || "Unknown"}
**File ID**: ${context.fileId}
**Content** (${truncated ? `first ${MAX_DOC_CHARS} chars of ${context.documentContent.length}` : `${content.length} chars`}):
\`\`\`
${content}
\`\`\``;

  if (truncated) {
    documentSection +=
      "\n\n*Note: Document content was truncated in the preview. Use `read_file` for the full content, or `patch_file` for targeted edits.*";
  }
}

let relatedSection = "";
if (context.relatedContext) {
  relatedSection = `\n\n## Related Workspace Context
The following relevant content was found in the user's workspace (from semantic search):
\`\`\`json
${context.relatedContext.slice(0, 5000)}
\`\`\`
Use this context to write more informed, workspace-aware content when appropriate.`;
}

const result = `## Context
- User ID: ${context.userId}
- Current File ID: ${context.fileId || "(none)"}
${documentSection}
${relatedSection}`;

其他的也一样,比如 Drive Agent 需要知道当前的工作目录和目录内容快照。还有决定是否需要使用 Task Plan 和负责 Task Plan 的 Agent 也需要指明让 Agent 读取当前环境:

IMPORTANT CONTEXT: The user may be in one of two environments:
- **Document Editor**: Currently viewing/editing a specific file. Context will include "currentFileId".
- **Drive Browser**: Browsing files/folders. No currentFileId.

这部分的 Context 就是让前端来传、而不是后端提取了。因为是 Agent 的入口+不太需要(只需要判断下在哪个 Agent 中就行了)。

c.c. Memory 管理

然后就是 Memory 管理了。整个模块架构如下(直接抄代码中的注释了):

  • 滑动窗口管理:保留最近 NN 条原始消息
  • 历史摘要生成:对超出窗口的消息生成 LLM 摘要
  • 上下文组装:将摘要 + 滑动窗口组合为 LLM 可用的消息序列
  • 任务计划集成:将活跃的 TaskPlan 注入上下文

首先是聊天记录管理:我们使用常用的长期记忆+短期记忆,长期的让 LLM 生成一个 Summary,短期的直接使用原始对话:

const totalCount = messages.length;

if (totalCount <= MEMORY_SUMMARY_THRESHOLD) {
  return {
    summaries: existingSummaries,
    recentMessages: messages,
    activePlan,
    totalMessageCount: totalCount,
  };
}

// 分离旧消息和新消息
const cutoff = totalCount - MEMORY_SLIDING_WINDOW;
const recentMessages = messages.slice(cutoff);

// 检查是否需要新的摘要
const lastSummarizedIdx =
  existingSummaries.length > 0
    ? existingSummaries[existingSummaries.length - 1].messageRange.to
    : 0;

const newSummaries = [...existingSummaries];

if (lastSummarizedIdx < cutoff) {
  // 有新的未摘要消息
  const unsummarized = messages.slice(lastSummarizedIdx, cutoff);
  if (unsummarized.length > 0) {
    const summaryText = await generateSummary(unsummarized);
    if (summaryText) {
      newSummaries.push({
        summary: summaryText,
        messageRange: { from: lastSummarizedIdx, to: cutoff },
        createdAt: new Date(),
      });

      logger.info(
        {
          range: `${lastSummarizedIdx}-${cutoff}`,
          summaryLength: summaryText.length,
        },
        "Generated conversation summary",
      );
    }
  }
}

然后将 Memory 和 Task Plan 杂揉到 System Prompt 里面构建消息序列:

// 注入摘要上下文
if (memoryState.summaries.length > 0) {
  const summaryBlock = memoryState.summaries
    .map(
      (s, i) =>
        `[Summary ${i + 1} (msgs ${s.messageRange.from}-${s.messageRange.to})]: ${s.summary}`,
    )
    .join("\n\n");

  messages.push({
    role: "system",
    content: `## Conversation History Summary\nThe following is a summary of earlier messages in this conversation:\n\n${summaryBlock}\n\n---\nRecent messages follow below.`,
  });
}

// 注入任务计划
if (memoryState.activePlan && !memoryState.activePlan.isComplete) {
  const planBlock = this.formatTaskPlan(memoryState.activePlan);
  messages.push({
    role: "system",
    content: `## Active Task Plan\n${planBlock}`,
  });
}

// 注入滑动窗口消息
for (const msg of memoryState.recentMessages) {
  if (msg.role === "user") {
    messages.push({ role: "user", content: msg.content });
  } else if (msg.role === "assistant") {
    messages.push({ role: "assistant", content: msg.content });
  }
}

如果上下文超限了,用 LLM 压缩一下即可。

d.d. Agent 路由

引入 MultiAgent 后接下来就需要写 Router 来选择正确的 Agent 了,先直接用 “关键词匹配+LLM Router” 这种比较简单的想法实现了。具体的策略如下:

  • 高置信度模式匹配: 使用正则表达式处理高频、明确的动作。
  • 会话粘性: 保持对话连贯性,除非意图发生明显偏移。
  • LLM Router 兜底: 当上述手段都无法判断时,才调用 LLM 进行意图分析。

LLM 让它限制一下返回格式、用 Prompt 让它返回 JSON。

Prompt 设计如下:

const LLM_ROUTER_PROMPT = `You are a routing classifier for a cloud drive application's AI agent system.
Your task is to determine which specialized agent should handle the user's request.

Available agents:
${AGENT_REGISTRY.map(
  (a) =>
    `- "${a.type}": ${a.description}\n  Capabilities: ${a.capabilities.join(", ")}`,
).join("\n")}

Rules:
1. Respond ONLY with a valid JSON object, no extra text.
2. The "route_to" field must be one of: ${AGENT_REGISTRY.map((a) => `"${a.type}"`).join(", ")}.
3. "confidence" is a float between 0 and 1.
4. "reason" is a brief explanation in the user's language.

Output format:
{"route_to": "<agent_type>", "confidence": <float>, "reason": "<brief_reason>"}`;

e.e. Task Planner

Task Planner 使用 LLM 对复杂任务生成 json Task Plan 然后一步一步执行。在调试过程中发现的一些 Task Plan 生成的问题(比如在 Document Agent 环境中计划创建文件)直接写进 Prompt 中让 Agent 注意了(可能有更优雅的解决方式,不过当时直接用这种快方法修了 bug):

IMPORTANT CONTEXT RULES:
- If the user is currently in the Document Editor (context includes "currentFileId"), the "document" agent can ONLY edit THAT file. NEVER create a step that creates a new file via the document agent. If the user wants to write content, it goes into the CURRENT document.
- The "document" agent does NOT create, delete, move or share files. Only the "drive" agent does that.
- If the user asks to "write a summary" while in the Document Editor, the summary should be written INTO the current document (via "document" agent), NOT into a new file.
- When editing the current document, prefer fewer steps. A simple "edit current document" task is usually 1-2 steps, not 5.

然后定义基本的 Task 状态流传并实现即可:

export const TASK_STATUS = {
  PENDING: "pending",
  IN_PROGRESS: "in-progress",
  COMPLETED: "completed",
  FAILED: "failed",
  SKIPPED: "skipped",
} as const;
export class TaskPlanTracker {
  startCurrentStep(plan: TaskPlan): TaskPlan;
  completeCurrentStep(plan: TaskPlan, result?: string): TaskPlan;
  failCurrentStep(plan: TaskPlan, error: string): TaskPlan;
  skipCurrentStep(plan: TaskPlan, reason?: string): TaskPlan;
}

其实这些东西可以引入任务队列这些的,之后重构好之后会记录一下。

3. Agent 编排

a.a. 实现的一些细节

TaskOrchestrator 的每一步包括:

export interface StepResult {
  step: TaskStep;
  content: string;
  toolCalls: IToolCall[];
  pendingApprovals: AgentLoopResult["pendingApprovals"];
  success: boolean;
  error?: string;
}

每次完成一个阶段后,push stepResult 然后触发回调。举一个例子:

export interface AgentStreamEvent {
  type: AgentEventType;
  data: Record<string, unknown>;
}

// Agent 事件回调函数类型
export type AgentEventCallback = (event: AgentStreamEvent) => void;

stepResults.push({
  step,
  content: "",
  toolCalls: [],
  pendingApprovals: [],
  success: false,
  error: `No agent for type: ${agentType}`,
});

onEvent?.({
  type: AGENT_EVENT_TYPE.TASK_STEP_UPDATE,
  data: {
    stepId: step.id,
    status: TASK_STATUS.FAILED,
    error: `No agent for type: ${agentType}`,
  },
});

这样前后端就完成沟通了。

b.b. 总体流程

整个 Agent 流程的 Event 如下:

Controller.chatStream
  └── sendEvent (写入 HTTP Response)

AgentService.chat(onEvent)
  ├── emit route_decision
  ├── emit task_plan

  └── TaskOrchestrator.executePlan(onEvent)
        ├── emit task_step_update (in-progress)

        └── BaseAgent.run(onEvent)
        |   ├── emit tool_call_start
        |   ├── emit approval_needed
        |   ├── emit approval_resolved
        |   ├── emit tool_call_end
        |   └── emit content

        └── emit task_step_update (completed/failed)

流程图如下:

还有步骤

所有步骤完成

开始执行计划

初始化变量

遍历计划中的每个步骤

步骤状态为

PENDING?

获取对应Agent

Agent是否存在?

标记步骤为FAILED

记录错误

标记步骤为IN_PROGRESS

构建上下文和消息

进入重试循环

尝试执行步骤

执行成功?

超过最大重试次数?

标记步骤为FAILED

标记步骤为COMPLETED

收集执行结果

更新summaries

构建最终响应

返回执行结果

结束

4. Capacity Gateway 组件

为了让 Agent 安全执行指令(不要莫名其妙执行一些危险指令,比如把 drive 文档全删光了),我们不能让 Agent 自己执行工具调用,而是需要判断一下当前是否能执行工具调用。

只需要管理工具调用就行,因为 Agent 只能通过工具调用来操作 Drive。

同时我们把 Approve 的逻辑也塞在这里(最开始是放在 Agent Orchestator 里面的,但是有点问题,而且这个应该也是 Gateway 的指责)。这个东西的关键是引入一个存储 resolve 的 Map:

export interface ApprovalResolution {
  approved: boolean;
  modifiedArgs?: Record<string, unknown>;
}

const approvalResolvers = new Map<
  string,
  { resolve: (result: ApprovalResolution) => void }
>();

modifiedArgs 是 Approve 中被选择的参数,比如 Share 可以自己配置参数。

这个是非常关键的。我们考虑 Approve 的流程:

  1. Agent 发起了 waitForApproval 然后停留在原处,SSE 流在这里阻塞。同时后端给前端发送 JSON。
  2. 前端收到这个东西后弹出对话框让用户确认。
  3. 用户确认后,前端发送新的 POST HTTP 请求给后端。

但是此时 SSE 流仍然被阻塞着。怎么唤醒它呢?就需要使用我们的回调!我们从这个 Map 中取出 resolve 函数:

const resolver = approvalResolvers.get(approvalId);

然后进行相关的处理即可。比如 Approve 失败了就返回 failed:

resolver.resolve({ approved: false });

成功了就返回对应的结果:

resolver.resolve({ approved, modifiedArgs });

然后剩下的就简单了:对于等待 Approve 的调用,我们往 Map 中塞进它的 resolve(同时做一些 abort、超时处理之类的):

return new Promise<ApprovalResolution>((resolve) => {
  // 已经被终止(SSE 断开)
  if (signal?.aborted) {
    resolve({ approved: false });
    return;
  }

  // 监听 SSE 连接断开
  const onAbort = () => {
    approvalResolvers.delete(approvalId);
    resolve({ approved: false });
  };
  signal?.addEventListener("abort", onAbort, { once: true });

  // 注册 resolver — resolveApproval 调用时触发
  approvalResolvers.set(approvalId, {
    resolve: (result) => {
      signal?.removeEventListener("abort", onAbort);
      resolve(result);
    },
  });

  // 超时保护(与 APPROVAL_TTL_SECONDS 对齐)
  setTimeout(() => {
    if (approvalResolvers.has(approvalId)) {
      approvalResolvers.delete(approvalId);
      signal?.removeEventListener("abort", onAbort);
      resolve({ approved: false });
      logger.debug({ approvalId }, "waitForApproval timed out");
    }
  }, APPROVAL_TTL_SECONDS * 1000);
});

然后在处理 Approve 的函数中调用 resolve 来取消阻塞:

const request = pendingApprovals.get(approvalId);
const elapsed = Date.now() - request.createdAt.getTime();

if (elapsed > request.ttlSeconds * 1000) {
  request.status = APPROVAL_STATUS.EXPIRED;
  request.resolvedAt = new Date();
  pendingApprovals.delete(approvalId);

  const resolver = approvalResolvers.get(approvalId);
  if (resolver) {
    resolver.resolve({ approved: false });
    approvalResolvers.delete(approvalId);
  }

  return request;
}

request.status = approved ? APPROVAL_STATUS.APPROVED : APPROVAL_STATUS.REJECTED;
request.resolvedAt = new Date();

if (approved) {
  this.incrementRateCounter(userId);
}

// 触发等待中的 Agent 循环继续执行
const resolver = approvalResolvers.get(approvalId);
if (resolver) {
  resolver.resolve({ approved, modifiedArgs });
  approvalResolvers.delete(approvalId);
}