多 Agent 协作系统设计模式
王晨曦 · 2026年1月28日 · 30 分钟
从单 Agent 到多 Agent
早期的 LLM 应用大多是单一 Agent 模式:一个 Agent 接收用户输入,调用工具,生成回答。但随着应用场景的复杂化,单一 Agent 面临越来越多的挑战:
- Prompt 过长:塞了太多指令和工具描述导致上下文窗口溢出,性能严重下降
- 角色混淆:一个 Agent 既要做意图识别又要写代码又要做数据分析,每个能力都不够精
- 错误传播:一个步骤出错,后续所有步骤都受影响,没有隔离机制
- 推理路径不透明:复杂任务的决策链路无法追踪和调试
- 迭代困难:修改一个能力可能影响其他能力,系统越来越脆弱
多 Agent 系统通过任务分解和专业化分工来解决这些问题——就像软件工程中的微服务思想。
什么时候需要多 Agent?
适合多 Agent 的场景:
- 任务涉及多个专业领域(如"分析数据 → 生成报告 → 发送邮件")
- 需要不同的 LLM 或不同的参数设置
- 需要质量控制(审查-修改循环)
- 需要人在环路(Human-in-the-loop)
- 任务链路需要可观测和可调试
不适合多 Agent 的场景:
- 简单的问答或单轮对话
- 任务流程固定且简单
- 延迟要求极高(每增加一个 Agent 增加一次 LLM 调用延迟)
- 预算有限(多 Agent = 多次 Token 消耗)
原则:能用单 Agent 解决的就不用多 Agent。复杂度是有代价的。
常见设计模式
1. 管道模式(Pipeline)
Agent 按顺序依次处理,每个 Agent 聚焦一个子任务,前一个的输出是后一个的输入:
用户输入 → [意图识别 Agent] → [信息提取 Agent] → [执行 Agent] → [格式化 Agent] → 最终输出
适用场景: 处理流程固定、步骤明确的任务
优点:
- 简单、可预测、易于调试
- 每个 Agent 的 Prompt 简短聚焦
- 可以独立优化每个环节
缺点:
- 灵活性差,无法动态调整流程
- 上游错误会传播到下游
- 总延迟 = 各 Agent 延迟之和
实际案例——智能客服工单处理:
# LangGraph 实现的管道模式
from langgraph.graph import StateGraph
class TicketState(TypedDict):
raw_input: str
intent: str
entities: dict
response: str
formatted: str
def classify_intent(state: TicketState) -> TicketState:
"""意图分类 Agent"""
result = llm.invoke(f"""
对以下客户消息进行意图分类。
可选意图:退款、物流查询、产品咨询、投诉、其他
客户消息:{state['raw_input']}
只返回意图类别。
""")
return {"intent": result.content.strip()}
def extract_entities(state: TicketState) -> TicketState:
"""实体提取 Agent"""
result = llm.invoke(f"""
从以下客户消息中提取关键信息。
意图:{state['intent']}
消息:{state['raw_input']}
以 JSON 格式返回:订单号、产品名、日期等。
""")
return {"entities": json.loads(result.content)}
def generate_response(state: TicketState) -> TicketState:
"""回复生成 Agent"""
result = llm.invoke(f"""
基于以下信息生成客服回复:
意图:{state['intent']}
提取信息:{state['entities']}
原始消息:{state['raw_input']}
要求:专业、友好、简洁。
""")
return {"response": result.content}
graph = StateGraph(TicketState)
graph.add_node("classify", classify_intent)
graph.add_node("extract", extract_entities)
graph.add_node("respond", generate_response)
graph.add_edge("classify", "extract")
graph.add_edge("extract", "respond")
graph.set_entry_point("classify")
graph.set_finish_point("respond")
2. 路由模式(Router)
一个路由 Agent 根据输入动态分配任务给专业 Agent:
用户输入 → [Router Agent] → { [SQL Agent] | [文档 Agent] | [计算 Agent] | [闲聊 Agent] }
适用场景: 多技能集成的智能助手
关键技术点:
- Router 的准确率是整个系统的瓶颈——分类错了,后续再好也白搭
- 使用小模型做路由:路由是分类任务,不需要大模型,用小模型或规则引擎可以降低成本和延迟
- Fallback 机制:路由不确定时走通用 Agent,而不是随机选一个
Router 实现策略对比:
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| LLM 分类 | 灵活,可处理模糊意图 | 成本高,延迟大 | 意图边界模糊 |
| 微调分类模型 | 快速,准确 | 需要标注数据 | 意图类别稳定 |
| 关键词规则 | 最快,成本为零 | 无法处理语义 | 意图非常明确 |
| 混合策略 | 兼顾速度和准确 | 实现复杂 | 生产环境推荐 |
混合路由策略(推荐):
1. 先用正则/关键词匹配高置信度的意图
2. 匹配不到的走 LLM 分类
3. LLM 也不确定的走通用 Agent
3. 辩论模式(Debate)
多个 Agent 对同一问题给出独立答案,最终由仲裁 Agent 综合判断:
问题 → [Agent A] + [Agent B] + [Agent C] → [仲裁 Agent] → 最终答案
(乐观视角) (保守视角) (技术视角)
适用场景: 需要高准确率的决策场景,如医疗建议、金融分析、法律判断
优点:
- 降低单个 LLM 的偏见和幻觉
- 多角度分析更全面
- 仲裁过程可以追溯
缺点:
- Token 消耗是单 Agent 的 N+1 倍
- 延迟更高(除非并行执行)
- 仲裁 Agent 的综合能力要求高
实际案例——投资分析:
async def debate_analysis(question: str) -> str:
# 三个 Agent 并行分析
tasks = [
bullish_agent.analyze(question), # 乐观分析
bearish_agent.analyze(question), # 风险分析
technical_agent.analyze(question), # 技术面分析
]
perspectives = await asyncio.gather(*tasks)
# 仲裁 Agent 综合判断
synthesis = await judge_agent.invoke(f"""
三位分析师对以下问题给出了不同观点:
问题:{question}
乐观分析:{perspectives[0]}
风险分析:{perspectives[1]}
技术分析:{perspectives[2]}
请综合三方观点,给出平衡的分析结论。
明确标注各方的共识点和分歧点。
""")
return synthesis
4. 层级模式(Hierarchy)
Manager Agent 负责任务规划和分配,Worker Agent 负责具体执行:
用户需求 → [Manager Agent]
├── 拆解为子任务 1 → [Worker Agent A] → 结果 1
├── 拆解为子任务 2 → [Worker Agent B] → 结果 2
└── 拆解为子任务 3 → [Worker Agent C] → 结果 3
[Manager Agent] ← 汇总结果 → 最终输出
适用场景: 复杂的多步骤任务,如"调研竞品并生成分析报告"
关键设计要点:
- Manager 的任务拆解能力是关键——拆解不好,Worker 执行再好也没用
- 定义清晰的 Worker 能力边界——让 Manager 知道每个 Worker 能做什么
- 子任务之间的依赖关系要明确——哪些可以并行,哪些必须串行
- 错误处理和重试——单个 Worker 失败不应该导致整个任务失败
Worker 能力描述模板:
WORKER_REGISTRY = {
"data_analyst": {
"description": "擅长 SQL 查询和数据分析",
"capabilities": ["执行 SQL", "数据可视化", "统计分析"],
"limitations": ["不能修改数据", "不处理非结构化数据"],
"tools": ["sql_executor", "chart_generator"]
},
"report_writer": {
"description": "擅长撰写结构化报告",
"capabilities": ["Markdown 报告", "PPT 大纲", "摘要生成"],
"limitations": ["不执行代码", "不访问数据库"],
"tools": ["markdown_writer", "template_engine"]
},
"web_researcher": {
"description": "擅长互联网信息检索和整理",
"capabilities": ["搜索引擎查询", "网页内容提取", "信息汇总"],
"limitations": ["不访问内部系统", "不执行代码"],
"tools": ["web_search", "web_scraper"]
}
}
5. 反思模式(Reflection)
执行 Agent 生成结果后,评审 Agent 进行检查,不合格则反馈修改:
[执行 Agent] → 输出 → [评审 Agent] → { 通过 → 最终输出
{ 不通过 → 反馈 → [执行 Agent] → ... (循环)
适用场景: 代码生成、文档写作、翻译等需要质量保证的场景
关键实现细节:
MAX_ITERATIONS = 3 # 设置最大迭代次数,防止无限循环
async def reflection_loop(task: str) -> str:
draft = await writer_agent.generate(task)
for i in range(MAX_ITERATIONS):
review = await reviewer_agent.review(draft)
if review.passed:
return draft
# 将评审反馈注入修改 Prompt
draft = await writer_agent.revise(
original=draft,
feedback=review.feedback,
iteration=i + 1
)
# 达到最大迭代次数,返回最后版本并标注
return f"[经过{MAX_ITERATIONS}轮修改] {draft}"
评审 Agent 的 Prompt 设计:
你是一个严格的代码审查者。检查以下代码是否满足要求:
检查项:
1. 功能正确性:是否实现了需求中的所有功能?
2. 错误处理:是否处理了边界情况和异常?
3. 安全性:是否存在安全漏洞?
4. 代码风格:是否符合项目规范?
对每个检查项给出 PASS 或 FAIL,FAIL 需要说明原因和修改建议。
只有所有项都 PASS 才算通过。
6. 竞争模式(Race)
多个 Agent 同时处理同一任务,取最先完成的或最优的结果:
任务 → [Agent A (GPT-4)] → 结果 A ─┐
→ [Agent B (Claude)] → 结果 B ─┼→ 选择最优 → 最终输出
→ [Agent C (Qwen)] → 结果 C ─┘
适用场景: 对质量要求极高,或需要对比不同模型效果的场景
实际应用: 我们在翻译场景中使用竞争模式,三个不同模型翻译同一段文字,由评审 Agent 选择最优翻译。翻译质量比单模型提升约 15%。
工程实践
通信机制
Agent 之间的通信方式直接影响系统的复杂度和可靠性:
| 机制 | 实现 | 适用场景 | 复杂度 |
|---|---|---|---|
| 直接函数调用 | Python 函数 | 简单同步场景 | 低 |
| 共享状态(Blackboard) | Redis / In-memory Dict | 中等复杂度 | 中 |
| 消息队列 | Kafka / Redis Streams | 异步 / 分布式场景 | 高 |
| Event Sourcing | 事件日志 | 需要完整审计追踪 | 高 |
推荐: 从共享状态模式(LangGraph 的 StateGraph)开始,复杂度增加后再考虑消息队列。
上下文管理
多 Agent 系统最大的工程挑战是上下文管理——每个 Agent 应该看到多少信息?
最小上下文原则:
- 每个 Agent 只接收完成其任务所必需的信息
- 避免将整个对话历史传给每个 Agent
- 使用结构化的消息格式(JSON Schema)传递中间结果
上下文压缩策略:
def compress_context(full_context: dict, agent_role: str) -> dict:
"""根据 Agent 角色过滤上下文"""
if agent_role == "sql_agent":
return {
"query_intent": full_context["intent"],
"table_schemas": full_context["relevant_schemas"],
"constraints": full_context["sql_constraints"]
}
elif agent_role == "report_agent":
return {
"analysis_results": full_context["data_results"],
"report_template": full_context["template"],
"audience": full_context["target_audience"]
}
错误处理与容错
class AgentExecutor:
def __init__(self, agent, max_retries=2, fallback_agent=None):
self.agent = agent
self.max_retries = max_retries
self.fallback_agent = fallback_agent
async def execute(self, task: str) -> AgentResult:
for attempt in range(self.max_retries + 1):
try:
result = await self.agent.run(task)
if result.is_valid():
return result
except Exception as e:
logger.warning(f"Agent {self.agent.name} attempt {attempt+1} failed: {e}")
# 所有重试都失败,使用 Fallback Agent
if self.fallback_agent:
return await self.fallback_agent.run(task)
return AgentResult(status="failed", error="Max retries exceeded")
可观测性
多 Agent 系统的调试和监控至关重要:
必须追踪的信息:
- 每个 Agent 的输入和输出(完整记录)
- 调用链路(类似分布式 Tracing 的 Span)
- Token 消耗(分 Agent 统计,用于成本分析)
- 延迟分布(识别瓶颈 Agent)
- 错误和重试次数
LangSmith / LangFuse 集成:
from langfuse.callback import CallbackHandler
handler = CallbackHandler(
public_key="pk-...",
secret_key="sk-...",
)
# 在每个 Agent 调用时传入 callback
result = agent.invoke(task, config={"callbacks": [handler]})
自定义追踪(如果不用 LangSmith):
import uuid
import time
class AgentTracer:
def __init__(self, trace_id=None):
self.trace_id = trace_id or str(uuid.uuid4())
self.spans = []
def start_span(self, agent_name: str, input_data: str) -> dict:
span = {
"trace_id": self.trace_id,
"span_id": str(uuid.uuid4()),
"agent": agent_name,
"input": input_data[:500], # 截断避免过大
"start_time": time.time()
}
self.spans.append(span)
return span
def end_span(self, span: dict, output: str, tokens: int):
span["output"] = output[:500]
span["tokens"] = tokens
span["duration_ms"] = (time.time() - span["start_time"]) * 1000
# 写入数据库或日志
logger.info(f"[Trace] {span}")
Token 成本控制
多 Agent 系统的 Token 消耗是单 Agent 的 N 倍,成本控制很重要:
策略:
- 分模型使用:路由/分类用小模型,核心推理用大模型
- 缓存:相同输入缓存 Agent 结果,避免重复调用
- 提前终止:如果中间结果已经足够好,跳过后续 Agent
- 上下文窗口管理:只传必要信息,避免大段无用上下文
各模型的成本对比(参考价格):
| 模型 | 输入价格 | 输出价格 | 适用 Agent 类型 |
|---|---|---|---|
| GPT-4o | $2.5/M | $10/M | 核心推理、复杂决策 |
| GPT-4o-mini | $0.15/M | $0.6/M | 路由、分类、简单任务 |
| Claude 3.5 Sonnet | $3/M | $15/M | 长文本处理、代码生成 |
| Qwen2.5-72B | 自部署 | 自部署 | 高频调用、数据安全要求 |
框架选型
LangGraph
特点: LangChain 生态,基于状态图的工作流引擎
优势:
- 状态管理完善(TypedDict 定义状态)
- 支持条件分支、循环、并行
- 与 LangChain 生态无缝集成
- LangSmith 提供完整的可观测性
适用: 复杂工作流、需要精确控制流程的场景
AutoGen
特点: 微软出品,对话式多 Agent 框架
优势:
- 对话驱动的交互模式
- 内置代码执行环境
- 支持Human-in-the-loop
- Group Chat 模式支持多 Agent 自由讨论
适用: 对话式协作、需要人机交互的场景
CrewAI
特点: 角色扮演模式的多 Agent 框架
优势:
- API 简洁,上手快
- 内置任务委托和协作机制
- 角色定义直观(Name, Goal, Backstory)
适用: 业务流程自动化、角色明确的简单场景
选型建议
| 需求 | 推荐框架 |
|---|---|
| 复杂工作流 + 精确控制 | LangGraph |
| 对话式协作 | AutoGen |
| 快速原型 | CrewAI |
| 自研需求 | 基于原始 API 自建 |
我们的经验: 在生产项目中,我们大多使用 LangGraph。它的状态管理和流程控制能力在复杂场景中优势明显。CrewAI 适合 POC 和简单场景。
实践中的注意事项
- 从简单开始:先用单 Agent 验证核心逻辑,确认需要多 Agent 后再拆分
- 明确每个 Agent 的职责:职责重叠会导致冲突和浪费
- 定义清晰的接口:Agent 之间的输入输出格式要标准化
- 限制迭代次数:反思模式和辩论模式都要设最大轮次
- 保持可观测性:完整记录每个 Agent 的输入输出和决策过程
- 成本意识:多 Agent = 多次 LLM 调用 = 更高成本,要有预算规划
总结
多 Agent 不是万能解药。在选择单 Agent 还是多 Agent 时,遵循一个原则:能用单 Agent 解决的就不用多 Agent。只有当任务复杂度确实需要专业化分工时,多 Agent 才能体现价值。
选择合适的设计模式、做好工程实践(通信、上下文、错误处理、可观测性)、控制好成本——这三者缺一不可。多 Agent 系统的难点不在于"让多个 Agent 对话",而在于让它们"高效协作"。这与管理一个人类团队的挑战惊人地相似。