Research Assistant 研究助理

我们的目标是围绕聊天模型构建一个轻量级、多智能体系统,以定制研究过程。

Source Selection

  • 用户可为研究自行选择任意输入源。

Planning

  • 用户提供主题后,系统生成一组 AI 分析师,每位分析师聚焦一个子主题。
  • 在研究开始前,采用 人机协同 方式对子主题进行精调。

LLM Utilization

  • 每位分析师基于所选源,与专家 AI 开展深度访谈。
  • 访谈采用多轮对话形式,以 STORM 论文所示方式提取详尽洞见。
  • 访谈过程将以“子图”形式记录,并保存各自内部状态。

Research Process

  • 专家 AI 并行收集信息,实时回答分析师提问。
  • 所有访谈通过 map-reduce 架构同时展开。
Screenshot 2024-08-26 at 7.26.33 PM.png
Generate Analysts: Human-In-The-Loop 生成分析师

创建分析师并使用人工循环(human-in-the-loop)来审查他们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from typing import List
from typing_extensions import TypedDict
from pydantic import BaseModel, Field

class Analyst(BaseModel):
"""
分析师模型类
用于定义单个分析师的基本信息和属性
"""
affiliation: str = Field(
description="分析师的主要隶属机构。",
)
name: str = Field(
description="分析师的姓名"
)
role: str = Field(
description="分析师在该主题背景下的角色。",
)
description: str = Field(
description="分析师的关注点、担忧和动机的描述。",
)

@property#@property 装饰器的作用是将一个方法转换为只读属性 ,让方法可以像访问属性一样使用,而不需要加括号调用。
def persona(self) -> str:
"""
生成分析师的人设信息
返回格式化的字符串包含分析师的所有关键信息
"""
return f"姓名: {self.name}\n角色: {self.role}\n隶属机构: {self.affiliation}\n描述: {self.description}\n"

class Perspectives(BaseModel):
"""
视角模型类
用于管理多个分析师的集合
"""
analysts: List[Analyst] = Field(
description="包含角色和隶属机构的分析师综合列表。",
)

class GenerateAnalystsState(TypedDict):
"""
生成分析师状态类型定义
用于类型提示,定义在生成分析师过程中需要的状态信息
"""
topic: str # 研究主题 - 用户输入的研究话题
max_analysts: int # 分析师数量 - 需要生成的分析师最大数量
human_analyst_feedback: str # 人类反馈 - 来自用户的反馈信息,用于调整分析师生成
analysts: List[Analyst] # 提出问题的分析师 - 已生成的分析师列表

Field 是 Pydantic 提供的一个函数,主要用于为模型字段添加额外的元数据和配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from IPython.display import Image, display
from langgraph.graph import START, END, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

# 分析师指令模板
analyst_instructions="""您需要创建一组AI分析师角色。请仔细遵循以下指令:

1. 首先,查看研究主题:
{topic}

2. 检查任何可选的编辑反馈,这些反馈用于指导分析师的创建:

{human_analyst_feedback}

3. 根据上述文档和/或反馈确定最有趣的主题。

4. 选择前 {max_analysts} 个主题。

5. 为每个主题分配一个分析师。

6. 重要:请以JSON格式返回您的响应,结构如下:
{{
"analysts": [
{{
"name": "分析师姓名",
"affiliation": "所属机构",
"role": "角色描述",
"description": "详细描述"
}}
]
}}

7. 请确保响应中包含'json'这个词,这是必需的。"""

def create_analysts(state: GenerateAnalystsState):
""" 创建分析师 """

topic=state['topic']
max_analysts=state['max_analysts']
human_analyst_feedback=state.get('human_analyst_feedback', '')#防止为空

# 强制结构化输出
structured_llm = llm.with_structured_output(Perspectives)

# 系统消息
system_message = analyst_instructions.format(topic=topic,human_analyst_feedback=human_analyst_feedback, max_analysts=max_analysts)

# 生成分析师
analysts = structured_llm.invoke([SystemMessage(content=system_message)]+[HumanMessage(content="生成分析师集合。")])

# 将分析师列表写入状态
return {"analysts": analysts.analysts}

def human_feedback(state: GenerateAnalystsState):
""" 无操作节点,应该在此处中断 """
pass

def should_continue(state: GenerateAnalystsState):
""" 返回下一个要执行的节点 """

# 检查是否有用户反馈
human_analyst_feedback=state.get('human_analyst_feedback', None)
if human_analyst_feedback:
return "create_analysts"

# 否则结束
return END

# 添加节点和边
builder = StateGraph(GenerateAnalystsState)
builder.add_node("create_analysts", create_analysts) # 添加创建分析师节点
builder.add_node("human_feedback", human_feedback) # 添加用户反馈节点
builder.add_edge(START, "create_analysts") # 从开始到创建分析师
builder.add_edge("create_analysts", "human_feedback") # 从创建分析师到用户反馈
builder.add_conditional_edges("human_feedback", should_continue, ["create_analysts", END]) # 条件边

# 编译图
memory = MemorySaver()
graph = builder.compile(interrupt_before=['human_feedback'], checkpointer=memory) # 在用户反馈前中断

# 显示图
display(Image(graph.get_graph(xray=1).draw_mermaid_png())) # 显示流程图
image-20250723171742889
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 输入
max_analysts = 3 # 最大分析师数量
topic = "采用LangGraph作为代理框架的好处" # 研究主题
thread = {"configurable": {"thread_id": "1"}} # 线程配置,用于会话跟踪

# 运行图直到第一次中断
for event in graph.stream({"topic":topic,"max_analysts":max_analysts,}, thread, stream_mode="values"):
# 审查结果
analysts = event.get('analysts', '') # 获取分析师列表
if analysts:
for analyst in analysts:
print(f"姓名: {analyst.name}") # 打印分析师姓名
print(f"隶属机构: {analyst.affiliation}") # 打印隶属机构
print(f"角色: {analyst.role}") # 打印角色
print(f"描述: {analyst.description}") # 打印描述
print("-" * 50) # 打印分隔线
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
姓名: 艾琳·史密斯
隶属机构: 人工智能与代理系统研究所
角色: LangGraph架构专家
描述: 专注于研究基于图结构的AI代理框架,尤其是LangGraph在复杂决策流程中的模块化与可扩展性优势。
--------------------------------------------------
姓名: 拉胡尔·梅赫塔
隶属机构: 分布式系统与AI实验室
角色: 代理框架性能分析师
描述: 研究LangGraph在多代理协作中的效率提升,以及其在异步通信、状态管理和错误恢复方面的优势。
--------------------------------------------------
姓名: 艾米丽·陈
隶属机构: 人机交互与智能系统中心
角色: 代理框架用户体验研究员
描述: 分析LangGraph如何支持开发者构建更具交互性和可解释性的AI代理系统,特别是在可视化流程设计和调试方面的优势。
--------------------------------------------------
1
2
3
# 我们现在像 human_feedback 节点一样更新状态
graph.update_state(thread, {"human_analyst_feedback":
"添加一个来自初创公司的人,以增加企业家视角"}, as_node="human_feedback")
1
2
3
4
5
6
7
8
9
10
11
# 继续图的执行
for event in graph.stream(None, thread, stream_mode="values"):
# 审查结果
analysts = event.get('analysts', '') # 获取分析师列表
if analysts:
for analyst in analysts:
print(f"姓名: {analyst.name}") # 打印分析师姓名
print(f"隶属机构: {analyst.affiliation}") # 打印隶属机构
print(f"角色: {analyst.role}") # 打印角色
print(f"描述: {analyst.description}") # 打印描述
print("-" * 50) # 打印分隔线
1
2
3
4
# 如果我们满意,那么就不提供任何反馈
further_feedback = None #如果满意就返回none,如果不满意就进行反馈
graph.update_state(thread, {"human_analyst_feedback":
further_feedback}, as_node="human_feedback")
Conduct Interview 进行面试

生成问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import operator
from typing import Annotated
from langgraph.graph import MessagesState

class InterviewState(MessagesState):
"""
访谈状态类
继承自MessagesState,用于管理访谈过程中的各种状态信息
"""
max_num_turns: int # 对话轮数上限
context: Annotated[list, operator.add] # 源文档,使用operator.add进行合并
analyst: Analyst # 提问的分析师
interview: str # 访谈记录(文字记录)
sections: list # 最终章节,我们在外部状态中重复此字段以用于Send() API

class SearchQuery(BaseModel):
"""
搜索查询模型
用于定义搜索查询的结构
"""
search_query: str = Field(None, description="用于检索的搜索查询。")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 问题生成指令
question_instructions = """你是一名分析师,任务是采访专家以了解特定主题。

你的目标是提炼出与你主题相关的有趣且具体的见解。

1. 有趣的:人们会感到惊讶或不明显的见解。

2. 具体的:避免泛泛而谈的见解,包含来自专家的具体例子。

这是你的关注主题和目标集合:{goals}

首先使用符合你人设的名字介绍自己,然后提出你的问题。

继续提问以深入挖掘和细化你对该主题的理解。

当你对理解满意时,用"非常感谢您的帮助!"来结束采访。

记住在整个回复中保持角色特征,体现提供给你的人设和目标。"""

def generate_question(state: InterviewState):
""" 生成问题的节点 """

# 获取状态
analyst = state["analyst"] # 当前分析师
messages = state["messages"] # 对话历史

# 生成问题
system_message = question_instructions.format(goals=analyst.persona) # 格式化系统消息
question = llm.invoke([SystemMessage(content=system_message)]+messages) # 调用LLM生成问题

# 将消息写入状态
return {"messages": [question]} # 返回新生成的问题

生成问题的并行处理

专家将并行从多个来源收集信息以回答问题。

  • 特定网站:例如通过 WebBaseLoader
  • 已索引文档:例如通过 RAG
  • 网页搜索
  • 维基百科搜索

你可以尝试不同的网络搜索工具,比如 Tavily

1
2
3
4
5
6
7
8
# Web search tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_tavily import TavilySearch
#tavily_search = TavilySearchResults(max_results=3)
tavily_search=TavilySearch(max_result=3)

# Wikipedia search tool
from langchain_community.document_loaders import WikipediaLoader

现在,我们创建节点以搜索网络和维基百科。

我们还将创建一个节点来回答分析师的问题。最后,我们将创建节点以保存完整的采访内容,并撰写采访的摘要(“部分”)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
from langchain_core.messages import get_buffer_string

# 搜索查询编写
search_instructions = SystemMessage(content=f"""你将获得分析师和专家之间的对话。

你的目标是生成一个结构化的 JSON 对象,该对象包含一个用于网络搜索的查询字符串。

请严格按照以下步骤操作:

1. 仔细分析整个对话。
2. 特别关注分析师提出的最后一个问题。
3. 根据该问题,生成一个清晰、简洁、有效的搜索查询字符串。
4. **你的最终输出必须是一个严格的 JSON 对象,格式如下,不要包含任何其他文字或解释:**
{{"search_query": "你的查询字符串放在这里"}}

**示例:**
如果最后一个问题涉及 "LangGraph 与其他代理框架(如 AutoGen)相比的优势",
你的输出必须严格是:
{{"search_query": "LangGraph vs AutoGen agent framework advantages"}}

分析师的最后一个问题才是关键,请基于它生成查询。

**你的输出:**""")

def search_web(state: InterviewState):

""" 从网络搜索中检索文档 """

# 搜索查询
structured_llm = llm.with_structured_output(SearchQuery)
search_query = structured_llm.invoke([search_instructions]+state['messages'])

# 搜索
search_docs = tavily_search.invoke(search_query.search_query)

# 格式化
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document href="{doc["url"]}"/>\n{doc["content"]}\n</Document>'
for doc in search_docs
]
)

return {"context": [formatted_search_docs]}

def search_wikipedia(state: InterviewState):

""" 从维基百科检索文档 """

# # 搜索查询
# structured_llm = llm.with_structured_output(SearchQuery)
# search_query = structured_llm.invoke([search_instructions]+state['messages'])

# # 搜索
# search_docs = WikipediaLoader(query=search_query.search_query,
# load_max_docs=2).load()

# # 格式化
# formatted_search_docs = "\n\n---\n\n".join(
# [
# f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
# for doc in search_docs
# ]
# )

# return {"context": [formatted_search_docs]}
return {"context": [""]}

answer_instructions = """你是一位正在接受分析师采访的专家。

这是分析师的关注领域:{goals}。

你的目标是回答面试官提出的问题。

要回答问题,请使用此上下文:

{context}

回答问题时,请遵循以下准则:

1. 仅使用上下文中提供的信息。

2. 不要引入外部信息或在上下文中明确说明之外进行假设。

3. 上下文包含每个独立文档主题的来源。

4. 在任何相关陈述旁边包含这些来源。例如,对于来源 # 1 使用 [1]。

5. 在答案底部按顺序列出你的来源。 [1] 来源 1,[2] 来源 2,等等

6. 如果来源是:<Document source="assistant/docs/llama3_1.pdf" page="7"/>' 那么只需列出:

[1] assistant/docs/llama3_1.pdf, page 7

并跳过括号的添加以及引用中的 Document source 前言。"""

def generate_answer(state: InterviewState):

""" 回答问题的节点 """

# 获取状态
analyst = state["analyst"]
messages = state["messages"]
context = state["context"]

# 回答问题
system_message = answer_instructions.format(goals=analyst.persona, context=context)
answer = llm.invoke([SystemMessage(content=system_message)]+messages)

# 将消息命名为来自专家
answer.name = "expert"

# 将其附加到状态
return {"messages": [answer]}

def save_interview(state: InterviewState):

""" 保存采访 """

# 获取消息
messages = state["messages"]

# 将采访转换为字符串
interview = get_buffer_string(messages)

# 保存到 interviews 键
return {"interview": interview}

def route_messages(state: InterviewState,
name: str = "expert"):

""" 在问题和答案之间路由 """

# 获取消息
messages = state["messages"]
max_num_turns = state.get('max_num_turns',2)

# 检查专家答案的数量
#isinstance(m, AIMessage) :检查当前消息 m 是否是 AIMessage 类的实例。这通常用于识别由 AI 模型生成的消息。
num_responses = len(
[m for m in messages if isinstance(m, AIMessage) and m.name == name]
)

# 如果专家回答的次数超过最大轮数,则结束
if num_responses >= max_num_turns:
return 'save_interview'

# 获取提出的最后一个问题,以检查它是否表示讨论结束
#messages[-2] :分析师提出的 最后一个问题 。
#messages[-1] :专家对这个问题的 最新回答 。
last_question = messages[-2]

if "非常感谢你的帮助" in last_question.content:
return 'save_interview'
return "ask_question"

section_writer_instructions = """你是一位专业的科技作家。

你的任务是根据一组源文档创建一份简短、易于理解的报告部分。

1. 分析源文档的内容:
- 每个源文档的名称都在文档开头,带有 <Document 标签。

2. 使用 markdown 格式创建报告结构:
- 使用 ## 作为章节标题
- 使用 ### 作为小节标题

3. 按照此结构编写报告:
a. 标题 (## header)
b. 摘要 (### header)
c. 来源 (### header)

4. 根据分析师的关注领域,使你的标题引人入胜:
{focus}

5. 对于摘要部分:
- 设置与分析师关注领域相关的通用背景/上下文的摘要
- 强调从采访中收集到的新颖、有趣或令人惊讶的见解
- 创建一个使用过的源文档的编号列表
- 不要提及面试官或专家的姓名
- 目标是最多约 400 字
- 根据源文档中的信息,在报告中使用编号来源(例如,[1]、[2])

6. 在来源部分:
- 包含报告中使用的所有来源
- 提供相关网站或特定文档路径的完整链接
- 每个来源用换行符分隔。在每行末尾使用两个空格以在 Markdown 中创建换行符。
- 它看起来像:

### 来源
[1] 链接或文档名称
[2] 链接或文档名称

7. 务必合并来源。例如,这不正确:

[3] https://ai.meta.com/blog/meta-llama-3-1/
[4] https://ai.meta.com/blog/meta-llama-3-1/

不应有冗余来源。它应该只是:

[3] https://ai.meta.com/blog/meta-llama-3-1/

8. 最终审查:
- 确保报告遵循所需的结构
- 在报告标题之前不包含任何前言
- 检查所有准则是否已遵循"""

def write_section(state: InterviewState):

""" 编写章节的节点 """

# 获取状态
interview = state["interview"]
context = state["context"]
analyst = state["analyst"]

# 使用从采访(上下文)或采访本身(interview)收集的源文档编写章节
system_message = section_writer_instructions.format(focus=analyst.description)
section = llm.invoke([SystemMessage(content=system_message)]+[HumanMessage(content=f"Use this source to write your section: {context}")])

# 将其附加到状态
return {"sections": [section.content]}

# 添加节点和边
interview_builder = StateGraph(InterviewState)
interview_builder.add_node("ask_question", generate_question)
interview_builder.add_node("search_web", search_web)
interview_builder.add_node("search_wikipedia", search_wikipedia)
interview_builder.add_node("answer_question", generate_answer)
interview_builder.add_node("save_interview", save_interview)
interview_builder.add_node("write_section", write_section)

# 流程
interview_builder.add_edge(START, "ask_question")
interview_builder.add_edge("ask_question", "search_web")
interview_builder.add_edge("ask_question", "search_wikipedia")
interview_builder.add_edge("search_web", "answer_question")
interview_builder.add_edge("search_wikipedia", "answer_question")
interview_builder.add_conditional_edges("answer_question", route_messages,['ask_question','save_interview'])
interview_builder.add_edge("save_interview", "write_section")
interview_builder.add_edge("write_section", END)

# 采访
memory = MemorySaver()
interview_graph = interview_builder.compile(checkpointer=memory).with_config(run_name="Conduct Interviews")

# 视图
display(Image(interview_graph.get_graph().draw_mermaid_png()))
image-20250725101725158
Map-Reduce(Parallelze interviews: Map-Reduce) 并行化访谈

