什么是uv

uv 是由 Astral 团队开发的一个超高速 Python 包管理器,用 Rust 编写,目标是替代 pipvenvpip-toolspoetry 等多个工具。

uv常用命令

uv init myproj 创建新项目

source .venv/bin/activate(Linux/macOS)激活虚拟环境

uv add requests 安装依赖并写入 pyproject.toml

uv remove requests 移除依赖

uv sync 同步依赖到虚拟环境

uv export 导出 lock 文件为 requirements.txt 等格式

uv build 构建源码包和 wheel

uv publish 发布到 PyPI

uvx是什么

uvx 是:

uv tool run快捷别名(alias),用于无需安装即可运行 Python 包提供的命令行工具

uvx 就像 Python 世界的 npxpipx run —— 临时拉取、构建隔离环境、运行工具,用完即走,不留痕迹

uv管理命令行工具

使用uv tool

  • 用途:安装、管理、运行全局可用的 Python 命令行工具
  • 安装位置:默认安装到 ~/.local/bin(Windows: C:\Users\<USER>\.local\bin)。
1
uv tool install pytest

安装后可以直接使用pytest而不用uv run pytest

uv sync和uv pip install -e .的区别

uv pip install -e .

  • 作用:将当前项目以可编辑模式安装到当前 Python 环境。
  • 行为
    • 读取 pyproject.toml 中的 [project] 元数据。
    • 构建并安装你的主包(如 my_package),使其可被 import
    • 不会自动安装依赖(除非你显式加上 --deps,但通常不这么做)。
  • 典型用途:开发自己的包时,让本地代码可导入。

uv sync

  • 作用根据锁定文件(如 uv.lock)精确同步整个项目的依赖环境
  • 行为
    • 读取 uv.lock(由 uv lock 生成)或 pyproject.toml
    • 安装所有依赖项(包括直接依赖和传递依赖)到当前环境。
    • 默认也会以可编辑模式安装当前项目(如果 pyproject.toml 中定义了项目)。
    • 确保环境状态与锁定文件完全一致(版本、哈希、来源等)。
  • 前提:通常需要先运行 uv lock 生成 uv.lock

参考资料

【uv】Python迄今最好的项目管理+环境管理工具(吧?)_哔哩哔哩_bilibili

从pip到uv:一口气梳理现代Python项目管理全流程!_哔哩哔哩_bilibili

环境配置

1
pip install langchain-mcp-adapters

使用langgraph调用mcp

要点主要是利用MultiServerMCPClient构建服务,获取tool

利用预设的create_react_agent构建ReAct架构的智能体并调用工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import asyncio # 需要导入 asyncio 来运行异步函数
# 从langchain_mcp_adapters.client模块导入MultiServerMCPClient类
# 从langgraph.prebuilt模块导入create_react_agent函数
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

# 导入 LLM 相关库
from langchain_openai import ChatOpenAI

# 将主要逻辑封装在一个异步函数中
async def main():
# 创建MultiServerMCPClient实例,配置两个不同的服务
client = MultiServerMCPClient(
{
"math": { # 数学计算服务
"command": "python", # 使用python命令启动
# 替换为你的math_server.py文件的绝对路径
"args": ["/workspace/langgraph-mcp/math_server.py"],
"transport": "stdio", # 使用标准输入输出传输
},
"weather": { # 天气服务
# 确保你的天气服务器在8000端口运行
# *** 确保这个 URL 是正确的,并且服务器正在运行 ***
"url": "http://localhost:8000/mcp",
"transport": "streamable_http", # 使用可流式HTTP传输
}
}
)

tools = []
try:
# 在异步函数内部正确使用 await
tools = await client.get_tools()
print(f"成功获取到 {len(tools)} 个MCP工具。")
for tool_item in tools:
print(f" - {tool_item.name}: {tool_item.description}")
except Exception as e:
print(f"获取MCP工具失败: {e}")
print("请确保MCP服务URL有效且可访问,或者您已正确配置了认证信息。")
# 在函数内部,如果出错可以选择返回或继续处理
# return # 这里可以 return,但会结束 main 函数

if not tools:
print("没有获取到工具,无法创建代理。")
return

# 创建ReAct代理
llm = ChatOpenAI(
model="qwen3-235b-a22b-thinking-2507",
api_key="sk-a8ef27c47ea84224ac6eed6d4bba1bab",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" # 修正了末尾多余的空格
)
agent = create_react_agent(llm, tools)


# 异步调用代理来解决数学问题
# 确保在异步函数内部使用 await
math_response = await agent.ainvoke(
{"messages": [{"role": "user", "content": "what's (3 + 5) x 12?"}]}
)
print("\n--- 数学问题回答 ---")
print(math_response["messages"][-1].content) # 打印最后一条消息(LLM的回答)

# 异步调用代理来查询天气
weather_response = await agent.ainvoke(
{"messages": [{"role": "user", "content": "what is the weather in nyc?"}]}
)
print("\n--- 天气问题回答 ---")
print(weather_response["messages"][-1].content) # 打印最后一条消息(LLM的回答)


# --- 这是脚本的入口点 ---
# 使用 asyncio.run() 来运行你的主异步函数
if __name__ == "__main__":
asyncio.run(main())

参考资料

使用 MCP - LangChain 框架

框架流程

image-20250801164905578

✅ 三个角色(系统组件)

角色 作用
Client 前端或用户界面,发起请求
Auth Provider 认证服务(如 OAuth、JWT 提供者),负责登录和签发 token
LangGraph Backend 应用的后端服务,处理业务逻辑
Secret Store 存放用户敏感信息(如 token、密钥等)
MCP Server 后端工具服务,提供具体的工具或资源接口

✅ 流程详解(12步)

🔐 阶段一:用户登录 & 获取 Token(1~6)

  1. 用户登录
    Client 提交用户名和密码给 Auth Provider。

  2. 返回 Token
    Auth Provider 验证成功后,返回一个访问令牌(token)。

  3. 携带 Token 请求
    Client 将 token 附加在请求头中,发给 LangGraph Backend。

  4. 验证 Token
    LangGraph Backend 使用 @auth.authenticate 中间件验证 token 是否有效。

  5. 获取用户信息
    验证通过后,LangGraph Backend 从 Auth Provider 拉取用户详细信息。

  6. 确认有效性
    后端确认用户信息无误,流程继续。

🔑 阶段二:获取用户权限 Token(6a~6b)

6a. 拉取用户权限 Token
LangGraph Backend 从 Secret Store 获取该用户对应的权限 token(可能是 MCP 所需的访问凭证)。

6b. 返回权限 Token
Secret Store 返回该 token。

🛠️ 阶段三:调用工具 & 返回结果(7~12)

  1. 权限控制检查
    LangGraph Backend 使用 @auth.on.* 权限控制逻辑,确认用户是否有权调用该工具。

  2. 构建 MCP Client
    后端用用户的权限 token 构建一个 MCP 客户端。

  3. 调用 MCP 工具
    MCP Client 发起请求,调用某个具体工具,携带 token(通常放在请求头中)。

  4. MCP 验证并执行
    MCP Server 验证 token 是否有效,确认无误后执行工具逻辑。

  5. 工具返回结果
    MCP Server 返回工具执行结果或资源数据。

  6. 返回给前端
    LangGraph Backend 将结果返回给 Client,完成整个链路。

配置环境

1
2
3
4
5
6
7
8
9
10
11
12
13
# Create a new directory for our project
uv init weather
cd weather

# Create virtual environment and activate it
uv venv
source .venv/bin/activate

# Install dependencies
uv add "mcp[cli]" httpx

# Create our server file
touch weather.py

mcp studio样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Math")

@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers"""
return a + b

@mcp.tool()
def multiply(a: int, b: int) -> int:
"""Multiply two numbers"""
return a * b

if __name__ == "__main__":
mcp.run(transport="stdio")

mcp streamable-http 样例

1
2
3
4
5
6
7
8
9
10
11
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Weather")

@mcp.tool()
async def get_weather(location: str) -> str:
"""Get weather for location."""
return "It's always sunny in New York"

if __name__ == "__main__":
mcp.run(transport="streamable-http")

使用

mcp市场MCP 广场 · 魔搭社区

使用 uv(推荐)

当使用 uv 时不需要特定的安装步骤。我们将使用 uvx 直接运行 mcp-server-fetch

1
2
3
4
5
6
"mcpServers": {
"fetch": {
"command": "uvx",
"args": ["mcp-server-fetch"]
}
}

使用 PIP

或者,您可以通过 pip 安装 mcp-server-fetch

1
pip install mcp-server-fetch
1
2
3
4
5
6
"mcpServers": {
"fetch": {
"command": "python",
"args": ["-m", "mcp_server_fetch"]
}
}

远程托管

image-20250802120145192
1
2
3
4
5
6
7
8
{
"mcpServers": {
"fetch": {
"type": "sse",
"url": "https://mcp.api-inference.modelscope.net/991cf46/sse"
}
}
}

参考资料

构建 MCP 服务器 - 模型上下文协议 — Build an MCP Server - Model Context Protocol

配置环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 创建项目目录
uv init mcp-client
cd mcp-client

# 创建虚拟环境
uv venv

# 激活虚拟环境
# 在 Windows 上:
.venv\Scripts\activate
# 在 Unix 或 MacOS 上:
source .venv/bin/activate

# 安装所需包
uv add mcp anthropic python-dotenv
#使用镜像源安装
uv add mcp anthropic python-dotenv --index-url https://pypi.tuna.tsinghua.edu.cn/simple/

# 删除样板文件
# 在 Windows 上:
del main.py
# 在 Unix 或 MacOS 上:
rm main.py

# 创建我们的主文件
touch client.py

设置 API 密钥

创建一个 .env 文件来存储它:

1
2
# Create .env file
touch .env

将您的密钥添加到 .env 文件:

1
ANTHROPIC_API_KEY=<your key here>

.env 添加到您的 .gitignore

1
echo ".env" >> .gitignore

.env 文件名添加到 .gitignore 文件中,这样 Git 就会忽略 .env 文件,不会将其纳入版本控制。

创建客户端

基本客户端结构

首先,让我们设置我们的导入并创建基本的客户端类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv() # 从 .env 文件加载环境变量

class MCPClient:
def __init__(self):
# 初始化会话和客户端对象
self.session: Optional[ClientSession] = None # MCP客户端会话
self.exit_stack = AsyncExitStack() # 异步上下文管理器堆栈,用于资源清理
self.anthropic = Anthropic() # Anthropic AI 客户端

# 后续方法将在这里定义

服务器连接管理

接下来,我们将实现连接到 MCP 服务器的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
async def connect_to_server(self, server_script_path: str):
"""连接到MCP服务器

Args:
server_script_path: 服务器脚本路径 (.py 或 .js 文件)
"""
# 检查是否为Python文件
is_python = server_script_path.endswith('.py')
# 检查是否为JavaScript文件
is_js = server_script_path.endswith('.js')

# 如果不是Python或JavaScript文件,则抛出错误
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")

# 根据文件类型确定执行命令
command = "python" if is_python else "node"

# 创建服务器参数对象
server_params = StdioServerParameters(
command=command, # 执行命令
args=[server_script_path], # 脚本路径作为参数
env=None # 环境变量(使用默认环境)
)

# 建立stdio客户端连接并将其添加到异步上下文管理器中
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport

# 创建客户端会话并将其添加到异步上下文管理器中
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

# 初始化会话
await self.session.initialize()

# 列出可用的工具
response = await self.session.list_tools()
tools = response.tools

# 打印连接的服务器提供的工具列表
print("\n已连接到服务器,可用工具:", [tool.name for tool in tools])

查询处理逻辑

现在让我们添加处理查询和调用工具的核心功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
async def process_query(self, query: str) -> str:
"""使用Claude和可用工具处理查询"""
# 构建消息列表
messages = [
{
"role": "user", # 用户角色
"content": query # 用户查询内容
}
]

# 获取可用工具列表
response = await self.session.list_tools()
available_tools = [{
"name": tool.name, # 工具名称
"description": tool.description, # 工具描述
"input_schema": tool.inputSchema # 工具输入模式
} for tool in response.tools]

# 初始Claude API调用
response = self.anthropic.messages.create(
model="qwen3-235b-a22b", # 使用的模型
max_tokens=1000, # 最大返回令牌数
messages=messages, # 消息历史
tools=available_tools # 可用工具
)

# 处理响应并处理工具调用
final_text = [] # 存储最终文本结果

assistant_message_content = [] # 存储助手消息内容
for content in response.content: # 遍历响应内容
if content.type == 'text': # 如果是文本内容
final_text.append(content.text) # 添加到最终结果
assistant_message_content.append(content) # 添加到助手消息
elif content.type == 'tool_use': # 如果是工具调用
tool_name = content.name # 工具名称
tool_args = content.input # 工具参数

# 执行工具调用
result = await self.session.call_tool(tool_name, tool_args)
final_text.append(f"[调用工具 {tool_name},参数 {tool_args}]")

assistant_message_content.append(content)
# 添加助手消息到历史
messages.append({
"role": "assistant",
"content": assistant_message_content
})
# 添加工具执行结果到历史
messages.append({
"role": "user",
"content": [
{
"type": "tool_result", # 工具结果类型
"tool_use_id": content.id, # 工具使用ID
"content": result.content # 工具执行结果
}
]
})

# 获取Claude的下一个响应
response = self.anthropic.messages.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools
)

# 添加响应文本到最终结果
final_text.append(response.content[0].text)

# 返回连接后的最终文本结果
return "\n".join(final_text)

交互式聊天界面

现在我们将添加聊天循环和清理功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP客户端已启动!")
print("输入您的问题或输入'quit'退出。")

while True: # 无限循环,持续接收用户输入
try:
query = input("\n问题: ").strip() # 获取用户输入并去除首尾空格

if query.lower() == 'quit': # 如果用户输入'quit'(不区分大小写)
break # 退出循环

# 处理用户查询并获取响应
response = await self.process_query(query)
print("\n" + response) # 打印AI响应结果

except Exception as e: # 捕获所有异常
print(f"\n错误: {str(e)}") # 打印错误信息

async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose() # 异步关闭所有在exit_stack中管理的资源

主入口点

最后,我们将添加主要的执行逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def main():
# 检查命令行参数数量,如果少于2个则显示使用说明
if len(sys.argv) < 2:
print("用法: python client.py <服务器脚本路径>")
sys.exit(1) # 退出程序,返回错误码1

# 创建MCP客户端实例
client = MCPClient()
try:
# 连接到服务器,sys.argv[1]是第一个命令行参数(服务器脚本路径)
await client.connect_to_server(sys.argv[1])
# 启动交互式聊天循环
await client.chat_loop()
finally:
# 确保程序结束时清理资源
await client.cleanup()

# 程序入口点
if __name__ == "__main__":
import sys # 导入sys模块用于处理命令行参数
# 运行异步主函数
asyncio.run(main())
QQ20250801-164738

运行客户端

要使您的客户端与任何 MCP 服务器运行:

1
2
uv run client.py path/to/server.py # python server
uv run client.py path/to/build/index.js # node server

客户端将:

  1. 连接到指定服务器
  2. 列出可用工具
  3. 开始一个交互式聊天会话,您可以在其中:
    • 输入查询
    • 查看工具执行情况
    • 从 Claude 获取响应

运作流程

当你提交查询时:

  1. 客户端从服务器获取可用工具列表
  2. 你的查询连同工具描述一起发送给 Claude
  3. Claude 决定使用哪些工具(如果有的话)
  4. 客户端通过服务器执行任何请求的工具调用
  5. 结果会发送回 Claude
  6. Claude 提供自然语言响应
  7. 响应显示给您

参考资料

Build an MCP Client - Model Context Protocol

适配openai版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import asyncio
import json
import sys
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from openai import OpenAI
from dotenv import load_dotenv
import os

load_dotenv() # 从 .env 文件加载环境变量

class MCPClient:
def __init__(self):
# 初始化会话和客户端对象
self.session: Optional[ClientSession] = None # MCP客户端会话
self.exit_stack = AsyncExitStack() # 异步上下文管理器堆栈,用于资源清理
self.anthropic = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
) # 使用OpenAI兼容模式连接通义千问

async def connect_to_server(self, server_script_path: str):
"""连接到MCP服务器

