LangGraph学习——agent——下

前言

本教程为langchain官方教程的学习记录

academy.langchain.com/enrollments

代码见[learn-rag-langchain/academy-langgraph at main · zxj-2023/learn-rag-langchain](https://github.com/zxj-2023/learn-rag-langchain/tree/main/academy-langgraph)

module-4

Parallel node execution 并行节点执行

Waiting for nodes to finish 等待节点完成

现在,让我们考虑一个案例,其中一个并行路径的步骤比另一个多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
builder = StateGraph(State)

# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))
image-20250722143802652

在这种情况下,bb2c 都是同一个步骤的一部分。图形将在进入 d 步骤之前等待所有这些操作完成。

1
graph.invoke({"state": []})
1
2
3
4
5
6
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]
{'state': ["I'm A", "I'm B", "I'm C", "I'm B2", "I'm D"]}
Setting the order of state updates 设置状态更新的顺序

然而,在每个步骤中,我们无法对状态更新的顺序进行具体控制!简单来说,它是基于图拓扑结构由 LangGraph 确定的确定性顺序,该顺序为 *我们无法控制*

上面,我们看到 c 被添加在 b2 之前

然而,我们可以使用自定义 reducer 来定制此功能,例如,对状态更新进行排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def sorting_reducer(left, right):
""" 合并并排序列表中的值"""
# 如果 left 不是列表,则将其转换为列表
if not isinstance(left, list):
left = [left]

# 如果 right 不是列表,则将其转换为列表
if not isinstance(right, list):
right = [right]

# 合并 left 和 right 列表,然后升序排序
return sorted(left + right, reverse=False)
class State(TypedDict):
# sorting_reducer 函数将对 state 中的值进行排序
state: Annotated[list, sorting_reducer]
1
graph.invoke({"state": []})
1
{'state': ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"]}

现在,reducer 对更新的状态值进行排序!sorting_reducer 示例对所有值进行全局排序。我们还可以:

  1. 在并行步骤期间将输出写入状态中的单独字段
  2. 在并行步骤之后使用“汇”节点来合并和排序这些输出
  3. 合并后清除临时字段

请参阅 docs 以获取更多详细信息。

Working with LLMs 使用 LLMs

现在,让我们添加一个现实中的例子!我们希望从两个外部来源(Wikipedia 和 Web-Search)收集上下文信息,并让 LLM 回答一个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from langchain_core.messages import HumanMessage, SystemMessage

from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools import TavilySearchResults

def search_web(state):

""" 从网络搜索中检索文档 """

# 搜索
tavily_search = TavilySearchResults(max_results=3)
search_docs = tavily_search.invoke(state['question'])

# 将多个搜索文档转换成了一个统一格式的长文本,每个文档都有自己的元数据(如URL),并且文档之间有明确的分隔,便于后续处理或展示。
"""
- 这是一个 列表推导式 (list comprehension) ,它会遍历 search_docs 列表中的每一个元素(这里我们称之为 doc )。
- search_docs 里的每个 doc 应该是一个包含 'url' 和 'content' 键的字典。
"""
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
for doc in search_docs
]
)

return {"context": [formatted_search_docs]}

def search_wikipedia(state):

""" 从维基百科中检索文档 """

# 搜索
search_docs = WikipediaLoader(query=state['question'],
load_max_docs=2).load()

# 格式化
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
for doc in search_docs
]
)

return {"context": [formatted_search_docs]}

def generate_answer(state):

""" 用于回答问题的节点 """

# 获取状态
context = state["context"]
question = state["question"]

# 模板
answer_template = """使用以下上下文回答问题 {question}: {context}"""
answer_instructions = answer_template.format(question=question,
context=context)

# 回答
answer = llm.invoke([SystemMessage(content=answer_instructions)]+[HumanMessage(content=f"回答问题。")])

# 将其附加到状态
return {"answer": answer}

# 添加节点
builder = StateGraph(State)

# 初始化每个节点
builder.add_node("search_web",search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)

# 流程
builder.add_edge(START, "search_wikipedia")
builder.add_edge(START, "search_web")
builder.add_edge("search_wikipedia", "generate_answer")
builder.add_edge("search_web", "generate_answer")
builder.add_edge("generate_answer", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))
image-20250722153534867
1
2
result = graph.invoke({"question": "英伟达2025年第一季度财报表现如何"})
result['answer'].content

Sub-graphs 子图

子图允许你在图表的不同部分创建和管理不同的状态。这在多智能体系统中尤其有用,尤其是在每个智能体都有各自状态的智能体团队中。

让我们考虑一个简单的例子:

  • 我有一个系统,它接收日志。
  • 该系统通过不同的代理执行两个独立的子任务(总结日志,查找故障模式)。
  • 我希望在两个不同的子图中执行这两个操作。

