LangGraph学习——agent——上

前言

本教程为langchain官方教程的学习记录

academy.langchain.com/enrollments

代码见[learn-rag-langchain/academy-langgraph at main · zxj-2023/learn-rag-langchain](https://github.com/zxj-2023/learn-rag-langchain/tree/main/academy-langgraph)

module-1

route路由

在 LangGraph 中,route(路由)*的核心作用是*根据当前状态动态决定“下一步应该执行哪个节点”

定义工具
1
2
3
4
5
6
7
8
9
10
11
12
13
def multiply(a: int, b: int) -> int:
"""Multiply a and b.

Args:
a: first int
b: second int
"""
return a * b

llm = ChatOpenAI(

)
llm_with_tools = llm.bind_tools([multiply])

三引号字符串叫 docstring,它会被 LangChain 拿来做两件事:

  1. 生成工具的 description(给大模型看的“说明书”) 没有它时,LangChain 只能退而求其次,把函数名 multiply 拼成一句 “multiply tool” 之类的默认描述。大模型拿到的工具列表里,这个工具就只有一个干巴巴的名字和参数列表,它可能猜不到这个工具到底是干什么的
  2. 给人类开发者自己看 IDE、文档生成器、静态检查工具都会读取这段文字,方便后期维护。
构建条件边

tool_calling_llm 是一个普通的计算节点(node),负责把当前对话状态交给大模型,让大模型决定要不要调用工具;

真正完成“路由”动作的是 tools_condition 这个函数——它才是 LangGraph 里的 route(条件边)

tools_condition 是 作为LangGraph 预置的“默认路由函数”,功能就是,如果大模型的最新回复中包含工具调用,就调用工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Node
def tool_calling_llm(state: MessagesState):
#调用大模型后将最新的消息返回
return {"messages": [llm_with_tools.invoke(state["messages"])]}

# Build graph
#MessagesState 是 LangGraph 官方预置 的一种 状态(State)定义
#这个状态维护了一个消息list,有新的消息就加进这个消息list
builder = StateGraph(MessagesState)
builder.add_node("tool_calling_llm", tool_calling_llm)
builder.add_node("tools", ToolNode([multiply]))
builder.add_edge(START, "tool_calling_llm")
builder.add_conditional_edges(
"tool_calling_llm",
# 如果助手(结果)的最新消息是工具调用 -> tools_condition 路由到工具
# 如果助手(结果)的最新消息不是工具调用 -> tools_condition 路由到 END
tools_condition,
)
builder.add_edge("tools", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719142543838
调用
1
2
3
4
5
from langchain_core.messages import HumanMessage
messages = [HumanMessage(content="你好,2乘2是多少")]
messages = graph.invoke({"messages": messages})
for m in messages['messages']:
m.pretty_print()

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
================================ Human Message =================================

你好,2乘2是多少
================================== Ai Message ==================================
Tool Calls:
multiply (call_e026ceb409e247748786ad)
Call ID: call_e026ceb409e247748786ad
Args:
a: 2
b: 2
================================= Tool Message =================================
Name: multiply

4

agent代理

在 LangGraph 中,代理(Agent) 被明确定义为“一个由大语言模型(LLM)驱动的、能够循环决策并调用外部工具来完成任务的节点或子图”

Agent = LLM + 工具集合 + 提示模板,三者在 LangGraph 的状态化图结构里循环运行,直到满足停止条件。

ReAct 是一种流行的通用智能体架构,它结合了这些扩展,并整合了三个核心概念。

  1. 工具调用:允许LLM根据需要选择和使用各种工具。
  2. 记忆:使智能体能够保留和使用之前步骤的信息。
  3. 规划:使LLM能够创建并遵循多步计划以实现目标。

act- 让模型调用特定工具

observe - 将工具输出传递回模型

reason - 让模型对工具输出进行推理,以决定下一步操作(例如,调用另一个工具或直接响应)

定义工具
1
2
3
4
5
6
7
tools = [add, multiply, divide]#工具函数具体内容省略
llm = ChatOpenAI()

# 在这个 ipynb 文件中,我们将并行工具调用(parallel tool calling)设置为 false,因为数学计算通常是按顺序执行的,并且这次我们有3个可以进行数学计算的工具。
# OpenAI 模型为了效率,默认进行并行工具调用,详情请参阅 `https://python.langchain.com/docs/how_to/tool_calling_parallel/`
# 不妨尝试一下,看看模型在处理数学方程式时的表现!
llm_with_tools = llm.bind_tools(tools, parallel_tool_calls=False)
创建代理

定义节点

1
2
3
4
5
6
7
8
9
from langgraph.graph import MessagesState
from langchain_core.messages import HumanMessage, SystemMessage

# System message
sys_msg = SystemMessage(content="你是一个乐于助人的助手,负责对一组输入执行算术运算。")

# Node
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}

这一步相当于定义了系统提示词,然后在 assistant 这个节点里,通过 [sys_msg] + state[“messages”] 这部分代码,这个系统提示词被添加到了整个对话历史的最前面,然后一起发送给模型。这样一来,模型在生成回复时就会遵循这个系统提示词的指示。

与上一个不同的是,我们将 Tools 节点 回环 连接到 Assistant,从而形成一个回路。

在 assistant节点执行后,tools_condition检查模型的输出是否为工具调用。

如果是工具调用,则流程被导向至 tools 节点。

tools节点重新连接到assistant 只要模型决定调用工具,此循环就会继续。

如果模型的响应不是工具调用,则流程被导向至结束,终止该过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Graph
builder = StateGraph(MessagesState)

# Define nodes: these do the work
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))