我们通过 Send() API 并行化处理访谈,这是一个映射步骤。我们将它们在 reduce 步骤中组合成报告正文。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import operator
from typing import List, Annotated
from typing_extensions import TypedDict

class ResearchGraphState(TypedDict):
topic: str # 研究主题 (字符串类型)
max_analysts: int # 分析师数量 (整数类型)
human_analyst_feedback: str # 人类分析师的反馈 (字符串类型)
analysts: List[Analyst] # 提问的分析师列表 (Analyst 对象的列表)
sections: Annotated[list, operator.add] # 报告章节列表 (使用 operator.add 作为 Send() API 的键,意味着列表可以通过相加来合并)
introduction: str # 最终报告的引言部分 (字符串类型)
content: str # 最终报告的内容主体部分 (字符串类型)
conclusion: str # 最终报告的结论部分 (字符串类型)
final_report: str # 最终完整的报告 (字符串类型)

# 从 langgraph.constants 导入 Send 类
from langgraph.constants import Send
def initiate_all_interviews(state: ResearchGraphState):
""" 这是“map”(映射)步骤,我们使用 Send API 并行运行每个采访子图 """

# 检查是否存在人类反馈
human_analyst_feedback = state.get('human_analyst_feedback')
if human_analyst_feedback:
# 如果有反馈,则返回到 "create_analysts" 节点进行调整
return "create_analysts"

# 否则,通过 Send() API 并行启动所有采访
else:
topic = state["topic"]
# 为每个分析师创建一个 Send 对象
# 目标是 "conduct_interview" 节点
# 传递的参数包括该分析师对象和一条初始消息
return [Send("conduct_interview", {"analyst": analyst,
"messages": [HumanMessage(
content=f"所以你说你正在写一篇关于 {topic} 的文章?"
)
]}) for analyst in state["analysts"]]
Finalize 最终确定

我们添加最后一个步骤,为最终报告撰写引言和结论。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import operator
from typing import List, Annotated
from typing_extensions import TypedDict

# 定义用于撰写最终报告的指令模板
report_writer_instructions = """你是一位正在撰写关于以下主题报告的技术作家:

{topic}

你有一个分析师团队。每个分析师做了两件事:

1. 他们就一个特定的子主题与专家进行了采访。
2. 他们将他们的发现写成了一份备忘录。

你的任务:

1. 你将得到一份来自你所有分析师的备忘录集合。
2. 仔细思考每份备忘录中的见解。
3. 将这些见解整合成一个清晰的整体摘要,把所有备忘录中的核心思想联系起来。
4. 将每份备忘录中的要点总结成一个连贯的单一叙述。

报告格式要求:

1. 使用 Markdown 格式。
2. 报告开头不要有前言。
3. 不要使用子标题。
4. 报告开头使用一个一级标题:## Insights (## 见解)
5. 在报告中不要提及任何分析师的名字。
6. 保留备忘录中的所有引用,这些引用会用方括号标注,例如 [1] 或 [2]。
7. 创建一个最终的、合并的来源列表,并添加到以 `## Sources` 为标题的部分。
8. 按顺序列出你的来源,不要重复。

[1] 来源 1
[2] 来源 2

以下是你的分析师提供的备忘录,你需要根据它们来撰写报告:

{context}"""

def write_report(state: ResearchGraphState):
""" 撰写报告主体内容的节点函数 """
# 获取所有章节(备忘录)
sections = state["sections"]
topic = state["topic"]

# 将所有章节(备忘录)连接成一个字符串
formatted_str_sections = "\n\n".join([f"{section}" for section in sections])

# 使用指令模板和备忘录内容,调用 LLM 生成最终报告
system_message = report_writer_instructions.format(topic=topic, context=formatted_str_sections)
report = llm.invoke([SystemMessage(content=system_message)] + [HumanMessage(content="根据这些备忘录写一份报告。")])
# 返回报告内容
return {"content": report.content}

# 定义用于撰写引言和结论的指令模板
intro_conclusion_instructions = """你是一位正在完成关于 {topic} 报告的技术作家。

你将得到报告的所有章节。

你的工作是撰写一个清晰且有说服力的引言或结论部分。

用户会指示你是写引言还是结论。

两个部分都不要有前言。

目标大约 100 个词,简洁地预览(对于引言)或回顾(对于结论)报告的所有章节。

使用 Markdown 格式。

对于你的引言,创建一个引人注目的标题,并使用 # 标题级别。
对于你的引言,使用 ## Introduction (## 引言) 作为部分标题。

对于你的结论,使用 ## Conclusion (## 结论) 作为部分标题。

以下是供你参考以撰写相应部分的章节:{formatted_str_sections}"""

def write_introduction(state: ResearchGraphState):
""" 撰写报告引言的节点函数 """
# 获取所有章节
sections = state["sections"]
topic = state["topic"]

# 将所有章节连接成一个字符串
formatted_str_sections = "\n\n".join([f"{section}" for section in sections])

# 使用指令模板和章节内容,调用 LLM 生成引言
instructions = intro_conclusion_instructions.format(topic=topic, formatted_str_sections=formatted_str_sections)
intro = llm.invoke([instructions] + [HumanMessage(content="写报告的引言")])
# 返回引言内容
return {"introduction": intro.content}

def write_conclusion(state: ResearchGraphState):
""" 撰写报告结论的节点函数 """
# 获取所有章节
sections = state["sections"]
topic = state["topic"]

# 将所有章节连接成一个字符串
formatted_str_sections = "\n\n".join([f"{section}" for section in sections])

# 使用指令模板和章节内容,调用 LLM 生成结论
instructions = intro_conclusion_instructions.format(topic=topic, formatted_str_sections=formatted_str_sections)
conclusion = llm.invoke([instructions] + [HumanMessage(content="写报告的结论")])
# 返回结论内容
return {"conclusion": conclusion.content}

def finalize_report(state: ResearchGraphState):
""" 这是“reduce”(归约)步骤,我们收集所有部分,将它们组合起来,并进行反思以写出引言/结论 """
""" 最终整合报告的节点函数 """
# 获取报告主体内容
content = state["content"]
# 如果内容以 "## Insights" 开头,则移除这个标题
if content.startswith("## Insights"):
content = content.strip("## Insights")
# 尝试分离报告主体和来源部分
if "## Sources" in content:
try:
content, sources = content.split("\n## Sources\n")
except:
sources = None # 如果分离失败,则来源部分为空
else:
sources = None

# 将引言、主体内容和结论连接起来形成最终报告
final_report = state["introduction"] + "\n\n---\n\n" + content + "\n\n---\n\n" + state["conclusion"]
# 如果存在来源部分,则将其附加到最终报告末尾
if sources is not None:
final_report += "\n\n## Sources\n" + sources
# 返回最终报告
return {"final_report": final_report}

# 添加节点和边
builder = StateGraph(ResearchGraphState)
builder.add_node("create_analysts", create_analysts)
builder.add_node("human_feedback", human_feedback)
builder.add_node("conduct_interview", interview_builder.compile()) # 将之前定义的采访图编译后作为一个节点
builder.add_node("write_report", write_report)
builder.add_node("write_introduction", write_introduction)
builder.add_node("write_conclusion", write_conclusion)
builder.add_node("finalize_report", finalize_report)

# 定义工作流逻辑
builder.add_edge(START, "create_analysts") # 从开始到创建分析师
builder.add_edge("create_analysts", "human_feedback") # 创建分析师后进入人类反馈环节
# 条件边:根据 human_feedback 节点的输出,决定是回到创建分析师还是开始采访
builder.add_conditional_edges("human_feedback", initiate_all_interviews, ["create_analysts", "conduct_interview"])
# 采访完成后,并行执行撰写报告、引言和结论
builder.add_edge("conduct_interview", "write_report")
builder.add_edge("conduct_interview", "write_introduction")
builder.add_edge("conduct_interview", "write_conclusion")
# 撰写完报告的三个部分后,汇聚到最终整合步骤
builder.add_edge(["write_conclusion", "write_report", "write_introduction"], "finalize_report")
builder.add_edge("finalize_report", END) # 最终整合后结束

# 编译图
memory = MemorySaver()
# 编译图,并设置在 'human_feedback' 节点前中断,以及使用检查点保存器
graph = builder.compile(interrupt_before=['human_feedback'], checkpointer=memory)
# 显示图的可视化表示
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
image-20250725102148881
测试

让我们提出一个关于 LangGraph 的开放式问题。

1
2
3
4
5
6
7
8
9
10
max_analysts = 3 
topic = "采用 LangGraph 作为代理框架的好处"
thread = {"configurable": {"thread_id": "1"}}

# 使用 stream 但不执行打印逻辑
for event in graph.stream({
"topic": topic,
"max_analysts": max_analysts
}, thread, stream_mode="values"):
pass # 不执行任何操作
1
2
3
#人工更新节点
graph.update_state(thread, {"human_analyst_feedback":
"请加入这家原生生成式 AI 创业公司的首席执行官。"}, as_node="human_feedback")
1
2
3
# 确认我们已经满意,返回none
graph.update_state(thread, {"human_analyst_feedback":
None}, as_node="human_feedback")
1
2
3
4
5
# 继续执行
for event in graph.stream(None, thread, stream_mode="updates"):
print("--Node--")
node_name = next(iter(event.keys()))
print(node_name)
image-20250725103159942

https://smith.langchain.com/public/6504cafd-d314-48d1-8640-57dc3f472e61/r

Elasticsearch 是当今最流行的 开源分布式搜索与分析引擎,用 Java 开发,基于 Apache Lucene 构建。它把全文检索、实时分析、时序数据、地理空间查询向量检索统一到一个平台,被广泛用于日志、指标、安全、企业搜索以及 AI/RAG 场景。

RAG 系统中,Elasticsearch 不仅是向量数据库,更是语义检索引擎和上下文构建器,更准确地说,是向量数据库 + 全文检索引擎的混合角色。

查找语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
GET /test_full_v1/_search
{
"query": {
"multi_match": {
"query": "高压旁路管道磁粉检测使用的是哪种磁化方法和磁悬液浓度?",
"fields": ["text", "report_name","report_url"]
}
}
}

GET /_cat/indices?v#查看所有的所有

GET test_full_v1/_count#查看文件数

GET test_full_v1/_mapping#查看映射

GET test_full_v1#查看索引内容

GET test_full_v1/_search#查看索引内容
{
"size": 3,
"query": { "match_all": {} }
}

GET /_cat/indices/zxj_test#查看索引是否存在

可视化平台kibana的内网路径:

  • http://10.117.128.50:5601
  • http://10.117.128.50:5601/app/dev_tools#/console/shell 开发者工具

es支持的几种检索函数

向量搜索

这里的向量搜索也就是稠密向量检索,过程为“把文本/图像等非结构化数据映射成高维向量 → 在向量空间里做近似最近邻(ANN)搜索 → 按相似度排序返回结果”

1
2
3
4
5
6
7
8
9
10
11
12
13
def vector_query(search_query: str):
# 使用与索引时相同的嵌入模型将查询文本转换为向量
vector = embeddings.embed_query(search_query)

# 构建Elasticsearch KNN查询结构
return {
"knn": {
"field": dense_vector_field, # 指定存储向量的字段名
"query_vector": vector, # 查询向量
"k": 5, # 返回最相似的5个结果
"num_candidates": 10, # 在10个候选中选择最优的5个(提高搜索效率和准确性)
}
}
bm25

BM25(Best Matching 25)也就是全文关键词搜索,或者叫传统关键词搜索,他的核心思想为“词频越高、文档越短、词越稀有,则相关性越高”,在 TF-IDF 的基础上引入词频饱和、文档长度归一化两项修正。

TF-IDF(Term Frequency–Inverse Document Frequency,词频-逆文档频率)是一种经典的 文本特征权重计算方法,用于衡量 一个词对一篇文档的重要性

项目中的bm25检索采取多字段匹配的方式,还有几个参数需要了解,如下

  1. ‘type’ :决定了如何把多个字段的 BM25 打分合并成最终得分,常用的参数有以下三种

    type 中文含义 打分逻辑
    best_fields(默认) 最佳字段优先 得分最高的那个字段 做最终分(可用 tie_breaker 让次佳字段再贡献一点点)。
    most_fields 最多字段优先 把所有命中字段的得分 直接相加(类似 OR 逻辑),字段越多分越高。
    cross_fields 跨字段合并 把多个字段视为 一个虚拟大字段,统一计算 TF 和 IDF,解决“关键词分散在不同字段”问题。
  2. 'tie_breaker':当多个字段都匹配时,“最佳字段”得分 + 其余字段得分×tie_breaker 作为最终得分。

  3. 'operator':控制单个字段内的多个词项是“AND”还是“OR”关系。

    • "AND" 要求同一个字段必须同时包含所有查询词,减少噪音,提高精准度。适合地址、姓名等跨字段严格匹配。
    • "OR" 只要字段里出现任意一个词就匹配,召回量大,但可能引入不相关结果。

最后,每个字段还可以人工设置权重,如"fields": ["title^3", "content", "tags^2"]

1
2
3
4
5
6
7
8
9
10
11
def bm25_query(search_query: str):
# 使用BM25算法进行传统关键词匹配
return {
"query": {
"multi_match": { # 多字段匹配查询
"query": search_query,
"fields": ["text", "report_name", "report_url"] # 在这些字段中搜索
}
},
"size": 8, # 返回最多8个结果
}
混合检索

混合检索也就是 Reciprocal Rank Fusion(RRF),通过结合向量搜索和 BM25 搜索的结果综合判断。

但由于es中混合检索需要付费使用,后续检索效果评估时不做测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def hybrid_query(search_query: str):
# 使用与索引时相同的嵌入模型将查询文本转换为向量
vector = embeddings.embed_query(search_query)

# 构建混合搜索查询结构
return {
"retriever": {
# 使用RRF (Reciprocal Rank Fusion) 算法融合多个检索器的结果
"rrf": {
# 定义多个检索器列表
"retrievers": [
{
# 第一个检索器:多字段BM25关键词搜索
"standard": {
"query": {
# 使用multi_match在多个字段上进行BM25搜索
"multi_match": {
"query": search_query,
"type": "best_fields", # 选择最佳匹配字段
"fields": ["text", "report_name", "report_url"] # 在这些字段中搜索
}
}
}
},
{
# 第二个检索器:KNN向量近似搜索
"knn": {
"field": dense_vector_field, # 向量字段名(使用定义的变量)
"query_vector": vector, # 查询向量
"k": 5, # 返回5个最相似的向量结果
"num_candidates": 10, # 候选向量数量,用于提高搜索准确性
}
},
]
}
}
}
模糊检索

无论是在网页搜索、文件检索,还是数据库查询中,我们时常会因为拼写错误或信息不完整而无法找到需要的结果。模糊搜索(Fuzzy Search)应运而生,它通过识别与查询相似的词语来帮助我们获得更加灵活的搜索结果。

这里是将bm25与模糊搜索结合起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def fuzzy_query(search_query: str):
"""
创建模糊搜索查询函数,支持拼写错误和近似匹配

Args:
search_query (str): 用户输入的搜索查询文本

Returns:
Dict: Elasticsearch模糊搜索查询体
"""

return {
"query": {
# 使用match查询进行文本匹配
"match": {
# 在指定的文本字段中进行搜索
text_field: {
"query": search_query, # 用户的搜索查询文本
"fuzziness": "AUTO", # 自动模糊匹配设置
# fuzziness参数说明:
# - "AUTO":根据词长度自动调整模糊度
# - 0-2个字符:不允许错误
# - 3-5个字符:允许1个编辑距离错误
# - 5个以上字符:允许2个编辑距离错误
# - 也可以设置具体数值:0, 1, 2
}
},
},
}

  • AUTO 规则:0~2 字符不允许错;3~5 字符最多1错;>5 字符最多2错。
  • text 字段先分词,再对每个 token 做模糊 → 召回 Elasticsearch

elasticsearch的连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain_elasticsearch import ElasticsearchRetriever
from embeddings_model import select_embeddings_model
# 根据模型名称选择嵌入模型
embedding_model_name='bge'
embeddings = select_embeddings_model(embedding_model_name)
dense_vector_field='vector'
text_field='text'
search_func=fuzzy_query
# 创建Elasticsearch检索器
retriever = ElasticsearchRetriever.from_es_params(
index_name="zxj_test", # 指定索引名称
body_func=fuzzy_query, # 查询函数
content_field="text", # 内容字段名
url='http://elasticsearch:9200/'# Elasticsearch服务器地址
)
print("连接成功")

elasticsearch的入库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
document = []
id_list = []

# 遍历文件夹中的所有.md文件
for filename in os.listdir(folder_path):
if filename.endswith(".md"):
file_path = os.path.join(folder_path, filename)

# 从文件名中提取chunk_id(去除.md扩展名)
chunk_id = filename.replace(".md", "")

# 读取MD文件内容
with open(file_path, 'r', encoding='utf-8') as file:
text_content = file.read()

# 查找对应的URL(根据文件名匹配)
report_url = ""
# 从folder_name中提取核心文档名(最后一个^后面的部分)
core_folder_name = folder_name.split('^')[-1] # 提取核心文档名

# 根据core_folder_name查找对应的URL
report_url = find_url_by_name_from_list(docs_data, core_folder_name)

# 构建document结构
doc = Document(
page_content=text_content,
metadata={
"report_name": folder_name,
"report_url": report_url,
"chunk_id": chunk_id
}
)
document.append(doc)
id_list.append(folder_name + "_" + chunk_id)

# 批量添加到向量库
es_vector_store.add_documents(documents=document, ids=id_list)
print(f" 文件夹 {folder_name} 已成功入库,共 {len(document)} 个文档块")

相关资料

Elasticsearch检索器 | 🦜️🔗 LangChain 框架

主流的检索策略

BM25 全文关键词检索

BM25(Best Matching 25)是一种久经考验的排序算法,广泛应用于传统搜索引擎中。它基于“词袋模型”,核心思想是通过关键词匹配程度来衡量文档与查询的相关性。