最重要的是要理解图表是如何传达信息的!

简而言之,通信是 *通过重叠密钥* 实现的:

  • 子图可以访问父图中的 docs(文档)。
  • 父图可以从子图中访问 summary(摘要)和 failure_report(故障报告)。
subgraph.png
  1. Logs (Traces):
    • 这是系统的输入,表示一系列的日志记录。
  2. Entry Graph:
    • 这是系统的入口图,它包含了总体状态(Overall State),其中包含:
      • docs:文档或日志数据。
      • summary report:摘要报告,这是系统执行摘要任务后生成的。
      • failure report:故障报告,这是系统执行故障分析任务后生成的。
  3. Call sub-graphs:
    • 这是从入口图调用两个子图的过程,分别用于执行摘要和故障分析任务。
  4. Summarization:
    • 这个子图负责生成日志的摘要。它的状态(Summary State)包含:
      • docs:与入口图共享的文档数据。
      • summary:摘要内容。
      • summary report:摘要报告,这是摘要任务完成后生成的。
  5. Failure Analysis:
    • 这个子图负责分析日志中的故障模式。它的状态(Failure Analysis State)包含:
      • docs:与入口图共享的文档数据。
      • failures:故障模式。
      • failure report:故障报告,这是故障分析任务完成后生成的。
  6. Finish:
    • 这是流程的结束点,表示两个子图的任务都已完成,并且生成了摘要报告和故障报告。
1
2
3
4
5
6
7
8
9
10
11
12
13
from operator import add
from typing_extensions import TypedDict
from typing import List, Optional, Annotated

#logs结构
class Log(TypedDict):
id: str
question: str
docs: Optional[List]
answer: str
grade: Optional[int]
grader: Optional[str]
feedback: Optional[str]
Sub graphs

这里是失败分析子图,它使用了 FailureAnalysisState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END

# 故障分析子图
class FailureAnalysisState(TypedDict):
"""用于故障分析的状态。"""
cleaned_logs: List[Log] # 清理后的日志列表
failures: List[Log] # 包含故障的日志列表
fa_summary: str # 故障分析摘要
processed_logs: List[str] # 已处理的日志标识符列表

class FailureAnalysisOutputState(TypedDict):
"""故障分析的输出状态。"""
fa_summary: str # 故障分析摘要
processed_logs: List[str] # 已处理的日志标识符列表

def get_failures(state):
"""获取包含故障的日志"""
cleaned_logs = state["cleaned_logs"]
# 从清理后的日志中筛选出包含 "grade" 键的日志,这些被视为故障
failures = [log for log in cleaned_logs if "grade" in log]
return {"failures": failures}

def generate_summary(state):
"""生成故障摘要"""
failures = state["failures"]
# 添加功能:fa_summary = summary_generation(qs_summary)
fa_summary = "Chroma 文档的检索质量不佳。"
# 为每个故障日志生成一个处理过的日志标识符
return {"fa_summary": fa_summary, "processed_logs": [f"failure-analysis-on-log-{failure['id']}" for failure in failures]}

fa_builder = StateGraph(state_schema=FailureAnalysisState,output_schema=FailureAnalysisOutputState)
fa_builder.add_node("get_failures", get_failures)
fa_builder.add_node("generate_summary", generate_summary)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)

graph = fa_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250723094949158

这里是问题总结子图,它使用了 QuestionSummarizationState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 摘要生成子图
class QuestionSummarizationState(TypedDict):
"""用于问题摘要的状态。"""
cleaned_logs: List[Log] # 清理后的日志列表
qs_summary: str # 问题摘要
report: str # 从摘要生成的报告
processed_logs: List[str] # 已处理的日志标识符列表

class QuestionSummarizationOutputState(TypedDict):
"""问题摘要的输出状态。"""
report: str # 最终报告
processed_logs: List[str] # 已处理的日志标识符列表

def generate_summary(state):
"""从日志生成摘要。"""
cleaned_logs = state["cleaned_logs"]
# 添加功能:summary = summarize(generate_summary)
summary = "问题集中在 ChatOllama 和 Chroma 向量存储的使用上。"
# 返回摘要以及已处理的日志ID
return {"qs_summary": summary, "processed_logs": [f"summary-on-log-{log['id']}" for log in cleaned_logs]}

def send_to_slack(state):
"""模拟发送报告。"""
qs_summary = state["qs_summary"]
# 添加功能:report = report_generation(qs_summary)
report = "foo bar baz"
return {"report": report}

qs_builder = StateGraph(QuestionSummarizationState,output_schema=QuestionSummarizationOutputState)
qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)
qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)

graph = qs_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250723100648199
Adding sub graphs to our parent graph 向父图添加子图

