Dark Dwarf Blog background

Agent 任务队列与消息发送设计

Agent 任务队列与消息发送设计

在前面搭建了最基本的 Agent 架构后,我们使用任务队列和 Redis Pub/Sub 来处理 Agent 的执行流程与消息发送。

1. 总体架构

使用任务队列和 Redis Pub/Sub 的具体方式如下:

  1. 用户发送一个消息后,API 将任务入队然后返回 taskId,具体的完成交给 Worker。
  2. Worker 处理任务并产生中间结果(也就是之前的 Agent 架构中的事件),将结果 Publish 到 Redis 对应频道。
  3. 订阅这个频道的 API 收到了这个消息,立即通过 SSE 推送到前端、让前端进行相应渲染。

这个过程是高度解耦与可拓展的:API 只负责转发消息给任务队列、其他全部交给 worker;worker 的数量也可以便捷地拓展。同时由于模块间通过 Redis Pub/Sub 进行消息发布与接收,不仅可以利用 Redis 来处理内存中的大量消息,由于 Redis 广播特性,发送消息和订阅消息的 API 还可以不在同一个服务器上。

整体任务流转如下:

1. 发任务
2. 入队 (taskId)
3. 领任务
4. 调用
5. 发布状态 (Pub)
6. 接收状态 (Sub)
7. 推送 (SSE)

👤 前端 (User)

🚪 API 服务器

📦 Redis (队列 + 订阅)

👷 Worker (干活)

🤖 AI (大模型)

2. 任务队列实现

a.a. 队列及Sub、Pub操作

我们创建一个 agent-task 队列:

export const agentTaskQueue = new Queue<AgentTaskData, AgentTaskResult>(
  AGENT_TASK_QUEUE_NAME,
  {
    connection: redisClient,
    defaultJobOptions: {
      attempts: 1, // Agent 任务不重试
      removeOnComplete: { age: TASK_RESULT_TTL, count: 200 },
      removeOnFail: { age: TASK_RESULT_TTL, count: 200 },
    },
  },
);

然后在队列中加入 eventListener 来监听消息。这里我们不让每个订阅者自己去监听消息(也就是下面这种写法):

// 每次调用都会增加一个全局监听器
export function subscribeTaskEvents(taskId: string, handler: TaskEventHandler) {
  const channel = EVENT_CHANNEL_PREFIX + taskId;

  const messageListener = (_ch: string, message: string) => {
    if (_ch !== channel) return; // <--- 关键所在:这行代码在每个监听器里都会跑!
    // 处理逻辑...
  };

  redisSubscriber.on("message", messageListener); // 向全局添加
  // ...
}

这样当 Redis 中消息发送过来时,所有的订阅者都需要看看这个消息是不是自己的、会打断当前工作。相反,我们创建一个全局的监听器,然后在这个监听器挂载每个订阅者的 Handler,这个监听器负责接收消息然后分发给对应的 Handler:

type TaskEventHandler = (event: AgentStreamEvent) => void;

// 调度员存储:Channel -> 处理器集合
const taskEventHandlers = new Map<string, Set<TaskEventHandler>>();
let isTaskListenerAttached = false;

// 确保全局 eventListener 已挂载(只挂载一次)
function ensureTaskListenerAttached(): void {
  if (isTaskListenerAttached) return;
  isTaskListenerAttached = true;

  redisSubscriber.on("message", (channel, message) => {
    // 仅处理属于 Agent 任务事件的消息
    if (!channel.startsWith(EVENT_CHANNEL_PREFIX)) return;

    const handlers = taskEventHandlers.get(channel);
    if (!handlers || handlers.size === 0) return;

    try {
      const envelope = JSON.parse(message) as TaskEventEnvelope;
      handlers.forEach((handler) => handler(envelope.event));
    } catch (err) {
      logger.warn({ err, channel }, "Failed to dispatch task event");
    }
  });
}

然后是 Sub 操作,我们使用“多路复用”的方法,只让第一个人订阅,之后的人复用它的 Handler:

export function subscribeTaskEvents(
  taskId: string,
  handler: TaskEventHandler,
): () => void {
  const channel = EVENT_CHANNEL_PREFIX + taskId;

  ensureTaskListenerAttached();

  // 1. 注册处理器
  if (!taskEventHandlers.has(channel)) {
    taskEventHandlers.set(channel, new Set());
    // 只有第一个订阅者需要真正执行 Redis SUBSCRIBE
    redisSubscriber.subscribe(channel).catch((err) => {
      logger.error({ err, channel }, "Failed to subscribe to Redis channel");
    });
  }
  taskEventHandlers.get(channel)!.add(handler);

  // 2. 返回清理函数
  return () => {
    const handlers = taskEventHandlers.get(channel);
    if (handlers) {
      handlers.delete(handler);
      // 如果没有活跃订阅者了,执行 UNSUBSCRIBE 释放资源
      if (handlers.size === 0) {
        taskEventHandlers.delete(channel);
        redisSubscriber.unsubscribe(channel).catch(() => {});
      }
    }
  };
}

然后是 Pub 操作,我们在另一个 Redis Client 中 Publish 一个 Event Envelope 即可:

interface TaskEventEnvelope {
  taskId: string;
  userId: string;
  event: AgentStreamEvent;
}

export function publishTaskEvent(
  taskId: string,
  userId: string,
  event: AgentStreamEvent,
) {
  const channel = EVENT_CHANNEL_PREFIX + taskId;
  const envelope: TaskEventEnvelope = { taskId, userId, event };
  redisClient.publish(channel, JSON.stringify(envelope)).catch((err) => {
    logger.warn({ err, taskId }, "Failed to publish agent task event");
  });
}