Args:
server_script_path (str): 服务器脚本路径,支持.py或.js文件

Raises:
ValueError: 当脚本文件不是.py或.js格式时抛出
"""
# 检查是否为Python文件
is_python = server_script_path.endswith('.py')
# 检查是否为JavaScript文件
is_js = server_script_path.endswith('.js')

# 如果不是Python或JavaScript文件,则抛出错误
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")

# 根据文件类型确定执行命令
command = "python" if is_python else "node"

# 创建服务器参数对象
server_params = StdioServerParameters(
command=command, # 执行命令
args=[server_script_path], # 脚本路径作为参数
env=None # 环境变量(使用默认环境)
)

# 建立stdio客户端连接并将其添加到异步上下文管理器中
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport

# 创建客户端会话并将其添加到异步上下文管理器中
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

# 初始化会话
await self.session.initialize()

# 列出可用的工具
response = await self.session.list_tools()
tools = response.tools

# 打印连接的服务器提供的工具列表
print("\n已连接到服务器,可用工具:", [tool.name for tool in tools])

async def process_query(self, query: str) -> str:
"""使用Qwen和可用工具处理查询"""
# 构建消息列表
messages = [
{
"role": "user",
"content": query
}
]

# 获取可用工具列表并转换为OpenAI格式
response = await self.session.list_tools()
available_tools = []
for tool in response.tools:
schema = tool.inputSchema
if isinstance(schema, str):
schema = json.loads(schema)
if isinstance(schema, dict) and "properties" in schema:
schema = {"type": "object", **schema}

available_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": schema
}
})

# 第一次调用模型
response = self.anthropic.chat.completions.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools,
extra_body={"enable_thinking": False}
)

final_text = []
message = response.choices[0].message

# 处理文本内容
if message.content:
final_text.append(message.content)

# 处理工具调用
if message.tool_calls:
for tool_call in message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)

# 执行工具调用
result = await self.session.call_tool(tool_name, tool_args)
final_text.append(f"[调用工具 {tool_name},参数 {tool_args}]")

# 处理工具结果
tool_result_content = ""
if result.content:
for item in result.content:
if hasattr(item, 'type') and item.type == 'text':
tool_result_content += item.text
else:
tool_result_content += str(item)

# 添加助手消息到历史
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
})

# 添加工具执行结果到历史
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_result_content
})

# 再次调用模型
response = self.anthropic.chat.completions.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools,
extra_body={"enable_thinking": False}
)

# 处理最终响应
if response.choices and response.choices[0].message.content:
final_text.append(response.choices[0].message.content)

return "\n".join(final_text)

async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP客户端已启动!")
print("输入您的问题或输入'quit'退出。")

while True: # 无限循环,持续接收用户输入
try:
query = input("\n问题: ").strip() # 获取用户输入并去除首尾空格

if query.lower() == 'quit': # 如果用户输入'quit'(不区分大小写)
break # 退出循环

# 处理用户查询并获取响应
response = await self.process_query(query)
print("\n" + response) # 打印AI响应结果

except Exception as e: # 捕获所有异常
print(f"\n错误: {str(e)}") # 打印错误信息
import traceback
traceback.print_exc() # 打印详细错误信息

async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose() # 异步关闭所有在exit_stack中管理的资源

async def main():
# 检查命令行参数数量,如果少于2个则显示使用说明
if len(sys.argv) < 2:
print("用法: python client.py <服务器脚本路径>")
sys.exit(1) # 退出程序,返回错误码1

# 创建MCP客户端实例
client = MCPClient()
try:
# 连接到服务器,sys.argv[1]是第一个命令行参数(服务器脚本路径)
await client.connect_to_server(sys.argv[1])
# 启动交互式聊天循环
await client.chat_loop()
finally:
# 确保程序结束时清理资源
await client.cleanup()

# 程序入口点
if __name__ == "__main__":
# 运行异步主函数
asyncio.run(main())

openai和claude在工具调用的差异

  1. 工具格式转换修复

问题:MCP工具格式与OpenAI API不兼容 修复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 原错误格式
available_tools = [{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
}]

# 修复后格式(符合OpenAI规范)
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": schema # 正确的JSON Schema格式
}
}]

  1. API响应处理修复

问题:错误访问了OpenAI响应对象的属性 修复

1
2
3
4
5
6
7
8
9
10
# 原错误代码
for content in response.content: # ❌ response没有content属性

# 修复后代码
message = response.choices[0].message # ✅ 正确的访问路径
if message.content:
final_text.append(message.content)
if message.tool_calls:
for tool_call in message.tool_calls:
# 处理工具调用

  1. 工具调用结果处理修复

问题:错误处理MCP工具调用返回的结果结构 修复

1
2
3
4
5
6
7
8
9
10
11
# 原错误代码
"content": result.content # ❌ 可能包含复杂对象

# 修复后代码
tool_result_content = ""
if result.content:
for item in result.content:
if hasattr(item, 'type') and item.type == 'text':
tool_result_content += item.text
else:
tool_result_content += str(item)
  1. 消息历史构建修复

问题:工具调用后消息历史格式不正确 修复

1
2
3
4
5
6
7
8
9
10
11
# 正确的消息历史格式
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_result_content
})

  1. JSON Schema兼容性处理

问题:MCP返回的schema可能缺少必要的根类型定义 修复

1
2
3
4
5
schema = tool.inputSchema
if isinstance(schema, str):
schema = json.loads(schema)
if isinstance(schema, dict) and "properties" in schema:
schema = {"type": "object", **schema} # 确保有根类型

Chroma是什么

Chroma(通常指 ChromaDB)是一款 开源、AI 原生的向量数据库,专为存储和检索 高维嵌入向量 而设计,目标是让开发者 5 分钟内在本地跑起一个语义搜索或 RAG 系统

极简安装pip install chromadb

双运行模式

  • 内存模式:调试/原型随意重启;
  • 持久化模式:指定 persist_directory 即可落盘,生产也不怕丢数据。

HNSW 索引:百万级向量也能 毫秒级 响应。

使用chroma存储

文档分块

1
2
3
4
5
6
7
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

loader = PyPDFLoader("input/健康档案.pdf")
docs = loader.load()
#递归分块
text_splitter = RecursiveCharacterTextSplitter(chunk_siz

定义embedding模型与chroma

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

from langchain.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embedding = OpenAIEmbeddings(
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
model="text-embedding-v4",
check_embedding_ctx_length = False,
dimensions=1536
)
# 使用 Chroma 向量数据库存储文档 chunks
vectorstore = Chroma.from_documents(
documents=chunks, # 要存储的文档chunks列表(已处理好的文本片段)
embedding=embedding,
persist_directory="chromaDB", # 向量数据库的持久化存储目录路径
collection_name="demo001" # 集合名称,用于区分不同的文档集合
)
vectorstore.persist()

检索

这里采取直接检索进行测试

1
2
3
4
5
results = vectorstore.similarity_search(
"张三九的基本信息是什么",
k=2,
collection_name="demo001" # 指定检索的集合
)

重新加载

1
2
3
4
5
6
7
# 重新加载已存在的 Chroma 数据库
vectorstore = Chroma(
persist_directory="./chroma_db",
embedding_function=embedding
)

retriever = vectorstore.as_retriever()

参考资料

Chroma | 🦜️🔗 LangChain — Chroma | 🦜️🔗 LangChain

什么是Plan-and-Execute模式

Plan-and-Execute架构流程:先指定计划,后交给执行agent,执行后交给replan节点,判断是否需要更新计划,若要更新计划返回返回更新后的机会,否则返回response,然后路由判断是执行agent还是response

与ReAct模式不同的是,ReAct只做一次规划,而Plan-and-Execute模式核心思想是首先制定一个多步骤计划,然后逐项执行该计划。完成特定任务后,可以重新审视计划并进行适当修改。

举个例子,用户在问一个问题后,agent产生一份任务清单,选取第一份任务开始执行,执行后的结果结合任务清单,执行replan,结合新的信息,更改任务清单的内容,让后续大模型更好地执行,并去除已经完成的任务

实际生产中,应该在planner之前再进行一次判断,如果问题过于简单,不需要进行Plan-and-Execute模式

实战

安装包

1
pip install --quiet -U langgraph langchain-community langchain-openai tavily-python

定义网络搜索工具与执行agent

在产生plan后,要有一个agent对任务清单进行执行,这里以一个ReAct的网络搜索agent为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#定义工具
from langchain_tavily import TavilySearch
tools = [TavilySearch(max_results=3)]

from langchain_openai import ChatOpenAI

from langgraph.prebuilt import create_react_agent


llm = ChatOpenAI(
model="qwen3-235b-a22b-thinking-2507",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
prompt = "You are a helpful assistant."
agent_executor = create_react_agent(llm, tools, prompt=prompt)

测试功能

1
agent_executor.invoke({"messages": [("user", "今天是几月几日")]})

定义执行节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from typing import Literal
from langgraph.graph import END


async def execute_step(state: PlanExecute):
"""执行计划中的步骤"""
plan = state["plan"]
# 将计划格式化为带编号的字符串
plan_str = "\n".join(f"{i + 1}. {step}" for i, step in enumerate(plan))
task = plan[0] # 获取第一个待执行的任务
task_formatted = f"""对于以下计划:
{plan_str}\n\n你被分配执行第 {1} 步, {task}。"""
# 调用代理执行器来执行任务
agent_response = await agent_executor.ainvoke(
{"messages": [("user", task_formatted)]}
)
# 返回执行结果,添加到历史步骤中
return {
"past_steps": [(task, agent_response["messages"][-1].content)],
}

create_react_agent 是 LangGraph 提供的一个预构建函数,位于 langgraph.prebuilt 模块中,用于快速创建一个基于 ReAct(Reasoning + Acting)架构的智能代理。

langgraph预设的其他常用组件如下

ToolNode

功能:把 LangChain 工具(BaseTool)封装成一个图节点,负责:

  • 接收 LLM 生成的工具调用请求

  • 真正执行工具

  • 把结果返回给图

1
2
3
from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools=[search, calculator])

tools_condition

功能:判断 LLM 是否要继续调用工具的“路由函数”。

在 ReAct 图里通常放在节点之间的 条件边:

1
2
3
4
5
6
7
8
9
from langgraph.prebuilt import tools_condition

graph.add_conditional_edges("agent", tools_condition, {

"tools": "tool_node", # 需要工具 → 去 ToolNode

"**__end__**": END # 不需要 → 结束

})

定义状态

1
2
3
4
5
6
7
8
9
10
import operator
from typing import Annotated, List, Tuple
from typing_extensions import TypedDict


class PlanExecute(TypedDict):
input: str
plan: List[str]
past_steps: Annotated[List[Tuple], operator.add]
response: str
1
2
3
4
5
6
7
8
9
from pydantic import BaseModel, Field


class Plan(BaseModel):
"""未来要遵循的计划"""

steps: List[str] = Field(
description="需要遵循的不同步骤,应该按排序顺序排列"
)

定义模型

1
2
3
4
5
from langchain_community.chat_models import ChatTongyi
model=ChatTongyi(
model="qwen-plus-2025-07-14",
api_key="sk-"
)

定义初始计划节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"""对于给定的目标,制定一个简单的逐步计划。\
这个计划应该包含独立的任务,如果正确执行这些任务将得到正确的答案。不要添加任何多余的步骤。\
最后一步的结果应该是最终答案。确保每个步骤都包含所需的所有信息——不要跳过任何步骤。""",
),
("placeholder", "{messages}"),
]
)
planner = planner_prompt | model.with_structured_output(Plan)

async def plan_step(state: PlanExecute):
"""制定初始计划步骤"""
# 使用规划器为用户输入制定计划
plan = await planner.ainvoke({"messages": [("user", state["input"])]})

return {"plan": plan.steps}

定义再计划节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from typing import Union


class Response(BaseModel):
"""对用户的响应"""

response: str


class Act(BaseModel):
"""要执行的动作"""

action: Union[Response, Plan] = Field(
description="要执行的动作。如果你想响应用户,使用 Response。"
"如果你需要进一步使用工具来获取答案,使用 Plan。"
)


replanner_prompt = ChatPromptTemplate.from_template(
"""对于给定的目标,制定一个简单的逐步计划。\
这个 计划应该包含独立的任务,如果正确执行这些任务将得到正确的答案。不要添加任何多余的步骤。\
最后一步的结果应该是最终答案。确保每个步骤都包含所需的所有信息——不要跳过任何步骤。

你的目标是:
{input}

你的原始计划是:
{plan}

你目前已经完成了以下步骤:
{past_steps}