现在,我们可以将所有内容整合在一起。我们使用 EntryGraphState 创建父图。并且我们将我们的子图添加为节点!

1
2
3
4
5
6
7
8
9
# 入口图
class EntryGraphState(TypedDict):
# 原始日志数据列表
raw_logs: List[Log]
# 经过清洗处理后的日志数据列表
cleaned_logs: List[Log]
fa_summary: str # 这只会在故障分析子图中生成
report: str # 这只会在问题摘要子图中生成
processed_logs: Annotated[List[int], add] # 跟踪哪些日志已经被处理过 ,尤其是在子图之间共享状态时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def clean_logs(state):
# 获取日志
raw_logs = state["raw_logs"]
# 数据清洗 raw_logs -> docs
cleaned_logs = raw_logs
return {"cleaned_logs": cleaned_logs}

entry_builder = StateGraph(EntryGraphState)
entry_builder.add_node("clean_logs", clean_logs)
#添加子图
entry_builder.add_node("question_summarization", qs_builder.compile())
entry_builder.add_node("failure_analysis", fa_builder.compile())

entry_builder.add_edge(START, "clean_logs")
entry_builder.add_edge("clean_logs", "failure_analysis")
entry_builder.add_edge("clean_logs", "question_summarization")
entry_builder.add_edge("failure_analysis", END)
entry_builder.add_edge("question_summarization", END)

graph = entry_builder.compile()

from IPython.display import Image, display

# 将 xray 设置为 1 将显示嵌套图的内部结构
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
image-20250723102913524

Map-reduce

LangGraph 里的 Map-Reduce 是一种并行处理模式,用于将一个大任务拆分成多个子任务(Map),再汇总结果(Reduce)。这是 LangGraph 中处理批量数据或并行节点执行的核心机制之一。

让我们设计一个系统,该系统将完成两件事情:

  1. 映射(Map) —— 根据主题生成一组笑话。
  2. 归约(Reduce) —— 从这组笑话中挑出最棒的一条。
1
2
3
4
5
6
7
8
9
10
11
12
from langchain_openai import ChatOpenAI

# Prompts we will use
subjects_prompt = """生成一个包含3个子主题的列表,这些子主题都与以下总体主题相关:{topic}。"""
joke_prompt = """生成一个关于{subject}的笑话"""
best_joke_prompt = """下面是关于{topic}的一堆笑话。选择最好的一个!返回最好笑话的ID,第一个笑话的ID从0开始。笑话:\n\n {jokes}"""
# LLM
model = ChatOpenAI(
model="qwen-plus-2025-04-28",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
Parallelizing joke generation 并行化笑话生成

首先,让我们定义图的入口点,它将:

  • 接收用户输入的主题
  • 基于该主题生成若干“笑话子主题”
  • 将每个子主题发送到上面定义的笑话生成节点

我们的状态有一个 jokes 键,它将累积来自并行化笑话生成的笑话。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Subjects(BaseModel):
"""定义一个Pydantic模型,用于表示主题列表。"""
subjects: list[str] # 主题字符串列表

class BestJoke(BaseModel):
"""定义一个Pydantic模型,用于表示最佳笑话的ID。"""
id: int # 最佳笑话的索引ID

class OverallState(TypedDict):
"""定义整个图的总体状态,使用TypedDict以便于类型提示和状态管理。"""
topic: str # 当前讨论的主题
subjects: list # 生成的子主题列表
jokes: Annotated[list, operator.add] # 笑话列表,使用operator.add表示列表内容会累加而不是覆盖
best_selected_joke: str # 最终选出的最佳笑话

生成笑话的主题。

1
2
3
4
5
6
7
#用于生成主题
def generate_topics(state: OverallState):
#使用 Python 的 format() 方法来构建一个提示字符串( prompt )
prompt = subjects_prompt.format(topic=state["topic"])
#.with_structured_output(Subjects) 它指示模型尝试将其输出格式化为 Subjects 类
response = model.with_structured_output(Subjects).invoke(prompt)
return {"subjects": response.subjects}

这就是妙处:我们利用 Send 为每个主题并行生成笑话。

非常实用!无论主题数量多少,它都能自动并行处理。

  • generate_joke:图中节点的名字
  • {"subject": s}:要传递的状态

Send 允许你向 generate_joke 节点发送任意结构的状态,无需与 OverallState 对齐。
在这里,generate_joke 使用自己的内部状态,我们通过 Send 按需填充即可。

在 LangGraph 里,Send 是一个“动态派发器”:它让你在运行时决定“要把哪个节点运行多少次、每次给它什么数据”,并自动并行执行。

1
2
3
4
5
6
from langgraph.types import Send
def continue_to_jokes(state: OverallState):
# 该函数根据当前状态中的主题列表,为每个主题生成一个 Send 对象。
# 每个 Send 对象都指示图将数据发送到名为 "generate_joke" 的节点,
# 并将当前主题作为 "subject" 参数传递,从而实现并行生成笑话。
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
Joke generation (map) 笑话生成

现在,我们只需定义一个节点来生成笑话,命名为 generate_joke

生成的笑话会被写回到 OverallState 中的 jokes 字段。 该字段配有 reducer,能够自动把多次写入的列表合并成一个大列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 定义 JokeState,用于表示生成笑话任务的输入状态。
class JokeState(TypedDict):
subject: str#笑话的主题

# 定义 Joke 模型,用于表示模型生成的笑话的结构。
class Joke(BaseModel):
joke: str#笑话的文本内容

# 定义生成笑话的函数。
def generate_joke(state: JokeState):
# 根据 joke_prompt 模板和当前状态中的主题格式化提示。
prompt = joke_prompt.format(subject=state["subject"])
# 调用语言模型,并指定输出应结构化为 Joke 类型。
response = model.with_structured_output(Joke).invoke(prompt)
# 返回一个包含生成的笑话的字典,键为 "jokes"。
return {"jokes": [response.joke]}
Best joke selection (reduce) 最佳笑话选择

现在,我们添加逻辑来挑选最好的笑话。

1
2
3
4
5
6
7
8
9
def best_joke(state: OverallState):
# 将状态中所有笑话列表连接成一个字符串,每个笑话之间用两个换行符分隔
jokes = "\n\n".join(state["jokes"])
# 使用主题和所有笑话格式化最佳笑话提示语
prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
# 调用模型,期望其输出符合 BestJoke 结构(包含最佳笑话的ID)
response = model.with_structured_output(BestJoke).invoke(prompt)
# 根据模型返回的ID,从笑话列表中选择最佳笑话,并将其存储在状态的 best_selected_joke 字段中
return {"best_selected_joke": state["jokes"][response.id]}
Compile 编译
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from IPython.display import Image
from langgraph.graph import END, StateGraph, START

# 构建图:将所有组件组合在一起构建我们的流程图
graph = StateGraph(OverallState)

graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)