核心原理概览:

词频 (Term Frequency, TF):一个词在文档中出现的次数越多,通常意味着这篇文档与该词相关性越高。但BM25会进行“饱和度”处理,避免某些超高频词过度影响结果。可以想象成,一篇文章提到“苹果”10次,比提到1次更相关,但提到100次和提到50次,在“苹果”这个主题上的相关性增加可能就没那么显著了。 逆文档频率 (Inverse Document Frequency, IDF):如果一个词在整个文档集合中都很罕见(只在少数文档中出现),那么它对于区分文档主题就更重要,IDF值就高。比如“量子纠缠”这个词远比“的”、“是”这类词更能锁定专业文档。 文档长度归一化:用于平衡长短文档的得分,避免长文档仅仅因为内容多而获得不公平的高分。 工作方式举例:当用户搜索“深度学习入门教程”时,BM25会倾向于找出那些更频繁出现“深度学习”、“入门”、“教程”这些词,且这些词相对不那么常见的文档。

1. 公式整体结构 $$ \text{score}(D, Q) = \sum_{i=1}^{n} \text{IDF}(q_i) \cdot \underbrace{\frac{f(q_i, D) \cdot (k_1 + 1)}{f(q_i, D) + k_1 \cdot \left(1 - b + b \cdot \frac{|D|}{\text{avgdl}}\right)}}_{\text{词频归一化项(TF)}} $$ - IDF 部分:衡量词项 $ q_i $ 的区分能力(逆文档频率)。 - TF 部分:衡量词项 $ q_i $ 在文档 $ D $ 中的匹配程度(词频归一化)。 - 求和:对查询中的所有词项 $ q_i $ 的得分求和,得到最终相关性分数。

2. IDF 部分 $$ \text{IDF}(q_i) = \ln\left(\frac{N - n(q_i) + 0.5}{n(q_i) + 0.5}\right) $$ - 意义:IDF 值越高,词项 $ q_i $ 越能区分文档(常见于少数文档中的词)。 - 平滑处理:分子和分母均加 0.5,避免极端值(如 $ n(q_i) = 0 $ 时 ID 无限大)。 - 参数: - $ N $:文档总数。 - $ n(q_i) $:包含 $ q_i $ 的文档数。

3. 词频归一化项(TF) $$ \frac{f(q_i, D) \cdot (k_1 + 1)}{f(q_i, D) + k_1 \cdot \left(1 - b + b \cdot \frac{|D|}{\text{avgdl}}\right)} $$ - 非线性饱和:分子和分母均包含 $ f(q_i, D) $,使词频增长带来的增益逐渐减小(避免长文档中重复词项的过度影响)。 - 文档长度归一化: - $ |D| $:文档 $ D $ 的长度(词数)。 - $ $:整个文档集合的平均文档长度。 - $ b b=1 $ 时完全归一化,$ b=0 $ 时忽略长度)。 - 参数: - $ k_1 $:控制词频饱和的系数(通常 $ k_1 $,默认 $ k_1 = 1.5 $)。 - $ b $:默认 $ b = 0.75 $。

这个公式虽然看起来有些复杂,但它精妙地平衡了词频、词的稀有度以及文档长度这几个核心因素,是BM25算法效果出色的关键。

BM25、全文搜索与倒排索引:它们是如何协同工作的?

这三者是构建搜索系统的关键组件:

  • 全文搜索 (Full-Text Search):这是我们希望达成的目标——在大量文本中找到包含特定信息的文档。

  • 倒排索引 (Inverted Index):这是实现高效全文搜索的数据结构基础。它像一本书末尾的详细“关键词索引”,记录了每个词出现在哪些文档中以及相关位置信息。当用户查询时,系统可以通过倒排索引快速定位到包含查询词的候选文档。

  • BM25:在通过倒排索引找到候选文档后,BM25算法登场,为每个文档计算一个相关性得分,然后按分排序,将最相关的结果呈现给用户。

把它们比作在图书馆找特定主题的书籍:

  • 你告诉图书管理员你要找关于“天体物理学”的书(用户查询)。
  • 管理员查阅一个总卡片索引(倒排索引),迅速告诉你哪些书架(文档ID)上有包含“天体物理学”这个词的书。
  • 你走到这些书架,快速翻阅这些书(BM25评分过程),根据目录、摘要和提及“天体物理学”的频繁程度及重要性,判断哪几本最符合你的需求,并把它们按相关性高低排好。

Dense Vector / kNN 向量语义检索

向量语义检索(Dense Vector / k-Nearest Neighbor,简称 kNN)是一种基于高维稠密向量表示的语义检索技术,与传统关键词倒排索引不同,它通过自然语言的上下文含义而非字面匹配来寻找最相关的文档或实体。

  1. Embedding:使用预训练语言模型(如 BERT、Sentence-Transformers)把文本映射为固定维度的稠密向量(通常 128–1024 维)。

  2. kNN 搜索:给定查询向量,在向量空间中找距离最近的 k 个文档向量。距离度量常见:

    • 余弦相似度(cosine)
    • 内积 / dot-product
    • L2 欧氏距离

Hybrid Retrieval 混合检索

既然不同的检索策略各有千秋(例如,BM25擅长关键词精确匹配,Embedding擅长语义理解),那么将它们结合起来,是不是能达到“1+1>2”的效果呢?答案是肯定的,这就是混合检索的核心思想。

常见做法:同时使用BM25(或其他稀疏检索方法)和一种或多种Embedding检索方法,然后将它们各自的检索结果进行融合排序。

融合策略举例:

RRF (Reciprocal Rank Fusion, 倒数排序融合):一种简单但鲁棒的融合方法。它不关心不同检索系统输出的原始得分,只关心每个文档在各自结果列表中的排名。一个文档 doc 的RRF得分计算如下: ScoreRRF(doc) = Σs ∈ Systems(1/(k + ranks(doc))) 其中:

  • Systems 是所有参与融合的检索系统的集合。
  • rank_s(doc) 是文档 doc 在检索系统 s 给出的结果列表中的排名(例如,第一名是1,第二名是2)。
  • k 是一个小常数(例如,常设置为60),用于平滑得分,避免排名靠后的文档得分过小或排名第一的文档得分占比过高。

Reranker 重排序器

经过上述一种或多种检索策略的“粗筛”(召回阶段),我们通常会得到一个包含较多候选文档的列表(比如几百个)。为了进一步提升最终喂给LLM的文档质量,Reranker(重排序器)应运而生。它相当于一位“精炼师”或“质量品鉴官”,对初步召回的结果进行更细致、更精准的二次排序。

  1. 召回 倒排 / 向量 / 混合先给 Top-N(N≈100~1 k)。
  2. 拼特征 把 (query, doc) 拼成一条输入: [CLS] 用户问题 [SEP] 标题+正文 [SEP]
  3. 打分 扔给 Cross-Encoder 或 ColBERT → 输出一个相关性分数
  4. 重排 按分数从高到低重新排序,只留 Top-K(K≈10~20)。
  5. 返回 重排后的短列表交给前端或 LLM,完成一次“精读”。

参考

大模型RAG | 深入了解几种主流的检索策略(BM25、Embedding、混合检索、Reranker)-CSDN博客

项目关于elasticsearch代码阅读记录

阅读elasticsearch代码相关记录:

  1. embedding_model.select_embeddings_model:根据指定的模型名称加载
  2. make_es_vector_store
    1. docs_url = pd.read_excel(‘docs_new.xlsx’),从excel加载url并进行数据处理,保存筛选后的ur
    2. 完成文件加载测试:xizhang/retrival/docfile_test/test.py;
    3. 初始化es,使用elastic_search.load_es_index加载存储索引
    4. 完成测试elasticsearch连接与索引构建(索引名zxj_test):/aisys/repo_dev/xizhang/retrival/elasticsearch_test/test_es_connect.py,/aisys/repo_dev/xizhang/retrival/elasticsearch_test/test_add_es.py
    5. 顺序批量处理文件:共16000多份,每十份为一批进行处理,使用download_pdf.py进行文件下载,使用,使用vector_base.rewrite_file对文件进行处理,这里可以修改代码,增加对mineru处理pdf的markdown文件的处理,返回Document对象列表;
  3. elastic_search

重写了检索策略的函数,包括BM25,KNN,混合搜索

  1. elastic_retriever创建Elasticsearch检索器:根据搜索类型选择对应的查询函数,创建Elasticsearch检索器ElasticsearchRetriever.from_es_params

  2. retrievers

    1. 定义函数select_retriever,根据指定的名称选择并返回相应的检索器,目前只有bm25

libreoffice部署

查看Linux发行版

image-20250718095931232

系统是 银河麒麟高级服务器操作系统 V10(Kylin Linux Advanced Server V10),属于 中国国产、兼容 CentOS/RHEL 生态 的 Linux 发行版。

因此它使用 RPM 包管理dnf/yum),而不是 .deb

查看CPU 处理器架构

1
uname -m

x86_64

不用部署了,镜像里有,直接用了

使用libreoffice处理doc文件,转成pdf

将当前目录下所有 .doc.docx 转为 PDF:

1
2
3
4
5
6
7
libreoffice --headless --convert-to pdf *.doc *.docx --outdir ./pdf_output/

# 检查是否有残留进程
ps aux | grep libreoffice

# 如果有残留进程,杀死它们
killall soffice.bin 2>/dev/null

python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import os
import subprocess
import argparse
import glob
from pathlib import Path
def batch_convert_documents(input_path, output_dir):
"""
批量转换文档的函数版本

Args:
input_path (str): 输入路径(文件、目录或通配符)
output_dir (str): 输出目录

Returns:
bool: 转换是否成功
"""

# 确保输出目录存在
Path(output_dir).mkdir(parents=True, exist_ok=True)

# 收集所有要转换的文件
files_to_convert = []

if os.path.isfile(input_path):
# 单个文件
if input_path.lower().endswith(('.doc', '.docx')):
files_to_convert.append(input_path)
elif os.path.isdir(input_path):
# 目录中的所有doc/docx文件
for pattern in ['*.doc', '*.docx']:
files_to_convert.extend(glob.glob(os.path.join(input_path, pattern)))
else:
# 通配符模式
files_to_convert = glob.glob(input_path)
# 过滤出doc和docx文件
files_to_convert = [f for f in files_to_convert if f.lower().endswith(('.doc', '.docx'))]

if not files_to_convert:
print("未找到要转换的文档文件")
return False

print(f"找到 {len(files_to_convert)} 个文件需要转换")

# 构建命令
cmd = [
'libreoffice',
'--headless',
'--convert-to', 'pdf',
'--outdir', output_dir
] + files_to_convert

try:
print("正在转换文件...")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)

if result.returncode == 0:
print(f"成功转换 {len(files_to_convert)} 个文件")
return True
else:
print(f"转换失败: {result.stderr}")
return False

except Exception as e:
print(f"转换过程中发生错误: {e}")
return False

if __name__ == "__main__":
success = batch_convert_documents("./docs", "./pdf_output")

Linux扫盲

发行版

像Ubuntu,CentOS都属于Linux的发行版,就像Windows11属于Windows的关系

常见发行版分类:

系列 代表发行版 包格式 特点
Debian 系 Debian、Ubuntu、Kali、Linux Mint .deb 包多、社区大、教程多
Red Hat 系 CentOS、RHEL、Rocky、Alma、Fedora .rpm 企业级稳定、官方支持长
SUSE 系 openSUSE Leap / Tumbleweed .rpm YaST 管理工具、欧洲流行
Arch 系 Arch Linux、Manjaro .pkg.tar.zst 滚动更新、极客向
轻量/最小 Alpine、Debian netinst、CentOS Stream Minimal 任意 镜像小、资源占用低

如何查看Linux发行版

1
cat /etc/os-release

deb和rpm

.deb.rpm 想象成 “Linux 世界里的安装程序”,就像 Windows 的 .exe / .msi

格式 适用系统 安装命令
.deb Debian、Ubuntu、Linux Mint、Kali 等 sudo dpkg -i xxx.debsudo apt install ./xxx.deb
.rpm CentOS、RHEL、Fedora、openSUSE、Rocky、Alma 等 sudo rpm -ivh xxx.rpmsudo dnf/yum install xxx.rpm

cpu处理器架构

目录名 代表架构 适用场景
x86_64 Intel/AMD 64 位 绝大多数台式机、服务器(如 Xeon、EPYC、Core、Ryzen)
aarch64 ARM 64 位 树莓派 4/5、苹果 M 系列(Asahi Linux)、鲲鹏、飞腾、Ampere ARM 服务器等

查看处理器架构:

1
uname -m

前言

本教程为langchain官方教程的学习记录

academy.langchain.com/enrollments

代码见[learn-rag-langchain/academy-langgraph at main · zxj-2023/learn-rag-langchain](https://github.com/zxj-2023/learn-rag-langchain/tree/main/academy-langgraph)

module-1

route路由

在 LangGraph 中,route(路由)*的核心作用是*根据当前状态动态决定“下一步应该执行哪个节点”

定义工具
1
2
3
4
5
6
7
8
9
10
11
12
13
def multiply(a: int, b: int) -> int:
"""Multiply a and b.

Args:
a: first int
b: second int
"""
return a * b

llm = ChatOpenAI(

)
llm_with_tools = llm.bind_tools([multiply])

三引号字符串叫 docstring,它会被 LangChain 拿来做两件事:

  1. 生成工具的 description(给大模型看的“说明书”) 没有它时,LangChain 只能退而求其次,把函数名 multiply 拼成一句 “multiply tool” 之类的默认描述。大模型拿到的工具列表里,这个工具就只有一个干巴巴的名字和参数列表,它可能猜不到这个工具到底是干什么的
  2. 给人类开发者自己看 IDE、文档生成器、静态检查工具都会读取这段文字,方便后期维护。
构建条件边

tool_calling_llm 是一个普通的计算节点(node),负责把当前对话状态交给大模型,让大模型决定要不要调用工具;

真正完成“路由”动作的是 tools_condition 这个函数——它才是 LangGraph 里的 route(条件边)

tools_condition 是 作为LangGraph 预置的“默认路由函数”,功能就是,如果大模型的最新回复中包含工具调用,就调用工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Node
def tool_calling_llm(state: MessagesState):
#调用大模型后将最新的消息返回
return {"messages": [llm_with_tools.invoke(state["messages"])]}

# Build graph
#MessagesState 是 LangGraph 官方预置 的一种 状态(State)定义
#这个状态维护了一个消息list,有新的消息就加进这个消息list
builder = StateGraph(MessagesState)
builder.add_node("tool_calling_llm", tool_calling_llm)
builder.add_node("tools", ToolNode([multiply]))
builder.add_edge(START, "tool_calling_llm")
builder.add_conditional_edges(
"tool_calling_llm",
# 如果助手(结果)的最新消息是工具调用 -> tools_condition 路由到工具
# 如果助手(结果)的最新消息不是工具调用 -> tools_condition 路由到 END
tools_condition,
)
builder.add_edge("tools", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719142543838
调用
1
2
3
4
5
from langchain_core.messages import HumanMessage
messages = [HumanMessage(content="你好,2乘2是多少")]
messages = graph.invoke({"messages": messages})
for m in messages['messages']:
m.pretty_print()

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
================================ Human Message =================================

你好,2乘2是多少
================================== Ai Message ==================================
Tool Calls:
multiply (call_e026ceb409e247748786ad)
Call ID: call_e026ceb409e247748786ad
Args:
a: 2
b: 2
================================= Tool Message =================================
Name: multiply

4

agent代理

在 LangGraph 中,代理(Agent) 被明确定义为“一个由大语言模型(LLM)驱动的、能够循环决策并调用外部工具来完成任务的节点或子图”

Agent = LLM + 工具集合 + 提示模板,三者在 LangGraph 的状态化图结构里循环运行,直到满足停止条件。

ReAct 是一种流行的通用智能体架构,它结合了这些扩展,并整合了三个核心概念。

  1. 工具调用:允许LLM根据需要选择和使用各种工具。
  2. 记忆:使智能体能够保留和使用之前步骤的信息。
  3. 规划:使LLM能够创建并遵循多步计划以实现目标。

act- 让模型调用特定工具

observe - 将工具输出传递回模型

reason - 让模型对工具输出进行推理,以决定下一步操作(例如,调用另一个工具或直接响应)

定义工具
1
2
3
4
5
6
7
tools = [add, multiply, divide]#工具函数具体内容省略
llm = ChatOpenAI()

# 在这个 ipynb 文件中,我们将并行工具调用(parallel tool calling)设置为 false,因为数学计算通常是按顺序执行的,并且这次我们有3个可以进行数学计算的工具。
# OpenAI 模型为了效率,默认进行并行工具调用,详情请参阅 `https://python.langchain.com/docs/how_to/tool_calling_parallel/`
# 不妨尝试一下,看看模型在处理数学方程式时的表现!
llm_with_tools = llm.bind_tools(tools, parallel_tool_calls=False)
创建代理

定义节点

1
2
3
4
5
6
7
8
9
from langgraph.graph import MessagesState
from langchain_core.messages import HumanMessage, SystemMessage

# System message
sys_msg = SystemMessage(content="你是一个乐于助人的助手,负责对一组输入执行算术运算。")

# Node
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}

这一步相当于定义了系统提示词,然后在 assistant 这个节点里,通过 [sys_msg] + state[“messages”] 这部分代码,这个系统提示词被添加到了整个对话历史的最前面,然后一起发送给模型。这样一来,模型在生成回复时就会遵循这个系统提示词的指示。

与上一个不同的是,我们将 Tools 节点 回环 连接到 Assistant,从而形成一个回路。

在 assistant节点执行后,tools_condition检查模型的输出是否为工具调用。

如果是工具调用,则流程被导向至 tools 节点。

tools节点重新连接到assistant 只要模型决定调用工具,此循环就会继续。

如果模型的响应不是工具调用,则流程被导向至结束,终止该过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Graph
builder = StateGraph(MessagesState)

# Define nodes: these do the work
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))

