什么是Plan-and-Execute模式
Plan-and-Execute架构流程:先指定计划,后交给执行agent,执行后交给replan节点,判断是否需要更新计划,若要更新计划返回返回更新后的机会,否则返回response,然后路由判断是执行agent还是response
与ReAct模式不同的是,ReAct只做一次规划,而Plan-and-Execute模式核心思想是首先制定一个多步骤计划,然后逐项执行该计划。完成特定任务后,可以重新审视计划并进行适当修改。
举个例子,用户在问一个问题后,agent产生一份任务清单,选取第一份任务开始执行,执行后的结果结合任务清单,执行replan,结合新的信息,更改任务清单的内容,让后续大模型更好地执行,并去除已经完成的任务
实际生产中,应该在planner之前再进行一次判断,如果问题过于简单,不需要进行Plan-and-Execute模式
实战
安装包
1 pip install --quiet -U langgraph langchain-community langchain-openai tavily-python
定义网络搜索工具与执行agent
在产生plan后,要有一个agent对任务清单进行执行,这里以一个ReAct的网络搜索agent为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #定义工具 from langchain_tavily import TavilySearch tools = [TavilySearch(max_results=3)] from langchain_openai import ChatOpenAI from langgraph.prebuilt import create_react_agent llm = ChatOpenAI( model="qwen3-235b-a22b-thinking-2507", api_key="sk-", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" ) prompt = "You are a helpful assistant." agent_executor = create_react_agent(llm, tools, prompt=prompt)
测试功能
1 agent_executor.invoke({"messages": [("user", "今天是几月几日")]})
定义执行节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from typing import Literal from langgraph.graph import END async def execute_step(state: PlanExecute): """执行计划中的步骤""" plan = state["plan"] # 将计划格式化为带编号的字符串 plan_str = "\n".join(f"{i + 1}. {step}" for i, step in enumerate(plan)) task = plan[0] # 获取第一个待执行的任务 task_formatted = f"""对于以下计划: {plan_str}\n\n你被分配执行第 {1} 步, {task}。""" # 调用代理执行器来执行任务 agent_response = await agent_executor.ainvoke( {"messages": [("user", task_formatted)]} ) # 返回执行结果,添加到历史步骤中 return { "past_steps": [(task, agent_response["messages"][-1].content)], }
create_react_agent 是 LangGraph 提供的一个预构建函数,位于
langgraph.prebuilt 模块中,用于快速创建一个基于 ReAct(Reasoning +
Acting)架构的智能代理。
langgraph预设的其他常用组件如下
ToolNode
功能:把 LangChain 工具(BaseTool)封装成一个图节点,负责:
接收 LLM 生成的工具调用请求
真正执行工具
把结果返回给图
1 2 3 from langgraph.prebuilt import ToolNode tool_node = ToolNode(tools=[search, calculator])
tools_condition
功能:判断 LLM 是否要继续调用工具的“路由函数”。
在 ReAct 图里通常放在节点之间的 条件边:
1 2 3 4 5 6 7 8 9 from langgraph.prebuilt import tools_condition graph.add_conditional_edges("agent", tools_condition, { "tools": "tool_node", # 需要工具 → 去 ToolNode "**__end__**": END # 不需要 → 结束 })
定义状态
1 2 3 4 5 6 7 8 9 10 import operator from typing import Annotated, List, Tuple from typing_extensions import TypedDict class PlanExecute(TypedDict): input: str plan: List[str] past_steps: Annotated[List[Tuple], operator.add] response: str
1 2 3 4 5 6 7 8 9 from pydantic import BaseModel, Field class Plan(BaseModel): """未来要遵循的计划""" steps: List[str] = Field( description="需要遵循的不同步骤,应该按排序顺序排列" )
定义模型
1 2 3 4 5 from langchain_community.chat_models import ChatTongyi model=ChatTongyi( model="qwen-plus-2025-07-14", api_key="sk-" )
定义初始计划节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from langchain_core.prompts import ChatPromptTemplate planner_prompt = ChatPromptTemplate.from_messages( [ ( "system", """对于给定的目标,制定一个简单的逐步计划。\ 这个计划应该包含独立的任务,如果正确执行这些任务将得到正确的答案。不要添加任何多余的步骤。\ 最后一步的结果应该是最终答案。确保每个步骤都包含所需的所有信息——不要跳过任何步骤。""", ), ("placeholder", "{messages}"), ] ) planner = planner_prompt | model.with_structured_output(Plan) async def plan_step(state: PlanExecute): """制定初始计划步骤""" # 使用规划器为用户输入制定计划 plan = await planner.ainvoke({"messages": [("user", state["input"])]}) return {"plan": plan.steps}
定义再计划节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 from typing import Union class Response(BaseModel): """对用户的响应""" response: str class Act(BaseModel): """要执行的动作""" action: Union[Response, Plan] = Field( description="要执行的动作。如果你想响应用户,使用 Response。" "如果你需要进一步使用工具来获取答案,使用 Plan。" ) replanner_prompt = ChatPromptTemplate.from_template( """对于给定的目标,制定一个简单的逐步计划。\ 这个 计划应该包含独立的任务,如果正确执行这些任务将得到正确的答案。不要添加任何多余的步骤。\ 最后一步的结果应该是最终答案。确保每个步骤都包含所需的所有信息——不要跳过任何步骤。 你的目标是: {input} 你的原始计划是: {plan} 你目前已经完成了以下步骤: {past_steps} 相应地更新你的计划。如果不需要更多步骤并且可以返回给用户,则直接响应。否则,填写计划。只添加仍需要完成的步骤到计划中。不要将已经完成的步骤作为计划的一部分返回。""" ) replanner = replanner_prompt | model.with_structured_output(Act)
1 2 3 4 5 6 7 8 9 10 async def replan_step(state: PlanExecute): """重新规划步骤""" # 使用重新规划器根据当前状态更新计划 output = await replanner.ainvoke(state) if isinstance(output.action, Response): # 如果动作是响应,返回最终响应 return {"response": output.action.response} else: # 如果动作是计划,返回新的计划步骤 return {"plan": output.action.steps}
定义路由
1 2 3 4 5 6 7 8 def should_end(state: PlanExecute): """判断是否结束执行流程""" if "response" in state and state["response"]: # 如果存在响应内容,结束流程 return END else: # 否则继续执行代理步骤 return "agent"
编译图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 from langgraph.graph import StateGraph, START from langgraph.checkpoint.memory import MemorySaver # 创建内存检查点保存器 memory = MemorySaver() # 创建工作流图,使用 PlanExecute 状态类型 workflow = StateGraph(PlanExecute) # 添加计划节点 workflow.add_node("planner", plan_step) # 添加执行步骤节点 workflow.add_node("agent", execute_step) # 添加重新规划节点 workflow.add_node("replan", replan_step) # 从开始节点连接到计划节点 workflow.add_edge(START, "planner") # 从计划节点连接到代理执行节点 workflow.add_edge("planner", "agent") # 从代理执行节点连接到重新规划节点 workflow.add_edge("agent", "replan") # 添加条件边 - 从重新规划节点根据条件决定下一步 workflow.add_conditional_edges( "replan", # 传入决定下一个调用节点的函数 should_end, ["agent", END], # 可能的下一个节点:代理节点或结束 ) # 最后,编译工作流并添加检查点功能! # 这将其编译为 LangChain Runnable, # 意味着你可以像使用其他任何 runnable 一样使用它 app = workflow.compile(checkpointer=memory)
image-20250729092408872
调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # 配置递归限制,防止无限循环 config = {"configurable": { "thread_id": "1", # 必需:线程ID },"recursion_limit": 50} # 输入问题:2024年澳大利亚网球公开赛男单冠军的家乡是哪里? inputs = {"input": "2024年澳大利亚网球公开赛男单冠军的家乡是哪里?"} # 异步流式执行应用 async for event in app.astream(inputs, config=config): # 遍历每个事件 for k, v in event.items(): # 排除结束标记,打印其他所有事件内容 if k != "__end__": print(v)
资源
计划与执行
— Plan-and-Execute