相应地更新你的计划。如果不需要更多步骤并且可以返回给用户,则直接响应。否则,填写计划。只添加仍需要完成的步骤到计划中。不要将已经完成的步骤作为计划的一部分返回。"""
)


replanner = replanner_prompt | model.with_structured_output(Act)
1
2
3
4
5
6
7
8
9
10
async def replan_step(state: PlanExecute):
"""重新规划步骤"""
# 使用重新规划器根据当前状态更新计划
output = await replanner.ainvoke(state)
if isinstance(output.action, Response):
# 如果动作是响应,返回最终响应
return {"response": output.action.response}
else:
# 如果动作是计划,返回新的计划步骤
return {"plan": output.action.steps}

定义路由

1
2
3
4
5
6
7
8
def should_end(state: PlanExecute):
"""判断是否结束执行流程"""
if "response" in state and state["response"]:
# 如果存在响应内容,结束流程
return END
else:
# 否则继续执行代理步骤
return "agent"

编译图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.memory import MemorySaver

# 创建内存检查点保存器
memory = MemorySaver()

# 创建工作流图,使用 PlanExecute 状态类型
workflow = StateGraph(PlanExecute)

# 添加计划节点
workflow.add_node("planner", plan_step)

# 添加执行步骤节点
workflow.add_node("agent", execute_step)

# 添加重新规划节点
workflow.add_node("replan", replan_step)

# 从开始节点连接到计划节点
workflow.add_edge(START, "planner")

# 从计划节点连接到代理执行节点
workflow.add_edge("planner", "agent")

# 从代理执行节点连接到重新规划节点
workflow.add_edge("agent", "replan")

# 添加条件边 - 从重新规划节点根据条件决定下一步
workflow.add_conditional_edges(
"replan",
# 传入决定下一个调用节点的函数
should_end,
["agent", END], # 可能的下一个节点:代理节点或结束
)

# 最后,编译工作流并添加检查点功能!
# 这将其编译为 LangChain Runnable,
# 意味着你可以像使用其他任何 runnable 一样使用它
app = workflow.compile(checkpointer=memory)
image-20250729092408872

调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 配置递归限制,防止无限循环
config = {"configurable": {
"thread_id": "1", # 必需:线程ID
},"recursion_limit": 50}

# 输入问题:2024年澳大利亚网球公开赛男单冠军的家乡是哪里?
inputs = {"input": "2024年澳大利亚网球公开赛男单冠军的家乡是哪里?"}

# 异步流式执行应用
async for event in app.astream(inputs, config=config):
# 遍历每个事件
for k, v in event.items():
# 排除结束标记,打印其他所有事件内容
if k != "__end__":
print(v)

资源

计划与执行 — Plan-and-Execute

前言

常使用pgsql与redis配合langgraph的checkpoint与store进行持久化储存,完成长期记忆与短期记忆的实现

pgsql实现持久化

什么是pgsql

PostgreSQL(常简称pgsql或Postgres)是一个功能强大的开源对象-关系型数据库管理系统(ORDBMS),以其稳定性、扩展性和符合SQL标准著称。

docker拉取

docker-compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
version: '3.8'

services:
postgres:
image: postgres:15 # 指定具体版本
container_name: postgres_db
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
TZ: Asia/Shanghai # 设置时区
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
restart: unless-stopped
healthcheck: # 健康检查
test: ["CMD", "pg_isready", "-U", "nange"]
interval: 10s
timeout: 5s
retries: 5
command: ["postgres", "-c", "max_connections=200"] # 自定义配置

volumes:
pgdata:

Docker Compose 是一个 定义和运行多容器 Docker 应用声明式工具。 它通过一个 YAML 文件(通常叫 docker-compose.yml 描述整个应用的服务、网络、存储等配置,然后用一条命令即可 启动/停止/管理 所有容器,无需手动逐个 docker run

1
2
#拉取并运行
docker-compose up -d

在pgsqsl安装依赖

因为LangGraph的PostgresStore需要使用到pgvector,因此需要在容器中按照如下步骤进行操作,直接使用Docker Desktop软件中进行操作

  • 为什么需要 pgvector
  • PostgresStore 支持将 向量嵌入(embedding) 存储在 PostgreSQL 中,并基于它们进行 语义搜索
  • 该功能依赖 pgvector 扩展提供的 vector 类型和索引机制(如 HNSW)。
1
2
3
安装依赖           
apt update #刷新本地软件包索引
apt install -y git build-essential postgresql-server-dev-15

apt install -y git build-essential postgresql-server-dev-15一次性安装 3 类依赖。

  • git —— 用来克隆 pgvector 源码。
  • build-essential —— Debian/Ubuntu 的“编译工具链”元包,包含 gcc、make 等。
  • postgresql-server-dev-15 —— 与当前 Postgres 主版本一致 的开发头文件和 pg_config
1
2
3
4
5
6
7
8
9
10
编译并安装 pgvector      
#把 pgvector 的 v0.7.0 稳定版 源码克隆到本地目录 ./pgvector。
git clone --branch v0.7.0 https://github.com/pgvector/pgvector.git
cd pgvector
#调用 Makefile 根据当前操作系统 + PostgreSQL 版本编译出二进制文件(.so 共享库)。
make
#把刚刚编好的 .so 文件和 .sql/.control 文件复制到 PostgreSQL 的扩展目录
make install
验证安装,检查扩展文件是否安装成功
ls -l /usr/share/postgresql/15/extension/vector*

pgvector/pgvector: Open-source vector similarity search for Postgres

接下来,若要在脚本中进行使用,首先在系统环境中需要安装PostgreSQL 的开发库(libpq),因为 psycopg 需要它来编译或运行

1
2
sudo apt update
sudo apt install libpq-dev postgresql-server-dev-all

psycopg(Python 操作 PostgreSQL 的库)Linux/macOS 上运行时,底层依赖于 PostgreSQL 的 C 语言开发库 libpq。 如果系统里 没有 libpq,psycopg 会出现以下两种问题:

  1. 编译安装失败(源码/旧版本) 当 pip install psycopg2 需要现场编译时,会找不到头文件 libpq-fe.h 或动态库 libpq.so,导致报错:

    1
    error: libpq-fe.h: No such file or directory
  2. 运行时崩溃 即使通过预编译的 wheel 包安装成功,运行时也可能提示:

    1
    ImportError: libpq.so.5: cannot open shared object file

最后,再安装相关依赖包

1
2
pip install langgraph-checkpoint-postgres                    
pip install psycopg psycopg-pool

psycopg官方 PostgreSQL 驱动,在 psycopg 之上再包一层“连接池”,让并发访问更快、更稳定。

连接pgsql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from langgraph.store.postgres import PostgresStore
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool
# 1) 连接字符串(URI 语法)
DB_URI = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"
# 协议:// 用户 : 密码 @ 主机:端口 / 数据库名 ? 额外参数

# 2) 连接级参数
connection_kwargs = {
"autocommit": True, # 每条 SQL 执行完立即提交,无需手动 commit
"prepare_threshold": 0, # 禁用服务器端 prepared statement,可减少一次往返
}

# 3) 创建池
connection_pool = ConnectionPool(
conninfo=DB_URI,
max_size=20, # 最多 20 条物理连接
kwargs=connection_kwargs,
)

# 4) 显式打开池(psycopg 3 的 ConnectionPool 默认懒启动,调 open() 会立即建 min_size 条连接)
connection_pool.open()
print("数据库连接池初始化成功")

正常情况每次 SQL 都新建一条 TCP 连接、做 SSL 握手、验证密码、分配内存,如果不复用连接,这些动作就要 每次都重新来一遍成本非常高

连接池(Connection Pool)是一种 数据库访问层资源管理组件,其核心目标是在 高并发、短事务 场景下,通过 复用已建立的数据库物理连接 来降低系统整体延迟、减少资源消耗,并防止数据库因瞬时连接风暴而崩溃。

初始化pgsql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 初始化PostgresStore
in_postgres_store = PostgresStore(
pool,
index={
"dims": 1536,
"embed": embedding
}
)
in_postgres_store.setup()

#初始化checkpoint
# 使用传入的连接池创建 PostgresSaver
checkpointer = PostgresSaver(pool)
checkpointer.setup()

#最后编译时添加
graph_builder.compile(checkpointer=checkpointer, store=in_postgres_store)

in_postgres_store.setup() 的角色一句话就能说清:

把数据库里所有为了让向量存储正常工作的“一次性基建”全部建好——只建一次,后面再跑就不会重复执行。

具体而言,它通常干下面三件事:1.建表 / 建扩展;2.建向量索引;3.元数据初始化

fastapi实现生命周期的管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 定义了一个异步函数lifespan,它接收一个FastAPI应用实例app作为参数。这个函数将管理应用的生命周期,包括启动和关闭时的操作
# 函数在应用启动时执行一些初始化操作,如加载上下文数据、以及初始化问题生成器
# 函数在应用关闭时执行一些清理操作
# @asynccontextmanager 装饰器用于创建一个异步上下文管理器,它允许你在 yield 之前和之后执行特定的代码块,分别表示启动和关闭时的操作
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时执行
# 申明引用全局变量,在函数中被初始化,并在整个应用中使用
global graph, connection_pool
# 启动时执行
try:
logger.info("正在初始化模型、定义 Graph...")
# 初始化 LLM
llm, embedding = get_llm(llm_type)
# 创建数据库连接池
DB_URI = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"
connection_kwargs = {
"autocommit": True,
"prepare_threshold": 0,
}
connection_pool = ConnectionPool(
conninfo=DB_URI,
max_size=20,
kwargs=connection_kwargs,
)
connection_pool.open() # 显式打开连接池
logger.info("数据库连接池初始化成功")
# 短期记忆 初始化checkpointer
checkpointer = PostgresSaver(connection_pool)
checkpointer.setup()
# 长期记忆 初始化PostgresStore
in_postgres_store = PostgresStore(
connection_pool,
index={
"dims": 1536,
"embed": embedding
}
)
in_postgres_store.setup()
# 定义 Graph
graph = create_graph(llm, checkpointer, in_postgres_store )
# 保存 Graph 可视化图
save_graph_visualization(graph)
logger.info("初始化完成")
except Exception as e:
logger.error(f"初始化过程中出错: {str(e)}")
raise

yield # 应用运行期间

# 关闭时执行
logger.info("正在关闭...")
if connection_pool:
connection_pool.close() # 关闭连接池
logger.info("数据库连接池已关闭")


# lifespan参数用于在应用程序生命周期的开始和结束时执行一些初始化或清理工作
app = FastAPI(lifespan=lifespan)

pgsql的存储结构

一、Checkpoints 系列

作用:让 LangGraph Runtime 能在 分布式/长流程 场景下 断点续跑、重放、并发控制

表名 存什么 典型字段(示意) 何时写入
checkpoints 每个「图实例」的 最新快照(state 的完整 JSONB) thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, state, created_at 每次节点执行成功后覆盖更新
checkpoint_blobs checkpoints 里 大字段的拆分(避免行过大) thread_id, checkpoint_ns, channel, type, blob 当 state 过大,自动拆分
checkpoint_migrations 记录 schema 版本/迁移脚本 version, name, applied_at 只在 setup() 时写一次
checkpoint_writes 写放大日志(每个节点写 state 的增量 diff) thread_id, checkpoint_id, task_id, idx, channel, type, value 每次节点完成时追加

关系:
checkpoints = 最新完整快照
checkpoint_writes = 所有增量历史(用于重放/审计)
checkpoint_blobs = checkpoints 里超大型 value 的切片

二、Store 系列

作用:给 业务代码(开发者) 提供 持久化 KV / 向量存储,与图运行状态无关。

表名 存什么 典型字段(示意) 何时写入
store 任意 KV 文档(LangChain Document → JSONB) uuid, namespace, key, value, created_at, updated_at 你调用 store.amput / amset 等 API
store_migrations 同 checkpoint_migrations,记录 store schema 版本 version, name, applied_at 只在第一次 setup()
store_vectors 向量索引表(embedding → vector 类型) uuid, collection_id, embedding, document, metadata 你调用 add_documents(..., embeddings=...)
vector_migrations 记录 pgvector 扩展及索引迁移版本 version, applied_at setup() 时若第一次装 pgvector

三、调用链脑图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌──────────────┐
│ LangGraph │ 运行图实例
└────┬─────────┘
│1. 写 checkpoints
│2. 写 checkpoint_writes
│3. 拆大字段到 checkpoint_blobs

┌──────────────┐
│ 业务代码 │ 读写 KV/向量
└────┬─────────┘
│1. 写 store
│2. 写 store_vectors

PostgreSQL (pgsql)

在linux安装postgresql

查看linux发行版本

1
2
# 查看发行版名称和版本
cat /etc/os-release

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
PRETTY_NAME="Ubuntu 24.04.2 LTS"
NAME="Ubuntu"
VERSION_ID="24.04"
VERSION="24.04.2 LTS (Noble Numbat)"
VERSION_CODENAME=noble
ID=ubuntu
ID_LIKE=debian
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
UBUNTU_CODENAME=noble
LOGO=ubuntu-logo

安装postgresql

1
2
3
4
5
6
7
8
9
10
11
12
13
# 更新软件包索引
sudo apt update

# 安装 PostgreSQL 和常用扩展
sudo apt install -y postgresql postgresql-contrib

# 检查服务状态
#Linux 容器/子系统( Docker、WSL 或 LXC)没有使用 systemd ,因此 systemctl 无法工作
sudo systemctl status postgresql

#确认 PostgreSQL 已安装
which psql # 应该输出 /usr/bin/psql
pg_ctl --version # 显示版本号即已安装
  • postgresql 是主程序
  • postgresql-contrib 提供额外扩展(如 uuid-ossppgcrypto 等)

systemctl一般用于服务器上,完整的linux系统上

Linux 容器/子系统( Docker、WSL 或 LXC)使用 service

pgsql常用指令

启动pgsql服务

1
sudo service postgresql start

停止pgsql服务

1
sudo service postgresql stop

查看状态

1
sudo service postgresql status

连接PostgreSQL 默认的系统用户,可以执行pgsql的相关指令

1
sudo -u postgres psql

psql为PostgreSQL 的终端客户端,默认会连接与当前操作系统用户名同名的数据库用户和数据库。

postgres为PostgreSQL 自带的系统数据库,postgres 默认 数据库密码为空,可以通过以下指令进行设置

1
2
-- 设置postgres用户密码
ALTER USER postgres PASSWORD 'postgres';

数据库关于user的作用

数据库里的“用户”是 PostgreSQL 内部用来做“访问控制”的一把钥匙。一句话:“谁能连哪个库、谁能读哪张表、谁能改哪些行”——全靠这些数据库用户(角色)来判定。

作用 举例
1. 认证(Authentication) 告诉 PostgreSQL “我连库时提供的用户名+密码是否合法”。
2. 授权(Authorization) 决定 “这个用户连进来后,对哪些库、哪些表、哪些行有何种权限(SELECT/INSERT/UPDATE/DELETE…)”。
3. 资源隔离(Isolation) 不同业务/团队用不同用户,方便审计、限流、回收权限,互不干扰。

pgsql通用 URL 模板

1
postgresql://<用户名>:<密码>@127.0.0.1:5432/<数据库名>[?参数=值&...]

连接pgsql

1
2
3
4
5
6
7
8
9
10
11
12
# 基于数据库持久化存储的short-term
db_uri = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"

# short-term短期记忆 实例化PostgresSaver对象 并初始化checkpointer
# long-term长期记忆 实例化PostgresStore对象 并初始化store
async with (
AsyncPostgresSaver.from_conn_string(db_uri) as checkpointer,
AsyncPostgresStore.from_conn_string(db_uri) as store

):
await store.setup()
await checkpointer.setup()

更换apt镜像源

1
2
3
4
5
6
7
8
cat > /etc/apt/sources.list <<'EOF'
deb http://mirrors.aliyun.com/debian/ bookworm main contrib non-free
deb http://mirrors.aliyun.com/debian/ bookworm-updates main contrib non-free
deb http://mirrors.aliyun.com/debian/ bookworm-backports main contrib non-free
deb http://mirrors.aliyun.com/debian-security bookworm-security main contrib non-free
EOF

apt update

参考资料

postgresql向量扩展pgvector的安装与入门本文简答的介绍了 rag 的架构,引申出向量数据库的作用,介绍了 - 掘金

langchain支持向量存储向量存储 | 🦜️🔗 LangChain — Vector stores | 🦜️🔗 LangChain

前言

本文基于AI-Guide-and-Demos-zh_CN/PaperNotes/Transformer 论文精读.md at master · Hoper-J/AI-Guide-and-Demos-zh_CNTransformer论文逐段精读【论文精读】_哔哩哔哩_bilibili阅读学习

transformer贡献

实际在这一阶段的工作中,注意力机制就已经在编码器-解码器架构中被广泛应用(与 RNN 一起使用),但 Transformer 彻底颠覆了默认采取的逻辑:直接放弃 RNN 的递归结构,只使用注意力机制来编码和解码序列信息

Transformer 的主要贡献如下:

  • 取消递归结构,实现并行计算

    通过采用自注意力机制(Self-Attention),Transformer 可以同时处理多个输入序列,极大提高了计算的并行度和训练速度。

  • 引入位置编码(Positional Encoding)并结合 Attention 机制巧妙地捕捉位置信息

    在不依赖 RNN 结构的情况下,通过位置编码为序列中的每个元素嵌入位置信息,从而使模型能够感知输入的顺序。

transformer架构

image-20250807135151542
image-20250807145354145

【Transformer模型】曼妙动画轻松学,形象比喻贼好记_哔哩哔哩_bilibili

Transformer 模型基于编码器(左)- 解码器(右)架构

Transformer编码器同样由 N 个完全相同的层(原始论文中 N=6)堆叠而成,每层只有两个子层,而解码器有三个。

  1. 多头自注意力(Multi-Head Self-Attention) 让输入序列中的每个位置都能关注序列内所有位置,直接建模全局依赖。
  2. 前馈全连接网络(Position-wise Feed-Forward Network) 对每个位置独立地做一次两层的全连接变换(通常先升维再降维)。

同样,每个子层后都有

  • 残差连接(Residual Connection)
  • 层归一化(Layer Normalization)

另外,编码器在输入端还会用到

  • 位置编码(Positional Encoding)——给模型提供序列位置信息,因为注意力本身不包含顺序信息。

Transformer解码器由多个相同的层堆叠而成,每一层包含三个核心子层:

  1. 掩蔽多头自注意力机制(Masked Multi-Head Attention) 用于处理目标序列,通过掩码防止当前位置关注未来位置,确保生成过程的自回归特性。
  2. 编码器-解码器注意力机制(Encoder-Decoder Attention) 使解码器能够关注编码器输出的上下文信息,建立输入与输出序列之间的关联。
  3. 前馈神经网络(Feed-Forward Neural Network) 对注意力机制的输出进行非线性变换,增强模型的表达能力。

此外,每个子层后均包含残差连接(Residual Connection)和层归一化(Layer Normalization),以稳定训练过程并加速收敛。最终,解码器的输出通过线性层和Softmax层映射为词汇表上的概率分布。

嵌入(Embeddings)

image-20250808103256344

在 Transformer 模型中,嵌入层(Embedding Layer) 是处理输入和输出数据的关键步骤,因为模型实际操作的是张量(tensor),而非字符串(string)。在将输入文本传递给模型之前,首先需要进行分词(tokenization),即将文本拆解为多个 token,随后这些 token 会被映射为对应的 token ID,从而转换为模型可理解的数值形式。此时,数据的形状为 (seq_len,),其中 seq_len 表示输入序列的长度。

目的:为了让模型捕捉到 token 背后复杂的语义(Semantic meaning)关系,我们需要将离散的 token ID 映射到一个高维的连续向量空间(Continuous, dense)。这意味着每个 token ID 会被转换为一个嵌入向量(embedding vector),期望通过这种方式让语义相近的词汇在向量空间中距离更近,使模型能更好地捕捉词汇之间的关系。

流程:(前面要进行分词,后面要进行位置编码)

初始化一个可学习的矩阵 E ∈ ℝ^(|V| × d_model) |V| = 词表大小(比如 32 k、50 k),d_model = 512/768/1024…

把 token id 作为行号,直接取对应行: x_i = E[token_id_i] 得到 [batch, seq_len, d_model] 的浮点张量。

位置编码(Positional Encoding)

image-20250808103418150

Transformer 的自注意力机制(Self-Attention)是位置无关(position-agnostic)的。也就是说,如果不做任何处理,模型无法区分“我爱你”和“你爱我”这两个句子的差异,因为自注意力机制只关注 token 之间的相关性,而不考虑它们在序列中的顺序。

为了让模型感知到 token 的位置信息,Transformer 引入了位置编码

在原始论文中,Transformer 使用的是固定位置编码(Positional Encoding),其公式如下:

$$ \begin{aligned} PE_{(pos, 2i)} &= \sin\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right), \\ PE_{(pos, 2i+1)} &= \cos\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right). \end{aligned} $$

其中:

  • pos 表示位置索引(Position)。
  • i 表示维度索引。
  • dmodel 是嵌入向量的维度。

流程:输入的是一个整数索引(位置序号 0,1,2,…)。位置编码模块先把这些整数映射成与词向量同维度的向量(例如 512 维),再把结果加到词向量上。

linear与Softmax

image-20250808111823715

从Linear到MLP AI模型的数学本质【Transformer结构拆解】_哔哩哔哩_bilibili

在 Transformer 模型中,Softmax 函数不仅在计算注意力权重时用到,在预测阶段的输出处理环节也会用到,因为预测 token 的过程可以看成是多分类问题

Softmax 函数是一种常用的激活函数,能够将任意实数向量转换为概率分布,确保每个元素的取值范围在 [0, 1] 之间,并且所有元素的和为 1。其数学定义如下:

$$ \text{Softmax}(x_i) = \frac{e^{x_i}}{\sum_{j} e^{x_j}} $$

其中:

  • xi 表示输入向量中的第 i 个元素。
  • Softmax(xi) 表示输入 xi 转换后的概率。

我们可以把 Softmax 看作一种归一化的指数变换。相比于简单的比例归一化 $\frac{x_i}{\sum_j x_j}$, Softmax 通过指数变换放大数值间的差异,让较大的值对应更高的概率,同时避免了负值和数值过小的问题,让模型聚焦于权重最高的位置,同时保留全局信息(低权重仍非零)。

注意力机制

【Attention 注意力机制】激情告白transformer、Bert、GNN的精髓_哔哩哔哩_bilibili

image-20250808135811744

缩放点积注意力机制(Scaled Dot-Product Attention)

image-20250808140834132

Transformer 的核心是多头注意力机制(Multi-Head Attention),它能够捕捉输入序列中不同位置之间的依赖关系,并从多个角度对信息进行建模。模块将自底向上的进行讲解:在深入理解注意力机制前,首先需要理解论文使用的缩放点积注意力机制(Scaled Dot-Product Attention)

给定查询矩阵 Q、键矩阵 K 和值矩阵 V, 其注意力输出的数学表达式如下:

$$ \text{Attention}(Q, K, V) = \text{Softmax}\left(\frac{Q K^\top}{\sqrt{d_k}}\right) V $$

  • Q(Query): 用于查询的向量矩阵。
  • K(Key): 表示键的向量矩阵,用于与查询匹配。
  • V(Value): 值矩阵,注意力权重最终会作用在该矩阵上。
  • dk: 键或查询向量的维度。

理解 Q、K、V 的关键在于代码,它们实际上是通过线性变换从输入序列生成的

公式解释

  1. 点积计算(Dot Produce)

    将查询矩阵 Q 与键矩阵的转置 K 做点积,计算每个查询向量与所有键向量之间的相似度:

    $`\text{Scores} = Q K^\top`$

    • 每一行表示某个查询与所有键之间的相似度(匹配分数)。
    • 每一列表示某个键与所有查询之间的相似度(匹配分数)。
  2. 缩放(Scaling)

    dk 较大时,点积的数值可能会过大,导致 Softmax 过后的梯度变得极小,因此除以 $\sqrt{d_k}$ 缩放点积结果的数值范围:

    $`\text{Scaled Scores} = \frac{Q K^\top}{\sqrt{d_k}}`$

    缩放后(Scaled Dot-Product)也称为注意力分数(attention scores)。

  3. Softmax 归一化

    使用 Softmax 函数将缩放后的分数转换为概率分布:

    $`\text{Attention Weights} = \text{Softmax}\left(\frac{Q K^\top}{\sqrt{d_k}}\right)`$

    注意:Softmax 是在每一行上进行的,这意味着每个查询的匹配分数将归一化为概率,总和为 1。

  4. 加权求和(Weighted Sum)

    最后,使用归一化后的注意力权重对值矩阵 V 进行加权求和,得到每个查询位置的最终输出: $`\text{Output} = \text{Attention Weights} \times V`$

单头注意力机制(Single-Head Attention)

将输入序列(Inputs)通过线性变换生成查询矩阵(Query, Q)、键矩阵(Key, K)和值矩阵(Value, V),随后执行缩放点积注意力(Scaled Dot-Product Attention)。

掩码注意力机制(Masked Attention)

如果使用 mask 掩盖将要预测的词汇,那么 Attention 就延伸为 Masked Attention

在这段代码中,mask 矩阵用于指定哪些位置应该被遮蔽(即填充为 -∞),从而保证这些位置的注意力权重在 softmax 输出中接近于零。注意,掩码机制并不是直接在截断输入序列,也不是在算分数的时候就排除不应该看到的位置,因为看到也没有关系,不会影响与其他位置的分数,所以在传入 Softmax(计算注意力权重)之前排除就可以了。

另外,根据输入数据的来源,还可以将注意力分为自注意力(Self-Attention)和交叉注意力(Cross-Attention)

自注意力机制(Self-attention)

image-20250808142109091

Transformer 模型架构使用到了三个看起来不同的注意力机制,我们继续忽视共有的 Multi-Head。观察输入,线条一分为三传入 Attention 模块,这意味着查询(query)、键(key)和值(value)实际上都来自同一输入序列 X,数学表达如下:

Q = XWQ,  K = XWK,  V = XWV

  • WQ, WK, WV:可训练的线性变换权重,实际上就是简单的线性层

交叉注意力机制(Cross-Attention)

在 Transformer 解码器中,除了自注意力外,还使用了 交叉注意力(Cross-Attention)

如下图所示,解码器(右)在自底向上的处理过程中,先执行自注意力机制,然后通过交叉注意力从编码器的输出中获取上下文信息。

image-20250808142428374

数学表达如下:

Q = XdecoderWQ,  K = XencoderWK,  V = XencoderWV

对比学习

Masked AttentionSelf-AttentionCross-Attention 的本质是一致的,这一点从代码调用可以看出来,三者的区别在于未来掩码的使用和输入数据的来源:

  • Masked Attention:用于解码过程,通过掩码屏蔽未来的时间步,确保模型只能基于已生成的部分进行预测,论文中解码器部分的第一个 Attention 使用的是 Masked Self-Attention。

  • Self-Attention:查询、键和值矩阵来自同一输入序列,模型通过自注意力机制学习输入序列的全局依赖关系。

  • Cross-Attention:查询矩阵来自解码器的输入,而键和值矩阵来自编码器的输出,解码器的第二个 Attention 模块就是 Cross-Attention,用于从编码器输出中获取相关的上下文信息。

    • 机器翻译中的中译英任务为例:对于中文句子“中国的首都是北京”,假设模型已经生成了部分译文“The capital of China is”,此时需要预测下一个单词。

      在这一阶段,解码器中的交叉注意力机制会使用当前已生成的译文“The capital of China is”的编码表示作为查询,并将编码器对输入句子“中国的首都是北京”编码表示作为,通过计算查询与键之间的匹配程度,生成相应的注意力权重,以此从值中提取上下文信息,基于这些信息生成下一个可能的单词(token),比如:“Beijing”。

多头注意力机制(Multi-Head Attention)

image-20250808143810965
image-20250808143822630
image-20250808143845681
image-20250808145725202

多头注意力机制就是存在多个不同的权重矩阵,形成多个矩阵Z,再把它们 按最后一维(hidden)拼接(concat)→ 做一次线性变换 得到最终输出。

线性bian’h把拼接后的多头结果 Z_concat(形状 batch×seq×d_model)重新线性映射回与输入相同的维度,同时让网络可以学习如何融合不同头的信息

【Transformer模型】曼妙动画轻松学,形象比喻贼好记_哔哩哔哩_bilibili

Transformer原理及架构:多头自注意力机制_哔哩哔哩_bilibili

残差连接(Residual Connection)和层归一化(Layer Normalization, LayerNorm)

image-20250808150313758

在 Transformer 架构中,残差连接(Residual Connection)与层归一化(LayerNorm)结合使用,统称为 Add & Norm 操作。

Add(残差连接,Residual Connection)

残差连接是一种跳跃连接(Skip Connection),它将层的输入直接加到输出上(观察架构图中的箭头),对应的公式如下:

Output = SubLayer(x) + x

这种连接方式有效缓解了深层神经网络的梯度消失问题。

image-20250808151004667

在transform中,就是输入的矩阵x加上经过注意力机制计算出来的z矩阵

Norm(层归一化,Layer Normalization)

层归一化(LayerNorm)是一种归一化技术,用于提升训练的稳定性和模型的泛化能力。

假设输入向量为 x = (x1, x2, …, xd), LayerNorm 的计算步骤如下:

  1. 计算均值和方差: 对输入的所有特征求均值 μ 和方差 σ2

    $` \mu = \frac{1}{d} \sum_{j=1}^{d} x_j, \quad \sigma^2 = \frac{1}{d} \sum_{j=1}^{d} (x_j - \mu)^2 `$

  2. 归一化公式: 将输入特征 i 进行归一化:

    $` \hat{x}_i = \frac{x_i - \mu}{\sqrt{\sigma^2 + \epsilon}} `$

    其中, ϵ 是一个很小的常数(比如 1e-9),用于防止除以零的情况。

  3. 引入可学习参数: 归一化后的输出乘以 γ 并加上 β, 公式如下:

    $` \text{Output} = \gamma \hat{x} + \beta `$

    其中 γβ 是可学习的参数,用于进一步调整归一化后的输出。

前馈神经网络 Position-wise Feed-Forward Networks(FFN)

image-20251220191959727

在 Transformer 中,前馈网络层(Feed-Forward Network,FFN)的作用可以概括为一句话: “对每个位置的向量进行非线性变换,增加模型的表达能力。”

在编码器-解码器架构中,另一个看起来“大一点”的模块就是 Feed Forward,它在每个位置 i 上的计算可以表示为:

FFN(xi) = max(0, xiW1 + b1)W2 + b2

其中:

  • xi ∈ ℝdmodel 表示第 i 个位置的输入向量。
  • W1 ∈ ℝdmodel × dffW2 ∈ ℝdff × dmodel 是两个线性变换的权重矩阵。
  • b1 ∈ ℝdffb2 ∈ ℝdmodel 是对应的偏置向量。
  • max(0, ⋅)ReLU 激活函数,用于引入非线性。

大模型发展树

image-20250807171237889
image-20250807173520481

预训练语言模型

预训练语言模型(PLM)是一种通过大量文本数据进行无监督或弱监督训练的语言模型,目的是学习语言的通用表示(即语言的模式、语法、语义等)。这些模型通常在大规模文本数据上进行预训练,然后可以被微调(Fine - tuning)以适应各种下游任务,如文本分类、问答、命名实体识别等。

预训练语言模型的核心思想是利用大量的无标注文本数据来学习语言的通用特征,从而为各种自然语言处理任务提供强大的语言理解能力。预训练模型可以显著提高任务的性能,减少对标注数据的依赖,并且能够快速适应新的任务。

BERT模型(Encoder-only PLM)

针对 Encoder、Decoder 的特点,引入 ELMo 的预训练思路,开始出现不同的、对 Transformer 进行优化的思路。例如,Google 仅选择了 Encoder 层,通过将 Encoder 层进行堆叠,再提出不同的预训练任务-掩码语言模型(Masked Language Model,MLM),打造了一统自然语言理解(Natural Language Understanding,NLU)任务的代表模型——BERT

BERT,全名为 Bidirectional Encoder Representations from Transformers,是由 Google 团队在 2018年发布的预训练语言模型。该模型发布于论文《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》,实现了包括 GLUE、MultiNLI 等七个自然语言处理评测任务的最优性能(State Of The Art,SOTA),堪称里程碑式的成果。

T5(Encoder-Decoder PLM)

BERT 也存在一些问题,例如 MLM 任务和下游任务微调的不一致性,以及无法处理超过模型训练长度的输入等问题。为了解决这些问题,研究者们提出了 Encoder-Decoder 模型,通过引入 Decoder 部分来解决这些问题,同时也为 NLP 领域带来了新的思路和方法。

T5(Text-To-Text Transfer Transformer)是由 Google 提出的一种预训练语言模型,通过将所有 NLP 任务统一表示为文本到文本的转换问题,大大简化了模型设计和任务处理。T5 基于 Transformer 架构,包含编码器和解码器两个部分,使用自注意力机制和多头注意力捕捉全局依赖关系,利用相对位置编码处理长序列中的位置信息,并在每层中包含前馈神经网络进一步处理特征。

LLama模型(Decoder-Only PLM)

LLaMA模型是由Meta(前Facebook)开发的一系列大型预训练语言模型。从LLaMA-1到LLaMA-3,LLaMA系列模型展示了大规模预训练语言模型的演进及其在实际应用中的显著潜力。

GPT模型(Decoder-Only PLM)

GPT,即 Generative Pre-Training Language Model,是由 OpenAI 团队于 2018年发布的预训练语言模型。虽然学界普遍认可 BERT 作为预训练语言模型时代的代表,但首先明确提出预训练-微调思想的模型其实是 GPT。

GPT 提出了通用预训练的概念,也就是在海量无监督语料上预训练,进而在每个特定任务上进行微调,从而实现这些任务的巨大收益。虽然在发布之初,由于性能略输于不久后发布的 BERT,没能取得轰动性成果,也没能让 GPT 所使用的 Decoder-Only 架构成为学界研究的主流,但 OpenAI 团队坚定地选择了不断扩大预训练数据、增加模型参数,在 GPT 架构上不断优化,最终在 2020年发布的 GPT-3 成就了 LLM 时代的基础,并以 GPT-3 为基座模型的 ChatGPT 成功打开新时代的大门,成为 LLM 时代的最强竞争者也是目前的最大赢家。

参考资料

Hello! · Transformers快速入门

jsksxs360/How-to-use-Transformers: Transformers 库快速入门教程

Hoper-J/AI-Guide-and-Demos-zh_CN: 这是一份入门AI/LLM大模型的逐步指南,包含教程和演示代码,带你从API走进本地大模型部署和微调,代码文件会提供Kaggle或Colab在线版本,即便没有显卡也可以进行学习。项目中还开设了一个小型的代码游乐场🎡,你可以尝试在里面实验一些有意思的AI脚本。同时,包含李宏毅 (HUNG-YI LEE)2024生成式人工智能导论课程的完整中文镜像作业。

Happy-LLM

梗直哥

90%人不知道的LLM黑科技:拆解Transformer如何吃透全网知识!_哔哩哔哩_bilibili

Transformer如何成为AI模型的地基_哔哩哔哩_bilibili

什么是 MCP?

  • 全称:Model Context Protocol
  • 作用:让 AI 助手(如 Claude、Cline 等)在对话过程中,动态调用外部工具(Tool)完成复杂任务(读写文件、查询数据库、调用 API 等)。
  • 组成
    1. MCP Host(宿主,如 Cline、Claude Desktop)
    2. MCP Server(提供 Tool 的后台服务)
    3. Tool(具体功能单元,如 read_file, exec_command 等)

核心概念速记

  • MCP Server
    • 一个独立进程,提供 1-N 个 Tool。
    • 可以用任何语言编写,只要暴露标准 MCP 接口。
  • Tool
    • 最小执行单元,必须包含:
      • name(唯一)
      • description(让 LLM 理解何时调用)
      • input schema(参数结构,JSON Schema)
  • 交互流程(重点)
    • 在启动mcp server时,server将tool信息传送给host
    • 用户在 Host 输入自然语言需求。
    • Host 将需求 + 可用 Tool 列表发给 LLM。
    • LLM 判断调用哪个 Tool,并填充参数。
    • Host 通过 MCP 协议向对应 Server 发送请求。
    • Server 执行 Tool 并返回结果。
    • Host 将结果合并上下文,继续对话。

mcp和fuction calling的区别

维度 Function Calling(FC) MCP(Model Context Protocol)
本质 能力 —— 某个大模型原生就带的一种「调用函数」功能 协议 —— 定义 AI 与外部世界如何长期、标准、可复用地交互
工作方式 模型在一次推理里主动决定要调用哪个函数,并吐出结构化参数 通过「客户端-服务器」架构,由 MCP Server 被动等待模型或 Agent 的请求
是否标准化 否。OpenAI、Anthropic、百度等各家接口格式不同 是。统一 JSON-RPC 2.0 协议,跨模型通用
上下文管理 单次调用,无状态;复杂多轮任务需自己维护 协议层面支持会话、状态、长链路任务
复用/共享 函数代码往往紧耦合在项目里,换模型就得重写 一次写成 MCP Server,可被任何支持 MCP 的模型/IDE/Agent 直接插用

一句话总结: Function Calling 是「某个模型自带的快捷指令」,MCP 是「让任何模型都能统一插拔工具的工业标准」。 二者并非互斥——MCP 的实现里仍然可以用 Function Calling 去触发具体函数,但它把「怎么描述工具、怎么发现工具、怎么保持会话」这些事都标准化了,从而解决了 FC 带来的碎片化、难维护、难共享的问题 。

安装mcp

在mcp server市场查找自己想用的mcp服务,如Fetch MCP Server

复制mcp配置

1
2
3
4
5
6
7
8
9
10
{
"mcpServers": {
"fetch": {
"command": "uvx",
"args": [
"mcp-server-fetch"
]
}
}
}

在mcp host 中安装,如trae

host会自动完成对mcp的配置

创建一个mcp server

初始化项目

1
2
3
4
5
6
7
8
uv init weather

uv sync

source .venv/bin/activate

#添加依赖
uv add "mcp[cli]" httpx

创建weather.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# 导入类型提示模块,用于类型注解
from typing import Any

# 导入httpx库,用于发送HTTP请求
import httpx

# 从mcp.server.fastmcp模块导入FastMCP类
# FastMCP是一个快速构建MCP(Model Control Protocol)服务器的框架
from mcp.server.fastmcp import FastMCP

# 创建FastMCP实例,命名为"weather",日志级别设置为ERROR(只显示错误信息)
mcp = FastMCP("weather", log_level="ERROR")


# 常量定义
# NWS(National Weather Service)API的基础URL
NWS_API_BASE = "https://api.weather.gov"
# 用户代理字符串,用于标识应用程序
USER_AGENT = "weather-app/1.0"


async def make_nws_request(url: str) -> dict[str, Any] | None:
"""向NWS API发起请求并处理错误。