# Define edges: these determine how the control flow moves
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
# If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
# If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END
tools_condition,
)
builder.add_edge("tools", "assistant")
react_graph = builder.compile()

# Show
display(Image(react_graph.get_graph(xray=True).draw_mermaid_png()))
image-20250719145948088
调用
1
2
3
4
5
messages = [HumanMessage(content="将3和4相加。将结果乘以2。再将结果除以5。")]
messages = react_graph.invoke({"messages": messages})

for m in messages['messages']:
m.pretty_print()

parallel_tool_calls=False的输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
================================ Human Message =================================

将3和4相加。将结果乘以2。再将结果除以5。
================================== Ai Message ==================================
Tool Calls:
add (call_6c69898dba0342bfbb889e)
Call ID: call_6c69898dba0342bfbb889e
Args:
a: 3
b: 4
================================= Tool Message =================================
Name: add

7
================================== Ai Message ==================================
Tool Calls:
multiply (call_9940e7603ecf4a13a5f2fb)
Call ID: call_9940e7603ecf4a13a5f2fb
Args:
a: 7
b: 2
================================= Tool Message =================================
Name: multiply

14
================================== Ai Message ==================================
Tool Calls:
divide (call_d48fbbe205a14dfbaa3500)
Call ID: call_d48fbbe205a14dfbaa3500
Args:
a: 14
b: 5
================================= Tool Message =================================
Name: divide

2.8
================================== Ai Message ==================================

最终结果是2.8。

parallel_tool_calls=True的输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
================================ Human Message =================================

将3和4相加。将结果乘以2。再将结果除以5。
================================== Ai Message ==================================
Tool Calls:
add (call_e0c7d8e65f2c49e8aecd3e)
Call ID: call_e0c7d8e65f2c49e8aecd3e
Args:
a: 3
b: 4
multiply (call_5bf824058e64489aaace91)
Call ID: call_5bf824058e64489aaace91
Args:
a: 7
b: 2
divide (call_36c34f69f6574028b28847)
Call ID: call_36c34f69f6574028b28847
Args:
a: 14
b: 5
================================= Tool Message =================================
Name: add

7
================================= Tool Message =================================
Name: multiply

14
================================= Tool Message =================================
Name: divide

2.8
================================== Ai Message ==================================

最终结果是 **2.8**。

Agent memory代理记忆

使用chekpointer检查点的功能,最简单的检查点之一是 MemorySaver,这是一个用于图形状态的内存键值存储。

这个检查点就相当于把图的每一次“状态快照”持久化到外部存储的机制。

1
2
3
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
react_graph_memory = builder.compile(checkpointer=memory)

我们可以使用 记忆功能 来解决这个问题!LangGraph 可以使用检查点工具在每一步之后自动保存图的状态。这一内置的持久化层为我们提供了内存功能,使 LangGraph 能够从最后一次状态更新处继续。

1
2
3
4
5
6
7
8
9
10
# Specify a thread
config = {"configurable": {"thread_id": "1"}}

# Specify an input
messages = [HumanMessage(content="Add 3 and 4.")]

# Run
messages = react_graph_memory.invoke({"messages": messages},config)
for m in messages['messages']:
m.pretty_print()

当我们使用内存时,我们需要指定一个 thread_id。这 thread_id 将存储我们的图形状态集合。

如下图,检查点在图的每一步写入状态,这些检查点保存在一个线程中 ,我们可以使用 thread_id 在未来访问该线程

state.jpg

module-2

state-scheme状态模式

LangGraph 的 state-scheme(状态模式) 就是“一张蓝图”,它告诉框架:“在整个图的生命周期里,状态对象应该长什么样、每个字段怎样被更新、以及节点之间如何共享或隔离数据。”

state-scheme 用 TypedDictPydantic BaseModel 来声明,定义了:

  • 状态里有哪些字段(key)
  • 每个字段的 Python 类型
  • 可选 该字段的 reducer(更新规则)
TypedDict

基本定义

1
2
3
4
5
from typing_extensions import TypedDict

