什么是uv

uv 是由 Astral 团队开发的一个超高速 Python 包管理器,用 Rust 编写,目标是替代 pipvenvpip-toolspoetry 等多个工具。

uv常用命令

uv init myproj 创建新项目

source .venv/bin/activate(Linux/macOS)激活虚拟环境

uv add requests 安装依赖并写入 pyproject.toml

uv remove requests 移除依赖

uv sync 同步依赖到虚拟环境

uv export 导出 lock 文件为 requirements.txt 等格式

uv build 构建源码包和 wheel

uv publish 发布到 PyPI

uvx是什么

uvx 是:

uv tool run快捷别名(alias),用于无需安装即可运行 Python 包提供的命令行工具

uvx 就像 Python 世界的 npxpipx run —— 临时拉取、构建隔离环境、运行工具,用完即走,不留痕迹

uv管理命令行工具

使用uv tool

  • 用途:安装、管理、运行全局可用的 Python 命令行工具
  • 安装位置:默认安装到 ~/.local/bin(Windows: C:\Users\<USER>\.local\bin)。
1
uv tool install pytest

安装后可以直接使用pytest而不用uv run pytest

uv sync和uv pip install -e .的区别

uv pip install -e .

  • 作用:将当前项目以可编辑模式安装到当前 Python 环境。
  • 行为
    • 读取 pyproject.toml 中的 [project] 元数据。
    • 构建并安装你的主包(如 my_package),使其可被 import
    • 不会自动安装依赖(除非你显式加上 --deps,但通常不这么做)。
  • 典型用途:开发自己的包时,让本地代码可导入。

uv sync

  • 作用根据锁定文件(如 uv.lock)精确同步整个项目的依赖环境
  • 行为
    • 读取 uv.lock(由 uv lock 生成)或 pyproject.toml
    • 安装所有依赖项(包括直接依赖和传递依赖)到当前环境。
    • 默认也会以可编辑模式安装当前项目(如果 pyproject.toml 中定义了项目)。
    • 确保环境状态与锁定文件完全一致(版本、哈希、来源等)。
  • 前提:通常需要先运行 uv lock 生成 uv.lock

参考资料

【uv】Python迄今最好的项目管理+环境管理工具(吧?)_哔哩哔哩_bilibili

从pip到uv:一口气梳理现代Python项目管理全流程!_哔哩哔哩_bilibili

日志级别

级别 方法 用途
DEBUG logging.debug() 调试信息
INFO logging.info() 普通信息
WARNING logging.warning() 警告信息
ERROR logging.error() 错误信息
CRITICAL logging.critical() 严重错误

python默认只会打印warning以上级别的日志,可通过basicConfig进行设置,如下

1
2
3
4
5
6
7
8
9
# 基础配置
logging.basicConfig(level=logging.DEBUG)

# 记录不同级别的日志
logging.debug("这是一个DEBUG级别的日志")
logging.info("这是一个INFO级别的日志")
logging.warning("这是一个WARNING级别的日志")
logging.error("这是一个ERROR级别的日志")
logging.critical("这是一个CRITICAL级别的日志")

格式化log并输出

我们可以使用全局配置,完成log的格式化和输出成文件,如下

1
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s - %(levelname)s - %(message)s',filename='basic.log',filemode='w')

同样,我们可以自定义logger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 创建自定义logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)

# 清除之前的处理器
logger.handlers.clear()

# 创建文件处理器
file_handler = logging.FileHandler('logs/my_app.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)

# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.WARNING)

# 创建不同的格式器
file_formatter = logging.Formatter(
'%(asctime)s | %(name)s | %(levelname)s | %(funcName)s:%(lineno)d | %(message)s'
)
console_formatter = logging.Formatter(
'🚨 %(levelname)s: %(message)s'
)

file_handler.setFormatter(file_formatter)
console_handler.setFormatter(console_formatter)

# 添加处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)

# 测试不同级别的日志
logger.debug("调试信息 - 只写入文件")
logger.info("普通信息 - 只写入文件")
logger.warning("警告信息 - 控制台和文件都有")
logger.error("错误信息 - 控制台和文件都有")

异常捕获

1
2
3
4
5
6
7
8
try:
result = divide(10, 0)
except ZeroDivisionError as exc:
# 方式 1:记录异常对象
logger.error("除零异常发生: {}", exc)

# 方式 2:记录完整 traceback(推荐)
logger.exception("捕获到异常,详情如下")

loguru的常用使用方法

基础用法

1
2
3
4
5
6
7
from loguru import logger

logger.debug("这是 debug")
logger.info("这是 info")
logger.warning("这是 warning")
logger.error("这是 error")
logger.critical("这是 critical")

输出到文件 logger.add(“app.log”)

过滤级别 logger.add(“app.log”, level=“WARNING”)

移除默认控制台输出 logger.remove()

参考资料

[Python] logging模块怎么用_哔哩哔哩_bilibili

[Python] 打印log神器 —— loguru_哔哩哔哩_bilibili

环境配置

1
pip install langchain-mcp-adapters

使用langgraph调用mcp

要点主要是利用MultiServerMCPClient构建服务,获取tool

利用预设的create_react_agent构建ReAct架构的智能体并调用工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import asyncio # 需要导入 asyncio 来运行异步函数
# 从langchain_mcp_adapters.client模块导入MultiServerMCPClient类
# 从langgraph.prebuilt模块导入create_react_agent函数
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

# 导入 LLM 相关库
from langchain_openai import ChatOpenAI

# 将主要逻辑封装在一个异步函数中
async def main():
# 创建MultiServerMCPClient实例,配置两个不同的服务
client = MultiServerMCPClient(
{
"math": { # 数学计算服务
"command": "python", # 使用python命令启动
# 替换为你的math_server.py文件的绝对路径
"args": ["/workspace/langgraph-mcp/math_server.py"],
"transport": "stdio", # 使用标准输入输出传输
},
"weather": { # 天气服务
# 确保你的天气服务器在8000端口运行
# *** 确保这个 URL 是正确的,并且服务器正在运行 ***
"url": "http://localhost:8000/mcp",
"transport": "streamable_http", # 使用可流式HTTP传输
}
}
)

tools = []
try:
# 在异步函数内部正确使用 await
tools = await client.get_tools()
print(f"成功获取到 {len(tools)} 个MCP工具。")
for tool_item in tools:
print(f" - {tool_item.name}: {tool_item.description}")
except Exception as e:
print(f"获取MCP工具失败: {e}")
print("请确保MCP服务URL有效且可访问,或者您已正确配置了认证信息。")
# 在函数内部,如果出错可以选择返回或继续处理
# return # 这里可以 return,但会结束 main 函数

if not tools:
print("没有获取到工具,无法创建代理。")
return

# 创建ReAct代理
llm = ChatOpenAI(
model="qwen3-235b-a22b-thinking-2507",
api_key="sk-a8ef27c47ea84224ac6eed6d4bba1bab",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" # 修正了末尾多余的空格
)
agent = create_react_agent(llm, tools)


# 异步调用代理来解决数学问题
# 确保在异步函数内部使用 await
math_response = await agent.ainvoke(
{"messages": [{"role": "user", "content": "what's (3 + 5) x 12?"}]}
)
print("\n--- 数学问题回答 ---")
print(math_response["messages"][-1].content) # 打印最后一条消息(LLM的回答)

# 异步调用代理来查询天气
weather_response = await agent.ainvoke(
{"messages": [{"role": "user", "content": "what is the weather in nyc?"}]}
)
print("\n--- 天气问题回答 ---")
print(weather_response["messages"][-1].content) # 打印最后一条消息(LLM的回答)


# --- 这是脚本的入口点 ---
# 使用 asyncio.run() 来运行你的主异步函数
if __name__ == "__main__":
asyncio.run(main())

参考资料

使用 MCP - LangChain 框架

框架流程

image-20250801164905578

✅ 三个角色(系统组件)

角色 作用
Client 前端或用户界面,发起请求
Auth Provider 认证服务(如 OAuth、JWT 提供者),负责登录和签发 token
LangGraph Backend 应用的后端服务,处理业务逻辑
Secret Store 存放用户敏感信息(如 token、密钥等)
MCP Server 后端工具服务,提供具体的工具或资源接口

✅ 流程详解(12步)

🔐 阶段一:用户登录 & 获取 Token(1~6)

  1. 用户登录
    Client 提交用户名和密码给 Auth Provider。

  2. 返回 Token
    Auth Provider 验证成功后,返回一个访问令牌(token)。

  3. 携带 Token 请求
    Client 将 token 附加在请求头中,发给 LangGraph Backend。

  4. 验证 Token
    LangGraph Backend 使用 @auth.authenticate 中间件验证 token 是否有效。

  5. 获取用户信息
    验证通过后,LangGraph Backend 从 Auth Provider 拉取用户详细信息。

  6. 确认有效性
    后端确认用户信息无误,流程继续。

🔑 阶段二:获取用户权限 Token(6a~6b)

6a. 拉取用户权限 Token
LangGraph Backend 从 Secret Store 获取该用户对应的权限 token(可能是 MCP 所需的访问凭证)。

6b. 返回权限 Token
Secret Store 返回该 token。

🛠️ 阶段三:调用工具 & 返回结果(7~12)

  1. 权限控制检查
    LangGraph Backend 使用 @auth.on.* 权限控制逻辑,确认用户是否有权调用该工具。

  2. 构建 MCP Client
    后端用用户的权限 token 构建一个 MCP 客户端。

  3. 调用 MCP 工具
    MCP Client 发起请求,调用某个具体工具,携带 token(通常放在请求头中)。

  4. MCP 验证并执行
    MCP Server 验证 token 是否有效,确认无误后执行工具逻辑。

  5. 工具返回结果
    MCP Server 返回工具执行结果或资源数据。

  6. 返回给前端
    LangGraph Backend 将结果返回给 Client,完成整个链路。

配置环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 创建项目目录
uv init mcp-client
cd mcp-client

# 创建虚拟环境
uv venv

# 激活虚拟环境
# 在 Windows 上:
.venv\Scripts\activate
# 在 Unix 或 MacOS 上:
source .venv/bin/activate

# 安装所需包
uv add mcp anthropic python-dotenv
#使用镜像源安装
uv add mcp anthropic python-dotenv --index-url https://pypi.tuna.tsinghua.edu.cn/simple/

# 删除样板文件
# 在 Windows 上:
del main.py
# 在 Unix 或 MacOS 上:
rm main.py

# 创建我们的主文件
touch client.py

设置 API 密钥

创建一个 .env 文件来存储它:

1
2
# Create .env file
touch .env

将您的密钥添加到 .env 文件:

1
ANTHROPIC_API_KEY=<your key here>

.env 添加到您的 .gitignore

1
echo ".env" >> .gitignore

.env 文件名添加到 .gitignore 文件中,这样 Git 就会忽略 .env 文件,不会将其纳入版本控制。

创建客户端

基本客户端结构

首先,让我们设置我们的导入并创建基本的客户端类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv() # 从 .env 文件加载环境变量

class MCPClient:
def __init__(self):
# 初始化会话和客户端对象
self.session: Optional[ClientSession] = None # MCP客户端会话
self.exit_stack = AsyncExitStack() # 异步上下文管理器堆栈,用于资源清理
self.anthropic = Anthropic() # Anthropic AI 客户端

# 后续方法将在这里定义

服务器连接管理

接下来,我们将实现连接到 MCP 服务器的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
async def connect_to_server(self, server_script_path: str):
"""连接到MCP服务器

Args:
server_script_path: 服务器脚本路径 (.py 或 .js 文件)
"""
# 检查是否为Python文件
is_python = server_script_path.endswith('.py')
# 检查是否为JavaScript文件
is_js = server_script_path.endswith('.js')

# 如果不是Python或JavaScript文件,则抛出错误
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")

# 根据文件类型确定执行命令
command = "python" if is_python else "node"

# 创建服务器参数对象
server_params = StdioServerParameters(
command=command, # 执行命令
args=[server_script_path], # 脚本路径作为参数
env=None # 环境变量(使用默认环境)
)

# 建立stdio客户端连接并将其添加到异步上下文管理器中
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport

# 创建客户端会话并将其添加到异步上下文管理器中
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

# 初始化会话
await self.session.initialize()

# 列出可用的工具
response = await self.session.list_tools()
tools = response.tools

# 打印连接的服务器提供的工具列表
print("\n已连接到服务器,可用工具:", [tool.name for tool in tools])

查询处理逻辑

现在让我们添加处理查询和调用工具的核心功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
async def process_query(self, query: str) -> str:
"""使用Claude和可用工具处理查询"""
# 构建消息列表
messages = [
{
"role": "user", # 用户角色
"content": query # 用户查询内容
}
]

# 获取可用工具列表
response = await self.session.list_tools()
available_tools = [{
"name": tool.name, # 工具名称
"description": tool.description, # 工具描述
"input_schema": tool.inputSchema # 工具输入模式
} for tool in response.tools]

# 初始Claude API调用
response = self.anthropic.messages.create(
model="qwen3-235b-a22b", # 使用的模型
max_tokens=1000, # 最大返回令牌数
messages=messages, # 消息历史
tools=available_tools # 可用工具
)

# 处理响应并处理工具调用
final_text = [] # 存储最终文本结果

assistant_message_content = [] # 存储助手消息内容
for content in response.content: # 遍历响应内容
if content.type == 'text': # 如果是文本内容
final_text.append(content.text) # 添加到最终结果
assistant_message_content.append(content) # 添加到助手消息
elif content.type == 'tool_use': # 如果是工具调用
tool_name = content.name # 工具名称
tool_args = content.input # 工具参数

# 执行工具调用
result = await self.session.call_tool(tool_name, tool_args)
final_text.append(f"[调用工具 {tool_name},参数 {tool_args}]")

assistant_message_content.append(content)
# 添加助手消息到历史
messages.append({
"role": "assistant",
"content": assistant_message_content
})
# 添加工具执行结果到历史
messages.append({
"role": "user",
"content": [
{
"type": "tool_result", # 工具结果类型
"tool_use_id": content.id, # 工具使用ID
"content": result.content # 工具执行结果
}
]
})

# 获取Claude的下一个响应
response = self.anthropic.messages.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools
)

