前言

本教程为langchain官方教程的学习记录

academy.langchain.com/enrollments

代码见learn-rag-langchain/academy-langgraph at main · zxj-2023/learn-rag-langchain

module-1

route路由

在 LangGraph 中,route(路由)\的核心作用是*根据当前状态动态决定“下一步应该执行哪个节点”*

定义工具
1
2
3
4
5
6
7
8
9
10
11
12
13
def multiply(a: int, b: int) -> int:
"""Multiply a and b.

Args:
a: first int
b: second int
"""
return a * b

llm = ChatOpenAI(

)
llm_with_tools = llm.bind_tools([multiply])

三引号字符串叫 docstring,它会被 LangChain 拿来做两件事:

  1. 生成工具的 description(给大模型看的“说明书”)
    没有它时,LangChain 只能退而求其次,把函数名 multiply 拼成一句 “multiply tool” 之类的默认描述。大模型拿到的工具列表里,这个工具就只有一个干巴巴的名字和参数列表,它可能猜不到这个工具到底是干什么的
  2. 给人类开发者自己看
    IDE、文档生成器、静态检查工具都会读取这段文字,方便后期维护。
构建条件边

tool_calling_llm 是一个普通的计算节点(node),负责把当前对话状态交给大模型,让大模型决定要不要调用工具;

真正完成“路由”动作的是 tools_condition 这个函数——它才是 LangGraph 里的 route(条件边)

tools_condition 是 作为LangGraph 预置的“默认路由函数”,功能就是,如果大模型的最新回复中包含工具调用,就调用工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Node
def tool_calling_llm(state: MessagesState):
#调用大模型后将最新的消息返回
return {"messages": [llm_with_tools.invoke(state["messages"])]}

# Build graph
#MessagesState 是 LangGraph 官方预置 的一种 状态(State)定义
#这个状态维护了一个消息list,有新的消息就加进这个消息list
builder = StateGraph(MessagesState)
builder.add_node("tool_calling_llm", tool_calling_llm)
builder.add_node("tools", ToolNode([multiply]))
builder.add_edge(START, "tool_calling_llm")
builder.add_conditional_edges(
"tool_calling_llm",
# 如果助手(结果)的最新消息是工具调用 -> tools_condition 路由到工具
# 如果助手(结果)的最新消息不是工具调用 -> tools_condition 路由到 END
tools_condition,
)
builder.add_edge("tools", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

image-20250719142543838

调用
1
2
3
4
5
from langchain_core.messages import HumanMessage
messages = [HumanMessage(content="你好,2乘2是多少")]
messages = graph.invoke({"messages": messages})
for m in messages['messages']:
m.pretty_print()

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
================================ Human Message =================================

你好,2乘2是多少
================================== Ai Message ==================================
Tool Calls:
multiply (call_e026ceb409e247748786ad)
Call ID: call_e026ceb409e247748786ad
Args:
a: 2
b: 2
================================= Tool Message =================================
Name: multiply

4

agent代理

在 LangGraph 中,代理(Agent) 被明确定义为“一个由大语言模型(LLM)驱动的、能够循环决策并调用外部工具来完成任务的节点或子图”

Agent = LLM + 工具集合 + 提示模板,三者在 LangGraph 的状态化图结构里循环运行,直到满足停止条件。

ReAct 是一种流行的通用智能体架构,它结合了这些扩展,并整合了三个核心概念。

  1. 工具调用:允许LLM根据需要选择和使用各种工具。
  2. 记忆:使智能体能够保留和使用之前步骤的信息。
  3. 规划:使LLM能够创建并遵循多步计划以实现目标。

act- 让模型调用特定工具

observe - 将工具输出传递回模型

reason - 让模型对工具输出进行推理,以决定下一步操作(例如,调用另一个工具或直接响应)

定义工具
1
2
3
4
5
6
7
tools = [add, multiply, divide]#工具函数具体内容省略
llm = ChatOpenAI()

# 在这个 ipynb 文件中,我们将并行工具调用(parallel tool calling)设置为 false,因为数学计算通常是按顺序执行的,并且这次我们有3个可以进行数学计算的工具。
# OpenAI 模型为了效率,默认进行并行工具调用,详情请参阅 `https://python.langchain.com/docs/how_to/tool_calling_parallel/`
# 不妨尝试一下,看看模型在处理数学方程式时的表现!
llm_with_tools = llm.bind_tools(tools, parallel_tool_calls=False)
创建代理

定义节点

1
2
3
4
5
6
7
8
9
from langgraph.graph import MessagesState
from langchain_core.messages import HumanMessage, SystemMessage

# System message
sys_msg = SystemMessage(content="你是一个乐于助人的助手,负责对一组输入执行算术运算。")

# Node
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}

这一步相当于定义了系统提示词,然后在 assistant 这个节点里,通过 [sys_msg] + state[“messages”] 这部分代码,这个系统提示词被添加到了整个对话历史的最前面,然后一起发送给模型。这样一来,模型在生成回复时就会遵循这个系统提示词的指示。

与上一个不同的是,我们将 Tools 节点 回环 连接到 Assistant,从而形成一个回路。

在 assistant节点执行后,tools_condition检查模型的输出是否为工具调用。

如果是工具调用,则流程被导向至 tools 节点。

tools节点重新连接到assistant 只要模型决定调用工具,此循环就会继续。

如果模型的响应不是工具调用,则流程被导向至结束,终止该过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Graph
builder = StateGraph(MessagesState)

# Define nodes: these do the work
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))

# Define edges: these determine how the control flow moves
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
# If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
# If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END
tools_condition,
)
builder.add_edge("tools", "assistant")
react_graph = builder.compile()

# Show
display(Image(react_graph.get_graph(xray=True).draw_mermaid_png()))

image-20250719145948088

调用
1
2
3
4
5
messages = [HumanMessage(content="将3和4相加。将结果乘以2。再将结果除以5。")]
messages = react_graph.invoke({"messages": messages})

for m in messages['messages']:
m.pretty_print()

parallel_tool_calls=False的输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
================================ Human Message =================================

将3和4相加。将结果乘以2。再将结果除以5。
================================== Ai Message ==================================
Tool Calls:
add (call_6c69898dba0342bfbb889e)
Call ID: call_6c69898dba0342bfbb889e
Args:
a: 3
b: 4
================================= Tool Message =================================
Name: add

7
================================== Ai Message ==================================
Tool Calls:
multiply (call_9940e7603ecf4a13a5f2fb)
Call ID: call_9940e7603ecf4a13a5f2fb
Args:
a: 7
b: 2
================================= Tool Message =================================
Name: multiply

14
================================== Ai Message ==================================
Tool Calls:
divide (call_d48fbbe205a14dfbaa3500)
Call ID: call_d48fbbe205a14dfbaa3500
Args:
a: 14
b: 5
================================= Tool Message =================================
Name: divide

2.8
================================== Ai Message ==================================

最终结果是2.8。

parallel_tool_calls=True的输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
================================ Human Message =================================

将3和4相加。将结果乘以2。再将结果除以5。
================================== Ai Message ==================================
Tool Calls:
add (call_e0c7d8e65f2c49e8aecd3e)
Call ID: call_e0c7d8e65f2c49e8aecd3e
Args:
a: 3
b: 4
multiply (call_5bf824058e64489aaace91)
Call ID: call_5bf824058e64489aaace91
Args:
a: 7
b: 2
divide (call_36c34f69f6574028b28847)
Call ID: call_36c34f69f6574028b28847
Args:
a: 14
b: 5
================================= Tool Message =================================
Name: add

7
================================= Tool Message =================================
Name: multiply

14
================================= Tool Message =================================
Name: divide

2.8
================================== Ai Message ==================================

最终结果是 **2.8**。

Agent memory代理记忆

使用chekpointer检查点的功能,最简单的检查点之一是 MemorySaver,这是一个用于图形状态的内存键值存储。

这个检查点就相当于把图的每一次“状态快照”持久化到外部存储的机制。

1
2
3
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
react_graph_memory = builder.compile(checkpointer=memory)

我们可以使用 记忆功能 来解决这个问题!LangGraph 可以使用检查点工具在每一步之后自动保存图的状态。这一内置的持久化层为我们提供了内存功能,使 LangGraph 能够从最后一次状态更新处继续。

1
2
3
4
5
6
7
8
9
10
# Specify a thread
config = {"configurable": {"thread_id": "1"}}

# Specify an input
messages = [HumanMessage(content="Add 3 and 4.")]

# Run
messages = react_graph_memory.invoke({"messages": messages},config)
for m in messages['messages']:
m.pretty_print()

当我们使用内存时,我们需要指定一个 thread_id。这 thread_id 将存储我们的图形状态集合。

如下图,检查点在图的每一步写入状态,这些检查点保存在一个线程中 ,我们可以使用 thread_id 在未来访问该线程

state.jpg

module-2

state-scheme状态模式

LangGraph 的 state-scheme(状态模式) 就是“一张蓝图”,它告诉框架:“在整个图的生命周期里,状态对象应该长什么样、每个字段怎样被更新、以及节点之间如何共享或隔离数据。”

state-scheme 用 TypedDictPydantic BaseModel 来声明,定义了:

  • 状态里有哪些字段(key)
  • 每个字段的 Python 类型
  • 可选 该字段的 reducer(更新规则)
TypedDict

基本定义

1
2
3
4
5
from typing_extensions import TypedDict

class TypedDictState(TypedDict):
foo: str
bar: str

可增加像 Literal 这样的类型提示,使其更有价值

1
2
3
4
5
from typing import Literal

class TypedDictState(TypedDict):
name: str
mood: Literal["happy","sad"]

在这里,mood 只能是 “happy” 或 “sad”。

加 reducer:让更新“可追加”而不覆盖

1
2
3
4
class MathState(TypedDict):
question: str
scratchpad: Annotated[list[str], add_message] # 新元素自动追加
answer: int

Annotated[list[str], add] 告诉 LangGraph:当节点返回 {"scratchpad": ["新步骤"]} 时,追加到现有列表,而不是替换

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import random
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
#定义节点
def node_1(state):
print("---Node 1---")
return {"name": state['name'] + " is ... "}

def node_2(state):
print("---Node 2---")
return {"mood": "happy"}

def node_3(state):
print("---Node 3---")
return {"mood": "sad"}
#路由函数
def decide_mood(state) -> Literal["node_2", "node_3"]:

# Here, let's just do a 50 / 50 split between nodes 2, 3
if random.random() < 0.5:

# 50% of the time, we return Node 2
return "node_2"

# 50% of the time, we return Node 3
return "node_3"

# Build graph
builder = StateGraph(TypedDictState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)

# Logic
builder.add_edge(START, "node_1")
builder.add_conditional_edges("node_1", decide_mood)
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

image-20250719154439415

因为我们的状态是一个字典,我们只需用一个字典调用图,以设置状态中 name 键的初始值。

1
graph.invoke({"name":"Lance"})
Dataclass数据类

python的dataclasses库提供了一种简洁的语法,用于创建主要用于存储数据的类。

1
2
3
4
5
6
from dataclasses import dataclass

@dataclass
class DataclassState:
name: str
mood: Literal["happy","sad"]

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def node_1(state):
print("---Node 1---")
return {"name": state.name + " is ... "}

# Build graph
builder = StateGraph(DataclassState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)

# Logic
builder.add_edge(START, "node_1")
builder.add_conditional_edges("node_1", decide_mood)
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

要访问 dataclass 的键,我们只需修改在 node_1 中使用的下标即可:

我们使用 state.name 来表示 dataclass 状态,而不是使用 state["name"] 来表示上面的 TypedDict

你会注意到一个有点奇怪的地方:在每个节点中,我们仍然返回一个字典来执行状态更新。

Dataclass 只是“描述”状态的形状,而真正在 LangGraph 的节点之间流动的依旧是「字典」,这是框架设计层面的约定

在这种情况下,dataclass 拥有键 name,因此我们可以通过从节点传递一个字典来更新它,就像在状态为 TypedDict 时所做的那样。

1
graph.invoke(DataclassState(name="Lance",mood="sad"))

我们通过 dataclass 来设置状态中每个键/通道的初始值!

State Reducers状态更新函数

Reducers 为我们指定了如何执行更新。它接收 旧状态一次变更指令(action / 增量字段)返回全新的状态对象,整个过程中不能修改原有数据

我们可以使用 Annotated 类型来指定一个 reducer 函数。在这种情况下,让我们将每个节点返回的值附加到结果中,而不是覆盖它们。

1
2
3
4
5
from operator import add
from typing import Annotated

class State(TypedDict):
foo: Annotated[list[int], add]

我们只需要一个可以执行此操作的缩减器:operator.add 是 Python 内置 operator 模块中的一个函数。当 operator.add 应用于列表时,它执行列表连接。

Custom Reducers 自定义 Reducers

我们同样可以自定义reducers函数,解决一些特殊情况,比如,如下可以解决传入参数为none的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def reduce_list(left: list | None, right: list | None) -> list:
"""安全地合并两个列表,处理其中一个或两个输入可能为 None 的情况。

参数:
left (list | None): 要合并的第一个列表,或 None。
right (list | None): 要合并的第二个列表,或 None。

返回:
list: 一个包含两个输入列表所有元素的新列表。
如果输入为 None,则将其视为空列表。
"""
if not left:
left = []
if not right:
right = []
return left + right

class DefaultState(TypedDict):
foo: Annotated[list[int], add]

class CustomReducerState(TypedDict):
foo: Annotated[list[int], reduce_list]
MessagesState

我可以使用内置的 reducer add_messages 来处理状态中的消息

MessagesState 内置了一个 messages 键 它还为该键内置了一个 add_messages 合并器,这两个是等价的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from typing import Annotated
from langgraph.graph import MessagesState
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

# 定义一个自定义的 TypedDict,其中包含一个带有 add_messages reducer 的消息列表。
class CustomMessagesState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
added_key_1: str
added_key_2: str
# etc

# 使用 MessagesState ,它包含带有 add_messages reducer 的 messages 键。
class ExtendedMessagesState(MessagesState):
# 添加除 messages 之外所需的任何键, messages 是预构建的。
added_key_1: str
added_key_2: str
# etc

在使用 add_messages reducer 时,让我们展示一些有用的技巧。

重写(Re-writing)

如果我们传递的消息与 messages 列表中已有的消息具有相同的 ID,则该消息将被覆盖!

1
2
3
4
5
6
7
8
9
10
# Initial state
initial_messages = [AIMessage(content="Hello! How can I assist you?", name="Model", id="1"),
HumanMessage(content="I'm looking for information on marine biology.", name="Lance", id="2")
]

# New message to add
new_message = HumanMessage(content="I'm looking for information on whales, specifically", name="Lance", id="2")

# Test
add_messages(initial_messages , new_message)
1
2
[AIMessage(content='Hello! How can I assist you?', name='Model', id='1'),
HumanMessage(content="I'm looking for information on whales, specifically", name='Lance', id='2')]

删除(Removal)

add_messages同样支持删除。为此,我们简单地使用 RemoveMessage 来自 langchain_core

1
2
3
4
5
6
7
8
9
10
11
from langchain_core.messages import RemoveMessage

# Message list
messages = [AIMessage("Hi.", name="Bot", id="1")]
messages.append(HumanMessage("Hi.", name="Lance", id="2"))
messages.append(AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"))
messages.append(HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4"))

# Isolate messages to delete
delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]]
print(delete_messages)