graph.add_edge(START, "generate_topics")
# 根据条件决定是否继续生成笑话
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
# 生成笑话后执行选择最佳笑话
graph.add_edge("generate_joke", "best_joke")
# 选择最佳笑话后流程结束
graph.add_edge("best_joke", END)


app = graph.compile()
Image(app.get_graph().draw_mermaid_png())
image-20250723112736244
1
2
for s in app.stream({"topic": "日本广岛原子弹"}):
print(s)
1
2
3
4
5
{'generate_topics': {'subjects': ['原子弹投放的历史背景与决策过程', '广岛原爆对城市与居民的影响', '战后和平运动与核裁军倡议']}}
{'generate_joke': {'jokes': ['为什么广岛的重建计划从不担心交通堵塞?因为每个人都习惯了‘瞬间消失’!(注:此笑话为虚构创作,旨在以幽默方式引发思考,并非对历史事件的不尊重)']}}
{'generate_joke': {'jokes': ['战后和平运动的人开会讨论核裁军,一个人站起来说:‘我们必须彻底销毁所有核武器!’ 另一个人犹豫地举手:‘那……我们保留一个吧,就一个,藏在冰箱后面,以防万一。’']}}
{'generate_joke': {'jokes': ["杜鲁门宣布要结束战争,顾问问是否要使用原子弹。罗斯福的棺材板突然震动了一下,丘吉尔的雪茄掉在了地上,斯大林则默默拿起了电话:'同志,我们的计划可能需要再推迟一下。'"]}}
{'best_joke': {'best_selected_joke': '为什么广岛的重建计划从不担心交通堵塞?因为每个人都习惯了‘瞬间消失’!(注:此笑话为虚构创作,旨在以幽默方式引发思考,并非对历史事件的不尊重)'}}

Research Assistant 研究助理

见另一篇文章

module-5

Chatbot with Memory 带有记忆功能的聊天机器人

在这里,我们将介绍 LangGraph Memory Store 作为一种保存和检索长期记忆的方法。

LangGraph Memory Store 是 LangGraph 提供的长期记忆存储接口,用于在 不同对话线程之间 持久化保存和检索用户信息。

名称 作用
BaseStore 抽象接口,定义了 put/get/search/delete 等操作方法
InMemoryStore 内存实现,适合原型验证
RedisStore / AsyncRedisStore 生产级实现,支持向量搜索、元数据过滤和命名空间管理

我们将构建一个使用 short-term (within-thread仅当前对话线程)long-term (across-thread跨所有对话线程 ) 内存的聊天机器人。