# 添加响应文本到最终结果
final_text.append(response.content[0].text)

# 返回连接后的最终文本结果
return "\n".join(final_text)

交互式聊天界面

现在我们将添加聊天循环和清理功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP客户端已启动!")
print("输入您的问题或输入'quit'退出。")

while True: # 无限循环,持续接收用户输入
try:
query = input("\n问题: ").strip() # 获取用户输入并去除首尾空格

if query.lower() == 'quit': # 如果用户输入'quit'(不区分大小写)
break # 退出循环

# 处理用户查询并获取响应
response = await self.process_query(query)
print("\n" + response) # 打印AI响应结果

except Exception as e: # 捕获所有异常
print(f"\n错误: {str(e)}") # 打印错误信息

async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose() # 异步关闭所有在exit_stack中管理的资源

主入口点

最后,我们将添加主要的执行逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def main():
# 检查命令行参数数量,如果少于2个则显示使用说明
if len(sys.argv) < 2:
print("用法: python client.py <服务器脚本路径>")
sys.exit(1) # 退出程序,返回错误码1

# 创建MCP客户端实例
client = MCPClient()
try:
# 连接到服务器,sys.argv[1]是第一个命令行参数(服务器脚本路径)
await client.connect_to_server(sys.argv[1])
# 启动交互式聊天循环
await client.chat_loop()
finally:
# 确保程序结束时清理资源
await client.cleanup()

# 程序入口点
if __name__ == "__main__":
import sys # 导入sys模块用于处理命令行参数
# 运行异步主函数
asyncio.run(main())
QQ20250801-164738

运行客户端

要使您的客户端与任何 MCP 服务器运行:

1
2
uv run client.py path/to/server.py # python server
uv run client.py path/to/build/index.js # node server

客户端将:

  1. 连接到指定服务器
  2. 列出可用工具
  3. 开始一个交互式聊天会话,您可以在其中:
    • 输入查询
    • 查看工具执行情况
    • 从 Claude 获取响应

运作流程

当你提交查询时:

  1. 客户端从服务器获取可用工具列表
  2. 你的查询连同工具描述一起发送给 Claude
  3. Claude 决定使用哪些工具(如果有的话)
  4. 客户端通过服务器执行任何请求的工具调用
  5. 结果会发送回 Claude
  6. Claude 提供自然语言响应
  7. 响应显示给您

参考资料

Build an MCP Client - Model Context Protocol

适配openai版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import asyncio
import json
import sys
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from openai import OpenAI
from dotenv import load_dotenv
import os

load_dotenv() # 从 .env 文件加载环境变量

class MCPClient:
def __init__(self):
# 初始化会话和客户端对象
self.session: Optional[ClientSession] = None # MCP客户端会话
self.exit_stack = AsyncExitStack() # 异步上下文管理器堆栈,用于资源清理
self.anthropic = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
) # 使用OpenAI兼容模式连接通义千问

async def connect_to_server(self, server_script_path: str):
"""连接到MCP服务器

Args:
server_script_path (str): 服务器脚本路径,支持.py或.js文件

Raises:
ValueError: 当脚本文件不是.py或.js格式时抛出
"""
# 检查是否为Python文件
is_python = server_script_path.endswith('.py')
# 检查是否为JavaScript文件
is_js = server_script_path.endswith('.js')

# 如果不是Python或JavaScript文件,则抛出错误
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")

# 根据文件类型确定执行命令
command = "python" if is_python else "node"

# 创建服务器参数对象
server_params = StdioServerParameters(
command=command, # 执行命令
args=[server_script_path], # 脚本路径作为参数
env=None # 环境变量(使用默认环境)
)

# 建立stdio客户端连接并将其添加到异步上下文管理器中
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport

# 创建客户端会话并将其添加到异步上下文管理器中
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

# 初始化会话
await self.session.initialize()

# 列出可用的工具
response = await self.session.list_tools()
tools = response.tools

# 打印连接的服务器提供的工具列表
print("\n已连接到服务器,可用工具:", [tool.name for tool in tools])

async def process_query(self, query: str) -> str:
"""使用Qwen和可用工具处理查询"""
# 构建消息列表
messages = [
{
"role": "user",
"content": query
}
]

# 获取可用工具列表并转换为OpenAI格式
response = await self.session.list_tools()
available_tools = []
for tool in response.tools:
schema = tool.inputSchema
if isinstance(schema, str):
schema = json.loads(schema)
if isinstance(schema, dict) and "properties" in schema:
schema = {"type": "object", **schema}

available_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": schema
}
})

# 第一次调用模型
response = self.anthropic.chat.completions.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools,
extra_body={"enable_thinking": False}
)

final_text = []
message = response.choices[0].message

# 处理文本内容
if message.content:
final_text.append(message.content)

# 处理工具调用
if message.tool_calls:
for tool_call in message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)

# 执行工具调用
result = await self.session.call_tool(tool_name, tool_args)
final_text.append(f"[调用工具 {tool_name},参数 {tool_args}]")

# 处理工具结果
tool_result_content = ""
if result.content:
for item in result.content:
if hasattr(item, 'type') and item.type == 'text':
tool_result_content += item.text
else:
tool_result_content += str(item)

# 添加助手消息到历史
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
})

# 添加工具执行结果到历史
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_result_content
})

# 再次调用模型
response = self.anthropic.chat.completions.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools,
extra_body={"enable_thinking": False}
)

# 处理最终响应
if response.choices and response.choices[0].message.content:
final_text.append(response.choices[0].message.content)

return "\n".join(final_text)

async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP客户端已启动!")
print("输入您的问题或输入'quit'退出。")

while True: # 无限循环,持续接收用户输入
try:
query = input("\n问题: ").strip() # 获取用户输入并去除首尾空格

if query.lower() == 'quit': # 如果用户输入'quit'(不区分大小写)
break # 退出循环

# 处理用户查询并获取响应
response = await self.process_query(query)
print("\n" + response) # 打印AI响应结果

except Exception as e: # 捕获所有异常
print(f"\n错误: {str(e)}") # 打印错误信息
import traceback
traceback.print_exc() # 打印详细错误信息

async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose() # 异步关闭所有在exit_stack中管理的资源