Args:
url: 要请求的API URL

Returns:
成功时返回解析后的JSON数据字典,失败时返回None
"""
# 设置请求头信息
headers = {
"User-Agent": USER_AGENT, # 用户代理标识
"Accept": "application/geo+json" # 接受的数据格式
}

# 创建异步HTTP客户端
async with httpx.AsyncClient() as client:
try:
# 发起GET请求,设置超时时间为30秒
response = await client.get(url, headers=headers, timeout=30.0)
# 如果响应状态码不是2xx,抛出异常
response.raise_for_status()
# 返回解析后的JSON数据
return response.json()
except Exception:
# 捕获所有异常,返回None表示请求失败
return None


def format_alert(feature: dict) -> str:
"""将警报数据格式化为可读的字符串。

Args:
feature: 包含警报信息的字典

Returns:
格式化后的警报字符串
"""
# 获取警报属性
props = feature["properties"]
# 格式化警报信息,使用get方法提供默认值防止键不存在
return f"""
事件: {props.get('event', '未知')}
区域: {props.get('areaDesc', '未知')}
严重程度: {props.get('severity', '未知')}
描述: {props.get('description', '无描述信息')}
指示: {props.get('instruction', '无具体指示')}
"""


# 使用@mcp.tool()装饰器将函数注册为MCP工具
@mcp.tool()
async def get_alerts(state: str) -> str:
"""获取指定美国州的天气警报。