Multiple Schemas 多种状态

Private State 私有状态

首先,让我们讨论在节点之间传递 private state 的情况。这对于图的中间计算逻辑中需要的任何内容都很有用,但与图的整体输入或输出无关。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from typing_extensions import TypedDict
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END

class OverallState(TypedDict):
foo: int

class PrivateState(TypedDict):
baz: int

def node_1(state: OverallState) -> PrivateState:
print("---Node 1---")
return {"baz": state['foo'] + 1}

def node_2(state: PrivateState) -> OverallState:
print("---Node 2---")
return {"foo": state['baz'] + 1}

# Build graph
builder = StateGraph(OverallState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)

# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

image-20250719171509917

我们将定义一个 OverallState 和一个 PrivateStatenode_2 使用 PrivateState 作为输入,但输出写入到 OverallState

baz 仅包含在 PrivateState 中。因此,我们可以看到 baz 被排除在图形输出之外,因为它不在 OverallState 中。

Input / Output Schema 输入/输出模式

在 LangGraph 中,Input / Output Schema 就是“图的对外接口协议”:调用者只能按 Input Schema 传参;图运行完后,只吐出 Output Schema 规定的字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# 定义输入的模式
class InputState(TypedDict):
question: str

# 定义输出的模式
class OutputState(TypedDict):
answer: str

# 定义整体模式,结合输入和输出
class OverallState(InputState, OutputState):
pass

# 定义处理输入并生成答案的节点
def answer_node(state: InputState):
# 示例答案和额外键
return {"answer": "bye", "question": state["question"]}

# 构建图,并指定输入和输出模式
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
builder.add_node(answer_node) # 添加答案节点
builder.add_edge(START, "answer_node") # 定义起始边
builder.add_edge("answer_node", END) # 定义结束边
graph = builder.compile() # 编译图

# 使用输入调用图并打印结果
print(graph.invoke({"question": "hi"}))

输出

1
{'answer': 'bye Lance'}

在这里,input / output 模式对图的输入和输出上允许的键进行 过滤。可以看到 output 模式将输出限制为仅包含 answer 键。

Filtering and trimming messages筛选和精简消息

如果我们在处理长时间对话时不够小心,会导致高令牌使用量和延迟,因为我们传递给模型的是一系列不断增加的消息。所以要进行筛选和精简消息。