我们将重点关注长期 semantic memory(语义记忆),它将包含关于用户的事实信息。这些长期记忆将被用于创建一个个性化的聊天机器人,它可以记住有关用户的事实。

它将节省内存 “in the hot path”,当用户与之聊天时。

“in the hot path” 指的是:在对话或任务执行的主流程中(即用户输入后立即、同步地)主动调用工具或写入记忆,使信息实时生效并可用于下一步决策。

Introduction to the LangGraph Store LangGraph 存储简介

LangGraph Memory Store 提供了一种在 LangGraph 中 跨线程 存储和检索信息的方式。这是一个用于持久化 key-value 存储的 开源基类

1
2
3
import uuid
from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()

Store 中存储对象(例如,记忆)时,我们提供:

- 对象的 namespace(类似于目录的元组)

- 对象的 key(类似于文件名)

- 对象的 value(类似于文件内容)

我们使用 put 方法通过 namespacekey 将对象保存到存储中。

image-20250725155616205
1
2
3
4
5
6
7
8
9
10
11
12
# 为要保存的记忆设置命名空间
user_id = "1" # 用户ID
namespace_for_memory = (user_id, "memories") # 记忆命名空间

# 生成一个唯一的键值
key = str(uuid.uuid4()) # 使用UUID创建唯一标识符作为键

# 值需要是一个字典格式
value = {"food_preference": "我喜欢披萨"} # 存储的食物偏好信息

# 保存记忆
in_memory_store.put(namespace_for_memory, key, value) # 将记忆存储到内存中

我们使用 search 通过 namespace 从存储中检索对象。这将返回一个列表。

1
2
3
4
5
6
7
8
9
memories = in_memory_store.search(namespace_for_memory)
type(memories)

# Metatdata
memories[0].dict()

# The key, value
print(memories[0].key, memories[0].value)
9e65de8a-f404-4974-b509-0df0566d8fb5 {'food_preference': '我喜欢披萨'}

我们还可以使用 get 通过 namespacekey 检索对象。

1
2
3
# Get the memory by namespace and key
memory = in_memory_store.get(namespace_for_memory, key)
memory.dict()
Chatbot with long-term memory 具有长期记忆的聊天机器人

我们想要一个聊天机器人,有两种记忆的方式:

  1. Short-term (within-thread) memory: 聊天机器人可以保留会话历史记录和/或允许在聊天会话中进行中断。
  2. Long-term (cross-thread) memory: 聊天机器人可以记住特定用户在所有聊天会话中的信息 跨会话

对于 short-term memory,我们将使用一个 checkpointer

  • 他们在每一步都将图状态写入线程中。
  • 他们在该线程中持久化保存聊天历史记录。
  • 他们允许图在该线程中的任何步骤被中断和/或恢复。

对于 long-term memory,我们将使用上面介绍的 LangGraph Store

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from IPython.display import Image, display

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.base import BaseStore

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables.config import RunnableConfig

# 聊天机器人指令
MODEL_SYSTEM_MESSAGE = """你是一个拥有记忆功能的有用助手,能够提供关于用户的信息。
如果你有这个用户的记忆,请使用它来个性化你的回复。
以下是记忆内容(可能为空):{memory}"""

# 根据聊天历史和任何现有记忆创建新记忆
CREATE_MEMORY_INSTRUCTION = """"你正在收集用户信息以个性化你的回复。

当前用户信息:
{memory}

指令:
1. 仔细查看下面的聊天历史
2. 识别有关用户的新信息,例如:
- 个人详情(姓名、位置)
- 偏好(喜欢、不喜欢)
- 兴趣和爱好
- 过去的经历
- 目标或未来计划
3. 将任何新信息与现有记忆合并
4. 将记忆格式化为清晰的项目符号列表
5. 如果新信息与现有记忆冲突,请保留最新版本

记住:只包括用户直接陈述的事实信息。不要做假设或推断。

基于以下聊天历史,请更新用户信息:"""

def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""从存储中加载记忆并使用它来个性化聊天机器人的回复。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索记忆
namespace = ("memory", user_id)
key = "user_memory"
existing_memory = store.get(namespace, key)

# 如果存在则提取实际的记忆内容并添加前缀
if existing_memory:
# 值是一个带有memory键的字典
existing_memory_content = existing_memory.value.get('memory')
else:
existing_memory_content = "未找到现有记忆。"

# 在系统提示中格式化记忆
system_msg = MODEL_SYSTEM_MESSAGE.format(memory=existing_memory_content)

# 使用记忆以及聊天历史进行回复
response = model.invoke([SystemMessage(content=system_msg)]+state["messages"])

return {"messages": response}

def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""反思聊天历史并将记忆保存到存储中。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索现有记忆
namespace = ("memory", user_id)
existing_memory = store.get(namespace, "user_memory")