async def main():
# 检查命令行参数数量,如果少于2个则显示使用说明
if len(sys.argv) < 2:
print("用法: python client.py <服务器脚本路径>")
sys.exit(1) # 退出程序,返回错误码1

# 创建MCP客户端实例
client = MCPClient()
try:
# 连接到服务器,sys.argv[1]是第一个命令行参数(服务器脚本路径)
await client.connect_to_server(sys.argv[1])
# 启动交互式聊天循环
await client.chat_loop()
finally:
# 确保程序结束时清理资源
await client.cleanup()

# 程序入口点
if __name__ == "__main__":
# 运行异步主函数
asyncio.run(main())

openai和claude在工具调用的差异

  1. 工具格式转换修复

问题:MCP工具格式与OpenAI API不兼容 修复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 原错误格式
available_tools = [{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
}]

# 修复后格式(符合OpenAI规范)
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": schema # 正确的JSON Schema格式
}
}]

  1. API响应处理修复

问题:错误访问了OpenAI响应对象的属性 修复

1
2
3
4
5
6
7
8
9
10
# 原错误代码
for content in response.content: # ❌ response没有content属性

# 修复后代码
message = response.choices[0].message # ✅ 正确的访问路径
if message.content:
final_text.append(message.content)
if message.tool_calls:
for tool_call in message.tool_calls:
# 处理工具调用

  1. 工具调用结果处理修复

问题:错误处理MCP工具调用返回的结果结构 修复

1
2
3
4
5
6
7
8
9
10
11
# 原错误代码
"content": result.content # ❌ 可能包含复杂对象

# 修复后代码
tool_result_content = ""
if result.content:
for item in result.content:
if hasattr(item, 'type') and item.type == 'text':
tool_result_content += item.text
else:
tool_result_content += str(item)
  1. 消息历史构建修复

问题:工具调用后消息历史格式不正确 修复

1
2
3
4
5
6
7
8
9
10
11
# 正确的消息历史格式
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_result_content
})

  1. JSON Schema兼容性处理

问题:MCP返回的schema可能缺少必要的根类型定义 修复

1
2
3
4
5
schema = tool.inputSchema
if isinstance(schema, str):
schema = json.loads(schema)
if isinstance(schema, dict) and "properties" in schema:
schema = {"type": "object", **schema} # 确保有根类型

配置环境

1
2
3
4
5
6
7
8
9
10
11
12
13
# Create a new directory for our project
uv init weather
cd weather

# Create virtual environment and activate it
uv venv
source .venv/bin/activate

# Install dependencies
uv add "mcp[cli]" httpx

# Create our server file
touch weather.py

mcp studio样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Math")

@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers"""
return a + b

@mcp.tool()
def multiply(a: int, b: int) -> int:
"""Multiply two numbers"""
return a * b

if __name__ == "__main__":
mcp.run(transport="stdio")

mcp streamable-http 样例

1
2
3
4
5
6
7
8
9
10
11
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Weather")

@mcp.tool()
async def get_weather(location: str) -> str:
"""Get weather for location."""
return "It's always sunny in New York"

if __name__ == "__main__":
mcp.run(transport="streamable-http")

使用

mcp市场MCP 广场 · 魔搭社区

使用 uv(推荐)

当使用 uv 时不需要特定的安装步骤。我们将使用 uvx 直接运行 mcp-server-fetch

1
2
3
4
5
6
"mcpServers": {
"fetch": {
"command": "uvx",
"args": ["mcp-server-fetch"]
}
}

使用 PIP

或者,您可以通过 pip 安装 mcp-server-fetch

1
pip install mcp-server-fetch
1
2
3
4
5
6
"mcpServers": {
"fetch": {
"command": "python",
"args": ["-m", "mcp_server_fetch"]
}
}

远程托管

image-20250802120145192
1
2
3
4
5
6
7
8
{
"mcpServers": {
"fetch": {
"type": "sse",
"url": "https://mcp.api-inference.modelscope.net/991cf46/sse"
}
}
}

参考资料

构建 MCP 服务器 - 模型上下文协议 — Build an MCP Server - Model Context Protocol

Chroma是什么

Chroma(通常指 ChromaDB)是一款 开源、AI 原生的向量数据库,专为存储和检索 高维嵌入向量 而设计,目标是让开发者 5 分钟内在本地跑起一个语义搜索或 RAG 系统

极简安装pip install chromadb

双运行模式

  • 内存模式:调试/原型随意重启;
  • 持久化模式:指定 persist_directory 即可落盘,生产也不怕丢数据。

HNSW 索引:百万级向量也能 毫秒级 响应。

使用chroma存储

文档分块

1
2
3
4
5
6
7
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

loader = PyPDFLoader("input/健康档案.pdf")
docs = loader.load()
#递归分块
text_splitter = RecursiveCharacterTextSplitter(chunk_siz

定义embedding模型与chroma

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

from langchain.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embedding = OpenAIEmbeddings(
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
model="text-embedding-v4",
check_embedding_ctx_length = False,
dimensions=1536
)
# 使用 Chroma 向量数据库存储文档 chunks
vectorstore = Chroma.from_documents(
documents=chunks, # 要存储的文档chunks列表(已处理好的文本片段)
embedding=embedding,
persist_directory="chromaDB", # 向量数据库的持久化存储目录路径
collection_name="demo001" # 集合名称,用于区分不同的文档集合
)
vectorstore.persist()

检索

这里采取直接检索进行测试

1
2
3
4
5
results = vectorstore.similarity_search(
"张三九的基本信息是什么",
k=2,
collection_name="demo001" # 指定检索的集合
)

重新加载

1
2
3
4
5
6
7
# 重新加载已存在的 Chroma 数据库
vectorstore = Chroma(
persist_directory="./chroma_db",
embedding_function=embedding
)

retriever = vectorstore.as_retriever()

参考资料

Chroma | 🦜️🔗 LangChain — Chroma | 🦜️🔗 LangChain

什么是Plan-and-Execute模式

Plan-and-Execute架构流程:先指定计划,后交给执行agent,执行后交给replan节点,判断是否需要更新计划,若要更新计划返回返回更新后的机会,否则返回response,然后路由判断是执行agent还是response

与ReAct模式不同的是,ReAct只做一次规划,而Plan-and-Execute模式核心思想是首先制定一个多步骤计划,然后逐项执行该计划。完成特定任务后,可以重新审视计划并进行适当修改。

举个例子,用户在问一个问题后,agent产生一份任务清单,选取第一份任务开始执行,执行后的结果结合任务清单,执行replan,结合新的信息,更改任务清单的内容,让后续大模型更好地执行,并去除已经完成的任务

实际生产中,应该在planner之前再进行一次判断,如果问题过于简单,不需要进行Plan-and-Execute模式

实战

安装包

1
pip install --quiet -U langgraph langchain-community langchain-openai tavily-python

定义网络搜索工具与执行agent

在产生plan后,要有一个agent对任务清单进行执行,这里以一个ReAct的网络搜索agent为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#定义工具
from langchain_tavily import TavilySearch
tools = [TavilySearch(max_results=3)]

from langchain_openai import ChatOpenAI

from langgraph.prebuilt import create_react_agent


llm = ChatOpenAI(
model="qwen3-235b-a22b-thinking-2507",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
prompt = "You are a helpful assistant."
agent_executor = create_react_agent(llm, tools, prompt=prompt)

测试功能

1
agent_executor.invoke({"messages": [("user", "今天是几月几日")]})

定义执行节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from typing import Literal
from langgraph.graph import END


async def execute_step(state: PlanExecute):
"""执行计划中的步骤"""
plan = state["plan"]
# 将计划格式化为带编号的字符串
plan_str = "\n".join(f"{i + 1}. {step}" for i, step in enumerate(plan))
task = plan[0] # 获取第一个待执行的任务
task_formatted = f"""对于以下计划:
{plan_str}\n\n你被分配执行第 {1} 步, {task}。"""
# 调用代理执行器来执行任务
agent_response = await agent_executor.ainvoke(
{"messages": [("user", task_formatted)]}
)
# 返回执行结果,添加到历史步骤中
return {
"past_steps": [(task, agent_response["messages"][-1].content)],
}

create_react_agent 是 LangGraph 提供的一个预构建函数,位于 langgraph.prebuilt 模块中,用于快速创建一个基于 ReAct(Reasoning + Acting)架构的智能代理。

langgraph预设的其他常用组件如下

ToolNode

功能:把 LangChain 工具(BaseTool)封装成一个图节点,负责:

  • 接收 LLM 生成的工具调用请求

  • 真正执行工具

  • 把结果返回给图

1
2
3
from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools=[search, calculator])

tools_condition

功能:判断 LLM 是否要继续调用工具的“路由函数”。

在 ReAct 图里通常放在节点之间的 条件边:

1
2
3
4
5
6
7
8
9
from langgraph.prebuilt import tools_condition

graph.add_conditional_edges("agent", tools_condition, {

"tools": "tool_node", # 需要工具 → 去 ToolNode

"**__end__**": END # 不需要 → 结束

})

定义状态

1
2
3
4
5
6
7
8
9
10
import operator
from typing import Annotated, List, Tuple
from typing_extensions import TypedDict


class PlanExecute(TypedDict):
input: str
plan: List[str]
past_steps: Annotated[List[Tuple], operator.add]
response: str
1
2
3
4
5
6
7
8
9
from pydantic import BaseModel, Field


class Plan(BaseModel):
"""未来要遵循的计划"""

steps: List[str] = Field(
description="需要遵循的不同步骤,应该按排序顺序排列"
)

定义模型

1
2
3
4
5
from langchain_community.chat_models import ChatTongyi
model=ChatTongyi(
model="qwen-plus-2025-07-14",
api_key="sk-"
)

定义初始计划节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"""对于给定的目标,制定一个简单的逐步计划。\
这个计划应该包含独立的任务,如果正确执行这些任务将得到正确的答案。不要添加任何多余的步骤。\
最后一步的结果应该是最终答案。确保每个步骤都包含所需的所有信息——不要跳过任何步骤。""",
),
("placeholder", "{messages}"),
]
)
planner = planner_prompt | model.with_structured_output(Plan)

async def plan_step(state: PlanExecute):
"""制定初始计划步骤"""
# 使用规划器为用户输入制定计划
plan = await planner.ainvoke({"messages": [("user", state["input"])]})

return {"plan": plan.steps}

定义再计划节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from typing import Union


class Response(BaseModel):
"""对用户的响应"""

response: str


class Act(BaseModel):
"""要执行的动作"""

action: Union[Response, Plan] = Field(
description="要执行的动作。如果你想响应用户,使用 Response。"
"如果你需要进一步使用工具来获取答案,使用 Plan。"
)


replanner_prompt = ChatPromptTemplate.from_template(
"""对于给定的目标,制定一个简单的逐步计划。\
这个 计划应该包含独立的任务,如果正确执行这些任务将得到正确的答案。不要添加任何多余的步骤。\
最后一步的结果应该是最终答案。确保每个步骤都包含所需的所有信息——不要跳过任何步骤。

