Drive MCP Design

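While wrapping the Drive Agent's Tool calls, I decided to use MCP: it is a unified protocol, so our Tools can also be reached through an MCP Client by things other than the built-in Agent. For example, once everything was in place, Copilot could access files inside Drive.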

The rest of this post is a rough record of the system design.

1. Tool

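Tools in Drive fall into the following categories:

  1. File operations.
  2. Folder operations.
  3. Edits and writes to Documents.
  4. Knowledge-layer and search tools (the knowledge layer is really just Embeddings: every document is embedded and stored to support Semantic Search).
  5. Sharing (a Drive needs to be able to share files, after all).
  6. MapReduce processing (this handles the large volume of results returned by concurrent calls; a later post will touch on it briefly).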

Implementing an MCP Tool is simple: call server.registerTool and register the corresponding Tool logic. For example:

server.registerTool(
  "get_file_info",
  {
    description:
      "Get detailed metadata about a specific file (name, size, mimeType, timestamps, permissions). " +
      "WHEN TO USE: When you need file metadata (size, type, dates) and already have the file ID. " +
      "WHEN NOT TO USE: When you need the file's text content (use extract_file_content instead). " +
      "NOTES: Returns metadata only, NOT file content. If a resource URI already provides this info, do not call this.",
    inputSchema: z.object({
      userId: userIdParam,
      fileId: z.string().describe("The file ID to get information for"),
    }),
  },
  async ({ userId: rawUserId, fileId }) => {
    try {
      const userId = resolveUserId(rawUserId, authContext);
      const file = await fileService.getFileById(fileId, userId);
      return {
        content: [
          {
            type: "text" as const,
            text: JSON.stringify(file, null, 2),
          },
        ],
      };
    } catch (error) {
      const message = error instanceof Error ? error.message : "Unknown error";
      return {
        content: [{ type: "text" as const, text: `Error: ${message}` }],
        isError: true,
      };
    }
  },
);

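The thing to watch here is how the description is written. The format I settled on is a positive case, a negative case, and a few NOTES. One lesson learned: avoid overly absolute instructions, because a weaker Agent model can be led astray by them. For example, when I first built the Agent and asked it to search for relevant documents and summarize them, it kept falling back to keyword search. So I added "you must use this tool whenever you search for file-related content" to the description of the Semantic File Search tool, and the Agent then started reaching for that Tool to read files as well.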
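To keep every description in the same positive case / negative case / NOTES shape, the parts can be assembled by a small helper. This is only a sketch: buildToolDescription and the ToolDescriptionParts field names are hypothetical and do not appear in the original code.

```typescript
// Hypothetical helper (not from the original code): builds a description in the
// "summary + WHEN TO USE + WHEN NOT TO USE + NOTES" shape used by get_file_info.
interface ToolDescriptionParts {
  summary: string;
  whenToUse: string;
  whenNotToUse: string;
  notes?: string; // optional; omitted from the output when absent
}

function buildToolDescription(parts: ToolDescriptionParts): string {
  const sections = [
    parts.summary,
    `WHEN TO USE: ${parts.whenToUse}`,
    `WHEN NOT TO USE: ${parts.whenNotToUse}`,
  ];
  if (parts.notes) {
    sections.push(`NOTES: ${parts.notes}`);
  }
  // Sections are joined with a single space, matching the string
  // concatenation style of the registerTool example.
  return sections.join(" ");
}

const getFileInfoDescription = buildToolDescription({
  summary:
    "Get detailed metadata about a specific file (name, size, mimeType, timestamps, permissions).",
  whenToUse:
    "When you need file metadata (size, type, dates) and already have the file ID.",
  whenNotToUse:
    "When you need the file's text content (use extract_file_content instead).",
  notes: "Returns metadata only, NOT file content.",
});
```

Keeping the wording inside each part descriptive rather than absolute ("when you need X" instead of "you must always") is the point of the lesson above; the helper only enforces the layout.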

2. MCP Authentication

This section covers my own MCP authentication design. We give MCP an AuthContext that carries the user's authentication state:

export interface McpAuthContext {
  userId?: string;
  userEmail?: string;
  userName?: string;
  authenticatedAt?: Date;
  authMethod?: "api_key" | "none";
  keyName?: string;
}

Users' API Keys are stored in the MongoDB backend. Before doing anything else, the Agent validates the key and populates the Context:

export async function authenticateWithApiKey(
  rawKey: string,
  authContext: McpAuthContext,
): Promise<boolean> {
  const apiKeyService = getApiKeyService();
  const result = await apiKeyService.validateKey(rawKey);

  if (!result) {
    return false;
  }

  authContext.userId = result.userId;
  authContext.userEmail = result.userEmail;
  authContext.userName = result.userName;
  authContext.authenticatedAt = new Date();
  authContext.authMethod = "api_key";
  authContext.keyName = result.keyName;

  logger.info(
    {
      userId: result.userId,
      email: result.userEmail,
      keyName: result.keyName,
    },
    "MCP session authenticated via API key",
  );

  return true;
}

The auth context is passed into every Tool, and it is checked on each Tool call the Agent makes:

async ({ userId: rawUserId, fileId }) => {
  const userId = resolveUserId(rawUserId, authContext);
  ......
};

It works somewhat like a JWT payload.
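The post does not show resolveUserId itself, so the following is only a guess at its behavior, written under two assumptions: an authenticated session always takes precedence over a caller-supplied ID, and the unauthenticated fallback is reachable only from the in-process Agent, whose userId comes from an already verified JWT. The McpAuthContext here is trimmed to the one field the sketch needs.

```typescript
// Trimmed-down McpAuthContext with only the field this sketch uses.
interface McpAuthContext {
  userId?: string;
}

// Assumed behavior, not the original implementation.
function resolveUserId(
  rawUserId: string | undefined,
  authContext: McpAuthContext,
): string {
  const sessionUserId = authContext.userId;

  if (sessionUserId) {
    // An authenticated session wins. A different caller-supplied ID is rejected
    // so an external Agent cannot act as another user.
    if (rawUserId && rawUserId !== sessionUserId) {
      throw new Error("userId does not match the authenticated session");
    }
    return sessionUserId;
  }

  // No authenticated session: fall back to the caller-supplied ID.
  // Assumption: only the in-process Agent reaches this branch.
  if (rawUserId) {
    return rawUserId;
  }

  throw new Error("Not authenticated: no session user and no userId provided");
}
```

With this shape, a Tool handler never has to know which transport it is running behind; it just calls resolveUserId and either gets a user ID or an error that the existing catch block turns into an isError result.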

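The Agent inside Drive never calls this, because the JWT already contains the userId. An external Agent (such as Copilot, which I tested with) needs the API Key to be passed in on the MCP Server side, or has to call the Auth Tool itself: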

async ({ apiKey }) => {
  try {
    const success = await authenticateWithApiKey(apiKey, authContext);

    if (!success) {
      return {
        content: [
          {
            type: "text" as const,
            text: JSON.stringify({
              success: false,
              error:
                "Invalid or expired API key. Please generate a new one in the Drive web UI.",
            }),
          },
        ],
        isError: true,
      };
    }

    return {
      content: [
        {
          type: "text" as const,
          text: JSON.stringify({
            success: true,
            message: `Authenticated as ${authContext.userName} (${authContext.userEmail})`,
            userId: authContext.userId,
            name: authContext.userName,
            email: authContext.userEmail,
            keyName: authContext.keyName,
          }),
        },
      ],
    };
  } catch (error) {
    const message =
      error instanceof Error ? error.message : "Authentication failed";
    logger.error({ err: error }, "MCP API key authentication failed");
    return {
      content: [
        {
          type: "text" as const,
          text: JSON.stringify({ success: false, error: message }),
        },
      ],
      isError: true,
    };
  }
};

3. Resource

In Drive, a Resource is either a File's content or the Files under a Folder. We define two dynamic Resource URIs, drive://files/{fileId} and drive://folders/{folderId}, and register a template for each.
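To make the two URI shapes concrete, here is a small standalone parser. It is illustrative only: the MCP SDK's ResourceTemplate does the matching in the real server, and parseDriveUri and DriveResourceRef are hypothetical names that are not part of the original code.

```typescript
// Hypothetical types and function, for illustration only.
type DriveResourceRef =
  | { kind: "file"; id: string }
  | { kind: "folder"; id: string };

// Accepts exactly drive://files/{fileId} or drive://folders/{folderId}.
// Anything else (other schemes, extra path segments, empty IDs) returns null.
function parseDriveUri(uri: string): DriveResourceRef | null {
  const match = /^drive:\/\/(files|folders)\/([^/?#]+)$/.exec(uri);
  if (!match) {
    return null;
  }
  const [, segment = "", id = ""] = match;
  return segment === "files" ? { kind: "file", id } : { kind: "folder", id };
}
```

For instance, parseDriveUri("drive://files/123") yields a file reference with id "123", while a URI such as "drive://files/" is rejected because the ID segment is empty.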

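In my initial design the Resource list callback returns an empty result, because these resources are meant to be fetched by the Agent itself or through Attach File / Folder. There is no need to list everything up front (the Drive context is also initialized more precisely in the Sub Agent), so only the dynamic read path has to be implemented: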

server.registerResource(
  "drive-file",
  new ResourceTemplate("drive://files/{fileId}", {
    list: async () => {
      // Files do not need to be listed
      return { resources: [] };
    },
  }),
  {
    title: "Drive File Content",
    description:
      "Read the content of a file from the drive. Automatically handles " +
      "text files, PDF, and DOCX formats. Use drive://files/{fileId} to " +
      "inject file content as context for AI conversations.",
    mimeType: "text/plain",
  },
  async (uri, variables) => {
    const fileId = variables.fileId as string;

    try {
      const file = await File.findById(fileId);
      if (!file) {
        return {
          contents: [
            {
              uri: uri.href,
              mimeType: "text/plain",
              text: `[Error: File not found (id: ${fileId})]`,
            },
          ],
        };
      }

      if (!isTextExtractable(file.mimeType)) {
        return {
          contents: [
            {
              uri: uri.href,
              mimeType: "application/json",
              text: JSON.stringify(
                {
                  error: "binary_file",
                  message: `Cannot extract text from ${file.mimeType}. This is a binary file (e.g., image, video).`,
                  file: {
                    id: file._id.toString(),
                    name: file.name,
                    size: file.size,
                    mimeType: file.mimeType,
                  },
                  suggestion:
                    "Use the 'get_download_url' tool to get a download link for this file.",
                },
                null,
                2,
              ),
            },
          ],
        };
      }

      const userId = file.user.toString();
      const { text, extractionMethod } =
        await knowledgeService.extractFileContent(fileId, userId);

      // Truncate to fit the context window
      // TODO: use a Summary or a smarter truncation strategy
      const MAX_RESOURCE_CHARS = 200_000;
      let content = text;
      let truncated = false;
      if (content.length > MAX_RESOURCE_CHARS) {
        content = content.slice(0, MAX_RESOURCE_CHARS);
        truncated = true;
      }

      const header =
        `# ${file.name}\n` +
        `> File ID: ${fileId} | Type: ${file.mimeType} | Size: ${formatBytes(file.size)} | ` +
        `Extraction: ${extractionMethod}` +
        (truncated
          ? ` | ⚠️ TRUNCATED (${formatBytes(text.length)} → ${formatBytes(MAX_RESOURCE_CHARS)})`
          : "") +
        `\n\n`;

      // Tell the Agent not to re-verify or re-extract file content that has already been provided,
      // and to reason directly over the Resource content
      const systemNote =
        `\n\n---\n` +
        `[SYSTEM NOTE — Agent Directive]\n` +
        `The content above is the COMPLETE and UP-TO-DATE text of file "${file.name}" (ID: ${fileId}).\n` +
        `You MUST NOT call 'extract_file_content', 'get_file_info', or any read tool on this file — the data is already here.\n` +
        `Proceed directly with the user's request using the content provided above.`;

      return {
        contents: [
          {
            uri: uri.href,
            mimeType: "text/plain",
            text: header + content + systemNote,
          },
        ],
      };
    } catch (error) {
      const message = error instanceof Error ? error.message : "Unknown error";
      logger.error(
        { err: error, fileId },
        "Failed to read drive://files resource",
      );
      return {
        contents: [
          {
            uri: uri.href,
            mimeType: "text/plain",
            text: `[Error reading file: ${message}]`,
          },
        ],
      };
    }
  },
);
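The TODO in the handler above leaves room for a smarter truncation strategy. One simple option, shown here purely as a sketch and not as what the original code does, is to keep both the head and the tail of the text, since the end of a document often carries conclusions. truncateMiddle, its headRatio parameter, and the marker text are all hypothetical.

```typescript
// Hypothetical alternative to content.slice(0, MAX_RESOURCE_CHARS):
// keep the start and the end of the text and drop the middle.
function truncateMiddle(
  text: string,
  maxChars: number,
  headRatio = 0.7, // share of the remaining budget given to the head
): { content: string; truncated: boolean } {
  if (text.length <= maxChars) {
    return { content: text, truncated: false };
  }

  const marker = "\n\n[... content omitted ...]\n\n";
  // If the budget cannot even fit the marker, fall back to a plain head cut.
  if (maxChars <= marker.length) {
    return { content: text.slice(0, maxChars), truncated: true };
  }

  const budget = maxChars - marker.length;
  const headLen = Math.floor(budget * headRatio);
  const tailLen = budget - headLen;

  const head = text.slice(0, headLen);
  const tail = tailLen > 0 ? text.slice(text.length - tailLen) : "";
  return { content: head + marker + tail, truncated: true };
}
```

The result never exceeds maxChars, so it could stand in for the slice call without changing the size guarantee; whether the tail is actually worth keeping depends on the kind of documents stored in the Drive.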

Once registered, the MCP Client can read resources on demand:

async readResource(uri: string): Promise<McpReadResourceResult> {
  await this.connect();

  const result = await this.client!.readResource({ uri });
  const contents: McpResourceContent[] = (result.contents || []).map((c) => ({
    uri: c.uri,
    text: "text" in c ? (c.text as string) : undefined,
    blob: "blob" in c ? (c.blob as string) : undefined,
    mimeType: c.mimeType,
  }));

  return { contents };
}

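The Agent could also fetch this content by calling a Tool itself, but the Resource approach is more elegant. For example, the Client uses the MCP resources/read method to request the content of drive://files/123 directly from the MCP Server, and that content is attached as system context alongside the user's Prompt. The returned content is also identical no matter what is calling.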
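How the content ends up "alongside the user's Prompt" is up to the client. The sketch below shows one possible shape, assuming a chat-style message list: text contents from readResource go into a single system message placed before the user message, and blob-only contents are skipped. ChatMessage and attachResources are hypothetical; McpResourceContent mirrors the fields mapped in readResource above.

```typescript
// Mirrors the fields produced by readResource in the post.
interface McpResourceContent {
  uri: string;
  text?: string;
  blob?: string;
  mimeType?: string;
}

// Hypothetical message shape; real chat APIs differ in detail.
interface ChatMessage {
  role: "system" | "user";
  content: string;
}

function attachResources(
  userPrompt: string,
  contents: McpResourceContent[],
): ChatMessage[] {
  // Only textual contents can be inlined as context; blobs are ignored here.
  const blocks = contents
    .filter((c) => typeof c.text === "string")
    .map((c) => `[Resource ${c.uri}]\n${c.text}`);

  const messages: ChatMessage[] = [];
  if (blocks.length > 0) {
    messages.push({ role: "system", content: blocks.join("\n\n") });
  }
  messages.push({ role: "user", content: userPrompt });
  return messages;
}
```

Because the Server already wraps the file text with a header and the system note, the client side can stay this thin: it only decides where the text goes, not what it says.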

4. Server & Client

Inside Drive, the Server and Client are connected with InMemoryTransport:

async connect(): Promise<void> {
  if (this.client) return;

  this.server = createMcpServer(this.services);
  this.client = new Client({
    name: "mdrive-internal-agent",
    version: "1.0.0",
  });

  const [clientTransport, serverTransport] =
    InMemoryTransport.createLinkedPair();

  await Promise.all([
    this.client.connect(clientTransport),
    this.server.connect(serverTransport),
  ]);

  logger.info("MCP Client connected to in-process server");
}
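Conceptually, a linked pair is just two endpoints where sending on one side delivers to the other side's message handler, with no serialization or network hop in between. The class below is a toy model of that idea, not the SDK's InMemoryTransport implementation; LinkedEndpoint and Message are hypothetical names.

```typescript
// Toy model of an in-process linked pair. Not the SDK implementation.
type Message = Record<string, unknown>;

class LinkedEndpoint {
  // Handler invoked when the peer sends a message.
  onmessage?: (message: Message) => void;
  private peer?: LinkedEndpoint;

  // Creates two endpoints that deliver directly to each other.
  static createLinkedPair(): [LinkedEndpoint, LinkedEndpoint] {
    const a = new LinkedEndpoint();
    const b = new LinkedEndpoint();
    a.peer = b;
    b.peer = a;
    return [a, b];
  }

  // Delivery is a plain synchronous function call on the peer.
  send(message: Message): void {
    if (!this.peer) {
      throw new Error("Endpoint is not linked");
    }
    this.peer.onmessage?.(message);
  }
}
```

This is why the internal Agent can speak full MCP to its own Server at roughly the cost of a function call, while external clients reuse the same Server over stdio or HTTP.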

For external Agents such as Copilot, stdio is enough:

const server = createMcpServer(services, authContext);
const transport = new StdioServerTransport();

logger.info("Starting MCP stdio server...");
await server.connect(transport);

StreamableHTTP is also simple: use StreamableHTTPServerTransport's handleRequest:

const transports = new Map<string, StreamableHTTPServerTransport>();

// POST /api/mcp
router.post("/", async (req: Request, res: Response) => {
  const sessionId = req.headers["mcp-session-id"] as string | undefined;

  // Existing session: reuse its transport
  if (sessionId && transports.has(sessionId)) {
    const transport = transports.get(sessionId)!;
    try {
      await transport.handleRequest(req, res, req.body);
    } catch (error) {
      logger.error(
        { err: error, sessionId },
        "Error handling MCP request for existing session",
      );
      if (!res.headersSent) {
        res.status(500).json({ error: "Internal server error" });
      }
    }
    return;
  }

  if (!sessionId && isInitializeRequest(req.body)) {
    const transport = new StreamableHTTPServerTransport({
      sessionIdGenerator: () => randomUUID(),
      onsessioninitialized: (sid) => {
        transports.set(sid, transport);
        logger.info({ sessionId: sid }, "MCP session initialized");
      },
    });

    // Clean up when the transport closes
    transport.onclose = () => {
      const sid = Array.from(transports.entries()).find(
        ([, t]) => t === transport,
      )?.[0];
      if (sid) {
        transports.delete(sid);
        logger.info({ sessionId: sid }, "MCP session closed");
      }
    };

    const server = createMcpServer();
    try {
      await server.connect(transport);
      await transport.handleRequest(req, res, req.body);
    } catch (error) {
      logger.error({ err: error }, "Error initializing MCP session");
      if (!res.headersSent) {
        res.status(500).json({ error: "Failed to initialize MCP session" });
      }
    }
    return;
  }

  // Invalid request
  res.status(400).json({
    jsonrpc: "2.0",
    error: {
      code: -32600,
      message: "Invalid request: no valid session ID or initialize request",
    },
    id: null,
  });
});
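The POST handler above boils down to a three-way decision. Pulling that decision into a pure function makes the branches easy to see and to test without Express or the SDK. This is a restatement of the handler's logic as a sketch; decideRoute and RouteDecision are hypothetical names.

```typescript
// Hypothetical pure-function restatement of the handler's branching.
type RouteDecision = "reuse" | "initialize" | "reject";

function decideRoute(
  sessionId: string | undefined,
  knownSessions: ReadonlySet<string>,
  isInitialize: boolean,
): RouteDecision {
  // Known session: hand the request to its existing transport.
  if (sessionId && knownSessions.has(sessionId)) {
    return "reuse";
  }
  // No session header and an initialize request: create a new transport.
  if (!sessionId && isInitialize) {
    return "initialize";
  }
  // Everything else gets the 400 response, including an unknown or stale
  // session ID and a non-initialize request without a session.
  return "reject";
}
```

One consequence worth noting: a request carrying a session ID that is no longer in the map is rejected rather than silently re-initialized, so a client whose session was closed has to start over with a fresh initialize request.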