# Define edges: these determine how the control flow moves
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
# If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
# If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END
tools_condition,
)
builder.add_edge("tools", "assistant")
react_graph = builder.compile()

# Show
display(Image(react_graph.get_graph(xray=True).draw_mermaid_png()))
image-20250719145948088
调用
1
2
3
4
5
messages = [HumanMessage(content="将3和4相加。将结果乘以2。再将结果除以5。")]
messages = react_graph.invoke({"messages": messages})

for m in messages['messages']:
m.pretty_print()

parallel_tool_calls=False的输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
================================ Human Message =================================

将3和4相加。将结果乘以2。再将结果除以5。
================================== Ai Message ==================================
Tool Calls:
add (call_6c69898dba0342bfbb889e)
Call ID: call_6c69898dba0342bfbb889e
Args:
a: 3
b: 4
================================= Tool Message =================================
Name: add

7
================================== Ai Message ==================================
Tool Calls:
multiply (call_9940e7603ecf4a13a5f2fb)
Call ID: call_9940e7603ecf4a13a5f2fb
Args:
a: 7
b: 2
================================= Tool Message =================================
Name: multiply

14
================================== Ai Message ==================================
Tool Calls:
divide (call_d48fbbe205a14dfbaa3500)
Call ID: call_d48fbbe205a14dfbaa3500
Args:
a: 14
b: 5
================================= Tool Message =================================
Name: divide

2.8
================================== Ai Message ==================================

最终结果是2.8。

parallel_tool_calls=True的输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
================================ Human Message =================================

将3和4相加。将结果乘以2。再将结果除以5。
================================== Ai Message ==================================
Tool Calls:
add (call_e0c7d8e65f2c49e8aecd3e)
Call ID: call_e0c7d8e65f2c49e8aecd3e
Args:
a: 3
b: 4
multiply (call_5bf824058e64489aaace91)
Call ID: call_5bf824058e64489aaace91
Args:
a: 7
b: 2
divide (call_36c34f69f6574028b28847)
Call ID: call_36c34f69f6574028b28847
Args:
a: 14
b: 5
================================= Tool Message =================================
Name: add

7
================================= Tool Message =================================
Name: multiply

14
================================= Tool Message =================================
Name: divide

2.8
================================== Ai Message ==================================

最终结果是 **2.8**。

Agent memory代理记忆

使用chekpointer检查点的功能,最简单的检查点之一是 MemorySaver,这是一个用于图形状态的内存键值存储。

这个检查点就相当于把图的每一次“状态快照”持久化到外部存储的机制。

1
2
3
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
react_graph_memory = builder.compile(checkpointer=memory)

我们可以使用 记忆功能 来解决这个问题!LangGraph 可以使用检查点工具在每一步之后自动保存图的状态。这一内置的持久化层为我们提供了内存功能,使 LangGraph 能够从最后一次状态更新处继续。

1
2
3
4
5
6
7
8
9
10
# Specify a thread
config = {"configurable": {"thread_id": "1"}}

# Specify an input
messages = [HumanMessage(content="Add 3 and 4.")]

# Run
messages = react_graph_memory.invoke({"messages": messages},config)
for m in messages['messages']:
m.pretty_print()

当我们使用内存时,我们需要指定一个 thread_id。这 thread_id 将存储我们的图形状态集合。

如下图,检查点在图的每一步写入状态,这些检查点保存在一个线程中 ,我们可以使用 thread_id 在未来访问该线程

state.jpg

module-2

state-scheme状态模式

LangGraph 的 state-scheme(状态模式) 就是“一张蓝图”,它告诉框架:“在整个图的生命周期里,状态对象应该长什么样、每个字段怎样被更新、以及节点之间如何共享或隔离数据。”

state-scheme 用 TypedDictPydantic BaseModel 来声明,定义了:

  • 状态里有哪些字段(key)
  • 每个字段的 Python 类型
  • 可选 该字段的 reducer(更新规则)
TypedDict

基本定义

1
2
3
4
5
from typing_extensions import TypedDict

class TypedDictState(TypedDict):
foo: str
bar: str

可增加像 Literal 这样的类型提示,使其更有价值

1
2
3
4
5
from typing import Literal

class TypedDictState(TypedDict):
name: str
mood: Literal["happy","sad"]

在这里,mood 只能是 “happy” 或 “sad”。

加 reducer:让更新“可追加”而不覆盖

1
2
3
4
class MathState(TypedDict):
question: str
scratchpad: Annotated[list[str], add_message] # 新元素自动追加
answer: int

Annotated[list[str], add] 告诉 LangGraph:当节点返回 {"scratchpad": ["新步骤"]} 时,追加到现有列表,而不是替换

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import random
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
#定义节点
def node_1(state):
print("---Node 1---")
return {"name": state['name'] + " is ... "}

def node_2(state):
print("---Node 2---")
return {"mood": "happy"}

def node_3(state):
print("---Node 3---")
return {"mood": "sad"}
#路由函数
def decide_mood(state) -> Literal["node_2", "node_3"]:

# Here, let's just do a 50 / 50 split between nodes 2, 3
if random.random() < 0.5:

# 50% of the time, we return Node 2
return "node_2"

# 50% of the time, we return Node 3
return "node_3"

# Build graph
builder = StateGraph(TypedDictState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)

# Logic
builder.add_edge(START, "node_1")
builder.add_conditional_edges("node_1", decide_mood)
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719154439415

因为我们的状态是一个字典,我们只需用一个字典调用图,以设置状态中 name 键的初始值。

1
graph.invoke({"name":"Lance"})
Dataclass数据类

python的dataclasses库提供了一种简洁的语法,用于创建主要用于存储数据的类。

1
2
3
4
5
6
from dataclasses import dataclass

@dataclass
class DataclassState:
name: str
mood: Literal["happy","sad"]

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def node_1(state):
print("---Node 1---")
return {"name": state.name + " is ... "}

# Build graph
builder = StateGraph(DataclassState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)

# Logic
builder.add_edge(START, "node_1")
builder.add_conditional_edges("node_1", decide_mood)
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

要访问 dataclass 的键,我们只需修改在 node_1 中使用的下标即可:

我们使用 state.name 来表示 dataclass 状态,而不是使用 state["name"] 来表示上面的 TypedDict

你会注意到一个有点奇怪的地方:在每个节点中,我们仍然返回一个字典来执行状态更新。

Dataclass 只是“描述”状态的形状,而真正在 LangGraph 的节点之间流动的依旧是「字典」,这是框架设计层面的约定

在这种情况下,dataclass 拥有键 name,因此我们可以通过从节点传递一个字典来更新它,就像在状态为 TypedDict 时所做的那样。

1
graph.invoke(DataclassState(name="Lance",mood="sad"))

我们通过 dataclass 来设置状态中每个键/通道的初始值!

State Reducers状态更新函数

Reducers 为我们指定了如何执行更新。它接收 旧状态一次变更指令(action / 增量字段)返回全新的状态对象,整个过程中不能修改原有数据

我们可以使用 Annotated 类型来指定一个 reducer 函数。在这种情况下,让我们将每个节点返回的值附加到结果中,而不是覆盖它们。

1
2
3
4
5
from operator import add
from typing import Annotated

class State(TypedDict):
foo: Annotated[list[int], add]

我们只需要一个可以执行此操作的缩减器:operator.add 是 Python 内置 operator 模块中的一个函数。当 operator.add 应用于列表时,它执行列表连接。

Custom Reducers 自定义 Reducers

我们同样可以自定义reducers函数,解决一些特殊情况,比如,如下可以解决传入参数为none的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def reduce_list(left: list | None, right: list | None) -> list:
"""安全地合并两个列表,处理其中一个或两个输入可能为 None 的情况。

参数:
left (list | None): 要合并的第一个列表,或 None。
right (list | None): 要合并的第二个列表,或 None。

返回:
list: 一个包含两个输入列表所有元素的新列表。
如果输入为 None,则将其视为空列表。
"""
if not left:
left = []
if not right:
right = []
return left + right

class DefaultState(TypedDict):
foo: Annotated[list[int], add]

class CustomReducerState(TypedDict):
foo: Annotated[list[int], reduce_list]
MessagesState

我可以使用内置的 reducer add_messages 来处理状态中的消息

MessagesState 内置了一个 messages 键 它还为该键内置了一个 add_messages 合并器,这两个是等价的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from typing import Annotated
from langgraph.graph import MessagesState
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

# 定义一个自定义的 TypedDict,其中包含一个带有 add_messages reducer 的消息列表。
class CustomMessagesState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
added_key_1: str
added_key_2: str
# etc

# 使用 MessagesState ,它包含带有 add_messages reducer 的 messages 键。
class ExtendedMessagesState(MessagesState):
# 添加除 messages 之外所需的任何键, messages 是预构建的。
added_key_1: str
added_key_2: str
# etc

在使用 add_messages reducer 时,让我们展示一些有用的技巧。

重写(Re-writing)

如果我们传递的消息与 messages 列表中已有的消息具有相同的 ID,则该消息将被覆盖!

1
2
3
4
5
6
7
8
9
10
# Initial state
initial_messages = [AIMessage(content="Hello! How can I assist you?", name="Model", id="1"),
HumanMessage(content="I'm looking for information on marine biology.", name="Lance", id="2")
]

# New message to add
new_message = HumanMessage(content="I'm looking for information on whales, specifically", name="Lance", id="2")

# Test
add_messages(initial_messages , new_message)
1
2
[AIMessage(content='Hello! How can I assist you?', name='Model', id='1'),
HumanMessage(content="I'm looking for information on whales, specifically", name='Lance', id='2')]

删除(Removal)

add_messages同样支持删除。为此,我们简单地使用 RemoveMessage 来自 langchain_core

1
2
3
4
5
6
7
8
9
10
11
from langchain_core.messages import RemoveMessage

# Message list
messages = [AIMessage("Hi.", name="Bot", id="1")]
messages.append(HumanMessage("Hi.", name="Lance", id="2"))
messages.append(AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"))
messages.append(HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4"))

# Isolate messages to delete
delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]]
print(delete_messages)

Multiple Schemas 多种状态

Private State 私有状态

首先,让我们讨论在节点之间传递 private state 的情况。这对于图的中间计算逻辑中需要的任何内容都很有用,但与图的整体输入或输出无关。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from typing_extensions import TypedDict
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END

class OverallState(TypedDict):
foo: int

class PrivateState(TypedDict):
baz: int

def node_1(state: OverallState) -> PrivateState:
print("---Node 1---")
return {"baz": state['foo'] + 1}

def node_2(state: PrivateState) -> OverallState:
print("---Node 2---")
return {"foo": state['baz'] + 1}

# Build graph
builder = StateGraph(OverallState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)

# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719171509917

我们将定义一个 OverallState 和一个 PrivateStatenode_2 使用 PrivateState 作为输入,但输出写入到 OverallState

baz 仅包含在 PrivateState 中。因此,我们可以看到 baz 被排除在图形输出之外,因为它不在 OverallState 中。

Input / Output Schema 输入/输出模式

在 LangGraph 中,Input / Output Schema 就是“图的对外接口协议”:调用者只能按 Input Schema 传参;图运行完后,只吐出 Output Schema 规定的字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# 定义输入的模式
class InputState(TypedDict):
question: str

# 定义输出的模式
class OutputState(TypedDict):
answer: str

# 定义整体模式,结合输入和输出
class OverallState(InputState, OutputState):
pass

# 定义处理输入并生成答案的节点
def answer_node(state: InputState):
# 示例答案和额外键
return {"answer": "bye", "question": state["question"]}

# 构建图,并指定输入和输出模式
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
builder.add_node(answer_node) # 添加答案节点
builder.add_edge(START, "answer_node") # 定义起始边
builder.add_edge("answer_node", END) # 定义结束边
graph = builder.compile() # 编译图

# 使用输入调用图并打印结果
print(graph.invoke({"question": "hi"}))

输出

1
{'answer': 'bye Lance'}

在这里,input / output 模式对图的输入和输出上允许的键进行 过滤。可以看到 output 模式将输出限制为仅包含 answer 键。

Filtering and trimming messages筛选和精简消息

如果我们在处理长时间对话时不够小心,会导致高令牌使用量和延迟,因为我们传递给模型的是一系列不断增加的消息。所以要进行筛选和精简消息。