你的目标是:
{input}

你的原始计划是:
{plan}

你目前已经完成了以下步骤:
{past_steps}

相应地更新你的计划。如果不需要更多步骤并且可以返回给用户,则直接响应。否则,填写计划。只添加仍需要完成的步骤到计划中。不要将已经完成的步骤作为计划的一部分返回。"""
)


replanner = replanner_prompt | model.with_structured_output(Act)
1
2
3
4
5
6
7
8
9
10
async def replan_step(state: PlanExecute):
"""重新规划步骤"""
# 使用重新规划器根据当前状态更新计划
output = await replanner.ainvoke(state)
if isinstance(output.action, Response):
# 如果动作是响应,返回最终响应
return {"response": output.action.response}
else:
# 如果动作是计划,返回新的计划步骤
return {"plan": output.action.steps}

定义路由

1
2
3
4
5
6
7
8
def should_end(state: PlanExecute):
"""判断是否结束执行流程"""
if "response" in state and state["response"]:
# 如果存在响应内容,结束流程
return END
else:
# 否则继续执行代理步骤
return "agent"

编译图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.memory import MemorySaver

# 创建内存检查点保存器
memory = MemorySaver()

# 创建工作流图,使用 PlanExecute 状态类型
workflow = StateGraph(PlanExecute)

# 添加计划节点
workflow.add_node("planner", plan_step)

# 添加执行步骤节点
workflow.add_node("agent", execute_step)

# 添加重新规划节点
workflow.add_node("replan", replan_step)

# 从开始节点连接到计划节点
workflow.add_edge(START, "planner")

# 从计划节点连接到代理执行节点
workflow.add_edge("planner", "agent")

# 从代理执行节点连接到重新规划节点
workflow.add_edge("agent", "replan")

# 添加条件边 - 从重新规划节点根据条件决定下一步
workflow.add_conditional_edges(
"replan",
# 传入决定下一个调用节点的函数
should_end,
["agent", END], # 可能的下一个节点:代理节点或结束
)

# 最后,编译工作流并添加检查点功能!
# 这将其编译为 LangChain Runnable,
# 意味着你可以像使用其他任何 runnable 一样使用它
app = workflow.compile(checkpointer=memory)
image-20250729092408872

调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 配置递归限制,防止无限循环
config = {"configurable": {
"thread_id": "1", # 必需:线程ID
},"recursion_limit": 50}

# 输入问题:2024年澳大利亚网球公开赛男单冠军的家乡是哪里?
inputs = {"input": "2024年澳大利亚网球公开赛男单冠军的家乡是哪里?"}

# 异步流式执行应用
async for event in app.astream(inputs, config=config):
# 遍历每个事件
for k, v in event.items():
# 排除结束标记,打印其他所有事件内容
if k != "__end__":
print(v)

资源

计划与执行 — Plan-and-Execute

前言

常使用pgsql与redis配合langgraph的checkpoint与store进行持久化储存,完成长期记忆与短期记忆的实现

pgsql实现持久化

什么是pgsql

PostgreSQL(常简称pgsql或Postgres)是一个功能强大的开源对象-关系型数据库管理系统(ORDBMS),以其稳定性、扩展性和符合SQL标准著称。

docker拉取

docker-compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
version: '3.8'

services:
postgres:
image: postgres:15 # 指定具体版本
container_name: postgres_db
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
TZ: Asia/Shanghai # 设置时区
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
restart: unless-stopped
healthcheck: # 健康检查
test: ["CMD", "pg_isready", "-U", "nange"]
interval: 10s
timeout: 5s
retries: 5
command: ["postgres", "-c", "max_connections=200"] # 自定义配置

volumes:
pgdata:

Docker Compose 是一个 定义和运行多容器 Docker 应用声明式工具。 它通过一个 YAML 文件(通常叫 docker-compose.yml 描述整个应用的服务、网络、存储等配置,然后用一条命令即可 启动/停止/管理 所有容器,无需手动逐个 docker run

1
2
#拉取并运行
docker-compose up -d

在pgsqsl安装依赖

因为LangGraph的PostgresStore需要使用到pgvector,因此需要在容器中按照如下步骤进行操作,直接使用Docker Desktop软件中进行操作

  • 为什么需要 pgvector
  • PostgresStore 支持将 向量嵌入(embedding) 存储在 PostgreSQL 中,并基于它们进行 语义搜索
  • 该功能依赖 pgvector 扩展提供的 vector 类型和索引机制(如 HNSW)。
1
2
3
安装依赖           
apt update #刷新本地软件包索引
apt install -y git build-essential postgresql-server-dev-15

apt install -y git build-essential postgresql-server-dev-15一次性安装 3 类依赖。

  • git —— 用来克隆 pgvector 源码。
  • build-essential —— Debian/Ubuntu 的“编译工具链”元包,包含 gcc、make 等。
  • postgresql-server-dev-15 —— 与当前 Postgres 主版本一致 的开发头文件和 pg_config
1
2
3
4
5
6
7
8
9
10
编译并安装 pgvector      
#把 pgvector 的 v0.7.0 稳定版 源码克隆到本地目录 ./pgvector。
git clone --branch v0.7.0 https://github.com/pgvector/pgvector.git
cd pgvector
#调用 Makefile 根据当前操作系统 + PostgreSQL 版本编译出二进制文件(.so 共享库)。
make
#把刚刚编好的 .so 文件和 .sql/.control 文件复制到 PostgreSQL 的扩展目录
make install
验证安装,检查扩展文件是否安装成功
ls -l /usr/share/postgresql/15/extension/vector*

pgvector/pgvector: Open-source vector similarity search for Postgres

接下来,若要在脚本中进行使用,首先在系统环境中需要安装PostgreSQL 的开发库(libpq),因为 psycopg 需要它来编译或运行

1
2
sudo apt update
sudo apt install libpq-dev postgresql-server-dev-all

psycopg(Python 操作 PostgreSQL 的库)Linux/macOS 上运行时,底层依赖于 PostgreSQL 的 C 语言开发库 libpq。 如果系统里 没有 libpq,psycopg 会出现以下两种问题:

  1. 编译安装失败(源码/旧版本) 当 pip install psycopg2 需要现场编译时,会找不到头文件 libpq-fe.h 或动态库 libpq.so,导致报错:

    1
    error: libpq-fe.h: No such file or directory
  2. 运行时崩溃 即使通过预编译的 wheel 包安装成功,运行时也可能提示:

    1
    ImportError: libpq.so.5: cannot open shared object file

最后,再安装相关依赖包

1
2
pip install langgraph-checkpoint-postgres                    
pip install psycopg psycopg-pool

psycopg官方 PostgreSQL 驱动,在 psycopg 之上再包一层“连接池”,让并发访问更快、更稳定。

连接pgsql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from langgraph.store.postgres import PostgresStore
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool
# 1) 连接字符串(URI 语法)
DB_URI = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"
# 协议:// 用户 : 密码 @ 主机:端口 / 数据库名 ? 额外参数

# 2) 连接级参数
connection_kwargs = {
"autocommit": True, # 每条 SQL 执行完立即提交,无需手动 commit
"prepare_threshold": 0, # 禁用服务器端 prepared statement,可减少一次往返
}

# 3) 创建池
connection_pool = ConnectionPool(
conninfo=DB_URI,
max_size=20, # 最多 20 条物理连接
kwargs=connection_kwargs,
)

# 4) 显式打开池(psycopg 3 的 ConnectionPool 默认懒启动,调 open() 会立即建 min_size 条连接)
connection_pool.open()
print("数据库连接池初始化成功")

正常情况每次 SQL 都新建一条 TCP 连接、做 SSL 握手、验证密码、分配内存,如果不复用连接,这些动作就要 每次都重新来一遍成本非常高

连接池(Connection Pool)是一种 数据库访问层资源管理组件,其核心目标是在 高并发、短事务 场景下,通过 复用已建立的数据库物理连接 来降低系统整体延迟、减少资源消耗,并防止数据库因瞬时连接风暴而崩溃。

初始化pgsql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 初始化PostgresStore
in_postgres_store = PostgresStore(
pool,
index={
"dims": 1536,
"embed": embedding
}
)
in_postgres_store.setup()

#初始化checkpoint
# 使用传入的连接池创建 PostgresSaver
checkpointer = PostgresSaver(pool)
checkpointer.setup()

#最后编译时添加
graph_builder.compile(checkpointer=checkpointer, store=in_postgres_store)

in_postgres_store.setup() 的角色一句话就能说清:

把数据库里所有为了让向量存储正常工作的“一次性基建”全部建好——只建一次,后面再跑就不会重复执行。

具体而言,它通常干下面三件事:1.建表 / 建扩展;2.建向量索引;3.元数据初始化

fastapi实现生命周期的管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 定义了一个异步函数lifespan,它接收一个FastAPI应用实例app作为参数。这个函数将管理应用的生命周期,包括启动和关闭时的操作
# 函数在应用启动时执行一些初始化操作,如加载上下文数据、以及初始化问题生成器
# 函数在应用关闭时执行一些清理操作
# @asynccontextmanager 装饰器用于创建一个异步上下文管理器,它允许你在 yield 之前和之后执行特定的代码块,分别表示启动和关闭时的操作
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时执行
# 申明引用全局变量,在函数中被初始化,并在整个应用中使用
global graph, connection_pool
# 启动时执行
try:
logger.info("正在初始化模型、定义 Graph...")
# 初始化 LLM
llm, embedding = get_llm(llm_type)
# 创建数据库连接池
DB_URI = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"
connection_kwargs = {
"autocommit": True,
"prepare_threshold": 0,
}
connection_pool = ConnectionPool(
conninfo=DB_URI,
max_size=20,
kwargs=connection_kwargs,
)
connection_pool.open() # 显式打开连接池
logger.info("数据库连接池初始化成功")
# 短期记忆 初始化checkpointer
checkpointer = PostgresSaver(connection_pool)
checkpointer.setup()
# 长期记忆 初始化PostgresStore
in_postgres_store = PostgresStore(
connection_pool,
index={
"dims": 1536,
"embed": embedding
}
)
in_postgres_store.setup()
# 定义 Graph
graph = create_graph(llm, checkpointer, in_postgres_store )
# 保存 Graph 可视化图
save_graph_visualization(graph)
logger.info("初始化完成")
except Exception as e:
logger.error(f"初始化过程中出错: {str(e)}")
raise

yield # 应用运行期间

# 关闭时执行
logger.info("正在关闭...")
if connection_pool:
connection_pool.close() # 关闭连接池
logger.info("数据库连接池已关闭")


# lifespan参数用于在应用程序生命周期的开始和结束时执行一些初始化或清理工作
app = FastAPI(lifespan=lifespan)

pgsql的存储结构

一、Checkpoints 系列

作用:让 LangGraph Runtime 能在 分布式/长流程 场景下 断点续跑、重放、并发控制

表名 存什么 典型字段(示意) 何时写入
checkpoints 每个「图实例」的 最新快照(state 的完整 JSONB) thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, state, created_at 每次节点执行成功后覆盖更新
checkpoint_blobs checkpoints 里 大字段的拆分(避免行过大) thread_id, checkpoint_ns, channel, type, blob 当 state 过大,自动拆分
checkpoint_migrations 记录 schema 版本/迁移脚本 version, name, applied_at 只在 setup() 时写一次
checkpoint_writes 写放大日志(每个节点写 state 的增量 diff) thread_id, checkpoint_id, task_id, idx, channel, type, value 每次节点完成时追加

关系:
checkpoints = 最新完整快照
checkpoint_writes = 所有增量历史(用于重放/审计)
checkpoint_blobs = checkpoints 里超大型 value 的切片

二、Store 系列

作用:给 业务代码(开发者) 提供 持久化 KV / 向量存储,与图运行状态无关。

表名 存什么 典型字段(示意) 何时写入
store 任意 KV 文档(LangChain Document → JSONB) uuid, namespace, key, value, created_at, updated_at 你调用 store.amput / amset 等 API
store_migrations 同 checkpoint_migrations,记录 store schema 版本 version, name, applied_at 只在第一次 setup()
store_vectors 向量索引表(embedding → vector 类型) uuid, collection_id, embedding, document, metadata 你调用 add_documents(..., embeddings=...)
vector_migrations 记录 pgvector 扩展及索引迁移版本 version, applied_at setup() 时若第一次装 pgvector

三、调用链脑图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌──────────────┐
│ LangGraph │ 运行图实例
└────┬─────────┘
│1. 写 checkpoints
│2. 写 checkpoint_writes
│3. 拆大字段到 checkpoint_blobs

┌──────────────┐
│ 业务代码 │ 读写 KV/向量
└────┬─────────┘
│1. 写 store
│2. 写 store_vectors

PostgreSQL (pgsql)

在linux安装postgresql

查看linux发行版本

1
2
# 查看发行版名称和版本
cat /etc/os-release

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
PRETTY_NAME="Ubuntu 24.04.2 LTS"
NAME="Ubuntu"
VERSION_ID="24.04"
VERSION="24.04.2 LTS (Noble Numbat)"
VERSION_CODENAME=noble
ID=ubuntu
ID_LIKE=debian
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
UBUNTU_CODENAME=noble
LOGO=ubuntu-logo

安装postgresql

1
2
3
4
5
6
7
8
9
10
11
12
13
# 更新软件包索引
sudo apt update

# 安装 PostgreSQL 和常用扩展
sudo apt install -y postgresql postgresql-contrib

# 检查服务状态
#Linux 容器/子系统( Docker、WSL 或 LXC)没有使用 systemd ,因此 systemctl 无法工作
sudo systemctl status postgresql

#确认 PostgreSQL 已安装
which psql # 应该输出 /usr/bin/psql
pg_ctl --version # 显示版本号即已安装
  • postgresql 是主程序
  • postgresql-contrib 提供额外扩展(如 uuid-ossppgcrypto 等)

systemctl一般用于服务器上,完整的linux系统上

Linux 容器/子系统( Docker、WSL 或 LXC)使用 service

pgsql常用指令

启动pgsql服务

1
sudo service postgresql start

停止pgsql服务

1
sudo service postgresql stop

查看状态

1
sudo service postgresql status

连接PostgreSQL 默认的系统用户,可以执行pgsql的相关指令

1
sudo -u postgres psql

psql为PostgreSQL 的终端客户端,默认会连接与当前操作系统用户名同名的数据库用户和数据库。

postgres为PostgreSQL 自带的系统数据库,postgres 默认 数据库密码为空,可以通过以下指令进行设置

1
2
-- 设置postgres用户密码
ALTER USER postgres PASSWORD 'postgres';

数据库关于user的作用

数据库里的“用户”是 PostgreSQL 内部用来做“访问控制”的一把钥匙。一句话:“谁能连哪个库、谁能读哪张表、谁能改哪些行”——全靠这些数据库用户(角色)来判定。

作用 举例
1. 认证(Authentication) 告诉 PostgreSQL “我连库时提供的用户名+密码是否合法”。
2. 授权(Authorization) 决定 “这个用户连进来后,对哪些库、哪些表、哪些行有何种权限(SELECT/INSERT/UPDATE/DELETE…)”。
3. 资源隔离(Isolation) 不同业务/团队用不同用户,方便审计、限流、回收权限,互不干扰。

pgsql通用 URL 模板

1
postgresql://<用户名>:<密码>@127.0.0.1:5432/<数据库名>[?参数=值&...]

连接pgsql

1
2
3
4
5
6
7
8
9
10
11
12
# 基于数据库持久化存储的short-term
db_uri = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"

# short-term短期记忆 实例化PostgresSaver对象 并初始化checkpointer
# long-term长期记忆 实例化PostgresStore对象 并初始化store
async with (
AsyncPostgresSaver.from_conn_string(db_uri) as checkpointer,
AsyncPostgresStore.from_conn_string(db_uri) as store

):
await store.setup()
await checkpointer.setup()

更换apt镜像源

1
2
3
4
5
6
7
8
cat > /etc/apt/sources.list <<'EOF'
deb http://mirrors.aliyun.com/debian/ bookworm main contrib non-free
deb http://mirrors.aliyun.com/debian/ bookworm-updates main contrib non-free
deb http://mirrors.aliyun.com/debian/ bookworm-backports main contrib non-free
deb http://mirrors.aliyun.com/debian-security bookworm-security main contrib non-free
EOF

apt update

参考资料

postgresql向量扩展pgvector的安装与入门本文简答的介绍了 rag 的架构,引申出向量数据库的作用,介绍了 - 掘金

langchain支持向量存储向量存储 | 🦜️🔗 LangChain — Vector stores | 🦜️🔗 LangChain

前言

本文基于AI-Guide-and-Demos-zh_CN/PaperNotes/Transformer 论文精读.md at master · Hoper-J/AI-Guide-and-Demos-zh_CNTransformer论文逐段精读【论文精读】_哔哩哔哩_bilibili阅读学习

transformer贡献

实际在这一阶段的工作中,注意力机制就已经在编码器-解码器架构中被广泛应用(与 RNN 一起使用),但 Transformer 彻底颠覆了默认采取的逻辑:直接放弃 RNN 的递归结构,只使用注意力机制来编码和解码序列信息

Transformer 的主要贡献如下:

  • 取消递归结构,实现并行计算

    通过采用自注意力机制(Self-Attention),Transformer 可以同时处理多个输入序列,极大提高了计算的并行度和训练速度。

  • 引入位置编码(Positional Encoding)并结合 Attention 机制巧妙地捕捉位置信息

    在不依赖 RNN 结构的情况下,通过位置编码为序列中的每个元素嵌入位置信息,从而使模型能够感知输入的顺序。

transformer架构

image-20250807135151542
image-20250807145354145

【Transformer模型】曼妙动画轻松学,形象比喻贼好记_哔哩哔哩_bilibili

Transformer 模型基于编码器(左)- 解码器(右)架构

Transformer编码器同样由 N 个完全相同的层(原始论文中 N=6)堆叠而成,每层只有两个子层,而解码器有三个。

  1. 多头自注意力(Multi-Head Self-Attention) 让输入序列中的每个位置都能关注序列内所有位置,直接建模全局依赖。
  2. 前馈全连接网络(Position-wise Feed-Forward Network) 对每个位置独立地做一次两层的全连接变换(通常先升维再降维)。

同样,每个子层后都有

  • 残差连接(Residual Connection)
  • 层归一化(Layer Normalization)

另外,编码器在输入端还会用到

  • 位置编码(Positional Encoding)——给模型提供序列位置信息,因为注意力本身不包含顺序信息。

Transformer解码器由多个相同的层堆叠而成,每一层包含三个核心子层:

  1. 掩蔽多头自注意力机制(Masked Multi-Head Attention) 用于处理目标序列,通过掩码防止当前位置关注未来位置,确保生成过程的自回归特性。
  2. 编码器-解码器注意力机制(Encoder-Decoder Attention) 使解码器能够关注编码器输出的上下文信息,建立输入与输出序列之间的关联。
  3. 前馈神经网络(Feed-Forward Neural Network) 对注意力机制的输出进行非线性变换,增强模型的表达能力。

此外,每个子层后均包含残差连接(Residual Connection)和层归一化(Layer Normalization),以稳定训练过程并加速收敛。最终,解码器的输出通过线性层和Softmax层映射为词汇表上的概率分布。

嵌入(Embeddings)

image-20250808103256344

在 Transformer 模型中,嵌入层(Embedding Layer) 是处理输入和输出数据的关键步骤,因为模型实际操作的是张量(tensor),而非字符串(string)。在将输入文本传递给模型之前,首先需要进行分词(tokenization),即将文本拆解为多个 token,随后这些 token 会被映射为对应的 token ID,从而转换为模型可理解的数值形式。此时,数据的形状为 (seq_len,),其中 seq_len 表示输入序列的长度。

目的:为了让模型捕捉到 token 背后复杂的语义(Semantic meaning)关系,我们需要将离散的 token ID 映射到一个高维的连续向量空间(Continuous, dense)。这意味着每个 token ID 会被转换为一个嵌入向量(embedding vector),期望通过这种方式让语义相近的词汇在向量空间中距离更近,使模型能更好地捕捉词汇之间的关系。

流程:(前面要进行分词,后面要进行位置编码)

初始化一个可学习的矩阵 E ∈ ℝ^(|V| × d_model) |V| = 词表大小(比如 32 k、50 k),d_model = 512/768/1024…

把 token id 作为行号,直接取对应行: x_i = E[token_id_i] 得到 [batch, seq_len, d_model] 的浮点张量。

位置编码(Positional Encoding)

image-20250808103418150

Transformer 的自注意力机制(Self-Attention)是位置无关(position-agnostic)的。也就是说,如果不做任何处理,模型无法区分“我爱你”和“你爱我”这两个句子的差异,因为自注意力机制只关注 token 之间的相关性,而不考虑它们在序列中的顺序。

为了让模型感知到 token 的位置信息,Transformer 引入了位置编码

在原始论文中,Transformer 使用的是固定位置编码(Positional Encoding),其公式如下:

$$ \begin{aligned} PE_{(pos, 2i)} &= \sin\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right), \\ PE_{(pos, 2i+1)} &= \cos\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right). \end{aligned} $$

其中:

  • pos 表示位置索引(Position)。
  • i 表示维度索引。
  • dmodel 是嵌入向量的维度。

流程:输入的是一个整数索引(位置序号 0,1,2,…)。位置编码模块先把这些整数映射成与词向量同维度的向量(例如 512 维),再把结果加到词向量上。

linear与Softmax

image-20250808111823715

从Linear到MLP AI模型的数学本质【Transformer结构拆解】_哔哩哔哩_bilibili

在 Transformer 模型中,Softmax 函数不仅在计算注意力权重时用到,在预测阶段的输出处理环节也会用到,因为预测 token 的过程可以看成是多分类问题

Softmax 函数是一种常用的激活函数,能够将任意实数向量转换为概率分布,确保每个元素的取值范围在 [0, 1] 之间,并且所有元素的和为 1。其数学定义如下:

$$ \text{Softmax}(x_i) = \frac{e^{x_i}}{\sum_{j} e^{x_j}} $$

其中:

  • xi 表示输入向量中的第 i 个元素。
  • Softmax(xi) 表示输入 xi 转换后的概率。

我们可以把 Softmax 看作一种归一化的指数变换。相比于简单的比例归一化 $\frac{x_i}{\sum_j x_j}$, Softmax 通过指数变换放大数值间的差异,让较大的值对应更高的概率,同时避免了负值和数值过小的问题,让模型聚焦于权重最高的位置,同时保留全局信息(低权重仍非零)。

注意力机制

【Attention 注意力机制】激情告白transformer、Bert、GNN的精髓_哔哩哔哩_bilibili

image-20250808135811744

缩放点积注意力机制(Scaled Dot-Product Attention)

image-20250808140834132

Transformer 的核心是多头注意力机制(Multi-Head Attention),它能够捕捉输入序列中不同位置之间的依赖关系,并从多个角度对信息进行建模。模块将自底向上的进行讲解:在深入理解注意力机制前,首先需要理解论文使用的缩放点积注意力机制(Scaled Dot-Product Attention)

给定查询矩阵 Q、键矩阵 K 和值矩阵 V, 其注意力输出的数学表达式如下:

$$ \text{Attention}(Q, K, V) = \text{Softmax}\left(\frac{Q K^\top}{\sqrt{d_k}}\right) V $$

  • Q(Query): 用于查询的向量矩阵。
  • K(Key): 表示键的向量矩阵,用于与查询匹配。
  • V(Value): 值矩阵,注意力权重最终会作用在该矩阵上。
  • dk: 键或查询向量的维度。

理解 Q、K、V 的关键在于代码,它们实际上是通过线性变换从输入序列生成的

公式解释

  1. 点积计算(Dot Produce)

    将查询矩阵 Q 与键矩阵的转置 K 做点积,计算每个查询向量与所有键向量之间的相似度:

    $`\text{Scores} = Q K^\top`$

    • 每一行表示某个查询与所有键之间的相似度(匹配分数)。
    • 每一列表示某个键与所有查询之间的相似度(匹配分数)。
  2. 缩放(Scaling)

    dk 较大时,点积的数值可能会过大,导致 Softmax 过后的梯度变得极小,因此除以 $\sqrt{d_k}$ 缩放点积结果的数值范围:

    $`\text{Scaled Scores} = \frac{Q K^\top}{\sqrt{d_k}}`$

    缩放后(Scaled Dot-Product)也称为注意力分数(attention scores)。

  3. Softmax 归一化

    使用 Softmax 函数将缩放后的分数转换为概率分布:

    $`\text{Attention Weights} = \text{Softmax}\left(\frac{Q K^\top}{\sqrt{d_k}}\right)`$

    注意:Softmax 是在每一行上进行的,这意味着每个查询的匹配分数将归一化为概率,总和为 1。

  4. 加权求和(Weighted Sum)

    最后,使用归一化后的注意力权重对值矩阵 V 进行加权求和,得到每个查询位置的最终输出: $`\text{Output} = \text{Attention Weights} \times V`$

单头注意力机制(Single-Head Attention)

将输入序列(Inputs)通过线性变换生成查询矩阵(Query, Q)、键矩阵(Key, K)和值矩阵(Value, V),随后执行缩放点积注意力(Scaled Dot-Product Attention)。

掩码注意力机制(Masked Attention)

如果使用 mask 掩盖将要预测的词汇,那么 Attention 就延伸为 Masked Attention

在这段代码中,mask 矩阵用于指定哪些位置应该被遮蔽(即填充为 -∞),从而保证这些位置的注意力权重在 softmax 输出中接近于零。注意,掩码机制并不是直接在截断输入序列,也不是在算分数的时候就排除不应该看到的位置,因为看到也没有关系,不会影响与其他位置的分数,所以在传入 Softmax(计算注意力权重)之前排除就可以了。

另外,根据输入数据的来源,还可以将注意力分为自注意力(Self-Attention)和交叉注意力(Cross-Attention)

自注意力机制(Self-attention)

image-20250808142109091

Transformer 模型架构使用到了三个看起来不同的注意力机制,我们继续忽视共有的 Multi-Head。观察输入,线条一分为三传入 Attention 模块,这意味着查询(query)、键(key)和值(value)实际上都来自同一输入序列 X,数学表达如下:

Q = XWQ,  K = XWK,  V = XWV

  • WQ, WK, WV:可训练的线性变换权重,实际上就是简单的线性层

交叉注意力机制(Cross-Attention)

在 Transformer 解码器中,除了自注意力外,还使用了 交叉注意力(Cross-Attention)

如下图所示,解码器(右)在自底向上的处理过程中,先执行自注意力机制,然后通过交叉注意力从编码器的输出中获取上下文信息。

image-20250808142428374

数学表达如下:

Q = XdecoderWQ,  K = XencoderWK,  V = XencoderWV

对比学习

Masked AttentionSelf-AttentionCross-Attention 的本质是一致的,这一点从代码调用可以看出来,三者的区别在于未来掩码的使用和输入数据的来源:

  • Masked Attention:用于解码过程,通过掩码屏蔽未来的时间步,确保模型只能基于已生成的部分进行预测,论文中解码器部分的第一个 Attention 使用的是 Masked Self-Attention。

  • Self-Attention:查询、键和值矩阵来自同一输入序列,模型通过自注意力机制学习输入序列的全局依赖关系。

  • Cross-Attention:查询矩阵来自解码器的输入,而键和值矩阵来自编码器的输出,解码器的第二个 Attention 模块就是 Cross-Attention,用于从编码器输出中获取相关的上下文信息。

    • 机器翻译中的中译英任务为例:对于中文句子“中国的首都是北京”,假设模型已经生成了部分译文“The capital of China is”,此时需要预测下一个单词。

      在这一阶段,解码器中的交叉注意力机制会使用当前已生成的译文“The capital of China is”的编码表示作为查询,并将编码器对输入句子“中国的首都是北京”编码表示作为,通过计算查询与键之间的匹配程度,生成相应的注意力权重,以此从值中提取上下文信息,基于这些信息生成下一个可能的单词(token),比如:“Beijing”。

多头注意力机制(Multi-Head Attention)

image-20250808143810965
image-20250808143822630
image-20250808143845681
image-20250808145725202

多头注意力机制就是存在多个不同的权重矩阵,形成多个矩阵Z,再把它们 按最后一维(hidden)拼接(concat)→ 做一次线性变换 得到最终输出。

线性bian’h把拼接后的多头结果 Z_concat(形状 batch×seq×d_model)重新线性映射回与输入相同的维度,同时让网络可以学习如何融合不同头的信息

【Transformer模型】曼妙动画轻松学,形象比喻贼好记_哔哩哔哩_bilibili

Transformer原理及架构:多头自注意力机制_哔哩哔哩_bilibili

残差连接(Residual Connection)和层归一化(Layer Normalization, LayerNorm)

image-20250808150313758

在 Transformer 架构中,残差连接(Residual Connection)与层归一化(LayerNorm)结合使用,统称为 Add & Norm 操作。

Add(残差连接,Residual Connection)

残差连接是一种跳跃连接(Skip Connection),它将层的输入直接加到输出上(观察架构图中的箭头),对应的公式如下:

Output = SubLayer(x) + x

这种连接方式有效缓解了深层神经网络的梯度消失问题。

image-20250808151004667

在transform中,就是输入的矩阵x加上经过注意力机制计算出来的z矩阵

Norm(层归一化,Layer Normalization)

层归一化(LayerNorm)是一种归一化技术,用于提升训练的稳定性和模型的泛化能力。

假设输入向量为 x = (x1, x2, …, xd), LayerNorm 的计算步骤如下:

  1. 计算均值和方差: 对输入的所有特征求均值 μ 和方差 σ2

    $` \mu = \frac{1}{d} \sum_{j=1}^{d} x_j, \quad \sigma^2 = \frac{1}{d} \sum_{j=1}^{d} (x_j - \mu)^2 `$

  2. 归一化公式: 将输入特征 i 进行归一化:

    $` \hat{x}_i = \frac{x_i - \mu}{\sqrt{\sigma^2 + \epsilon}} `$

    其中, ϵ 是一个很小的常数(比如 1e-9),用于防止除以零的情况。

  3. 引入可学习参数: 归一化后的输出乘以 γ 并加上 β, 公式如下:

    $` \text{Output} = \gamma \hat{x} + \beta `$

    其中 γβ 是可学习的参数,用于进一步调整归一化后的输出。

前馈神经网络 Position-wise Feed-Forward Networks(FFN)

image-20251220191959727

在 Transformer 中,前馈网络层(Feed-Forward Network,FFN)的作用可以概括为一句话: “对每个位置的向量进行非线性变换,增加模型的表达能力。”

在编码器-解码器架构中,另一个看起来“大一点”的模块就是 Feed Forward,它在每个位置 i 上的计算可以表示为:

FFN(xi) = max(0, xiW1 + b1)W2 + b2

其中:

  • xi ∈ ℝdmodel 表示第 i 个位置的输入向量。
  • W1 ∈ ℝdmodel × dffW2 ∈ ℝdff × dmodel 是两个线性变换的权重矩阵。
  • b1 ∈ ℝdffb2 ∈ ℝdmodel 是对应的偏置向量。
  • max(0, ⋅)ReLU 激活函数,用于引入非线性。

大模型发展树

image-20250807171237889
image-20250807173520481

预训练语言模型

预训练语言模型(PLM)是一种通过大量文本数据进行无监督或弱监督训练的语言模型,目的是学习语言的通用表示(即语言的模式、语法、语义等)。这些模型通常在大规模文本数据上进行预训练,然后可以被微调(Fine - tuning)以适应各种下游任务,如文本分类、问答、命名实体识别等。

预训练语言模型的核心思想是利用大量的无标注文本数据来学习语言的通用特征,从而为各种自然语言处理任务提供强大的语言理解能力。预训练模型可以显著提高任务的性能,减少对标注数据的依赖,并且能够快速适应新的任务。

BERT模型(Encoder-only PLM)

针对 Encoder、Decoder 的特点,引入 ELMo 的预训练思路,开始出现不同的、对 Transformer 进行优化的思路。例如,Google 仅选择了 Encoder 层,通过将 Encoder 层进行堆叠,再提出不同的预训练任务-掩码语言模型(Masked Language Model,MLM),打造了一统自然语言理解(Natural Language Understanding,NLU)任务的代表模型——BERT

BERT,全名为 Bidirectional Encoder Representations from Transformers,是由 Google 团队在 2018年发布的预训练语言模型。该模型发布于论文《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》,实现了包括 GLUE、MultiNLI 等七个自然语言处理评测任务的最优性能(State Of The Art,SOTA),堪称里程碑式的成果。

T5(Encoder-Decoder PLM)

BERT 也存在一些问题,例如 MLM 任务和下游任务微调的不一致性,以及无法处理超过模型训练长度的输入等问题。为了解决这些问题,研究者们提出了 Encoder-Decoder 模型,通过引入 Decoder 部分来解决这些问题,同时也为 NLP 领域带来了新的思路和方法。

T5(Text-To-Text Transfer Transformer)是由 Google 提出的一种预训练语言模型,通过将所有 NLP 任务统一表示为文本到文本的转换问题,大大简化了模型设计和任务处理。T5 基于 Transformer 架构,包含编码器和解码器两个部分,使用自注意力机制和多头注意力捕捉全局依赖关系,利用相对位置编码处理长序列中的位置信息,并在每层中包含前馈神经网络进一步处理特征。

LLama模型(Decoder-Only PLM)

LLaMA模型是由Meta(前Facebook)开发的一系列大型预训练语言模型。从LLaMA-1到LLaMA-3,LLaMA系列模型展示了大规模预训练语言模型的演进及其在实际应用中的显著潜力。

GPT模型(Decoder-Only PLM)

GPT,即 Generative Pre-Training Language Model,是由 OpenAI 团队于 2018年发布的预训练语言模型。虽然学界普遍认可 BERT 作为预训练语言模型时代的代表,但首先明确提出预训练-微调思想的模型其实是 GPT。

GPT 提出了通用预训练的概念,也就是在海量无监督语料上预训练,进而在每个特定任务上进行微调,从而实现这些任务的巨大收益。虽然在发布之初,由于性能略输于不久后发布的 BERT,没能取得轰动性成果,也没能让 GPT 所使用的 Decoder-Only 架构成为学界研究的主流,但 OpenAI 团队坚定地选择了不断扩大预训练数据、增加模型参数,在 GPT 架构上不断优化,最终在 2020年发布的 GPT-3 成就了 LLM 时代的基础,并以 GPT-3 为基座模型的 ChatGPT 成功打开新时代的大门,成为 LLM 时代的最强竞争者也是目前的最大赢家。

参考资料

Hello! · Transformers快速入门

jsksxs360/How-to-use-Transformers: Transformers 库快速入门教程

Hoper-J/AI-Guide-and-Demos-zh_CN: 这是一份入门AI/LLM大模型的逐步指南,包含教程和演示代码,带你从API走进本地大模型部署和微调,代码文件会提供Kaggle或Colab在线版本,即便没有显卡也可以进行学习。项目中还开设了一个小型的代码游乐场🎡,你可以尝试在里面实验一些有意思的AI脚本。同时,包含李宏毅 (HUNG-YI LEE)2024生成式人工智能导论课程的完整中文镜像作业。

Happy-LLM

梗直哥

90%人不知道的LLM黑科技:拆解Transformer如何吃透全网知识!_哔哩哔哩_bilibili

Transformer如何成为AI模型的地基_哔哩哔哩_bilibili

什么是 MCP?

  • 全称:Model Context Protocol
  • 作用:让 AI 助手(如 Claude、Cline 等)在对话过程中,动态调用外部工具(Tool)完成复杂任务(读写文件、查询数据库、调用 API 等)。
  • 组成
    1. MCP Host(宿主,如 Cline、Claude Desktop)
    2. MCP Server(提供 Tool 的后台服务)
    3. Tool(具体功能单元,如 read_file, exec_command 等)

核心概念速记

  • MCP Server
    • 一个独立进程,提供 1-N 个 Tool。
    • 可以用任何语言编写,只要暴露标准 MCP 接口。
  • Tool
    • 最小执行单元,必须包含:
      • name(唯一)
      • description(让 LLM 理解何时调用)
      • input schema(参数结构,JSON Schema)
  • 交互流程(重点)
    • 在启动mcp server时,server将tool信息传送给host
    • 用户在 Host 输入自然语言需求。
    • Host 将需求 + 可用 Tool 列表发给 LLM。
    • LLM 判断调用哪个 Tool,并填充参数。
    • Host 通过 MCP 协议向对应 Server 发送请求。
    • Server 执行 Tool 并返回结果。
    • Host 将结果合并上下文,继续对话。

mcp和fuction calling的区别

维度 Function Calling(FC) MCP(Model Context Protocol)
本质 能力 —— 某个大模型原生就带的一种「调用函数」功能 协议 —— 定义 AI 与外部世界如何长期、标准、可复用地交互
工作方式 模型在一次推理里主动决定要调用哪个函数,并吐出结构化参数 通过「客户端-服务器」架构,由 MCP Server 被动等待模型或 Agent 的请求
是否标准化 否。OpenAI、Anthropic、百度等各家接口格式不同 是。统一 JSON-RPC 2.0 协议,跨模型通用
上下文管理 单次调用,无状态;复杂多轮任务需自己维护 协议层面支持会话、状态、长链路任务
复用/共享 函数代码往往紧耦合在项目里,换模型就得重写 一次写成 MCP Server,可被任何支持 MCP 的模型/IDE/Agent 直接插用

一句话总结: Function Calling 是「某个模型自带的快捷指令」,MCP 是「让任何模型都能统一插拔工具的工业标准」。 二者并非互斥——MCP 的实现里仍然可以用 Function Calling 去触发具体函数,但它把「怎么描述工具、怎么发现工具、怎么保持会话」这些事都标准化了,从而解决了 FC 带来的碎片化、难维护、难共享的问题 。

安装mcp

在mcp server市场查找自己想用的mcp服务,如Fetch MCP Server

复制mcp配置

1
2
3
4
5
6
7
8
9
10
{
"mcpServers": {
"fetch": {
"command": "uvx",
"args": [
"mcp-server-fetch"
]
}
}
}

在mcp host 中安装,如trae

host会自动完成对mcp的配置

创建一个mcp server

初始化项目

1
2
3
4
5
6
7
8
uv init weather

uv sync

source .venv/bin/activate

#添加依赖
uv add "mcp[cli]" httpx

创建weather.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# 导入类型提示模块,用于类型注解
from typing import Any

# 导入httpx库,用于发送HTTP请求
import httpx

# 从mcp.server.fastmcp模块导入FastMCP类
# FastMCP是一个快速构建MCP(Model Control Protocol)服务器的框架
from mcp.server.fastmcp import FastMCP

# 创建FastMCP实例,命名为"weather",日志级别设置为ERROR(只显示错误信息)
mcp = FastMCP("weather", log_level="ERROR")


# 常量定义
# NWS(National Weather Service)API的基础URL
NWS_API_BASE = "https://api.weather.gov"
# 用户代理字符串,用于标识应用程序
USER_AGENT = "weather-app/1.0"


async def make_nws_request(url: str) -> dict[str, Any] | None:
"""向NWS API发起请求并处理错误。