# 提取记忆
if existing_memory:
existing_memory_content = existing_memory.value.get('memory')
else:
existing_memory_content = "未找到现有记忆。"

# 在系统提示中格式化记忆
system_msg = CREATE_MEMORY_INSTRUCTION.format(memory=existing_memory_content)
new_memory = model.invoke([SystemMessage(content=system_msg)]+state['messages'])

# 覆盖存储中的现有记忆
key = "user_memory"

# 将值写入为带有memory键的字典
store.put(namespace, key, {"memory": new_memory.content})

# 定义图
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)

# 用于跨线程的长期记忆存储
across_thread_memory = InMemoryStore()

# 用于线程内的短期记忆检查点
within_thread_memory = MemorySaver()

# 使用检查点器和存储编译图
graph = builder.compile(checkpointer=within_thread_memory, store=across_thread_memory)

# 显示图
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
image-20250725161106319

聊天历史将通过检查点工具保存到短期记忆中。聊天机器人将回顾聊天历史。然后,它将创建并保存一个记忆到 LangGraph Store。此内存可在未来的聊天会话中访问,以个性化聊天机器人的响应。

当我们与聊天机器人交互时,我们提供两样东西:

  1. Short-term (within-thread) memory: 一个用于持久化聊天历史的 thread ID
  2. Long-term (cross-thread) memory: 一个用于将长期记忆命名空间到用户的 user ID
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 我们提供线程ID用于短期(线程内)记忆
# 我们提供用户ID用于长期(跨线程)记忆
config = {"configurable": {"thread_id": "1", "user_id": "1"}}

# 用户输入
input_messages = [HumanMessage(content="你好,我的名字是Lance")]

# 运行图
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()

================================ Human Message =================================

你好,我的名字是Lance
================================== Ai Message ==================================

你好,Lance!很高兴认识你。有什么我可以帮你的吗?😊

我们正在使用 MemorySaver 检查点来管理线程内内存。这会将聊天历史保存到线程中。我们可以查看保存到线程的聊天历史记录。

1
2
3
4
thread = {"configurable": {"thread_id": "1"}}
state = graph.get_state(thread).values
for m in state["messages"]:
m.pretty_print()

回想一下,我们使用存储库编译了该图:across_thread_memory = InMemoryStore()并且,我们向图中添加了一个节点 (write_memory),该节点反映了聊天历史并保存了一段记忆到存储中。

我们可以查看内存是否已保存到存储中。

1
2
3
4
5
6
7
8
9
10
11
# 为要保存的记忆设置命名空间
user_id = "1"
namespace = ("memory", user_id)
existing_memory = across_thread_memory.get(namespace, "user_memory")
existing_memory.dict()

{'namespace': ['memory', '1'],
'key': 'user_memory',
'value': {'memory': '- 姓名:Lance \n- 位置:旧金山 \n- 兴趣和爱好:骑自行车 \n- 喜欢的活动:在旧金山骑行,可能包括金门大桥和滨海区路线'},
'created_at': '2025-07-25T08:12:35.877160+00:00',
'updated_at': '2025-07-25T08:12:35.877161+00:00'}

现在,让我们以 相同的用户ID启动一个新线程。我们应该看到聊天机器人记住了用户的个人资料,并将其用于个性化响应。

1
2
3
4
5
6
7
8
9
# 我们提供用户ID用于跨线程记忆以及一个新的线程ID
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# 用户输入
input_messages = [HumanMessage(content="你好!你推荐我去哪里骑自行车?")]

# 运行图
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()

Chatbot with Profile Schema 带有配置文件模式的聊天机器人

我们的聊天机器人将记忆保存为字符串。在实践中,我们通常希望记忆具有结构化格式。例如,记忆可以是单个、持续更新的模式。在我们的案例中,我们希望这是一个单一的用户档案。

我们将扩展我们的聊天机器人,将语义记忆保存到单个用户档案中。我们还将介绍一个库 Trustcall,用于使用新信息更新此模式。

Defining a user profile schema 定义用户配置文件模式

Python 有许多不同类型的 structured data,例如 TypedDict、字典、JSON 和 Pydantic

让我们先使用 TypedDict 来定义一个用户资料模式。

1
2
3
4
5
6
from typing import TypedDict, List

class UserProfile(TypedDict):
"""带有类型字段的用户档案模式"""
user_name: str # 用户的首选名称
interests: List[str] # 用户兴趣列表
Saving a schema to the store 将模式保存到存储中

LangGraph Store 接受任何 Python 字典作为 value

1
2
3
4
5
6
# TypedDict 实例
user_profile: UserProfile = {
"user_name": "Lance", # 用户名
"interests": ["骑行", "科技", "咖啡"] # 兴趣爱好
}
user_profile