简化器(Reducer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain_core.messages import RemoveMessage

# Nodes
def filter_messages(state: MessagesState):
# 删除除最近两条消息外的所有消息
delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
return {"messages": delete_messages}

def chat_model_node(state: MessagesState):
return {"messages": [llm.invoke(state["messages"])]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("filter", filter_messages)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "filter")
builder.add_edge("filter", "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

image-20250719203204234

筛选消息(Filtering messages)

如果你不需要或不希望修改图状态,可以直接过滤传递给聊天模型的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
# Node
def chat_model_node(state: MessagesState):
return {"messages": [llm.invoke(state["messages"][-1:])]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

例如,只需传递一个过滤后的列表:llm.invoke(messages[-1:]) 给模型。

状态包含了所有消息。但这里模型调用仅使用最后一条消息

裁剪消息(Trim messages)

另一种方法是根据设定一定数量的tokens进行 trim messages。在把对话历史发给大模型之前,按 token 预算 把超长消息列表“剪”到合适长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from langchain_core.messages import trim_messages

# Node
def chat_model_node(state: MessagesState):
# 使用 trim_messages 函数修剪消息列表
# max_tokens: 限制消息的最大令牌数
# strategy: 修剪策略,这里是“last”,表示保留最新的消息
# token_counter: 用于计算令牌数的模型实例
# allow_partial: 是否允许部分修剪
messages = trim_messages(
state["messages"],
max_tokens=100,
strategy="last",
token_counter= ChatOpenAI(
model="qwen-plus-2025-04-28",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"),
allow_partial=False,
)
# 调用语言模型(llm)处理修剪后的消息,并返回结果
return {"messages": [llm.invoke(messages)]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

Chatbot with message summarization带有消息总结功能的聊天机器人

与其仅仅修剪或过滤消息,我们将展示如何使用大型语言模型(LLMs)来生成对话的实时摘要。

这使我们能够保留整个对话的压缩表示,而不仅仅是通过修剪或过滤将其移除。

我们将为该聊天机器人配备记忆功能,支持长时间对话,同时不会产生高昂的 token 成本或延迟。

定义总结状态
1
2
3
from langgraph.graph import MessagesState
class State(MessagesState):
summary: str

除了内置的 messages 键之外,我们现在还将包含一个自定义键(summary)。

定义LLM节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage 

# 定义调用模型的逻辑
def call_model(state: State):

# 获取摘要(如果存在)
summary = state.get("summary", "")

# 如果有摘要,则添加它
if summary:

# 将摘要添加到系统消息中
system_message = f"先前对话的摘要:{summary}"

# 将摘要附加到任何较新的消息中
messages = [SystemMessage(content=system_message)] + state["messages"]

else:
messages = state["messages"]

response = model.invoke(messages)
return {"messages": response}

我们将定义一个节点来调用我们的LLM,如果存在摘要,则将其纳入提示中。

当 call_model 函数返回 {“messages”: response} 时,它是在告诉 langgraph :“请用 response (即模型的新输出)来更新 State 对象中 messages 键对应的值。” langgraph 会将这个新消息追加到 messages 列表中,从而保持了对话历史的连续性

定义摘要节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def summarize_conversation(state: State): 

# 首先,我们获取任何现有的摘要
summary = state.get("summary", "")

# 创建我们的摘要提示
if summary:

# 摘要已存在
summary_message = (
f"这是迄今为止对话的摘要:{summary}\n\n"
"请结合以上新消息扩展摘要:"
)

else:
summary_message = "创建以上对话的摘要:"

# 将提示添加到我们的历史记录中
messages = state["messages"] + [HumanMessage(content=summary_message)]
response = model.invoke(messages)

# 删除除最近2条消息外的所有消息
delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
return {"summary": response.content, "messages": delete_messages}

我们将定义一个节点来生成摘要。请注意,这里我们将使用 RemoveMessage 在生成摘要后过滤我们的状态。

定义路由函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from langgraph.graph import END 
# 决定是结束对话还是总结对话
def should_continue(state: State):

"""返回要执行的下一个节点。"""

messages = state["messages"]

# 如果消息超过六条,那么我们总结对话
if len(messages) > 6:
return "summarize_conversation"

# 否则我们就可以结束了
return END

我们将添加一个条件边,以根据对话长度确定是否生成摘要。

在 langgraph 中, Command 是一个特殊的类型,用于指导图形(graph)决定接下来应该执行哪个节点。 您可以把它看作是给图形下达的一个“命令”。

添加内存并编译图
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from IPython.display import Image, display
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START

# Define a new graph
workflow = StateGraph(State)
workflow.add_node("conversation", call_model)
workflow.add_node("summarize_conversation",summarize_conversation)

# Set the entrypoint as conversation
workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue)
workflow.add_edge("summarize_conversation", END)



# Compile
memory = MemorySaver()
graph = workflow.compile(checkpointer=memory)
display(Image(graph.get_graph().draw_mermaid_png()))
使用线程调用
1
2
3
4
5
6
config = {"configurable": {"thread_id": "2"}}

input_message = HumanMessage(content="我喜欢玩lol,你知道这个游戏吗")
output = graph.invoke({"messages": [input_message]}, config)
for m in output['messages'][-1:]:
m.pretty_print()

当对话大于6,可生成概要

1
graph.get_state(config).values.get("summary","")

Chatbot with message summarization & external DB memory具有消息总结和外部数据库记忆的聊天机器人

使用数据库

SqliteSaver 是 LangGraph 提供的一个 轻量级状态持久化工具,它将图的运行状态(即 checkpoint)保存到本地的 SQLite 数据库中,使得你可以在程序中断或重启后恢复执行上下文,特别适合本地开发、实验性项目或中小规模应用。

如果我们提供 “:memory:” ,它将创建一个内存中的 SQLite 数据库。

1
2
3
import sqlite3
# In memory
conn = sqlite3.connect(":memory:", check_same_thread = False)

如果我们提供一个 db 路径,那么它将为我们创建一个数据库!

1
2
3
4
5
#在本地创建一个目录 state_db,并尝试从 GitHub 下载一个名为 example.db 的 SQLite 数据库文件
!mkdir -p state_db && [ ! -f state_db/example.db ] && wget -P state_db https://github.com/langchain-ai/langchain-academy/raw/main/module-2/state_db/example.db

db_path = "state_db/example.db"
conn = sqlite3.connect(db_path, check_same_thread=False)

定义checkpoint

1
2
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver(conn)

像上一个形式编译图

让我们确认一下我们的状态是否已保存到本地。

1
2
3
config = {"configurable": {"thread_id": "1"}}
graph_state = graph.get_state(config)
graph_state

使用像 Sqlite 这样的数据库意味着状态会被持久化!

module-3

Streaming 流式传输

现在,让我们来谈谈 流式传输我们的图状态 的方法。.stream.astream 是用于流式返回结果的同步和异步方法。

values:这将在每个节点被调用后流式传输图的完整状态。 updates:这将在每个节点被调用后流式传输图的状态更新。

values_vs_updates.png

stream_mode=”updates”
1
2
3
4
5
6
# Create a thread
config = {"configurable": {"thread_id": "1"}}

# Start conversation
for chunk in graph.stream({"messages": [HumanMessage(content="你好我是zxj")]}, config, stream_mode="updates"):
print(chunk)
1
{'conversation': {'messages': AIMessage(content='你好,zxj!很高兴认识你~有什么我可以帮你的吗?😊', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 576, 'total_tokens': 592, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'qwen-plus-2025-04-28', 'system_fingerprint': None, 'id': 'chatcmpl-891471ae-2fe8-9b3d-b5f7-f4fcd55a4e16', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--f36409f3-af43-4e9b-8a46-39646ad7c106-0', usage_metadata={'input_tokens': 576, 'output_tokens': 16, 'total_tokens': 592, 'input_token_details': {}, 'output_token_details': {}})}}

让我们来看一下 stream_mode="updates"

因为我们使用 updates 进行流式传输,所以只有在图中的节点运行后,我们才能看到状态的更新。每个 chunk 是一个字典,以 node_name 为键,更新后的状态为值。

1
2
3
# Start conversation
for chunk in graph.stream({"messages": [HumanMessage(content="你好我是zxj")]}, config, stream_mode="updates"):
chunk['conversation']["messages"].pretty_print()

现在我们直接打印状态更新。

1
2
3
================================== Ai Message ==================================

你好呀,zxj!再次见到你真高兴~😊 有什么我可以帮忙的吗?
stream_mode=”values”
1
2
3
4
5
6
7
8
9
# Start conversation, again
config = {"configurable": {"thread_id": "2"}}

# Start conversation
input_message = HumanMessage(content="你好我是zxj")
for event in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
for m in event['messages']:
m.pretty_print()
print("---"*25)

现在,我们可以看到 stream_mode="values".这是在 conversation 节点被调用后,图的整个状态。

1
2
3
4
5
6
7
8
9
10
11
================================ Human Message =================================

你好我是zxj
---------------------------------------------------------------------------
================================ Human Message =================================

你好我是zxj
================================== Ai Message ==================================

你好,zxj!有什么我可以帮你的吗?😊
---------------------------------------------------------------------------
Streaming tokens 流式传输令牌

在 LangGraph 中,“流式传输令牌(Streaming tokens)”指的是在节点内部的大模型(LLM)生成过程中,逐 token 地将中间结果实时推送到客户端的能力。实现这一能力的核心方法是 astream_events,它会以事件流的形式暴露整个执行过程中的所有细节,包括每一次 LLM 调用产生的 token。

每个事件是一个包含几个键的字典:

event:这是正在发出的事件的类型。

name:这是事件的名称。

data:这是与事件相关联的数据。

metadata:包含 langgraph_node,即发出事件的节点。

要点是,图表中聊天模型的令牌具有 on_chat_model_stream 类型。我们可以使用 event['metadata']['langgraph_node'] 来选择要流式的节点。并且我们可以使用 event['data'] 来获取每个事件的实际数据,而在这种情况下,数据是一个 AIMessageChunk.

1
2
3
4
5
6
7
8
9
node_to_stream = 'conversation'#定义流式传输的节点
config = {"configurable": {"thread_id": "5"}}
input_message = HumanMessage(content="为我介绍lol")
async for event in graph.astream_events({"messages": [input_message]}, config, version="v2"):
# 从特定节点获取聊天模型生成的 Token
#事件类型必须是 逐 token 流式输出(on_chat_model_stream)。
if event["event"] == "on_chat_model_stream" and event['metadata'].get('langgraph_node','') == node_to_stream:
data = event["data"]
print(data["chunk"].content, end="|")

event的常见类型

事件类型 (event) 触发时机与说明
on_chain_start 任意 Runnable(节点、子图或整个图)开始执行
on_chain_stream 节点/图在运行过程中 增量输出 chunk
on_chain_end 任意 Runnable 执行完成
on_chat_model_start ChatModel 开始调用
on_chat_model_stream ChatModel 逐 token 返回内容(打字机效果)
on_chat_model_end ChatModel 调用结束
on_tool_start Tool 开始调用
on_tool_end Tool 调用结束
on_retriever_start Retriever 开始检索
on_retriever_end Retriever 检索结束

Breakpoints 断点

human-in-the-loop(人工介入/人在回路)的三大动机:

1️⃣ Approval(审批)我们可以中断智能体,将当前状态呈现给用户,并让用户决定是否执行该操作。

2️⃣ Debugging(调试/回放)我们可以回退图形以重现或避免问题

3️⃣ Editing(编辑)AI 产出的中间结果不符合预期,但不想重跑整图,可以直接修改状态

我们将介绍 breakpoints,它提供了一种在特定步骤停止图的简单方法。

Breakpoints for human approval用于人类审批的断点

假设我们关注工具的使用:我们希望批准代理使用其任何工具。

我们所需要做的就是简单地用 interrupt_before=["tools"] 编译图形,其中 tools 是我们的工具节点。

这意味着在执行工具调用的节点 tools 之前,执行将被中断。

1
graph = builder.compile(interrupt_before=["tools"], checkpointer=memory)

image-20250720225640772

1
2
3
4
5
6
7
8
9
# Input
initial_input = {"messages": HumanMessage(content="2乘3")}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
1
2
3
4
5
6
7
8
9
10
================================ Human Message =================================

2乘3
================================== Ai Message ==================================
Tool Calls:
multiply (call_92a4bcf88d25476d925775)
Call ID: call_92a4bcf88d25476d925775
Args:
a: 2
b: 3

我们可以获取状态并查看要调用的下一个节点。这是一种很好的方法,可以发现图已被中断。

现在,我们将介绍一个很好的技巧。当我们使用 None 调用图时,它将直接从最后一个状态检查点继续!

breakpoints.jpg

状态快照(StateSnapshot)

  • 类型:专门用来存 一个时刻 的完整状态
  • 获取方式:
    • Graph.get_state()最新的 快照
    • Graph.get_state_history()所有 快照列表

继续/重跑图

  • Graph.stream(None, {"thread_id": "xxx"})
    • 不传新输入 None 表示 从当前最新状态继续跑
    • 也可回退到历史快照,再重跑(调试/回放)
1
2
for event in graph.stream(None, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
================================== Ai Message ==================================
Tool Calls:
multiply (call_92a4bcf88d25476d925775)
Call ID: call_92a4bcf88d25476d925775
Args:
a: 2
b: 3
================================= Tool Message =================================
Name: multiply

6
================================== Ai Message ==================================

2乘3的结果是6。

Editing graph state 编辑图状态

断点也是修改图状态的机会让我们在 assistant 节点之前为代理设置断点。

1
graph = builder.compile(interrupt_before=["assistant"], checkpointer=memory)

image-20250720230924672

1
2
3
4
5
6
7
8
9
# Input
initial_input = {"messages": "2乘3"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
1
2
3
================================ Human Message =================================

2乘3

当状态中断时,我们可以直接应用状态更新

记住,对 messages 键的更新将使用 add_messages reducer:

如果我们想覆盖现有的消息,可以提供带有 id 的消息。 如果我们只想将消息添加到消息列表中,则可以传递未指定 id 的消息,如下所示。

1
2
3
4
graph.update_state(
thread,
{"messages": [HumanMessage(content="不要,实际上要3乘3!")]},
)
1
2
3
new_state = graph.get_state(thread).values
for m in new_state['messages']:
m.pretty_print()
1
2
3
4
5
6
================================ Human Message =================================

2乘3
================================ Human Message =================================

不要,实际上要3乘3!

现在,让我们继续进行我们的代理操作,只需传递 None 并允许其从当前状态继续执行。我们输出当前内容,然后继续执行剩余的节点。

1
2
for event in graph.stream(None, thread, stream_mode="values"):
event['messages'][-1].pretty_print()

Dynamic breakpoints 动态断点

你可以根据条件来实现它(从节点内部基于开发人员定义的逻辑)。您可以向用户说明其中断原因(通过将您想传递的内容发送到 NodeInterrupt)。

让我们创建一个图表,其中根据输入的长度会抛出一个 NodeInterrupt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from IPython.display import Image, display

from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from langgraph.graph import START, END, StateGraph

class State(TypedDict):
input: str

def step_1(state: State) -> State:
print("---Step 1---")
return state

def step_2(state: State) -> State:
# 如果输入字符串长度超过5个字符,我们可以选择抛出NodeInterrupt异常
if len(state['input']) > 5:
raise NodeInterrupt(f"收到长度超过5个字符的输入: {state['input']}")

print("---Step 2---")
return state

def step_3(state: State) -> State:
print("---Step 3---")
return state

builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Compile the graph with memory
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))

image-20250721222709712

让我们运行一个输入超过5个字符的图。

1
2
3
4
5
6
initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
print(event)
1
2
3
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}

我们可以尝试从断点恢复图。但是,这只会重新运行相同的节点!除非状态发生变化,否则我们将一直卡在这里。

1
2
3
4
graph.update_state(
thread_config,
{"input": "hi"},
)

使用update_state更新状态

Time travel 时间旅行

现在,让我们通过查看、重播,甚至从过去的状态叉出,来展示 LangGraph 支持debug 的功能。

Browsing History 浏览历史

我们可以使用 get_state 来查看给定 thread_id 的图的 当前 状态!

1
graph.get_state({'configurable': {'thread_id': '1'}})

我们还可以浏览代理的状态历史。get_state_history 让我们能够获取所有先前步骤的状态。

1
2
3
all_states = [s for s in graph.get_state_history(thread)]
len(all_states)
print(all_states)
Replaying 回放
1
2
3
4
5
to_replay = all_states[-2]
to_replay.values
{'messages': [HumanMessage(content='2乘3', additional_kwargs={}, response_metadata={}, id='0676d9b5-cd59-4630-924d-b5c8d950e8d8')]}
to_replay.next
('assistant',)

我们还获取了配置,它告诉了我们 checkpoint_id 以及 thread_id

1
2
3
4
to_replay.config
{'configurable': {'thread_id': '1',
'checkpoint_ns': '',
'checkpoint_id': '1f066c0e-2ee2-66d5-8000-5dde78194aae'}}

要从这里重播,我们只需将配置传回给代理!图知道这个检查点已经执行过了。它只是从这个检查点重新播放!

1
2
for event in graph.stream(None, to_replay.config, stream_mode="values"):
event['messages'][-1].pretty_print()
Forking 分叉

如果我们想从相同的步骤运行,但使用不同的输入,该怎么办呢?这是分叉。

fig3.jpg

让我们修改此检查点的状态。我们可以直接使用提供的 checkpoint_id 来运行 update_state

请记住我们对 messages 的 reducer 是如何工作的:

  • 它会追加消息,除非我们提供了一个消息 ID。
  • 我们提供消息 ID 是为了覆盖消息,而不是将消息追加到状态中!

因此,要覆盖消息,我们只需提供消息 ID,而我们已有 to_fork.values["messages"].id

1
2
3
4
5
fork_config = graph.update_state(
to_fork.config,
{"messages": [HumanMessage(content='5乘3',
id=to_fork.values["messages"][0].id)]},
)

基础知识

message

LangChain 中的 HumanMessage 、 AIMessage 、 SystemMessage 和 ToolMessage 。这些消息类型是构建与语言模型(LLM)交互的核心组件,它们共同构成了一个完整的对话历史,帮助模型理解上下文并做出恰当的回应。

  1. SystemMessage

SystemMessage 的结构最简单,它只包含内容和类型。

数据结构 :

  • content (str): 消息的具体内容,即给 AI 的指令。
  • type (str): 固定为字符串 ‘system’ 。
  1. HumanMessage

HumanMessage 的结构也同样简单,代表用户的输入。

数据结构 :

  • content (str): 用户输入的文本。
  • type (str): 固定为字符串 ‘human’ 。
  1. AIMessage

AIMessage 的结构相对复杂,因为它不仅可以包含文本响应,还可以包含对工具的调用请求。

数据结构 :

  • content (str): AI 生成的文本响应。如果 AI 的回复是发起工具调用,此字段可以为空字符串。
  • tool_calls (list[dict], 可选): 一个字典列表,每个字典代表一个工具调用请求。这是支持“Function Calling”或“Tool Calling”功能的核心。其结构通常包含:
    • name (str): 要调用的工具名称。
    • args (dict): 调用工具时需要传入的参数。
    • id (str): 此次工具调用的唯一标识符,用于后续 ToolMessage 的关联。
  • type (str): 固定为字符串 ‘ai’ 。
  1. ToolMessage

ToolMessage 用于承载工具执行后的返回结果。

数据结构 :

  • content (str): 工具执行返回的结果。通常是一个字符串,比如 JSON 格式的字符串。
  • tool_call_id (str): 此次工具调用的唯一标识符, 必须 与之前 AIMessage 中 tool_calls 里的 id 相对应。这使得模型能够准确地将结果与请求匹配起来。
  • type (str): 固定为字符串 ‘tool’ 。

libreoffice部署

查看Linux发行版

image-20250718095931232

系统是 银河麒麟高级服务器操作系统 V10(Kylin Linux Advanced Server V10),属于 中国国产、兼容 CentOS/RHEL 生态 的 Linux 发行版。

因此它使用 RPM 包管理dnf/yum),而不是 .deb

查看CPU 处理器架构

1
uname -m

x86_64

不用部署了,镜像里有,直接用了

使用libreoffice处理doc文件,转成pdf

将当前目录下所有 .doc.docx 转为 PDF:

1
2
3
4
5
6
7
libreoffice --headless --convert-to pdf *.doc *.docx --outdir ./pdf_output/

# 检查是否有残留进程
ps aux | grep libreoffice

# 如果有残留进程,杀死它们
killall soffice.bin 2>/dev/null

python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import os
import subprocess
import argparse
import glob
from pathlib import Path
def batch_convert_documents(input_path, output_dir):
"""
批量转换文档的函数版本

Args:
input_path (str): 输入路径(文件、目录或通配符)
output_dir (str): 输出目录

Returns:
bool: 转换是否成功
"""

# 确保输出目录存在
Path(output_dir).mkdir(parents=True, exist_ok=True)

# 收集所有要转换的文件
files_to_convert = []

if os.path.isfile(input_path):
# 单个文件
if input_path.lower().endswith(('.doc', '.docx')):
files_to_convert.append(input_path)
elif os.path.isdir(input_path):
# 目录中的所有doc/docx文件
for pattern in ['*.doc', '*.docx']:
files_to_convert.extend(glob.glob(os.path.join(input_path, pattern)))
else:
# 通配符模式
files_to_convert = glob.glob(input_path)
# 过滤出doc和docx文件
files_to_convert = [f for f in files_to_convert if f.lower().endswith(('.doc', '.docx'))]

if not files_to_convert:
print("未找到要转换的文档文件")
return False

print(f"找到 {len(files_to_convert)} 个文件需要转换")

# 构建命令
cmd = [
'libreoffice',
'--headless',
'--convert-to', 'pdf',
'--outdir', output_dir
] + files_to_convert

try:
print("正在转换文件...")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)

if result.returncode == 0:
print(f"成功转换 {len(files_to_convert)} 个文件")
return True
else:
print(f"转换失败: {result.stderr}")
return False

except Exception as e:
print(f"转换过程中发生错误: {e}")
return False

if __name__ == "__main__":
success = batch_convert_documents("./docs", "./pdf_output")

Linux扫盲

发行版

像Ubuntu,CentOS都属于Linux的发行版,就像Windows11属于Windows的关系

常见发行版分类:

系列 代表发行版 包格式 特点
Debian 系 Debian、Ubuntu、Kali、Linux Mint .deb 包多、社区大、教程多
Red Hat 系 CentOS、RHEL、Rocky、Alma、Fedora .rpm 企业级稳定、官方支持长
SUSE 系 openSUSE Leap / Tumbleweed .rpm YaST 管理工具、欧洲流行
Arch 系 Arch Linux、Manjaro .pkg.tar.zst 滚动更新、极客向
轻量/最小 Alpine、Debian netinst、CentOS Stream Minimal 任意 镜像小、资源占用低

如何查看Linux发行版

1
cat /etc/os-release

deb和rpm

.deb.rpm 想象成 “Linux 世界里的安装程序”,就像 Windows 的 .exe / .msi

格式 适用系统 安装命令
.deb Debian、Ubuntu、Linux Mint、Kali 等 sudo dpkg -i xxx.debsudo apt install ./xxx.deb
.rpm CentOS、RHEL、Fedora、openSUSE、Rocky、Alma 等 sudo rpm -ivh xxx.rpmsudo dnf/yum install xxx.rpm

cpu处理器架构

目录名 代表架构 适用场景
x86_64 Intel/AMD 64 位 绝大多数台式机、服务器(如 Xeon、EPYC、Core、Ryzen)
aarch64 ARM 64 位 树莓派 4/5、苹果 M 系列(Asahi Linux)、鲲鹏、飞腾、Ampere ARM 服务器等

查看处理器架构:

1
uname -m

实战demo

agent实战

langchain的agent与langgraph的agent主要差异点在create_openai_functions_agent, AgentExecutor这两个函数

前者的作用类似构建 Runnable 链,返回一个RunnablePassthrough.assign(...)|prompt|llm_with_tools|ToolsAgentOutputParser(),但其invoke仅能完成单步的调用,而AgentExecutor 会自动完成3 步循环(调用工具→拼回结果→再次调用 LLM),直到任务结束。

以下为ai的解释

直接使用 agent (Runnable) 的局限性:

  1. 单步执行: 你直接调用 agent.invoke()agent.ainvoke() 时,它通常只执行一步。对于像 create_tool_calling_agent 生成的 agent 来说,这一步就是:
    • 接收输入(包括历史消息和 agent_scratchpad)。
    • 让 LLM 决定是给出最终答案 (AgentFinish) 还是调用工具 (AgentAction)。
    • 返回这个决定。
  2. 工具调用需要手动处理: 如果 LLM 决定调用工具(返回 AgentAction),需要负责:
    • 从返回的 AgentAction 中找出工具名称和输入参数。
    • 在你的工具列表中找到对应的工具。
    • 执行这个工具。
    • 获取工具的输出(Observation)。
    • 再次手动调用 agent.invoke(...),把工具的输出(通常需要格式化成 ToolMessage)放回 agent_scratchpadintermediate_steps 中。
    • 重复这个过程,直到 agent 最终返回 AgentFinish

使用 AgentExecutor 的优势:

AgentExecutor 就是为了解决上述问题而设计的。它本质上是一个自动化的执行引擎,为你管理整个 Agent 的思考-行动-观察循环。

  1. 自动化循环: AgentExecutor 内部会自动运行那个循环:
    • 调用 agent (Runnable)。
    • 检查返回的是 AgentAction 还是 AgentFinish
    • 如果是 AgentAction,它会自动根据你提供的 tools 列表找到并执行对应的工具。
    • 它会自动将工具的输出(Observation)记录下来,并作为下一步的输入(放入 agent_scratchpad)再次调用 agent
    • 这个过程会一直重复。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from langchain.agents import create_openai_functions_agent, AgentExecutor
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

# 1. 定义工具
class WeatherInput(BaseModel):
location: str = Field(description="城市名称")

@tool("get_weather", args_schema=WeatherInput)
def get_weather(location: str) -> str:
"""查询城市天气"""
return f"{location} 今天是晴天,25°C"

# 2. 创建Agent
llm = ChatOpenAI(model="gpt-4")
tools = [get_weather]
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个助手,可以调用工具"),
("human", "{input}")
])
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# 3. 执行
result = agent_executor.invoke({"input": "北京天气如何?"})
print(result["output"])

工具调用

利用bind_tools绑定工具,当大模型需要调用工具的时候,会返回工具信息,tool_calls,如下

[{‘name’: ‘add_numbers’, ‘args’: {‘a’: 15, ‘b’: 27}, ‘id’: ‘4e7b261cce6d4e3da09134086c704c3c’, ‘type’: ‘tool_call’}]

llm_with_tools.invoke(...) 只是一个单步调用,LLM 返回的是“我想调用哪个工具、传什么参数”(即 tool_calls)。
但 LLM 并不会自动执行工具,所以你必须:

  1. 手动执行工具(或让 AgentExecutor 帮你执行)。
  2. 把执行结果拼回对话(作为 ToolMessage)。
  3. 再次调用 LLM,让它基于工具返回的结果生成最终答案。

这里展示的是手动拼接,并传给大模型,如下

1
2
3
4
5
6
7
# 将工具的输出发送回LLM,让LLM基于结果生成最终回答
# 这是一个关键步骤,通常在Agent中自动处理。这里手动演示。
final_response = llm_with_tools.invoke([
HumanMessage(content="What is 15 + 27?"),
AIMessage(content="", tool_calls=[tool_call]), # 告知LLM它之前建议的工具调用
ToolMessage(content=str(result), tool_call_id=tool_call['id']) # 告知LLM工具的执行结果
])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from langchain_core.tools import tool
from typing import Literal
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage

# 定义一个加法工具
@tool
def add_numbers(a: float, b: float) -> float:
"""
Adds two numbers together.

Args:
a: The first number.
b: The second number.
"""
return a + b

# 我们可以定义更多的工具,例如一个乘法工具
@tool
def multiply_numbers(a: float, b: float) -> float:
"""
Multiplies two numbers together.

Args:
a: The first number.
b: The second number.
"""
return a * b

# 将我们定义的工具放在一个列表中
tools = [add_numbers, multiply_numbers]


# 初始化LLM
llm = ChatOpenAI(
temperature=0.5,
model_name="deepseek-v3-0324", # 聊天模型通常使用"gpt-3.5-turbo"或"gpt-4"
openai_api_base="https://api.qnaigc.com/v1", # 例如,您可以指定base_url
openai_api_key="sk-" # 直接在此处设置API密钥,或者通过环境变量设置
)
# 将工具绑定到LLM
# LLM现在知道了add_numbers和multiply_numbers这两个工具及其功能
llm_with_tools = llm.bind_tools(tools)


# 场景一:LLM直接回答,不需要工具
print("--- 场景一:LLM直接回答 ---")
response1 = llm_with_tools.invoke([HumanMessage(content="Hello, what's your name?")])
print(response1.content) # LLM直接生成文本回复

print("\n--- 场景二:LLM决定调用工具 ---")
# 场景二:LLM决定调用工具
# 当LLM的响应中包含tool_calls时,意味着它想要调用一个或多个工具
response2 = llm_with_tools.invoke([HumanMessage(content="What is 15 + 27?")])
print(response2.tool_calls) # 打印LLM决定调用的工具信息

# 检查并执行LLM建议的工具调用
if response2.tool_calls:
for tool_call in response2.tool_calls:
if tool_call['name'] == "add_numbers":
# 提取LLM为工具调用生成的参数
args = tool_call['args']
result = add_numbers.invoke(args) # 执行工具
print(f"Tool call: add_numbers({args['a']}, {args['b']}) = {result}")

# 将工具的输出发送回LLM,让LLM基于结果生成最终回答
# 这是一个关键步骤,通常在Agent中自动处理。这里手动演示。
final_response = llm_with_tools.invoke([
HumanMessage(content="What is 15 + 27?"),
AIMessage(content="", tool_calls=[tool_call]), # 告知LLM它之前建议的工具调用
ToolMessage(content=str(result), tool_call_id=tool_call['id']) # 告知LLM工具的执行结果
])
print("Final LLM response based on tool output:")
print(final_response.content)

概念扫盲

Document 对象

Document 对象是 LangChain 用来封装和处理文本数据的基本单位。无论您是从 PDF、Markdown 文件、网站还是数据库加载数据,LangChain 都会将这些数据转换成一个或多个 Document 对象,以便在后续的流程中使用。

一个 Document 对象主要包含两个部分:

  1. page_content (字符串)

    • 这是文档对象的核心,存储了原始的文本内容。例如,如果加载一个 Markdown 文件, page_content 就会包含该文件的所有文本。
  2. metadata (字典)

    • 这是一个字典,用于存储关于文档的“元数据”或附加信息。这些信息对于过滤、追踪或增强文档处理流程非常有用。常见的元数据包括:
      • source :文档的来源,比如文件名、URL等。
      • page :如果文档来自多页文件(如PDF),这里可以存储页码。
      • 其他自定义信息:您可以添加任何有助于您应用的信息,如作者、创建日期等。

除了通过文档加载器(Loaders)自动创建,您也可以手动创建一个 Document 对象。这在测试或处理简单文本时非常方便。

1
2
3
4
5
6
7
8
9
# 创建一个简单的 Document 对象
doc = Document(
page_content="这是文档的主要内容。LangChain 真酷!",
metadata={
'source': 'my_notebook.ipynb',
'author': 'AI Assistant',
'chapter': 2
}
)

Runnable协议

“Runnable”协议

参考资料

2025最新版!langchain入门到精通实战教程!结合实战案例,干货拉满!99%的人不知道的暴利玩法,学完敢谷歌工程师叫板!_哔哩哔哩_bilibili

introduction | LangChain中文网

跟着官网学langchain2025(version 0.3)_哔哩哔哩_bilibili

LangGraph Platform - Docs by LangChain

构建langgraph聊天机器人的基本流程

创建一个 StateGraph

首先创建一个 StateGraph。一个 StateGraph 对象将我们的聊天机器人结构定义为“状态机”。我们将添加 节点 来表示 LLM 和聊天机器人可以调用的函数,并添加 来指定机器人应如何在这些函数之间进行转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START
from langgraph.graph.message import add_messages


class State(TypedDict):
messages: Annotated[list, add_messages]

#定义状态
graph_builder = StateGraph(State)

在 langgraph 中,状态会在图的各个节点之间传递。当一个节点产生新的消息时,它会更新 State 中的 messages 字段。

我们的图现在可以处理两个关键任务

  1. 每个 节点 都可以接收当前 状态 作为输入,并输出状态的更新。
  2. 消息 的更新将追加到现有列表而不是覆盖它,这得益于与 Annotated 语法一起使用的预构建 add_messages 函数。

langgraph中每个消息对象通常包含以下关键属性:

  • role : 一个字符串,标识消息的发送者(例如 ‘human’ , ‘ai’ , ‘system’ )。
  • content : 消息的具体内容,通常是字符串,但也可以是更复杂的结构(例如,用于多模态输入)。
  • id : 一个可选的唯一标识符。

Annotated 的作用 : 通过使用 Annotated[list, add_messages] ,你改变了这个默认行为。 add_messages 函数(由 langgraph 提供或由你自定义)的逻辑是 追加 而不是覆盖。所以,当一个新节点返回消息时, langgraph 会调用 add_messages 函数,将新消息 追加 到现有 messages 列表的末尾。

定义一个聊天模型

两种方法:

1.使用 init_chat_model(通用高层封装)
这是一个通用的辅助函数,旨在提供一个统一的接口来初始化来自 不同提供商 的聊天模型。

init_chat_model — 🦜🔗 LangChain 文档 —- init_chat_model — 🦜🔗 LangChain documentation

2.使用如ChatOpenAI (特定于提供商的类)
这是一个专门为 OpenAI API 设计的类,提供了对 OpenAI 模型所有功能的完全访问。

1
2
3
4
5
6
7
from langchain.chat_models import init_chat_model

os.environ["OPENAI_API_KEY"] = "sk-"
#使用‘{model_provider}:{model}’格式在单个参数中指定模型和模型提供者,例如“openai:o1”
llm = init_chat_model("openai:qwen-plus-2025-04-28",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

添加一个节点

现在我们可以将聊天模型集成到一个简单的节点中

1
2
3
4
5
#定义节点chatbot
def chatbot(state: State):
return {"messages": [llm.invoke(state["messages"])]}

graph_builder.add_node("chatbot", chatbot)

chatbot 节点函数如何将当前 状态 作为输入,并返回一个包含更新的 消息 列表的字典,键为“messages”。这是所有 LangGraph 节点函数的基本模式。

我们 状态 中的 add_messages 函数会将 LLM 的响应消息追加到状态中已有的消息之后。

添加一个 入口

添加一个 入口 点,以告诉图每次运行时从何处开始工作

1
graph_builder.add_edge(START, "chatbot")

编译图

在运行图之前,我们需要对其进行编译。我们可以通过在图构建器上调用 compile() 来完成。这将创建一个 CompiledGraph,我们可以在我们的状态上调用它。

1
graph = graph_builder.compile()

可视化图

您可以使用 get_graph 方法和其中一个“绘图”方法(例如 draw_asciidraw_png)来可视化图。这些 draw 方法都需要额外的依赖项。

1
2
3
4
5
6
7
from IPython.display import Image, display

try:
display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
# This requires some extra dependencies and is optional
pass

运行聊天机器人

运行聊天机器人

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def stream_graph_updates(user_input: str):
for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}):
print(event)
#stream 返回的每个 event 通常是一个字典,键是图中节点的名称,值是该节点完成后的状态更新。
for value in event.values():
#消息列表中的最后一条消息的文本内容
print("Assistant:", value["messages"][-1].content)


while True:
try:
user_input = input("User: ")
#退出
if user_input.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
#调用
stream_graph_updates(user_input)
except:
# fallback if input() is not available
user_input = "What do you know about LangGraph?"
print("User: " + user_input)
stream_graph_updates(user_input)
break

graph.stream() 是 LangGraph 的核心功能之一。它会执行整个图(Graph),但不是一次性返回最终结果,而是像视频流一样,一步一步地返回中间过程的更新。这使得您可以实时看到模型生成内容的每一个部分。

添加网页搜索工具

获取Tavily api

Tavily 的搜索 API 是一款专为 AI 代理 (LLM) 构建的搜索引擎,能够快速提供实时、准确和基于事实的结果。

每月 1,000 次免费搜索

Tavily Search | 🦜️🔗 LangChain 框架

获取apiTavily AI —- Tavily AI

添加工具

1
2
3
4
5
6
7
from langchain_tavily import TavilySearch

tool = TavilySearch(
tavily_api_key="tvly-dev-",
max_results=2)
tools = [tool]
tool.invoke("李超是谁?")

定义图

在LLM上添加bind_tools。这让LLM知道如果它想使用搜索引擎,应使用正确的JSON格式。

定义聊天模型llm(代码同上)

将tools整合到StateGraph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
messages: Annotated[list, add_messages]

graph_builder = StateGraph(State)

# 将一个或多个**工具(tools) 绑定到一个 大型语言模型(LLM)**上,从而创建一个新的、具备工具调用能力的 LLM 实例
llm_with_tools = llm.bind_tools(tools)

def chatbot(state: State):
return {"messages": [llm_with_tools.invoke(state["messages"])]}

graph_builder.add_node("chatbot", chatbot)

创建一个运行工具的函数

现在,创建一个函数来运行被调用的工具。通过将工具添加到一个名为BasicToolNode的新节点来完成,该节点检查状态中的最新消息,如果消息包含tool_calls,则调用工具。它依赖于LLM的tool_calling支持,该支持在Anthropic、OpenAI、Google Gemini以及许多其他LLM提供商中可用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 定义 __call__ 方法,让这个类的实例可以像函数一样被调用
# inputs 是 langgraph 传进来的当前状态,是一个字典
def __call__(self, inputs: dict):
# 1. 从状态中获取最新的消息
# 使用了“海象操作符” :=,先从 inputs 中获取 'messages' 列表,如果不存在则返回空列表 []
# 然后检查列表是否为空。如果不为空,则取出最后一条消息。
if messages := inputs.get("messages", []):
message = messages[-1] # 通常,最后一条消息是 AI 发出的,其中包含工具调用请求
else:
# 如果没有消息,就报错,因为这个节点不知道该做什么
raise ValueError("No message found in input")

# 2. 准备一个列表,用来存放所有工具的执行结果
outputs = []

# 3. 遍历 AI 消息中请求的所有工具调用
for tool_call in message.tool_calls:

# 4. 执行工具
# a. tool_call["name"] 获取工具名称 (例如 'tavily_search_results_json')
# b. self.tools_by_name[...] 从预存的工具字典中找到对应的工具对象
# c. .invoke(tool_call["args"]) 使用 LLM 提供的参数来调用该工具
tool_result = self.tools_by_name[tool_call["name"]].invoke(
tool_call["args"]
)

# 5. 将工具执行结果打包成 ToolMessage
# 这是 langgraph/langchain 的标准格式,用于告诉 LLM 工具执行的结果是什么
outputs.append(
ToolMessage(
content=json.dumps(tool_result), # 工具结果必须是字符串,所以用 json.dumps 序列化
name=tool_call["name"], # 告诉 LLM 这是哪个工具的结果
tool_call_id=tool_call["id"], # 必须提供原始请求的 ID,以便 LLM 知道这个结果对应哪个请求
)
)

# 6. 返回结果,更新图的状态
# 返回一个字典,其中 'messages' 键对应着包含所有 ToolMessage 的列表
# langgraph 会将这个列表中的消息追加到主状态的 'messages' 列表中
return {"messages": outputs}

call 是 Python 中一个非常特殊的“魔术方法”(magic method)。它的作用是 让一个类的实例(对象)能够像函数一样被调用 。

这在 langgraph 中是一种常见且核心的设计模式。它的含义是:

  1. 节点即函数 : BasicToolNode 的实例(比如 tool_node )本身就代表了图中的一个可执行节点。
  2. 执行逻辑 :当 langgraph 的状态机运行到这个 tool_node 节点时,它会直接“调用”这个节点对象,并把当前的状态( inputs 字典)传递给它

可以使用LangGraph预构建的ToolNode

1
2
3
4
from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools=[tool])
graph_builder.add_node("tools", tool_node)