Args:
url: 要请求的API URL

Returns:
成功时返回解析后的JSON数据字典,失败时返回None
"""
# 设置请求头信息
headers = {
"User-Agent": USER_AGENT, # 用户代理标识
"Accept": "application/geo+json" # 接受的数据格式
}

# 创建异步HTTP客户端
async with httpx.AsyncClient() as client:
try:
# 发起GET请求,设置超时时间为30秒
response = await client.get(url, headers=headers, timeout=30.0)
# 如果响应状态码不是2xx,抛出异常
response.raise_for_status()
# 返回解析后的JSON数据
return response.json()
except Exception:
# 捕获所有异常,返回None表示请求失败
return None


def format_alert(feature: dict) -> str:
"""将警报数据格式化为可读的字符串。

Args:
feature: 包含警报信息的字典

Returns:
格式化后的警报字符串
"""
# 获取警报属性
props = feature["properties"]
# 格式化警报信息,使用get方法提供默认值防止键不存在
return f"""
事件: {props.get('event', '未知')}
区域: {props.get('areaDesc', '未知')}
严重程度: {props.get('severity', '未知')}
描述: {props.get('description', '无描述信息')}
指示: {props.get('instruction', '无具体指示')}
"""


# 使用@mcp.tool()装饰器将函数注册为MCP工具
@mcp.tool()
async def get_alerts(state: str) -> str:
"""获取指定美国州的天气警报。

