Plan-and-Execute模式

什么是Plan-and-Execute模式

Plan-and-Execute架构流程:先指定计划,后交给执行agent,执行后交给replan节点,判断是否需要更新计划,若要更新计划返回返回更新后的机会,否则返回response,然后路由判断是执行agent还是response

与ReAct模式不同的是,ReAct只做一次规划,而Plan-and-Execute模式核心思想是首先制定一个多步骤计划,然后逐项执行该计划。完成特定任务后,可以重新审视计划并进行适当修改。

举个例子,用户在问一个问题后,agent产生一份任务清单,选取第一份任务开始执行,执行后的结果结合任务清单,执行replan,结合新的信息,更改任务清单的内容,让后续大模型更好地执行,并去除已经完成的任务

实际生产中,应该在planner之前再进行一次判断,如果问题过于简单,不需要进行Plan-and-Execute模式

实战

安装包

1
pip install --quiet -U langgraph langchain-community langchain-openai tavily-python

定义网络搜索工具与执行agent

在产生plan后,要有一个agent对任务清单进行执行,这里以一个ReAct的网络搜索agent为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#定义工具
from langchain_tavily import TavilySearch
tools = [TavilySearch(max_results=3)]

from langchain_openai import ChatOpenAI

from langgraph.prebuilt import create_react_agent


llm = ChatOpenAI(
model="qwen3-235b-a22b-thinking-2507",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
prompt = "You are a helpful assistant."
agent_executor = create_react_agent(llm, tools, prompt=prompt)

测试功能

1
agent_executor.invoke({"messages": [("user", "今天是几月几日")]})

定义执行节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from typing import Literal
from langgraph.graph import END


async def execute_step(state: PlanExecute):
"""执行计划中的步骤"""
plan = state["plan"]
# 将计划格式化为带编号的字符串
plan_str = "\n".join(f"{i + 1}. {step}" for i, step in enumerate(plan))
task = plan[0] # 获取第一个待执行的任务
task_formatted = f"""对于以下计划:
{plan_str}\n\n你被分配执行第 {1} 步, {task}。"""
# 调用代理执行器来执行任务
agent_response = await agent_executor.ainvoke(
{"messages": [("user", task_formatted)]}
)
# 返回执行结果,添加到历史步骤中
return {
"past_steps": [(task, agent_response["messages"][-1].content)],
}

create_react_agent 是 LangGraph 提供的一个预构建函数,位于 langgraph.prebuilt 模块中,用于快速创建一个基于 ReAct(Reasoning + Acting)架构的智能代理。

langgraph预设的其他常用组件如下

ToolNode

功能:把 LangChain 工具(BaseTool)封装成一个图节点,负责:

  • 接收 LLM 生成的工具调用请求

  • 真正执行工具

  • 把结果返回给图

1
2
3
from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools=[search, calculator])

tools_condition

功能:判断 LLM 是否要继续调用工具的“路由函数”。

在 ReAct 图里通常放在节点之间的 条件边:

1
2
3
4
5
6
7
8
9
from langgraph.prebuilt import tools_condition

graph.add_conditional_edges("agent", tools_condition, {

"tools": "tool_node", # 需要工具 → 去 ToolNode

"**__end__**": END # 不需要 → 结束

})

定义状态

1
2
3
4
5
6
7
8
9
10
import operator
from typing import Annotated, List, Tuple
from typing_extensions import TypedDict


class PlanExecute(TypedDict):
input: str
plan: List[str]
past_steps: Annotated[List[Tuple], operator.add]
response: str
1
2
3
4
5
6
7
8
9
from pydantic import BaseModel, Field


class Plan(BaseModel):
"""未来要遵循的计划"""

steps: List[str] = Field(
description="需要遵循的不同步骤,应该按排序顺序排列"
)

定义模型

1
2
3
4
5
from langchain_community.chat_models import ChatTongyi
model=ChatTongyi(
model="qwen-plus-2025-07-14",
api_key="sk-"
)