定义conditional_edges

添加了工具节点后,现在您可以定义conditional_edges

边(Edges)将控制流从一个节点路由到下一个节点。条件边(Conditional edges)从单个节点开始,通常包含“if”语句,根据当前图状态路由到不同的节点。这些函数接收当前的图state并返回一个字符串或字符串列表,指示接下来要调用哪个(或哪些)节点。

接下来,定义一个名为route_tools的路由函数,它检查聊天机器人输出中的tool_calls。通过调用add_conditional_edges将此函数提供给图,这会告诉图,无论何时chatbot节点完成,都要检查此函数以确定下一步去哪里。

如果存在工具调用,条件将路由到tools;如果不存在,则路由到END。由于条件可以返回END,因此这次您不需要明确设置finish_point

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def route_tools(
state: State,
):
"""
用于 conditional_edge 的路由函数:
- 如果最后一条消息包含工具调用,则路由到 ToolNode
- 否则路由到结束节点
"""
# 处理 state 为列表的情况(可能是消息列表)
if isinstance(state, list):
ai_message = state[-1] # 获取最后一条消息
# 处理 state 为字典的情况(包含 messages 字段)
elif messages := state.get("messages", []):
ai_message = messages[-1] # 获取最后一条消息
else:
raise ValueError(f"输入状态中没有找到消息: {state}")

# 检查消息是否有工具调用
if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
return "tools" # 有工具调用,返回 "tools" 路由到工具节点
return END # 没有工具调用,返回 END 结束流程

可以使用预构建的tools_condition代替route_tools以使其更简洁。

tools_condition 函数在聊天机器人需要使用工具时返回 “tools”,如果可以不使用响应则返回 “END”。

1
2
3
4
5
6
7
8
9
from langgraph.prebuilt import tools_condition
graph_builder.add_conditional_edges(
"chatbot",
tools_condition,
{"tools": "tools", END: END},
)
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
graph = graph_builder.compile()

可视化图

如上

向机器人提问

现在您可以向聊天机器人提出超出其训练数据范围的问题。

如上

添加记忆功能

LangGraph 通过持久性检查点解决了这个问题。如果您在编译图时提供一个checkpointer,并在调用图时提供一个thread_id,LangGraph 会在每一步之后自动保存状态。当您使用相同的thread_id再次调用图时,图会加载其保存的状态,允许聊天机器人从上次中断的地方继续。

我们稍后会看到,检查点比简单的聊天记忆功能强大得多——它允许您随时保存和恢复复杂状态,用于错误恢复、人工干预工作流、时间旅行交互等。但首先,让我们添加检查点以实现多轮对话。

创建 MemorySaver 检查点

1
2
3
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

这是一个内存中的检查点,方便本教程使用。然而,在生产应用程序中,您可能会将其更改为使用 SqliteSaverPostgresSaver 并连接数据库。

编译图

使用提供的检查点编译图,图在遍历每个节点时将对 State 进行检查点。

1
graph = graph_builder.compile(checkpointer=memory)