Args:
state: 两个字母的美国州代码(例如:CA, NY)

Returns:
格式化后的警报信息字符串
"""
# 构建获取州警报的URL
url = f"{NWS_API_BASE}/alerts/active/area/{state}"
# 发起API请求获取数据
data = await make_nws_request(url)

# 检查数据是否有效
if not data or "features" not in data:
return "无法获取警报或未找到警报。"

# 检查是否有警报
if not data["features"]:
return "该州无活动警报。"

# 格式化所有警报
alerts = [format_alert(feature) for feature in data["features"]]
# 用分隔符连接所有警报
return "\n---\n".join(alerts)


# 注册为MCP工具的天气预报函数
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
"""获取指定位置的天气预报。

Args:
latitude: 位置的纬度
longitude: 位置的经度

Returns:
格式化后的天气预报字符串
"""
# 首先获取预报网格端点
points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
points_data = await make_nws_request(points_url)

# 检查点数据是否获取成功
if not points_data:
return "无法获取此位置的预报数据。"

# 从点响应中获取预报URL
forecast_url = points_data["properties"]["forecast"]
forecast_data = await make_nws_request(forecast_url)

# 检查预报数据是否获取成功
if not forecast_data:
return "无法获取详细预报。"

# 将时间段格式化为可读的预报
periods = forecast_data["properties"]["periods"]
forecasts = []
# 只显示接下来的5个时间段
for period in periods[:5]:
forecast = f"""
{period['name']}:
温度: {period['temperature']}°{period['temperatureUnit']}
风力: {period['windSpeed']} {period['windDirection']}
预报: {period['detailedForecast']}
"""
forecasts.append(forecast)

# 用分隔符连接所有预报
return "\n---\n".join(forecasts)


# 程序入口点
if __name__ == "__main__":
# 初始化并运行服务器,使用stdio传输方式
mcp.run(transport='stdio')

@mcp.tool()可以将函数内的字符串,参数类型等信息传给大模型,以供大模型决定何时调用这个tool

mcp.run(transport=‘stdio’)说明mcp server和host的传输方式是输入和输出

mcp server 配置信息

1
2
3
4
5
6
7
8
9
10
11
12
"weather": {
"disabled": false,
"timeout": 60,
"command": "uv",
"args": [
"--directory",
"/Users/joeygreen/PycharmProjects/weather",
"run",
"weather.py"
],
"transportType": "stdio"
}

“disabled”: false表示该服务是否被禁用。false 表示该服务是启用状态,可以正常运行。

“timeout”: 60设置该服务的超时时间,单位为秒。

“command”: “uv”指定执行该服务时使用的命令。

“args”出了执行 command 时需要传递的参数。

“transportType”: “stdio”指定服务的通信方式。stdio 表示标准输入输出流(Standard Input Output),通常用于进程间通信。

解析mcp server与host的通信

image-20250727154739013

输入为host对server发送,输出为server对host发送,以下将列举几个重要的说明

输入中:method字段为host告诉server接下来要干什么,如初始化 (Initialization)通知已初始化 (Notification)查询可用工具 (Listing Tools)调用工具 (Calling a Tool)

protocolVersion说明了mcp使用的协议版本

以下见server返回的tool信息,其中的一个参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"name": "get_forecast",
"description": "Get weather forecast for a location.\n\nArgs:\n latitude: Latitude of the location\n longitude: Longitude of the location\n",
"inputSchema": {
"properties": {
"latitude": {
"title": "Latitude",
"type": "number"
},
"longitude": {
"title": "Longitude",
"type": "number"
}
},
"required": [
"latitude",
"longitude"
],
"title": "get_forecastArguments",
"type": "object"
}
}

可以和定义的函数对比学习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 注册为MCP工具的天气预报函数
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
"""获取指定位置的天气预报。