定义初始计划节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"""对于给定的目标,制定一个简单的逐步计划。\
这个计划应该包含独立的任务,如果正确执行这些任务将得到正确的答案。不要添加任何多余的步骤。\
最后一步的结果应该是最终答案。确保每个步骤都包含所需的所有信息——不要跳过任何步骤。""",
),
("placeholder", "{messages}"),
]
)
planner = planner_prompt | model.with_structured_output(Plan)

async def plan_step(state: PlanExecute):
"""制定初始计划步骤"""
# 使用规划器为用户输入制定计划
plan = await planner.ainvoke({"messages": [("user", state["input"])]})

return {"plan": plan.steps}

定义再计划节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from typing import Union


class Response(BaseModel):
"""对用户的响应"""

response: str


class Act(BaseModel):
"""要执行的动作"""

action: Union[Response, Plan] = Field(
description="要执行的动作。如果你想响应用户,使用 Response。"
"如果你需要进一步使用工具来获取答案,使用 Plan。"
)


replanner_prompt = ChatPromptTemplate.from_template(
"""对于给定的目标,制定一个简单的逐步计划。\
这个 计划应该包含独立的任务,如果正确执行这些任务将得到正确的答案。不要添加任何多余的步骤。\
最后一步的结果应该是最终答案。确保每个步骤都包含所需的所有信息——不要跳过任何步骤。

你的目标是:
{input}

你的原始计划是:
{plan}

你目前已经完成了以下步骤:
{past_steps}

相应地更新你的计划。如果不需要更多步骤并且可以返回给用户,则直接响应。否则,填写计划。只添加仍需要完成的步骤到计划中。不要将已经完成的步骤作为计划的一部分返回。"""
)


replanner = replanner_prompt | model.with_structured_output(Act)
1
2
3
4
5
6
7
8
9
10
async def replan_step(state: PlanExecute):
"""重新规划步骤"""
# 使用重新规划器根据当前状态更新计划
output = await replanner.ainvoke(state)
if isinstance(output.action, Response):
# 如果动作是响应,返回最终响应
return {"response": output.action.response}
else:
# 如果动作是计划,返回新的计划步骤
return {"plan": output.action.steps}

定义路由

1
2
3
4
5
6
7
8
def should_end(state: PlanExecute):
"""判断是否结束执行流程"""
if "response" in state and state["response"]:
# 如果存在响应内容,结束流程
return END
else:
# 否则继续执行代理步骤
return "agent"

编译图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.memory import MemorySaver

# 创建内存检查点保存器
memory = MemorySaver()

# 创建工作流图,使用 PlanExecute 状态类型
workflow = StateGraph(PlanExecute)

# 添加计划节点
workflow.add_node("planner", plan_step)

# 添加执行步骤节点
workflow.add_node("agent", execute_step)

# 添加重新规划节点
workflow.add_node("replan", replan_step)

# 从开始节点连接到计划节点
workflow.add_edge(START, "planner")

# 从计划节点连接到代理执行节点
workflow.add_edge("planner", "agent")

# 从代理执行节点连接到重新规划节点
workflow.add_edge("agent", "replan")

# 添加条件边 - 从重新规划节点根据条件决定下一步
workflow.add_conditional_edges(
"replan",
# 传入决定下一个调用节点的函数
should_end,
["agent", END], # 可能的下一个节点:代理节点或结束
)

# 最后,编译工作流并添加检查点功能!
# 这将其编译为 LangChain Runnable,
# 意味着你可以像使用其他任何 runnable 一样使用它
app = workflow.compile(checkpointer=memory)
image-20250729092408872

调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 配置递归限制,防止无限循环
config = {"configurable": {
"thread_id": "1", # 必需:线程ID
},"recursion_limit": 50}

# 输入问题:2024年澳大利亚网球公开赛男单冠军的家乡是哪里?
inputs = {"input": "2024年澳大利亚网球公开赛男单冠军的家乡是哪里?"}

# 异步流式执行应用
async for event in app.astream(inputs, config=config):
# 遍历每个事件
for k, v in event.items():
# 排除结束标记,打印其他所有事件内容
if k != "__end__":
print(v)

资源

计划与执行 — Plan-and-Execute