与您的聊天机器人互动

  1. 选择一个线程作为此对话的键。

    thread_id决定对话窗口

    1
    config = {"configurable": {"thread_id": "1"}}
  2. 调用您的聊天机器人

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    user_input = "我是谁"

    # The config is the **second positional argument** to stream() or invoke()!
    events = graph.stream(
    {"messages": [{"role": "user", "content": user_input}]},
    config,
    stream_mode="values",
    )
    for event in events:
    event["messages"][-1].pretty_print()

添加人工干预

代理可能不可靠,并且可能需要人工输入才能成功完成任务。同样,对于某些操作,您可能需要在运行前要求人工批准,以确保一切按预期运行。

LangGraph 的持久化层支持人工干预工作流,允许根据用户反馈暂停和恢复执行。此功能的主要接口是interrupt函数。在节点内调用interrupt将暂停执行。通过传入Command,可以恢复执行并接收来自人工的新输入。interrupt在功能上类似于 Python 的内置input()但有一些注意事项

添加human_assistance工具

human_assistance工具添加到聊天机器人。此工具使用interrupt从人工接收信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 添加人工干预功能
# 导入LangGraph的中断机制和命令类型
from langgraph.types import Command, interrupt
# 导入LangChain的工具装饰器
from langchain_core.tools import tool

# 使用@tool装饰器将函数标记为可被LLM调用的工具
@tool
def human_assistance(query: str) -> str:
"""请求人工协助的工具函数。
当LLM遇到需要人工判断或帮助的情况时,会调用此工具。
该函数会暂停图的执行,等待人工操作员提供响应。
"""
# interrupt()函数会暂停图的执行,等待人工输入
# 传入的字典包含查询信息,人工操作员会看到这个查询
human_response = interrupt({"query": query})

# 从人工响应中提取数据并返回给LLM
# human_response是一个字典,"data"字段包含人工提供的实际响应
return human_response["data"]

简单来说, 调用哪个工具,以及何时调用,完全是由大语言模型(LLM)根据你给它的指令(Prompt)来决定的。

1
2
3
tool = TavilySearch(max_results=2)
tools = [tool, human_assistance]
llm_with_tools = llm.bind_tools(tools)

定义chatbot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def chatbot(state: State):
# 调用绑定了工具的LLM(llm_with_tools),传入当前的消息历史
# LLM会根据最新的消息决定是生成文本回复,还是调用一个或多个工具
message = llm_with_tools.invoke(state["messages"])

# --- 关键断言逻辑 ---
assert len(message.tool_calls) <= 1

# 将LLM生成的新消息(可能是文本回复,也可能是工具调用请求)返回
# 这个返回值会以字典的形式更新到状态(State)对象中
return {"messages": [message]}

# 将 chatbot 函数作为名为 "chatbot" 的节点添加到图构建器中
graph_builder.add_node("chatbot", chatbot)

中断安全性的断言 ( assert ) : assert len(message.tool_calls) <= 1 是在实现人工干预时一个非常重要的 安全措施 。

  • 问题 : 现代 LLM 支持并行工具调用(一次请求执行多个工具)。但如果其中一个工具是 human_assistance 并触发了中断,整个图会暂停。当人工操作完成后,图会从中断点恢复。此时,如果不对工具调用数量做限制,LangGraph 可能会重新尝试执行所有在中断前请求的工具,导致已经执行过的工具被再次调用。
  • 解决方案 : 这个断言强制要求 LLM 在每一步最多只能请求调用一个工具。这样就保证了当中断发生并恢复后,不会有重复执行工具的风险,确保了流程的稳定性和可预测性。

编译图

1
2
3
memory = MemorySaver()

graph = graph_builder.compile(checkpointer=memory)

调用聊天机器人并中断

1
2
3
4
5
6
7
8
9
10
11
12
user_input = "我需要一些关于构建 AI 代理的专家指导。你能帮我请求协助吗?"
config = {"configurable": {"thread_id": "1"}}

events = graph.stream(
{"messages": [{"role": "user", "content": user_input}]},
config,
#它的作用是指定在进行流式处理时,你希望接收到的数据是以 完整的、累积的值 的形式返回,而不是以增量的、片段的形式返回。
stream_mode="values",
)
for event in events:
if "messages" in event:
event["messages"][-1].pretty_print()

聊天机器人生成了一个工具调用,但随后执行被中断。如果您检查图状态,您会看到它停止在工具节点

1
2
snapshot = graph.get_state(config)
snapshot.next

恢复执行

要恢复执行,请传入一个包含工具所需数据的Command对象。此数据的格式可以根据需要进行自定义。对于本示例,请使用一个带有键"data"的字典(由human_assistance决定)

1
2
3
4
5
6
7
8
9
10
human_response = (
"我们专家在此为您提供帮助!我们建议您查看 LangGraph 来构建您的代理。它比简单的自主代理更可靠、更具可扩展性。"
)
#从暂停状态恢复执行
human_command = Command(resume={"data": human_response})

events = graph.stream(human_command, config, stream_mode="values")
for event in events:
if "messages" in event:
event["messages"][-1].pretty_print()

1.工具的定义 ( human_assistance function):

  • 当 LLM 调用 human_assistance 工具时,这个函数被执行。
  • 函数内部, interrupt() 被调用,导致图暂停,并等待人工输入。
  • 在图恢复后, interrupt() 函数会返回一个值,这个值就是您通过 Command(resume=…) 注入的内容,也就是 {“data”: human_response} 。
  • 因此, human_assistance 函数中的 human_response 变量实际上就等于 {“data”: human_response} 。
  • 最后, return human_response[“data”] 从这个字典中提取出 “data” 键对应的值 ,并将其作为 human_assistance 工具的最终返回结果。

2.恢复指令 ( Command(resume=…) ):

  • 当您构建 Command(resume={“data”: human_response}) 时,您正在创建一个符合 human_assistance 函数期望的结构。
  • 您将人工回复包装在一个字典里,并使用 “data” 作为键。
  • 这个结构被传递回 interrupt() ,然后被 human_assistance 函数接收和解析。

因为 human_assistance 函数的 return 语句期望从返回的字典中访问 “data” 键,所以我们在恢复执行时必须提供一个具有相同结构的字典。这是为了确保数据能够正确地在中断和恢复的过程中传递。

在 LangGraph 中,Command 是一个用于控制图执行流程、更新状态、实现人机交互的核心类。它支持以下四个参数:

参数名 类型 说明
update dict 用于更新图的状态(state)。例如:Command(update={"foo": "bar"})
resume Any interrupt() 配合使用,用于恢复被中断的图执行,并传递用户输入。
goto strSend 或 `List[str Send]` 控制下一步要执行的节点,支持跳转到指定节点、多个节点序列,或使用 Send 对象。
graph str 可选,指定命令作用的图。默认是当前图,也可以设为 Command.PARENT 表示父图。

自定义状态

在本教程中,您将向状态添加额外字段,以定义复杂行为,而无需依赖消息列表。聊天机器人将使用其搜索工具查找特定信息,并将其转发给人工进行审查。

向状态添加键

通过向状态添加 namebirthday 键,更新聊天机器人以研究实体的生日

1
2
3
4
class State(TypedDict):
messages: Annotated[list, add_messages]
name: str
birthday: str

将此信息添加到状态中,可以使其轻松被其他图节点(例如存储或处理信息的下游节点)以及图的持久层访问。

在工具内部更新状态

现在,在 human_assistance 工具内部填充状态键。这允许人工在信息存储到状态之前对其进行审查。使用 Command工具内部发出状态更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 从 langchain_core.messages 导入 ToolMessage,用于创建工具调用的响应消息
from langchain_core.messages import ToolMessage
# 从 langchain_core.tools 导入 InjectedToolCallId(用于自动注入工具调用ID)和 tool(工具装饰器)
from langchain_core.tools import InjectedToolCallId, tool

# 从 langgraph.types 导入 Command(用于向图发送指令)和 interrupt(用于中断图的执行)
from langgraph.types import Command, interrupt
from typing import Annotated
# @tool 装饰器将这个函数声明为一个可供 LLM 调用的工具
@tool
def human_assistance(
name: str,
birthday: str,
# tool_call_id 这个参数非常特殊。Annotated[...] 和 InjectedToolCallId 告诉 LangGraph:
# 1. 这个参数不应暴露给 LLM,LLM 在调用此工具时不需要提供它。
# 2. LangGraph 在执行时,会自动将触发此工具的那个工具调用的 ID 注入到这个参数中。
# 这个 ID 对于创建与原始请求相关联的 ToolMessage 至关重要。
tool_call_id: Annotated[str, InjectedToolCallId]
) -> str:
"""当需要人工确认或更正信息时,请求人类协助。"""
# 调用 interrupt() 来暂停图的执行,并向人类审核者呈现一个包含问题和待确认数据的字典。
# 图会在此处暂停,直到人类通过 resume 指令提供了响应。
human_response = interrupt(
{
"question": "Is this correct?",
"name": name,
"birthday": birthday,
},
)
# 检查人类的响应。如果响应中 'correct' 键的值是 'yes' 或 'y' 开头,
# 则认为信息是正确的。
if human_response.get("correct", "").lower().startswith("y"):
# 如果信息正确,直接使用从 LLM 获取的原始信息。
verified_name = name
verified_birthday = birthday
response = "Correct"
# 否则,认为人类审核者提供了更正后的信息。
else:
# 从人类的响应中获取更正后的姓名和生日。
# 如果人类没有提供新的值,则使用 .get() 的默认值,即原始值。
verified_name = human_response.get("name", name)
verified_birthday = human_response.get("birthday", birthday)
response = f"Made a correction: {human_response}"

# 在工具内部直接构造一个用于更新图状态的字典。
state_update = {
"name": verified_name, # 更新状态中的 'name' 字段
"birthday": verified_birthday, # 更新状态中的 'birthday' 字段
# 创建一个 ToolMessage,将其添加到状态的 'messages' 列表中。
# 这个消息将作为此工具调用的正式“答复”出现在对话历史中。
# tool_call_id 是必需的,用于将此答复与 LLM 的原始工具调用请求关联起来。
"messages": [ToolMessage(response, tool_call_id=tool_call_id)],
}
# 这个工具不返回一个简单的字符串或数字,而是返回一个 Command 对象。
# Command(update=...) 是一个明确的指令,告诉 LangGraph 执行器:
# “请不要将我的返回值当作普通工具输出,而是用 state_update 字典里的内容来直接更新当前的图状态。”
return Command(update=state_update)

图的其余部分保持不变。

提示聊天机器人调用人工审查

提示聊天机器人查找 LangGraph 库的“生日”,并在其获取所需信息后,指示聊天机器人使用 human_assistance 工具。通过在工具参数中设置 namebirthday,您将强制聊天机器人为这些字段生成提议。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
user_input = (
"你能查一下 LangGraph 是什么时候发布的吗? "
"当你有了答案后,使用 human_assistance 工具进行审查。"
)
config = {"configurable": {"thread_id": "1"}}

events = graph.stream(
{"messages": [{"role": "user", "content": user_input}]},
config,
stream_mode="values",
)
for event in events:
if "messages" in event:
event["messages"][-1].pretty_print()

我们再次在 human_assistance 工具中触发了 interrupt

添加人工协助

聊天机器人未能识别正确的日期,因此为其提供信息

1
2
3
4
5
6
7
8
9
10
11
human_command = Command(
resume={
"name": "LangGraph",
"birthday": "Jan 17, 2024",
},
)

events = graph.stream(human_command, config, stream_mode="values")
for event in events:
if "messages" in event:
event["messages"][-1].pretty_print()

请注意,这些字段现在已反映在状态中

1
2
3
snapshot = graph.get_state(config)

{k: v for k, v in snapshot.values.items() if k in ("name", "birthday")}

这使得下游节点(例如,进一步处理或存储信息的节点)可以轻松访问它们。

时间功能(从之前的某个状态开始)

在典型的聊天机器人工作流程中,用户与机器人进行一次或多次交互以完成任务。记忆人工干预功能可以为图状态启用检查点并控制未来的响应。

如果您希望用户能够从之前的响应开始并探索不同的结果,该怎么办?或者,如果您希望用户能够回溯聊天机器人的工作以纠正错误或尝试不同的策略,这在自主软件工程师等应用程序中很常见,那又该怎么办?

您可以使用 LangGraph 内置的时光旅行功能创建这些类型的体验。

回溯您的图

通过使用图的get_state_history方法获取检查点来回溯您的图。然后,您可以从之前的这个时间点恢复执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 初始化一个变量 to_replay 为 None,它将用于存储我们想要“时间旅行”回去的特定状态。
# to_replay 将在循环中被赋值为我们感兴趣的那个历史状态快照。
to_replay = None

# 遍历 `graph` 在给定 `config` 下的所有历史状态。
# `graph.get_state_history(config)` 会返回一个迭代器,其中包含了从开始到当前的所有状态快照。
for state in graph.get_state_history(config):
# 打印当前状态快照中的一些信息,以便我们观察和选择。
# `len(state.values["messages"])` 显示了到该状态为止,对话历史中的消息总数。
# `state.next` 显示了在该状态之后,图将要执行的下一个节点或步骤的名称。
print("Num Messages: ", len(state.values["messages"]), "Next: ", state.next)

# 打印一条分隔线,使输出更易读。
print("-" * 80)

# 这里是选择“时间旅行”目标点的关键逻辑。
# 我们设定一个条件:当对话历史中的消息数量正好等于4时,我们就找到了想要回到的那个点。
# 这是一个为了演示而设定的任意条件,在实际应用中,您可以根据需要设置更复杂的选择逻辑。
if len(state.values["messages"]) == 4:
# We are somewhat arbitrarily selecting a specific state based on the number of chat messages in the state.
# 将当前这个符合条件的状态(state)保存到 to_replay 变量中。
# 循环结束后,to_replay 变量将持有我们选中的那个历史时刻的完整状态,
# 之后我们就可以用它来恢复或修改执行流程。
to_replay = state

图的每一步都会保存检查点。这跨越了调用,因此您可以回溯整个线程的历史。

从特定时间点加载状态

to_replay状态恢复。从这一点恢复将接下来调用action节点。

1
2
print(to_replay.next)
print(to_replay.config)

检查点的to_replay.config包含一个checkpoint_id时间戳。提供此checkpoint_id值会告诉 LangGraph 的检查点器从该时间点加载状态。

1
2
3
for event in graph.stream(None, to_replay.config, stream_mode="values"):
if "messages" in event:
event["messages"][-1].pretty_print()

运行本地服务器

安装 LangGraph CLI

1
2
3
# Python >= 3.11 is required.

pip install --upgrade "langgraph-cli[inmem]"

创建 LangGraph 应用 🌱

new-langgraph-project-python 模板new-langgraph-project-js 模板 创建一个新应用。此模板展示了一个单节点应用程序,您可以根据自己的逻辑进行扩展。

1
langgraph new . --template new-langgraph-project-python

使用 langgraph new 而不指定模板,系统将显示一个交互式菜单,您可以从中选择可用的模板列表。

使用uv安装依赖项

uv 是一个用 Rust 编写的极速 Python 包和项目管理器 。它旨在解决传统 Python 包管理工具(如 pip 、 poetry 等)在速度和效率方面的痛点,提供更快的安装、依赖解析和环境管理。

1
pip install uv#安装uv

