Drive MCP Design
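While wrapping the Drive Agent's tool calls, I decided to use MCP: it is a standard protocol, so our tools can also be reached through an MCP Client by things other than the built-in Agent. For example, once the implementation is done, Copilot can access files stored in Drive.
The rest of this post roughly documents the system design.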
1. Tool
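Tools in Drive fall into the following categories:
- File operations.
- Folder operations.
- Edits and writes to Documents.
- Knowledge layer and search tools. The knowledge layer is really just embeddings: every document is embedded and stored so that Semantic Search is possible.
- Sharing (a Drive needs to be able to share files, after all).
- MapReduce processing, which handles the large volume of results returned by concurrent calls (a later post will touch on it briefly).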
Implementing an MCP Tool is simple: call server.registerTool and register the tool's logic, for example:
server.registerTool(
"get_file_info",
{
description:
"Get detailed metadata about a specific file (name, size, mimeType, timestamps, permissions). " +
"WHEN TO USE: When you need file metadata (size, type, dates) and already have the file ID. " +
"WHEN NOT TO USE: When you need the file's text content (use extract_file_content instead). " +
"NOTES: Returns metadata only, NOT file content. If a resource URI already provides this info, do not call this.",
inputSchema: z.object({
userId: userIdParam,
fileId: z.string().describe("The file ID to get information for"),
}),
},
async ({ userId: rawUserId, fileId }) => {
try {
const userId = resolveUserId(rawUserId, authContext);
const file = await fileService.getFileById(fileId, userId);
return {
content: [
{
type: "text" as const,
text: JSON.stringify(file, null, 2),
},
],
};
} catch (error) {
const message = error instanceof Error ? error.message : "Unknown error";
return {
content: [{ type: "text" as const, text: `Error: ${message}` }],
isError: true,
};
}
},
);
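The thing to pay attention to here is how the description is written. The form I settled on is positive cases + negative cases + a few NOTES. One lesson learned: avoid overly absolute instructions, because a weaker Agent model can get trapped by them. For example, when I first built the Agent and asked it to search for relevant documents and summarize them, it kept falling back to keyword search. So I added a description to the Semantic File Search tool saying that the tool must be used whenever file-related content is searched. The result was that the Agent then often reached for this Tool to read files as well.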
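To keep every tool on the same positive case + negative case + NOTES layout, the description can be assembled by a small helper. The following is a minimal sketch: buildToolDescription and the example wording for the semantic search tool are hypothetical and are not taken from the Drive code base. Note that the example uses scoped phrasing ("when ... is unknown") rather than an absolute "must always".

```typescript
// Hypothetical helper: not part of the MCP SDK or the original project.
interface ToolDescriptionParts {
  summary: string;
  whenToUse: string;
  whenNotToUse: string;
  notes?: string;
}

// Joins the parts in the same order used by get_file_info above:
// summary, WHEN TO USE, WHEN NOT TO USE, then optional NOTES.
function buildToolDescription(parts: ToolDescriptionParts): string {
  const sections = [
    parts.summary,
    `WHEN TO USE: ${parts.whenToUse}`,
    `WHEN NOT TO USE: ${parts.whenNotToUse}`,
  ];
  if (parts.notes) {
    sections.push(`NOTES: ${parts.notes}`);
  }
  return sections.join(" ");
}

// Illustrative wording only.
const semanticSearchDescription = buildToolDescription({
  summary: "Search files by meaning using embeddings.",
  whenToUse: "When the user asks about a topic and exact keywords are unknown.",
  whenNotToUse:
    "When you already have a file ID and need its content (use extract_file_content instead).",
  notes: "Returns ranked matches, not full file content.",
});
```

The resulting string can be passed as the description field of server.registerTool.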
2. MCP Authentication
This section covers my own MCP authentication design. We introduce an AuthContext for MCP that carries the user's authentication state:
export interface McpAuthContext {
  userId?: string;
  userEmail?: string;
  userName?: string;
  authenticatedAt?: Date;
  authMethod?: "api_key" | "none";
  keyName?: string;
}
User API keys are stored in the MongoDB backend. Before doing anything else, the Agent validates the key and populates the corresponding context:
export async function authenticateWithApiKey(
rawKey: string,
authContext: McpAuthContext,
): Promise<boolean> {
const apiKeyService = getApiKeyService();
const result = await apiKeyService.validateKey(rawKey);
if (!result) {
return false;
}
authContext.userId = result.userId;
authContext.userEmail = result.userEmail;
authContext.userName = result.userName;
authContext.authenticatedAt = new Date();
authContext.authMethod = "api_key";
authContext.keyName = result.keyName;
logger.info(
{
userId: result.userId,
email: result.userEmail,
keyName: result.keyName,
},
"MCP session authenticated via API key",
);
return true;
}
This context is passed to every Tool, and each time the Agent calls a Tool the user is resolved against it:
async ({ userId: rawUserId, fileId }) => {
  const userId = resolveUserId(rawUserId, authContext);
  ......
};
It is in fact somewhat similar to a JWT payload.
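The post does not show resolveUserId itself, so here is one way it might work. This is a sketch under assumptions: the name resolveUserIdSketch, the precedence rule (an authenticated session wins, and a conflicting argument is rejected), and the error messages are all hypothetical.

```typescript
// Mirrors only the fields of McpAuthContext that this sketch needs.
interface AuthContextLike {
  userId?: string;
  authMethod?: "api_key" | "none";
}

// Assumed behavior, not the original implementation:
// 1. If the session is authenticated, use its userId and reject a mismatching argument.
// 2. Otherwise fall back to the userId argument (the internal Agent path, where the
//    JWT has already been verified upstream).
// 3. If neither is available, fail.
function resolveUserIdSketch(
  rawUserId: string | undefined,
  ctx: AuthContextLike,
): string {
  if (ctx.userId) {
    if (rawUserId && rawUserId !== ctx.userId) {
      throw new Error("userId does not match the authenticated session");
    }
    return ctx.userId;
  }
  if (rawUserId) {
    return rawUserId;
  }
  throw new Error("Not authenticated: no userId in session or arguments");
}
```

Because the function throws, it fits inside the try/catch that each Tool handler already uses to turn errors into isError results.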
Inside Drive, the Agent never calls this, because the JWT already contains the userId. An external Agent (such as Copilot, which I tested with) needs to either have the key passed in to the MCP Server or call the Auth Tool itself:
async ({ apiKey }) => {
try {
const success = await authenticateWithApiKey(apiKey, authContext);
if (!success) {
return {
content: [
{
type: "text" as const,
text: JSON.stringify({
success: false,
error:
"Invalid or expired API key. Please generate a new one in the Drive web UI.",
}),
},
],
isError: true,
};
}
return {
content: [
{
type: "text" as const,
text: JSON.stringify({
success: true,
message: `Authenticated as ${authContext.userName} (${authContext.userEmail})`,
userId: authContext.userId,
name: authContext.userName,
email: authContext.userEmail,
keyName: authContext.keyName,
}),
},
],
};
} catch (error) {
const message =
error instanceof Error ? error.message : "Authentication failed";
logger.error({ err: error }, "MCP API key authentication failed");
return {
content: [
{
type: "text" as const,
text: JSON.stringify({ success: false, error: message }),
},
],
isError: true,
};
}
};
3. Resource
In Drive, a Resource is either a File's content or the Files under a Folder. We define two dynamic Resource URIs, drive://files/{fileId} and drive://folders/{folderId}, and register a template for each.
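To make the two URI shapes concrete, here is a small parser sketch. It is only an illustration: parseDriveUri and DriveResourceRef are hypothetical names, and in the real server the SDK's ResourceTemplate does the matching and passes the variables to the read callback.

```typescript
// Hypothetical types and function, for illustration only.
type DriveResourceRef =
  | { kind: "file"; id: string }
  | { kind: "folder"; id: string };

// Accepts exactly drive://files/{id} or drive://folders/{id};
// anything else (other schemes, empty ids, extra path segments) returns null.
function parseDriveUri(uri: string): DriveResourceRef | null {
  const match = /^drive:\/\/(files|folders)\/([^\/?#]+)$/.exec(uri);
  if (!match) {
    return null;
  }
  const id = match[2];
  return match[1] === "files" ? { kind: "file", id } : { kind: "folder", id };
}
```

For instance, parseDriveUri("drive://files/123") yields a file reference with id "123", while a URI with an empty id is rejected.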
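In my initial design, the Resource list callback returns an empty list, because these resources are mainly meant to be fetched by the Agent itself or through Attach File / Folder, so there is no need to list them on entry (the Drive context is also initialized more precisely in the Sub Agent). Only the dynamic read part needs to be implemented: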
server.registerResource(
"drive-file",
new ResourceTemplate("drive://files/{fileId}", {
list: async () => {
// Files do not need to be listed
return { resources: [] };
},
}),
{
title: "Drive File Content",
description:
"Read the content of a file from the drive. Automatically handles " +
"text files, PDF, and DOCX formats. Use drive://files/{fileId} to " +
"inject file content as context for AI conversations.",
mimeType: "text/plain",
},
async (uri, variables) => {
const fileId = variables.fileId as string;
try {
const file = await File.findById(fileId);
if (!file) {
return {
contents: [
{
uri: uri.href,
mimeType: "text/plain",
text: `[Error: File not found (id: ${fileId})]`,
},
],
};
}
if (!isTextExtractable(file.mimeType)) {
return {
contents: [
{
uri: uri.href,
mimeType: "application/json",
text: JSON.stringify(
{
error: "binary_file",
message: `Cannot extract text from ${file.mimeType}. This is a binary file (e.g., image, video).`,
file: {
id: file._id.toString(),
name: file.name,
size: file.size,
mimeType: file.mimeType,
},
suggestion:
"Use the 'get_download_url' tool to get a download link for this file.",
},
null,
2,
),
},
],
};
}
const userId = file.user.toString();
const { text, extractionMethod } =
await knowledgeService.extractFileContent(fileId, userId);
// Truncate to fit the context window
// TODO: use a summary or a smarter truncation strategy
const MAX_RESOURCE_CHARS = 200_000;
let content = text;
let truncated = false;
if (content.length > MAX_RESOURCE_CHARS) {
content = content.slice(0, MAX_RESOURCE_CHARS);
truncated = true;
}
const header =
`# ${file.name}\n` +
`> File ID: ${fileId} | Type: ${file.mimeType} | Size: ${formatBytes(file.size)} | ` +
`Extraction: ${extractionMethod}` +
(truncated
? ` | ⚠️ TRUNCATED (${text.length} → ${MAX_RESOURCE_CHARS} chars)`
: "") +
`\n\n`;
// Tell the Agent not to re-verify or re-extract file content that is already provided
// and to reason directly from the Resource content
const systemNote =
`\n\n---\n` +
`[SYSTEM NOTE — Agent Directive]\n` +
`The content above is the COMPLETE and UP-TO-DATE text of file "${file.name}" (ID: ${fileId}).\n` +
`You MUST NOT call 'extract_file_content', 'get_file_info', or any read tool on this file — the data is already here.\n` +
`Proceed directly with the user's request using the content provided above.`;
return {
contents: [
{
uri: uri.href,
mimeType: "text/plain",
text: header + content + systemNote,
},
],
};
} catch (error) {
const message = error instanceof Error ? error.message : "Unknown error";
logger.error(
{ err: error, fileId },
"Failed to read drive://files resource",
);
return {
contents: [
{
uri: uri.href,
mimeType: "text/plain",
text: `[Error reading file: ${message}]`,
},
],
};
}
},
);
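The handler above truncates with a plain slice, and its TODO asks for something smarter. One modest step in that direction, sketched here as an assumption rather than as the project's plan, is to cut at the last line break before the limit so the text does not end mid-line. The name truncateAtLineBoundary and the "keep at least half the budget" rule are hypothetical.

```typescript
interface TruncationResult {
  content: string;
  truncated: boolean;
  originalChars: number;
}

// Cuts text to at most maxChars characters. If a newline exists in the second
// half of the allowed window, the cut backs up to it; otherwise it is a hard cut.
function truncateAtLineBoundary(text: string, maxChars: number): TruncationResult {
  if (text.length <= maxChars) {
    return { content: text, truncated: false, originalChars: text.length };
  }
  const hardCut = text.slice(0, maxChars);
  const lastNewline = hardCut.lastIndexOf("\n");
  // Backing up too far would waste most of the budget, so require at least half.
  const content =
    lastNewline >= maxChars / 2 ? hardCut.slice(0, lastNewline) : hardCut;
  return { content, truncated: true, originalChars: text.length };
}
```

The lengths here are character counts (UTF-16 code units), the same unit as MAX_RESOURCE_CHARS, not bytes.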
Once registered, the MCP Client can read resources on demand:
async readResource(uri: string): Promise<McpReadResourceResult> {
await this.connect();
const result = await this.client!.readResource({ uri });
const contents: McpResourceContent[] = (result.contents || []).map((c) => ({
uri: c.uri,
text: "text" in c ? (c.text as string) : undefined,
blob: "blob" in c ? (c.blob as string) : undefined,
mimeType: c.mimeType,
}));
return { contents };
}
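The Agent could also fetch this by calling a Tool, but this way is more elegant. For example, the Client asks the MCP Server directly for the content of drive://files/123 through the MCP resources/read method, and that content is attached as system context alongside the user's prompt. The returned content is also the same no matter what makes the call.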
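How the content ends up "alongside the prompt" depends on the host application. As a rough sketch, assuming a chat-style message list, a host could wrap each text resource and place it in a system message ahead of the user's message. ChatMessage, buildMessages, and the resource wrapper format are assumptions; only the ResourceContent shape follows the McpResourceContent mapping shown above.

```typescript
// Same fields as the McpResourceContent objects produced by readResource above.
interface ResourceContent {
  uri: string;
  text?: string;
  blob?: string;
  mimeType?: string;
}

// Hypothetical message shape for a chat-style model API.
interface ChatMessage {
  role: "system" | "user";
  content: string;
}

// Text resources become one system message; binary (blob) resources are skipped.
function buildMessages(userPrompt: string, contents: ResourceContent[]): ChatMessage[] {
  const context = contents
    .filter((c) => typeof c.text === "string")
    .map((c) => `<resource uri="${c.uri}">\n${c.text}\n</resource>`)
    .join("\n\n");

  const messages: ChatMessage[] = [];
  if (context.length > 0) {
    messages.push({ role: "system", content: context });
  }
  messages.push({ role: "user", content: userPrompt });
  return messages;
}
```

Since the server builds the resource text, every client that reads drive://files/123 gets the same header, body, and system note, whichever host assembles the final prompt.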
4. Server & Client
Inside Drive, the Server and Client are connected through InMemoryTransport:
async connect(): Promise<void> {
  if (this.client) return;
  this.server = createMcpServer(this.services);
  this.client = new Client({
    name: "mdrive-internal-agent",
    version: "1.0.0",
  });
  const [clientTransport, serverTransport] =
    InMemoryTransport.createLinkedPair();
  await Promise.all([
    this.client.connect(clientTransport),
    this.server.connect(serverTransport),
  ]);
  logger.info("MCP Client connected to in-process server");
}
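Conceptually, a linked in-memory pair is two endpoints where sending on one delivers to the other's message handler, with no serialization or network in between. The toy class below illustrates only that idea; LinkedEndpoint is a hypothetical name and this is not the SDK's InMemoryTransport implementation.

```typescript
type JsonRpcLike = Record<string, unknown>;

// Toy model of a linked pair: each endpoint holds a reference to its peer.
class LinkedEndpoint {
  onmessage?: (message: JsonRpcLike) => void;
  private peer?: LinkedEndpoint;

  static createLinkedPair(): [LinkedEndpoint, LinkedEndpoint] {
    const a = new LinkedEndpoint();
    const b = new LinkedEndpoint();
    a.peer = b;
    b.peer = a;
    return [a, b];
  }

  // Delivers the message directly to the peer's handler, in process.
  async send(message: JsonRpcLike): Promise<void> {
    if (!this.peer) {
      throw new Error("Endpoint is not linked");
    }
    this.peer.onmessage?.(message);
  }
}

const [clientEnd, serverEnd] = LinkedEndpoint.createLinkedPair();
const receivedByServer: JsonRpcLike[] = [];
serverEnd.onmessage = (message) => {
  receivedByServer.push(message);
};
```

This is why the internal Agent can use the same Tools and Resources as external clients without any transport overhead.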
For external Agents such as Copilot, stdio is enough:
const server = createMcpServer(services, authContext);
const transport = new StdioServerTransport();
logger.info("Starting MCP stdio server...");
await server.connect(transport);
Streamable HTTP is also simple: use handleRequest on StreamableHTTPServerTransport:
const transports = new Map<string, StreamableHTTPServerTransport>();
// POST /api/mcp
router.post("/", async (req: Request, res: Response) => {
const sessionId = req.headers["mcp-session-id"] as string | undefined;
// Existing session: reuse its transport
if (sessionId && transports.has(sessionId)) {
const transport = transports.get(sessionId)!;
try {
await transport.handleRequest(req, res, req.body);
} catch (error) {
logger.error(
{ err: error, sessionId },
"Error handling MCP request for existing session",
);
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
return;
}
if (!sessionId && isInitializeRequest(req.body)) {
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sid) => {
transports.set(sid, transport);
logger.info({ sessionId: sid }, "MCP session initialized");
},
});
// Clean up when the transport closes
transport.onclose = () => {
const sid = Array.from(transports.entries()).find(
([, t]) => t === transport,
)?.[0];
if (sid) {
transports.delete(sid);
logger.info({ sessionId: sid }, "MCP session closed");
}
};
const server = createMcpServer();
try {
await server.connect(transport);
await transport.handleRequest(req, res, req.body);
} catch (error) {
logger.error({ err: error }, "Error initializing MCP session");
if (!res.headersSent) {
res.status(500).json({ error: "Failed to initialize MCP session" });
}
}
return;
}
// Invalid request
res.status(400).json({
jsonrpc: "2.0",
error: {
code: -32600,
message: "Invalid request: no valid session ID or initialize request",
},
id: null,
});
});
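The handler above makes a three-way decision for each POST. Pulling that decision into a pure function makes the branches easy to see and to unit test. This is a sketch: routeMcpPost and PostRoute are hypothetical names, and the function only restates the conditions already present in the handler.

```typescript
type PostRoute =
  | { action: "reuse"; sessionId: string }
  | { action: "initialize" }
  | { action: "reject"; status: 400 };

// Same branching as the POST handler:
// known session id -> reuse its transport;
// no session id and an initialize request -> create a new session;
// everything else (including unknown session ids) -> 400.
function routeMcpPost(
  sessionId: string | undefined,
  knownSessions: ReadonlySet<string>,
  isInitialize: boolean,
): PostRoute {
  if (sessionId && knownSessions.has(sessionId)) {
    return { action: "reuse", sessionId };
  }
  if (!sessionId && isInitialize) {
    return { action: "initialize" };
  }
  return { action: "reject", status: 400 };
}
```

One consequence worth noting is that a client sending a stale session ID is rejected even if the body is an initialize request, so it has to drop the header and initialize again.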