我们使用 put 方法将 TypedDict 保存到存储中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import uuid
from langgraph.store.memory import InMemoryStore

# 初始化内存存储
in_memory_store = InMemoryStore()

# 为要保存的记忆创建命名空间
user_id = "1"
namespace_for_memory = (user_id, "memory")

# 将记忆以键值对的形式保存到命名空间中
key = "user_profile"
value = user_profile
in_memory_store.put(namespace_for_memory, key, value)

我们使用 search 按命名空间从存储中检索对象。

1
2
3
4
for m in in_memory_store.search(namespace_for_memory):
print(m.dict())

{'namespace': ['1', 'memory'], 'key': 'user_profile', 'value': {'user_name': 'Lance', 'interests': ['骑行', '科技', '咖啡']}, 'created_at': '2025-07-25T08:58:06.031770+00:00', 'updated_at': '2025-07-25T08:58:06.031775+00:00', 'score': None}

我们还可以使用 get 通过命名空间和键来检索特定对象。

1
2
3
4
profile = in_memory_store.get(namespace_for_memory, "user_profile")
profile.value

{'user_name': 'Lance', 'interests': ['骑行', '科技', '咖啡']}
Chatbot with profile schema 带有配置文件模式的聊天机器人

现在我们知道了如何为记忆指定一个模式,并将其保存到存储中。现在,我们如何根据这个特定的模式 创建 记忆?

在我们的聊天机器人中,我们 想要从一个聊天里创建记忆.这就是 structured outputs格式化输出 概念有用的地方。

LangChain 的 chat model 接口有一个 PROTECTED11 方法用于强制结构化输出。这在我们需要确保输出符合某个模式时非常有用,而且它会为我们解析输出。

1
2
3
4
5
6
# 将模式绑定到模型
model_with_structure = model.with_structured_output(UserProfile)

# 调用模型生成符合模式的结构化输出
structured_output = model_with_structure.invoke([HumanMessage("我的名字是Lance,我喜欢骑自行车。")])
structured_output

Structured outputs(结构化输出) 指让大模型不再“随意说人话”,而是按你事先定义好的格式(JSON / 表格 / 枚举 / 嵌套对象等)精确返回数据。相当于给模型套了一个“模具”,保证输出可直接被代码解析、入库或传给下游系统,避免再用正则、字符串拼接去“猜”结果。

使用官方的大模型组件

1
2
3
4
5
from langchain_community.chat_models import ChatTongyi
model=ChatTongyi(
model="qwen-plus-2025-07-14",
api_key="sk-"
)

现在,让我们在聊天机器人中使用它。这只需要对 write_memory 函数进行轻微修改。我们使用 model_with_structure,如上所述,来生成一个与我们模式匹配的配置文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from IPython.display import Image, display

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.base import BaseStore

from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.runnables.config import RunnableConfig

# 聊天机器人指令
MODEL_SYSTEM_MESSAGE = """你是一个拥有记忆功能的 helpful 助手,可以提供关于用户的信息。
如果你有这个用户的记忆,请使用它来个性化你的回复。
以下是记忆内容(可能为空):{memory}"""

# 根据聊天历史和任何现有记忆创建新记忆
CREATE_MEMORY_INSTRUCTION = """根据用户的聊天历史创建或更新用户档案记忆。
这将被保存为长期记忆。如果存在现有记忆,只需更新它。
以下是现有记忆(可能为空):{memory}"""

def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""从存储中加载记忆并使用它来个性化聊天机器人的回复。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索记忆
namespace = ("memory", user_id)
existing_memory = store.get(namespace, "user_memory")

# 为系统提示格式化记忆
if existing_memory and existing_memory.value:
memory_dict = existing_memory.value
formatted_memory = (
f"姓名: {memory_dict.get('user_name', '未知')}\n"
f"兴趣: {', '.join(memory_dict.get('interests', []))}"
)
else:
formatted_memory = None

# 在系统提示中格式化记忆
system_msg = MODEL_SYSTEM_MESSAGE.format(memory=formatted_memory)

# 使用记忆以及聊天历史来回复
response = model.invoke([SystemMessage(content=system_msg)]+state["messages"])

return {"messages": response}

def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""反思聊天历史并将记忆保存到存储中。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索现有记忆
namespace = ("memory", user_id)
existing_memory = store.get(namespace, "user_memory")

# 为系统提示格式化记忆
if existing_memory and existing_memory.value:
memory_dict = existing_memory.value
formatted_memory = (
f"姓名: {memory_dict.get('user_name', '未知')}\n"
f"兴趣: {', '.join(memory_dict.get('interests', []))}"
)
else:
formatted_memory = None