运行uv sync会根据 pyproject.toml的依赖创建虚拟环境并安装依赖

pyproject.toml 文件是 Python 项目中用于统一配置项目元数据、构建系统、依赖管理和各种工具设置的标准化文件。它通常用于替代旧的 requirements.txt 文件,提供更现代和集中的项目配置方式。

创建一个 .env 文件

您将在新 LangGraph 应用的根目录下找到一个 .env.example 文件。在新 LangGraph 应用的根目录下创建一个 .env 文件,并将 .env.example 文件的内容复制到其中,填入所需的 API 密钥。

添加环境变量如LANGSMITH_API_KEY,OPENAI_API_KEY等

启动 LangGraph 服务器

在本地启动 LangGraph API 服务器

1
langgraph dev

示例输出

1
2
3
4
5
6
7
>    Ready!
>
> - API: [https://:2024](https://:2024/)
>
> - Docs: https://:2024/docs
>
> - LangGraph Studio Web UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024

LangGraph 服务器(如您通过 langgraph dev 命令启动的服务器)的主要作用是提供一个运行环境和接口,用于开发、测试、部署和管理基于 LangGraph 构建的 AI 代理和应用程序。具体来说,它有以下几个主要用途:

  1. API 接口暴露 :它将您用 LangGraph 定义的复杂代理逻辑(即图结构)通过标准的 RESTful API 接口暴露出来。这意味着其他应用程序、前端界面或者其他服务可以通过 HTTP 请求与您的 LangGraph 代理进行交互,而无需直接集成 LangGraph 的 Python 代码。
  2. 简化部署 :通过将 LangGraph 应用程序打包成一个可运行的服务,您可以更容易地将其部署到云服务器、容器(如 Docker)或其他生产环境中。这使得 LangGraph 代理可以作为一个独立的微服务运行,方便扩展和管理。
  3. 开发和调试便利 :

    • 实时预览和调试 :服务器通常会提供一个 Studio UI(如您在 http://127.0.0.1:2024/studio 看到的),让开发者能够可视化地查看代理的图结构、执行流程、状态变化和中间步骤,这对于理解和调试复杂的代理行为至关重要。
    • API 文档 :自动生成的 API 文档(如 http://127.0.0.1:2024/docs )提供了所有可用接口的详细说明和交互式测试功能,极大地加速了开发和集成过程。
  4. 状态管理和持久化 :LangGraph 代理通常涉及复杂的状态管理。服务器可以负责处理这些状态的持久化,确保代理在多次交互之间能够记住上下文和历史信息。

在 LangGraph Studio 中测试您的应用程序

LangGraph Studio 是一个专门的 UI,您可以连接到 LangGraph API 服务器,以便在本地可视化、交互和调试您的应用程序。通过访问 langgraph dev 命令输出中提供的 URL,在 LangGraph Studio 中测试您的图。

1
>    - LangGraph Studio Web UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024

参考资料

构建一个基本聊天机器人 - LangChain 框架

Overview - Docs by LangChain

官方教程,但是英文https://academy.langchain.com/collections

3.5 小时出证!LangGraph 官方课程 🆓 重磅上线🔥🔥🔥_哔哩哔哩_bilibili

配置langsmith

安装LangSmith SDK

1
pip install langsmith

环境变量

获取apiLangSmith

设置相应的环境变量。这将把跟踪记录到default项目(尽管您可以轻松更改)。

1
2
3
export LANGSMITH_TRACING=true
export LANGSMITH_API_KEY=
export LANGSMITH_PROJECT=default
1
2
3
4
5
LANGSMITH_TRACING=true
LANGSMITH_ENDPOINT="https://api.smith.langchain.com"
LANGSMITH_API_KEY="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"
LANGSMITH_PROJECT="langgraph"

资源

官网《LangSmith》 —- LangSmith

参考文档LangSmith 入门 | 🦜️🛠️ LangSmith 文档

Get started with LangSmith | 🦜️🛠️ LangSmith

注入上下文

“注入上下文”就是在运行过程中节点/大模型 可能需要、但不会(也不应该)去改变的只读信息的集合。

场景 注入上下文里可能放什么
权限控制 user_id, tenant_id(决定能访问哪些数据)
外部依赖 db_connection, api_key, s3_bucket(节点里要用)
个性化参数 language, timezone, model_temperature
会话元信息 session_id, channel(Slack / 微信 / Web)

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from dataclasses import dataclass

from langgraph.graph import StateGraph
from langgraph.runtime import Runtime

@dataclass
class Context:
"""Context schema defined by the developer."""
user_id: str
db_connection: str

def node(state: State, runtime: Runtime[Context]):
# type safe access to context attributes
user_id = runtime.context.user_id
db_conn = runtime.context.db_connection
...

builder = StateGraph(state_schema=State, context_schema=Context)

# add nodes, edges, compile the graph...

# top level context arg is typed as Context for autocomplete and type checking
result = graph.invoke(
{'input': 'abc'},
context=Context(user_id='123', db_conn='conn_mock')
)

Runtime 类提供了一个单一接口,用于访问信息,例如:

  • 上下文:在运行开始时传递的静态数据
  • 存储:长期记忆的存储机制
  • 流写入器:用于向图输出流写入的自定义函数
  • 对于功能 API 用户,previous 也可用:给定线程的前一个返回值

现在,开发者不再需要将上述所有内容作为单独的参数注入到节点函数中,
而是可以通过一个 runtime 参数来访问它们。

环境配置

python虚拟环境构建

1
python -m venv .venv
1
2
3
4
5
pip install langgraph==0.2.74                  
pip install langchain-openai==0.3.6
pip install fastapi==0.115.8
pip install uvicorn==0.34.0
pip install gradio==5.18.0

查看包pip list

构建一个基本的fastapi+langgraph应用

llm示例的构建(利用ChatOpenAI)

1
2
3
4
5
6
7
8
9
# 创建LLM实例
llm = ChatOpenAI(
base_url=config["base_url"],
api_key=config["api_key"],
model=config["model"],
temperature=DEFAULT_TEMPERATURE,
timeout=30, # 添加超时配置(秒)
max_retries=2 # 添加重试次数
)

数据类型的构建

继承于pydantic

规范化 API 请求和响应的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 定义消息类,用于封装API接口返回数据
#基于 Pydantic 的数据模型
# 定义Message类
class Message(BaseModel):
'''
role (角色): 这是一个字符串,表示消息的发送者。常见的角色包括:

- user (用户): 表示用户输入的消息。
- assistant (助手): 表示聊天机器人或模型生成的消息。
- system (系统): 表示为模型提供上下文或指令的系统消息。
'''
role: str
content: str

# 定义ChatCompletionRequest类
#聊天 API 请求
class ChatCompletionRequest(BaseModel):
messages: List[Message]
stream: Optional[bool] = False#是否流式方式响应
userId: Optional[str] = None#用于标识发起请求的用户
conversationId: Optional[str] = None#用于标识特定的对话会话,这对于管理对话上下文或历史记录非常有用

# 定义ChatCompletionResponseChoice类
#聊天完成响应中的一个“选择”或一个生成的回复
class ChatCompletionResponseChoice(BaseModel):
index: int
message: Message
finish_reason: Optional[str] = None

# 定义ChatCompletionResponse类
class ChatCompletionResponse(BaseModel):
id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
object: str = "chat.completion"
created: int = Field(default_factory=lambda: int(time.time()))
choices: List[ChatCompletionResponseChoice]#模型生成的所有可能的回复选项
system_fingerprint: Optional[str] = None

定义fastapi应用并管理应用的生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 定义了一个异步函数lifespan,它接收一个FastAPI应用实例app作为参数。这个函数将管理应用的生命周期,包括启动和关闭时的操作
# 函数在应用启动时执行一些初始化操作,如加载上下文数据、以及初始化问题生成器
# 函数在应用关闭时执行一些清理操作
# @asynccontextmanager 装饰器用于创建一个异步上下文管理器,它允许你在 yield 之前和之后执行特定的代码块,分别表示启动和关闭时的操作
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时执行
# 申明引用全局变量,在函数中被初始化,并在整个应用中使用
global graph

try:
logger.info("正在初始化模型、定义Graph...")
#(1)初始化LLM
llm = get_llm(llm_type)
#(2)定义Graph
graph = create_graph(llm)
#(3)将Graph可视化图保存
save_graph_visualization(graph)
logger.info("初始化完成")
except Exception as e:
logger.error(f"初始化过程中出错: {str(e)}")
# raise 关键字重新抛出异常,以确保程序不会在错误状态下继续运行
raise

# yield 关键字将控制权交还给FastAPI框架,使应用开始运行
# 分隔了启动和关闭的逻辑。在yield 之前的代码在应用启动时运行,yield 之后的代码在应用关闭时运行
yield
# 关闭时执行
logger.info("正在关闭...")


# lifespan参数用于在应用程序生命周期的开始和结束时执行一些初始化或清理工作
app = FastAPI(lifespan=lifespan)

langgraph核心逻辑

创建langgraph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 定义chatbot的状态
class State(TypedDict):
messages: Annotated[list, add_messages]

# 创建和配置chatbot的状态图
def create_graph(llm) -> StateGraph:
try:
# 构建graph
#创建一个 StateGraph 的实例,并将其配置为使用 State 类作为其状态管理的数据模型
graph_builder = StateGraph(State)

# 定义chatbot的node
def chatbot(state: State) -> dict:
# 处理当前状态并返回 LLM 响应
return {"messages": [llm.invoke(state["messages"])]}

# 配置graph
#第二个参数 chatbot :这是一个可调用对象(通常是一个函数或方法),它定义了当执行流程到达这个名为 "chatbot" 的节点时,应该执行什么操作。
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

# 这里使用内存存储 也可以持久化到数据库
memory = MemorySaver()

# 编译生成graph并返回
#checkpointer 参数将 memory 实例传递给编译过程,使得图能够管理其状态的保存和加载。编译后的图对象被返回,这个对象可以被调用来运行聊天机器人。
return graph_builder.compile(checkpointer=memory)

except Exception as e:
raise RuntimeError(f"Failed to create graph: {str(e)}")

可视化langgraph节点

1
2
3
4
5
6
7
8
# 将构建的graph可视化保存为 PNG 文件
def save_graph_visualization(graph: StateGraph, filename: str = "graph.png") -> None:
try:
with open(filename, "wb") as f:
f.write(graph.get_graph().draw_mermaid_png())
logger.info(f"Graph visualization saved as {filename}")
except IOError as e:
logger.info(f"Warning: Failed to save graph visualization: {str(e)}")

封装接口

包含流式输出与非流式输出的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 封装POST请求接口,与大模型进行问答
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
# 判断初始化是否完成
if not graph:
logger.error("服务未初始化")
raise HTTPException(status_code=500, detail="服务未初始化")

try:
logger.info(f"收到聊天完成请求: {request}")

query_prompt = request.messages[-1].content
logger.info(f"用户问题是: {query_prompt}")

config = {"configurable": {"thread_id": request.userId+"@@"+request.conversationId}}
logger.info(f"用户当前会话信息: {config}")

prompt_template_system = PromptTemplate.from_file(PROMPT_TEMPLATE_TXT_SYS)
prompt_template_user = PromptTemplate.from_file(PROMPT_TEMPLATE_TXT_USER)
prompt = [
{"role": "system", "content": prompt_template_system.template},
{"role": "user", "content": prompt_template_user.template.format(query=query_prompt)}
]

# 处理流式响应
if request.stream:
async def generate_stream():
chunk_id = f"chatcmpl-{uuid.uuid4().hex}"
async for message_chunk, metadata in graph.astream({"messages": prompt}, config, stream_mode="messages"):
chunk = message_chunk.content
logger.info(f"chunk: {chunk}")
# 在处理过程中产生每个块
yield f"data: {json.dumps({'id': chunk_id,'object': 'chat.completion.chunk','created': int(time.time()),'choices': [{'index': 0,'delta': {'content': chunk},'finish_reason': None}]})}\n\n"
# 流结束的最后一块
yield f"data: {json.dumps({'id': chunk_id,'object': 'chat.completion.chunk','created': int(time.time()),'choices': [{'index': 0,'delta': {},'finish_reason': 'stop'}]})}\n\n"
# 返回fastapi.responses中StreamingResponse对象
return StreamingResponse(generate_stream(), media_type="text/event-stream")

# 处理非流式响应处理
else:
try:
events = graph.stream({"messages": prompt}, config)
for event in events:
for value in event.values():
result = value["messages"][-1].content
except Exception as e:
logger.info(f"Error processing response: {str(e)}")

formatted_response = str(format_response(result))
logger.info(f"格式化的搜索结果: {formatted_response}")
#封装响应
response = ChatCompletionResponse(
choices=[
ChatCompletionResponseChoice(
index=0,
message=Message(role="assistant", content=formatted_response),
finish_reason="stop"
)
]
)
logger.info(f"发送响应内容: \n{response}")
# 返回fastapi.responses中JSONResponse对象
# model_dump()方法通常用于将Pydantic模型实例的内容转换为一个标准的Python字典,以便进行序列化
return JSONResponse(content=response.model_dump())

except Exception as e:
logger.error(f"处理聊天完成时出错:\n\n {str(e)}")
raise HTTPException(status_code=500, detail=str(e))

langgraph的短期记忆与长期记忆

LangGraph支持两种对于构建对话代理至关重要的内存类型:

  • 短期内存:通过在会话中维护消息历史来跟踪正在进行的对话。
  • 长期内存:在不同会话之间存储用户特定或应用程序级别的数据。

image-20250715094855646

在LangGraph中

  • 短期内存也称为线程级内存
  • 长期内存也称为跨线程内存

教程地址

NanGePlus/LangGraphChatBot: 使用LangGraph+DeepSeek-R1+FastAPI+Gradio实现一个带有记忆功能的流量包推荐智能客服web端用例,同时也支持gpt大模型、国产大模型(OneApi方式)、Ollama本地开源大模型、阿里通义千问大模型

LangGraph+deepseek-r1+FastAPI+Gradio实现拥有记忆的流量包推荐智能客服web端用例,同时也支持gpt、国产大模型、Ollama_哔哩哔哩_bilibili

prompt Engineering

Prompt Engineering是与大型语言模型(LLM)交互的基础,其核心在于精心设计输入内容,以引导模型生成期望的输出。

尽管 Prompt Engineering 至关重要,但对于构建稳健、可用于生产环境的系统而言,它存在固有的局限性:

  • 脆弱性&不可复现性: 提示中微小的措辞变化可能导致输出结果的巨大差异,使得这一过程更像是一种依赖反复试错的“艺术”,而非可复现的“科学” 。

  • 扩展性差: 手动、迭代地优化提示的过程,在面对大量用户、多样化用例和不断出现的边缘情况时,难以有效扩展 。

  • 用户负担: 这种方法将精心构建一套详尽指令的负担完全压在了用户身上,对于需要自主运行、或处理高并发请求的系统而言是不切实际的 。

  • 无状态性: Prompt Engineering 本质上是为单轮、“一次性”的交互而设计的,难以处理需要记忆和状态管理的长对话或多步骤任务 。

Context Engineering

Context Engineering是一门设计、构建并优化动态自动化系统的学科,旨在为大型语言模型在正确的时间、以正确的格式,提供正确的信息和工具,从而可靠、可扩展地完成复杂任务

prompt 告诉模型如何思考,而 Context 则赋予模型完成工作所需的知识和工具。

  • Context Engineering 决定用什么内容填充 Context Window

  • Prompt Engineering 则负责优化窗口内的具体指令

Context Engineering 的基石:RAG(Retrieval-Augmented Generation)

本部分将阐述检索增强生成(RAG)作为实现 Context Engineering 的主要架构模式。

解决LLM的核心弱点

RAG直接解决了标准LLM在企业应用中存在的固有局限性:

  • 知识冻结: LLM的知识被冻结在其训练数据的时间点。RAG通过在推理时注入实时的、最新的信息来解决这个问题 。

  • 缺乏领域专有知识: 标准LLM无法访问组织的内部私有数据。RAG则能够将LLM连接到这些内部知识库,如技术手册、政策文件等 。

  • 幻觉(Hallucination): LLM 会不同程度上地编造事实。RAG通过将模型的回答“锚定”在可验证的、检索到的证据上,提高事实的准确性和可信度 。

RAG工作流

  1. 索引(离线阶段): 在这个阶段,系统会处理外部知识源。文档被加载、分割成更小的 chunks,然后通过Embedding Model 转换为向量表示,并最终存储在专门的向量数据库中以备检索 。

  2. 推理(在线阶段): 当用户提出请求时,系统执行以下步骤:

    1. 检索(Retrieve): 将用户的查询同样转换为向量,然后在向量数据库中进行相似性搜索,找出与查询最相关的文档块。
    2. 增强(Augment): 将检索到的这些文档块与原始的用户查询、系统指令等结合起来,构建一个内容丰富的、增强的最终提示。
    3. 生成(Generate): 将这个增强后的提示输入给LLM,LLM会基于提供的上下文生成一个有理有据的回答 。

Context 工程化:如何判断和提取哪些内容应该进入上下文?

1.chunking

文本分块(Chunking)是RAG流程中最关键也最容易被忽视的一步。其目标是创建在语义上自成一体的文本块。

2.Reranking

为了平衡检索的速度和准确性,业界普遍采用两阶段检索流程。

  • 两阶段流程:

    • 第一阶段(召回): 使用一个快速、高效的检索器(如基于 bi-encoder 的向量搜索或BM25等词法搜索)进行广泛撒网,召回一个较大的候选文档集(例如,前100个) 。
    • 第二阶段(精排/重排序): 使用一个更强大但计算成本更高的模型,对这个较小的候选集进行重新评估,以识别出最相关的少数几个文档(例如,前5个) 。
  • Cross-Encoder: 交叉编码器之所以在重排序阶段表现优越,是因为它与双编码器的工作方式不同。双编码器独立地为查询和文档生成嵌入向量,然后计算它们的相似度。而交叉编码器则是将查询和文档同时作为输入,让模型在内部通过 Attention Mechanism 对二者进行深度交互。这使得模型能够捕捉到更细微的语义关系,从而给出更准确的相关性评分 。

  • 实际影响: 重排序显著提高了最终送入LLM的上下文质量,从而产出更准确、幻觉更少的答案。在金融、法律等高风险领域,重排序被认为是必不可少而非可选的步骤 。

3.优化上下文窗口:压缩与摘要

本节详细介绍用于主动管理上下文的技术,确保最有价值的信息被优先呈现。

  • 上下文压缩的目标: 缩短检索到的文档列表和/或精简单个文档的内容,只将最相关的信息传递给LLM。这能有效降低API调用成本、减少延迟,并缓解 Lost in the Middle 的问题 。

  • 压缩方法:

    • 过滤式压缩: 这类方法决定是保留还是丢弃整个检索到的文档。
      • LLMChainFilter: 利用一个LLM对每个文档的相关性做出简单的“是/否”判断 。
      • EmbeddingsFilter: 更经济快速的方法,根据文档嵌入与查询嵌入的余弦相似度来过滤文档 。
    • 内容提取式压缩: 这类方法会直接修改文档内容。
      • LLMChainExtractor: 遍历每个文档,并使用LLM从中提取仅与查询相关的句子或陈述 。
    • 用 top N 代替压缩: 像LLMListwiseRerank这样的技术,使用LLM对检索到的文档进行重排序,并只返回排名最高的N个,从而起到高质量过滤器的作用 。
  • 作为压缩策略的摘要: 对于非常长的文档或冗长的对话历史,可以利用LLM生成摘要。这些摘要随后被注入上下文,既保留了关键信息,又大幅减少了 Token 数量。这是在长时程运行的智能体中管理上下文的关键技术 。

智能体架构中的数据流与工作流编排

工作流(Workflow) vs. 智能体(Agent)

  • 工作流(Workflows)

    • 指的是LLM和工具通过预定义的代码路径进行编排的系统。在这种模式下,数据流动的路径是固定的、由开发者明确设计的,类似于上世纪流行的“专家系统”。例如,“第一步:分析用户邮件;第二步:根据分析结果在日历中查找空闲时段;第三步:起草会议邀请邮件”。这种模式确定性高,易于调试和控制,非常适合有明确业务流程的场景(如风控需求高、数据敏感、安全等级要求)。
  • 智能体(Agents)

    • 指的是LLM动态地指导自己的流程和工具使用,自主控制如何完成任务的系统。在这种模式下,数据流动的路径不是预先固定的,而是由LLM在每一步根据当前情况和目标动态决定的。这种模式灵活性高,能处理开放式问题,但可控性和可预测性较低 。

复杂的智能体通常是这两种模式的混合体,在宏观层面遵循一个预定义的工作流,但在某些节点内部,又赋予LLM一定的自主决策权。管理这一切的核心,我们称之为编排层(Orchestration Layer)

核心架构:预定义数据流的实现

  1. 链式工作流(Prompt Chaining)

  2. 路由工作流(Routing)

  3. 编排器-工作者模式(Orchestrator-Workers)

框架与工具

上述的架构和机制并非凭空存在,而是通过具体的开发框架实现的。其中,LangGraph作为LangChain的扩展,为构建具有显式数据流的智能体系统提供了强大的工具集。

LangGraph:用图(Graph)定义工作流(Workflow)

LangGraph的核心思想是将智能体应用构建成一个状态图(State Graph) 。这个图由节点和边组成,清晰地定义了数据如何在不同模块间流动

  • 状态(State): 这是整个图的核心,一个所有节点共享的中央数据对象。

    • 你可以把它想象成一个“数据总线”或共享内存。开发者需要预先定义State的结构,每个节点在执行时都可以读取和更新这个State对象 。
  • 节点(Nodes): 代表工作流中的一个计算单元或一个步骤。

    • 每个节点通常是一个Python函数,它接收当前的State作为输入,执行特定任务(如调用LLM、执行工具、处理数据),然后返回对State的更新 。
  • 边(Edges): 连接节点,定义了工作流的路径,即数据在State更新后应该流向哪个节点。

    • 简单边(Simple Edges): 定义了固定的、无条件的流向,用于实现链式工作流 。
    • 条件边(Conditional Edges): 用于实现路由逻辑。它会根据一个函数的输出来决定接下来应该走向哪个节点,从而实现流程的分支 。
  • 检查点(Checkpointer): LangGraph提供了持久化机制,可以在每一步执行后自动保存State的状态。这对于构建需要长期记忆、可中断和恢复、或需要 Human-in-the-Loop 的复杂业务流程至关重要 。

复杂业务流程的AI智能体,其核心挑战已从单纯优化信息检索(如RAG)或提示词,转向了对内部工作流和数据流的精心设计与编排

rag评估的指标

忠诚度Faithfulness

Faithfulness:衡量生成答案与给定上下文之间的事实一致性。忠实度得分是基于答案和检索到的上下文
计算出来的,答案的评分范围在0到1之间,分数越高越好。

image-20250711154457878

计算方式:将大模型给出的答案进行切片,检索给出的上下文,计算这些切片是否在上下文中

image-20250711155257239

答案相关性Answerrelevance

Answerrelevance:答案相关性的评估指标旨在评估生成的答案与给定提示的相关程度。如果答案不完
整或包含冗余信息,则会被赋予较低的分数。这个指标使用问题和答案来计算,其值介于0到1之间,得
分越高表明答案的相关性越好

image-20250711155128553

计算方式:根据答案生成多个问题,然后计算生成的答案与原答案的余弦相似度,再取平均

image-20250711155407518

上下文精确度ContextPrecision

ContextPrecision:上下文精确度衡量上下文中所有相关的真实信息是否被排在了较高的位置。理想情
况下,所有相关的信息块都应该出现在排名的最前面。这个指标是根据问题和上下文来计算的,数值范
围在0到1之间,分数越高表示精确度越好。

  • K:检索返回的文档总数(如 top-5)
  • rel(k):第 k 个文档是否相关(相关=1,无关=0)
  • Precision@k:前 k 个文档的精确率(相关文档数 / k)
  • Ideal Precision@k:理想情况下前 k 个文档的精确率(假设所有相关文档都排在最前面)

上下文召回率ContextRecall

ContextRecall:用来衡量检索到的上下文与被视为事实真相的标注答案的一致性程度。它根据事实真相
和检索到的上下文来计算,数值范围在0到1之间,数值越高表示性能越好。
为了从事实真相的答案中估计上下文召回率,需要分析答案中的每个句子是否可以归因于检索到的
上下文。在理想情况下,事实真相答案中的所有句子都应该能够对应到检索到的上下文中。

计算方式:上下文是否包括了标准答案的内容

检索性能的评估

平均倒数排名(Mean Reciprocal Rank, MRR)

平均倒数排名(Mean Reciprocal Rank, MRR) 是一种常用于评估信息检索系统、推荐系统或问答系统性能的评价指标。它特别适用于“每个查询只有一个正确答案”或“我们只关心第一个正确结果”的场景。

  • 倒数排名(Reciprocal Rank, RR):对于一个查询,如果第一个正确答案出现在排序结果的第 $ k $ 位,那么它的倒数排名为:

    如果没有正确答案,则 $ RR = 0 $。

  • 平均倒数排名(MRR):对多个查询的倒数排名取平均值:

    其中:

    • $ |Q| $ 是查询的总数,
    • $ \text{rank}_i $ 是第 $ i $ 个查询中第一个正确答案的排名(位置)。

image-20250818093714324

平均精确率均值(Mean Average Precision, MAP)

MAP(Mean Average Precision) 是对多个查询或样本的 平均精确率(Average Precision, AP) 取平均,用来衡量排序结果的相关性质量。它综合考虑了:

  • 排序中相关结果的数量(召回)
  • 相关结果在排序中的位置(越靠前越好)

平均精确率(Average Precision, AP)

AP 是对一个查询而言的,衡量该查询下所有相关文档在排序中的整体表现。

直观理解:AP 是“在每个相关文档被检索到时”的精确率的平均值。

公式定义:

其中:

  • $ P(k) $:在第 $ k $ 个位置的精确率(即前 k 个结果中有多少是相关的)
  • $ \text{rel}(k) $:第 $ k $ 个文档是否相关(1 表示相关,0 表示不相关)

也就是说,只在相关文档出现的位置计算并累加精确率,最后除以总相关文档数。

平均精确率均值(MAP)

将所有查询的 AP 求平均:

其中:

  • $ |Q| $:查询总数
  • $ AP_i $:第 $ i $ 个查询的平均精确率

image-20250818094149682

归一化折损累积增益(Normalized Discounted Cumulative Gain, nDCG)

nDCG 的核心思想是:

  1. 高相关性的文档更有价值
  2. 排在前面的结果比排在后面的价值更高(位置越靠前,权重越大)
  3. 将系统的得分与“理想排序”对比,进行归一化,便于跨查询比较

1.累积增益(Cumulative Gain, CG)

CG 是前 $ k $ 个结果的相关性评分之和,不考虑位置

其中 $ rel_i $ 是第 $ i $ 个文档的相关性评分。

❌ 缺点:CG 不关心排序顺序。无论相关文档排第1还是第10,CG 都一样。

  1. 折损累积增益(Discounted Cumulative Gain, DCG)

DCG 引入“位置折损”:越靠后的结果,其贡献被“打折”。

常用公式(两种形式,第二种更常见):

💡 解释:第1个位置不打折,第2个位置除以 $ \log_2(2) = 1 $,第3个位置除以 $ \log_2(3) \approx 1.58 $,相当于打了约 63% 的折扣。

这样,相关文档越早出现,DCG 越高

  1. 理想折损累积增益(Ideal DCG, IDCG)

IDCG 是在理想排序下(所有相关文档按相关性从高到低排列)的 DCG 值。

IDCG 是当前查询下 DCG 的理论最大值。

  1. 归一化折损累积增益(nDCG@k)

✅ nDCG 的取值范围是 $[0, 1]$:

  • 1.0:排序完全理想
  • 接近 1:排序质量高
  • 接近 0:排序很差

image-20250818095351579

利用RAGAS评估rag性能

learn-rag-langchain/RAGAS-langchian.ipynb at main · zxj-2023/learn-rag-langchain

检索器
1.Contextprecision(上下文精确度):评估检索质量。
2.Context Recall(上下文召回率):衡量检索的完整性。
生成器
1.Faithfulness(忠实度):衡量生成答案中的幻觉情况。
2.AnswerRelevance(答案相关性):衡量答案对问题的直接性(紧扣问题的核心)。

最终的RAGAS得分是以上各个指标得分的调和平均值。简而言之,这些指标用来综合评估
-个系统整体的性能。

RAG的构建

创建RAG文本分割、Embedding model 、 向量库存储Chroma

我们主要使用 RecursiveCharacterTextSplitter 切割文本,通过OpenAIEmbeddings()进行文本编码,存储到 VectorStore

1
2
3
4
5
6
7
8
9
10
11
12
13
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import DashScopeEmbeddings
embeddings_model = DashScopeEmbeddings(
model="text-embedding-v2",
dashscope_api_key=openai.api_key,
)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500)
#进行文本分割,生成更小、更易于处理的文档块
docs = text_splitter.split_documents(paper_docs)

vectorstore = Chroma.from_documents(docs, embeddings_model)

Chroma 向量数据库默认情况下是内存存储,这意味着数据在程序运行结束后不会保留。
但是,Chroma 也支持持久化存储,您可以指定一个路径将数据保存到磁盘上。这样,即使程序关闭,数据也会被保留,并在下次启动时自动加载。

检索器的构建

现在我们可以利用 Chroma 向量库的 .as_retriever() 方式进行检索,需要控制的主要参数为 k

1
base_retriever = vectorstore.as_retriever(search_kwargs={"k" : 3})
  • ectorstore.as_retriever() : 这个方法的作用是将一个向量数据库实例( vectorstore )转换为 LangChain 中的一个检索器( Retriever )对象。检索器是 LangChain 中负责根据用户查询从数据源中获取相关文档的核心组件。
  • “k” : 这个键表示要检索的“最相似”文档的数量。在这里, “k” : 3 意味着当检索器接收到一个查询时,它将从向量存储中返回与该查询最相似的 3 个文档。这在 RAG(检索增强生成)系统中非常常见,用于限制传递给大型语言模型的上下文信息量,以提高效率和相关性。

检索器的作用
检索器(Retriever)是一个核心组件,其主要作用是从一个数据源(如向量数据库、文档加载器等)中根据给定的查询(query)检索出相关的文档或信息。

prompt的构建

我们需要利用LLMContext 生成一系列的问题的answer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from langchain import PromptTemplate

template = """You are an assistant for question-answering tasks.
Use the following pieces of retrieved context to answer the question.
If you don't know the answer, just say that you don't know.

Question: {question}

Context: {context}

Answer:
"""

prompt = PromptTemplate(
template=template,
input_variables=["context","question"]
)

print(prompt)

生成answer,利用LLM

利用 Runnable 定义一个 chain 实现rag全流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
model_name="qwen-plus-2025-04-28",
temperature=0,
api_key="",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
#RunnablePassthrough将输入数据原封不动地传递到输出
#StrOutputParser() 它被用作 RAG 链的最后一步,确保最终的答案以字符串形式输出。
rag_chain = (
{"context": base_retriever, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)

创建 RAGAs 所需的数据

question Answer contexts ground_truths

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Ragas 数据集格式要求  ['question', 'answer', 'contexts', 'ground_truths']
'''
{
"question": [], <-- 问题基于Context的
"answer": [], <-- 答案基于LLM生成的
"contexts": [], <-- context
"ground_truths": [] <-- 标准答案
}
'''

from datasets import Dataset
#构建问题与标准答案(黄金数据集)
questions = ["What is faithfulness ?",
"How many pages are included in the WikiEval dataset, and which years do they cover information from?",
"Why is evaluating Retrieval Augmented Generation (RAG) systems challenging?",
]
ground_truths = ["Faithfulness refers to the idea that the answer should be grounded in the given context.",
" To construct the dataset, we first selected 50 Wikipedia pages covering events that have happened since the start of 2022.",
"Evaluating RAG architectures is, however, challenging because there are several dimensions to consider: the ability of the retrieval system to identify relevant and focused context passages, the ability of the LLM to exploit such passages in a faithful way, or the quality of the generation itself."]
answers = []
contexts = []

# 生成答案
for query in questions:
answers.append(rag_chain.invoke(query))
contexts.append([docs.page_content for docs in base_retriever.get_relevant_documents(query)])

# 构建数据
data = {
"user_input": questions,
"response": answers,
"retrieved_contexts": contexts,
"reference": ground_truths
}
dataset = Dataset.from_dict(data)

使用RAGAs 进行评估

1
2
3
#将评估数据转换成 Ragas 框架专用的格式 。
from ragas import EvaluationDataset
evaluation_dataset = EvaluationDataset.from_list(dataset)

我们可以使用一组常用的RAG评估指标,在收集的数据集上评估我们的RAG系统。您可以选择任何模型作为评估用LLM来进行评估。
ragas默认使用openai的api

1
2
from ragas.llms import LangchainLLMWrapper
evaluator_llm = LangchainLLMWrapper(llm)

调用

1
2
3
4
from ragas.metrics import LLMContextRecall, Faithfulness, FactualCorrectness
from ragas import evaluate
result = evaluate(dataset=evaluation_dataset,metrics=[LLMContextRecall(), Faithfulness(), FactualCorrectness()],llm=evaluator_llm)
result

image-20250713164037571

查看结果

1
2
3
4
5
import pandas as pd
pd.set_option("display.max_colwidth", None)

df = result.to_pandas()
df

参考资料

RAG系统效果难评?2025年必备的RAG评估框架与工具详解 - 知乎

如何利用RAGAs评估RAG系统的好坏_哔哩哔哩_bilibili

ragas中文文档Evaluate a simple RAG - Ragas

人工智能 - RAG系统的7个检索指标:信息检索任务准确性评估指南 - deephub - SegmentFault 思否

分块策略

以下是 RAG 应用程序的典型工作流程:

6878b8fa-5e74-45a1-9a89-5aab92889126_2366x990

主流主要有五种分块策略:

https___substack-post-media.s3.amazonaws.com_public_images_92c70184-ba0f-4877-9a55-e4add0e311ad_870x1116

Fixed-size chunking 固定大小的分块

98c422a0-f0e2-457c-a256-4476a56a601f_943x232

将文本以固定长度分块,overlap为每个块的重合程度

1
2
3
4
5
6
7
8
9
10
11
text = "大家好,我是果粒奶优有果粒,欢迎关注我,让我们一起探索AI。"
from langchain_text_splitters import CharacterTextSplitter

text_splitter = CharacterTextSplitter(
separator="",#按字切分
chunk_size=5,
chunk_overlap=1,
length_function=len,#以长度计算
is_separator_regex=False,#不视为正则表达式
)
text_splitter.split_text(text)

Semantic chunking 语义分块

https___substack-post-media.s3.amazonaws.com_public_images_a6ad83a6-2879-4c77-9e49-393f16577aef_1066x288

先将文本分段,然后为每个段进行嵌入,若两个段有较高的余弦相似度,则合并成一个块,一直合并到余弦相似度显著下降,再从新的块开始

需要设定阈值来确定余弦相似度是否显著下降,这因文档而异。

image-20250710150106274

具体实现思路:利用滑动窗口,从第一句往后移动滑动窗口,如图,emed1与emed2相差sen3,计算出来的distance决定sen3是否加入chunk1,以此类推

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#利用langchain调用
from langchain_experimental.text_splitter import SemanticChunker
from langchain_community.embeddings import DashScopeEmbeddings
embeddings_model = DashScopeEmbeddings(
model="text-embedding-v2",
dashscope_api_key="",
)
semantic_chunk=SemanticChunker(
embeddings=embeddings_model,#嵌入模型
breakpoint_threshold_type="percentile",#定义如何计算语义断点阈值
breakpoint_threshold_amount=95,#设定阈值
#min_chunk_size=500#限制生成块最小的字符数,避免生成无意义的块
sentence_split_regex=r'[。!?.\n]',#语句切分
)

LangChain 搭配 QWen 踩坑-阿里云开发者社区

使用OpenAIEmbeddings配置embedding模型,需要设置一个关键参数

check_embedding_ctx_length = False 的作用是:

关闭 langchain_openai 在调用嵌入模型前对输入文本长度的检查与自动截断/分段逻辑。

但 DashScope 的 text-embedding-v4 接口:

  • 对输入格式要求更严格(必须是字符串或字符串列表,不能是分段后的复杂结构)。
  • 不接受 langchain_openai 默认生成的分段后的列表嵌套结构
1
2
3
4
5
6
7
8
9
10
from langchain_openai import OpenAIEmbeddings,  OpenAI
embeddings = OpenAIEmbeddings(
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
model="text-embedding-v4",
check_embedding_ctx_length = False,
dimensions=1536
)
result=embeddings.embed_query("Hello, world!")
print(len(result))

源代码理解见最后

Recursive chunking 递归分块

f4009caa-34fc-48d6-8102-3d0f6f2c1386_1066x316

先依据大的段落进行分块,再对每个块进行处理,若符合chunk-size的限制,则不会再分

结果可能如下

b0e40cc1-996f-48f4-9306-781b112536e4_984x428

首先,我们定义两个块(紫色的两个段落。接下来,第1段进一步拆分为更小的块。

1
2
3
4
5
6
7
8
from langchain_text_splitters import RecursiveCharacterTextSplitter

recursive_splitter_chinese = RecursiveCharacterTextSplitter(
chunk_size=50,
chunk_overlap=10,
length_function=len,
separators=["\n\n", "。", ",", " ", ""]#中文的分隔符,可以用逗号句号
)

Document structure-based chunking 基于文档结构的分块

e8febecd-ee68-42ff-ab06-41a0a3a43cd3_1102x306

根据文档的固有结构进行分块,如markdown的一级标题二级标题等

langchain.text_splitter中有两个用于md文档分块的类,MarkdownTextSplitterMarkdownHeaderTextSplitter

二者区别主要在:前者继承于RecursiveCharacterTextSplitter递归分块,它会尝试沿着 Markdown 格式的标题进行分割,但其核心仍然是基于字符的递归分割;后者专注于 基于 Markdown 标题的结构化分割 ,并能将标题信息作为元数据保留,更适合需要保持 Markdown 文档层级结构的应用场景。

需要注意的是MarkdownHeaderTextSplitter 本身不直接提供限制块内容长度的参数,但可以通过与 RecursiveCharacterTextSplitter 等其他文本分割器结合使用来有效控制块的大小。

1
2
3
4
5
6
7
8
9
from langchain.text_splitter import MarkdownHeaderTextSplitter
headers_to_split_on = [
("#", "Header 1"),
("##", "Header 2"),
("###", "Header 3"),
]

markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on)
md_header_splits = markdown_splitter.split_text(markdown_document)

存储结构类似如下

1
2
3
[Document(metadata={'Header 1': 'Foo', 'Header 2': 'Bar'}, page_content='Hi this is Jim  \nHi this is Joe'),
Document(metadata={'Header 1': 'Foo', 'Header 2': 'Bar', 'Header 3': 'Boo'}, page_content='Hi this is Lance'),
Document(metadata={'Header 1': 'Foo', 'Header 2': 'Baz'}, page_content='Hi this is Molly')]

LLM-based chunking 基于 LLM 的分块

4d1b6d60-8956-4030-8525-d899ee61a9d5_1140x198

利用大模型进行分块

langchain没有提供官方的类实现LLM-based chunking

但是我在找到了别人实现的agentic_chunkerRetrievalTutorials/tutorials/LevelsOfTextSplitting/agentic_chunker.py at main · FullStackRetrieval-com/RetrievalTutorials,可供参考

后记:agentic chunk大概的思路为先进行初步分段,按照长度或递归,然后让大模型生成这一段的概要,将段与段合并生成块,但是测试下来,一个文档的内容同质化很严重,基本上都分到一块里了,而且这个主要还是提示词工程,分块并不系统,看个乐吧

chunks-strategy-/agentic_chunker.py at main · zxj-2023/chunks-strategy-代码稍作更新,弃用了部分库

embedding

之前对chunking和embedding的理解不够清晰,chunking是对文本进行分块,由于大多数文本嵌入模型对输入文本长度有严格限制,如果不分块则无法embedding,从而无法更好的进行向量化或者更好地储存在知识库中,提升retriever性能;embedding则是将文本映射到向量空间,为了更好的相似度计算

语义分块的源代码实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
'''
将文本划分成单句,可以按照标点符号划分
'''
single_sentences_list = re.split(r'(?<=[。!?])', essay)
# 移除可能存在的空字符串
single_sentences_list = [s.strip() for s in single_sentences_list if s.strip()]

'''
我们需要为单个句子拼接更多的句子,但是 `list` 添加比较困难。因此将其转换为字典列(`List[dict]`)
{ 'sentence' : XXX , 'index' : 0}
'''
sentences = [{'sentence': x, 'index' : i} for i, x in enumerate(single_sentences_list)]

#利用滑动窗口分段
def combine_sentences(sentences, buffer_size=1):
combined_sentences = [
' '.join(sentences[j]['sentence'] for j in range(max(i - buffer_size, 0), min(i + buffer_size + 1, len(sentences))))
for i in range(len(sentences))
]
# 更新原始字典列表,添加组合后的句子
for i, combined_sentence in enumerate(combined_sentences):
sentences[i]['combined_sentence'] = combined_sentence

return sentences

sentences = combine_sentences(sentences)

'''
接下来使用**embedding model**对**sentences** 进行编码
'''
from langchain_community.embeddings import DashScopeEmbeddings
embeddings_model = DashScopeEmbeddings(
model="text-embedding-v2",
dashscope_api_key="",

)
# 提取所有组合后的句子用于 embedding
combined_sentences_to_embed = [x['combined_sentence'] for x in sentences]

# 对句子进行 embedding
embeddings = embeddings_model.embed_documents(combined_sentences_to_embed)
print(f"成功对 {len(embeddings)} 个句子进行了 embedding。")

#将embedding添加到sentence中
for i, sentence in enumerate(sentences):
sentence['combined_sentence_embedding'] = embeddings[i]
'''
接下来需要根据余弦相似度进行切分
通过计算两个向量的夹角余弦值来衡量相似性
'''
import numpy as np
def cosine_similarity(vec1, vec2):
"""Calculate the cosine similarity between two vectors."""
dot_product = np.dot(vec1, vec2)
norm_vec1 = np.linalg.norm(vec1)
norm_vec2 = np.linalg.norm(vec2)
return dot_product / (norm_vec1 * norm_vec2)
#遍历,计算余弦相似度
def calculate_cosine_distances(sentences):
distances = []
for i in range(len(sentences) - 1):
embedding_current = sentences[i]['combined_sentence_embedding']
embedding_next = sentences[i + 1]['combined_sentence_embedding']
# Calculate cosine similarity
similarity = cosine_similarity(embedding_current, embedding_next)
# Convert to cosine distance
distance = 1 - similarity
distances.append(distance)
# Store distance in the dictionary
sentences[i]['distance_to_next'] = distance
return distances, sentences

distances, sentences = calculate_cosine_distances(sentences)

#根据阈值划分
breakpoint_percentile_threshold = 95
breakpoint_distance_threshold = np.percentile(distances, breakpoint_percentile_threshold)
print("距离的第95个百分位阈值是:", breakpoint_distance_threshold)
# 找到所有距离大于阈值的点的索引,这些索引就是我们的切分点
indices_above_thresh = [i for i, x in enumerate(distances) if x > breakpoint_distance_threshold]

# 初始化块的起始句子索引。我们将根据之前计算出的语义分割点(`indices_above_thresh`)来切分句子列表。
start_index = 0

# 创建一个列表,用于存储最终组合成的、具有语义连贯性的文本块。
chunks = []

# 遍历所有识别出的语义分割点(这些是句子列表 `sentences` 中的索引)。
for index in indices_above_thresh:
# 确定当前文本块的结束点,即当前的分割点索引。
end_index = index

# 从原始句子列表(`sentences`)中切片,提取从上一个分割点到当前分割点之间的所有句子。
# `end_index + 1` 是为了在切片时包含结束索引指向的那个句子。
group = sentences[start_index:end_index + 1]

# 将切分出的句子组(`group`)中的所有 'sentence' 字段的值合并成一个单独的字符串,句子之间用空格隔开。
combined_text = ' '.join([d['sentence'] for d in group])

# 将合并后的文本块添加到 `chunks` 列表中。
chunks.append(combined_text)

# 更新下一个文本块的起始索引,设置为当前分割点的下一个位置,为处理下一个块做准备。
start_index = index + 1

# 处理最后一个文本块。
# 循环结束后,如果 `start_index` 仍然小于句子总数,说明从最后一个分割点到文本末尾还有剩余的句子。
if start_index < len(sentences):
# 将这些剩余的句子合并成最后一个文本块。
combined_text = ' '.join([d['sentence'] for d in sentences[start_index:]])
chunks.append(combined_text)

# 此时,`chunks` 列表包含了所有根据语义距离切分和重组后的文本块。
for i, chunk in enumerate(chunks):
buffer = 200
print (f"Chunk #{i}")
print (chunk[:buffer].strip())
print ("...")
print (chunk[-buffer:].strip())
print ("\n")

参考资料

RAG 的 5 种分块策略 —- 5 Chunking Strategies For RAG

一文读懂 Qwen3 最新开源的 Embedding 和 Rerank 模型优势!_qwen-rerank-CSDN博客

一站帮你选择RAG中的文本切分策略_哔哩哔哩_bilibili

LangChain 语义文本拆分指南:基于语义相似度的智能分块技术实战_langchain 语义分割-CSDN博客

0%