简化器(Reducer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain_core.messages import RemoveMessage

# Nodes
def filter_messages(state: MessagesState):
# 删除除最近两条消息外的所有消息
delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
return {"messages": delete_messages}

def chat_model_node(state: MessagesState):
return {"messages": [llm.invoke(state["messages"])]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("filter", filter_messages)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "filter")
builder.add_edge("filter", "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250719203204234
筛选消息(Filtering messages)

如果你不需要或不希望修改图状态,可以直接过滤传递给聊天模型的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
# Node
def chat_model_node(state: MessagesState):
return {"messages": [llm.invoke(state["messages"][-1:])]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

例如,只需传递一个过滤后的列表:llm.invoke(messages[-1:]) 给模型。

状态包含了所有消息。但这里模型调用仅使用最后一条消息

裁剪消息(Trim messages)

另一种方法是根据设定一定数量的tokens进行 trim messages。在把对话历史发给大模型之前,按 token 预算 把超长消息列表“剪”到合适长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from langchain_core.messages import trim_messages

# Node
def chat_model_node(state: MessagesState):
# 使用 trim_messages 函数修剪消息列表
# max_tokens: 限制消息的最大令牌数
# strategy: 修剪策略,这里是“last”,表示保留最新的消息
# token_counter: 用于计算令牌数的模型实例
# allow_partial: 是否允许部分修剪
messages = trim_messages(
state["messages"],
max_tokens=100,
strategy="last",
token_counter= ChatOpenAI(
model="qwen-plus-2025-04-28",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"),
allow_partial=False,
)
# 调用语言模型(llm)处理修剪后的消息,并返回结果
return {"messages": [llm.invoke(messages)]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

Chatbot with message summarization带有消息总结功能的聊天机器人

与其仅仅修剪或过滤消息,我们将展示如何使用大型语言模型(LLMs)来生成对话的实时摘要。

这使我们能够保留整个对话的压缩表示,而不仅仅是通过修剪或过滤将其移除。

我们将为该聊天机器人配备记忆功能,支持长时间对话,同时不会产生高昂的 token 成本或延迟。

定义总结状态
1
2
3
from langgraph.graph import MessagesState
class State(MessagesState):
summary: str

除了内置的 messages 键之外,我们现在还将包含一个自定义键(summary)。

定义LLM节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage 

# 定义调用模型的逻辑
def call_model(state: State):

# 获取摘要(如果存在)
summary = state.get("summary", "")

# 如果有摘要,则添加它
if summary:

# 将摘要添加到系统消息中
system_message = f"先前对话的摘要:{summary}"

# 将摘要附加到任何较新的消息中
messages = [SystemMessage(content=system_message)] + state["messages"]

else:
messages = state["messages"]

response = model.invoke(messages)
return {"messages": response}

我们将定义一个节点来调用我们的LLM,如果存在摘要,则将其纳入提示中。

当 call_model 函数返回 {“messages”: response} 时,它是在告诉 langgraph :“请用 response (即模型的新输出)来更新 State 对象中 messages 键对应的值。” langgraph 会将这个新消息追加到 messages 列表中,从而保持了对话历史的连续性

定义摘要节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def summarize_conversation(state: State): 

# 首先,我们获取任何现有的摘要
summary = state.get("summary", "")

# 创建我们的摘要提示
if summary:

# 摘要已存在
summary_message = (
f"这是迄今为止对话的摘要:{summary}\n\n"
"请结合以上新消息扩展摘要:"
)

else:
summary_message = "创建以上对话的摘要:"

# 将提示添加到我们的历史记录中
messages = state["messages"] + [HumanMessage(content=summary_message)]
response = model.invoke(messages)

# 删除除最近2条消息外的所有消息
delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
return {"summary": response.content, "messages": delete_messages}

我们将定义一个节点来生成摘要。请注意,这里我们将使用 RemoveMessage 在生成摘要后过滤我们的状态。

定义路由函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from langgraph.graph import END 
# 决定是结束对话还是总结对话
def should_continue(state: State):

"""返回要执行的下一个节点。"""

messages = state["messages"]

# 如果消息超过六条,那么我们总结对话
if len(messages) > 6:
return "summarize_conversation"

# 否则我们就可以结束了
return END

我们将添加一个条件边,以根据对话长度确定是否生成摘要。

在 langgraph 中, Command 是一个特殊的类型,用于指导图形(graph)决定接下来应该执行哪个节点。 您可以把它看作是给图形下达的一个“命令”。

添加内存并编译图
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from IPython.display import Image, display
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START

# Define a new graph
workflow = StateGraph(State)
workflow.add_node("conversation", call_model)
workflow.add_node("summarize_conversation",summarize_conversation)

# Set the entrypoint as conversation
workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue)
workflow.add_edge("summarize_conversation", END)



# Compile
memory = MemorySaver()
graph = workflow.compile(checkpointer=memory)
display(Image(graph.get_graph().draw_mermaid_png()))
使用线程调用
1
2
3
4
5
6
config = {"configurable": {"thread_id": "2"}}

input_message = HumanMessage(content="我喜欢玩lol,你知道这个游戏吗")
output = graph.invoke({"messages": [input_message]}, config)
for m in output['messages'][-1:]:
m.pretty_print()

当对话大于6,可生成概要

1
graph.get_state(config).values.get("summary","")

Chatbot with message summarization & external DB memory具有消息总结和外部数据库记忆的聊天机器人

使用数据库

SqliteSaver 是 LangGraph 提供的一个 轻量级状态持久化工具,它将图的运行状态(即 checkpoint)保存到本地的 SQLite 数据库中,使得你可以在程序中断或重启后恢复执行上下文,特别适合本地开发、实验性项目或中小规模应用。

如果我们提供 “:memory:” ,它将创建一个内存中的 SQLite 数据库。

1
2
3
import sqlite3
# In memory
conn = sqlite3.connect(":memory:", check_same_thread = False)

如果我们提供一个 db 路径,那么它将为我们创建一个数据库!

1
2
3
4
5
#在本地创建一个目录 state_db,并尝试从 GitHub 下载一个名为 example.db 的 SQLite 数据库文件
!mkdir -p state_db && [ ! -f state_db/example.db ] && wget -P state_db https://github.com/langchain-ai/langchain-academy/raw/main/module-2/state_db/example.db

db_path = "state_db/example.db"
conn = sqlite3.connect(db_path, check_same_thread=False)

定义checkpoint

1
2
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver(conn)

像上一个形式编译图

让我们确认一下我们的状态是否已保存到本地。

1
2
3
config = {"configurable": {"thread_id": "1"}}
graph_state = graph.get_state(config)
graph_state

使用像 Sqlite 这样的数据库意味着状态会被持久化!

module-3

Streaming 流式传输

现在,让我们来谈谈 流式传输我们的图状态 的方法。.stream.astream 是用于流式返回结果的同步和异步方法。

values:这将在每个节点被调用后流式传输图的完整状态。 updates:这将在每个节点被调用后流式传输图的状态更新。

values_vs_updates.png
stream_mode=“updates”
1
2
3
4
5
6
# Create a thread
config = {"configurable": {"thread_id": "1"}}

# Start conversation
for chunk in graph.stream({"messages": [HumanMessage(content="你好我是zxj")]}, config, stream_mode="updates"):
print(chunk)
1
{'conversation': {'messages': AIMessage(content='你好,zxj!很高兴认识你~有什么我可以帮你的吗?😊', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 576, 'total_tokens': 592, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'qwen-plus-2025-04-28', 'system_fingerprint': None, 'id': 'chatcmpl-891471ae-2fe8-9b3d-b5f7-f4fcd55a4e16', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--f36409f3-af43-4e9b-8a46-39646ad7c106-0', usage_metadata={'input_tokens': 576, 'output_tokens': 16, 'total_tokens': 592, 'input_token_details': {}, 'output_token_details': {}})}}

让我们来看一下 stream_mode="updates"

因为我们使用 updates 进行流式传输,所以只有在图中的节点运行后,我们才能看到状态的更新。每个 chunk 是一个字典,以 node_name 为键,更新后的状态为值。

1
2
3
# Start conversation
for chunk in graph.stream({"messages": [HumanMessage(content="你好我是zxj")]}, config, stream_mode="updates"):
chunk['conversation']["messages"].pretty_print()

现在我们直接打印状态更新。

1
2
3
================================== Ai Message ==================================

你好呀,zxj!再次见到你真高兴~😊 有什么我可以帮忙的吗?
stream_mode=“values”
1
2
3
4
5
6
7
8
9
# Start conversation, again
config = {"configurable": {"thread_id": "2"}}

# Start conversation
input_message = HumanMessage(content="你好我是zxj")
for event in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
for m in event['messages']:
m.pretty_print()
print("---"*25)

现在,我们可以看到 stream_mode="values".这是在 conversation 节点被调用后,图的整个状态。

1
2
3
4
5
6
7
8
9
10
11
================================ Human Message =================================

你好我是zxj
---------------------------------------------------------------------------
================================ Human Message =================================

你好我是zxj
================================== Ai Message ==================================

你好,zxj!有什么我可以帮你的吗?😊
---------------------------------------------------------------------------
Streaming tokens 流式传输令牌

在 LangGraph 中,“流式传输令牌(Streaming tokens)”指的是在节点内部的大模型(LLM)生成过程中,逐 token 地将中间结果实时推送到客户端的能力。实现这一能力的核心方法是 astream_events,它会以事件流的形式暴露整个执行过程中的所有细节,包括每一次 LLM 调用产生的 token。

每个事件是一个包含几个键的字典:

event:这是正在发出的事件的类型。

name:这是事件的名称。

data:这是与事件相关联的数据。

metadata:包含 langgraph_node,即发出事件的节点。

要点是,图表中聊天模型的令牌具有 on_chat_model_stream 类型。我们可以使用 event['metadata']['langgraph_node'] 来选择要流式的节点。并且我们可以使用 event['data'] 来获取每个事件的实际数据,而在这种情况下,数据是一个 AIMessageChunk.

1
2
3
4
5
6
7
8
9
node_to_stream = 'conversation'#定义流式传输的节点
config = {"configurable": {"thread_id": "5"}}
input_message = HumanMessage(content="为我介绍lol")
async for event in graph.astream_events({"messages": [input_message]}, config, version="v2"):
# 从特定节点获取聊天模型生成的 Token
#事件类型必须是 逐 token 流式输出(on_chat_model_stream)。
if event["event"] == "on_chat_model_stream" and event['metadata'].get('langgraph_node','') == node_to_stream:
data = event["data"]
print(data["chunk"].content, end="|")

event的常见类型

事件类型 (event) 触发时机与说明
on_chain_start 任意 Runnable(节点、子图或整个图)开始执行
on_chain_stream 节点/图在运行过程中 增量输出 chunk
on_chain_end 任意 Runnable 执行完成
on_chat_model_start ChatModel 开始调用
on_chat_model_stream ChatModel 逐 token 返回内容(打字机效果)
on_chat_model_end ChatModel 调用结束
on_tool_start Tool 开始调用
on_tool_end Tool 调用结束
on_retriever_start Retriever 开始检索
on_retriever_end Retriever 检索结束

Breakpoints 断点

human-in-the-loop(人工介入/人在回路)的三大动机:

1️⃣ Approval(审批)我们可以中断智能体,将当前状态呈现给用户,并让用户决定是否执行该操作。

2️⃣ Debugging(调试/回放)我们可以回退图形以重现或避免问题

3️⃣ Editing(编辑)AI 产出的中间结果不符合预期,但不想重跑整图,可以直接修改状态

我们将介绍 breakpoints,它提供了一种在特定步骤停止图的简单方法。

Breakpoints for human approval用于人类审批的断点

假设我们关注工具的使用:我们希望批准代理使用其任何工具。

我们所需要做的就是简单地用 interrupt_before=["tools"] 编译图形,其中 tools 是我们的工具节点。

这意味着在执行工具调用的节点 tools 之前,执行将被中断。

1
graph = builder.compile(interrupt_before=["tools"], checkpointer=memory)
image-20250720225640772
1
2
3
4
5
6
7
8
9
# Input
initial_input = {"messages": HumanMessage(content="2乘3")}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
1
2
3
4
5
6
7
8
9
10
================================ Human Message =================================

2乘3
================================== Ai Message ==================================
Tool Calls:
multiply (call_92a4bcf88d25476d925775)
Call ID: call_92a4bcf88d25476d925775
Args:
a: 2
b: 3

我们可以获取状态并查看要调用的下一个节点。这是一种很好的方法,可以发现图已被中断。

现在,我们将介绍一个很好的技巧。当我们使用 None 调用图时,它将直接从最后一个状态检查点继续!

breakpoints.jpg

状态快照(StateSnapshot)

  • 类型:专门用来存 一个时刻 的完整状态
  • 获取方式:
    • Graph.get_state()最新的 快照
    • Graph.get_state_history()所有 快照列表

继续/重跑图

  • Graph.stream(None, {"thread_id": "xxx"})
    • 不传新输入 None 表示 从当前最新状态继续跑
    • 也可回退到历史快照,再重跑(调试/回放)
1
2
for event in graph.stream(None, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
================================== Ai Message ==================================
Tool Calls:
multiply (call_92a4bcf88d25476d925775)
Call ID: call_92a4bcf88d25476d925775
Args:
a: 2
b: 3
================================= Tool Message =================================
Name: multiply

6
================================== Ai Message ==================================

2乘3的结果是6。

Editing graph state 编辑图状态

断点也是修改图状态的机会让我们在 assistant 节点之前为代理设置断点。

1
graph = builder.compile(interrupt_before=["assistant"], checkpointer=memory)
image-20250720230924672
1
2
3
4
5
6
7
8
9
# Input
initial_input = {"messages": "2乘3"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
1
2
3
================================ Human Message =================================

2乘3

当状态中断时,我们可以直接应用状态更新

记住,对 messages 键的更新将使用 add_messages reducer:

**如果我们想覆盖现有的消息,可以提供带有* id *的消息。** 如果我们只想将消息添加到消息列表中,则可以传递未指定 id 的消息,如下所示。

1
2
3
4
graph.update_state(
thread,
{"messages": [HumanMessage(content="不要,实际上要3乘3!")]},
)
1
2
3
new_state = graph.get_state(thread).values
for m in new_state['messages']:
m.pretty_print()
1
2
3
4
5
6
================================ Human Message =================================

2乘3
================================ Human Message =================================

不要,实际上要3乘3!

现在,让我们继续进行我们的代理操作,只需传递 None 并允许其从当前状态继续执行。我们输出当前内容,然后继续执行剩余的节点。

1
2
for event in graph.stream(None, thread, stream_mode="values"):
event['messages'][-1].pretty_print()

Dynamic breakpoints 动态断点

你可以根据条件来实现它(从节点内部基于开发人员定义的逻辑)。您可以向用户说明其中断原因(通过将您想传递的内容发送到 NodeInterrupt)。

让我们创建一个图表,其中根据输入的长度会抛出一个 NodeInterrupt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from IPython.display import Image, display

from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from langgraph.graph import START, END, StateGraph

class State(TypedDict):
input: str

def step_1(state: State) -> State:
print("---Step 1---")
return state

def step_2(state: State) -> State:
# 如果输入字符串长度超过5个字符,我们可以选择抛出NodeInterrupt异常
if len(state['input']) > 5:
raise NodeInterrupt(f"收到长度超过5个字符的输入: {state['input']}")

print("---Step 2---")
return state

def step_3(state: State) -> State:
print("---Step 3---")
return state

builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Compile the graph with memory
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250721222709712

让我们运行一个输入超过5个字符的图。

1
2
3
4
5
6
initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
print(event)
1
2
3
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}

我们可以尝试从断点恢复图。但是,这只会重新运行相同的节点!除非状态发生变化,否则我们将一直卡在这里。

1
2
3
4
graph.update_state(
thread_config,
{"input": "hi"},
)

使用update_state更新状态

Time travel 时间旅行

现在,让我们通过查看、重播,甚至从过去的状态叉出,来展示 LangGraph 支持debug 的功能。

Browsing History 浏览历史

我们可以使用 get_state 来查看给定 thread_id 的图的 当前 状态!

1
graph.get_state({'configurable': {'thread_id': '1'}})

我们还可以浏览代理的状态历史。get_state_history 让我们能够获取所有先前步骤的状态。

1
2
3
all_states = [s for s in graph.get_state_history(thread)]
len(all_states)
print(all_states)
Replaying 回放
1
2
3
4
5
to_replay = all_states[-2]
to_replay.values
{'messages': [HumanMessage(content='2乘3', additional_kwargs={}, response_metadata={}, id='0676d9b5-cd59-4630-924d-b5c8d950e8d8')]}
to_replay.next
('assistant',)

我们还获取了配置,它告诉了我们 checkpoint_id 以及 thread_id

1
2
3
4
to_replay.config
{'configurable': {'thread_id': '1',
'checkpoint_ns': '',
'checkpoint_id': '1f066c0e-2ee2-66d5-8000-5dde78194aae'}}

要从这里重播,我们只需将配置传回给代理!图知道这个检查点已经执行过了。它只是从这个检查点重新播放!

1
2
for event in graph.stream(None, to_replay.config, stream_mode="values"):
event['messages'][-1].pretty_print()
Forking 分叉

如果我们想从相同的步骤运行,但使用不同的输入,该怎么办呢?这是分叉。

fig3.jpg

让我们修改此检查点的状态。我们可以直接使用提供的 checkpoint_id 来运行 update_state

请记住我们对 messages 的 reducer 是如何工作的:

  • 它会追加消息,除非我们提供了一个消息 ID。
  • 我们提供消息 ID 是为了覆盖消息,而不是将消息追加到状态中!

因此,要覆盖消息,我们只需提供消息 ID,而我们已有 to_fork.values["messages"].id

1
2
3
4
5
fork_config = graph.update_state(
to_fork.config,
{"messages": [HumanMessage(content='5乘3',
id=to_fork.values["messages"][0].id)]},
)

基础知识

message

LangChain 中的 HumanMessage 、 AIMessage 、 SystemMessage 和 ToolMessage 。这些消息类型是构建与语言模型(LLM)交互的核心组件,它们共同构成了一个完整的对话历史,帮助模型理解上下文并做出恰当的回应。

  1. SystemMessage

SystemMessage 的结构最简单,它只包含内容和类型。

数据结构 :

  • content (str): 消息的具体内容,即给 AI 的指令。
  • type (str): 固定为字符串 ‘system’ 。
  1. HumanMessage

HumanMessage 的结构也同样简单,代表用户的输入。

数据结构 :

  • content (str): 用户输入的文本。
  • type (str): 固定为字符串 ‘human’ 。
  1. AIMessage

AIMessage 的结构相对复杂,因为它不仅可以包含文本响应,还可以包含对工具的调用请求。

数据结构 :

  • content (str): AI 生成的文本响应。如果 AI 的回复是发起工具调用,此字段可以为空字符串。
  • tool_calls (list[dict], 可选): 一个字典列表,每个字典代表一个工具调用请求。这是支持“Function Calling”或“Tool Calling”功能的核心。其结构通常包含:
    • name (str): 要调用的工具名称。
    • args (dict): 调用工具时需要传入的参数。
    • id (str): 此次工具调用的唯一标识符,用于后续 ToolMessage 的关联。
  • type (str): 固定为字符串 ‘ai’ 。
  1. ToolMessage

ToolMessage 用于承载工具执行后的返回结果。

数据结构 :

  • content (str): 工具执行返回的结果。通常是一个字符串,比如 JSON 格式的字符串。
  • tool_call_id (str): 此次工具调用的唯一标识符, 必须 与之前 AIMessage 中 tool_calls 里的 id 相对应。这使得模型能够准确地将结果与请求匹配起来。
  • type (str): 固定为字符串 ‘tool’ 。

实战demo

agent实战

langchain的agent与langgraph的agent主要差异点在create_openai_functions_agent, AgentExecutor这两个函数

前者的作用类似构建 Runnable 链,返回一个RunnablePassthrough.assign(...)|prompt|llm_with_tools|ToolsAgentOutputParser(),但其invoke仅能完成单步的调用,而AgentExecutor 会自动完成3 步循环(调用工具→拼回结果→再次调用 LLM),直到任务结束。

以下为ai的解释

直接使用 agent (Runnable) 的局限性:

  1. 单步执行: 你直接调用 agent.invoke()agent.ainvoke() 时,它通常只执行一步。对于像 create_tool_calling_agent 生成的 agent 来说,这一步就是:
    • 接收输入(包括历史消息和 agent_scratchpad)。
    • 让 LLM 决定是给出最终答案 (AgentFinish) 还是调用工具 (AgentAction)。
    • 返回这个决定。
  2. 工具调用需要手动处理: 如果 LLM 决定调用工具(返回 AgentAction),需要负责:
    • 从返回的 AgentAction 中找出工具名称和输入参数。
    • 在你的工具列表中找到对应的工具。
    • 执行这个工具。
    • 获取工具的输出(Observation)。
    • 再次手动调用 agent.invoke(...),把工具的输出(通常需要格式化成 ToolMessage)放回 agent_scratchpadintermediate_steps 中。
    • 重复这个过程,直到 agent 最终返回 AgentFinish

使用 AgentExecutor 的优势:

AgentExecutor 就是为了解决上述问题而设计的。它本质上是一个自动化的执行引擎,为你管理整个 Agent 的思考-行动-观察循环。

  1. 自动化循环: AgentExecutor 内部会自动运行那个循环:
    • 调用 agent (Runnable)。
    • 检查返回的是 AgentAction 还是 AgentFinish
    • 如果是 AgentAction,它会自动根据你提供的 tools 列表找到并执行对应的工具。
    • 它会自动将工具的输出(Observation)记录下来,并作为下一步的输入(放入 agent_scratchpad)再次调用 agent
    • 这个过程会一直重复。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from langchain.agents import create_openai_functions_agent, AgentExecutor
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

# 1. 定义工具
class WeatherInput(BaseModel):
location: str = Field(description="城市名称")

@tool("get_weather", args_schema=WeatherInput)
def get_weather(location: str) -> str:
"""查询城市天气"""
return f"{location} 今天是晴天,25°C"

# 2. 创建Agent
llm = ChatOpenAI(model="gpt-4")
tools = [get_weather]
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个助手,可以调用工具"),
("human", "{input}")
])
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# 3. 执行
result = agent_executor.invoke({"input": "北京天气如何?"})
print(result["output"])

工具调用

利用bind_tools绑定工具,当大模型需要调用工具的时候,会返回工具信息,tool_calls,如下

[{‘name’: ‘add_numbers’, ‘args’: {‘a’: 15, ‘b’: 27}, ‘id’: ‘4e7b261cce6d4e3da09134086c704c3c’, ‘type’: ‘tool_call’}]

llm_with_tools.invoke(...) 只是一个单步调用,LLM 返回的是“我想调用哪个工具、传什么参数”(即 tool_calls)。 但 LLM 并不会自动执行工具,所以你必须:

  1. 手动执行工具(或让 AgentExecutor 帮你执行)。
  2. 把执行结果拼回对话(作为 ToolMessage)。
  3. 再次调用 LLM,让它基于工具返回的结果生成最终答案。

这里展示的是手动拼接,并传给大模型,如下

1
2
3
4
5
6
7
# 将工具的输出发送回LLM,让LLM基于结果生成最终回答
# 这是一个关键步骤,通常在Agent中自动处理。这里手动演示。
final_response = llm_with_tools.invoke([
HumanMessage(content="What is 15 + 27?"),
AIMessage(content="", tool_calls=[tool_call]), # 告知LLM它之前建议的工具调用
ToolMessage(content=str(result), tool_call_id=tool_call['id']) # 告知LLM工具的执行结果
])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from langchain_core.tools import tool
from typing import Literal
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage

# 定义一个加法工具
@tool
def add_numbers(a: float, b: float) -> float:
"""
Adds two numbers together.

Args:
a: The first number.
b: The second number.
"""
return a + b

# 我们可以定义更多的工具,例如一个乘法工具
@tool
def multiply_numbers(a: float, b: float) -> float:
"""
Multiplies two numbers together.

Args:
a: The first number.
b: The second number.
"""
return a * b

# 将我们定义的工具放在一个列表中
tools = [add_numbers, multiply_numbers]


# 初始化LLM
llm = ChatOpenAI(
temperature=0.5,
model_name="deepseek-v3-0324", # 聊天模型通常使用"gpt-3.5-turbo"或"gpt-4"
openai_api_base="https://api.qnaigc.com/v1", # 例如,您可以指定base_url
openai_api_key="sk-" # 直接在此处设置API密钥,或者通过环境变量设置
)
# 将工具绑定到LLM
# LLM现在知道了add_numbers和multiply_numbers这两个工具及其功能
llm_with_tools = llm.bind_tools(tools)


# 场景一:LLM直接回答,不需要工具
print("--- 场景一:LLM直接回答 ---")
response1 = llm_with_tools.invoke([HumanMessage(content="Hello, what's your name?")])
print(response1.content) # LLM直接生成文本回复

print("\n--- 场景二:LLM决定调用工具 ---")
# 场景二:LLM决定调用工具
# 当LLM的响应中包含tool_calls时,意味着它想要调用一个或多个工具
response2 = llm_with_tools.invoke([HumanMessage(content="What is 15 + 27?")])
print(response2.tool_calls) # 打印LLM决定调用的工具信息

# 检查并执行LLM建议的工具调用
if response2.tool_calls:
for tool_call in response2.tool_calls:
if tool_call['name'] == "add_numbers":
# 提取LLM为工具调用生成的参数
args = tool_call['args']
result = add_numbers.invoke(args) # 执行工具
print(f"Tool call: add_numbers({args['a']}, {args['b']}) = {result}")

# 将工具的输出发送回LLM,让LLM基于结果生成最终回答
# 这是一个关键步骤,通常在Agent中自动处理。这里手动演示。
final_response = llm_with_tools.invoke([
HumanMessage(content="What is 15 + 27?"),
AIMessage(content="", tool_calls=[tool_call]), # 告知LLM它之前建议的工具调用
ToolMessage(content=str(result), tool_call_id=tool_call['id']) # 告知LLM工具的执行结果
])
print("Final LLM response based on tool output:")
print(final_response.content)

概念扫盲

Document 对象

Document 对象是 LangChain 用来封装和处理文本数据的基本单位。无论您是从 PDF、Markdown 文件、网站还是数据库加载数据,LangChain 都会将这些数据转换成一个或多个 Document 对象,以便在后续的流程中使用。

一个 Document 对象主要包含两个部分:

  1. page_content (字符串)

    • 这是文档对象的核心,存储了原始的文本内容。例如,如果加载一个 Markdown 文件, page_content 就会包含该文件的所有文本。
  2. metadata (字典)

    • 这是一个字典,用于存储关于文档的“元数据”或附加信息。这些信息对于过滤、追踪或增强文档处理流程非常有用。常见的元数据包括:
      • source :文档的来源,比如文件名、URL等。
      • page :如果文档来自多页文件(如PDF),这里可以存储页码。
      • 其他自定义信息:您可以添加任何有助于您应用的信息,如作者、创建日期等。

除了通过文档加载器(Loaders)自动创建,您也可以手动创建一个 Document 对象。这在测试或处理简单文本时非常方便。

1
2
3
4
5
6
7
8
9
# 创建一个简单的 Document 对象
doc = Document(
page_content="这是文档的主要内容。LangChain 真酷!",
metadata={
'source': 'my_notebook.ipynb',
'author': 'AI Assistant',
'chapter': 2
}
)

Runnable协议

“Runnable”协议

参考资料

2025最新版!langchain入门到精通实战教程!结合实战案例,干货拉满!99%的人不知道的暴利玩法,学完敢谷歌工程师叫板!_哔哩哔哩_bilibili

introduction | LangChain中文网

跟着官网学langchain2025(version 0.3)_哔哩哔哩_bilibili

LangGraph Platform - Docs by LangChain

构建langgraph聊天机器人的基本流程

创建一个 StateGraph

首先创建一个 StateGraph。一个 StateGraph 对象将我们的聊天机器人结构定义为“状态机”。我们将添加 节点 来表示 LLM 和聊天机器人可以调用的函数,并添加 来指定机器人应如何在这些函数之间进行转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START
from langgraph.graph.message import add_messages


class State(TypedDict):
messages: Annotated[list, add_messages]

#定义状态
graph_builder = StateGraph(State)

在 langgraph 中,状态会在图的各个节点之间传递。当一个节点产生新的消息时,它会更新 State 中的 messages 字段。

我们的图现在可以处理两个关键任务

  1. 每个 节点 都可以接收当前 状态 作为输入,并输出状态的更新。
  2. 消息 的更新将追加到现有列表而不是覆盖它,这得益于与 Annotated 语法一起使用的预构建 add_messages 函数。

langgraph中每个消息对象通常包含以下关键属性:

  • role : 一个字符串,标识消息的发送者(例如 ‘human’ , ‘ai’ , ‘system’ )。
  • content : 消息的具体内容,通常是字符串,但也可以是更复杂的结构(例如,用于多模态输入)。
  • id : 一个可选的唯一标识符。

Annotated 的作用 : 通过使用 Annotated[list, add_messages] ,你改变了这个默认行为。 add_messages 函数(由 langgraph 提供或由你自定义)的逻辑是 追加 而不是覆盖。所以,当一个新节点返回消息时, langgraph 会调用 add_messages 函数,将新消息 追加 到现有 messages 列表的末尾。

定义一个聊天模型

两种方法:

1.使用 init_chat_model(通用高层封装) 这是一个通用的辅助函数,旨在提供一个统一的接口来初始化来自 不同提供商 的聊天模型。

init_chat_model — 🦜🔗 LangChain 文档 — init_chat_model — 🦜🔗 LangChain documentation

2.使用如ChatOpenAI (特定于提供商的类) 这是一个专门为 OpenAI API 设计的类,提供了对 OpenAI 模型所有功能的完全访问。

1
2
3
4
5
6
7
from langchain.chat_models import init_chat_model

os.environ["OPENAI_API_KEY"] = "sk-"
#使用‘{model_provider}:{model}’格式在单个参数中指定模型和模型提供者,例如“openai:o1”
llm = init_chat_model("openai:qwen-plus-2025-04-28",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

添加一个节点

现在我们可以将聊天模型集成到一个简单的节点中

1
2
3
4
5
#定义节点chatbot
def chatbot(state: State):
return {"messages": [llm.invoke(state["messages"])]}

graph_builder.add_node("chatbot", chatbot)

chatbot 节点函数如何将当前 状态 作为输入,并返回一个包含更新的 消息 列表的字典,键为“messages”。这是所有 LangGraph 节点函数的基本模式。

我们 状态 中的 add_messages 函数会将 LLM 的响应消息追加到状态中已有的消息之后。

添加一个 入口

添加一个 入口 点,以告诉图每次运行时从何处开始工作

1
graph_builder.add_edge(START, "chatbot")

编译图

在运行图之前,我们需要对其进行编译。我们可以通过在图构建器上调用 compile() 来完成。这将创建一个 CompiledGraph,我们可以在我们的状态上调用它。

1
graph = graph_builder.compile()

可视化图

您可以使用 get_graph 方法和其中一个“绘图”方法(例如 draw_asciidraw_png)来可视化图。这些 draw 方法都需要额外的依赖项。

1
2
3
4
5
6
7
from IPython.display import Image, display

try:
display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
# This requires some extra dependencies and is optional
pass

运行聊天机器人

运行聊天机器人

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def stream_graph_updates(user_input: str):
for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}):
print(event)
#stream 返回的每个 event 通常是一个字典,键是图中节点的名称,值是该节点完成后的状态更新。
for value in event.values():
#消息列表中的最后一条消息的文本内容
print("Assistant:", value["messages"][-1].content)


while True:
try:
user_input = input("User: ")
#退出
if user_input.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
#调用
stream_graph_updates(user_input)
except:
# fallback if input() is not available
user_input = "What do you know about LangGraph?"
print("User: " + user_input)
stream_graph_updates(user_input)
break

graph.stream() 是 LangGraph 的核心功能之一。它会执行整个图(Graph),但不是一次性返回最终结果,而是像视频流一样,一步一步地返回中间过程的更新。这使得您可以实时看到模型生成内容的每一个部分。

添加网页搜索工具

获取Tavily api

Tavily 的搜索 API 是一款专为 AI 代理 (LLM) 构建的搜索引擎,能够快速提供实时、准确和基于事实的结果。

每月 1,000 次免费搜索

Tavily Search | 🦜️🔗 LangChain 框架

获取apiTavily AI — Tavily AI

添加工具

1
2
3
4
5
6
7
from langchain_tavily import TavilySearch

tool = TavilySearch(
tavily_api_key="tvly-dev-",
max_results=2)
tools = [tool]
tool.invoke("李超是谁?")

定义图

在LLM上添加bind_tools。这让LLM知道如果它想使用搜索引擎,应使用正确的JSON格式。

定义聊天模型llm(代码同上)

将tools整合到StateGraph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
messages: Annotated[list, add_messages]

graph_builder = StateGraph(State)

# 将一个或多个**工具(tools) 绑定到一个 大型语言模型(LLM)**上,从而创建一个新的、具备工具调用能力的 LLM 实例
llm_with_tools = llm.bind_tools(tools)

def chatbot(state: State):
return {"messages": [llm_with_tools.invoke(state["messages"])]}

graph_builder.add_node("chatbot", chatbot)

创建一个运行工具的函数

现在,创建一个函数来运行被调用的工具。通过将工具添加到一个名为BasicToolNode的新节点来完成,该节点检查状态中的最新消息,如果消息包含tool_calls,则调用工具。它依赖于LLM的tool_calling支持,该支持在Anthropic、OpenAI、Google Gemini以及许多其他LLM提供商中可用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 定义 __call__ 方法,让这个类的实例可以像函数一样被调用
# inputs 是 langgraph 传进来的当前状态,是一个字典
def __call__(self, inputs: dict):
# 1. 从状态中获取最新的消息
# 使用了“海象操作符” :=,先从 inputs 中获取 'messages' 列表,如果不存在则返回空列表 []
# 然后检查列表是否为空。如果不为空,则取出最后一条消息。
if messages := inputs.get("messages", []):
message = messages[-1] # 通常,最后一条消息是 AI 发出的,其中包含工具调用请求
else:
# 如果没有消息,就报错,因为这个节点不知道该做什么
raise ValueError("No message found in input")

# 2. 准备一个列表,用来存放所有工具的执行结果
outputs = []

# 3. 遍历 AI 消息中请求的所有工具调用
for tool_call in message.tool_calls:

# 4. 执行工具
# a. tool_call["name"] 获取工具名称 (例如 'tavily_search_results_json')
# b. self.tools_by_name[...] 从预存的工具字典中找到对应的工具对象
# c. .invoke(tool_call["args"]) 使用 LLM 提供的参数来调用该工具
tool_result = self.tools_by_name[tool_call["name"]].invoke(
tool_call["args"]
)

# 5. 将工具执行结果打包成 ToolMessage
# 这是 langgraph/langchain 的标准格式,用于告诉 LLM 工具执行的结果是什么
outputs.append(
ToolMessage(
content=json.dumps(tool_result), # 工具结果必须是字符串,所以用 json.dumps 序列化
name=tool_call["name"], # 告诉 LLM 这是哪个工具的结果
tool_call_id=tool_call["id"], # 必须提供原始请求的 ID,以便 LLM 知道这个结果对应哪个请求
)
)

# 6. 返回结果,更新图的状态
# 返回一个字典,其中 'messages' 键对应着包含所有 ToolMessage 的列表
# langgraph 会将这个列表中的消息追加到主状态的 'messages' 列表中
return {"messages": outputs}

call 是 Python 中一个非常特殊的“魔术方法”(magic method)。它的作用是 让一个类的实例(对象)能够像函数一样被调用 。

这在 langgraph 中是一种常见且核心的设计模式。它的含义是:

  1. 节点即函数 : BasicToolNode 的实例(比如 tool_node )本身就代表了图中的一个可执行节点。
  2. 执行逻辑 :当 langgraph 的状态机运行到这个 tool_node 节点时,它会直接“调用”这个节点对象,并把当前的状态( inputs 字典)传递给它

可以使用LangGraph预构建的ToolNode

1
2
3
4
from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools=[tool])
graph_builder.add_node("tools", tool_node)

定义conditional_edges

添加了工具节点后,现在您可以定义conditional_edges

边(Edges)将控制流从一个节点路由到下一个节点。条件边(Conditional edges)从单个节点开始,通常包含“if”语句,根据当前图状态路由到不同的节点。这些函数接收当前的图state并返回一个字符串或字符串列表,指示接下来要调用哪个(或哪些)节点。

接下来,定义一个名为route_tools的路由函数,它检查聊天机器人输出中的tool_calls。通过调用add_conditional_edges将此函数提供给图,这会告诉图,无论何时chatbot节点完成,都要检查此函数以确定下一步去哪里。

如果存在工具调用,条件将路由到tools;如果不存在,则路由到END。由于条件可以返回END,因此这次您不需要明确设置finish_point

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def route_tools(
state: State,
):
"""
用于 conditional_edge 的路由函数:
- 如果最后一条消息包含工具调用,则路由到 ToolNode
- 否则路由到结束节点
"""
# 处理 state 为列表的情况(可能是消息列表)
if isinstance(state, list):
ai_message = state[-1] # 获取最后一条消息
# 处理 state 为字典的情况(包含 messages 字段)
elif messages := state.get("messages", []):
ai_message = messages[-1] # 获取最后一条消息
else:
raise ValueError(f"输入状态中没有找到消息: {state}")

# 检查消息是否有工具调用
if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
return "tools" # 有工具调用,返回 "tools" 路由到工具节点
return END # 没有工具调用,返回 END 结束流程

可以使用预构建的tools_condition代替route_tools以使其更简洁。

tools_condition 函数在聊天机器人需要使用工具时返回 “tools”,如果可以不使用响应则返回 “END”。

1
2
3
4
5
6
7
8
9
from langgraph.prebuilt import tools_condition
graph_builder.add_conditional_edges(
"chatbot",
tools_condition,
{"tools": "tools", END: END},
)
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
graph = graph_builder.compile()

可视化图

如上

向机器人提问

现在您可以向聊天机器人提出超出其训练数据范围的问题。

如上

添加记忆功能

LangGraph 通过持久性检查点解决了这个问题。如果您在编译图时提供一个checkpointer,并在调用图时提供一个thread_id,LangGraph 会在每一步之后自动保存状态。当您使用相同的thread_id再次调用图时,图会加载其保存的状态,允许聊天机器人从上次中断的地方继续。

我们稍后会看到,检查点比简单的聊天记忆功能强大得多——它允许您随时保存和恢复复杂状态,用于错误恢复、人工干预工作流、时间旅行交互等。但首先,让我们添加检查点以实现多轮对话。

创建 MemorySaver 检查点

1
2
3
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

这是一个内存中的检查点,方便本教程使用。然而,在生产应用程序中,您可能会将其更改为使用 SqliteSaverPostgresSaver 并连接数据库。

编译图

使用提供的检查点编译图,图在遍历每个节点时将对 State 进行检查点。

1
graph = graph_builder.compile(checkpointer=memory)

与您的聊天机器人互动

  1. 选择一个线程作为此对话的键。

    thread_id决定对话窗口

    1
    config = {"configurable": {"thread_id": "1"}}
  2. 调用您的聊天机器人

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    user_input = "我是谁"

    # The config is the **second positional argument** to stream() or invoke()!
    events = graph.stream(
    {"messages": [{"role": "user", "content": user_input}]},
    config,
    stream_mode="values",
    )
    for event in events:
    event["messages"][-1].pretty_print()

添加人工干预

代理可能不可靠,并且可能需要人工输入才能成功完成任务。同样,对于某些操作,您可能需要在运行前要求人工批准,以确保一切按预期运行。

LangGraph 的持久化层支持人工干预工作流,允许根据用户反馈暂停和恢复执行。此功能的主要接口是interrupt函数。在节点内调用interrupt将暂停执行。通过传入Command,可以恢复执行并接收来自人工的新输入。interrupt在功能上类似于 Python 的内置input()但有一些注意事项

添加human_assistance工具

human_assistance工具添加到聊天机器人。此工具使用interrupt从人工接收信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 添加人工干预功能
# 导入LangGraph的中断机制和命令类型
from langgraph.types import Command, interrupt
# 导入LangChain的工具装饰器
from langchain_core.tools import tool

# 使用@tool装饰器将函数标记为可被LLM调用的工具
@tool
def human_assistance(query: str) -> str:
"""请求人工协助的工具函数。
当LLM遇到需要人工判断或帮助的情况时,会调用此工具。
该函数会暂停图的执行,等待人工操作员提供响应。
"""
# interrupt()函数会暂停图的执行,等待人工输入
# 传入的字典包含查询信息,人工操作员会看到这个查询
human_response = interrupt({"query": query})

# 从人工响应中提取数据并返回给LLM
# human_response是一个字典,"data"字段包含人工提供的实际响应
return human_response["data"]

简单来说, 调用哪个工具,以及何时调用,完全是由大语言模型(LLM)根据你给它的指令(Prompt)来决定的。

1
2
3
tool = TavilySearch(max_results=2)
tools = [tool, human_assistance]
llm_with_tools = llm.bind_tools(tools)

定义chatbot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def chatbot(state: State):
# 调用绑定了工具的LLM(llm_with_tools),传入当前的消息历史
# LLM会根据最新的消息决定是生成文本回复,还是调用一个或多个工具
message = llm_with_tools.invoke(state["messages"])

# --- 关键断言逻辑 ---
assert len(message.tool_calls) <= 1

# 将LLM生成的新消息(可能是文本回复,也可能是工具调用请求)返回
# 这个返回值会以字典的形式更新到状态(State)对象中
return {"messages": [message]}

# 将 chatbot 函数作为名为 "chatbot" 的节点添加到图构建器中
graph_builder.add_node("chatbot", chatbot)

中断安全性的断言 ( assert ) : assert len(message.tool_calls) <= 1 是在实现人工干预时一个非常重要的 安全措施 。

  • 问题 : 现代 LLM 支持并行工具调用(一次请求执行多个工具)。但如果其中一个工具是 human_assistance 并触发了中断,整个图会暂停。当人工操作完成后,图会从中断点恢复。此时,如果不对工具调用数量做限制,LangGraph 可能会重新尝试执行所有在中断前请求的工具,导致已经执行过的工具被再次调用。
  • 解决方案 : 这个断言强制要求 LLM 在每一步最多只能请求调用一个工具。这样就保证了当中断发生并恢复后,不会有重复执行工具的风险,确保了流程的稳定性和可预测性。

编译图

1
2
3
memory = MemorySaver()

graph = graph_builder.compile(checkpointer=memory)

调用聊天机器人并中断

1
2
3
4
5
6
7
8
9
10
11
12
user_input = "我需要一些关于构建 AI 代理的专家指导。你能帮我请求协助吗?"
config = {"configurable": {"thread_id": "1"}}

events = graph.stream(
{"messages": [{"role": "user", "content": user_input}]},
config,
#它的作用是指定在进行流式处理时,你希望接收到的数据是以 完整的、累积的值 的形式返回,而不是以增量的、片段的形式返回。
stream_mode="values",
)
for event in events:
if "messages" in event:
event["messages"][-1].pretty_print()

聊天机器人生成了一个工具调用,但随后执行被中断。如果您检查图状态,您会看到它停止在工具节点

1
2
snapshot = graph.get_state(config)
snapshot.next

恢复执行

要恢复执行,请传入一个包含工具所需数据的Command对象。此数据的格式可以根据需要进行自定义。对于本示例,请使用一个带有键"data"的字典(由human_assistance决定)

1
2
3
4
5
6
7
8
9
10
human_response = (
"我们专家在此为您提供帮助!我们建议您查看 LangGraph 来构建您的代理。它比简单的自主代理更可靠、更具可扩展性。"
)
#从暂停状态恢复执行
human_command = Command(resume={"data": human_response})

events = graph.stream(human_command, config, stream_mode="values")
for event in events:
if "messages" in event:
event["messages"][-1].pretty_print()

1.工具的定义 ( human_assistance function):

  • 当 LLM 调用 human_assistance 工具时,这个函数被执行。
  • 函数内部, interrupt() 被调用,导致图暂停,并等待人工输入。
  • 在图恢复后, interrupt() 函数会返回一个值,这个值就是您通过 Command(resume=…) 注入的内容,也就是 {“data”: human_response} 。
  • 因此, human_assistance 函数中的 human_response 变量实际上就等于 {“data”: human_response} 。
  • 最后, return human_response[“data”] 从这个字典中提取出 “data” 键对应的值 ,并将其作为 human_assistance 工具的最终返回结果。

2.恢复指令 ( Command(resume=…) ):

  • 当您构建 Command(resume={“data”: human_response}) 时,您正在创建一个符合 human_assistance 函数期望的结构。
  • 您将人工回复包装在一个字典里,并使用 “data” 作为键。
  • 这个结构被传递回 interrupt() ,然后被 human_assistance 函数接收和解析。

因为 human_assistance 函数的 return 语句期望从返回的字典中访问 “data” 键,所以我们在恢复执行时必须提供一个具有相同结构的字典。这是为了确保数据能够正确地在中断和恢复的过程中传递。

在 LangGraph 中,Command 是一个用于控制图执行流程、更新状态、实现人机交互的核心类。它支持以下四个参数:

参数名 类型 说明
update dict 用于更新图的状态(state)。例如:Command(update={"foo": "bar"})
resume Any interrupt() 配合使用,用于恢复被中断的图执行,并传递用户输入。
goto strSendList[str|Send] 控制下一步要执行的节点,支持跳转到指定节点、多个节点序列,或使用 Send 对象。
graph str 可选,指定命令作用的图。默认是当前图,也可以设为 Command.PARENT 表示父图。

自定义状态

在本教程中,您将向状态添加额外字段,以定义复杂行为,而无需依赖消息列表。聊天机器人将使用其搜索工具查找特定信息,并将其转发给人工进行审查。

向状态添加键

通过向状态添加 namebirthday 键,更新聊天机器人以研究实体的生日

1
2
3
4
class State(TypedDict):
messages: Annotated[list, add_messages]
name: str
birthday: str

将此信息添加到状态中,可以使其轻松被其他图节点(例如存储或处理信息的下游节点)以及图的持久层访问。

在工具内部更新状态

现在,在 human_assistance 工具内部填充状态键。这允许人工在信息存储到状态之前对其进行审查。使用 Command工具内部发出状态更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 从 langchain_core.messages 导入 ToolMessage,用于创建工具调用的响应消息
from langchain_core.messages import ToolMessage
# 从 langchain_core.tools 导入 InjectedToolCallId(用于自动注入工具调用ID)和 tool(工具装饰器)
from langchain_core.tools import InjectedToolCallId, tool

# 从 langgraph.types 导入 Command(用于向图发送指令)和 interrupt(用于中断图的执行)
from langgraph.types import Command, interrupt
from typing import Annotated
# @tool 装饰器将这个函数声明为一个可供 LLM 调用的工具
@tool
def human_assistance(
name: str,
birthday: str,
# tool_call_id 这个参数非常特殊。Annotated[...] 和 InjectedToolCallId 告诉 LangGraph:
# 1. 这个参数不应暴露给 LLM,LLM 在调用此工具时不需要提供它。
# 2. LangGraph 在执行时,会自动将触发此工具的那个工具调用的 ID 注入到这个参数中。
# 这个 ID 对于创建与原始请求相关联的 ToolMessage 至关重要。
tool_call_id: Annotated[str, InjectedToolCallId]
) -> str:
"""当需要人工确认或更正信息时,请求人类协助。"""
# 调用 interrupt() 来暂停图的执行,并向人类审核者呈现一个包含问题和待确认数据的字典。
# 图会在此处暂停,直到人类通过 resume 指令提供了响应。
human_response = interrupt(
{
"question": "Is this correct?",
"name": name,
"birthday": birthday,
},
)
# 检查人类的响应。如果响应中 'correct' 键的值是 'yes' 或 'y' 开头,
# 则认为信息是正确的。
if human_response.get("correct", "").lower().startswith("y"):
# 如果信息正确,直接使用从 LLM 获取的原始信息。
verified_name = name
verified_birthday = birthday
response = "Correct"
# 否则,认为人类审核者提供了更正后的信息。
else:
# 从人类的响应中获取更正后的姓名和生日。
# 如果人类没有提供新的值,则使用 .get() 的默认值,即原始值。
verified_name = human_response.get("name", name)
verified_birthday = human_response.get("birthday", birthday)
response = f"Made a correction: {human_response}"

# 在工具内部直接构造一个用于更新图状态的字典。
state_update = {
"name": verified_name, # 更新状态中的 'name' 字段
"birthday": verified_birthday, # 更新状态中的 'birthday' 字段
# 创建一个 ToolMessage,将其添加到状态的 'messages' 列表中。
# 这个消息将作为此工具调用的正式“答复”出现在对话历史中。
# tool_call_id 是必需的,用于将此答复与 LLM 的原始工具调用请求关联起来。
"messages": [ToolMessage(response, tool_call_id=tool_call_id)],
}
# 这个工具不返回一个简单的字符串或数字,而是返回一个 Command 对象。
# Command(update=...) 是一个明确的指令,告诉 LangGraph 执行器:
# “请不要将我的返回值当作普通工具输出,而是用 state_update 字典里的内容来直接更新当前的图状态。”
return Command(update=state_update)

图的其余部分保持不变。

提示聊天机器人调用人工审查

提示聊天机器人查找 LangGraph 库的“生日”,并在其获取所需信息后,指示聊天机器人使用 human_assistance 工具。通过在工具参数中设置 namebirthday,您将强制聊天机器人为这些字段生成提议。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
user_input = (
"你能查一下 LangGraph 是什么时候发布的吗? "
"当你有了答案后,使用 human_assistance 工具进行审查。"
)
config = {"configurable": {"thread_id": "1"}}

events = graph.stream(
{"messages": [{"role": "user", "content": user_input}]},
config,
stream_mode="values",
)
for event in events:
if "messages" in event:
event["messages"][-1].pretty_print()

我们再次在 human_assistance 工具中触发了 interrupt

添加人工协助

聊天机器人未能识别正确的日期,因此为其提供信息

1
2
3
4
5
6
7
8
9
10
11
human_command = Command(
resume={
"name": "LangGraph",
"birthday": "Jan 17, 2024",
},
)

events = graph.stream(human_command, config, stream_mode="values")
for event in events:
if "messages" in event:
event["messages"][-1].pretty_print()

请注意,这些字段现在已反映在状态中

1
2
3
snapshot = graph.get_state(config)

{k: v for k, v in snapshot.values.items() if k in ("name", "birthday")}

这使得下游节点(例如,进一步处理或存储信息的节点)可以轻松访问它们。

时间功能(从之前的某个状态开始)

在典型的聊天机器人工作流程中,用户与机器人进行一次或多次交互以完成任务。记忆人工干预功能可以为图状态启用检查点并控制未来的响应。

如果您希望用户能够从之前的响应开始并探索不同的结果,该怎么办?或者,如果您希望用户能够回溯聊天机器人的工作以纠正错误或尝试不同的策略,这在自主软件工程师等应用程序中很常见,那又该怎么办?

您可以使用 LangGraph 内置的时光旅行功能创建这些类型的体验。

回溯您的图

通过使用图的get_state_history方法获取检查点来回溯您的图。然后,您可以从之前的这个时间点恢复执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 初始化一个变量 to_replay 为 None,它将用于存储我们想要“时间旅行”回去的特定状态。
# to_replay 将在循环中被赋值为我们感兴趣的那个历史状态快照。
to_replay = None

# 遍历 `graph` 在给定 `config` 下的所有历史状态。
# `graph.get_state_history(config)` 会返回一个迭代器,其中包含了从开始到当前的所有状态快照。
for state in graph.get_state_history(config):
# 打印当前状态快照中的一些信息,以便我们观察和选择。
# `len(state.values["messages"])` 显示了到该状态为止,对话历史中的消息总数。
# `state.next` 显示了在该状态之后,图将要执行的下一个节点或步骤的名称。
print("Num Messages: ", len(state.values["messages"]), "Next: ", state.next)

# 打印一条分隔线,使输出更易读。
print("-" * 80)

# 这里是选择“时间旅行”目标点的关键逻辑。
# 我们设定一个条件:当对话历史中的消息数量正好等于4时,我们就找到了想要回到的那个点。
# 这是一个为了演示而设定的任意条件,在实际应用中,您可以根据需要设置更复杂的选择逻辑。
if len(state.values["messages"]) == 4:
# We are somewhat arbitrarily selecting a specific state based on the number of chat messages in the state.
# 将当前这个符合条件的状态(state)保存到 to_replay 变量中。
# 循环结束后,to_replay 变量将持有我们选中的那个历史时刻的完整状态,
# 之后我们就可以用它来恢复或修改执行流程。
to_replay = state

图的每一步都会保存检查点。这跨越了调用,因此您可以回溯整个线程的历史。

从特定时间点加载状态

to_replay状态恢复。从这一点恢复将接下来调用action节点。

1
2
print(to_replay.next)
print(to_replay.config)

检查点的to_replay.config包含一个checkpoint_id时间戳。提供此checkpoint_id值会告诉 LangGraph 的检查点器从该时间点加载状态。

1
2
3
for event in graph.stream(None, to_replay.config, stream_mode="values"):
if "messages" in event:
event["messages"][-1].pretty_print()

运行本地服务器

安装 LangGraph CLI

1
2
3
# Python >= 3.11 is required.

pip install --upgrade "langgraph-cli[inmem]"

创建 LangGraph 应用 🌱

new-langgraph-project-python 模板new-langgraph-project-js 模板 创建一个新应用。此模板展示了一个单节点应用程序,您可以根据自己的逻辑进行扩展。

1
langgraph new . --template new-langgraph-project-python

使用 langgraph new 而不指定模板,系统将显示一个交互式菜单,您可以从中选择可用的模板列表。

使用uv安装依赖项

uv 是一个用 Rust 编写的极速 Python 包和项目管理器 。它旨在解决传统 Python 包管理工具(如 pip 、 poetry 等)在速度和效率方面的痛点,提供更快的安装、依赖解析和环境管理。

1
pip install uv#安装uv

运行uv sync会根据 pyproject.toml的依赖创建虚拟环境并安装依赖

pyproject.toml 文件是 Python 项目中用于统一配置项目元数据、构建系统、依赖管理和各种工具设置的标准化文件。它通常用于替代旧的 requirements.txt 文件,提供更现代和集中的项目配置方式。

创建一个 .env 文件

您将在新 LangGraph 应用的根目录下找到一个 .env.example 文件。在新 LangGraph 应用的根目录下创建一个 .env 文件,并将 .env.example 文件的内容复制到其中,填入所需的 API 密钥。

添加环境变量如LANGSMITH_API_KEY,OPENAI_API_KEY等

启动 LangGraph 服务器

在本地启动 LangGraph API 服务器

1
langgraph dev

示例输出

1
2
3
4
5
6
7
>    Ready!
>
> - API: [https://:2024](https://:2024/)
>
> - Docs: https://:2024/docs
>
> - LangGraph Studio Web UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024

LangGraph 服务器(如您通过 langgraph dev 命令启动的服务器)的主要作用是提供一个运行环境和接口,用于开发、测试、部署和管理基于 LangGraph 构建的 AI 代理和应用程序。具体来说,它有以下几个主要用途:

  1. API 接口暴露 :它将您用 LangGraph 定义的复杂代理逻辑(即图结构)通过标准的 RESTful API 接口暴露出来。这意味着其他应用程序、前端界面或者其他服务可以通过 HTTP 请求与您的 LangGraph 代理进行交互,而无需直接集成 LangGraph 的 Python 代码。

  2. 简化部署 :通过将 LangGraph 应用程序打包成一个可运行的服务,您可以更容易地将其部署到云服务器、容器(如 Docker)或其他生产环境中。这使得 LangGraph 代理可以作为一个独立的微服务运行,方便扩展和管理。

  3. 开发和调试便利 :

    • 实时预览和调试 :服务器通常会提供一个 Studio UI(如您在 http://127.0.0.1:2024/studio 看到的),让开发者能够可视化地查看代理的图结构、执行流程、状态变化和中间步骤,这对于理解和调试复杂的代理行为至关重要。
    • API 文档 :自动生成的 API 文档(如 http://127.0.0.1:2024/docs )提供了所有可用接口的详细说明和交互式测试功能,极大地加速了开发和集成过程。
  4. 状态管理和持久化 :LangGraph 代理通常涉及复杂的状态管理。服务器可以负责处理这些状态的持久化,确保代理在多次交互之间能够记住上下文和历史信息。

在 LangGraph Studio 中测试您的应用程序

LangGraph Studio 是一个专门的 UI,您可以连接到 LangGraph API 服务器,以便在本地可视化、交互和调试您的应用程序。通过访问 langgraph dev 命令输出中提供的 URL,在 LangGraph Studio 中测试您的图。

1
>    - LangGraph Studio Web UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024

参考资料

构建一个基本聊天机器人 - LangChain 框架

Overview - Docs by LangChain

官方教程,但是英文https://academy.langchain.com/collections

3.5 小时出证!LangGraph 官方课程 🆓 重磅上线🔥🔥🔥_哔哩哔哩_bilibili

注入上下文

“注入上下文”就是在运行过程中节点/大模型 可能需要、但不会(也不应该)去改变的只读信息的集合。

场景 注入上下文里可能放什么
权限控制 user_id, tenant_id(决定能访问哪些数据)
外部依赖 db_connection, api_key, s3_bucket(节点里要用)
个性化参数 language, timezone, model_temperature
会话元信息 session_id, channel(Slack / 微信 / Web)

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from dataclasses import dataclass

from langgraph.graph import StateGraph
from langgraph.runtime import Runtime

@dataclass
class Context:
"""Context schema defined by the developer."""
user_id: str
db_connection: str

def node(state: State, runtime: Runtime[Context]):
# type safe access to context attributes
user_id = runtime.context.user_id
db_conn = runtime.context.db_connection
...

builder = StateGraph(state_schema=State, context_schema=Context)

# add nodes, edges, compile the graph...

# top level context arg is typed as Context for autocomplete and type checking
result = graph.invoke(
{'input': 'abc'},
context=Context(user_id='123', db_conn='conn_mock')
)

Runtime 类提供了一个单一接口,用于访问信息,例如:

  • 上下文:在运行开始时传递的静态数据
  • 存储:长期记忆的存储机制
  • 流写入器:用于向图输出流写入的自定义函数
  • 对于功能 API 用户,previous 也可用:给定线程的前一个返回值

现在,开发者不再需要将上述所有内容作为单独的参数注入到节点函数中, 而是可以通过一个 runtime 参数来访问它们。

配置langsmith

安装LangSmith SDK

1
pip install langsmith

环境变量

获取apiLangSmith

设置相应的环境变量。这将把跟踪记录到default项目(尽管您可以轻松更改)。

1
2
3
export LANGSMITH_TRACING=true
export LANGSMITH_API_KEY=
export LANGSMITH_PROJECT=default
1
2
3
4
5
LANGSMITH_TRACING=true
LANGSMITH_ENDPOINT="https://api.smith.langchain.com"
LANGSMITH_API_KEY="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"
LANGSMITH_PROJECT="langgraph"

资源

官网《LangSmith》 — LangSmith

参考文档LangSmith 入门 | 🦜️🛠️ LangSmith 文档

Get started with LangSmith | 🦜️🛠️ LangSmith

环境配置

python虚拟环境构建

1
python -m venv .venv
1
2
3
4
5
pip install langgraph==0.2.74                  
pip install langchain-openai==0.3.6
pip install fastapi==0.115.8
pip install uvicorn==0.34.0
pip install gradio==5.18.0

查看包pip list

构建一个基本的fastapi+langgraph应用

llm示例的构建(利用ChatOpenAI)

1
2
3
4
5
6
7
8
9
# 创建LLM实例
llm = ChatOpenAI(
base_url=config["base_url"],
api_key=config["api_key"],
model=config["model"],
temperature=DEFAULT_TEMPERATURE,
timeout=30, # 添加超时配置(秒)
max_retries=2 # 添加重试次数
)

数据类型的构建

继承于pydantic

规范化 API 请求和响应的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 定义消息类,用于封装API接口返回数据
#基于 Pydantic 的数据模型
# 定义Message类
class Message(BaseModel):
'''
role (角色): 这是一个字符串,表示消息的发送者。常见的角色包括:

- user (用户): 表示用户输入的消息。
- assistant (助手): 表示聊天机器人或模型生成的消息。
- system (系统): 表示为模型提供上下文或指令的系统消息。
'''
role: str
content: str

# 定义ChatCompletionRequest类
#聊天 API 请求
class ChatCompletionRequest(BaseModel):
messages: List[Message]
stream: Optional[bool] = False#是否流式方式响应
userId: Optional[str] = None#用于标识发起请求的用户
conversationId: Optional[str] = None#用于标识特定的对话会话,这对于管理对话上下文或历史记录非常有用

# 定义ChatCompletionResponseChoice类
#聊天完成响应中的一个“选择”或一个生成的回复
class ChatCompletionResponseChoice(BaseModel):
index: int
message: Message
finish_reason: Optional[str] = None

# 定义ChatCompletionResponse类
class ChatCompletionResponse(BaseModel):
id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
object: str = "chat.completion"
created: int = Field(default_factory=lambda: int(time.time()))
choices: List[ChatCompletionResponseChoice]#模型生成的所有可能的回复选项
system_fingerprint: Optional[str] = None

定义fastapi应用并管理应用的生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 定义了一个异步函数lifespan,它接收一个FastAPI应用实例app作为参数。这个函数将管理应用的生命周期,包括启动和关闭时的操作
# 函数在应用启动时执行一些初始化操作,如加载上下文数据、以及初始化问题生成器
# 函数在应用关闭时执行一些清理操作
# @asynccontextmanager 装饰器用于创建一个异步上下文管理器,它允许你在 yield 之前和之后执行特定的代码块,分别表示启动和关闭时的操作
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时执行
# 申明引用全局变量,在函数中被初始化,并在整个应用中使用
global graph

try:
logger.info("正在初始化模型、定义Graph...")
#(1)初始化LLM
llm = get_llm(llm_type)
#(2)定义Graph
graph = create_graph(llm)
#(3)将Graph可视化图保存
save_graph_visualization(graph)
logger.info("初始化完成")
except Exception as e:
logger.error(f"初始化过程中出错: {str(e)}")
# raise 关键字重新抛出异常,以确保程序不会在错误状态下继续运行
raise

# yield 关键字将控制权交还给FastAPI框架,使应用开始运行
# 分隔了启动和关闭的逻辑。在yield 之前的代码在应用启动时运行,yield 之后的代码在应用关闭时运行
yield
# 关闭时执行
logger.info("正在关闭...")


# lifespan参数用于在应用程序生命周期的开始和结束时执行一些初始化或清理工作
app = FastAPI(lifespan=lifespan)

langgraph核心逻辑

创建langgraph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 定义chatbot的状态
class State(TypedDict):
messages: Annotated[list, add_messages]

# 创建和配置chatbot的状态图
def create_graph(llm) -> StateGraph:
try:
# 构建graph
#创建一个 StateGraph 的实例,并将其配置为使用 State 类作为其状态管理的数据模型
graph_builder = StateGraph(State)

# 定义chatbot的node
def chatbot(state: State) -> dict:
# 处理当前状态并返回 LLM 响应
return {"messages": [llm.invoke(state["messages"])]}

# 配置graph
#第二个参数 chatbot :这是一个可调用对象(通常是一个函数或方法),它定义了当执行流程到达这个名为 "chatbot" 的节点时,应该执行什么操作。
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

# 这里使用内存存储 也可以持久化到数据库
memory = MemorySaver()

# 编译生成graph并返回
#checkpointer 参数将 memory 实例传递给编译过程,使得图能够管理其状态的保存和加载。编译后的图对象被返回,这个对象可以被调用来运行聊天机器人。
return graph_builder.compile(checkpointer=memory)

except Exception as e:
raise RuntimeError(f"Failed to create graph: {str(e)}")

可视化langgraph节点

1
2
3
4
5
6
7
8
# 将构建的graph可视化保存为 PNG 文件
def save_graph_visualization(graph: StateGraph, filename: str = "graph.png") -> None:
try:
with open(filename, "wb") as f:
f.write(graph.get_graph().draw_mermaid_png())
logger.info(f"Graph visualization saved as {filename}")
except IOError as e:
logger.info(f"Warning: Failed to save graph visualization: {str(e)}")

封装接口

包含流式输出与非流式输出的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 封装POST请求接口,与大模型进行问答
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
# 判断初始化是否完成
if not graph:
logger.error("服务未初始化")
raise HTTPException(status_code=500, detail="服务未初始化")

try:
logger.info(f"收到聊天完成请求: {request}")

query_prompt = request.messages[-1].content
logger.info(f"用户问题是: {query_prompt}")

config = {"configurable": {"thread_id": request.userId+"@@"+request.conversationId}}
logger.info(f"用户当前会话信息: {config}")

prompt_template_system = PromptTemplate.from_file(PROMPT_TEMPLATE_TXT_SYS)
prompt_template_user = PromptTemplate.from_file(PROMPT_TEMPLATE_TXT_USER)
prompt = [
{"role": "system", "content": prompt_template_system.template},
{"role": "user", "content": prompt_template_user.template.format(query=query_prompt)}
]

# 处理流式响应
if request.stream:
async def generate_stream():
chunk_id = f"chatcmpl-{uuid.uuid4().hex}"
async for message_chunk, metadata in graph.astream({"messages": prompt}, config, stream_mode="messages"):
chunk = message_chunk.content
logger.info(f"chunk: {chunk}")
# 在处理过程中产生每个块
yield f"data: {json.dumps({'id': chunk_id,'object': 'chat.completion.chunk','created': int(time.time()),'choices': [{'index': 0,'delta': {'content': chunk},'finish_reason': None}]})}\n\n"
# 流结束的最后一块
yield f"data: {json.dumps({'id': chunk_id,'object': 'chat.completion.chunk','created': int(time.time()),'choices': [{'index': 0,'delta': {},'finish_reason': 'stop'}]})}\n\n"
# 返回fastapi.responses中StreamingResponse对象
return StreamingResponse(generate_stream(), media_type="text/event-stream")

# 处理非流式响应处理
else:
try:
events = graph.stream({"messages": prompt}, config)
for event in events:
for value in event.values():
result = value["messages"][-1].content
except Exception as e:
logger.info(f"Error processing response: {str(e)}")

formatted_response = str(format_response(result))
logger.info(f"格式化的搜索结果: {formatted_response}")
#封装响应
response = ChatCompletionResponse(
choices=[
ChatCompletionResponseChoice(
index=0,
message=Message(role="assistant", content=formatted_response),
finish_reason="stop"
)
]
)
logger.info(f"发送响应内容: \n{response}")
# 返回fastapi.responses中JSONResponse对象
# model_dump()方法通常用于将Pydantic模型实例的内容转换为一个标准的Python字典,以便进行序列化
return JSONResponse(content=response.model_dump())

except Exception as e:
logger.error(f"处理聊天完成时出错:\n\n {str(e)}")
raise HTTPException(status_code=500, detail=str(e))

langgraph的短期记忆与长期记忆

LangGraph支持两种对于构建对话代理至关重要的内存类型:

  • 短期内存:通过在会话中维护消息历史来跟踪正在进行的对话。
  • 长期内存:在不同会话之间存储用户特定或应用程序级别的数据。
image-20250715094855646

在LangGraph中

  • 短期内存也称为线程级内存
  • 长期内存也称为跨线程内存

教程地址

NanGePlus/LangGraphChatBot: 使用LangGraph+DeepSeek-R1+FastAPI+Gradio实现一个带有记忆功能的流量包推荐智能客服web端用例,同时也支持gpt大模型、国产大模型(OneApi方式)、Ollama本地开源大模型、阿里通义千问大模型

LangGraph+deepseek-r1+FastAPI+Gradio实现拥有记忆的流量包推荐智能客服web端用例,同时也支持gpt、国产大模型、Ollama_哔哩哔哩_bilibili

prompt Engineering

Prompt Engineering是与大型语言模型(LLM)交互的基础,其核心在于精心设计输入内容,以引导模型生成期望的输出。

尽管 Prompt Engineering 至关重要,但对于构建稳健、可用于生产环境的系统而言,它存在固有的局限性:

  • 脆弱性&不可复现性: 提示中微小的措辞变化可能导致输出结果的巨大差异,使得这一过程更像是一种依赖反复试错的“艺术”,而非可复现的“科学” 。

  • 扩展性差: 手动、迭代地优化提示的过程,在面对大量用户、多样化用例和不断出现的边缘情况时,难以有效扩展 。

  • 用户负担: 这种方法将精心构建一套详尽指令的负担完全压在了用户身上,对于需要自主运行、或处理高并发请求的系统而言是不切实际的 。

  • 无状态性: Prompt Engineering 本质上是为单轮、“一次性”的交互而设计的,难以处理需要记忆和状态管理的长对话或多步骤任务 。

Context Engineering

Context Engineering是一门设计、构建并优化动态自动化系统的学科,旨在为大型语言模型在正确的时间、以正确的格式,提供正确的信息和工具,从而可靠、可扩展地完成复杂任务

prompt 告诉模型如何思考,而 Context 则赋予模型完成工作所需的知识和工具。

  • Context Engineering 决定用什么内容填充 Context Window

  • Prompt Engineering 则负责优化窗口内的具体指令

Context Engineering 的基石:RAG(Retrieval-Augmented Generation)

本部分将阐述检索增强生成(RAG)作为实现 Context Engineering 的主要架构模式。

解决LLM的核心弱点

RAG直接解决了标准LLM在企业应用中存在的固有局限性:

  • 知识冻结: LLM的知识被冻结在其训练数据的时间点。RAG通过在推理时注入实时的、最新的信息来解决这个问题 。

  • 缺乏领域专有知识: 标准LLM无法访问组织的内部私有数据。RAG则能够将LLM连接到这些内部知识库,如技术手册、政策文件等 。

  • 幻觉(Hallucination): LLM 会不同程度上地编造事实。RAG通过将模型的回答“锚定”在可验证的、检索到的证据上,提高事实的准确性和可信度 。

RAG工作流

  1. 索引(离线阶段): 在这个阶段,系统会处理外部知识源。文档被加载、分割成更小的 chunks,然后通过Embedding Model 转换为向量表示,并最终存储在专门的向量数据库中以备检索 。

  2. 推理(在线阶段): 当用户提出请求时,系统执行以下步骤:

    1. 检索(Retrieve): 将用户的查询同样转换为向量,然后在向量数据库中进行相似性搜索,找出与查询最相关的文档块。
    2. 增强(Augment): 将检索到的这些文档块与原始的用户查询、系统指令等结合起来,构建一个内容丰富的、增强的最终提示。
    3. 生成(Generate): 将这个增强后的提示输入给LLM,LLM会基于提供的上下文生成一个有理有据的回答 。

Context 工程化:如何判断和提取哪些内容应该进入上下文?

1.chunking

文本分块(Chunking)是RAG流程中最关键也最容易被忽视的一步。其目标是创建在语义上自成一体的文本块。

2.Reranking

为了平衡检索的速度和准确性,业界普遍采用两阶段检索流程。

  • 两阶段流程:

    • 第一阶段(召回): 使用一个快速、高效的检索器(如基于 bi-encoder 的向量搜索或BM25等词法搜索)进行广泛撒网,召回一个较大的候选文档集(例如,前100个) 。
    • 第二阶段(精排/重排序): 使用一个更强大但计算成本更高的模型,对这个较小的候选集进行重新评估,以识别出最相关的少数几个文档(例如,前5个) 。
  • Cross-Encoder: 交叉编码器之所以在重排序阶段表现优越,是因为它与双编码器的工作方式不同。双编码器独立地为查询和文档生成嵌入向量,然后计算它们的相似度。而交叉编码器则是将查询和文档同时作为输入,让模型在内部通过 Attention Mechanism 对二者进行深度交互。这使得模型能够捕捉到更细微的语义关系,从而给出更准确的相关性评分 。

  • 实际影响: 重排序显著提高了最终送入LLM的上下文质量,从而产出更准确、幻觉更少的答案。在金融、法律等高风险领域,重排序被认为是必不可少而非可选的步骤 。

3.优化上下文窗口:压缩与摘要

本节详细介绍用于主动管理上下文的技术,确保最有价值的信息被优先呈现。

  • 上下文压缩的目标: 缩短检索到的文档列表和/或精简单个文档的内容,只将最相关的信息传递给LLM。这能有效降低API调用成本、减少延迟,并缓解 Lost in the Middle 的问题 。

  • 压缩方法:

    • 过滤式压缩: 这类方法决定是保留还是丢弃整个检索到的文档。
      • LLMChainFilter: 利用一个LLM对每个文档的相关性做出简单的“是/否”判断 。
      • EmbeddingsFilter: 更经济快速的方法,根据文档嵌入与查询嵌入的余弦相似度来过滤文档 。
    • 内容提取式压缩: 这类方法会直接修改文档内容。
      • LLMChainExtractor: 遍历每个文档,并使用LLM从中提取仅与查询相关的句子或陈述 。
    • 用 top N 代替压缩: 像LLMListwiseRerank这样的技术,使用LLM对检索到的文档进行重排序,并只返回排名最高的N个,从而起到高质量过滤器的作用 。
  • 作为压缩策略的摘要: 对于非常长的文档或冗长的对话历史,可以利用LLM生成摘要。这些摘要随后被注入上下文,既保留了关键信息,又大幅减少了 Token 数量。这是在长时程运行的智能体中管理上下文的关键技术 。

智能体架构中的数据流与工作流编排

工作流(Workflow) vs. 智能体(Agent)

  • 工作流(Workflows)
    • 指的是LLM和工具通过预定义的代码路径进行编排的系统。在这种模式下,数据流动的路径是固定的、由开发者明确设计的,类似于上世纪流行的“专家系统”。例如,“第一步:分析用户邮件;第二步:根据分析结果在日历中查找空闲时段;第三步:起草会议邀请邮件”。这种模式确定性高,易于调试和控制,非常适合有明确业务流程的场景(如风控需求高、数据敏感、安全等级要求)。
  • 智能体(Agents)
    • 指的是LLM动态地指导自己的流程和工具使用,自主控制如何完成任务的系统。在这种模式下,数据流动的路径不是预先固定的,而是由LLM在每一步根据当前情况和目标动态决定的。这种模式灵活性高,能处理开放式问题,但可控性和可预测性较低 。

复杂的智能体通常是这两种模式的混合体,在宏观层面遵循一个预定义的工作流,但在某些节点内部,又赋予LLM一定的自主决策权。管理这一切的核心,我们称之为编排层(Orchestration Layer)

核心架构:预定义数据流的实现

  1. 链式工作流(Prompt Chaining)

  2. 路由工作流(Routing)

  3. 编排器-工作者模式(Orchestrator-Workers)

框架与工具

上述的架构和机制并非凭空存在,而是通过具体的开发框架实现的。其中,LangGraph作为LangChain的扩展,为构建具有显式数据流的智能体系统提供了强大的工具集。

LangGraph:用图(Graph)定义工作流(Workflow)

LangGraph的核心思想是将智能体应用构建成一个状态图(State Graph) 。这个图由节点和边组成,清晰地定义了数据如何在不同模块间流动

  • 状态(State): 这是整个图的核心,一个所有节点共享的中央数据对象。
    • 你可以把它想象成一个“数据总线”或共享内存。开发者需要预先定义State的结构,每个节点在执行时都可以读取和更新这个State对象 。
  • 节点(Nodes): 代表工作流中的一个计算单元或一个步骤。
    • 每个节点通常是一个Python函数,它接收当前的State作为输入,执行特定任务(如调用LLM、执行工具、处理数据),然后返回对State的更新 。
  • 边(Edges): 连接节点,定义了工作流的路径,即数据在State更新后应该流向哪个节点。
    • 简单边(Simple Edges): 定义了固定的、无条件的流向,用于实现链式工作流 。
    • 条件边(Conditional Edges): 用于实现路由逻辑。它会根据一个函数的输出来决定接下来应该走向哪个节点,从而实现流程的分支 。
  • 检查点(Checkpointer): LangGraph提供了持久化机制,可以在每一步执行后自动保存State的状态。这对于构建需要长期记忆、可中断和恢复、或需要 Human-in-the-Loop 的复杂业务流程至关重要 。

复杂业务流程的AI智能体,其核心挑战已从单纯优化信息检索(如RAG)或提示词,转向了对内部工作流和数据流的精心设计与编排

0%