Agent 任务队列与消息发送设计
在前面搭建了最基本的 Agent 架构后,我们使用任务队列和 Redis Pub/Sub 来处理 Agent 的执行流程与消息发送。
1. 总体架构
使用任务队列和 Redis Pub/Sub 的具体方式如下:
- 用户发送一个消息后,API 将任务入队然后返回 taskId,具体的完成交给 Worker。
- Worker 处理任务并产生中间结果(也就是之前的 Agent 架构中的事件),将结果 Publish 到 Redis 对应频道。
- 订阅这个频道的 API 收到了这个消息,立即通过 SSE 推送到前端、让前端进行相应渲染。
这个过程是高度解耦与可拓展的:API 只负责转发消息给任务队列、其他全部交给 worker;worker 的数量也可以便捷地拓展。同时由于模块间通过 Redis Pub/Sub 进行消息发布与接收,不仅可以利用 Redis 来处理内存中的大量消息,由于 Redis 广播特性,发送消息和订阅消息的 API 还可以不在同一个服务器上。
整体任务流转如下:
2. 任务队列实现
队列及Sub、Pub操作
我们创建一个 agent-task 队列:
export const agentTaskQueue = new Queue<AgentTaskData, AgentTaskResult>(
AGENT_TASK_QUEUE_NAME,
{
connection: redisClient,
defaultJobOptions: {
attempts: 1, // Agent 任务不重试
removeOnComplete: { age: TASK_RESULT_TTL, count: 200 },
removeOnFail: { age: TASK_RESULT_TTL, count: 200 },
},
},
);
然后在队列中加入 eventListener 来监听消息。这里我们不让每个订阅者自己去监听消息(也就是下面这种写法):
// 每次调用都会增加一个全局监听器
export function subscribeTaskEvents(taskId: string, handler: TaskEventHandler) {
const channel = EVENT_CHANNEL_PREFIX + taskId;
const messageListener = (_ch: string, message: string) => {
if (_ch !== channel) return; // <--- 关键所在:这行代码在每个监听器里都会跑!
// 处理逻辑...
};
redisSubscriber.on("message", messageListener); // 向全局添加
// ...
}
这样当 Redis 中消息发送过来时,所有的订阅者都需要看看这个消息是不是自己的、会打断当前工作。相反,我们创建一个全局的监听器,然后在这个监听器挂载每个订阅者的 Handler,这个监听器负责接收消息然后分发给对应的 Handler:
type TaskEventHandler = (event: AgentStreamEvent) => void;
// 调度员存储:Channel -> 处理器集合
const taskEventHandlers = new Map<string, Set<TaskEventHandler>>();
let isTaskListenerAttached = false;
// 确保全局 eventListener 已挂载(只挂载一次)
function ensureTaskListenerAttached(): void {
if (isTaskListenerAttached) return;
isTaskListenerAttached = true;
redisSubscriber.on("message", (channel, message) => {
// 仅处理属于 Agent 任务事件的消息
if (!channel.startsWith(EVENT_CHANNEL_PREFIX)) return;
const handlers = taskEventHandlers.get(channel);
if (!handlers || handlers.size === 0) return;
try {
const envelope = JSON.parse(message) as TaskEventEnvelope;
handlers.forEach((handler) => handler(envelope.event));
} catch (err) {
logger.warn({ err, channel }, "Failed to dispatch task event");
}
});
}
然后是 Sub 操作,我们使用“多路复用”的方法,只让第一个人订阅,之后的人复用它的 Handler:
export function subscribeTaskEvents(
taskId: string,
handler: TaskEventHandler,
): () => void {
const channel = EVENT_CHANNEL_PREFIX + taskId;
ensureTaskListenerAttached();
// 1. 注册处理器
if (!taskEventHandlers.has(channel)) {
taskEventHandlers.set(channel, new Set());
// 只有第一个订阅者需要真正执行 Redis SUBSCRIBE
redisSubscriber.subscribe(channel).catch((err) => {
logger.error({ err, channel }, "Failed to subscribe to Redis channel");
});
}
taskEventHandlers.get(channel)!.add(handler);
// 2. 返回清理函数
return () => {
const handlers = taskEventHandlers.get(channel);
if (handlers) {
handlers.delete(handler);
// 如果没有活跃订阅者了,执行 UNSUBSCRIBE 释放资源
if (handlers.size === 0) {
taskEventHandlers.delete(channel);
redisSubscriber.unsubscribe(channel).catch(() => {});
}
}
};
}
然后是 Pub 操作,我们在另一个 Redis Client 中 Publish 一个 Event Envelope 即可:
interface TaskEventEnvelope {
taskId: string;
userId: string;
event: AgentStreamEvent;
}
export function publishTaskEvent(
taskId: string,
userId: string,
event: AgentStreamEvent,
) {
const channel = EVENT_CHANNEL_PREFIX + taskId;
const envelope: TaskEventEnvelope = { taskId, userId, event };
redisClient.publish(channel, JSON.stringify(envelope)).catch((err) => {
logger.warn({ err, taskId }, "Failed to publish agent task event");
});
}
Worker
由于这个项目还没遇到 Worker 负载过高的情况, Worker 就先也在做 Pub 的 Redis Client 中进行队列操作,不建立额外连接。
我们定义一个 Processor 类型:
export type AgentTaskProcessor = (
data: AgentTaskData,
onEvent: (event: AgentStreamEvent) => void,
) => Promise<AgentTaskResult>;
然后在 Worker 的不同阶段 Pub 不同的消息。这里的回调我们传入之前说的 Redis Pub,Handler 收到消息后会进行具体的逻辑处理的:
export function createAgentTaskWorker(
processor: AgentTaskProcessor,
): Worker<AgentTaskData, AgentTaskResult> {
const worker = new Worker<AgentTaskData, AgentTaskResult>(
AGENT_TASK_QUEUE_NAME,
async (job: Job<AgentTaskData, AgentTaskResult>) => {
const { taskId, userId } = job.data;
logger.info({ taskId, userId, jobId: job.id }, "Processing agent task");
// 构造 SSE 事件回调:通过 Pub/Sub 广播
const onEvent = (event: AgentStreamEvent) => {
publishTaskEvent(taskId, userId, event);
};
try {
const result = await processor(job.data, onEvent);
// 广播 DONE 事件
publishTaskEvent(taskId, userId, {
type: AGENT_EVENT_TYPE.DONE,
data: result as unknown as Record<string, unknown>,
});
return result;
} catch (error) {
const errMsg = error instanceof Error ? error.message : "Unknown error";
logger.error({ error, taskId, userId }, "Agent task failed");
// 广播 ERROR 事件
publishTaskEvent(taskId, userId, {
type: AGENT_EVENT_TYPE.ERROR,
data: { message: errMsg },
});
throw error;
}
},
{
connection: redisClient,
concurrency: WORKER_CONCURRENCY,
},
);
worker.on("completed", (job) => {
logger.info(
{ taskId: job.data.taskId, jobId: job.id },
"Agent task completed",
);
redisClient
.srem(userActiveTasksKey(job.data.userId), job.data.taskId)
.catch(() => {});
});
worker.on("failed", (job, err) => {
logger.error(
{ taskId: job?.data.taskId, jobId: job?.id, error: err.message },
"Agent task failed",
);
if (job) {
redisClient
.srem(userActiveTasksKey(job.data.userId), job.data.taskId)
.catch(() => {});
}
});
return worker;
}
3. 使用任务队列
完成任务队列基础设施搭建后,我们就只需要暴露一个 processor 了,其他乱七八糟的逻辑全部封装到了 service 方法中,比如核心的 Agent Chat 只需要调用我们前面写的 Chat:
buildTaskProcessor() {
return async (
data: AgentTaskData,
onEvent: (event: AgentStreamEvent) => void,
): Promise<AgentTaskResult> => {
const request: AgentChatRequest = {
message: data.message,
conversationId: data.conversationId,
context: data.context,
};
const result = await this.chat(
data.userId,
request,
onEvent,
undefined,
data.taskId,
);
return {
taskId: data.taskId,
conversationId: result.conversationId,
agentType: result.agentType,
content: result.message.content,
success: true,
};
};
}
同时对于前面讲的 Approve 逻辑,我们之前存 resolve 的 localMap 还是要的(因为 Redis 不解决跨进程传输),然后我们只需要把之前的直接的 onEvent 回调给改成 Publish,订阅者会处理其他的东西:
export async function resolveApproval(
approvalId: string,
userId: string,
approved: boolean,
modifiedArgs?: Record<string, unknown>,
): Promise<ApprovalRequest | null> {
const key = APPROVAL_KEY_PREFIX + approvalId;
const raw = await redisClient.get(key);
if (!raw) return null;
const request = JSON.parse(raw) as ApprovalRequest;
if (request.userId !== userId) return null;
if (request.status !== APPROVAL_STATUS.PENDING) return null;
const elapsed = Date.now() - new Date(request.createdAt).getTime();
if (elapsed > request.ttlSeconds * 1000) {
request.status = APPROVAL_STATUS.EXPIRED;
request.resolvedAt = new Date();
await redisClient.del(key);
await redisClient.publish(
APPROVAL_CHANNEL,
JSON.stringify({ approvalId, approved: false }),
);
return request;
}
request.status = approved
? APPROVAL_STATUS.APPROVED
: APPROVAL_STATUS.REJECTED;
request.resolvedAt = new Date();
const remainingTtl = Math.max(
1,
request.ttlSeconds - Math.floor(elapsed / 1000),
);
await redisClient.set(key, JSON.stringify(request), "EX", remainingTtl);
// 通过 Pub/Sub 广播
await redisClient.publish(
APPROVAL_CHANNEL,
JSON.stringify({ approvalId, approved, modifiedArgs }),
);
logger.info(
{
approvalId,
approved,
toolName: request.toolName,
hasModifiedArgs: !!modifiedArgs,
},
"Approval resolved and broadcasted via Pub/Sub",
);
return request;
}