class TypedDictState(TypedDict):
foo: str
bar: str

可增加像 Literal 这样的类型提示,使其更有价值

1
2
3
4
5
from typing import Literal

class TypedDictState(TypedDict):
name: str
mood: Literal["happy","sad"]

在这里,mood 只能是 “happy” 或 “sad”。

加 reducer:让更新“可追加”而不覆盖

1
2
3
4
class MathState(TypedDict):
question: str
scratchpad: Annotated[list[str], add_message] # 新元素自动追加
answer: int

Annotated[list[str], add] 告诉 LangGraph:当节点返回 {"scratchpad": ["新步骤"]} 时,追加到现有列表,而不是替换

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import random
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
#定义节点
def node_1(state):
print("---Node 1---")
return {"name": state['name'] + " is ... "}

def node_2(state):
print("---Node 2---")
return {"mood": "happy"}

def node_3(state):
print("---Node 3---")
return {"mood": "sad"}
#路由函数
def decide_mood(state) -> Literal["node_2", "node_3"]:

# Here, let's just do a 50 / 50 split between nodes 2, 3
if random.random() < 0.5:

# 50% of the time, we return Node 2
return "node_2"

# 50% of the time, we return Node 3
return "node_3"

# Build graph
builder = StateGraph(TypedDictState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)

# Logic
builder.add_edge(START, "node_1")
builder.add_conditional_edges("node_1", decide_mood)
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719154439415

因为我们的状态是一个字典,我们只需用一个字典调用图,以设置状态中 name 键的初始值。

1
graph.invoke({"name":"Lance"})
Dataclass数据类

python的dataclasses库提供了一种简洁的语法,用于创建主要用于存储数据的类。

1
2
3
4
5
6
from dataclasses import dataclass

@dataclass
class DataclassState:
name: str
mood: Literal["happy","sad"]

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def node_1(state):
print("---Node 1---")
return {"name": state.name + " is ... "}

# Build graph
builder = StateGraph(DataclassState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)

# Logic
builder.add_edge(START, "node_1")
builder.add_conditional_edges("node_1", decide_mood)
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

要访问 dataclass 的键,我们只需修改在 node_1 中使用的下标即可:

我们使用 state.name 来表示 dataclass 状态,而不是使用 state["name"] 来表示上面的 TypedDict

你会注意到一个有点奇怪的地方:在每个节点中,我们仍然返回一个字典来执行状态更新。

Dataclass 只是“描述”状态的形状,而真正在 LangGraph 的节点之间流动的依旧是「字典」,这是框架设计层面的约定

在这种情况下,dataclass 拥有键 name,因此我们可以通过从节点传递一个字典来更新它,就像在状态为 TypedDict 时所做的那样。

1
graph.invoke(DataclassState(name="Lance",mood="sad"))

我们通过 dataclass 来设置状态中每个键/通道的初始值!

State Reducers状态更新函数

Reducers 为我们指定了如何执行更新。它接收 旧状态一次变更指令(action / 增量字段)返回全新的状态对象,整个过程中不能修改原有数据

我们可以使用 Annotated 类型来指定一个 reducer 函数。在这种情况下,让我们将每个节点返回的值附加到结果中,而不是覆盖它们。

1
2
3
4
5
from operator import add
from typing import Annotated

class State(TypedDict):
foo: Annotated[list[int], add]

我们只需要一个可以执行此操作的缩减器:operator.add 是 Python 内置 operator 模块中的一个函数。当 operator.add 应用于列表时,它执行列表连接。

Custom Reducers 自定义 Reducers

我们同样可以自定义reducers函数,解决一些特殊情况,比如,如下可以解决传入参数为none的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def reduce_list(left: list | None, right: list | None) -> list:
"""安全地合并两个列表,处理其中一个或两个输入可能为 None 的情况。

参数:
left (list | None): 要合并的第一个列表,或 None。
right (list | None): 要合并的第二个列表,或 None。

返回:
list: 一个包含两个输入列表所有元素的新列表。
如果输入为 None,则将其视为空列表。
"""
if not left:
left = []
if not right:
right = []
return left + right

class DefaultState(TypedDict):
foo: Annotated[list[int], add]

class CustomReducerState(TypedDict):
foo: Annotated[list[int], reduce_list]
MessagesState

我可以使用内置的 reducer add_messages 来处理状态中的消息

MessagesState 内置了一个 messages 键 它还为该键内置了一个 add_messages 合并器,这两个是等价的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from typing import Annotated
from langgraph.graph import MessagesState
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

# 定义一个自定义的 TypedDict,其中包含一个带有 add_messages reducer 的消息列表。
class CustomMessagesState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
added_key_1: str
added_key_2: str
# etc