Args:
state: 两个字母的美国州代码(例如:CA, NY)

Returns:
格式化后的警报信息字符串
"""
# 构建获取州警报的URL
url = f"{NWS_API_BASE}/alerts/active/area/{state}"
# 发起API请求获取数据
data = await make_nws_request(url)

# 检查数据是否有效
if not data or "features" not in data:
return "无法获取警报或未找到警报。"

# 检查是否有警报
if not data["features"]:
return "该州无活动警报。"

# 格式化所有警报
alerts = [format_alert(feature) for feature in data["features"]]
# 用分隔符连接所有警报
return "\n---\n".join(alerts)


# 注册为MCP工具的天气预报函数
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
"""获取指定位置的天气预报。

Args:
latitude: 位置的纬度
longitude: 位置的经度

Returns:
格式化后的天气预报字符串
"""
# 首先获取预报网格端点
points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
points_data = await make_nws_request(points_url)

# 检查点数据是否获取成功
if not points_data:
return "无法获取此位置的预报数据。"

# 从点响应中获取预报URL
forecast_url = points_data["properties"]["forecast"]
forecast_data = await make_nws_request(forecast_url)

# 检查预报数据是否获取成功
if not forecast_data:
return "无法获取详细预报。"

# 将时间段格式化为可读的预报
periods = forecast_data["properties"]["periods"]
forecasts = []
# 只显示接下来的5个时间段
for period in periods[:5]:
forecast = f"""
{period['name']}:
温度: {period['temperature']}°{period['temperatureUnit']}
风力: {period['windSpeed']} {period['windDirection']}
预报: {period['detailedForecast']}
"""
forecasts.append(forecast)

# 用分隔符连接所有预报
return "\n---\n".join(forecasts)


# 程序入口点
if __name__ == "__main__":
# 初始化并运行服务器,使用stdio传输方式
mcp.run(transport='stdio')

@mcp.tool()可以将函数内的字符串,参数类型等信息传给大模型,以供大模型决定何时调用这个tool

mcp.run(transport=‘stdio’)说明mcp server和host的传输方式是输入和输出

mcp server 配置信息

1
2
3
4
5
6
7
8
9
10
11
12
"weather": {
"disabled": false,
"timeout": 60,
"command": "uv",
"args": [
"--directory",
"/Users/joeygreen/PycharmProjects/weather",
"run",
"weather.py"
],
"transportType": "stdio"
}

“disabled”: false表示该服务是否被禁用。false 表示该服务是启用状态,可以正常运行。

“timeout”: 60设置该服务的超时时间,单位为秒。

“command”: “uv”指定执行该服务时使用的命令。

“args”出了执行 command 时需要传递的参数。

“transportType”: “stdio”指定服务的通信方式。stdio 表示标准输入输出流(Standard Input Output),通常用于进程间通信。

解析mcp server与host的通信

image-20250727154739013

输入为host对server发送,输出为server对host发送,以下将列举几个重要的说明

输入中:method字段为host告诉server接下来要干什么,如初始化 (Initialization)通知已初始化 (Notification)查询可用工具 (Listing Tools)调用工具 (Calling a Tool)

protocolVersion说明了mcp使用的协议版本

以下见server返回的tool信息,其中的一个参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"name": "get_forecast",
"description": "Get weather forecast for a location.\n\nArgs:\n latitude: Latitude of the location\n longitude: Longitude of the location\n",
"inputSchema": {
"properties": {
"latitude": {
"title": "Latitude",
"type": "number"
},
"longitude": {
"title": "Longitude",
"type": "number"
}
},
"required": [
"latitude",
"longitude"
],
"title": "get_forecastArguments",
"type": "object"
}
}

可以和定义的函数对比学习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 注册为MCP工具的天气预报函数
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
"""获取指定位置的天气预报。