b.b. Worker

由于这个项目还没遇到 Worker 负载过高的情况, Worker 就先也在做 Pub 的 Redis Client 中进行队列操作,不建立额外连接。

我们定义一个 Processor 类型:

export type AgentTaskProcessor = (
  data: AgentTaskData,
  onEvent: (event: AgentStreamEvent) => void,
) => Promise<AgentTaskResult>;

然后在 Worker 的不同阶段 Pub 不同的消息。这里的回调我们传入之前说的 Redis Pub,Handler 收到消息后会进行具体的逻辑处理的:

export function createAgentTaskWorker(
  processor: AgentTaskProcessor,
): Worker<AgentTaskData, AgentTaskResult> {
  const worker = new Worker<AgentTaskData, AgentTaskResult>(
    AGENT_TASK_QUEUE_NAME,
    async (job: Job<AgentTaskData, AgentTaskResult>) => {
      const { taskId, userId } = job.data;

      logger.info({ taskId, userId, jobId: job.id }, "Processing agent task");

      // 构造 SSE 事件回调:通过 Pub/Sub 广播
      const onEvent = (event: AgentStreamEvent) => {
        publishTaskEvent(taskId, userId, event);
      };

      try {
        const result = await processor(job.data, onEvent);

        // 广播 DONE 事件
        publishTaskEvent(taskId, userId, {
          type: AGENT_EVENT_TYPE.DONE,
          data: result as unknown as Record<string, unknown>,
        });

        return result;
      } catch (error) {
        const errMsg = error instanceof Error ? error.message : "Unknown error";
        logger.error({ error, taskId, userId }, "Agent task failed");

        // 广播 ERROR 事件
        publishTaskEvent(taskId, userId, {
          type: AGENT_EVENT_TYPE.ERROR,
          data: { message: errMsg },
        });

        throw error;
      }
    },
    {
      connection: redisClient,
      concurrency: WORKER_CONCURRENCY,
    },
  );

  worker.on("completed", (job) => {
    logger.info(
      { taskId: job.data.taskId, jobId: job.id },
      "Agent task completed",
    );
    redisClient
      .srem(userActiveTasksKey(job.data.userId), job.data.taskId)
      .catch(() => {});
  });

  worker.on("failed", (job, err) => {
    logger.error(
      { taskId: job?.data.taskId, jobId: job?.id, error: err.message },
      "Agent task failed",
    );
    if (job) {
      redisClient
        .srem(userActiveTasksKey(job.data.userId), job.data.taskId)
        .catch(() => {});
    }
  });

  return worker;
}

3. 使用任务队列

完成任务队列基础设施搭建后,我们就只需要暴露一个 processor 了,其他乱七八糟的逻辑全部封装到了 service 方法中,比如核心的 Agent Chat 只需要调用我们前面写的 Chat:

buildTaskProcessor() {
  return async (
    data: AgentTaskData,
    onEvent: (event: AgentStreamEvent) => void,
  ): Promise<AgentTaskResult> => {
    const request: AgentChatRequest = {
      message: data.message,
      conversationId: data.conversationId,
      context: data.context,
    };

    const result = await this.chat(
      data.userId,
      request,
      onEvent,
      undefined,
      data.taskId,
    );

    return {
      taskId: data.taskId,
      conversationId: result.conversationId,
      agentType: result.agentType,
      content: result.message.content,
      success: true,
    };
  };
}

同时对于前面讲的 Approve 逻辑,我们之前存 resolve 的 localMap 还是要的(因为 Redis 不解决跨进程传输),然后我们只需要把之前的直接的 onEvent 回调给改成 Publish,订阅者会处理其他的东西:

export async function resolveApproval(
  approvalId: string,
  userId: string,
  approved: boolean,
  modifiedArgs?: Record<string, unknown>,
): Promise<ApprovalRequest | null> {
  const key = APPROVAL_KEY_PREFIX + approvalId;
  const raw = await redisClient.get(key);
  if (!raw) return null;

  const request = JSON.parse(raw) as ApprovalRequest;
  if (request.userId !== userId) return null;
  if (request.status !== APPROVAL_STATUS.PENDING) return null;

  const elapsed = Date.now() - new Date(request.createdAt).getTime();
  if (elapsed > request.ttlSeconds * 1000) {
    request.status = APPROVAL_STATUS.EXPIRED;
    request.resolvedAt = new Date();
    await redisClient.del(key);
    await redisClient.publish(
      APPROVAL_CHANNEL,
      JSON.stringify({ approvalId, approved: false }),
    );

    return request;
  }

  request.status = approved
    ? APPROVAL_STATUS.APPROVED
    : APPROVAL_STATUS.REJECTED;
  request.resolvedAt = new Date();

  const remainingTtl = Math.max(
    1,
    request.ttlSeconds - Math.floor(elapsed / 1000),
  );
  await redisClient.set(key, JSON.stringify(request), "EX", remainingTtl);

  // 通过 Pub/Sub 广播
  await redisClient.publish(
    APPROVAL_CHANNEL,
    JSON.stringify({ approvalId, approved, modifiedArgs }),
  );

  logger.info(
    {
      approvalId,
      approved,
      toolName: request.toolName,
      hasModifiedArgs: !!modifiedArgs,
    },
    "Approval resolved and broadcasted via Pub/Sub",
  );

  return request;
}