# 使用 MessagesState ,它包含带有 add_messages reducer 的 messages 键。
class ExtendedMessagesState(MessagesState):
# 添加除 messages 之外所需的任何键, messages 是预构建的。
added_key_1: str
added_key_2: str
# etc

在使用 add_messages reducer 时,让我们展示一些有用的技巧。

重写(Re-writing)

如果我们传递的消息与 messages 列表中已有的消息具有相同的 ID,则该消息将被覆盖!

1
2
3
4
5
6
7
8
9
10
# Initial state
initial_messages = [AIMessage(content="Hello! How can I assist you?", name="Model", id="1"),
HumanMessage(content="I'm looking for information on marine biology.", name="Lance", id="2")
]

# New message to add
new_message = HumanMessage(content="I'm looking for information on whales, specifically", name="Lance", id="2")

# Test
add_messages(initial_messages , new_message)
1
2
[AIMessage(content='Hello! How can I assist you?', name='Model', id='1'),
HumanMessage(content="I'm looking for information on whales, specifically", name='Lance', id='2')]

删除(Removal)

add_messages同样支持删除。为此,我们简单地使用 RemoveMessage 来自 langchain_core

1
2
3
4
5
6
7
8
9
10
11
from langchain_core.messages import RemoveMessage

# Message list
messages = [AIMessage("Hi.", name="Bot", id="1")]
messages.append(HumanMessage("Hi.", name="Lance", id="2"))
messages.append(AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"))
messages.append(HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4"))

# Isolate messages to delete
delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]]
print(delete_messages)

Multiple Schemas 多种状态

Private State 私有状态

首先,让我们讨论在节点之间传递 private state 的情况。这对于图的中间计算逻辑中需要的任何内容都很有用,但与图的整体输入或输出无关。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from typing_extensions import TypedDict
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END

class OverallState(TypedDict):
foo: int

class PrivateState(TypedDict):
baz: int

def node_1(state: OverallState) -> PrivateState:
print("---Node 1---")
return {"baz": state['foo'] + 1}

def node_2(state: PrivateState) -> OverallState:
print("---Node 2---")
return {"foo": state['baz'] + 1}

# Build graph
builder = StateGraph(OverallState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)

# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719171509917

我们将定义一个 OverallState 和一个 PrivateStatenode_2 使用 PrivateState 作为输入,但输出写入到 OverallState

baz 仅包含在 PrivateState 中。因此,我们可以看到 baz 被排除在图形输出之外,因为它不在 OverallState 中。

Input / Output Schema 输入/输出模式

在 LangGraph 中,Input / Output Schema 就是“图的对外接口协议”:调用者只能按 Input Schema 传参;图运行完后,只吐出 Output Schema 规定的字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# 定义输入的模式
class InputState(TypedDict):
question: str

# 定义输出的模式
class OutputState(TypedDict):
answer: str

# 定义整体模式,结合输入和输出
class OverallState(InputState, OutputState):
pass

# 定义处理输入并生成答案的节点
def answer_node(state: InputState):
# 示例答案和额外键
return {"answer": "bye", "question": state["question"]}

# 构建图,并指定输入和输出模式
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
builder.add_node(answer_node) # 添加答案节点
builder.add_edge(START, "answer_node") # 定义起始边
builder.add_edge("answer_node", END) # 定义结束边
graph = builder.compile() # 编译图

# 使用输入调用图并打印结果
print(graph.invoke({"question": "hi"}))

输出

1
{'answer': 'bye Lance'}

在这里,input / output 模式对图的输入和输出上允许的键进行 过滤。可以看到 output 模式将输出限制为仅包含 answer 键。

Filtering and trimming messages筛选和精简消息

如果我们在处理长时间对话时不够小心,会导致高令牌使用量和延迟,因为我们传递给模型的是一系列不断增加的消息。所以要进行筛选和精简消息。

