前言
本教程为langchain官方教程的学习记录
academy.langchain.com/enrollments
代码见[learn-rag-langchain/academy-langgraph
at main ·
zxj-2023/learn-rag-langchain](https://github.com/zxj-2023/learn-rag-langchain/tree/main/academy-langgraph)
module-1
route路由
在 LangGraph
中,route(路由)*的核心作用是* 根据当前状态动态决定“下一步应该执行哪个节点”
定义工具
1 2 3 4 5 6 7 8 9 10 11 12 13 def multiply(a: int, b: int) -> int: """Multiply a and b. Args: a: first int b: second int """ return a * b llm = ChatOpenAI( ) llm_with_tools = llm.bind_tools([multiply])
三引号字符串叫 docstring ,它会被 LangChain
拿来做两件事:
生成工具的 description(给大模型看的“说明书”)
没有它时,LangChain 只能退而求其次,把函数名 multiply
拼成一句 “multiply tool”
之类的默认描述。大模型拿到的工具列表里,这个工具就只有一个干巴巴的名字和参数列表,它可能猜不到这个工具到底是干什么的
给人类开发者自己看
IDE、文档生成器、静态检查工具都会读取这段文字,方便后期维护。
构建条件边
tool_calling_llm
是一个普通的计算节点 (node),负责把当前对话状态交给大模型,让大模型决定要不要调用工具;
真正完成“路由”动作的是 tools_condition
这个函数——它才是 LangGraph 里的 route(条件边) 。
tools_condition 是 作为LangGraph
预置的“默认路由函数” ,功能就是,如果大模型的最新回复中包含工具调用,就调用工具
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 # Node def tool_calling_llm(state: MessagesState): #调用大模型后将最新的消息返回 return {"messages": [llm_with_tools.invoke(state["messages"])]} # Build graph #MessagesState 是 LangGraph 官方预置 的一种 状态(State)定义 #这个状态维护了一个消息list,有新的消息就加进这个消息list builder = StateGraph(MessagesState) builder.add_node("tool_calling_llm", tool_calling_llm) builder.add_node("tools", ToolNode([multiply])) builder.add_edge(START, "tool_calling_llm") builder.add_conditional_edges( "tool_calling_llm", # 如果助手(结果)的最新消息是工具调用 -> tools_condition 路由到工具 # 如果助手(结果)的最新消息不是工具调用 -> tools_condition 路由到 END tools_condition, ) builder.add_edge("tools", END) graph = builder.compile() # View display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719142543838
调用
1 2 3 4 5 from langchain_core.messages import HumanMessage messages = [HumanMessage(content="你好,2乘2是多少")] messages = graph.invoke({"messages": messages}) for m in messages['messages']: m.pretty_print()
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ================================[1m Human Message [0m================================= 你好,2乘2是多少 ==================================[1m Ai Message [0m================================== Tool Calls: multiply (call_e026ceb409e247748786ad) Call ID: call_e026ceb409e247748786ad Args: a: 2 b: 2 =================================[1m Tool Message [0m================================= Name: multiply 4
agent代理
在 LangGraph 中,代理(Agent)
被明确定义为“一个由大语言模型(LLM)驱动的、能够循环决策并调用外部工具来完成任务的节点或子图” 。
Agent = LLM + 工具集合 + 提示模板 ,三者在 LangGraph
的状态化图结构里循环运行,直到满足停止条件。
ReAct
是一种流行的通用智能体架构,它结合了这些扩展,并整合了三个核心概念。
工具调用 :允许LLM根据需要选择和使用各种工具。
记忆 :使智能体能够保留和使用之前步骤的信息。
规划 :使LLM能够创建并遵循多步计划以实现目标。
即
act- 让模型调用特定工具
observe - 将工具输出传递回模型
reason -
让模型对工具输出进行推理,以决定下一步操作(例如,调用另一个工具或直接响应)
定义工具
1 2 3 4 5 6 7 tools = [add, multiply, divide]#工具函数具体内容省略 llm = ChatOpenAI() # 在这个 ipynb 文件中,我们将并行工具调用(parallel tool calling)设置为 false,因为数学计算通常是按顺序执行的,并且这次我们有3个可以进行数学计算的工具。 # OpenAI 模型为了效率,默认进行并行工具调用,详情请参阅 `https://python.langchain.com/docs/how_to/tool_calling_parallel/` # 不妨尝试一下,看看模型在处理数学方程式时的表现! llm_with_tools = llm.bind_tools(tools, parallel_tool_calls=False)
创建代理
定义节点
1 2 3 4 5 6 7 8 9 from langgraph.graph import MessagesState from langchain_core.messages import HumanMessage, SystemMessage # System message sys_msg = SystemMessage(content="你是一个乐于助人的助手,负责对一组输入执行算术运算。") # Node def assistant(state: MessagesState): return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
这一步相当于定义了系统提示词,然后在 assistant 这个节点里,通过
[sys_msg] + state[“messages”]
这部分代码,这个系统提示词被添加到了整个对话历史的最前面,然后一起发送给模型。这样一来,模型在生成回复时就会遵循这个系统提示词的指示。
与上一个不同的是,我们将 Tools 节点
回环 连接到
Assistant,从而形成一个回路。
在
assistant节点执行后,tools_condition检查模型的输出是否为工具调用。
如果是工具调用,则流程被导向至 tools 节点。
tools节点重新连接到assistant。
只要模型决定调用工具,此循环就会继续。
如果模型的响应不是工具调用,则流程被导向至结束,终止该过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 # Graph builder = StateGraph(MessagesState) # Define nodes: these do the work builder.add_node("assistant", assistant) builder.add_node("tools", ToolNode(tools)) # Define edges: these determine how the control flow moves builder.add_edge(START, "assistant") builder.add_conditional_edges( "assistant", # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools # If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END tools_condition, ) builder.add_edge("tools", "assistant") react_graph = builder.compile() # Show display(Image(react_graph.get_graph(xray=True).draw_mermaid_png()))
image-20250719145948088
调用
1 2 3 4 5 messages = [HumanMessage(content="将3和4相加。将结果乘以2。再将结果除以5。")] messages = react_graph.invoke({"messages": messages}) for m in messages['messages']: m.pretty_print()
parallel_tool_calls=False的输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 ================================[1m Human Message [0m================================= 将3和4相加。将结果乘以2。再将结果除以5。 ==================================[1m Ai Message [0m================================== Tool Calls: add (call_6c69898dba0342bfbb889e) Call ID: call_6c69898dba0342bfbb889e Args: a: 3 b: 4 =================================[1m Tool Message [0m================================= Name: add 7 ==================================[1m Ai Message [0m================================== Tool Calls: multiply (call_9940e7603ecf4a13a5f2fb) Call ID: call_9940e7603ecf4a13a5f2fb Args: a: 7 b: 2 =================================[1m Tool Message [0m================================= Name: multiply 14 ==================================[1m Ai Message [0m================================== Tool Calls: divide (call_d48fbbe205a14dfbaa3500) Call ID: call_d48fbbe205a14dfbaa3500 Args: a: 14 b: 5 =================================[1m Tool Message [0m================================= Name: divide 2.8 ==================================[1m Ai Message [0m================================== 最终结果是2.8。
parallel_tool_calls=True的输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 ================================[1m Human Message [0m================================= 将3和4相加。将结果乘以2。再将结果除以5。 ==================================[1m Ai Message [0m================================== Tool Calls: add (call_e0c7d8e65f2c49e8aecd3e) Call ID: call_e0c7d8e65f2c49e8aecd3e Args: a: 3 b: 4 multiply (call_5bf824058e64489aaace91) Call ID: call_5bf824058e64489aaace91 Args: a: 7 b: 2 divide (call_36c34f69f6574028b28847) Call ID: call_36c34f69f6574028b28847 Args: a: 14 b: 5 =================================[1m Tool Message [0m================================= Name: add 7 =================================[1m Tool Message [0m================================= Name: multiply 14 =================================[1m Tool Message [0m================================= Name: divide 2.8 ==================================[1m Ai Message [0m================================== 最终结果是 **2.8**。
Agent memory代理记忆
使用chekpointer检查点的功能,最简单的检查点之一是
MemorySaver,这是一个用于图形状态的内存键值存储。
这个检查点就相当于把图的每一次“状态快照”持久化到外部存储 的机制。
1 2 3 from langgraph.checkpoint.memory import MemorySaver memory = MemorySaver() react_graph_memory = builder.compile(checkpointer=memory)
我们可以使用 记忆功能
来解决这个问题!LangGraph
可以使用检查点工具在每一步之后自动保存图的状态。这一内置的持久化层为我们提供了内存功能,使
LangGraph 能够从最后一次状态更新处继续。
1 2 3 4 5 6 7 8 9 10 # Specify a thread config = {"configurable": {"thread_id": "1"}} # Specify an input messages = [HumanMessage(content="Add 3 and 4.")] # Run messages = react_graph_memory.invoke({"messages": messages},config) for m in messages['messages']: m.pretty_print()
当我们使用内存时,我们需要指定一个 thread_id。这
thread_id 将存储我们的图形状态集合。
如下图,检查点在图的每一步写入状态,这些检查点保存在一个线程中
,我们可以使用 thread_id 在未来访问该线程
state.jpg
module-2
state-scheme状态模式
LangGraph 的 state-scheme(状态模式)
就是“一张蓝图 ”,它告诉框架:“在整个图的生命周期里,状态对象应该长什么样、每个字段怎样被更新、以及节点之间如何共享或隔离数据。”
state-scheme 用 TypedDict 或 Pydantic
BaseModel 来声明,定义了:
状态里有哪些字段(key)
每个字段的 Python 类型
可选 该字段的
reducer (更新规则)
TypedDict
基本定义
1 2 3 4 5 from typing_extensions import TypedDict class TypedDictState(TypedDict): foo: str bar: str
可增加像 Literal
这样的类型提示,使其更有价值
1 2 3 4 5 from typing import Literal class TypedDictState(TypedDict): name: str mood: Literal["happy","sad"]
在这里,mood 只能是 “happy” 或 “sad”。
加 reducer:让更新“可追加”而不覆盖
1 2 3 4 class MathState(TypedDict): question: str scratchpad: Annotated[list[str], add_message] # 新元素自动追加 answer: int
Annotated[list[str], add] 告诉 LangGraph:当节点返回
{"scratchpad": ["新步骤"]}
时,追加 到现有列表,而不是替换
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import random from IPython.display import Image, display from langgraph.graph import StateGraph, START, END #定义节点 def node_1(state): print("---Node 1---") return {"name": state['name'] + " is ... "} def node_2(state): print("---Node 2---") return {"mood": "happy"} def node_3(state): print("---Node 3---") return {"mood": "sad"} #路由函数 def decide_mood(state) -> Literal["node_2", "node_3"]: # Here, let's just do a 50 / 50 split between nodes 2, 3 if random.random() < 0.5: # 50% of the time, we return Node 2 return "node_2" # 50% of the time, we return Node 3 return "node_3" # Build graph builder = StateGraph(TypedDictState) builder.add_node("node_1", node_1) builder.add_node("node_2", node_2) builder.add_node("node_3", node_3) # Logic builder.add_edge(START, "node_1") builder.add_conditional_edges("node_1", decide_mood) builder.add_edge("node_2", END) builder.add_edge("node_3", END) # Add graph = builder.compile() # View display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719154439415
因为我们的状态是一个字典,我们只需用一个字典调用图,以设置状态中
name 键的初始值。
1 graph.invoke({"name":"Lance"})
Dataclass数据类
python的dataclasses库提供了一种简洁的语法,用于创建主要用于存储数据的类。
1 2 3 4 5 6 from dataclasses import dataclass @dataclass class DataclassState: name: str mood: Literal["happy","sad"]
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def node_1(state): print("---Node 1---") return {"name": state.name + " is ... "} # Build graph builder = StateGraph(DataclassState) builder.add_node("node_1", node_1) builder.add_node("node_2", node_2) builder.add_node("node_3", node_3) # Logic builder.add_edge(START, "node_1") builder.add_conditional_edges("node_1", decide_mood) builder.add_edge("node_2", END) builder.add_edge("node_3", END) # Add graph = builder.compile() # View display(Image(graph.get_graph().draw_mermaid_png()))
要访问 dataclass 的键,我们只需修改在
node_1 中使用的下标即可:
我们使用 state.name 来表示 dataclass
状态,而不是使用 state["name"] 来表示上面的
TypedDict。
你会注意到一个有点奇怪的地方:在每个节点中,我们仍然返回一个字典来执行状态更新。
Dataclass 只是“描述”状态的形状,而真正在 LangGraph
的节点之间流动的依旧是「字典」 ,这是框架设计层面的约定
在这种情况下,dataclass 拥有键
name,因此我们可以通过从节点传递一个字典来更新它,就像在状态为
TypedDict 时所做的那样。
1 graph.invoke(DataclassState(name="Lance",mood="sad"))
我们通过 dataclass 来设置状态中每个键/通道的初始值!
State
Reducers 状态更新函数
Reducers
为我们指定了如何执行更新。它接收 旧状态 与
一次变更指令(action /
增量字段) ,返回全新的状态对象 ,整个过程中不能修改原有数据 。
我们可以使用 Annotated 类型来指定一个 reducer
函数。在这种情况下,让我们将每个节点返回的值附加到结果中,而不是覆盖它们。
1 2 3 4 5 from operator import add from typing import Annotated class State(TypedDict): foo: Annotated[list[int], add]
我们只需要一个可以执行此操作的缩减器:operator.add 是
Python 内置 operator 模块中的一个函数。当 operator.add
应用于列表时,它执行列表连接。
Custom Reducers 自定义
Reducers
我们同样可以自定义reducers函数,解决一些特殊情况,比如,如下可以解决传入参数为none的情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def reduce_list(left: list | None, right: list | None) -> list: """安全地合并两个列表,处理其中一个或两个输入可能为 None 的情况。 参数: left (list | None): 要合并的第一个列表,或 None。 right (list | None): 要合并的第二个列表,或 None。 返回: list: 一个包含两个输入列表所有元素的新列表。 如果输入为 None,则将其视为空列表。 """ if not left: left = [] if not right: right = [] return left + right class DefaultState(TypedDict): foo: Annotated[list[int], add] class CustomReducerState(TypedDict): foo: Annotated[list[int], reduce_list]
MessagesState
我可以使用内置的 reducer add_messages
来处理状态中的消息
而MessagesState 内置了一个
messages 键 它还为该键内置了一个
add_messages 合并器,这两个是等价的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from typing import Annotated from langgraph.graph import MessagesState from langchain_core.messages import AnyMessage from langgraph.graph.message import add_messages # 定义一个自定义的 TypedDict,其中包含一个带有 add_messages reducer 的消息列表。 class CustomMessagesState(TypedDict): messages: Annotated[list[AnyMessage], add_messages] added_key_1: str added_key_2: str # etc # 使用 MessagesState ,它包含带有 add_messages reducer 的 messages 键。 class ExtendedMessagesState(MessagesState): # 添加除 messages 之外所需的任何键, messages 是预构建的。 added_key_1: str added_key_2: str # etc
在使用 add_messages reducer
时,让我们展示一些有用的技巧。
重写(Re-writing)
如果我们传递的消息与 messages 列表中已有的消息具有相同的
ID,则该消息将被覆盖!
1 2 3 4 5 6 7 8 9 10 # Initial state initial_messages = [AIMessage(content="Hello! How can I assist you?", name="Model", id="1"), HumanMessage(content="I'm looking for information on marine biology.", name="Lance", id="2") ] # New message to add new_message = HumanMessage(content="I'm looking for information on whales, specifically", name="Lance", id="2") # Test add_messages(initial_messages , new_message)
1 2 [AIMessage(content='Hello! How can I assist you?', name='Model', id='1'), HumanMessage(content="I'm looking for information on whales, specifically", name='Lance', id='2')]
删除(Removal)
add_messages 也 同样支持删除 。为此,我们简单地使用
RemoveMessage
来自 langchain_core。
1 2 3 4 5 6 7 8 9 10 11 from langchain_core.messages import RemoveMessage # Message list messages = [AIMessage("Hi.", name="Bot", id="1")] messages.append(HumanMessage("Hi.", name="Lance", id="2")) messages.append(AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3")) messages.append(HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4")) # Isolate messages to delete delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]] print(delete_messages)
Multiple Schemas
多种状态
Private State
私有状态
首先,让我们讨论在节点之间传递 private
state
的情况。这对于图的中间计算逻辑中需要的任何内容都很有用,但与图的整体输入或输出无关。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from typing_extensions import TypedDict from IPython.display import Image, display from langgraph.graph import StateGraph, START, END class OverallState(TypedDict): foo: int class PrivateState(TypedDict): baz: int def node_1(state: OverallState) -> PrivateState: print("---Node 1---") return {"baz": state['foo'] + 1} def node_2(state: PrivateState) -> OverallState: print("---Node 2---") return {"foo": state['baz'] + 1} # Build graph builder = StateGraph(OverallState) builder.add_node("node_1", node_1) builder.add_node("node_2", node_2) # Logic builder.add_edge(START, "node_1") builder.add_edge("node_1", "node_2") builder.add_edge("node_2", END) # Add graph = builder.compile() # View display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719171509917
我们将定义一个 OverallState 和一个
PrivateState。node_2 使用
PrivateState 作为输入,但输出写入到
OverallState。
baz 仅包含在 PrivateState
中。因此,我们可以看到 baz 被排除在图形输出之外,因为它不在
OverallState 中。
在 LangGraph 中,Input / Output Schema
就是“图的对外接口协议 ”:调用者只能按 Input
Schema 传参;图运行完后,只吐出 Output Schema 规定的字段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from langgraph.graph import StateGraph, START, END from typing_extensions import TypedDict # 定义输入的模式 class InputState(TypedDict): question: str # 定义输出的模式 class OutputState(TypedDict): answer: str # 定义整体模式,结合输入和输出 class OverallState(InputState, OutputState): pass # 定义处理输入并生成答案的节点 def answer_node(state: InputState): # 示例答案和额外键 return {"answer": "bye", "question": state["question"]} # 构建图,并指定输入和输出模式 builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState) builder.add_node(answer_node) # 添加答案节点 builder.add_edge(START, "answer_node") # 定义起始边 builder.add_edge("answer_node", END) # 定义结束边 graph = builder.compile() # 编译图 # 使用输入调用图并打印结果 print(graph.invoke({"question": "hi"}))
输出
在这里,input / output
模式对图的输入和输出上允许的键进行 过滤 。可以看到
output 模式将输出限制为仅包含 answer 键。
Filtering
and trimming messages 筛选和精简消息
如果我们在处理长时间对话时不够小心,会导致高令牌使用量和延迟,因为我们传递给模型的是一系列不断增加的消息。所以要进行筛选和精简消息。
简化器(Reducer)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from langchain_core.messages import RemoveMessage # Nodes def filter_messages(state: MessagesState): # 删除除最近两条消息外的所有消息 delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]] return {"messages": delete_messages} def chat_model_node(state: MessagesState): return {"messages": [llm.invoke(state["messages"])]} # Build graph builder = StateGraph(MessagesState) builder.add_node("filter", filter_messages) builder.add_node("chat_model", chat_model_node) builder.add_edge(START, "filter") builder.add_edge("filter", "chat_model") builder.add_edge("chat_model", END) graph = builder.compile() # View display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719203204234
筛选消息(Filtering
messages)
如果你不需要或不希望修改图状态,可以直接过滤传递给聊天模型的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 # Node def chat_model_node(state: MessagesState): return {"messages": [llm.invoke(state["messages"][-1:])]} # Build graph builder = StateGraph(MessagesState) builder.add_node("chat_model", chat_model_node) builder.add_edge(START, "chat_model") builder.add_edge("chat_model", END) graph = builder.compile() # View display(Image(graph.get_graph().draw_mermaid_png()))
例如,只需传递一个过滤后的列表:llm.invoke(messages[-1:])
给模型。
状态包含了所有消息。但这里模型调用仅使用最后一条消息
裁剪消息(Trim
messages)
另一种方法是根据设定一定数量的tokens进行 trim
messages 。在把对话历史发给大模型之前,按 token 预算
把超长消息列表“剪”到合适长度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from langchain_core.messages import trim_messages # Node def chat_model_node(state: MessagesState): # 使用 trim_messages 函数修剪消息列表 # max_tokens: 限制消息的最大令牌数 # strategy: 修剪策略,这里是“last”,表示保留最新的消息 # token_counter: 用于计算令牌数的模型实例 # allow_partial: 是否允许部分修剪 messages = trim_messages( state["messages"], max_tokens=100, strategy="last", token_counter= ChatOpenAI( model="qwen-plus-2025-04-28", api_key="sk-", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"), allow_partial=False, ) # 调用语言模型(llm)处理修剪后的消息,并返回结果 return {"messages": [llm.invoke(messages)]} # Build graph builder = StateGraph(MessagesState) builder.add_node("chat_model", chat_model_node) builder.add_edge(START, "chat_model") builder.add_edge("chat_model", END) graph = builder.compile() # View display(Image(graph.get_graph().draw_mermaid_png()))
Chatbot
with message summarization 带有消息总结功能的聊天机器人
与其仅仅修剪或过滤消息,我们将展示如何使用大型语言模型(LLMs)来生成对话的实时摘要。
这使我们能够保留整个对话的压缩表示,而不仅仅是通过修剪或过滤将其移除。
我们将为该聊天机器人配备记忆功能,支持长时间对话,同时不会产生高昂的
token 成本或延迟。
定义总结状态
1 2 3 from langgraph.graph import MessagesState class State(MessagesState): summary: str
除了内置的 messages
键之外,我们现在还将包含一个自定义键(summary)。
定义LLM节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage # 定义调用模型的逻辑 def call_model(state: State): # 获取摘要(如果存在) summary = state.get("summary", "") # 如果有摘要,则添加它 if summary: # 将摘要添加到系统消息中 system_message = f"先前对话的摘要:{summary}" # 将摘要附加到任何较新的消息中 messages = [SystemMessage(content=system_message)] + state["messages"] else: messages = state["messages"] response = model.invoke(messages) return {"messages": response}
我们将定义一个节点来调用我们的LLM ,如果存在摘要,则将其纳入提示中。
当 call_model 函数返回 {“messages”: response} 时,它是在告诉
langgraph :“请用 response (即模型的新输出)来更新 State 对象中
messages 键对应的值。” langgraph 会将这个新消息追加到 messages
列表中,从而保持了对话历史的连续性
定义摘要节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def summarize_conversation(state: State): # 首先,我们获取任何现有的摘要 summary = state.get("summary", "") # 创建我们的摘要提示 if summary: # 摘要已存在 summary_message = ( f"这是迄今为止对话的摘要:{summary}\n\n" "请结合以上新消息扩展摘要:" ) else: summary_message = "创建以上对话的摘要:" # 将提示添加到我们的历史记录中 messages = state["messages"] + [HumanMessage(content=summary_message)] response = model.invoke(messages) # 删除除最近2条消息外的所有消息 delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]] return {"summary": response.content, "messages": delete_messages}
我们将定义一个节点来生成摘要 。请注意,这里我们将使用
RemoveMessage 在生成摘要后过滤我们的状态。
定义路由函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from langgraph.graph import END # 决定是结束对话还是总结对话 def should_continue(state: State): """返回要执行的下一个节点。""" messages = state["messages"] # 如果消息超过六条,那么我们总结对话 if len(messages) > 6: return "summarize_conversation" # 否则我们就可以结束了 return END
我们将添加一个条件边,以根据对话长度确定是否生成摘要。
在 langgraph 中, Command
是一个特殊的类型,用于指导图形(graph)决定接下来应该执行哪个节点。
您可以把它看作是给图形下达的一个“命令”。
添加内存并编译图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from IPython.display import Image, display from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import StateGraph, START # Define a new graph workflow = StateGraph(State) workflow.add_node("conversation", call_model) workflow.add_node("summarize_conversation",summarize_conversation) # Set the entrypoint as conversation workflow.add_edge(START, "conversation") workflow.add_conditional_edges("conversation", should_continue) workflow.add_edge("summarize_conversation", END) # Compile memory = MemorySaver() graph = workflow.compile(checkpointer=memory) display(Image(graph.get_graph().draw_mermaid_png()))
使用线程调用
1 2 3 4 5 6 config = {"configurable": {"thread_id": "2"}} input_message = HumanMessage(content="我喜欢玩lol,你知道这个游戏吗") output = graph.invoke({"messages": [input_message]}, config) for m in output['messages'][-1:]: m.pretty_print()
当对话大于6,可生成概要
1 graph.get_state(config).values.get("summary","")
Chatbot
with message summarization & external DB
memory 具有消息总结和外部数据库记忆的聊天机器人
使用数据库
SqliteSaver 是 LangGraph 提供的一个
轻量级状态持久化工具 ,它将图的运行状态(即
checkpoint)保存到本地的 SQLite
数据库中,使得你可以在程序中断或重启后恢复执行上下文 ,特别适合本地开发、实验性项目或中小规模应用。
如果我们提供 “:memory:” ,它将创建一个内存中的 SQLite 数据库。
1 2 3 import sqlite3 # In memory conn = sqlite3.connect(":memory:", check_same_thread = False)
如果我们提供一个 db 路径,那么它将为我们创建一个数据库!
1 2 3 4 5 #在本地创建一个目录 state_db,并尝试从 GitHub 下载一个名为 example.db 的 SQLite 数据库文件 !mkdir -p state_db && [ ! -f state_db/example.db ] && wget -P state_db https://github.com/langchain-ai/langchain-academy/raw/main/module-2/state_db/example.db db_path = "state_db/example.db" conn = sqlite3.connect(db_path, check_same_thread=False)
定义checkpoint
1 2 from langgraph.checkpoint.sqlite import SqliteSaver memory = SqliteSaver(conn)
像上一个形式编译图
让我们确认一下我们的状态是否已保存到本地。
1 2 3 config = {"configurable": {"thread_id": "1"}} graph_state = graph.get_state(config) graph_state
使用像 Sqlite 这样的数据库意味着状态会被持久化!
module-3
Streaming 流式传输
现在,让我们来谈谈 流式传输我们的图状态
的方法。.stream 和 .astream
是用于流式返回结果的同步和异步方法。
values:这将在每个节点被调用后流式传输图的完整状态。
updates:这将在每个节点被调用后流式传输图的状态更新。
values_vs_updates.png
stream_mode=“updates”
1 2 3 4 5 6 # Create a thread config = {"configurable": {"thread_id": "1"}} # Start conversation for chunk in graph.stream({"messages": [HumanMessage(content="你好我是zxj")]}, config, stream_mode="updates"): print(chunk)
1 {'conversation': {'messages': AIMessage(content='你好,zxj!很高兴认识你~有什么我可以帮你的吗?😊', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 576, 'total_tokens': 592, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'qwen-plus-2025-04-28', 'system_fingerprint': None, 'id': 'chatcmpl-891471ae-2fe8-9b3d-b5f7-f4fcd55a4e16', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--f36409f3-af43-4e9b-8a46-39646ad7c106-0', usage_metadata={'input_tokens': 576, 'output_tokens': 16, 'total_tokens': 592, 'input_token_details': {}, 'output_token_details': {}})}}
让我们来看一下 stream_mode="updates"。
因为我们使用 updates
进行流式传输,所以只有在图中的节点运行后,我们才能看到状态的更新。每个
chunk 是一个字典,以 node_name
为键,更新后的状态为值。
1 2 3 # Start conversation for chunk in graph.stream({"messages": [HumanMessage(content="你好我是zxj")]}, config, stream_mode="updates"): chunk['conversation']["messages"].pretty_print()
现在我们直接打印状态更新。
1 2 3 ==================================[1m Ai Message [0m================================== 你好呀,zxj!再次见到你真高兴~😊 有什么我可以帮忙的吗?
stream_mode=“values”
1 2 3 4 5 6 7 8 9 # Start conversation, again config = {"configurable": {"thread_id": "2"}} # Start conversation input_message = HumanMessage(content="你好我是zxj") for event in graph.stream({"messages": [input_message]}, config, stream_mode="values"): for m in event['messages']: m.pretty_print() print("---"*25)
现在,我们可以看到 stream_mode="values".这是在
conversation 节点被调用后,图的整个状态。
1 2 3 4 5 6 7 8 9 10 11 ================================[1m Human Message [0m================================= 你好我是zxj --------------------------------------------------------------------------- ================================[1m Human Message [0m================================= 你好我是zxj ==================================[1m Ai Message [0m================================== 你好,zxj!有什么我可以帮你的吗?😊 ---------------------------------------------------------------------------
Streaming tokens
流式传输令牌
在 LangGraph 中,“流式传输令牌(Streaming
tokens)”指的是在节点内部的大模型(LLM)生成过程中,逐 token
地将中间结果实时推送到客户端 的能力。实现这一能力的核心方法是
astream_events,它会以事件流的形式暴露整个执行过程中的所有细节,包括每一次
LLM 调用产生的 token。
每个事件是一个包含几个键的字典:
event:这是正在发出的事件的类型。
name:这是事件的名称。
data:这是与事件相关联的数据。
metadata:包含
langgraph_node,即发出事件的节点。
要点是,图表中聊天模型的令牌具有 on_chat_model_stream
类型。我们可以使用 event['metadata']['langgraph_node']
来选择要流式的节点。并且我们可以使用 event['data']
来获取每个事件的实际数据,而在这种情况下,数据是一个
AIMessageChunk.
1 2 3 4 5 6 7 8 9 node_to_stream = 'conversation'#定义流式传输的节点 config = {"configurable": {"thread_id": "5"}} input_message = HumanMessage(content="为我介绍lol") async for event in graph.astream_events({"messages": [input_message]}, config, version="v2"): # 从特定节点获取聊天模型生成的 Token #事件类型必须是 逐 token 流式输出(on_chat_model_stream)。 if event["event"] == "on_chat_model_stream" and event['metadata'].get('langgraph_node','') == node_to_stream: data = event["data"] print(data["chunk"].content, end="|")
event的常见类型
事件类型 (event)
触发时机与说明
on_chain_start
任意 Runnable(节点、子图或整个图)开始执行
on_chain_stream
节点/图在运行过程中 增量输出 chunk
on_chain_end
任意 Runnable 执行完成
on_chat_model_start
ChatModel 开始调用
on_chat_model_stream
ChatModel 逐 token 返回内容(打字机效果)
on_chat_model_end
ChatModel 调用结束
on_tool_start
Tool 开始调用
on_tool_end
Tool 调用结束
on_retriever_start
Retriever 开始检索
on_retriever_end
Retriever 检索结束
Breakpoints 断点
human-in-the-loop(人工介入/人在回路)的三大动机:
1️⃣
Approval(审批)我们可以中断智能体,将当前状态呈现给用户,并让用户决定是否执行该操作。
2️⃣ Debugging(调试/回放)我们可以回退图形以重现或避免问题
3️⃣ Editing(编辑)AI
产出的中间结果不符合预期,但不想重跑整图,可以直接修改状态
我们将介绍 breakpoints ,它提供了一种在特定步骤停止图的简单方法。
Breakpoints
for human approval 用于人类审批的断点
假设我们关注工具的使用:我们希望批准代理使用其任何工具。
我们所需要做的就是简单地用 interrupt_before=["tools"]
编译图形,其中 tools 是我们的工具节点。
这意味着在执行工具调用的节点 tools
之前,执行将被中断。
1 graph = builder.compile(interrupt_before=["tools"], checkpointer=memory)
image-20250720225640772
1 2 3 4 5 6 7 8 9 # Input initial_input = {"messages": HumanMessage(content="2乘3")} # Thread thread = {"configurable": {"thread_id": "1"}} # Run the graph until the first interruption for event in graph.stream(initial_input, thread, stream_mode="values"): event['messages'][-1].pretty_print()
1 2 3 4 5 6 7 8 9 10 ================================[1m Human Message [0m================================= 2乘3 ==================================[1m Ai Message [0m================================== Tool Calls: multiply (call_92a4bcf88d25476d925775) Call ID: call_92a4bcf88d25476d925775 Args: a: 2 b: 3
我们可以获取状态并查看要调用的下一个节点。这是一种很好的方法,可以发现图已被中断。
现在,我们将介绍一个很好的技巧。当我们使用 None
调用图时,它将直接从最后一个状态检查点继续!
breakpoints.jpg
状态快照(StateSnapshot)
类型:专门用来存 一个时刻 的完整状态
获取方式:
Graph.get_state() → 最新的 快照
Graph.get_state_history() → 所有
快照列表
继续/重跑图
Graph.stream(None, {"thread_id": "xxx"})
不传新输入 None 表示
从当前最新状态继续跑
也可回退到历史快照,再重跑(调试/回放)
1 2 for event in graph.stream(None, thread, stream_mode="values"): event['messages'][-1].pretty_print()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ==================================[1m Ai Message [0m================================== Tool Calls: multiply (call_92a4bcf88d25476d925775) Call ID: call_92a4bcf88d25476d925775 Args: a: 2 b: 3 =================================[1m Tool Message [0m================================= Name: multiply 6 ==================================[1m Ai Message [0m================================== 2乘3的结果是6。
Editing graph state
编辑图状态
断点也是修改图状态的机会 让我们在
assistant 节点之前为代理设置断点。
1 graph = builder.compile(interrupt_before=["assistant"], checkpointer=memory)
image-20250720230924672
1 2 3 4 5 6 7 8 9 # Input initial_input = {"messages": "2乘3"} # Thread thread = {"configurable": {"thread_id": "1"}} # Run the graph until the first interruption for event in graph.stream(initial_input, thread, stream_mode="values"): event['messages'][-1].pretty_print()
1 2 3 ================================[1m Human Message [0m================================= 2乘3
当状态中断时,我们可以直接应用状态更新
记住,对 messages 键的更新将使用
add_messages reducer:
**如果我们想覆盖现有的消息,可以提供带有* id
*的消息。** 如果我们只想将消息添加到消息列表中,则可以传递未指定
id 的消息,如下所示。
1 2 3 4 graph.update_state( thread, {"messages": [HumanMessage(content="不要,实际上要3乘3!")]}, )
1 2 3 new_state = graph.get_state(thread).values for m in new_state['messages']: m.pretty_print()
1 2 3 4 5 6 ================================[1m Human Message [0m================================= 2乘3 ================================[1m Human Message [0m================================= 不要,实际上要3乘3!
现在,让我们继续进行我们的代理操作,只需传递 None
并允许其从当前状态继续执行。我们输出当前内容,然后继续执行剩余的节点。
1 2 for event in graph.stream(None, thread, stream_mode="values"): event['messages'][-1].pretty_print()
Dynamic breakpoints
动态断点
你可以根据条件来实现它(从节点内部基于开发人员定义的逻辑)。您可以向用户说明其中断原因(通过将您想传递的内容发送到
NodeInterrupt)。
让我们创建一个图表,其中根据输入的长度会抛出一个
NodeInterrupt。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 from IPython.display import Image, display from typing_extensions import TypedDict from langgraph.checkpoint.memory import MemorySaver from langgraph.errors import NodeInterrupt from langgraph.graph import START, END, StateGraph class State(TypedDict): input: str def step_1(state: State) -> State: print("---Step 1---") return state def step_2(state: State) -> State: # 如果输入字符串长度超过5个字符,我们可以选择抛出NodeInterrupt异常 if len(state['input']) > 5: raise NodeInterrupt(f"收到长度超过5个字符的输入: {state['input']}") print("---Step 2---") return state def step_3(state: State) -> State: print("---Step 3---") return state builder = StateGraph(State) builder.add_node("step_1", step_1) builder.add_node("step_2", step_2) builder.add_node("step_3", step_3) builder.add_edge(START, "step_1") builder.add_edge("step_1", "step_2") builder.add_edge("step_2", "step_3") builder.add_edge("step_3", END) # Set up memory memory = MemorySaver() # Compile the graph with memory graph = builder.compile(checkpointer=memory) # View display(Image(graph.get_graph().draw_mermaid_png()))
image-20250721222709712
让我们运行一个输入超过5个字符的图。
1 2 3 4 5 6 initial_input = {"input": "hello world"} thread_config = {"configurable": {"thread_id": "1"}} # Run the graph until the first interruption for event in graph.stream(initial_input, thread_config, stream_mode="values"): print(event)
1 2 3 {'input': 'hello world'} ---Step 1--- {'input': 'hello world'}
我们可以尝试从断点恢复图。但是,这只会重新运行相同的节点!除非状态发生变化,否则我们将一直卡在这里。
1 2 3 4 graph.update_state( thread_config, {"input": "hi"}, )
使用update_state更新状态
Time travel 时间旅行
现在,让我们通过查看、重播,甚至从过去的状态叉出,来展示 LangGraph 支持debug
的功能。
Browsing History
浏览历史
我们可以使用 get_state 来查看给定 thread_id
的图的 当前 状态!
1 graph.get_state({'configurable': {'thread_id': '1'}})
我们还可以浏览代理的状态历史。get_state_history
让我们能够获取所有先前步骤的状态。
1 2 3 all_states = [s for s in graph.get_state_history(thread)] len(all_states) print(all_states)
Replaying
回放
1 2 3 4 5 to_replay = all_states[-2] to_replay.values {'messages': [HumanMessage(content='2乘3', additional_kwargs={}, response_metadata={}, id='0676d9b5-cd59-4630-924d-b5c8d950e8d8')]} to_replay.next ('assistant',)
我们还获取了配置,它告诉了我们 checkpoint_id 以及
thread_id。
1 2 3 4 to_replay.config {'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f066c0e-2ee2-66d5-8000-5dde78194aae'}}
要从这里重播,我们只需将配置传回给代理!图知道这个检查点已经执行过了。它只是从这个检查点重新播放!
1 2 for event in graph.stream(None, to_replay.config, stream_mode="values"): event['messages'][-1].pretty_print()
Forking 分叉
如果我们想从相同的步骤运行,但使用不同的输入,该怎么办呢?这是分叉。
fig3.jpg
让我们修改此检查点的状态。我们可以直接使用提供的
checkpoint_id 来运行 update_state。
请记住我们对 messages 的 reducer 是如何工作的:
它会追加消息,除非我们提供了一个消息 ID。
我们提供消息 ID 是为了覆盖消息,而不是将消息追加到状态中!
因此,要覆盖消息,我们只需提供消息 ID,而我们已有
to_fork.values["messages"].id。
1 2 3 4 5 fork_config = graph.update_state( to_fork.config, {"messages": [HumanMessage(content='5乘3', id=to_fork.values["messages"][0].id)]}, )
基础知识
message
LangChain 中的 HumanMessage 、 AIMessage 、 SystemMessage 和
ToolMessage
。这些消息类型是构建与语言模型(LLM)交互的核心组件,它们共同构成了一个完整的对话历史,帮助模型理解上下文并做出恰当的回应。
SystemMessage
SystemMessage 的结构最简单,它只包含内容和类型。
数据结构 :
content (str): 消息的具体内容,即给 AI 的指令。
type (str): 固定为字符串 ‘system’ 。
HumanMessage
HumanMessage 的结构也同样简单,代表用户的输入。
数据结构 :
content (str): 用户输入的文本。
type (str): 固定为字符串 ‘human’ 。
AIMessage
AIMessage
的结构相对复杂,因为它不仅可以包含文本响应,还可以包含对工具的调用请求。
数据结构 :
content (str): AI 生成的文本响应。如果 AI
的回复是发起工具调用,此字段可以为空字符串。
tool_calls (list[dict], 可选):
一个字典列表,每个字典代表一个工具调用请求。这是支持“Function
Calling”或“Tool Calling”功能的核心。其结构通常包含:
name (str): 要调用的工具名称。
args (dict): 调用工具时需要传入的参数。
id (str): 此次工具调用的唯一标识符,用于后续 ToolMessage
的关联。
type (str): 固定为字符串 ‘ai’ 。
ToolMessage
ToolMessage 用于承载工具执行后的返回结果。
数据结构 :
content (str): 工具执行返回的结果。通常是一个字符串,比如 JSON
格式的字符串。
tool_call_id (str): 此次工具调用的唯一标识符, 必须 与之前 AIMessage
中 tool_calls 里的 id
相对应。这使得模型能够准确地将结果与请求匹配起来。
type (str): 固定为字符串 ‘tool’ 。