Args:
latitude: 位置的纬度
longitude: 位置的经度

Returns:
格式化后的天气预报字符串
"""
# 首先获取预报网格端点
points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
points_data = await make_nws_request(points_url)

# 检查点数据是否获取成功
if not points_data:
return "无法获取此位置的预报数据。"

# 从点响应中获取预报URL
forecast_url = points_data["properties"]["forecast"]
forecast_data = await make_nws_request(forecast_url)

# 检查预报数据是否获取成功
if not forecast_data:
return "无法获取详细预报。"

# 将时间段格式化为可读的预报
periods = forecast_data["properties"]["periods"]
forecasts = []
# 只显示接下来的5个时间段
for period in periods[:5]:
forecast = f"""
{period['name']}:
温度: {period['temperature']}°{period['temperatureUnit']}
风力: {period['windSpeed']} {period['windDirection']}
预报: {period['detailedForecast']}
"""
forecasts.append(forecast)

# 用分隔符连接所有预报
return "\n---\n".join(forecasts)

description内容即为我们在定义这个tool的时候写的文档字符串(Documentation String),通常简称为 docstring

inputSchema 是在MCP(Model Control Protocol)中用来描述工具(tool)所需参数的结构和类型的规范。它本质上是一个JSON Schema。

JSON Schema 是一个用于描述和验证 JSON 数据结构的规范。你可以把它理解为 JSON 数据的“蓝图”或“模板”。

required指明哪些参数是必需的,哪些是可选的。

理解mcp的本质

以上内容皆是server与host直接的交互,本质上可以理解成host对server提供的工具进行注册与使用。这其中并不涉及到host与大模型的交互,也就是大模型是如何使用host提供的信息。实际上不同的mcp host与模型的交互协议也不同,如cline使用的是xml格式;cherry studio使用的则是fuction calling

再看mcp的全称Model Context Protocol,模型上下文协议,也就是mcp增加模型的扩展性,使他可以获取更多信息,而server就是为模型提供更多信息的工具

mcp host与模型的交互

使用中转服务器截获日志

image-20250727163811531

以下为cline发送给模型的请求

image-20250727164527797

messages包含了系统提示词与用户输入

先来看系统提示词,cline提供的提示词包括工具使用格式,工具信息,工具使用方法等。这里重点说一下,cline的工具使用格式xml

结构如下:

1
2
3
4
5
<tool_name>
<parameter1_name>value1</parameter1_name>
<parameter2_name>value2</parameter2_name>
...
</tool_name>

例如:

1
2
3
<read_file>
src/main.js
</read_file>

再举个例子

use_mcp_tool 描述:请求使用由连接的 MCP 服务器提供的工具。每个 MCP 服务器可以提供具有不同功能的多个工具。工具有定义的输入模式,用于指定必需和可选参数。 参数:

server_name: (必需) 提供该工具的 MCP 服务器的名称 tool_name: (必需) 要执行的工具的名称 arguments: (必需) 一个 JSON 对象,包含工具的输入参数,遵循工具的输入模式 用法:

1
2
3
4
5
6
7
8
9
10
<use_mcp_tool>
<server_name>服务器名称在此</server_name>
<tool_name>工具名称在此</tool_name>

{
"param1": "value1",
"param2": "value2"
}

</use_mcp_tool>

模型返回响应如下

image-20250727174504506

sse连接,流式输出

SSE 是一种基于标准 HTTP只允许服务器向客户端单向推送文本流的实时通信技术,浏览器原生支持,自动重连,常用于AI 流式回答实时日志股价/监控推送等场景。

即客户端发送一次请求,连续接受多次响应直到结束

mcp的三种传输协议

协议名称 通信方式 适用场景 优势 局限
Stdio(标准输入输出) 使用进程的标准输入(stdin)和标准输出(stdout)进行本地通信,基于 JSON-RPC 2.0 格式 本地开发、调试、IDE插件、命令行工具 简单易实现、跨平台、低延迟 仅支持本地通信,无法跨网络,低并发
SSE(Server-Sent Events) 客户端通过 HTTP POST 发送请求,服务器通过 SSE 单向推送流式响应 实时监控、新闻推送、远程服务调用 基于 HTTP,浏览器友好,支持流式数据 仅支持单向通信,MCP官方已标记为“即将废弃”
Streamable HTTP(新型流式HTTP) 支持双向流式通信的现代 HTTP 协议,替代 SSE,支持会话恢复、OAuth 认证等 分布式系统、高并发、双向实时交互 双向通信、高性能、企业级安全机制 实现较复杂,生态仍在发展中

ReAct

ReAct 是一种用于增强大型语言模型(LLMs)推理和行动能力的技术框架,它通过结合“推理”(Reasoning)和“行动”(Acting)来提升模型处理复杂任务的能力。

其工作流程通常包括以下几个步骤:

  1. 思考(Reasoning):模型对当前问题进行分析,思考下一步需要采取的行动。
  2. 行动(Acting):模型决定调用哪些工具或函数,并提供必要的参数。
  3. 观察(Observation):工具执行后返回结果,模型对结果进行观察。
  4. 响应(Response):根据观察结果,模型生成最终的用户响应。

实际应用上就是告诉大模型用ReAct这种模式来思考

参考资料

MCP终极指南 - 从原理到实战,带你深入掌握MCP(基础篇)_哔哩哔哩_bilibili

前言

本教程为langchain官方教程的学习记录

academy.langchain.com/enrollments

代码见[learn-rag-langchain/academy-langgraph at main · zxj-2023/learn-rag-langchain](https://github.com/zxj-2023/learn-rag-langchain/tree/main/academy-langgraph)

module-4

Parallel node execution 并行节点执行

Waiting for nodes to finish 等待节点完成

现在,让我们考虑一个案例,其中一个并行路径的步骤比另一个多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
builder = StateGraph(State)

# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))
image-20250722143802652

在这种情况下,bb2c 都是同一个步骤的一部分。图形将在进入 d 步骤之前等待所有这些操作完成。

1
graph.invoke({"state": []})
1
2
3
4
5
6
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]
{'state': ["I'm A", "I'm B", "I'm C", "I'm B2", "I'm D"]}
Setting the order of state updates 设置状态更新的顺序

然而,在每个步骤中,我们无法对状态更新的顺序进行具体控制!简单来说,它是基于图拓扑结构由 LangGraph 确定的确定性顺序,该顺序为 *我们无法控制*

上面,我们看到 c 被添加在 b2 之前

然而,我们可以使用自定义 reducer 来定制此功能,例如,对状态更新进行排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def sorting_reducer(left, right):
""" 合并并排序列表中的值"""
# 如果 left 不是列表,则将其转换为列表
if not isinstance(left, list):
left = [left]

# 如果 right 不是列表,则将其转换为列表
if not isinstance(right, list):
right = [right]

# 合并 left 和 right 列表,然后升序排序
return sorted(left + right, reverse=False)
class State(TypedDict):
# sorting_reducer 函数将对 state 中的值进行排序
state: Annotated[list, sorting_reducer]
1
graph.invoke({"state": []})
1
{'state': ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"]}

现在,reducer 对更新的状态值进行排序!sorting_reducer 示例对所有值进行全局排序。我们还可以:

  1. 在并行步骤期间将输出写入状态中的单独字段
  2. 在并行步骤之后使用“汇”节点来合并和排序这些输出
  3. 合并后清除临时字段

请参阅 docs 以获取更多详细信息。

Working with LLMs 使用 LLMs

现在,让我们添加一个现实中的例子!我们希望从两个外部来源(Wikipedia 和 Web-Search)收集上下文信息,并让 LLM 回答一个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from langchain_core.messages import HumanMessage, SystemMessage

from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools import TavilySearchResults

def search_web(state):

""" 从网络搜索中检索文档 """

# 搜索
tavily_search = TavilySearchResults(max_results=3)
search_docs = tavily_search.invoke(state['question'])

# 将多个搜索文档转换成了一个统一格式的长文本,每个文档都有自己的元数据(如URL),并且文档之间有明确的分隔,便于后续处理或展示。
"""
- 这是一个 列表推导式 (list comprehension) ,它会遍历 search_docs 列表中的每一个元素(这里我们称之为 doc )。
- search_docs 里的每个 doc 应该是一个包含 'url' 和 'content' 键的字典。
"""
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
for doc in search_docs
]
)

return {"context": [formatted_search_docs]}

def search_wikipedia(state):

""" 从维基百科中检索文档 """

# 搜索
search_docs = WikipediaLoader(query=state['question'],
load_max_docs=2).load()

# 格式化
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
for doc in search_docs
]
)

return {"context": [formatted_search_docs]}

def generate_answer(state):

""" 用于回答问题的节点 """

# 获取状态
context = state["context"]
question = state["question"]

# 模板
answer_template = """使用以下上下文回答问题 {question}: {context}"""
answer_instructions = answer_template.format(question=question,
context=context)

# 回答
answer = llm.invoke([SystemMessage(content=answer_instructions)]+[HumanMessage(content=f"回答问题。")])

# 将其附加到状态
return {"answer": answer}

# 添加节点
builder = StateGraph(State)

# 初始化每个节点
builder.add_node("search_web",search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)

# 流程
builder.add_edge(START, "search_wikipedia")
builder.add_edge(START, "search_web")
builder.add_edge("search_wikipedia", "generate_answer")
builder.add_edge("search_web", "generate_answer")
builder.add_edge("generate_answer", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))
image-20250722153534867
1
2
result = graph.invoke({"question": "英伟达2025年第一季度财报表现如何"})
result['answer'].content

Sub-graphs 子图

子图允许你在图表的不同部分创建和管理不同的状态。这在多智能体系统中尤其有用,尤其是在每个智能体都有各自状态的智能体团队中。

让我们考虑一个简单的例子:

  • 我有一个系统,它接收日志。
  • 该系统通过不同的代理执行两个独立的子任务(总结日志,查找故障模式)。
  • 我希望在两个不同的子图中执行这两个操作。

最重要的是要理解图表是如何传达信息的!

简而言之,通信是 *通过重叠密钥* 实现的:

  • 子图可以访问父图中的 docs(文档)。
  • 父图可以从子图中访问 summary(摘要)和 failure_report(故障报告)。
subgraph.png
  1. Logs (Traces):
    • 这是系统的输入,表示一系列的日志记录。
  2. Entry Graph:
    • 这是系统的入口图,它包含了总体状态(Overall State),其中包含:
      • docs:文档或日志数据。
      • summary report:摘要报告,这是系统执行摘要任务后生成的。
      • failure report:故障报告,这是系统执行故障分析任务后生成的。
  3. Call sub-graphs:
    • 这是从入口图调用两个子图的过程,分别用于执行摘要和故障分析任务。
  4. Summarization:
    • 这个子图负责生成日志的摘要。它的状态(Summary State)包含:
      • docs:与入口图共享的文档数据。
      • summary:摘要内容。
      • summary report:摘要报告,这是摘要任务完成后生成的。
  5. Failure Analysis:
    • 这个子图负责分析日志中的故障模式。它的状态(Failure Analysis State)包含:
      • docs:与入口图共享的文档数据。
      • failures:故障模式。
      • failure report:故障报告,这是故障分析任务完成后生成的。
  6. Finish:
    • 这是流程的结束点,表示两个子图的任务都已完成,并且生成了摘要报告和故障报告。
1
2
3
4
5
6
7
8
9
10
11
12
13
from operator import add
from typing_extensions import TypedDict
from typing import List, Optional, Annotated

#logs结构
class Log(TypedDict):
id: str
question: str
docs: Optional[List]
answer: str
grade: Optional[int]
grader: Optional[str]
feedback: Optional[str]
Sub graphs

这里是失败分析子图,它使用了 FailureAnalysisState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END

# 故障分析子图
class FailureAnalysisState(TypedDict):
"""用于故障分析的状态。"""
cleaned_logs: List[Log] # 清理后的日志列表
failures: List[Log] # 包含故障的日志列表
fa_summary: str # 故障分析摘要
processed_logs: List[str] # 已处理的日志标识符列表

class FailureAnalysisOutputState(TypedDict):
"""故障分析的输出状态。"""
fa_summary: str # 故障分析摘要
processed_logs: List[str] # 已处理的日志标识符列表

def get_failures(state):
"""获取包含故障的日志"""
cleaned_logs = state["cleaned_logs"]
# 从清理后的日志中筛选出包含 "grade" 键的日志,这些被视为故障
failures = [log for log in cleaned_logs if "grade" in log]
return {"failures": failures}

def generate_summary(state):
"""生成故障摘要"""
failures = state["failures"]
# 添加功能:fa_summary = summary_generation(qs_summary)
fa_summary = "Chroma 文档的检索质量不佳。"
# 为每个故障日志生成一个处理过的日志标识符
return {"fa_summary": fa_summary, "processed_logs": [f"failure-analysis-on-log-{failure['id']}" for failure in failures]}

fa_builder = StateGraph(state_schema=FailureAnalysisState,output_schema=FailureAnalysisOutputState)
fa_builder.add_node("get_failures", get_failures)
fa_builder.add_node("generate_summary", generate_summary)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)

graph = fa_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250723094949158

这里是问题总结子图,它使用了 QuestionSummarizationState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 摘要生成子图
class QuestionSummarizationState(TypedDict):
"""用于问题摘要的状态。"""
cleaned_logs: List[Log] # 清理后的日志列表
qs_summary: str # 问题摘要
report: str # 从摘要生成的报告
processed_logs: List[str] # 已处理的日志标识符列表

class QuestionSummarizationOutputState(TypedDict):
"""问题摘要的输出状态。"""
report: str # 最终报告
processed_logs: List[str] # 已处理的日志标识符列表

def generate_summary(state):
"""从日志生成摘要。"""
cleaned_logs = state["cleaned_logs"]
# 添加功能:summary = summarize(generate_summary)
summary = "问题集中在 ChatOllama 和 Chroma 向量存储的使用上。"
# 返回摘要以及已处理的日志ID
return {"qs_summary": summary, "processed_logs": [f"summary-on-log-{log['id']}" for log in cleaned_logs]}

def send_to_slack(state):
"""模拟发送报告。"""
qs_summary = state["qs_summary"]
# 添加功能:report = report_generation(qs_summary)
report = "foo bar baz"
return {"report": report}

qs_builder = StateGraph(QuestionSummarizationState,output_schema=QuestionSummarizationOutputState)
qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)
qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)

graph = qs_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
image-20250723100648199
Adding sub graphs to our parent graph 向父图添加子图

现在,我们可以将所有内容整合在一起。我们使用 EntryGraphState 创建父图。并且我们将我们的子图添加为节点!

1
2
3
4
5
6
7
8
9
# 入口图
class EntryGraphState(TypedDict):
# 原始日志数据列表
raw_logs: List[Log]
# 经过清洗处理后的日志数据列表
cleaned_logs: List[Log]
fa_summary: str # 这只会在故障分析子图中生成
report: str # 这只会在问题摘要子图中生成
processed_logs: Annotated[List[int], add] # 跟踪哪些日志已经被处理过 ,尤其是在子图之间共享状态时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def clean_logs(state):
# 获取日志
raw_logs = state["raw_logs"]
# 数据清洗 raw_logs -> docs
cleaned_logs = raw_logs
return {"cleaned_logs": cleaned_logs}

entry_builder = StateGraph(EntryGraphState)
entry_builder.add_node("clean_logs", clean_logs)
#添加子图
entry_builder.add_node("question_summarization", qs_builder.compile())
entry_builder.add_node("failure_analysis", fa_builder.compile())

entry_builder.add_edge(START, "clean_logs")
entry_builder.add_edge("clean_logs", "failure_analysis")
entry_builder.add_edge("clean_logs", "question_summarization")
entry_builder.add_edge("failure_analysis", END)
entry_builder.add_edge("question_summarization", END)

graph = entry_builder.compile()

from IPython.display import Image, display

# 将 xray 设置为 1 将显示嵌套图的内部结构
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
image-20250723102913524

Map-reduce

LangGraph 里的 Map-Reduce 是一种并行处理模式,用于将一个大任务拆分成多个子任务(Map),再汇总结果(Reduce)。这是 LangGraph 中处理批量数据或并行节点执行的核心机制之一。

让我们设计一个系统,该系统将完成两件事情:

  1. 映射(Map) —— 根据主题生成一组笑话。
  2. 归约(Reduce) —— 从这组笑话中挑出最棒的一条。
1
2
3
4
5
6
7
8
9
10
11
12
from langchain_openai import ChatOpenAI

# Prompts we will use
subjects_prompt = """生成一个包含3个子主题的列表,这些子主题都与以下总体主题相关:{topic}。"""
joke_prompt = """生成一个关于{subject}的笑话"""
best_joke_prompt = """下面是关于{topic}的一堆笑话。选择最好的一个!返回最好笑话的ID,第一个笑话的ID从0开始。笑话:\n\n {jokes}"""
# LLM
model = ChatOpenAI(
model="qwen-plus-2025-04-28",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
Parallelizing joke generation 并行化笑话生成

首先,让我们定义图的入口点,它将:

  • 接收用户输入的主题
  • 基于该主题生成若干“笑话子主题”
  • 将每个子主题发送到上面定义的笑话生成节点

我们的状态有一个 jokes 键,它将累积来自并行化笑话生成的笑话。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Subjects(BaseModel):
"""定义一个Pydantic模型,用于表示主题列表。"""
subjects: list[str] # 主题字符串列表

class BestJoke(BaseModel):
"""定义一个Pydantic模型,用于表示最佳笑话的ID。"""
id: int # 最佳笑话的索引ID

class OverallState(TypedDict):
"""定义整个图的总体状态,使用TypedDict以便于类型提示和状态管理。"""
topic: str # 当前讨论的主题
subjects: list # 生成的子主题列表
jokes: Annotated[list, operator.add] # 笑话列表,使用operator.add表示列表内容会累加而不是覆盖
best_selected_joke: str # 最终选出的最佳笑话

生成笑话的主题。

1
2
3
4
5
6
7
#用于生成主题
def generate_topics(state: OverallState):
#使用 Python 的 format() 方法来构建一个提示字符串( prompt )
prompt = subjects_prompt.format(topic=state["topic"])
#.with_structured_output(Subjects) 它指示模型尝试将其输出格式化为 Subjects 类
response = model.with_structured_output(Subjects).invoke(prompt)
return {"subjects": response.subjects}

这就是妙处:我们利用 Send 为每个主题并行生成笑话。

非常实用!无论主题数量多少,它都能自动并行处理。

  • generate_joke:图中节点的名字
  • {"subject": s}:要传递的状态

Send 允许你向 generate_joke 节点发送任意结构的状态,无需与 OverallState 对齐。
在这里,generate_joke 使用自己的内部状态,我们通过 Send 按需填充即可。

在 LangGraph 里,Send 是一个“动态派发器”:它让你在运行时决定“要把哪个节点运行多少次、每次给它什么数据”,并自动并行执行。

1
2
3
4
5
6
from langgraph.types import Send
def continue_to_jokes(state: OverallState):
# 该函数根据当前状态中的主题列表,为每个主题生成一个 Send 对象。
# 每个 Send 对象都指示图将数据发送到名为 "generate_joke" 的节点,
# 并将当前主题作为 "subject" 参数传递,从而实现并行生成笑话。
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
Joke generation (map) 笑话生成

现在,我们只需定义一个节点来生成笑话,命名为 generate_joke

生成的笑话会被写回到 OverallState 中的 jokes 字段。 该字段配有 reducer,能够自动把多次写入的列表合并成一个大列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 定义 JokeState,用于表示生成笑话任务的输入状态。
class JokeState(TypedDict):
subject: str#笑话的主题

# 定义 Joke 模型,用于表示模型生成的笑话的结构。
class Joke(BaseModel):
joke: str#笑话的文本内容

# 定义生成笑话的函数。
def generate_joke(state: JokeState):
# 根据 joke_prompt 模板和当前状态中的主题格式化提示。
prompt = joke_prompt.format(subject=state["subject"])
# 调用语言模型,并指定输出应结构化为 Joke 类型。
response = model.with_structured_output(Joke).invoke(prompt)
# 返回一个包含生成的笑话的字典,键为 "jokes"。
return {"jokes": [response.joke]}
Best joke selection (reduce) 最佳笑话选择

现在,我们添加逻辑来挑选最好的笑话。

1
2
3
4
5
6
7
8
9
def best_joke(state: OverallState):
# 将状态中所有笑话列表连接成一个字符串,每个笑话之间用两个换行符分隔
jokes = "\n\n".join(state["jokes"])
# 使用主题和所有笑话格式化最佳笑话提示语
prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
# 调用模型,期望其输出符合 BestJoke 结构(包含最佳笑话的ID)
response = model.with_structured_output(BestJoke).invoke(prompt)
# 根据模型返回的ID,从笑话列表中选择最佳笑话,并将其存储在状态的 best_selected_joke 字段中
return {"best_selected_joke": state["jokes"][response.id]}
Compile 编译
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from IPython.display import Image
from langgraph.graph import END, StateGraph, START

# 构建图:将所有组件组合在一起构建我们的流程图
graph = StateGraph(OverallState)

graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)

graph.add_edge(START, "generate_topics")
# 根据条件决定是否继续生成笑话
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
# 生成笑话后执行选择最佳笑话
graph.add_edge("generate_joke", "best_joke")
# 选择最佳笑话后流程结束
graph.add_edge("best_joke", END)


app = graph.compile()
Image(app.get_graph().draw_mermaid_png())
image-20250723112736244
1
2
for s in app.stream({"topic": "日本广岛原子弹"}):
print(s)
1
2
3
4
5
{'generate_topics': {'subjects': ['原子弹投放的历史背景与决策过程', '广岛原爆对城市与居民的影响', '战后和平运动与核裁军倡议']}}
{'generate_joke': {'jokes': ['为什么广岛的重建计划从不担心交通堵塞?因为每个人都习惯了‘瞬间消失’!(注:此笑话为虚构创作,旨在以幽默方式引发思考,并非对历史事件的不尊重)']}}
{'generate_joke': {'jokes': ['战后和平运动的人开会讨论核裁军,一个人站起来说:‘我们必须彻底销毁所有核武器!’ 另一个人犹豫地举手:‘那……我们保留一个吧,就一个,藏在冰箱后面,以防万一。’']}}
{'generate_joke': {'jokes': ["杜鲁门宣布要结束战争,顾问问是否要使用原子弹。罗斯福的棺材板突然震动了一下,丘吉尔的雪茄掉在了地上,斯大林则默默拿起了电话:'同志,我们的计划可能需要再推迟一下。'"]}}
{'best_joke': {'best_selected_joke': '为什么广岛的重建计划从不担心交通堵塞?因为每个人都习惯了‘瞬间消失’!(注:此笑话为虚构创作,旨在以幽默方式引发思考,并非对历史事件的不尊重)'}}

Research Assistant 研究助理

见另一篇文章

module-5

Chatbot with Memory 带有记忆功能的聊天机器人

在这里,我们将介绍 LangGraph Memory Store 作为一种保存和检索长期记忆的方法。

LangGraph Memory Store 是 LangGraph 提供的长期记忆存储接口,用于在 不同对话线程之间 持久化保存和检索用户信息。

名称 作用
BaseStore 抽象接口,定义了 put/get/search/delete 等操作方法
InMemoryStore 内存实现,适合原型验证
RedisStore / AsyncRedisStore 生产级实现,支持向量搜索、元数据过滤和命名空间管理

我们将构建一个使用 short-term (within-thread仅当前对话线程)long-term (across-thread跨所有对话线程 ) 内存的聊天机器人。

我们将重点关注长期 semantic memory(语义记忆),它将包含关于用户的事实信息。这些长期记忆将被用于创建一个个性化的聊天机器人,它可以记住有关用户的事实。

它将节省内存 “in the hot path”,当用户与之聊天时。

“in the hot path” 指的是:在对话或任务执行的主流程中(即用户输入后立即、同步地)主动调用工具或写入记忆,使信息实时生效并可用于下一步决策。

Introduction to the LangGraph Store LangGraph 存储简介

LangGraph Memory Store 提供了一种在 LangGraph 中 跨线程 存储和检索信息的方式。这是一个用于持久化 key-value 存储的 开源基类

1
2
3
import uuid
from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()

Store 中存储对象(例如,记忆)时,我们提供:

- 对象的 namespace(类似于目录的元组)

- 对象的 key(类似于文件名)

- 对象的 value(类似于文件内容)

我们使用 put 方法通过 namespacekey 将对象保存到存储中。

image-20250725155616205
1
2
3
4
5
6
7
8
9
10
11
12
# 为要保存的记忆设置命名空间
user_id = "1" # 用户ID
namespace_for_memory = (user_id, "memories") # 记忆命名空间

# 生成一个唯一的键值
key = str(uuid.uuid4()) # 使用UUID创建唯一标识符作为键

# 值需要是一个字典格式
value = {"food_preference": "我喜欢披萨"} # 存储的食物偏好信息

# 保存记忆
in_memory_store.put(namespace_for_memory, key, value) # 将记忆存储到内存中

我们使用 search 通过 namespace 从存储中检索对象。这将返回一个列表。

1
2
3
4
5
6
7
8
9
memories = in_memory_store.search(namespace_for_memory)
type(memories)

# Metatdata
memories[0].dict()

# The key, value
print(memories[0].key, memories[0].value)
9e65de8a-f404-4974-b509-0df0566d8fb5 {'food_preference': '我喜欢披萨'}

我们还可以使用 get 通过 namespacekey 检索对象。

1
2
3
# Get the memory by namespace and key
memory = in_memory_store.get(namespace_for_memory, key)
memory.dict()
Chatbot with long-term memory 具有长期记忆的聊天机器人

我们想要一个聊天机器人,有两种记忆的方式:

  1. Short-term (within-thread) memory: 聊天机器人可以保留会话历史记录和/或允许在聊天会话中进行中断。
  2. Long-term (cross-thread) memory: 聊天机器人可以记住特定用户在所有聊天会话中的信息 跨会话

对于 short-term memory,我们将使用一个 checkpointer

  • 他们在每一步都将图状态写入线程中。
  • 他们在该线程中持久化保存聊天历史记录。
  • 他们允许图在该线程中的任何步骤被中断和/或恢复。

对于 long-term memory,我们将使用上面介绍的 LangGraph Store

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from IPython.display import Image, display

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.base import BaseStore

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables.config import RunnableConfig

# 聊天机器人指令
MODEL_SYSTEM_MESSAGE = """你是一个拥有记忆功能的有用助手,能够提供关于用户的信息。
如果你有这个用户的记忆,请使用它来个性化你的回复。
以下是记忆内容(可能为空):{memory}"""

# 根据聊天历史和任何现有记忆创建新记忆
CREATE_MEMORY_INSTRUCTION = """"你正在收集用户信息以个性化你的回复。