简化器(Reducer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain_core.messages import RemoveMessage

# Nodes
def filter_messages(state: MessagesState):
# 删除除最近两条消息外的所有消息
delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
return {"messages": delete_messages}

def chat_model_node(state: MessagesState):
return {"messages": [llm.invoke(state["messages"])]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("filter", filter_messages)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "filter")
builder.add_edge("filter", "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719203204234
筛选消息(Filtering messages)

如果你不需要或不希望修改图状态,可以直接过滤传递给聊天模型的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
# Node
def chat_model_node(state: MessagesState):
return {"messages": [llm.invoke(state["messages"][-1:])]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

例如,只需传递一个过滤后的列表:llm.invoke(messages[-1:]) 给模型。

状态包含了所有消息。但这里模型调用仅使用最后一条消息

裁剪消息(Trim messages)

另一种方法是根据设定一定数量的tokens进行 trim messages。在把对话历史发给大模型之前,按 token 预算 把超长消息列表“剪”到合适长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from langchain_core.messages import trim_messages

# Node
def chat_model_node(state: MessagesState):
# 使用 trim_messages 函数修剪消息列表
# max_tokens: 限制消息的最大令牌数
# strategy: 修剪策略,这里是“last”,表示保留最新的消息
# token_counter: 用于计算令牌数的模型实例
# allow_partial: 是否允许部分修剪
messages = trim_messages(
state["messages"],
max_tokens=100,
strategy="last",
token_counter= ChatOpenAI(
model="qwen-plus-2025-04-28",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"),
allow_partial=False,
)
# 调用语言模型(llm)处理修剪后的消息,并返回结果
return {"messages": [llm.invoke(messages)]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

Chatbot with message summarization带有消息总结功能的聊天机器人

与其仅仅修剪或过滤消息,我们将展示如何使用大型语言模型(LLMs)来生成对话的实时摘要。

这使我们能够保留整个对话的压缩表示,而不仅仅是通过修剪或过滤将其移除。

我们将为该聊天机器人配备记忆功能,支持长时间对话,同时不会产生高昂的 token 成本或延迟。

定义总结状态
1
2
3
from langgraph.graph import MessagesState
class State(MessagesState):
summary: str

除了内置的 messages 键之外,我们现在还将包含一个自定义键(summary)。

定义LLM节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage 

# 定义调用模型的逻辑
def call_model(state: State):

# 获取摘要(如果存在)
summary = state.get("summary", "")

# 如果有摘要,则添加它
if summary:

# 将摘要添加到系统消息中
system_message = f"先前对话的摘要:{summary}"

# 将摘要附加到任何较新的消息中
messages = [SystemMessage(content=system_message)] + state["messages"]

else:
messages = state["messages"]

response = model.invoke(messages)
return {"messages": response}

我们将定义一个节点来调用我们的LLM,如果存在摘要,则将其纳入提示中。

当 call_model 函数返回 {“messages”: response} 时,它是在告诉 langgraph :“请用 response (即模型的新输出)来更新 State 对象中 messages 键对应的值。” langgraph 会将这个新消息追加到 messages 列表中,从而保持了对话历史的连续性

定义摘要节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def summarize_conversation(state: State): 

# 首先,我们获取任何现有的摘要
summary = state.get("summary", "")

# 创建我们的摘要提示
if summary:

# 摘要已存在
summary_message = (
f"这是迄今为止对话的摘要:{summary}\n\n"
"请结合以上新消息扩展摘要:"
)

else:
summary_message = "创建以上对话的摘要:"

# 将提示添加到我们的历史记录中
messages = state["messages"] + [HumanMessage(content=summary_message)]
response = model.invoke(messages)

# 删除除最近2条消息外的所有消息
delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
return {"summary": response.content, "messages": delete_messages}

我们将定义一个节点来生成摘要。请注意,这里我们将使用 RemoveMessage 在生成摘要后过滤我们的状态。

定义路由函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from langgraph.graph import END 
# 决定是结束对话还是总结对话
def should_continue(state: State):

"""返回要执行的下一个节点。"""

messages = state["messages"]

# 如果消息超过六条,那么我们总结对话
if len(messages) > 6:
return "summarize_conversation"

# 否则我们就可以结束了
return END

我们将添加一个条件边,以根据对话长度确定是否生成摘要。

在 langgraph 中, Command 是一个特殊的类型,用于指导图形(graph)决定接下来应该执行哪个节点。 您可以把它看作是给图形下达的一个“命令”。

添加内存并编译图
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from IPython.display import Image, display
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START

# Define a new graph
workflow = StateGraph(State)
workflow.add_node("conversation", call_model)
workflow.add_node("summarize_conversation",summarize_conversation)

# Set the entrypoint as conversation
workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue)
workflow.add_edge("summarize_conversation", END)



# Compile
memory = MemorySaver()
graph = workflow.compile(checkpointer=memory)
display(Image(graph.get_graph().draw_mermaid_png()))
使用线程调用
1
2
3
4
5
6
config = {"configurable": {"thread_id": "2"}}

input_message = HumanMessage(content="我喜欢玩lol,你知道这个游戏吗")
output = graph.invoke({"messages": [input_message]}, config)
for m in output['messages'][-1:]:
m.pretty_print()

当对话大于6,可生成概要

1
graph.get_state(config).values.get("summary","")

Chatbot with message summarization & external DB memory具有消息总结和外部数据库记忆的聊天机器人

使用数据库

SqliteSaver 是 LangGraph 提供的一个 轻量级状态持久化工具,它将图的运行状态(即 checkpoint)保存到本地的 SQLite 数据库中,使得你可以在程序中断或重启后恢复执行上下文,特别适合本地开发、实验性项目或中小规模应用。

如果我们提供 “:memory:” ,它将创建一个内存中的 SQLite 数据库。

1
2
3
import sqlite3
# In memory
conn = sqlite3.connect(":memory:", check_same_thread = False)

如果我们提供一个 db 路径,那么它将为我们创建一个数据库!

1
2
3
4
5
#在本地创建一个目录 state_db,并尝试从 GitHub 下载一个名为 example.db 的 SQLite 数据库文件
!mkdir -p state_db && [ ! -f state_db/example.db ] && wget -P state_db https://github.com/langchain-ai/langchain-academy/raw/main/module-2/state_db/example.db

db_path = "state_db/example.db"
conn = sqlite3.connect(db_path, check_same_thread=False)

定义checkpoint

1
2
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver(conn)

像上一个形式编译图

让我们确认一下我们的状态是否已保存到本地。

1
2
3
config = {"configurable": {"thread_id": "1"}}
graph_state = graph.get_state(config)
graph_state

使用像 Sqlite 这样的数据库意味着状态会被持久化!

module-3

Streaming 流式传输

现在,让我们来谈谈 流式传输我们的图状态 的方法。.stream.astream 是用于流式返回结果的同步和异步方法。

values:这将在每个节点被调用后流式传输图的完整状态。 updates:这将在每个节点被调用后流式传输图的状态更新。

values_vs_updates.png
stream_mode=“updates”
1
2
3
4
5
6
# Create a thread
config = {"configurable": {"thread_id": "1"}}

# Start conversation
for chunk in graph.stream({"messages": [HumanMessage(content="你好我是zxj")]}, config, stream_mode="updates"):
print(chunk)
1
{'conversation': {'messages': AIMessage(content='你好,zxj!很高兴认识你~有什么我可以帮你的吗?😊', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 576, 'total_tokens': 592, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'qwen-plus-2025-04-28', 'system_fingerprint': None, 'id': 'chatcmpl-891471ae-2fe8-9b3d-b5f7-f4fcd55a4e16', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--f36409f3-af43-4e9b-8a46-39646ad7c106-0', usage_metadata={'input_tokens': 576, 'output_tokens': 16, 'total_tokens': 592, 'input_token_details': {}, 'output_token_details': {}})}}

让我们来看一下 stream_mode="updates"

因为我们使用 updates 进行流式传输,所以只有在图中的节点运行后,我们才能看到状态的更新。每个 chunk 是一个字典,以 node_name 为键,更新后的状态为值。

1
2
3
# Start conversation
for chunk in graph.stream({"messages": [HumanMessage(content="你好我是zxj")]}, config, stream_mode="updates"):
chunk['conversation']["messages"].pretty_print()

现在我们直接打印状态更新。

1
2
3
================================== Ai Message ==================================

你好呀,zxj!再次见到你真高兴~😊 有什么我可以帮忙的吗?
stream_mode=“values”
1
2
3
4
5
6
7
8
9
# Start conversation, again
config = {"configurable": {"thread_id": "2"}}

# Start conversation
input_message = HumanMessage(content="你好我是zxj")
for event in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
for m in event['messages']:
m.pretty_print()
print("---"*25)

现在,我们可以看到 stream_mode="values".这是在 conversation 节点被调用后,图的整个状态。

1
2
3
4
5
6
7
8
9
10
11
================================ Human Message =================================

你好我是zxj
---------------------------------------------------------------------------
================================ Human Message =================================

你好我是zxj
================================== Ai Message ==================================

你好,zxj!有什么我可以帮你的吗?😊
---------------------------------------------------------------------------
Streaming tokens 流式传输令牌

在 LangGraph 中,“流式传输令牌(Streaming tokens)”指的是在节点内部的大模型(LLM)生成过程中,逐 token 地将中间结果实时推送到客户端的能力。实现这一能力的核心方法是 astream_events,它会以事件流的形式暴露整个执行过程中的所有细节,包括每一次 LLM 调用产生的 token。

每个事件是一个包含几个键的字典:

event:这是正在发出的事件的类型。

name:这是事件的名称。

data:这是与事件相关联的数据。

metadata:包含 langgraph_node,即发出事件的节点。

要点是,图表中聊天模型的令牌具有 on_chat_model_stream 类型。我们可以使用 event['metadata']['langgraph_node'] 来选择要流式的节点。并且我们可以使用 event['data'] 来获取每个事件的实际数据,而在这种情况下,数据是一个 AIMessageChunk.

1
2
3
4
5
6
7
8
9
node_to_stream = 'conversation'#定义流式传输的节点
config = {"configurable": {"thread_id": "5"}}
input_message = HumanMessage(content="为我介绍lol")
async for event in graph.astream_events({"messages": [input_message]}, config, version="v2"):
# 从特定节点获取聊天模型生成的 Token
#事件类型必须是 逐 token 流式输出(on_chat_model_stream)。
if event["event"] == "on_chat_model_stream" and event['metadata'].get('langgraph_node','') == node_to_stream:
data = event["data"]
print(data["chunk"].content, end="|")

event的常见类型

事件类型 (event) 触发时机与说明
on_chain_start 任意 Runnable(节点、子图或整个图)开始执行
on_chain_stream 节点/图在运行过程中 增量输出 chunk
on_chain_end 任意 Runnable 执行完成
on_chat_model_start ChatModel 开始调用
on_chat_model_stream ChatModel 逐 token 返回内容(打字机效果)
on_chat_model_end ChatModel 调用结束
on_tool_start Tool 开始调用
on_tool_end Tool 调用结束
on_retriever_start Retriever 开始检索
on_retriever_end Retriever 检索结束

Breakpoints 断点

human-in-the-loop(人工介入/人在回路)的三大动机:

1️⃣ Approval(审批)我们可以中断智能体,将当前状态呈现给用户,并让用户决定是否执行该操作。

2️⃣ Debugging(调试/回放)我们可以回退图形以重现或避免问题

3️⃣ Editing(编辑)AI 产出的中间结果不符合预期,但不想重跑整图,可以直接修改状态

我们将介绍 breakpoints,它提供了一种在特定步骤停止图的简单方法。

Breakpoints for human approval用于人类审批的断点

假设我们关注工具的使用:我们希望批准代理使用其任何工具。

我们所需要做的就是简单地用 interrupt_before=["tools"] 编译图形,其中 tools 是我们的工具节点。

这意味着在执行工具调用的节点 tools 之前,执行将被中断。

1
graph = builder.compile(interrupt_before=["tools"], checkpointer=memory)
image-20250720225640772
1
2
3
4
5
6
7
8
9
# Input
initial_input = {"messages": HumanMessage(content="2乘3")}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
1
2
3
4
5
6
7
8
9
10
================================ Human Message =================================

2乘3
================================== Ai Message ==================================
Tool Calls:
multiply (call_92a4bcf88d25476d925775)
Call ID: call_92a4bcf88d25476d925775
Args:
a: 2
b: 3

我们可以获取状态并查看要调用的下一个节点。这是一种很好的方法,可以发现图已被中断。

现在,我们将介绍一个很好的技巧。当我们使用 None 调用图时,它将直接从最后一个状态检查点继续!

breakpoints.jpg

状态快照(StateSnapshot)

  • 类型:专门用来存 一个时刻 的完整状态
  • 获取方式:
    • Graph.get_state()最新的 快照
    • Graph.get_state_history()所有 快照列表

继续/重跑图

  • Graph.stream(None, {"thread_id": "xxx"})
    • 不传新输入 None 表示 从当前最新状态继续跑
    • 也可回退到历史快照,再重跑(调试/回放)
1
2
for event in graph.stream(None, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
================================== Ai Message ==================================
Tool Calls:
multiply (call_92a4bcf88d25476d925775)
Call ID: call_92a4bcf88d25476d925775
Args:
a: 2
b: 3
================================= Tool Message =================================
Name: multiply

6
================================== Ai Message ==================================

2乘3的结果是6。

Editing graph state 编辑图状态

断点也是修改图状态的机会让我们在 assistant 节点之前为代理设置断点。

1
graph = builder.compile(interrupt_before=["assistant"], checkpointer=memory)
image-20250720230924672
1
2
3
4
5
6
7
8
9
# Input
initial_input = {"messages": "2乘3"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
1
2
3
================================ Human Message =================================

2乘3

当状态中断时,我们可以直接应用状态更新

记住,对 messages 键的更新将使用 add_messages reducer:

**如果我们想覆盖现有的消息,可以提供带有* id *的消息。** 如果我们只想将消息添加到消息列表中,则可以传递未指定 id 的消息,如下所示。

1
2
3
4
graph.update_state(
thread,
{"messages": [HumanMessage(content="不要,实际上要3乘3!")]},
)
1
2
3
new_state = graph.get_state(thread).values
for m in new_state['messages']:
m.pretty_print()
1
2
3
4
5
6
================================ Human Message =================================

2乘3
================================ Human Message =================================

不要,实际上要3乘3!

现在,让我们继续进行我们的代理操作,只需传递 None 并允许其从当前状态继续执行。我们输出当前内容,然后继续执行剩余的节点。

1
2
for event in graph.stream(None, thread, stream_mode="values"):
event['messages'][-1].pretty_print()

Dynamic breakpoints 动态断点

你可以根据条件来实现它(从节点内部基于开发人员定义的逻辑)。您可以向用户说明其中断原因(通过将您想传递的内容发送到 NodeInterrupt)。

让我们创建一个图表,其中根据输入的长度会抛出一个 NodeInterrupt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from IPython.display import Image, display

from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from langgraph.graph import START, END, StateGraph

class State(TypedDict):
input: str

def step_1(state: State) -> State:
print("---Step 1---")
return state

def step_2(state: State) -> State:
# 如果输入字符串长度超过5个字符,我们可以选择抛出NodeInterrupt异常
if len(state['input']) > 5:
raise NodeInterrupt(f"收到长度超过5个字符的输入: {state['input']}")

print("---Step 2---")
return state

def step_3(state: State) -> State:
print("---Step 3---")
return state

builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Compile the graph with memory
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250721222709712

让我们运行一个输入超过5个字符的图。

1
2
3
4
5
6
initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
print(event)
1
2
3
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}

我们可以尝试从断点恢复图。但是,这只会重新运行相同的节点!除非状态发生变化,否则我们将一直卡在这里。

1
2
3
4
graph.update_state(
thread_config,
{"input": "hi"},
)

使用update_state更新状态

Time travel 时间旅行

现在,让我们通过查看、重播,甚至从过去的状态叉出,来展示 LangGraph 支持debug 的功能。

Browsing History 浏览历史

我们可以使用 get_state 来查看给定 thread_id 的图的 当前 状态!

1
graph.get_state({'configurable': {'thread_id': '1'}})

我们还可以浏览代理的状态历史。get_state_history 让我们能够获取所有先前步骤的状态。

1
2
3
all_states = [s for s in graph.get_state_history(thread)]
len(all_states)
print(all_states)
Replaying 回放
1
2
3
4
5
to_replay = all_states[-2]
to_replay.values
{'messages': [HumanMessage(content='2乘3', additional_kwargs={}, response_metadata={}, id='0676d9b5-cd59-4630-924d-b5c8d950e8d8')]}
to_replay.next
('assistant',)

我们还获取了配置,它告诉了我们 checkpoint_id 以及 thread_id

1
2
3
4
to_replay.config
{'configurable': {'thread_id': '1',
'checkpoint_ns': '',
'checkpoint_id': '1f066c0e-2ee2-66d5-8000-5dde78194aae'}}

要从这里重播,我们只需将配置传回给代理!图知道这个检查点已经执行过了。它只是从这个检查点重新播放!

1
2
for event in graph.stream(None, to_replay.config, stream_mode="values"):
event['messages'][-1].pretty_print()
Forking 分叉

如果我们想从相同的步骤运行,但使用不同的输入,该怎么办呢?这是分叉。

fig3.jpg

让我们修改此检查点的状态。我们可以直接使用提供的 checkpoint_id 来运行 update_state

请记住我们对 messages 的 reducer 是如何工作的:

  • 它会追加消息,除非我们提供了一个消息 ID。
  • 我们提供消息 ID 是为了覆盖消息,而不是将消息追加到状态中!

因此,要覆盖消息,我们只需提供消息 ID,而我们已有 to_fork.values["messages"].id

1
2
3
4
5
fork_config = graph.update_state(
to_fork.config,
{"messages": [HumanMessage(content='5乘3',
id=to_fork.values["messages"][0].id)]},
)

基础知识

message

LangChain 中的 HumanMessage 、 AIMessage 、 SystemMessage 和 ToolMessage 。这些消息类型是构建与语言模型(LLM)交互的核心组件,它们共同构成了一个完整的对话历史,帮助模型理解上下文并做出恰当的回应。

  1. SystemMessage

SystemMessage 的结构最简单,它只包含内容和类型。

数据结构 :

  • content (str): 消息的具体内容,即给 AI 的指令。
  • type (str): 固定为字符串 ‘system’ 。
  1. HumanMessage

HumanMessage 的结构也同样简单,代表用户的输入。

数据结构 :

  • content (str): 用户输入的文本。
  • type (str): 固定为字符串 ‘human’ 。
  1. AIMessage

AIMessage 的结构相对复杂,因为它不仅可以包含文本响应,还可以包含对工具的调用请求。

数据结构 :

  • content (str): AI 生成的文本响应。如果 AI 的回复是发起工具调用,此字段可以为空字符串。
  • tool_calls (list[dict], 可选): 一个字典列表,每个字典代表一个工具调用请求。这是支持“Function Calling”或“Tool Calling”功能的核心。其结构通常包含:
    • name (str): 要调用的工具名称。
    • args (dict): 调用工具时需要传入的参数。
    • id (str): 此次工具调用的唯一标识符,用于后续 ToolMessage 的关联。
  • type (str): 固定为字符串 ‘ai’ 。
  1. ToolMessage

ToolMessage 用于承载工具执行后的返回结果。

数据结构 :

  • content (str): 工具执行返回的结果。通常是一个字符串,比如 JSON 格式的字符串。
  • tool_call_id (str): 此次工具调用的唯一标识符, 必须 与之前 AIMessage 中 tool_calls 里的 id 相对应。这使得模型能够准确地将结果与请求匹配起来。
  • type (str): 固定为字符串 ‘tool’ 。