# 在指令中格式化现有记忆
system_msg = CREATE_MEMORY_INSTRUCTION.format(memory=formatted_memory)

# 调用模型生成符合模式的结构化输出
new_memory = model_with_structure.invoke([SystemMessage(content=system_msg)]+state['messages'])

# 覆盖现有的用户档案记忆
key = "user_memory"
store.put(namespace, key, new_memory)

# 定义图
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)

# 用于长期(跨线程)记忆的存储
across_thread_memory = InMemoryStore()

# 用于短期(线程内)记忆的检查点
within_thread_memory = MemorySaver()

# 使用检查点和存储编译图
graph = builder.compile(checkpointer=within_thread_memory, store=across_thread_memory)

# 显示
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
image-20250726103516996

调用

1
2
3
4
5
6
7
8
9
10
# 我们提供线程ID用于短期(线程内)记忆
# 我们提供用户ID用于长期(跨线程)记忆
config = {"configurable": {"thread_id": "1", "user_id": "1"}}

# 用户输入
input_messages = [HumanMessage(content="你好,我的名字是Lance,我喜欢在旧金山骑自行车和在面包店吃饭。")]

# 运行图
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()

查看记忆

1
2
3
4
5
# Namespace for the memory to save
user_id = "1"
namespace = ("memory", user_id)
existing_memory = across_thread_memory.get(namespace, "user_memory")
existing_memory.value
Trustcall for creating and updating profile schemas Trustcall 用于创建和更新配置文件模式

Trustcall 是一个由 LangChain 团队开发的开源 Python 库,旨在解决大型语言模型(LLM)在生成或修改复杂 JSON 数据结构时效率低、易出错的问题

我们使用 create_extractor,传入模型以及我们的模式作为 tool。使用 TrustCall 时,可以以多种方式提供模式。

例如,我们可以传递一个 JSON 对象 / Python 字典或 Pydantic 模型。在底层,TrustCall 使用 tool calling 从输入的 messages 列表中生成 structured output。为了强制 Trustcall 生成 structured output,我们可以在 tool_choice 参数中包含模式名称。

我仅做了解了,感觉用处不多,用结果化输出就能达到效果

Chatbot with Collection Schema 带集合模式的聊天机器人

“collection” 指的是一种将语义记忆组织为多个独立文档(objects)的存储方式,而不是把所有信息都塞进一个巨大的“用户档案”(single profile)里。

假设你在做一个 AI 助手,用户说:

“我下周要去东京出差,住在涩谷区的 Sakura Hotel。” “我朋友 Ken 也要来,他喜欢吃拉面。”

如果用 collection,你会存成两条记忆:

1
2
3
4
5
6
7
8
9
10
11
12
13
[
{
"type": "trip",
"destination": "Tokyo",
"date": "2025-08-04",
"hotel": "Sakura Hotel, Shibuya"
},
{
"type": "friend",
"name": "Ken",
"food_preference": "ramen"
}
]

每条记忆都是一个独立文档,后续你可以:

  • 搜索“trip”类型的记忆,找到用户的出差安排;
  • 搜索“friend”类型的记忆,找到 Ken 的偏好;
  • 更新 Ken 的信息时,只改一条记录,不会影响其它记忆。

Memory Agent 内存代理

现在,我们将把学到的内容整合起来,构建一个具有长期记忆的 agent

我们的代理 task_mAIstro 将帮助我们管理待办事项列表!

我们之前构建的聊天机器人 始终 会反思对话并保存记忆。task_mAIstro 将决定 何时 保存记忆(待办事项列表中的项目)。

我们之前构建的聊天机器人始终保存一种类型的记忆,即个人资料或集合。task_mAIstro 可以决定将数据保存到用户个人资料或 ToDo 事项集合中。

除此之外,task_mAIstro 还将管理程序性记忆。这允许用户更新创建ToDo项的偏好设置。

Creating an agent 创建一个代理

有许多不同的 agent 架构可供选择。在这里,我们将实现一个简单的内容,一个 ReAct 代理。这个代理将成为创建和管理待办事项列表的得力助手。

此代理可以决定更新三种类型的长期记忆:

  1. 创建或更新具有普通用户信息的用户 profile

  2. 在ToDo列表中添加或更新项目 collection

  3. 更新其自身的 instructions 以了解如何更新待办事项列表中的项目

1
2
3
4
5
6
from typing import TypedDict, Literal

# 更新记忆工具
class UpdateMemory(TypedDict):
""" 决定更新哪种记忆类型 """
update_type: Literal['user', 'todo', 'instructions']
  • 'user':用户相关信息
  • 'todo':待办事项
  • 'instructions':指令信息
Graph definition 图定义

我们添加了一个简单的路由器 route_message,它通过二元决策来节省内存。

module-6