当前用户信息:
{memory}

指令:
1. 仔细查看下面的聊天历史
2. 识别有关用户的新信息,例如:
- 个人详情(姓名、位置)
- 偏好(喜欢、不喜欢)
- 兴趣和爱好
- 过去的经历
- 目标或未来计划
3. 将任何新信息与现有记忆合并
4. 将记忆格式化为清晰的项目符号列表
5. 如果新信息与现有记忆冲突,请保留最新版本

记住:只包括用户直接陈述的事实信息。不要做假设或推断。

基于以下聊天历史,请更新用户信息:"""

def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""从存储中加载记忆并使用它来个性化聊天机器人的回复。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索记忆
namespace = ("memory", user_id)
key = "user_memory"
existing_memory = store.get(namespace, key)

# 如果存在则提取实际的记忆内容并添加前缀
if existing_memory:
# 值是一个带有memory键的字典
existing_memory_content = existing_memory.value.get('memory')
else:
existing_memory_content = "未找到现有记忆。"

# 在系统提示中格式化记忆
system_msg = MODEL_SYSTEM_MESSAGE.format(memory=existing_memory_content)

# 使用记忆以及聊天历史进行回复
response = model.invoke([SystemMessage(content=system_msg)]+state["messages"])

return {"messages": response}

def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""反思聊天历史并将记忆保存到存储中。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索现有记忆
namespace = ("memory", user_id)
existing_memory = store.get(namespace, "user_memory")

# 提取记忆
if existing_memory:
existing_memory_content = existing_memory.value.get('memory')
else:
existing_memory_content = "未找到现有记忆。"

# 在系统提示中格式化记忆
system_msg = CREATE_MEMORY_INSTRUCTION.format(memory=existing_memory_content)
new_memory = model.invoke([SystemMessage(content=system_msg)]+state['messages'])

# 覆盖存储中的现有记忆
key = "user_memory"

# 将值写入为带有memory键的字典
store.put(namespace, key, {"memory": new_memory.content})

# 定义图
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)

# 用于跨线程的长期记忆存储
across_thread_memory = InMemoryStore()

# 用于线程内的短期记忆检查点
within_thread_memory = MemorySaver()

# 使用检查点器和存储编译图
graph = builder.compile(checkpointer=within_thread_memory, store=across_thread_memory)

# 显示图
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
image-20250725161106319

聊天历史将通过检查点工具保存到短期记忆中。聊天机器人将回顾聊天历史。然后,它将创建并保存一个记忆到 LangGraph Store。此内存可在未来的聊天会话中访问,以个性化聊天机器人的响应。

当我们与聊天机器人交互时,我们提供两样东西:

  1. Short-term (within-thread) memory: 一个用于持久化聊天历史的 thread ID
  2. Long-term (cross-thread) memory: 一个用于将长期记忆命名空间到用户的 user ID
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 我们提供线程ID用于短期(线程内)记忆
# 我们提供用户ID用于长期(跨线程)记忆
config = {"configurable": {"thread_id": "1", "user_id": "1"}}

# 用户输入
input_messages = [HumanMessage(content="你好,我的名字是Lance")]

# 运行图
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()

================================ Human Message =================================

你好,我的名字是Lance
================================== Ai Message ==================================

你好,Lance!很高兴认识你。有什么我可以帮你的吗?😊

我们正在使用 MemorySaver 检查点来管理线程内内存。这会将聊天历史保存到线程中。我们可以查看保存到线程的聊天历史记录。

1
2
3
4
thread = {"configurable": {"thread_id": "1"}}
state = graph.get_state(thread).values
for m in state["messages"]:
m.pretty_print()

回想一下,我们使用存储库编译了该图:across_thread_memory = InMemoryStore()并且,我们向图中添加了一个节点 (write_memory),该节点反映了聊天历史并保存了一段记忆到存储中。

我们可以查看内存是否已保存到存储中。

1
2
3
4
5
6
7
8
9
10
11
# 为要保存的记忆设置命名空间
user_id = "1"
namespace = ("memory", user_id)
existing_memory = across_thread_memory.get(namespace, "user_memory")
existing_memory.dict()

{'namespace': ['memory', '1'],
'key': 'user_memory',
'value': {'memory': '- 姓名:Lance \n- 位置:旧金山 \n- 兴趣和爱好:骑自行车 \n- 喜欢的活动:在旧金山骑行,可能包括金门大桥和滨海区路线'},
'created_at': '2025-07-25T08:12:35.877160+00:00',
'updated_at': '2025-07-25T08:12:35.877161+00:00'}

现在,让我们以 相同的用户ID启动一个新线程。我们应该看到聊天机器人记住了用户的个人资料,并将其用于个性化响应。

1
2
3
4
5
6
7
8
9
# 我们提供用户ID用于跨线程记忆以及一个新的线程ID
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# 用户输入
input_messages = [HumanMessage(content="你好!你推荐我去哪里骑自行车?")]

# 运行图
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()

Chatbot with Profile Schema 带有配置文件模式的聊天机器人

我们的聊天机器人将记忆保存为字符串。在实践中,我们通常希望记忆具有结构化格式。例如,记忆可以是单个、持续更新的模式。在我们的案例中,我们希望这是一个单一的用户档案。

我们将扩展我们的聊天机器人,将语义记忆保存到单个用户档案中。我们还将介绍一个库 Trustcall,用于使用新信息更新此模式。

Defining a user profile schema 定义用户配置文件模式

Python 有许多不同类型的 structured data,例如 TypedDict、字典、JSON 和 Pydantic

让我们先使用 TypedDict 来定义一个用户资料模式。

1
2
3
4
5
6
from typing import TypedDict, List

class UserProfile(TypedDict):
"""带有类型字段的用户档案模式"""
user_name: str # 用户的首选名称
interests: List[str] # 用户兴趣列表
Saving a schema to the store 将模式保存到存储中

LangGraph Store 接受任何 Python 字典作为 value

1
2
3
4
5
6
# TypedDict 实例
user_profile: UserProfile = {
"user_name": "Lance", # 用户名
"interests": ["骑行", "科技", "咖啡"] # 兴趣爱好
}
user_profile

我们使用 put 方法将 TypedDict 保存到存储中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import uuid
from langgraph.store.memory import InMemoryStore

# 初始化内存存储
in_memory_store = InMemoryStore()

# 为要保存的记忆创建命名空间
user_id = "1"
namespace_for_memory = (user_id, "memory")

# 将记忆以键值对的形式保存到命名空间中
key = "user_profile"
value = user_profile
in_memory_store.put(namespace_for_memory, key, value)

我们使用 search 按命名空间从存储中检索对象。

1
2
3
4
for m in in_memory_store.search(namespace_for_memory):
print(m.dict())

{'namespace': ['1', 'memory'], 'key': 'user_profile', 'value': {'user_name': 'Lance', 'interests': ['骑行', '科技', '咖啡']}, 'created_at': '2025-07-25T08:58:06.031770+00:00', 'updated_at': '2025-07-25T08:58:06.031775+00:00', 'score': None}

我们还可以使用 get 通过命名空间和键来检索特定对象。

1
2
3
4
profile = in_memory_store.get(namespace_for_memory, "user_profile")
profile.value

{'user_name': 'Lance', 'interests': ['骑行', '科技', '咖啡']}
Chatbot with profile schema 带有配置文件模式的聊天机器人

现在我们知道了如何为记忆指定一个模式,并将其保存到存储中。现在,我们如何根据这个特定的模式 创建 记忆?

在我们的聊天机器人中,我们 想要从一个聊天里创建记忆.这就是 structured outputs格式化输出 概念有用的地方。

LangChain 的 chat model 接口有一个 PROTECTED11 方法用于强制结构化输出。这在我们需要确保输出符合某个模式时非常有用,而且它会为我们解析输出。

1
2
3
4
5
6
# 将模式绑定到模型
model_with_structure = model.with_structured_output(UserProfile)

# 调用模型生成符合模式的结构化输出
structured_output = model_with_structure.invoke([HumanMessage("我的名字是Lance,我喜欢骑自行车。")])
structured_output

Structured outputs(结构化输出) 指让大模型不再“随意说人话”,而是按你事先定义好的格式(JSON / 表格 / 枚举 / 嵌套对象等)精确返回数据。相当于给模型套了一个“模具”,保证输出可直接被代码解析、入库或传给下游系统,避免再用正则、字符串拼接去“猜”结果。

使用官方的大模型组件

1
2
3
4
5
from langchain_community.chat_models import ChatTongyi
model=ChatTongyi(
model="qwen-plus-2025-07-14",
api_key="sk-"
)

现在,让我们在聊天机器人中使用它。这只需要对 write_memory 函数进行轻微修改。我们使用 model_with_structure,如上所述,来生成一个与我们模式匹配的配置文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from IPython.display import Image, display

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.base import BaseStore

from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.runnables.config import RunnableConfig

# 聊天机器人指令
MODEL_SYSTEM_MESSAGE = """你是一个拥有记忆功能的 helpful 助手,可以提供关于用户的信息。
如果你有这个用户的记忆,请使用它来个性化你的回复。
以下是记忆内容(可能为空):{memory}"""

# 根据聊天历史和任何现有记忆创建新记忆
CREATE_MEMORY_INSTRUCTION = """根据用户的聊天历史创建或更新用户档案记忆。
这将被保存为长期记忆。如果存在现有记忆,只需更新它。
以下是现有记忆(可能为空):{memory}"""

def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""从存储中加载记忆并使用它来个性化聊天机器人的回复。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索记忆
namespace = ("memory", user_id)
existing_memory = store.get(namespace, "user_memory")

# 为系统提示格式化记忆
if existing_memory and existing_memory.value:
memory_dict = existing_memory.value
formatted_memory = (
f"姓名: {memory_dict.get('user_name', '未知')}\n"
f"兴趣: {', '.join(memory_dict.get('interests', []))}"
)
else:
formatted_memory = None

# 在系统提示中格式化记忆
system_msg = MODEL_SYSTEM_MESSAGE.format(memory=formatted_memory)

# 使用记忆以及聊天历史来回复
response = model.invoke([SystemMessage(content=system_msg)]+state["messages"])

return {"messages": response}

def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""反思聊天历史并将记忆保存到存储中。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索现有记忆
namespace = ("memory", user_id)
existing_memory = store.get(namespace, "user_memory")

# 为系统提示格式化记忆
if existing_memory and existing_memory.value:
memory_dict = existing_memory.value
formatted_memory = (
f"姓名: {memory_dict.get('user_name', '未知')}\n"
f"兴趣: {', '.join(memory_dict.get('interests', []))}"
)
else:
formatted_memory = None

# 在指令中格式化现有记忆
system_msg = CREATE_MEMORY_INSTRUCTION.format(memory=formatted_memory)

# 调用模型生成符合模式的结构化输出
new_memory = model_with_structure.invoke([SystemMessage(content=system_msg)]+state['messages'])

# 覆盖现有的用户档案记忆
key = "user_memory"
store.put(namespace, key, new_memory)

# 定义图
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)

# 用于长期(跨线程)记忆的存储
across_thread_memory = InMemoryStore()

# 用于短期(线程内)记忆的检查点
within_thread_memory = MemorySaver()

# 使用检查点和存储编译图
graph = builder.compile(checkpointer=within_thread_memory, store=across_thread_memory)

# 显示
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
image-20250726103516996

调用

1
2
3
4
5
6
7
8
9
10
# 我们提供线程ID用于短期(线程内)记忆
# 我们提供用户ID用于长期(跨线程)记忆
config = {"configurable": {"thread_id": "1", "user_id": "1"}}

# 用户输入
input_messages = [HumanMessage(content="你好,我的名字是Lance,我喜欢在旧金山骑自行车和在面包店吃饭。")]

# 运行图
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()

查看记忆

1
2
3
4
5
# Namespace for the memory to save
user_id = "1"
namespace = ("memory", user_id)
existing_memory = across_thread_memory.get(namespace, "user_memory")
existing_memory.value
Trustcall for creating and updating profile schemas Trustcall 用于创建和更新配置文件模式

Trustcall 是一个由 LangChain 团队开发的开源 Python 库,旨在解决大型语言模型(LLM)在生成或修改复杂 JSON 数据结构时效率低、易出错的问题

我们使用 create_extractor,传入模型以及我们的模式作为 tool。使用 TrustCall 时,可以以多种方式提供模式。

例如,我们可以传递一个 JSON 对象 / Python 字典或 Pydantic 模型。在底层,TrustCall 使用 tool calling 从输入的 messages 列表中生成 structured output。为了强制 Trustcall 生成 structured output,我们可以在 tool_choice 参数中包含模式名称。

我仅做了解了,感觉用处不多,用结果化输出就能达到效果

Chatbot with Collection Schema 带集合模式的聊天机器人

“collection” 指的是一种将语义记忆组织为多个独立文档(objects)的存储方式,而不是把所有信息都塞进一个巨大的“用户档案”(single profile)里。

假设你在做一个 AI 助手,用户说:

“我下周要去东京出差,住在涩谷区的 Sakura Hotel。” “我朋友 Ken 也要来,他喜欢吃拉面。”

如果用 collection,你会存成两条记忆:

1
2
3
4
5
6
7
8
9
10
11
12
13
[
{
"type": "trip",
"destination": "Tokyo",
"date": "2025-08-04",
"hotel": "Sakura Hotel, Shibuya"
},
{
"type": "friend",
"name": "Ken",
"food_preference": "ramen"
}
]

每条记忆都是一个独立文档,后续你可以:

  • 搜索“trip”类型的记忆,找到用户的出差安排;
  • 搜索“friend”类型的记忆,找到 Ken 的偏好;
  • 更新 Ken 的信息时,只改一条记录,不会影响其它记忆。

Memory Agent 内存代理

现在,我们将把学到的内容整合起来,构建一个具有长期记忆的 agent

我们的代理 task_mAIstro 将帮助我们管理待办事项列表!

我们之前构建的聊天机器人 始终 会反思对话并保存记忆。task_mAIstro 将决定 何时 保存记忆(待办事项列表中的项目)。

我们之前构建的聊天机器人始终保存一种类型的记忆,即个人资料或集合。task_mAIstro 可以决定将数据保存到用户个人资料或 ToDo 事项集合中。

除此之外,task_mAIstro 还将管理程序性记忆。这允许用户更新创建ToDo项的偏好设置。

Creating an agent 创建一个代理

有许多不同的 agent 架构可供选择。在这里,我们将实现一个简单的内容,一个 ReAct 代理。这个代理将成为创建和管理待办事项列表的得力助手。

此代理可以决定更新三种类型的长期记忆:

  1. 创建或更新具有普通用户信息的用户 profile

  2. 在ToDo列表中添加或更新项目 collection

  3. 更新其自身的 instructions 以了解如何更新待办事项列表中的项目

1
2
3
4
5
6
from typing import TypedDict, Literal

# 更新记忆工具
class UpdateMemory(TypedDict):
""" 决定更新哪种记忆类型 """
update_type: Literal['user', 'todo', 'instructions']
  • 'user':用户相关信息
  • 'todo':待办事项
  • 'instructions':指令信息
Graph definition 图定义

我们添加了一个简单的路由器 route_message,它通过二元决策来节省内存。

module-6

0%