Args:
latitude: 位置的纬度
longitude: 位置的经度

Returns:
格式化后的天气预报字符串
"""
# 首先获取预报网格端点
points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
points_data = await make_nws_request(points_url)

# 检查点数据是否获取成功
if not points_data:
return "无法获取此位置的预报数据。"

# 从点响应中获取预报URL
forecast_url = points_data["properties"]["forecast"]
forecast_data = await make_nws_request(forecast_url)

# 检查预报数据是否获取成功
if not forecast_data:
return "无法获取详细预报。"

# 将时间段格式化为可读的预报
periods = forecast_data["properties"]["periods"]
forecasts = []
# 只显示接下来的5个时间段
for period in periods[:5]:
forecast = f"""
{period['name']}:
温度: {period['temperature']}°{period['temperatureUnit']}
风力: {period['windSpeed']} {period['windDirection']}
预报: {period['detailedForecast']}
"""
forecasts.append(forecast)

# 用分隔符连接所有预报
return "\n---\n".join(forecasts)

description内容即为我们在定义这个tool的时候写的文档字符串(Documentation String),通常简称为 docstring

inputSchema 是在MCP(Model Control Protocol)中用来描述工具(tool)所需参数的结构和类型的规范。它本质上是一个JSON Schema。

JSON Schema 是一个用于描述和验证 JSON 数据结构的规范。你可以把它理解为 JSON 数据的“蓝图”或“模板”。

required指明哪些参数是必需的,哪些是可选的。

理解mcp的本质

以上内容皆是server与host直接的交互,本质上可以理解成host对server提供的工具进行注册与使用。这其中并不涉及到host与大模型的交互,也就是大模型是如何使用host提供的信息。实际上不同的mcp host与模型的交互协议也不同,如cline使用的是xml格式;cherry studio使用的则是fuction calling

再看mcp的全称Model Context Protocol,模型上下文协议,也就是mcp增加模型的扩展性,使他可以获取更多信息,而server就是为模型提供更多信息的工具

mcp host与模型的交互

使用中转服务器截获日志

image-20250727163811531

以下为cline发送给模型的请求

image-20250727164527797

messages包含了系统提示词与用户输入

先来看系统提示词,cline提供的提示词包括工具使用格式,工具信息,工具使用方法等。这里重点说一下,cline的工具使用格式xml

结构如下:

1
2
3
4
5
<tool_name>
<parameter1_name>value1</parameter1_name>
<parameter2_name>value2</parameter2_name>
...
</tool_name>

例如:

1
2
3
<read_file>
src/main.js
</read_file>

再举个例子

use_mcp_tool 描述:请求使用由连接的 MCP 服务器提供的工具。每个 MCP 服务器可以提供具有不同功能的多个工具。工具有定义的输入模式,用于指定必需和可选参数。 参数:

server_name: (必需) 提供该工具的 MCP 服务器的名称 tool_name: (必需) 要执行的工具的名称 arguments: (必需) 一个 JSON 对象,包含工具的输入参数,遵循工具的输入模式 用法:

1
2
3
4
5
6
7
8
9
10
<use_mcp_tool>
<server_name>服务器名称在此</server_name>
<tool_name>工具名称在此</tool_name>

{
"param1": "value1",
"param2": "value2"
}

</use_mcp_tool>

模型返回响应如下

image-20250727174504506

sse连接,流式输出

SSE 是一种基于标准 HTTP只允许服务器向客户端单向推送文本流的实时通信技术,浏览器原生支持,自动重连,常用于AI 流式回答实时日志股价/监控推送等场景。

即客户端发送一次请求,连续接受多次响应直到结束

mcp的三种传输协议

协议名称 通信方式 适用场景 优势 局限
Stdio(标准输入输出) 使用进程的标准输入(stdin)和标准输出(stdout)进行本地通信,基于 JSON-RPC 2.0 格式 本地开发、调试、IDE插件、命令行工具 简单易实现、跨平台、低延迟 仅支持本地通信,无法跨网络,低并发
SSE(Server-Sent Events) 客户端通过 HTTP POST 发送请求,服务器通过 SSE 单向推送流式响应 实时监控、新闻推送、远程服务调用 基于 HTTP,浏览器友好,支持流式数据 仅支持单向通信,MCP官方已标记为“即将废弃”
Streamable HTTP(新型流式HTTP) 支持双向流式通信的现代 HTTP 协议,替代 SSE,支持会话恢复、OAuth 认证等 分布式系统、高并发、双向实时交互 双向通信、高性能、企业级安全机制 实现较复杂,生态仍在发展中

ReAct

ReAct 是一种用于增强大型语言模型(LLMs)推理和行动能力的技术框架,它通过结合“推理”(Reasoning)和“行动”(Acting)来提升模型处理复杂任务的能力。

其工作流程通常包括以下几个步骤:

  1. 思考(Reasoning):模型对当前问题进行分析,思考下一步需要采取的行动。
  2. 行动(Acting):模型决定调用哪些工具或函数,并提供必要的参数。
  3. 观察(Observation):工具执行后返回结果,模型对结果进行观察。
  4. 响应(Response):根据观察结果,模型生成最终的用户响应。

实际应用上就是告诉大模型用ReAct这种模式来思考

参考资料

MCP终极指南 - 从原理到实战,带你深入掌握MCP(基础篇)_哔哩哔哩_bilibili

0%