配置环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 创建项目目录
uv init mcp-client
cd mcp-client

# 创建虚拟环境
uv venv

# 激活虚拟环境
# 在 Windows 上:
.venv\Scripts\activate
# 在 Unix 或 MacOS 上:
source .venv/bin/activate

# 安装所需包
uv add mcp anthropic python-dotenv
#使用镜像源安装
uv add mcp anthropic python-dotenv --index-url https://pypi.tuna.tsinghua.edu.cn/simple/

# 删除样板文件
# 在 Windows 上:
del main.py
# 在 Unix 或 MacOS 上:
rm main.py

# 创建我们的主文件
touch client.py

设置 API 密钥

创建一个 .env 文件来存储它:

1
2
# Create .env file
touch .env

将您的密钥添加到 .env 文件:

1
ANTHROPIC_API_KEY=<your key here>

.env 添加到您的 .gitignore

1
echo ".env" >> .gitignore

.env 文件名添加到 .gitignore 文件中,这样 Git 就会忽略 .env 文件,不会将其纳入版本控制。

创建客户端

基本客户端结构

首先,让我们设置我们的导入并创建基本的客户端类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv() # 从 .env 文件加载环境变量

class MCPClient:
def __init__(self):
# 初始化会话和客户端对象
self.session: Optional[ClientSession] = None # MCP客户端会话
self.exit_stack = AsyncExitStack() # 异步上下文管理器堆栈,用于资源清理
self.anthropic = Anthropic() # Anthropic AI 客户端

# 后续方法将在这里定义

服务器连接管理

接下来,我们将实现连接到 MCP 服务器的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
async def connect_to_server(self, server_script_path: str):
"""连接到MCP服务器

Args:
server_script_path: 服务器脚本路径 (.py 或 .js 文件)
"""
# 检查是否为Python文件
is_python = server_script_path.endswith('.py')
# 检查是否为JavaScript文件
is_js = server_script_path.endswith('.js')

# 如果不是Python或JavaScript文件,则抛出错误
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")

# 根据文件类型确定执行命令
command = "python" if is_python else "node"

# 创建服务器参数对象
server_params = StdioServerParameters(
command=command, # 执行命令
args=[server_script_path], # 脚本路径作为参数
env=None # 环境变量(使用默认环境)
)

# 建立stdio客户端连接并将其添加到异步上下文管理器中
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport

# 创建客户端会话并将其添加到异步上下文管理器中
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

# 初始化会话
await self.session.initialize()

# 列出可用的工具
response = await self.session.list_tools()
tools = response.tools

# 打印连接的服务器提供的工具列表
print("\n已连接到服务器,可用工具:", [tool.name for tool in tools])

查询处理逻辑

现在让我们添加处理查询和调用工具的核心功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
async def process_query(self, query: str) -> str:
"""使用Claude和可用工具处理查询"""
# 构建消息列表
messages = [
{
"role": "user", # 用户角色
"content": query # 用户查询内容
}
]

# 获取可用工具列表
response = await self.session.list_tools()
available_tools = [{
"name": tool.name, # 工具名称
"description": tool.description, # 工具描述
"input_schema": tool.inputSchema # 工具输入模式
} for tool in response.tools]

# 初始Claude API调用
response = self.anthropic.messages.create(
model="qwen3-235b-a22b", # 使用的模型
max_tokens=1000, # 最大返回令牌数
messages=messages, # 消息历史
tools=available_tools # 可用工具
)

# 处理响应并处理工具调用
final_text = [] # 存储最终文本结果

assistant_message_content = [] # 存储助手消息内容
for content in response.content: # 遍历响应内容
if content.type == 'text': # 如果是文本内容
final_text.append(content.text) # 添加到最终结果
assistant_message_content.append(content) # 添加到助手消息
elif content.type == 'tool_use': # 如果是工具调用
tool_name = content.name # 工具名称
tool_args = content.input # 工具参数

# 执行工具调用
result = await self.session.call_tool(tool_name, tool_args)
final_text.append(f"[调用工具 {tool_name},参数 {tool_args}]")

assistant_message_content.append(content)
# 添加助手消息到历史
messages.append({
"role": "assistant",
"content": assistant_message_content
})
# 添加工具执行结果到历史
messages.append({
"role": "user",
"content": [
{
"type": "tool_result", # 工具结果类型
"tool_use_id": content.id, # 工具使用ID
"content": result.content # 工具执行结果
}
]
})

# 获取Claude的下一个响应
response = self.anthropic.messages.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools
)

# 添加响应文本到最终结果
final_text.append(response.content[0].text)

# 返回连接后的最终文本结果
return "\n".join(final_text)

交互式聊天界面

现在我们将添加聊天循环和清理功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP客户端已启动!")
print("输入您的问题或输入'quit'退出。")

while True: # 无限循环,持续接收用户输入
try:
query = input("\n问题: ").strip() # 获取用户输入并去除首尾空格

if query.lower() == 'quit': # 如果用户输入'quit'(不区分大小写)
break # 退出循环

# 处理用户查询并获取响应
response = await self.process_query(query)
print("\n" + response) # 打印AI响应结果

except Exception as e: # 捕获所有异常
print(f"\n错误: {str(e)}") # 打印错误信息

async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose() # 异步关闭所有在exit_stack中管理的资源

主入口点

最后,我们将添加主要的执行逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def main():
# 检查命令行参数数量,如果少于2个则显示使用说明
if len(sys.argv) < 2:
print("用法: python client.py <服务器脚本路径>")
sys.exit(1) # 退出程序,返回错误码1

# 创建MCP客户端实例
client = MCPClient()
try:
# 连接到服务器,sys.argv[1]是第一个命令行参数(服务器脚本路径)
await client.connect_to_server(sys.argv[1])
# 启动交互式聊天循环
await client.chat_loop()
finally:
# 确保程序结束时清理资源
await client.cleanup()

# 程序入口点
if __name__ == "__main__":
import sys # 导入sys模块用于处理命令行参数
# 运行异步主函数
asyncio.run(main())

QQ20250801-164738

运行客户端

要使您的客户端与任何 MCP 服务器运行:

1
2
uv run client.py path/to/server.py # python server
uv run client.py path/to/build/index.js # node server

客户端将:

  1. 连接到指定服务器
  2. 列出可用工具
  3. 开始一个交互式聊天会话,您可以在其中:
    • 输入查询
    • 查看工具执行情况
    • 从 Claude 获取响应

运作流程

当你提交查询时:

  1. 客户端从服务器获取可用工具列表
  2. 你的查询连同工具描述一起发送给 Claude
  3. Claude 决定使用哪些工具(如果有的话)
  4. 客户端通过服务器执行任何请求的工具调用
  5. 结果会发送回 Claude
  6. Claude 提供自然语言响应
  7. 响应显示给您

参考资料

Build an MCP Client - Model Context Protocol

适配openai版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import asyncio
import json
import sys
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from openai import OpenAI
from dotenv import load_dotenv
import os

load_dotenv() # 从 .env 文件加载环境变量

class MCPClient:
def __init__(self):
# 初始化会话和客户端对象
self.session: Optional[ClientSession] = None # MCP客户端会话
self.exit_stack = AsyncExitStack() # 异步上下文管理器堆栈,用于资源清理
self.anthropic = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
) # 使用OpenAI兼容模式连接通义千问

async def connect_to_server(self, server_script_path: str):
"""连接到MCP服务器

Args:
server_script_path (str): 服务器脚本路径,支持.py或.js文件

Raises:
ValueError: 当脚本文件不是.py或.js格式时抛出
"""
# 检查是否为Python文件
is_python = server_script_path.endswith('.py')
# 检查是否为JavaScript文件
is_js = server_script_path.endswith('.js')

# 如果不是Python或JavaScript文件,则抛出错误
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")

# 根据文件类型确定执行命令
command = "python" if is_python else "node"

# 创建服务器参数对象
server_params = StdioServerParameters(
command=command, # 执行命令
args=[server_script_path], # 脚本路径作为参数
env=None # 环境变量(使用默认环境)
)

# 建立stdio客户端连接并将其添加到异步上下文管理器中
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport

# 创建客户端会话并将其添加到异步上下文管理器中
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

# 初始化会话
await self.session.initialize()

# 列出可用的工具
response = await self.session.list_tools()
tools = response.tools

# 打印连接的服务器提供的工具列表
print("\n已连接到服务器,可用工具:", [tool.name for tool in tools])

async def process_query(self, query: str) -> str:
"""使用Qwen和可用工具处理查询"""
# 构建消息列表
messages = [
{
"role": "user",
"content": query
}
]

# 获取可用工具列表并转换为OpenAI格式
response = await self.session.list_tools()
available_tools = []
for tool in response.tools:
schema = tool.inputSchema
if isinstance(schema, str):
schema = json.loads(schema)
if isinstance(schema, dict) and "properties" in schema:
schema = {"type": "object", **schema}

available_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": schema
}
})

# 第一次调用模型
response = self.anthropic.chat.completions.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools,
extra_body={"enable_thinking": False}
)

final_text = []
message = response.choices[0].message

# 处理文本内容
if message.content:
final_text.append(message.content)

# 处理工具调用
if message.tool_calls:
for tool_call in message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)

# 执行工具调用
result = await self.session.call_tool(tool_name, tool_args)
final_text.append(f"[调用工具 {tool_name},参数 {tool_args}]")

# 处理工具结果
tool_result_content = ""
if result.content:
for item in result.content:
if hasattr(item, 'type') and item.type == 'text':
tool_result_content += item.text
else:
tool_result_content += str(item)

# 添加助手消息到历史
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
})

# 添加工具执行结果到历史
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_result_content
})

# 再次调用模型
response = self.anthropic.chat.completions.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools,
extra_body={"enable_thinking": False}
)

# 处理最终响应
if response.choices and response.choices[0].message.content:
final_text.append(response.choices[0].message.content)

return "\n".join(final_text)

async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP客户端已启动!")
print("输入您的问题或输入'quit'退出。")

while True: # 无限循环,持续接收用户输入
try:
query = input("\n问题: ").strip() # 获取用户输入并去除首尾空格

if query.lower() == 'quit': # 如果用户输入'quit'(不区分大小写)
break # 退出循环

# 处理用户查询并获取响应
response = await self.process_query(query)
print("\n" + response) # 打印AI响应结果

except Exception as e: # 捕获所有异常
print(f"\n错误: {str(e)}") # 打印错误信息
import traceback
traceback.print_exc() # 打印详细错误信息

async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose() # 异步关闭所有在exit_stack中管理的资源

async def main():
# 检查命令行参数数量,如果少于2个则显示使用说明
if len(sys.argv) < 2:
print("用法: python client.py <服务器脚本路径>")
sys.exit(1) # 退出程序,返回错误码1

# 创建MCP客户端实例
client = MCPClient()
try:
# 连接到服务器,sys.argv[1]是第一个命令行参数(服务器脚本路径)
await client.connect_to_server(sys.argv[1])
# 启动交互式聊天循环
await client.chat_loop()
finally:
# 确保程序结束时清理资源
await client.cleanup()

# 程序入口点
if __name__ == "__main__":
# 运行异步主函数
asyncio.run(main())

openai和claude在工具调用的差异

  1. 工具格式转换修复

问题:MCP工具格式与OpenAI API不兼容
修复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 原错误格式
available_tools = [{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
}]

# 修复后格式(符合OpenAI规范)
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": schema # 正确的JSON Schema格式
}
}]

  1. API响应处理修复

问题:错误访问了OpenAI响应对象的属性
修复

1
2
3
4
5
6
7
8
9
10
# 原错误代码
for content in response.content: # ❌ response没有content属性

# 修复后代码
message = response.choices[0].message # ✅ 正确的访问路径
if message.content:
final_text.append(message.content)
if message.tool_calls:
for tool_call in message.tool_calls:
# 处理工具调用

  1. 工具调用结果处理修复

问题:错误处理MCP工具调用返回的结果结构
修复

1
2
3
4
5
6
7
8
9
10
11
# 原错误代码
"content": result.content # ❌ 可能包含复杂对象

# 修复后代码
tool_result_content = ""
if result.content:
for item in result.content:
if hasattr(item, 'type') and item.type == 'text':
tool_result_content += item.text
else:
tool_result_content += str(item)
  1. 消息历史构建修复

问题:工具调用后消息历史格式不正确
修复

1
2
3
4
5
6
7
8
9
10
11
# 正确的消息历史格式
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_result_content
})

  1. JSON Schema兼容性处理

问题:MCP返回的schema可能缺少必要的根类型定义
修复

1
2
3
4
5
schema = tool.inputSchema
if isinstance(schema, str):
schema = json.loads(schema)
if isinstance(schema, dict) and "properties" in schema:
schema = {"type": "object", **schema} # 确保有根类型

配置环境

1
2
3
4
5
6
7
8
9
10
11
12
13
# Create a new directory for our project
uv init weather
cd weather

# Create virtual environment and activate it
uv venv
source .venv/bin/activate

# Install dependencies
uv add "mcp[cli]" httpx

# Create our server file
touch weather.py

mcp studio样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Math")

@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers"""
return a + b

@mcp.tool()
def multiply(a: int, b: int) -> int:
"""Multiply two numbers"""
return a * b

if __name__ == "__main__":
mcp.run(transport="stdio")

mcp streamable-http 样例

1
2
3
4
5
6
7
8
9
10
11
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Weather")

@mcp.tool()
async def get_weather(location: str) -> str:
"""Get weather for location."""
return "It's always sunny in New York"

if __name__ == "__main__":
mcp.run(transport="streamable-http")

使用

mcp市场MCP 广场 · 魔搭社区

使用 uv(推荐)

当使用 uv 时不需要特定的安装步骤。我们将使用 uvx 直接运行 mcp-server-fetch

1
2
3
4
5
6
"mcpServers": {
"fetch": {
"command": "uvx",
"args": ["mcp-server-fetch"]
}
}

使用 PIP

或者,您可以通过 pip 安装 mcp-server-fetch

1
pip install mcp-server-fetch
1
2
3
4
5
6
"mcpServers": {
"fetch": {
"command": "python",
"args": ["-m", "mcp_server_fetch"]
}
}

远程托管

image-20250802120145192

1
2
3
4
5
6
7
8
{
"mcpServers": {
"fetch": {
"type": "sse",
"url": "https://mcp.api-inference.modelscope.net/991cf46/sse"
}
}
}

参考资料

构建 MCP 服务器 - 模型上下文协议 —- Build an MCP Server - Model Context Protocol

Chroma是什么

Chroma(通常指 ChromaDB)是一款 开源、AI 原生的向量数据库,专为存储和检索 高维嵌入向量 而设计,目标是让开发者 5 分钟内在本地跑起一个语义搜索或 RAG 系统

极简安装pip install chromadb

双运行模式

  • 内存模式:调试/原型随意重启;
  • 持久化模式:指定 persist_directory 即可落盘,生产也不怕丢数据。

HNSW 索引:百万级向量也能 毫秒级 响应。

使用chroma存储

文档分块

1
2
3
4
5
6
7
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

loader = PyPDFLoader("input/健康档案.pdf")
docs = loader.load()
#递归分块
text_splitter = RecursiveCharacterTextSplitter(chunk_siz

定义embedding模型与chroma

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

from langchain.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embedding = OpenAIEmbeddings(
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
model="text-embedding-v4",
check_embedding_ctx_length = False,
dimensions=1536
)
# 使用 Chroma 向量数据库存储文档 chunks
vectorstore = Chroma.from_documents(
documents=chunks, # 要存储的文档chunks列表(已处理好的文本片段)
embedding=embedding,
persist_directory="chromaDB", # 向量数据库的持久化存储目录路径
collection_name="demo001" # 集合名称,用于区分不同的文档集合
)
vectorstore.persist()

检索

这里采取直接检索进行测试

1
2
3
4
5
results = vectorstore.similarity_search(
"张三九的基本信息是什么",
k=2,
collection_name="demo001" # 指定检索的集合
)

重新加载

1
2
3
4
5
6
7
# 重新加载已存在的 Chroma 数据库
vectorstore = Chroma(
persist_directory="./chroma_db",
embedding_function=embedding
)

retriever = vectorstore.as_retriever()

参考资料

Chroma | 🦜️🔗 LangChain —- Chroma | 🦜️🔗 LangChain

什么是Plan-and-Execute模式

Plan-and-Execute架构流程:先指定计划,后交给执行agent,执行后交给replan节点,判断是否需要更新计划,若要更新计划返回返回更新后的机会,否则返回response,然后路由判断是执行agent还是response

与ReAct模式不同的是,ReAct只做一次规划,而Plan-and-Execute模式核心思想是首先制定一个多步骤计划,然后逐项执行该计划。完成特定任务后,可以重新审视计划并进行适当修改。

举个例子,用户在问一个问题后,agent产生一份任务清单,选取第一份任务开始执行,执行后的结果结合任务清单,执行replan,结合新的信息,更改任务清单的内容,让后续大模型更好地执行,并去除已经完成的任务

实际生产中,应该在planner之前再进行一次判断,如果问题过于简单,不需要进行Plan-and-Execute模式

实战

安装包

1
pip install --quiet -U langgraph langchain-community langchain-openai tavily-python

定义网络搜索工具与执行agent

在产生plan后,要有一个agent对任务清单进行执行,这里以一个ReAct的网络搜索agent为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#定义工具
from langchain_tavily import TavilySearch
tools = [TavilySearch(max_results=3)]

from langchain_openai import ChatOpenAI

from langgraph.prebuilt import create_react_agent


llm = ChatOpenAI(
model="qwen3-235b-a22b-thinking-2507",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
prompt = "You are a helpful assistant."
agent_executor = create_react_agent(llm, tools, prompt=prompt)

测试功能

1
agent_executor.invoke({"messages": [("user", "今天是几月几日")]})

定义执行节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from typing import Literal
from langgraph.graph import END


async def execute_step(state: PlanExecute):
"""执行计划中的步骤"""
plan = state["plan"]
# 将计划格式化为带编号的字符串
plan_str = "\n".join(f"{i + 1}. {step}" for i, step in enumerate(plan))
task = plan[0] # 获取第一个待执行的任务
task_formatted = f"""对于以下计划:
{plan_str}\n\n你被分配执行第 {1} 步, {task}。"""
# 调用代理执行器来执行任务
agent_response = await agent_executor.ainvoke(
{"messages": [("user", task_formatted)]}
)
# 返回执行结果,添加到历史步骤中
return {
"past_steps": [(task, agent_response["messages"][-1].content)],
}

create_react_agent 是 LangGraph 提供的一个预构建函数,位于 langgraph.prebuilt 模块中,用于快速创建一个基于 ReAct(Reasoning + Acting)架构的智能代理。

langgraph预设的其他常用组件如下

ToolNode

功能:把 LangChain 工具(BaseTool)封装成一个图节点,负责:

  • 接收 LLM 生成的工具调用请求

  • 真正执行工具

  • 把结果返回给图

1
2
3
from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools=[search, calculator])

tools_condition

功能:判断 LLM 是否要继续调用工具的“路由函数”。

在 ReAct 图里通常放在节点之间的 条件边:

1
2
3
4
5
6
7
8
9
from langgraph.prebuilt import tools_condition

graph.add_conditional_edges("agent", tools_condition, {

"tools": "tool_node", # 需要工具 → 去 ToolNode

"**__end__**": END # 不需要 → 结束

})

定义状态

1
2
3
4
5
6
7
8
9
10
import operator
from typing import Annotated, List, Tuple
from typing_extensions import TypedDict


class PlanExecute(TypedDict):
input: str
plan: List[str]
past_steps: Annotated[List[Tuple], operator.add]
response: str
1
2
3
4
5
6
7
8
9
from pydantic import BaseModel, Field


class Plan(BaseModel):
"""未来要遵循的计划"""

steps: List[str] = Field(
description="需要遵循的不同步骤,应该按排序顺序排列"
)

定义模型

1
2
3
4
5
from langchain_community.chat_models import ChatTongyi
model=ChatTongyi(
model="qwen-plus-2025-07-14",
api_key="sk-"
)

定义初始计划节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"""对于给定的目标,制定一个简单的逐步计划。\
这个计划应该包含独立的任务,如果正确执行这些任务将得到正确的答案。不要添加任何多余的步骤。\
最后一步的结果应该是最终答案。确保每个步骤都包含所需的所有信息——不要跳过任何步骤。""",
),
("placeholder", "{messages}"),
]
)
planner = planner_prompt | model.with_structured_output(Plan)

async def plan_step(state: PlanExecute):
"""制定初始计划步骤"""
# 使用规划器为用户输入制定计划
plan = await planner.ainvoke({"messages": [("user", state["input"])]})

return {"plan": plan.steps}

定义再计划节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from typing import Union


class Response(BaseModel):
"""对用户的响应"""

response: str


class Act(BaseModel):
"""要执行的动作"""

action: Union[Response, Plan] = Field(
description="要执行的动作。如果你想响应用户,使用 Response。"
"如果你需要进一步使用工具来获取答案,使用 Plan。"
)


replanner_prompt = ChatPromptTemplate.from_template(
"""对于给定的目标,制定一个简单的逐步计划。\
这个 计划应该包含独立的任务,如果正确执行这些任务将得到正确的答案。不要添加任何多余的步骤。\
最后一步的结果应该是最终答案。确保每个步骤都包含所需的所有信息——不要跳过任何步骤。

你的目标是:
{input}

你的原始计划是:
{plan}

你目前已经完成了以下步骤:
{past_steps}

相应地更新你的计划。如果不需要更多步骤并且可以返回给用户,则直接响应。否则,填写计划。只添加仍需要完成的步骤到计划中。不要将已经完成的步骤作为计划的一部分返回。"""
)


replanner = replanner_prompt | model.with_structured_output(Act)
1
2
3
4
5
6
7
8
9
10
async def replan_step(state: PlanExecute):
"""重新规划步骤"""
# 使用重新规划器根据当前状态更新计划
output = await replanner.ainvoke(state)
if isinstance(output.action, Response):
# 如果动作是响应,返回最终响应
return {"response": output.action.response}
else:
# 如果动作是计划,返回新的计划步骤
return {"plan": output.action.steps}

定义路由

1
2
3
4
5
6
7
8
def should_end(state: PlanExecute):
"""判断是否结束执行流程"""
if "response" in state and state["response"]:
# 如果存在响应内容,结束流程
return END
else:
# 否则继续执行代理步骤
return "agent"

编译图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.memory import MemorySaver

# 创建内存检查点保存器
memory = MemorySaver()

# 创建工作流图,使用 PlanExecute 状态类型
workflow = StateGraph(PlanExecute)

# 添加计划节点
workflow.add_node("planner", plan_step)

# 添加执行步骤节点
workflow.add_node("agent", execute_step)

# 添加重新规划节点
workflow.add_node("replan", replan_step)

# 从开始节点连接到计划节点
workflow.add_edge(START, "planner")

# 从计划节点连接到代理执行节点
workflow.add_edge("planner", "agent")

# 从代理执行节点连接到重新规划节点
workflow.add_edge("agent", "replan")

# 添加条件边 - 从重新规划节点根据条件决定下一步
workflow.add_conditional_edges(
"replan",
# 传入决定下一个调用节点的函数
should_end,
["agent", END], # 可能的下一个节点:代理节点或结束
)

# 最后,编译工作流并添加检查点功能!
# 这将其编译为 LangChain Runnable,
# 意味着你可以像使用其他任何 runnable 一样使用它
app = workflow.compile(checkpointer=memory)

image-20250729092408872

调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 配置递归限制,防止无限循环
config = {"configurable": {
"thread_id": "1", # 必需:线程ID
},"recursion_limit": 50}

# 输入问题:2024年澳大利亚网球公开赛男单冠军的家乡是哪里?
inputs = {"input": "2024年澳大利亚网球公开赛男单冠军的家乡是哪里?"}

# 异步流式执行应用
async for event in app.astream(inputs, config=config):
# 遍历每个事件
for k, v in event.items():
# 排除结束标记,打印其他所有事件内容
if k != "__end__":
print(v)

资源

计划与执行 —- Plan-and-Execute

前言

常使用pgsql与redis配合langgraph的checkpoint与store进行持久化储存,完成长期记忆与短期记忆的实现

pgsql实现持久化

什么是pgsql

PostgreSQL(常简称pgsql或Postgres)是一个功能强大的开源对象-关系型数据库管理系统(ORDBMS),以其稳定性、扩展性和符合SQL标准著称。

docker拉取

docker-compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
version: '3.8'

services:
postgres:
image: postgres:15 # 指定具体版本
container_name: postgres_db
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
TZ: Asia/Shanghai # 设置时区
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
restart: unless-stopped
healthcheck: # 健康检查
test: ["CMD", "pg_isready", "-U", "nange"]
interval: 10s
timeout: 5s
retries: 5
command: ["postgres", "-c", "max_connections=200"] # 自定义配置

volumes:
pgdata:

Docker Compose 是一个 定义和运行多容器 Docker 应用声明式工具
它通过一个 YAML 文件(通常叫 docker-compose.yml 描述整个应用的服务、网络、存储等配置,然后用一条命令即可 启动/停止/管理 所有容器,无需手动逐个 docker run

1
2
#拉取并运行
docker-compose up -d

在pgsqsl安装依赖

因为LangGraph的PostgresStore需要使用到pgvector,因此需要在容器中按照如下步骤进行操作,直接使用Docker Desktop软件中进行操作

  • 为什么需要 pgvector
  • PostgresStore 支持将 向量嵌入(embedding) 存储在 PostgreSQL 中,并基于它们进行 语义搜索
  • 该功能依赖 pgvector 扩展提供的 vector 类型和索引机制(如 HNSW)。
1
2
3
安装依赖           
apt update #刷新本地软件包索引
apt install -y git build-essential postgresql-server-dev-15

apt install -y git build-essential postgresql-server-dev-15一次性安装 3 类依赖。

  • git —— 用来克隆 pgvector 源码。
  • build-essential —— Debian/Ubuntu 的“编译工具链”元包,包含 gcc、make 等。
  • postgresql-server-dev-15 —— 与当前 Postgres 主版本一致 的开发头文件和 pg_config
1
2
3
4
5
6
7
8
9
10
编译并安装 pgvector      
#把 pgvector 的 v0.7.0 稳定版 源码克隆到本地目录 ./pgvector。
git clone --branch v0.7.0 https://github.com/pgvector/pgvector.git
cd pgvector
#调用 Makefile 根据当前操作系统 + PostgreSQL 版本编译出二进制文件(.so 共享库)。
make
#把刚刚编好的 .so 文件和 .sql/.control 文件复制到 PostgreSQL 的扩展目录
make install
验证安装,检查扩展文件是否安装成功
ls -l /usr/share/postgresql/15/extension/vector*

pgvector/pgvector: Open-source vector similarity search for Postgres

接下来,若要在脚本中进行使用,首先在系统环境中需要安装PostgreSQL 的开发库(libpq),因为 psycopg 需要它来编译或运行

1
2
sudo apt update
sudo apt install libpq-dev postgresql-server-dev-all

psycopg(Python 操作 PostgreSQL 的库)Linux/macOS 上运行时,底层依赖于 PostgreSQL 的 C 语言开发库 libpq
如果系统里 没有 libpq,psycopg 会出现以下两种问题:

  1. 编译安装失败(源码/旧版本)
    pip install psycopg2 需要现场编译时,会找不到头文件 libpq-fe.h 或动态库 libpq.so,导致报错:

    1
    error: libpq-fe.h: No such file or directory
  2. 运行时崩溃
    即使通过预编译的 wheel 包安装成功,运行时也可能提示:

    1
    ImportError: libpq.so.5: cannot open shared object file

最后,再安装相关依赖包

1
2
pip install langgraph-checkpoint-postgres                    
pip install psycopg psycopg-pool

psycopg官方 PostgreSQL 驱动,在 psycopg 之上再包一层“连接池”,让并发访问更快、更稳定。

连接pgsql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from langgraph.store.postgres import PostgresStore
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool
# 1) 连接字符串(URI 语法)
DB_URI = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"
# 协议:// 用户 : 密码 @ 主机:端口 / 数据库名 ? 额外参数

# 2) 连接级参数
connection_kwargs = {
"autocommit": True, # 每条 SQL 执行完立即提交,无需手动 commit
"prepare_threshold": 0, # 禁用服务器端 prepared statement,可减少一次往返
}

# 3) 创建池
connection_pool = ConnectionPool(
conninfo=DB_URI,
max_size=20, # 最多 20 条物理连接
kwargs=connection_kwargs,
)

# 4) 显式打开池(psycopg 3 的 ConnectionPool 默认懒启动,调 open() 会立即建 min_size 条连接)
connection_pool.open()
print("数据库连接池初始化成功")

正常情况每次 SQL 都新建一条 TCP 连接、做 SSL 握手、验证密码、分配内存,如果不复用连接,这些动作就要 每次都重新来一遍成本非常高

连接池(Connection Pool)是一种 数据库访问层资源管理组件,其核心目标是在 高并发、短事务 场景下,通过 复用已建立的数据库物理连接 来降低系统整体延迟、减少资源消耗,并防止数据库因瞬时连接风暴而崩溃。

初始化pgsql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 初始化PostgresStore
in_postgres_store = PostgresStore(
pool,
index={
"dims": 1536,
"embed": embedding
}
)
in_postgres_store.setup()

#初始化checkpoint
# 使用传入的连接池创建 PostgresSaver
checkpointer = PostgresSaver(pool)
checkpointer.setup()

#最后编译时添加
graph_builder.compile(checkpointer=checkpointer, store=in_postgres_store)

in_postgres_store.setup() 的角色一句话就能说清:

把数据库里所有为了让向量存储正常工作的“一次性基建”全部建好——只建一次,后面再跑就不会重复执行。

具体而言,它通常干下面三件事:1.建表 / 建扩展;2.建向量索引;3.元数据初始化

fastapi实现生命周期的管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 定义了一个异步函数lifespan,它接收一个FastAPI应用实例app作为参数。这个函数将管理应用的生命周期,包括启动和关闭时的操作
# 函数在应用启动时执行一些初始化操作,如加载上下文数据、以及初始化问题生成器
# 函数在应用关闭时执行一些清理操作
# @asynccontextmanager 装饰器用于创建一个异步上下文管理器,它允许你在 yield 之前和之后执行特定的代码块,分别表示启动和关闭时的操作
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时执行
# 申明引用全局变量,在函数中被初始化,并在整个应用中使用
global graph, connection_pool
# 启动时执行
try:
logger.info("正在初始化模型、定义 Graph...")
# 初始化 LLM
llm, embedding = get_llm(llm_type)
# 创建数据库连接池
DB_URI = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"
connection_kwargs = {
"autocommit": True,
"prepare_threshold": 0,
}
connection_pool = ConnectionPool(
conninfo=DB_URI,
max_size=20,
kwargs=connection_kwargs,
)
connection_pool.open() # 显式打开连接池
logger.info("数据库连接池初始化成功")
# 短期记忆 初始化checkpointer
checkpointer = PostgresSaver(connection_pool)
checkpointer.setup()
# 长期记忆 初始化PostgresStore
in_postgres_store = PostgresStore(
connection_pool,
index={
"dims": 1536,
"embed": embedding
}
)
in_postgres_store.setup()
# 定义 Graph
graph = create_graph(llm, checkpointer, in_postgres_store )
# 保存 Graph 可视化图
save_graph_visualization(graph)
logger.info("初始化完成")
except Exception as e:
logger.error(f"初始化过程中出错: {str(e)}")
raise

yield # 应用运行期间

# 关闭时执行
logger.info("正在关闭...")
if connection_pool:
connection_pool.close() # 关闭连接池
logger.info("数据库连接池已关闭")


# lifespan参数用于在应用程序生命周期的开始和结束时执行一些初始化或清理工作
app = FastAPI(lifespan=lifespan)

pgsql的存储结构

一、Checkpoints 系列

作用:让 LangGraph Runtime 能在 分布式/长流程 场景下 断点续跑、重放、并发控制

表名 存什么 典型字段(示意) 何时写入
checkpoints 每个「图实例」的 最新快照(state 的完整 JSONB) thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, state, created_at 每次节点执行成功后覆盖更新
checkpoint_blobs checkpoints 里 大字段的拆分(避免行过大) thread_id, checkpoint_ns, channel, type, blob 当 state 过大,自动拆分
checkpoint_migrations 记录 schema 版本/迁移脚本 version, name, applied_at 只在 setup() 时写一次
checkpoint_writes 写放大日志(每个节点写 state 的增量 diff) thread_id, checkpoint_id, task_id, idx, channel, type, value 每次节点完成时追加

关系:
checkpoints = 最新完整快照
checkpoint_writes = 所有增量历史(用于重放/审计)
checkpoint_blobs = checkpoints 里超大型 value 的切片

二、Store 系列

作用:给 业务代码(开发者) 提供 持久化 KV / 向量存储,与图运行状态无关。

表名 存什么 典型字段(示意) 何时写入
store 任意 KV 文档(LangChain Document → JSONB) uuid, namespace, key, value, created_at, updated_at 你调用 store.amput / amset 等 API
store_migrations 同 checkpoint_migrations,记录 store schema 版本 version, name, applied_at 只在第一次 setup()
store_vectors 向量索引表(embedding → vector 类型) uuid, collection_id, embedding, document, metadata 你调用 add_documents(..., embeddings=...)
vector_migrations 记录 pgvector 扩展及索引迁移版本 version, applied_at setup() 时若第一次装 pgvector

三、调用链脑图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌──────────────┐
│ LangGraph │ 运行图实例
└────┬─────────┘
│1. 写 checkpoints
│2. 写 checkpoint_writes
│3. 拆大字段到 checkpoint_blobs

┌──────────────┐
│ 业务代码 │ 读写 KV/向量
└────┬─────────┘
│1. 写 store
│2. 写 store_vectors

PostgreSQL (pgsql)

在linux安装postgresql

查看linux发行版本

1
2
# 查看发行版名称和版本
cat /etc/os-release

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
PRETTY_NAME="Ubuntu 24.04.2 LTS"
NAME="Ubuntu"
VERSION_ID="24.04"
VERSION="24.04.2 LTS (Noble Numbat)"
VERSION_CODENAME=noble
ID=ubuntu
ID_LIKE=debian
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
UBUNTU_CODENAME=noble
LOGO=ubuntu-logo

安装postgresql

1
2
3
4
5
6
7
8
9
10
11
12
13
# 更新软件包索引
sudo apt update

# 安装 PostgreSQL 和常用扩展
sudo apt install -y postgresql postgresql-contrib

# 检查服务状态
#Linux 容器/子系统( Docker、WSL 或 LXC)没有使用 systemd ,因此 systemctl 无法工作
sudo systemctl status postgresql

#确认 PostgreSQL 已安装
which psql # 应该输出 /usr/bin/psql
pg_ctl --version # 显示版本号即已安装
  • postgresql 是主程序
  • postgresql-contrib 提供额外扩展(如 uuid-ossppgcrypto 等)

systemctl一般用于服务器上,完整的linux系统上

Linux 容器/子系统( Docker、WSL 或 LXC)使用 service

pgsql常用指令

启动pgsql服务

1
sudo service postgresql start

停止pgsql服务

1
sudo service postgresql stop

查看状态

1
sudo service postgresql status

连接PostgreSQL 默认的系统用户,可以执行pgsql的相关指令

1
sudo -u postgres psql

psql为PostgreSQL 的终端客户端,默认会连接与当前操作系统用户名同名的数据库用户和数据库。

postgres为PostgreSQL 自带的系统数据库,postgres 默认 数据库密码为空,可以通过以下指令进行设置

1
2
-- 设置postgres用户密码
ALTER USER postgres PASSWORD 'postgres';

数据库关于user的作用

数据库里的“用户”是 PostgreSQL 内部用来做“访问控制”的一把钥匙。一句话:“谁能连哪个库、谁能读哪张表、谁能改哪些行”——全靠这些数据库用户(角色)来判定。

作用 举例
1. 认证(Authentication) 告诉 PostgreSQL “我连库时提供的用户名+密码是否合法”。
2. 授权(Authorization) 决定 “这个用户连进来后,对哪些库、哪些表、哪些行有何种权限(SELECT/INSERT/UPDATE/DELETE…)”。
3. 资源隔离(Isolation) 不同业务/团队用不同用户,方便审计、限流、回收权限,互不干扰。

pgsql通用 URL 模板

1
postgresql://<用户名>:<密码>@127.0.0.1:5432/<数据库名>[?参数=值&...]

连接pgsql

1
2
3
4
5
6
7
8
9
10
11
12
# 基于数据库持久化存储的short-term
db_uri = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"

# short-term短期记忆 实例化PostgresSaver对象 并初始化checkpointer
# long-term长期记忆 实例化PostgresStore对象 并初始化store
async with (
AsyncPostgresSaver.from_conn_string(db_uri) as checkpointer,
AsyncPostgresStore.from_conn_string(db_uri) as store

):
await store.setup()
await checkpointer.setup()

更换apt镜像源

1
2
3
4
5
6
7
8
cat > /etc/apt/sources.list <<'EOF'
deb http://mirrors.aliyun.com/debian/ bookworm main contrib non-free
deb http://mirrors.aliyun.com/debian/ bookworm-updates main contrib non-free
deb http://mirrors.aliyun.com/debian/ bookworm-backports main contrib non-free
deb http://mirrors.aliyun.com/debian-security bookworm-security main contrib non-free
EOF

apt update

参考资料

postgresql向量扩展pgvector的安装与入门本文简答的介绍了 rag 的架构,引申出向量数据库的作用,介绍了 - 掘金

langchain支持向量存储向量存储 | 🦜️🔗 LangChain —- Vector stores | 🦜️🔗 LangChain

前言

本文基于AI-Guide-and-Demos-zh_CN/PaperNotes/Transformer 论文精读.md at master · Hoper-J/AI-Guide-and-Demos-zh_CNTransformer论文逐段精读【论文精读】_哔哩哔哩_bilibili阅读学习

transformer贡献

实际在这一阶段的工作中,注意力机制就已经在编码器-解码器架构中被广泛应用(与 RNN 一起使用),但 Transformer 彻底颠覆了默认采取的逻辑:直接放弃 RNN 的递归结构,只使用注意力机制来编码和解码序列信息

Transformer 的主要贡献如下:

  • 取消递归结构,实现并行计算

    通过采用自注意力机制(Self-Attention),Transformer 可以同时处理多个输入序列,极大提高了计算的并行度和训练速度。

  • 引入位置编码(Positional Encoding)并结合 Attention 机制巧妙地捕捉位置信息

    在不依赖 RNN 结构的情况下,通过位置编码为序列中的每个元素嵌入位置信息,从而使模型能够感知输入的顺序。

transformer架构

image-20250807135151542

image-20250807145354145

【Transformer模型】曼妙动画轻松学,形象比喻贼好记_哔哩哔哩_bilibili

Transformer 模型基于编码器(左)- 解码器(右)架构

Transformer编码器同样由 N 个完全相同的层(原始论文中 N=6)堆叠而成,每层只有两个子层,而解码器有三个。

  1. 多头自注意力(Multi-Head Self-Attention)
    让输入序列中的每个位置都能关注序列内所有位置,直接建模全局依赖。
  2. 前馈全连接网络(Position-wise Feed-Forward Network)
    对每个位置独立地做一次两层的全连接变换(通常先升维再降维)。

同样,每个子层后都有

  • 残差连接(Residual Connection)
  • 层归一化(Layer Normalization)

另外,编码器在输入端还会用到

  • 位置编码(Positional Encoding)——给模型提供序列位置信息,因为注意力本身不包含顺序信息。

Transformer解码器由多个相同的层堆叠而成,每一层包含三个核心子层:

  1. 掩蔽多头自注意力机制(Masked Multi-Head Attention)
    用于处理目标序列,通过掩码防止当前位置关注未来位置,确保生成过程的自回归特性。
  2. 编码器-解码器注意力机制(Encoder-Decoder Attention)
    使解码器能够关注编码器输出的上下文信息,建立输入与输出序列之间的关联。
  3. 前馈神经网络(Feed-Forward Neural Network)
    对注意力机制的输出进行非线性变换,增强模型的表达能力。

此外,每个子层后均包含残差连接(Residual Connection)和层归一化(Layer Normalization),以稳定训练过程并加速收敛。最终,解码器的输出通过线性层和Softmax层映射为词汇表上的概率分布。

嵌入(Embeddings)

image-20250808103256344

在 Transformer 模型中,嵌入层(Embedding Layer) 是处理输入和输出数据的关键步骤,因为模型实际操作的是张量(tensor),而非字符串(string)。在将输入文本传递给模型之前,首先需要进行分词(tokenization),即将文本拆解为多个 token,随后这些 token 会被映射为对应的 token ID,从而转换为模型可理解的数值形式。此时,数据的形状为 (seq_len,),其中 seq_len 表示输入序列的长度。

目的:为了让模型捕捉到 token 背后复杂的语义(Semantic meaning)关系,我们需要将离散的 token ID 映射到一个高维的连续向量空间(Continuous, dense)。这意味着每个 token ID 会被转换为一个嵌入向量(embedding vector),期望通过这种方式让语义相近的词汇在向量空间中距离更近,使模型能更好地捕捉词汇之间的关系。

流程:(前面要进行分词,后面要进行位置编码)

初始化一个可学习的矩阵 E ∈ ℝ^(|V| × d_model)
|V| = 词表大小(比如 32 k、50 k),d_model = 512/768/1024…

把 token id 作为行号,直接取对应行:
x_i = E[token_id_i]
得到 [batch, seq_len, d_model] 的浮点张量。

位置编码(Positional Encoding)

image-20250808103418150

Transformer 的自注意力机制(Self-Attention)是位置无关(position-agnostic)的。也就是说,如果不做任何处理,模型无法区分“我爱你”和“你爱我”这两个句子的差异,因为自注意力机制只关注 token 之间的相关性,而不考虑它们在序列中的顺序。

为了让模型感知到 token 的位置信息,Transformer 引入了位置编码

在原始论文中,Transformer 使用的是固定位置编码(Positional Encoding),其公式如下:

其中:

  • $pos$ 表示位置索引(Position)。
  • $i$ 表示维度索引。
  • $d_{\text{model}}$ 是嵌入向量的维度。

流程:输入的是一个整数索引(位置序号 0,1,2,…)。位置编码模块先把这些整数映射成与词向量同维度的向量(例如 512 维),再把结果加到词向量上。

linear与Softmax

image-20250808111823715

从Linear到MLP AI模型的数学本质【Transformer结构拆解】_哔哩哔哩_bilibili

在 Transformer 模型中,Softmax 函数不仅在计算注意力权重时用到,在预测阶段的输出处理环节也会用到,因为预测 token 的过程可以看成是多分类问题

Softmax 函数是一种常用的激活函数,能够将任意实数向量转换为概率分布,确保每个元素的取值范围在 [0, 1] 之间,并且所有元素的和为 1。其数学定义如下:

其中:

  • $x_i$ 表示输入向量中的第 $i$ 个元素。
  • $\text{Softmax}(x_i)$ 表示输入 $x_i$ 转换后的概率。

我们可以把 Softmax 看作一种归一化的指数变换。相比于简单的比例归一化 $\frac{x_i}{\sum_j x_j}$, Softmax 通过指数变换放大数值间的差异,让较大的值对应更高的概率,同时避免了负值和数值过小的问题,让模型聚焦于权重最高的位置,同时保留全局信息(低权重仍非零)。

注意力机制

【Attention 注意力机制】激情告白transformer、Bert、GNN的精髓_哔哩哔哩_bilibili

image-20250808135811744

缩放点积注意力机制(Scaled Dot-Product Attention)

image-20250808140834132

Transformer 的核心是多头注意力机制(Multi-Head Attention),它能够捕捉输入序列中不同位置之间的依赖关系,并从多个角度对信息进行建模。模块将自底向上的进行讲解:在深入理解注意力机制前,首先需要理解论文使用的缩放点积注意力机制(Scaled Dot-Product Attention)

给定查询矩阵 $Q$、键矩阵 $K$ 和值矩阵 $V$, 其注意力输出的数学表达式如下:

  • $Q$(Query): 用于查询的向量矩阵。
  • $K$(Key): 表示键的向量矩阵,用于与查询匹配。
  • $V$(Value): 值矩阵,注意力权重最终会作用在该矩阵上。
  • $d_k$: 键或查询向量的维度。

理解 Q、K、V 的关键在于代码,它们实际上是通过线性变换从输入序列生成的

公式解释

  1. 点积计算(Dot Produce)

    将查询矩阵 $Q$ 与键矩阵的转置 $K^\top$ 做点积,计算每个查询向量与所有键向量之间的相似度:

    $\text{Scores} = Q K^\top$

    • 每一行表示某个查询与所有键之间的相似度(匹配分数)。
    • 每一列表示某个键与所有查询之间的相似度(匹配分数)。
  2. 缩放(Scaling)

    当 $d_k$ 较大时,点积的数值可能会过大,导致 Softmax 过后的梯度变得极小,因此除以 $\sqrt{d_k}$ 缩放点积结果的数值范围:

    $\text{Scaled Scores} = \frac{Q K^\top}{\sqrt{d_k}}$

    缩放后(Scaled Dot-Product)也称为注意力分数(attention scores)。

  3. Softmax 归一化

    使用 Softmax 函数将缩放后的分数转换为概率分布:

    $\text{Attention Weights} = \text{Softmax}\left(\frac{Q K^\top}{\sqrt{d_k}}\right)$

    注意:Softmax 是在每一行上进行的,这意味着每个查询的匹配分数将归一化为概率,总和为 1。

  4. 加权求和(Weighted Sum)

    最后,使用归一化后的注意力权重对值矩阵 $V$ 进行加权求和,得到每个查询位置的最终输出:
    $\text{Output} = \text{Attention Weights} \times V$

单头注意力机制(Single-Head Attention)

将输入序列(Inputs)通过线性变换生成查询矩阵(Query, Q)、键矩阵(Key, K)和值矩阵(Value, V),随后执行缩放点积注意力(Scaled Dot-Product Attention)。

掩码注意力机制(Masked Attention)

如果使用 mask 掩盖将要预测的词汇,那么 Attention 就延伸为 Masked Attention

在这段代码中,mask 矩阵用于指定哪些位置应该被遮蔽(即填充为 -∞),从而保证这些位置的注意力权重在 softmax 输出中接近于零。注意,掩码机制并不是直接在截断输入序列,也不是在算分数的时候就排除不应该看到的位置,因为看到也没有关系,不会影响与其他位置的分数,所以在传入 Softmax(计算注意力权重)之前排除就可以了。

另外,根据输入数据的来源,还可以将注意力分为自注意力(Self-Attention)和交叉注意力(Cross-Attention)

自注意力机制(Self-attention)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
【输入向量 X】(来自上一层或Embedding)

├─── 步骤①:解耦分裂 ───> 动态乘以 Wq, Wk, Wv 权重,生成 Q, K, V 矩阵
│ (此时 V 矩阵诞生了,和 FFN 还毫无关系)

├─── 步骤②:注意力对齐 ─> 计算 Q 乘以 K 的转置 (Q·K^T),得到亲疏得分矩阵

├─── 步骤③:归一化 ─────> 经过 Softmax 激活,把得分变成加起来等于 1 的权重

├─── 步骤④:财富提取 ─────> 用 Softmax 的权重去乘以 V 矩阵 (Softmax·V)


【Attention 层的最终输出】 ───> 这时,一次完整的 Attention 计算宣告结束!

▼ (数据带着完整的上下文关系,准备出舱)

【前馈神经网络 FFN】 ────────> 步骤⑤:横向关系拉完线后,数据正式进入 FFN 房间
通过 [ 升维 -> SiLU激活过滤 -> 降维 ]
查字典、背常识、提纯特征。


【输出向量,传给下一层】

image-20250808142109091

Transformer 模型架构使用到了三个看起来不同的注意力机制,我们继续忽视共有的 Multi-Head。观察输入,线条一分为三传入 Attention 模块,这意味着查询(query)、键(key)和值(value)实际上都来自同一输入序列 $\mathbf{X}$,数学表达如下:

  • $W^Q, W^K, W^V$:可训练的线性变换权重,实际上就是简单的线性层

image-20260523114549769

image-20260523114614912

image-20260523114637701

  1. $K$ 矩阵:单词的“自我介绍表”(图一)

看你的第一张图,标题写着“语法表”,字幕说:“这里的 K 矩阵就是语法表矩阵”。

  • 通俗含义:$K$(Key)是每个词躺在字典里时,向外界展现出的静态特征标签
  • 结合图一理解
    • “我” 这个词,它的 $K$ 向量是 [动词: 1, 名词: 95, 形容词: 40]。这代表它的静态属性高度偏向名词(代词)。
    • “蛇” 这个词,它的 $K$ 向量是 [动词: 10, 名词: 100, 形容词: 40]。也是一个纯纯的大名词
  • 一句话总结$K$ 矩阵就是一份“花名册”,记录了全句子每个词在各个语义/语法维度上的静态得分。
  1. $Q$ 矩阵:单词的“求偶/检索意向表”(图二)

看你的第二张图,标题变成了“需求表”,字幕说:“代表了每一个词对每个语法标签的关注程度”。

  • 通俗含义:$Q$(Query)是每个词在具体的句子里,为了理清自己的意思,主动发出的“我想找谁”的检索申请
  • 结合图二理解
    • 我们看最后那个 “蛇” 字。“蛇” 作为名词,它自己已经很完整了,但它此时急需一个修饰它的词(什么样的蛇?)。
    • 所以 “蛇” 发出的 $Q$ 申请表是:[动词: 20, 名词: 5, 形容词: 100]
    • 核心亮点:你看!“蛇” 的 $Q$ 向量里,形容词这一项拿到了 100 分!这说明 “蛇” 此时强烈的渴望在句子里抓到一个“形容词”来修饰自己。
  • 一句话总结$Q$ 矩阵就是一份“相亲需求表”,写满了每个词当前最想跟什么属性的词发生关联。
  1. $Q \times K^T$:需求与标签的碰撞,算出“关联度矩阵”(图三)

看你的第三张图,这就是大模型里最著名的核心计算。当 “蛇” 带着它的需求($Q$),去挨个查全句子所有词的静态标签($K$)时:

  • 碰壁“蛇”的Q 乘以 “我”的K $\rightarrow$ “我” 是名词,不是 “蛇” 想要的形容词,打分很低。
  • 看对眼“蛇”的Q 乘以 第一个 “毒”的K $\rightarrow$ 刚好第一个 “毒” 的形容词属性极其强烈!两者的矩阵点积(Dot Product)算出来分数极高。
  • 看图三的最后一行(蛇):经过 Softmax 归一化后,“蛇” 对三个 “毒” 分别给出了 0.25, 0.25, 0.25 的超高关联度(因为模型在这一层可能还在分辨哪一个是形容词),而对其他不相关的词给出了 .00 的关联度。
  • 注意图三里的右上角(因果掩码):你注意到有很多红色的 0 了吗?这就是我们前面反复提到的 ForCausalLM(因果模型) 的铁证!因为是自回归,前面的词(如“我”)绝对不能看到后面的词(如“蛇”),所以右上角全部被强行抹成 0。只有最后一行 “蛇” 才能看完前面所有的词。
  1. 那么,一直没出场的 $V$ 矩阵是干嘛的?

在这个例子里,通过 $Q \times K^T$,“蛇” 终于以 25% 的高分,死死锁定了第一个 “毒” 字。

但是,“蛇” 真正想要的是什么?它不仅想知道“我和你有关系”,它还想把“毒”这个字的【真实毒性、危险含义】给吸收到自己的向量里来,让自己变成一条真正的“毒蛇”。

这时候,$V$(Value)矩阵 就登场了:

  • $K$ 和 $Q$ 只是为了算分数(对暗号)而生。第一个 “毒” 的 $K$ 只是它的“形容词标签”。
  • 第一个 “毒” 真正的内核含义、画面感、语义财富,全存在 $V$ 矩阵 里。
  • 最后一步:模型拿着算出来的 0.25 的权重,乘以第一个 “毒” 的 $V$ 向量。

通过这一步,第一个 “毒” 字所代表的“危险、有毒、绿色液体”等深层语义($V$),就被 25% 地注入到了 “蛇” 的体内。经过这一层计算,“蛇” 就不再是一条普通的蛇了,它在这一层升华成了“毒蛇”。

交叉注意力机制(Cross-Attention)

在 Transformer 解码器中,除了自注意力外,还使用了 交叉注意力(Cross-Attention)

如下图所示,解码器(右)在自底向上的处理过程中,先执行自注意力机制,然后通过交叉注意力从编码器的输出中获取上下文信息。

image-20250808142428374

数学表达如下:

对比学习

Masked AttentionSelf-AttentionCross-Attention 的本质是一致的,这一点从代码调用可以看出来,三者的区别在于未来掩码的使用和输入数据的来源:

  • Masked Attention:用于解码过程,通过掩码屏蔽未来的时间步,确保模型只能基于已生成的部分进行预测,论文中解码器部分的第一个 Attention 使用的是 Masked Self-Attention。

  • Self-Attention:查询、键和值矩阵来自同一输入序列,模型通过自注意力机制学习输入序列的全局依赖关系。

  • Cross-Attention:查询矩阵来自解码器的输入,而键和值矩阵来自编码器的输出,解码器的第二个 Attention 模块就是 Cross-Attention,用于从编码器输出中获取相关的上下文信息。

    • 机器翻译中的中译英任务为例:对于中文句子“中国的首都是北京”,假设模型已经生成了部分译文“The capital of China is”,此时需要预测下一个单词。

      在这一阶段,解码器中的交叉注意力机制会使用当前已生成的译文“The capital of China is”的编码表示作为查询,并将编码器对输入句子“中国的首都是北京”编码表示作为,通过计算查询与键之间的匹配程度,生成相应的注意力权重,以此从值中提取上下文信息,基于这些信息生成下一个可能的单词(token),比如:“Beijing”。

多头注意力机制(Multi-Head Attention)

多头注意力 MultiHeadAttention 是什么_哔哩哔哩_bilibili

image-20260523113142110

image-20260523113225087

因果语言模型

image-20250808143810965

image-20250808143822630

image-20250808143845681

image-20250808145725202

多头注意力机制就是存在多个不同的权重矩阵,形成多个矩阵Z,再把它们 按最后一维(hidden)拼接(concat)→ 做一次线性变换 得到最终输出。

线性bian’h把拼接后的多头结果 Z_concat(形状 batch×seq×d_model)重新线性映射回与输入相同的维度,同时让网络可以学习如何融合不同头的信息

【Transformer模型】曼妙动画轻松学,形象比喻贼好记_哔哩哔哩_bilibili

Transformer原理及架构:多头自注意力机制_哔哩哔哩_bilibili

残差连接(Residual Connection)和层归一化(Layer Normalization, LayerNorm)

image-20250808150313758

在 Transformer 架构中,残差连接(Residual Connection)与层归一化(LayerNorm)结合使用,统称为 Add & Norm 操作。

Add(残差连接,Residual Connection)

残差连接是一种跳跃连接(Skip Connection),它将层的输入直接加到输出上(观察架构图中的箭头),对应的公式如下:

这种连接方式有效缓解了深层神经网络的梯度消失问题。

image-20250808151004667

在transform中,就是输入的矩阵x加上经过注意力机制计算出来的z矩阵

Norm(层归一化,Layer Normalization)

层归一化(LayerNorm)是一种归一化技术,用于提升训练的稳定性和模型的泛化能力。

假设输入向量为 $x = (x_1, x_2, \dots, x_d)$, LayerNorm 的计算步骤如下:

  1. 计算均值和方差
    对输入的所有特征求均值 $\mu$ 和方差 $\sigma^2$:

    $\mu = \frac{1}{d} \sum_{j=1}^{d} x_j, \quad \sigma^2 = \frac{1}{d} \sum_{j=1}^{d} (x_j - \mu)^2$

  2. 归一化公式
    将输入特征 $\hat{x}_i$ 进行归一化:

    $\hat{x}_i = \frac{x_i - \mu}{\sqrt{\sigma^2 + \epsilon}}$

    其中, $\epsilon$ 是一个很小的常数(比如 1e-9),用于防止除以零的情况。

  3. 引入可学习参数
    归一化后的输出乘以 $\gamma$ 并加上 $\beta$, 公式如下:

    $\text{Output} = \gamma \hat{x} + \beta$

    其中 $\gamma$ 和 $\beta$ 是可学习的参数,用于进一步调整归一化后的输出。

前馈神经网络 Position-wise Feed-Forward Networks(FFN)

attention输出是词和词之间的关联度,但是因为attention后,很多维度是没用的,所以需要ffn再做进一步映射,提取有用的特征或维度,传给下面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[ Attention 输出: 1536维 ] (充满冗余、噪声和断层)


【第一步:升维投影】 ──> 将空间暴力放大到 8960 维(把交织在一起的特征彻底解耦、揉碎拉直)


【第二步:非线性激活】 ──> 通过 SiLU 激活函数
│ *(数学性质:负数输入全部变成接近 0 的死节点)*
│ *(作用:强行抹杀掉那些没用的维度、噪声和无效特征!)*


【第三步:降维凝聚】 ──> 重新压缩投影回 1536 维


[ FFN 输出: 1536维 ] (只保留了提纯后的、对后续层真正有用的核心特征)

image-20260523113727580

image-20251220191959727

在 Transformer 中,前馈网络层(Feed-Forward Network,FFN)的作用可以概括为一句话:
“对每个位置的向量进行非线性变换,增加模型的表达能力。”

在编码器-解码器架构中,另一个看起来“大一点”的模块就是 Feed Forward,它在每个位置 $i$ 上的计算可以表示为:

其中:

  • $xi \in \mathbb{R}^{d{\text{model}}}$ 表示第 $i$ 个位置的输入向量。
  • $W1 \in \mathbb{R}^{d{\text{model}} \times d{\text{ff}}}$ 和 $W_2 \in \mathbb{R}^{d{\text{ff}} \times d_{\text{model}}}$ 是两个线性变换的权重矩阵。
  • $b1 \in \mathbb{R}^{d{\text{ff}}}$ 和 $b2 \in \mathbb{R}^{d{\text{model}}}$ 是对应的偏置向量。
  • $\text{max}(0, \cdot)$ 是 ReLU 激活函数,用于引入非线性。

大模型发展树

image-20250807171237889

image-20250807173520481

预训练语言模型

预训练语言模型(PLM)是一种通过大量文本数据进行无监督或弱监督训练的语言模型,目的是学习语言的通用表示(即语言的模式、语法、语义等)。这些模型通常在大规模文本数据上进行预训练,然后可以被微调(Fine - tuning)以适应各种下游任务,如文本分类、问答、命名实体识别等。

预训练语言模型的核心思想是利用大量的无标注文本数据来学习语言的通用特征,从而为各种自然语言处理任务提供强大的语言理解能力。预训练模型可以显著提高任务的性能,减少对标注数据的依赖,并且能够快速适应新的任务。

BERT模型(Encoder-only PLM)

针对 Encoder、Decoder 的特点,引入 ELMo 的预训练思路,开始出现不同的、对 Transformer 进行优化的思路。例如,Google 仅选择了 Encoder 层,通过将 Encoder 层进行堆叠,再提出不同的预训练任务-掩码语言模型(Masked Language Model,MLM),打造了一统自然语言理解(Natural Language Understanding,NLU)任务的代表模型——BERT

BERT,全名为 Bidirectional Encoder Representations from Transformers,是由 Google 团队在 2018年发布的预训练语言模型。该模型发布于论文《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》,实现了包括 GLUE、MultiNLI 等七个自然语言处理评测任务的最优性能(State Of The Art,SOTA),堪称里程碑式的成果。

T5(Encoder-Decoder PLM)

BERT 也存在一些问题,例如 MLM 任务和下游任务微调的不一致性,以及无法处理超过模型训练长度的输入等问题。为了解决这些问题,研究者们提出了 Encoder-Decoder 模型,通过引入 Decoder 部分来解决这些问题,同时也为 NLP 领域带来了新的思路和方法。

T5(Text-To-Text Transfer Transformer)是由 Google 提出的一种预训练语言模型,通过将所有 NLP 任务统一表示为文本到文本的转换问题,大大简化了模型设计和任务处理。T5 基于 Transformer 架构,包含编码器和解码器两个部分,使用自注意力机制和多头注意力捕捉全局依赖关系,利用相对位置编码处理长序列中的位置信息,并在每层中包含前馈神经网络进一步处理特征。

LLama模型(Decoder-Only PLM)

LLaMA模型是由Meta(前Facebook)开发的一系列大型预训练语言模型。从LLaMA-1到LLaMA-3,LLaMA系列模型展示了大规模预训练语言模型的演进及其在实际应用中的显著潜力。

GPT模型(Decoder-Only PLM)

GPT,即 Generative Pre-Training Language Model,是由 OpenAI 团队于 2018年发布的预训练语言模型。虽然学界普遍认可 BERT 作为预训练语言模型时代的代表,但首先明确提出预训练-微调思想的模型其实是 GPT。

GPT 提出了通用预训练的概念,也就是在海量无监督语料上预训练,进而在每个特定任务上进行微调,从而实现这些任务的巨大收益。虽然在发布之初,由于性能略输于不久后发布的 BERT,没能取得轰动性成果,也没能让 GPT 所使用的 Decoder-Only 架构成为学界研究的主流,但 OpenAI 团队坚定地选择了不断扩大预训练数据、增加模型参数,在 GPT 架构上不断优化,最终在 2020年发布的 GPT-3 成就了 LLM 时代的基础,并以 GPT-3 为基座模型的 ChatGPT 成功打开新时代的大门,成为 LLM 时代的最强竞争者也是目前的最大赢家。

MOE

image-20260522162517288

image-20260522162712736

deepseek架构发展

【闪客】深入解读 DeepSeek V1~V4!男女老少都听得懂~_哔哩哔哩_bilibili

image-20260523155055056

image-20260523155617373

image-20260523155305638

image-20260523155552842

image-20260523155242286

image-20260523155646028

moe,mla,mhc,purerl

参考资料

Hello! · Transformers快速入门

jsksxs360/How-to-use-Transformers: Transformers 库快速入门教程

Hoper-J/AI-Guide-and-Demos-zh_CN: 这是一份入门AI/LLM大模型的逐步指南,包含教程和演示代码,带你从API走进本地大模型部署和微调,代码文件会提供Kaggle或Colab在线版本,即便没有显卡也可以进行学习。项目中还开设了一个小型的代码游乐场🎡,你可以尝试在里面实验一些有意思的AI脚本。同时,包含李宏毅 (HUNG-YI LEE)2024生成式人工智能导论课程的完整中文镜像作业。

Happy-LLM

梗直哥

90%人不知道的LLM黑科技:拆解Transformer如何吃透全网知识!_哔哩哔哩_bilibili

Transformer如何成为AI模型的地基_哔哩哔哩_bilibili

什么是 MCP?

  • 全称:Model Context Protocol
  • 作用:让 AI 助手(如 Claude、Cline 等)在对话过程中,动态调用外部工具(Tool)完成复杂任务(读写文件、查询数据库、调用 API 等)。
  • 组成
    1. MCP Host(宿主,如 Cline、Claude Desktop)
    2. MCP Server(提供 Tool 的后台服务)
    3. Tool(具体功能单元,如 read_file, exec_command 等)

核心概念速记

  • MCP Server
    • 一个独立进程,提供 1-N 个 Tool。
    • 可以用任何语言编写,只要暴露标准 MCP 接口。
  • Tool
    • 最小执行单元,必须包含:
      • name(唯一)
      • description(让 LLM 理解何时调用)
      • input schema(参数结构,JSON Schema)
  • 交互流程(重点)
    • 在启动mcp server时,server将tool信息传送给host
    • 用户在 Host 输入自然语言需求。
    • Host 将需求 + 可用 Tool 列表发给 LLM。
    • LLM 判断调用哪个 Tool,并填充参数。
    • Host 通过 MCP 协议向对应 Server 发送请求。
    • Server 执行 Tool 并返回结果。
    • Host 将结果合并上下文,继续对话。

mcp和fuction calling的区别

维度 Function Calling(FC) MCP(Model Context Protocol)
本质 能力 —— 某个大模型原生就带的一种「调用函数」功能 协议 —— 定义 AI 与外部世界如何长期、标准、可复用地交互
工作方式 模型在一次推理里主动决定要调用哪个函数,并吐出结构化参数 通过「客户端-服务器」架构,由 MCP Server 被动等待模型或 Agent 的请求
是否标准化 否。OpenAI、Anthropic、百度等各家接口格式不同 是。统一 JSON-RPC 2.0 协议,跨模型通用
上下文管理 单次调用,无状态;复杂多轮任务需自己维护 协议层面支持会话、状态、长链路任务
复用/共享 函数代码往往紧耦合在项目里,换模型就得重写 一次写成 MCP Server,可被任何支持 MCP 的模型/IDE/Agent 直接插用

一句话总结:
Function Calling 是「某个模型自带的快捷指令」,MCP 是「让任何模型都能统一插拔工具的工业标准」。
二者并非互斥——MCP 的实现里仍然可以用 Function Calling 去触发具体函数,但它把「怎么描述工具、怎么发现工具、怎么保持会话」这些事都标准化了,从而解决了 FC 带来的碎片化、难维护、难共享的问题 。

安装mcp

在mcp server市场查找自己想用的mcp服务,如Fetch MCP Server

复制mcp配置

1
2
3
4
5
6
7
8
9
10
{
"mcpServers": {
"fetch": {
"command": "uvx",
"args": [
"mcp-server-fetch"
]
}
}
}

在mcp host 中安装,如trae

host会自动完成对mcp的配置

创建一个mcp server

初始化项目

1
2
3
4
5
6
7
8
uv init weather

uv sync

source .venv/bin/activate

#添加依赖
uv add "mcp[cli]" httpx

创建weather.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# 导入类型提示模块,用于类型注解
from typing import Any

# 导入httpx库,用于发送HTTP请求
import httpx

# 从mcp.server.fastmcp模块导入FastMCP类
# FastMCP是一个快速构建MCP(Model Control Protocol)服务器的框架
from mcp.server.fastmcp import FastMCP

# 创建FastMCP实例,命名为"weather",日志级别设置为ERROR(只显示错误信息)
mcp = FastMCP("weather", log_level="ERROR")


# 常量定义
# NWS(National Weather Service)API的基础URL
NWS_API_BASE = "https://api.weather.gov"
# 用户代理字符串,用于标识应用程序
USER_AGENT = "weather-app/1.0"


async def make_nws_request(url: str) -> dict[str, Any] | None:
"""向NWS API发起请求并处理错误。

Args:
url: 要请求的API URL

Returns:
成功时返回解析后的JSON数据字典,失败时返回None
"""
# 设置请求头信息
headers = {
"User-Agent": USER_AGENT, # 用户代理标识
"Accept": "application/geo+json" # 接受的数据格式
}

# 创建异步HTTP客户端
async with httpx.AsyncClient() as client:
try:
# 发起GET请求,设置超时时间为30秒
response = await client.get(url, headers=headers, timeout=30.0)
# 如果响应状态码不是2xx,抛出异常
response.raise_for_status()
# 返回解析后的JSON数据
return response.json()
except Exception:
# 捕获所有异常,返回None表示请求失败
return None


def format_alert(feature: dict) -> str:
"""将警报数据格式化为可读的字符串。

Args:
feature: 包含警报信息的字典

Returns:
格式化后的警报字符串
"""
# 获取警报属性
props = feature["properties"]
# 格式化警报信息,使用get方法提供默认值防止键不存在
return f"""
事件: {props.get('event', '未知')}
区域: {props.get('areaDesc', '未知')}
严重程度: {props.get('severity', '未知')}
描述: {props.get('description', '无描述信息')}
指示: {props.get('instruction', '无具体指示')}
"""


# 使用@mcp.tool()装饰器将函数注册为MCP工具
@mcp.tool()
async def get_alerts(state: str) -> str:
"""获取指定美国州的天气警报。

Args:
state: 两个字母的美国州代码(例如:CA, NY)

Returns:
格式化后的警报信息字符串
"""
# 构建获取州警报的URL
url = f"{NWS_API_BASE}/alerts/active/area/{state}"
# 发起API请求获取数据
data = await make_nws_request(url)

# 检查数据是否有效
if not data or "features" not in data:
return "无法获取警报或未找到警报。"

# 检查是否有警报
if not data["features"]:
return "该州无活动警报。"

# 格式化所有警报
alerts = [format_alert(feature) for feature in data["features"]]
# 用分隔符连接所有警报
return "\n---\n".join(alerts)


# 注册为MCP工具的天气预报函数
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
"""获取指定位置的天气预报。

Args:
latitude: 位置的纬度
longitude: 位置的经度

Returns:
格式化后的天气预报字符串
"""
# 首先获取预报网格端点
points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
points_data = await make_nws_request(points_url)

# 检查点数据是否获取成功
if not points_data:
return "无法获取此位置的预报数据。"

# 从点响应中获取预报URL
forecast_url = points_data["properties"]["forecast"]
forecast_data = await make_nws_request(forecast_url)

# 检查预报数据是否获取成功
if not forecast_data:
return "无法获取详细预报。"

# 将时间段格式化为可读的预报
periods = forecast_data["properties"]["periods"]
forecasts = []
# 只显示接下来的5个时间段
for period in periods[:5]:
forecast = f"""
{period['name']}:
温度: {period['temperature']}°{period['temperatureUnit']}
风力: {period['windSpeed']} {period['windDirection']}
预报: {period['detailedForecast']}
"""
forecasts.append(forecast)

# 用分隔符连接所有预报
return "\n---\n".join(forecasts)


# 程序入口点
if __name__ == "__main__":
# 初始化并运行服务器,使用stdio传输方式
mcp.run(transport='stdio')

@mcp.tool()可以将函数内的字符串,参数类型等信息传给大模型,以供大模型决定何时调用这个tool

mcp.run(transport=’stdio’)说明mcp server和host的传输方式是输入和输出

mcp server 配置信息

1
2
3
4
5
6
7
8
9
10
11
12
"weather": {
"disabled": false,
"timeout": 60,
"command": "uv",
"args": [
"--directory",
"/Users/joeygreen/PycharmProjects/weather",
"run",
"weather.py"
],
"transportType": "stdio"
}

“disabled”: false表示该服务是否被禁用。false 表示该服务是启用状态,可以正常运行。

“timeout”: 60设置该服务的超时时间,单位为秒。

“command”: “uv”指定执行该服务时使用的命令。

“args”出了执行 command 时需要传递的参数。

“transportType”: “stdio”指定服务的通信方式。stdio 表示标准输入输出流(Standard Input Output),通常用于进程间通信。

解析mcp server与host的通信

image-20250727154739013

输入为host对server发送,输出为server对host发送,以下将列举几个重要的说明

输入中:method字段为host告诉server接下来要干什么,如初始化 (Initialization)通知已初始化 (Notification)查询可用工具 (Listing Tools)调用工具 (Calling a Tool)

protocolVersion说明了mcp使用的协议版本

以下见server返回的tool信息,其中的一个参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"name": "get_forecast",
"description": "Get weather forecast for a location.\n\nArgs:\n latitude: Latitude of the location\n longitude: Longitude of the location\n",
"inputSchema": {
"properties": {
"latitude": {
"title": "Latitude",
"type": "number"
},
"longitude": {
"title": "Longitude",
"type": "number"
}
},
"required": [
"latitude",
"longitude"
],
"title": "get_forecastArguments",
"type": "object"
}
}

可以和定义的函数对比学习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 注册为MCP工具的天气预报函数
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
"""获取指定位置的天气预报。

Args:
latitude: 位置的纬度
longitude: 位置的经度

Returns:
格式化后的天气预报字符串
"""
# 首先获取预报网格端点
points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
points_data = await make_nws_request(points_url)

# 检查点数据是否获取成功
if not points_data:
return "无法获取此位置的预报数据。"

# 从点响应中获取预报URL
forecast_url = points_data["properties"]["forecast"]
forecast_data = await make_nws_request(forecast_url)

# 检查预报数据是否获取成功
if not forecast_data:
return "无法获取详细预报。"

# 将时间段格式化为可读的预报
periods = forecast_data["properties"]["periods"]
forecasts = []
# 只显示接下来的5个时间段
for period in periods[:5]:
forecast = f"""
{period['name']}:
温度: {period['temperature']}°{period['temperatureUnit']}
风力: {period['windSpeed']} {period['windDirection']}
预报: {period['detailedForecast']}
"""
forecasts.append(forecast)

# 用分隔符连接所有预报
return "\n---\n".join(forecasts)

description内容即为我们在定义这个tool的时候写的文档字符串(Documentation String),通常简称为 docstring

inputSchema 是在MCP(Model Control Protocol)中用来描述工具(tool)所需参数的结构和类型的规范。它本质上是一个JSON Schema。

JSON Schema 是一个用于描述和验证 JSON 数据结构的规范。你可以把它理解为 JSON 数据的“蓝图”或“模板”。

required指明哪些参数是必需的,哪些是可选的。

理解mcp的本质

以上内容皆是server与host直接的交互,本质上可以理解成host对server提供的工具进行注册与使用。这其中并不涉及到host与大模型的交互,也就是大模型是如何使用host提供的信息。实际上不同的mcp host与模型的交互协议也不同,如cline使用的是xml格式;cherry studio使用的则是fuction calling

再看mcp的全称Model Context Protocol,模型上下文协议,也就是mcp增加模型的扩展性,使他可以获取更多信息,而server就是为模型提供更多信息的工具

mcp host与模型的交互

使用中转服务器截获日志

image-20250727163811531

以下为cline发送给模型的请求

image-20250727164527797

messages包含了系统提示词与用户输入

先来看系统提示词,cline提供的提示词包括工具使用格式,工具信息,工具使用方法等。这里重点说一下,cline的工具使用格式xml

结构如下:

1
2
3
4
5
<tool_name>
<parameter1_name>value1</parameter1_name>
<parameter2_name>value2</parameter2_name>
...
</tool_name>

例如:

1
2
3
<read_file>
src/main.js
</read_file>

再举个例子

use_mcp_tool
描述:请求使用由连接的 MCP 服务器提供的工具。每个 MCP 服务器可以提供具有不同功能的多个工具。工具有定义的输入模式,用于指定必需和可选参数。
参数:

server_name: (必需) 提供该工具的 MCP 服务器的名称
tool_name: (必需) 要执行的工具的名称
arguments: (必需) 一个 JSON 对象,包含工具的输入参数,遵循工具的输入模式
用法:

1
2
3
4
5
6
7
8
9
10
<use_mcp_tool>
<server_name>服务器名称在此</server_name>
<tool_name>工具名称在此</tool_name>

{
"param1": "value1",
"param2": "value2"
}

</use_mcp_tool>

模型返回响应如下

image-20250727174504506

sse连接,流式输出

SSE 是一种基于标准 HTTP只允许服务器向客户端单向推送文本流的实时通信技术,浏览器原生支持,自动重连,常用于AI 流式回答实时日志股价/监控推送等场景。

即客户端发送一次请求,连续接受多次响应直到结束

mcp的三种传输协议

协议名称 通信方式 适用场景 优势 局限
Stdio(标准输入输出) 使用进程的标准输入(stdin)和标准输出(stdout)进行本地通信,基于 JSON-RPC 2.0 格式 本地开发、调试、IDE插件、命令行工具 简单易实现、跨平台、低延迟 仅支持本地通信,无法跨网络,低并发
SSE(Server-Sent Events) 客户端通过 HTTP POST 发送请求,服务器通过 SSE 单向推送流式响应 实时监控、新闻推送、远程服务调用 基于 HTTP,浏览器友好,支持流式数据 仅支持单向通信,MCP官方已标记为“即将废弃”
Streamable HTTP(新型流式HTTP) 支持双向流式通信的现代 HTTP 协议,替代 SSE,支持会话恢复、OAuth 认证等 分布式系统、高并发、双向实时交互 双向通信、高性能、企业级安全机制 实现较复杂,生态仍在发展中

ReAct

ReAct 是一种用于增强大型语言模型(LLMs)推理和行动能力的技术框架,它通过结合“推理”(Reasoning)和“行动”(Acting)来提升模型处理复杂任务的能力。

其工作流程通常包括以下几个步骤:

  1. 思考(Reasoning):模型对当前问题进行分析,思考下一步需要采取的行动。
  2. 行动(Acting):模型决定调用哪些工具或函数,并提供必要的参数。
  3. 观察(Observation):工具执行后返回结果,模型对结果进行观察。
  4. 响应(Response):根据观察结果,模型生成最终的用户响应。

实际应用上就是告诉大模型用ReAct这种模式来思考

参考资料

MCP终极指南 - 从原理到实战,带你深入掌握MCP(基础篇)_哔哩哔哩_bilibili

前言

本教程为langchain官方教程的学习记录

academy.langchain.com/enrollments

代码见learn-rag-langchain/academy-langgraph at main · zxj-2023/learn-rag-langchain

module-4

Parallel node execution 并行节点执行

Waiting for nodes to finish 等待节点完成

现在,让我们考虑一个案例,其中一个并行路径的步骤比另一个多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
builder = StateGraph(State)

# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

image-20250722143802652

在这种情况下,bb2c 都是同一个步骤的一部分。图形将在进入 d 步骤之前等待所有这些操作完成。

1
graph.invoke({"state": []})
1
2
3
4
5
6
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]
{'state': ["I'm A", "I'm B", "I'm C", "I'm B2", "I'm D"]}
Setting the order of state updates 设置状态更新的顺序

然而,在每个步骤中,我们无法对状态更新的顺序进行具体控制!简单来说,它是基于图拓扑结构由 LangGraph 确定的确定性顺序,该顺序为 \我们无法控制**

上面,我们看到 c 被添加在 b2 之前

然而,我们可以使用自定义 reducer 来定制此功能,例如,对状态更新进行排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def sorting_reducer(left, right):
""" 合并并排序列表中的值"""
# 如果 left 不是列表,则将其转换为列表
if not isinstance(left, list):
left = [left]

# 如果 right 不是列表,则将其转换为列表
if not isinstance(right, list):
right = [right]

# 合并 left 和 right 列表,然后升序排序
return sorted(left + right, reverse=False)
class State(TypedDict):
# sorting_reducer 函数将对 state 中的值进行排序
state: Annotated[list, sorting_reducer]
1
graph.invoke({"state": []})
1
{'state': ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"]}

现在,reducer 对更新的状态值进行排序!sorting_reducer 示例对所有值进行全局排序。我们还可以:

  1. 在并行步骤期间将输出写入状态中的单独字段
  2. 在并行步骤之后使用“汇”节点来合并和排序这些输出
  3. 合并后清除临时字段

请参阅 docs 以获取更多详细信息。

Working with LLMs 使用 LLMs

现在,让我们添加一个现实中的例子!我们希望从两个外部来源(Wikipedia 和 Web-Search)收集上下文信息,并让 LLM 回答一个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from langchain_core.messages import HumanMessage, SystemMessage

from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools import TavilySearchResults

def search_web(state):

""" 从网络搜索中检索文档 """

# 搜索
tavily_search = TavilySearchResults(max_results=3)
search_docs = tavily_search.invoke(state['question'])

# 将多个搜索文档转换成了一个统一格式的长文本,每个文档都有自己的元数据(如URL),并且文档之间有明确的分隔,便于后续处理或展示。
"""
- 这是一个 列表推导式 (list comprehension) ,它会遍历 search_docs 列表中的每一个元素(这里我们称之为 doc )。
- search_docs 里的每个 doc 应该是一个包含 'url' 和 'content' 键的字典。
"""
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
for doc in search_docs
]
)

return {"context": [formatted_search_docs]}

def search_wikipedia(state):

""" 从维基百科中检索文档 """

# 搜索
search_docs = WikipediaLoader(query=state['question'],
load_max_docs=2).load()

# 格式化
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
for doc in search_docs
]
)

return {"context": [formatted_search_docs]}

def generate_answer(state):

""" 用于回答问题的节点 """

# 获取状态
context = state["context"]
question = state["question"]

# 模板
answer_template = """使用以下上下文回答问题 {question}: {context}"""
answer_instructions = answer_template.format(question=question,
context=context)

# 回答
answer = llm.invoke([SystemMessage(content=answer_instructions)]+[HumanMessage(content=f"回答问题。")])

# 将其附加到状态
return {"answer": answer}

# 添加节点
builder = StateGraph(State)

# 初始化每个节点
builder.add_node("search_web",search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)

# 流程
builder.add_edge(START, "search_wikipedia")
builder.add_edge(START, "search_web")
builder.add_edge("search_wikipedia", "generate_answer")
builder.add_edge("search_web", "generate_answer")
builder.add_edge("generate_answer", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

image-20250722153534867

1
2
result = graph.invoke({"question": "英伟达2025年第一季度财报表现如何"})
result['answer'].content

Sub-graphs 子图

子图允许你在图表的不同部分创建和管理不同的状态。这在多智能体系统中尤其有用,尤其是在每个智能体都有各自状态的智能体团队中。

让我们考虑一个简单的例子:

  • 我有一个系统,它接收日志。
  • 该系统通过不同的代理执行两个独立的子任务(总结日志,查找故障模式)。
  • 我希望在两个不同的子图中执行这两个操作。

最重要的是要理解图表是如何传达信息的!

简而言之,通信是 \通过重叠密钥** 实现的:

  • 子图可以访问父图中的 docs(文档)。
  • 父图可以从子图中访问 summary(摘要)和 failure_report(故障报告)。

subgraph.png

  1. Logs (Traces):
    • 这是系统的输入,表示一系列的日志记录。
  2. Entry Graph:
    • 这是系统的入口图,它包含了总体状态(Overall State),其中包含:
      • docs:文档或日志数据。
      • summary report:摘要报告,这是系统执行摘要任务后生成的。
      • failure report:故障报告,这是系统执行故障分析任务后生成的。
  3. Call sub-graphs:
    • 这是从入口图调用两个子图的过程,分别用于执行摘要和故障分析任务。
  4. Summarization:
    • 这个子图负责生成日志的摘要。它的状态(Summary State)包含:
      • docs:与入口图共享的文档数据。
      • summary:摘要内容。
      • summary report:摘要报告,这是摘要任务完成后生成的。
  5. Failure Analysis:
    • 这个子图负责分析日志中的故障模式。它的状态(Failure Analysis State)包含:
      • docs:与入口图共享的文档数据。
      • failures:故障模式。
      • failure report:故障报告,这是故障分析任务完成后生成的。
  6. Finish:
    • 这是流程的结束点,表示两个子图的任务都已完成,并且生成了摘要报告和故障报告。
1
2
3
4
5
6
7
8
9
10
11
12
13
from operator import add
from typing_extensions import TypedDict
from typing import List, Optional, Annotated

#logs结构
class Log(TypedDict):
id: str
question: str
docs: Optional[List]
answer: str
grade: Optional[int]
grader: Optional[str]
feedback: Optional[str]
Sub graphs

这里是失败分析子图,它使用了 FailureAnalysisState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END

# 故障分析子图
class FailureAnalysisState(TypedDict):
"""用于故障分析的状态。"""
cleaned_logs: List[Log] # 清理后的日志列表
failures: List[Log] # 包含故障的日志列表
fa_summary: str # 故障分析摘要
processed_logs: List[str] # 已处理的日志标识符列表

class FailureAnalysisOutputState(TypedDict):
"""故障分析的输出状态。"""
fa_summary: str # 故障分析摘要
processed_logs: List[str] # 已处理的日志标识符列表

def get_failures(state):
"""获取包含故障的日志"""
cleaned_logs = state["cleaned_logs"]
# 从清理后的日志中筛选出包含 "grade" 键的日志,这些被视为故障
failures = [log for log in cleaned_logs if "grade" in log]
return {"failures": failures}

def generate_summary(state):
"""生成故障摘要"""
failures = state["failures"]
# 添加功能:fa_summary = summary_generation(qs_summary)
fa_summary = "Chroma 文档的检索质量不佳。"
# 为每个故障日志生成一个处理过的日志标识符
return {"fa_summary": fa_summary, "processed_logs": [f"failure-analysis-on-log-{failure['id']}" for failure in failures]}

fa_builder = StateGraph(state_schema=FailureAnalysisState,output_schema=FailureAnalysisOutputState)
fa_builder.add_node("get_failures", get_failures)
fa_builder.add_node("generate_summary", generate_summary)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)

graph = fa_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))

image-20250723094949158

这里是问题总结子图,它使用了 QuestionSummarizationState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 摘要生成子图
class QuestionSummarizationState(TypedDict):
"""用于问题摘要的状态。"""
cleaned_logs: List[Log] # 清理后的日志列表
qs_summary: str # 问题摘要
report: str # 从摘要生成的报告
processed_logs: List[str] # 已处理的日志标识符列表

class QuestionSummarizationOutputState(TypedDict):
"""问题摘要的输出状态。"""
report: str # 最终报告
processed_logs: List[str] # 已处理的日志标识符列表

def generate_summary(state):
"""从日志生成摘要。"""
cleaned_logs = state["cleaned_logs"]
# 添加功能:summary = summarize(generate_summary)
summary = "问题集中在 ChatOllama 和 Chroma 向量存储的使用上。"
# 返回摘要以及已处理的日志ID
return {"qs_summary": summary, "processed_logs": [f"summary-on-log-{log['id']}" for log in cleaned_logs]}

def send_to_slack(state):
"""模拟发送报告。"""
qs_summary = state["qs_summary"]
# 添加功能:report = report_generation(qs_summary)
report = "foo bar baz"
return {"report": report}

qs_builder = StateGraph(QuestionSummarizationState,output_schema=QuestionSummarizationOutputState)
qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)
qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)

graph = qs_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))

image-20250723100648199

Adding sub graphs to our parent graph 向父图添加子图

现在,我们可以将所有内容整合在一起。我们使用 EntryGraphState 创建父图。并且我们将我们的子图添加为节点!

1
2
3
4
5
6
7
8
9
# 入口图
class EntryGraphState(TypedDict):
# 原始日志数据列表
raw_logs: List[Log]
# 经过清洗处理后的日志数据列表
cleaned_logs: List[Log]
fa_summary: str # 这只会在故障分析子图中生成
report: str # 这只会在问题摘要子图中生成
processed_logs: Annotated[List[int], add] # 跟踪哪些日志已经被处理过 ,尤其是在子图之间共享状态时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def clean_logs(state):
# 获取日志
raw_logs = state["raw_logs"]
# 数据清洗 raw_logs -> docs
cleaned_logs = raw_logs
return {"cleaned_logs": cleaned_logs}

entry_builder = StateGraph(EntryGraphState)
entry_builder.add_node("clean_logs", clean_logs)
#添加子图
entry_builder.add_node("question_summarization", qs_builder.compile())
entry_builder.add_node("failure_analysis", fa_builder.compile())

entry_builder.add_edge(START, "clean_logs")
entry_builder.add_edge("clean_logs", "failure_analysis")
entry_builder.add_edge("clean_logs", "question_summarization")
entry_builder.add_edge("failure_analysis", END)
entry_builder.add_edge("question_summarization", END)

graph = entry_builder.compile()

from IPython.display import Image, display

# 将 xray 设置为 1 将显示嵌套图的内部结构
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))

image-20250723102913524

Map-reduce

LangGraph 里的 Map-Reduce 是一种并行处理模式,用于将一个大任务拆分成多个子任务(Map),再汇总结果(Reduce)。这是 LangGraph 中处理批量数据或并行节点执行的核心机制之一。

让我们设计一个系统,该系统将完成两件事情:

(1) 映射(Map) —— 根据主题生成一组笑话。
(2) 归约(Reduce) —— 从这组笑话中挑出最棒的一条。

1
2
3
4
5
6
7
8
9
10
11
12
from langchain_openai import ChatOpenAI

# Prompts we will use
subjects_prompt = """生成一个包含3个子主题的列表,这些子主题都与以下总体主题相关:{topic}。"""
joke_prompt = """生成一个关于{subject}的笑话"""
best_joke_prompt = """下面是关于{topic}的一堆笑话。选择最好的一个!返回最好笑话的ID,第一个笑话的ID从0开始。笑话:\n\n {jokes}"""
# LLM
model = ChatOpenAI(
model="qwen-plus-2025-04-28",
api_key="sk-",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
Parallelizing joke generation 并行化笑话生成

首先,让我们定义图的入口点,它将:

  • 接收用户输入的主题
  • 基于该主题生成若干“笑话子主题”
  • 将每个子主题发送到上面定义的笑话生成节点

我们的状态有一个 jokes 键,它将累积来自并行化笑话生成的笑话。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Subjects(BaseModel):
"""定义一个Pydantic模型,用于表示主题列表。"""
subjects: list[str] # 主题字符串列表

class BestJoke(BaseModel):
"""定义一个Pydantic模型,用于表示最佳笑话的ID。"""
id: int # 最佳笑话的索引ID

class OverallState(TypedDict):
"""定义整个图的总体状态,使用TypedDict以便于类型提示和状态管理。"""
topic: str # 当前讨论的主题
subjects: list # 生成的子主题列表
jokes: Annotated[list, operator.add] # 笑话列表,使用operator.add表示列表内容会累加而不是覆盖
best_selected_joke: str # 最终选出的最佳笑话

生成笑话的主题。

1
2
3
4
5
6
7
#用于生成主题
def generate_topics(state: OverallState):
#使用 Python 的 format() 方法来构建一个提示字符串( prompt )
prompt = subjects_prompt.format(topic=state["topic"])
#.with_structured_output(Subjects) 它指示模型尝试将其输出格式化为 Subjects 类
response = model.with_structured_output(Subjects).invoke(prompt)
return {"subjects": response.subjects}

这就是妙处:我们利用 Send 为每个主题并行生成笑话。

非常实用!无论主题数量多少,它都能自动并行处理。

  • generate_joke:图中节点的名字
  • {"subject": s}:要传递的状态

Send 允许你向 generate_joke 节点发送任意结构的状态,无需与 OverallState 对齐。
在这里,generate_joke 使用自己的内部状态,我们通过 Send 按需填充即可。

在 LangGraph 里,Send 是一个“动态派发器”:它让你在运行时决定“要把哪个节点运行多少次、每次给它什么数据”,并自动并行执行。

1
2
3
4
5
6
from langgraph.types import Send
def continue_to_jokes(state: OverallState):
# 该函数根据当前状态中的主题列表,为每个主题生成一个 Send 对象。
# 每个 Send 对象都指示图将数据发送到名为 "generate_joke" 的节点,
# 并将当前主题作为 "subject" 参数传递,从而实现并行生成笑话。
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
Joke generation (map) 笑话生成

现在,我们只需定义一个节点来生成笑话,命名为 generate_joke

生成的笑话会被写回到 OverallState 中的 jokes 字段。
该字段配有 reducer,能够自动把多次写入的列表合并成一个大列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 定义 JokeState,用于表示生成笑话任务的输入状态。
class JokeState(TypedDict):
subject: str#笑话的主题

# 定义 Joke 模型,用于表示模型生成的笑话的结构。
class Joke(BaseModel):
joke: str#笑话的文本内容

# 定义生成笑话的函数。
def generate_joke(state: JokeState):
# 根据 joke_prompt 模板和当前状态中的主题格式化提示。
prompt = joke_prompt.format(subject=state["subject"])
# 调用语言模型,并指定输出应结构化为 Joke 类型。
response = model.with_structured_output(Joke).invoke(prompt)
# 返回一个包含生成的笑话的字典,键为 "jokes"。
return {"jokes": [response.joke]}
Best joke selection (reduce) 最佳笑话选择

现在,我们添加逻辑来挑选最好的笑话。

1
2
3
4
5
6
7
8
9
def best_joke(state: OverallState):
# 将状态中所有笑话列表连接成一个字符串,每个笑话之间用两个换行符分隔
jokes = "\n\n".join(state["jokes"])
# 使用主题和所有笑话格式化最佳笑话提示语
prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
# 调用模型,期望其输出符合 BestJoke 结构(包含最佳笑话的ID)
response = model.with_structured_output(BestJoke).invoke(prompt)
# 根据模型返回的ID,从笑话列表中选择最佳笑话,并将其存储在状态的 best_selected_joke 字段中
return {"best_selected_joke": state["jokes"][response.id]}
Compile 编译
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from IPython.display import Image
from langgraph.graph import END, StateGraph, START

# 构建图:将所有组件组合在一起构建我们的流程图
graph = StateGraph(OverallState)

graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)

graph.add_edge(START, "generate_topics")
# 根据条件决定是否继续生成笑话
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
# 生成笑话后执行选择最佳笑话
graph.add_edge("generate_joke", "best_joke")
# 选择最佳笑话后流程结束
graph.add_edge("best_joke", END)


app = graph.compile()
Image(app.get_graph().draw_mermaid_png())

image-20250723112736244

1
2
for s in app.stream({"topic": "日本广岛原子弹"}):
print(s)
1
2
3
4
5
{'generate_topics': {'subjects': ['原子弹投放的历史背景与决策过程', '广岛原爆对城市与居民的影响', '战后和平运动与核裁军倡议']}}
{'generate_joke': {'jokes': ['为什么广岛的重建计划从不担心交通堵塞?因为每个人都习惯了‘瞬间消失’!(注:此笑话为虚构创作,旨在以幽默方式引发思考,并非对历史事件的不尊重)']}}
{'generate_joke': {'jokes': ['战后和平运动的人开会讨论核裁军,一个人站起来说:‘我们必须彻底销毁所有核武器!’ 另一个人犹豫地举手:‘那……我们保留一个吧,就一个,藏在冰箱后面,以防万一。’']}}
{'generate_joke': {'jokes': ["杜鲁门宣布要结束战争,顾问问是否要使用原子弹。罗斯福的棺材板突然震动了一下,丘吉尔的雪茄掉在了地上,斯大林则默默拿起了电话:'同志,我们的计划可能需要再推迟一下。'"]}}
{'best_joke': {'best_selected_joke': '为什么广岛的重建计划从不担心交通堵塞?因为每个人都习惯了‘瞬间消失’!(注:此笑话为虚构创作,旨在以幽默方式引发思考,并非对历史事件的不尊重)'}}

Research Assistant 研究助理

见另一篇文章

module-5

Chatbot with Memory 带有记忆功能的聊天机器人

在这里,我们将介绍 LangGraph Memory Store 作为一种保存和检索长期记忆的方法。

LangGraph Memory Store 是 LangGraph 提供的长期记忆存储接口,用于在 不同对话线程之间 持久化保存和检索用户信息。

名称 作用
BaseStore 抽象接口,定义了 put/get/search/delete 等操作方法
InMemoryStore 内存实现,适合原型验证
RedisStore / AsyncRedisStore 生产级实现,支持向量搜索、元数据过滤和命名空间管理

我们将构建一个使用 short-term (within-thread仅当前对话线程)long-term (across-thread跨所有对话线程 ) 内存的聊天机器人。

我们将重点关注长期 semantic memory(语义记忆),它将包含关于用户的事实信息。这些长期记忆将被用于创建一个个性化的聊天机器人,它可以记住有关用户的事实。

它将节省内存 “in the hot path”,当用户与之聊天时。

“in the hot path” 指的是:在对话或任务执行的主流程中(即用户输入后立即、同步地)主动调用工具或写入记忆,使信息实时生效并可用于下一步决策。

Introduction to the LangGraph Store LangGraph 存储简介

LangGraph Memory Store 提供了一种在 LangGraph 中 跨线程 存储和检索信息的方式。这是一个用于持久化 key-value 存储的 开源基类

1
2
3
import uuid
from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()

Store 中存储对象(例如,记忆)时,我们提供:

- 对象的 namespace(类似于目录的元组)

- 对象的 key(类似于文件名)

- 对象的 value(类似于文件内容)

我们使用 put 方法通过 namespacekey 将对象保存到存储中。

image-20250725155616205

1
2
3
4
5
6
7
8
9
10
11
12
# 为要保存的记忆设置命名空间
user_id = "1" # 用户ID
namespace_for_memory = (user_id, "memories") # 记忆命名空间

# 生成一个唯一的键值
key = str(uuid.uuid4()) # 使用UUID创建唯一标识符作为键

# 值需要是一个字典格式
value = {"food_preference": "我喜欢披萨"} # 存储的食物偏好信息

# 保存记忆
in_memory_store.put(namespace_for_memory, key, value) # 将记忆存储到内存中

我们使用 search 通过 namespace 从存储中检索对象。这将返回一个列表。

1
2
3
4
5
6
7
8
9
memories = in_memory_store.search(namespace_for_memory)
type(memories)

# Metatdata
memories[0].dict()

# The key, value
print(memories[0].key, memories[0].value)
9e65de8a-f404-4974-b509-0df0566d8fb5 {'food_preference': '我喜欢披萨'}

我们还可以使用 get 通过 namespacekey 检索对象。

1
2
3
# Get the memory by namespace and key
memory = in_memory_store.get(namespace_for_memory, key)
memory.dict()
Chatbot with long-term memory 具有长期记忆的聊天机器人

我们想要一个聊天机器人,有两种记忆的方式:

  1. Short-term (within-thread) memory: 聊天机器人可以保留会话历史记录和/或允许在聊天会话中进行中断。
  2. Long-term (cross-thread) memory: 聊天机器人可以记住特定用户在所有聊天会话中的信息 跨会话

对于 short-term memory,我们将使用一个 checkpointer

  • 他们在每一步都将图状态写入线程中。
  • 他们在该线程中持久化保存聊天历史记录。
  • 他们允许图在该线程中的任何步骤被中断和/或恢复。

对于 long-term memory,我们将使用上面介绍的 LangGraph Store

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from IPython.display import Image, display

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.base import BaseStore

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables.config import RunnableConfig

# 聊天机器人指令
MODEL_SYSTEM_MESSAGE = """你是一个拥有记忆功能的有用助手,能够提供关于用户的信息。
如果你有这个用户的记忆,请使用它来个性化你的回复。
以下是记忆内容(可能为空):{memory}"""

# 根据聊天历史和任何现有记忆创建新记忆
CREATE_MEMORY_INSTRUCTION = """"你正在收集用户信息以个性化你的回复。

当前用户信息:
{memory}

指令:
1. 仔细查看下面的聊天历史
2. 识别有关用户的新信息,例如:
- 个人详情(姓名、位置)
- 偏好(喜欢、不喜欢)
- 兴趣和爱好
- 过去的经历
- 目标或未来计划
3. 将任何新信息与现有记忆合并
4. 将记忆格式化为清晰的项目符号列表
5. 如果新信息与现有记忆冲突,请保留最新版本

记住:只包括用户直接陈述的事实信息。不要做假设或推断。

基于以下聊天历史,请更新用户信息:"""

def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""从存储中加载记忆并使用它来个性化聊天机器人的回复。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索记忆
namespace = ("memory", user_id)
key = "user_memory"
existing_memory = store.get(namespace, key)

# 如果存在则提取实际的记忆内容并添加前缀
if existing_memory:
# 值是一个带有memory键的字典
existing_memory_content = existing_memory.value.get('memory')
else:
existing_memory_content = "未找到现有记忆。"

# 在系统提示中格式化记忆
system_msg = MODEL_SYSTEM_MESSAGE.format(memory=existing_memory_content)

# 使用记忆以及聊天历史进行回复
response = model.invoke([SystemMessage(content=system_msg)]+state["messages"])

return {"messages": response}

def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""反思聊天历史并将记忆保存到存储中。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索现有记忆
namespace = ("memory", user_id)
existing_memory = store.get(namespace, "user_memory")

# 提取记忆
if existing_memory:
existing_memory_content = existing_memory.value.get('memory')
else:
existing_memory_content = "未找到现有记忆。"

# 在系统提示中格式化记忆
system_msg = CREATE_MEMORY_INSTRUCTION.format(memory=existing_memory_content)
new_memory = model.invoke([SystemMessage(content=system_msg)]+state['messages'])

# 覆盖存储中的现有记忆
key = "user_memory"

# 将值写入为带有memory键的字典
store.put(namespace, key, {"memory": new_memory.content})

# 定义图
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)

# 用于跨线程的长期记忆存储
across_thread_memory = InMemoryStore()

# 用于线程内的短期记忆检查点
within_thread_memory = MemorySaver()

# 使用检查点器和存储编译图
graph = builder.compile(checkpointer=within_thread_memory, store=across_thread_memory)

# 显示图
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))

image-20250725161106319

聊天历史将通过检查点工具保存到短期记忆中。聊天机器人将回顾聊天历史。然后,它将创建并保存一个记忆到 LangGraph Store。此内存可在未来的聊天会话中访问,以个性化聊天机器人的响应。

当我们与聊天机器人交互时,我们提供两样东西:

  1. Short-term (within-thread) memory: 一个用于持久化聊天历史的 thread ID
  2. Long-term (cross-thread) memory: 一个用于将长期记忆命名空间到用户的 user ID
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 我们提供线程ID用于短期(线程内)记忆
# 我们提供用户ID用于长期(跨线程)记忆
config = {"configurable": {"thread_id": "1", "user_id": "1"}}

# 用户输入
input_messages = [HumanMessage(content="你好,我的名字是Lance")]

# 运行图
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()

================================ Human Message =================================

你好,我的名字是Lance
================================== Ai Message ==================================

你好,Lance!很高兴认识你。有什么我可以帮你的吗?😊

我们正在使用 MemorySaver 检查点来管理线程内内存。这会将聊天历史保存到线程中。我们可以查看保存到线程的聊天历史记录。

1
2
3
4
thread = {"configurable": {"thread_id": "1"}}
state = graph.get_state(thread).values
for m in state["messages"]:
m.pretty_print()

回想一下,我们使用存储库编译了该图:across_thread_memory = InMemoryStore()并且,我们向图中添加了一个节点 (write_memory),该节点反映了聊天历史并保存了一段记忆到存储中。

我们可以查看内存是否已保存到存储中。

1
2
3
4
5
6
7
8
9
10
11
# 为要保存的记忆设置命名空间
user_id = "1"
namespace = ("memory", user_id)
existing_memory = across_thread_memory.get(namespace, "user_memory")
existing_memory.dict()

{'namespace': ['memory', '1'],
'key': 'user_memory',
'value': {'memory': '- 姓名:Lance \n- 位置:旧金山 \n- 兴趣和爱好:骑自行车 \n- 喜欢的活动:在旧金山骑行,可能包括金门大桥和滨海区路线'},
'created_at': '2025-07-25T08:12:35.877160+00:00',
'updated_at': '2025-07-25T08:12:35.877161+00:00'}

现在,让我们以 相同的用户ID启动一个新线程。我们应该看到聊天机器人记住了用户的个人资料,并将其用于个性化响应。

1
2
3
4
5
6
7
8
9
# 我们提供用户ID用于跨线程记忆以及一个新的线程ID
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# 用户输入
input_messages = [HumanMessage(content="你好!你推荐我去哪里骑自行车?")]

# 运行图
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()

Chatbot with Profile Schema 带有配置文件模式的聊天机器人

我们的聊天机器人将记忆保存为字符串。在实践中,我们通常希望记忆具有结构化格式。例如,记忆可以是单个、持续更新的模式 。在我们的案例中,我们希望这是一个单一的用户档案。

我们将扩展我们的聊天机器人,将语义记忆保存到单个用户档案 中。我们还将介绍一个库 Trustcall,用于使用新信息更新此模式。

Defining a user profile schema 定义用户配置文件模式

Python 有许多不同类型的 structured data,例如 TypedDict、字典、JSON 和 Pydantic

让我们先使用 TypedDict 来定义一个用户资料模式。

1
2
3
4
5
6
from typing import TypedDict, List

class UserProfile(TypedDict):
"""带有类型字段的用户档案模式"""
user_name: str # 用户的首选名称
interests: List[str] # 用户兴趣列表
Saving a schema to the store 将模式保存到存储中

LangGraph Store 接受任何 Python 字典作为 value

1
2
3
4
5
6
# TypedDict 实例
user_profile: UserProfile = {
"user_name": "Lance", # 用户名
"interests": ["骑行", "科技", "咖啡"] # 兴趣爱好
}
user_profile

我们使用 put 方法将 TypedDict 保存到存储中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import uuid
from langgraph.store.memory import InMemoryStore

# 初始化内存存储
in_memory_store = InMemoryStore()

# 为要保存的记忆创建命名空间
user_id = "1"
namespace_for_memory = (user_id, "memory")

# 将记忆以键值对的形式保存到命名空间中
key = "user_profile"
value = user_profile
in_memory_store.put(namespace_for_memory, key, value)

我们使用 search 按命名空间从存储中检索对象。

1
2
3
4
for m in in_memory_store.search(namespace_for_memory):
print(m.dict())

{'namespace': ['1', 'memory'], 'key': 'user_profile', 'value': {'user_name': 'Lance', 'interests': ['骑行', '科技', '咖啡']}, 'created_at': '2025-07-25T08:58:06.031770+00:00', 'updated_at': '2025-07-25T08:58:06.031775+00:00', 'score': None}

我们还可以使用 get 通过命名空间和键来检索特定对象。

1
2
3
4
profile = in_memory_store.get(namespace_for_memory, "user_profile")
profile.value

{'user_name': 'Lance', 'interests': ['骑行', '科技', '咖啡']}
Chatbot with profile schema 带有配置文件模式的聊天机器人

现在我们知道了如何为记忆指定一个模式,并将其保存到存储中。现在,我们如何根据这个特定的模式 创建 记忆?

在我们的聊天机器人中,我们 想要从一个聊天里创建记忆.这就是 structured outputs格式化输出 概念有用的地方。

LangChain 的 chat model 接口有一个 PROTECTED$11$ 方法用于强制结构化输出。这在我们需要确保输出符合某个模式时非常有用,而且它会为我们解析输出。

1
2
3
4
5
6
# 将模式绑定到模型
model_with_structure = model.with_structured_output(UserProfile)

# 调用模型生成符合模式的结构化输出
structured_output = model_with_structure.invoke([HumanMessage("我的名字是Lance,我喜欢骑自行车。")])
structured_output

Structured outputs(结构化输出)
指让大模型不再“随意说人话”,而是按你事先定义好的格式(JSON / 表格 / 枚举 / 嵌套对象等)精确返回数据。相当于给模型套了一个“模具”,保证输出可直接被代码解析、入库或传给下游系统,避免再用正则、字符串拼接去“猜”结果。

使用官方的大模型组件

1
2
3
4
5
from langchain_community.chat_models import ChatTongyi
model=ChatTongyi(
model="qwen-plus-2025-07-14",
api_key="sk-"
)

现在,让我们在聊天机器人中使用它。这只需要对 write_memory 函数进行轻微修改。我们使用 model_with_structure,如上所述,来生成一个与我们模式匹配的配置文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from IPython.display import Image, display

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.base import BaseStore

from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.runnables.config import RunnableConfig

# 聊天机器人指令
MODEL_SYSTEM_MESSAGE = """你是一个拥有记忆功能的 helpful 助手,可以提供关于用户的信息。
如果你有这个用户的记忆,请使用它来个性化你的回复。
以下是记忆内容(可能为空):{memory}"""

# 根据聊天历史和任何现有记忆创建新记忆
CREATE_MEMORY_INSTRUCTION = """根据用户的聊天历史创建或更新用户档案记忆。
这将被保存为长期记忆。如果存在现有记忆,只需更新它。
以下是现有记忆(可能为空):{memory}"""

def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""从存储中加载记忆并使用它来个性化聊天机器人的回复。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索记忆
namespace = ("memory", user_id)
existing_memory = store.get(namespace, "user_memory")

# 为系统提示格式化记忆
if existing_memory and existing_memory.value:
memory_dict = existing_memory.value
formatted_memory = (
f"姓名: {memory_dict.get('user_name', '未知')}\n"
f"兴趣: {', '.join(memory_dict.get('interests', []))}"
)
else:
formatted_memory = None

# 在系统提示中格式化记忆
system_msg = MODEL_SYSTEM_MESSAGE.format(memory=formatted_memory)

# 使用记忆以及聊天历史来回复
response = model.invoke([SystemMessage(content=system_msg)]+state["messages"])

return {"messages": response}

def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):

"""反思聊天历史并将记忆保存到存储中。"""

# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]

# 从存储中检索现有记忆
namespace = ("memory", user_id)
existing_memory = store.get(namespace, "user_memory")

# 为系统提示格式化记忆
if existing_memory and existing_memory.value:
memory_dict = existing_memory.value
formatted_memory = (
f"姓名: {memory_dict.get('user_name', '未知')}\n"
f"兴趣: {', '.join(memory_dict.get('interests', []))}"
)
else:
formatted_memory = None

# 在指令中格式化现有记忆
system_msg = CREATE_MEMORY_INSTRUCTION.format(memory=formatted_memory)

# 调用模型生成符合模式的结构化输出
new_memory = model_with_structure.invoke([SystemMessage(content=system_msg)]+state['messages'])

# 覆盖现有的用户档案记忆
key = "user_memory"
store.put(namespace, key, new_memory)

# 定义图
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)

# 用于长期(跨线程)记忆的存储
across_thread_memory = InMemoryStore()

# 用于短期(线程内)记忆的检查点
within_thread_memory = MemorySaver()

# 使用检查点和存储编译图
graph = builder.compile(checkpointer=within_thread_memory, store=across_thread_memory)

# 显示
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))

image-20250726103516996

调用

1
2
3
4
5
6
7
8
9
10
# 我们提供线程ID用于短期(线程内)记忆
# 我们提供用户ID用于长期(跨线程)记忆
config = {"configurable": {"thread_id": "1", "user_id": "1"}}

# 用户输入
input_messages = [HumanMessage(content="你好,我的名字是Lance,我喜欢在旧金山骑自行车和在面包店吃饭。")]

# 运行图
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()

查看记忆

1
2
3
4
5
# Namespace for the memory to save
user_id = "1"
namespace = ("memory", user_id)
existing_memory = across_thread_memory.get(namespace, "user_memory")
existing_memory.value
Trustcall for creating and updating profile schemas Trustcall 用于创建和更新配置文件模式

Trustcall 是一个由 LangChain 团队开发的开源 Python 库,旨在解决大型语言模型(LLM)在生成或修改复杂 JSON 数据结构时效率低、易出错的问题

我们使用 create_extractor,传入模型以及我们的模式作为 tool。使用 TrustCall 时,可以以多种方式提供模式。

例如,我们可以传递一个 JSON 对象 / Python 字典或 Pydantic 模型。在底层,TrustCall 使用 tool calling 从输入的 messages 列表中生成 structured output。为了强制 Trustcall 生成 structured output,我们可以在 tool_choice 参数中包含模式名称。

我仅做了解了,感觉用处不多,用结果化输出就能达到效果

Chatbot with Collection Schema 带集合模式的聊天机器人

“collection” 指的是一种将语义记忆组织为多个独立文档(objects)的存储方式,而不是把所有信息都塞进一个巨大的“用户档案”(single profile)里。

假设你在做一个 AI 助手,用户说:

“我下周要去东京出差,住在涩谷区的 Sakura Hotel。”
“我朋友 Ken 也要来,他喜欢吃拉面。”

如果用 collection,你会存成两条记忆:

1
2
3
4
5
6
7
8
9
10
11
12
13
[
{
"type": "trip",
"destination": "Tokyo",
"date": "2025-08-04",
"hotel": "Sakura Hotel, Shibuya"
},
{
"type": "friend",
"name": "Ken",
"food_preference": "ramen"
}
]

每条记忆都是一个独立文档,后续你可以:

  • 搜索“trip”类型的记忆,找到用户的出差安排;
  • 搜索“friend”类型的记忆,找到 Ken 的偏好;
  • 更新 Ken 的信息时,只改一条记录,不会影响其它记忆。

Memory Agent 内存代理

现在,我们将把学到的内容整合起来,构建一个具有长期记忆的 agent

我们的代理 task_mAIstro 将帮助我们管理待办事项列表!

我们之前构建的聊天机器人 始终 会反思对话并保存记忆。task_mAIstro 将决定 何时 保存记忆(待办事项列表中的项目)。

我们之前构建的聊天机器人始终保存一种类型的记忆,即个人资料或集合。task_mAIstro 可以决定将数据保存到用户个人资料或 ToDo 事项集合中。

除此之外,task_mAIstro 还将管理程序性记忆。这允许用户更新创建ToDo项的偏好设置。

Creating an agent 创建一个代理

有许多不同的 agent 架构可供选择。在这里,我们将实现一个简单的内容,一个 ReAct 代理。这个代理将成为创建和管理待办事项列表的得力助手。

此代理可以决定更新三种类型的长期记忆:

(a) 创建或更新具有普通用户信息的用户 profile

(b) 在ToDo列表中添加或更新项目 collection

(c) 更新其自身的 instructions 以了解如何更新待办事项列表中的项目

1
2
3
4
5
6
from typing import TypedDict, Literal

# 更新记忆工具
class UpdateMemory(TypedDict):
""" 决定更新哪种记忆类型 """
update_type: Literal['user', 'todo', 'instructions']
  • 'user':用户相关信息
  • 'todo':待办事项
  • 'instructions':指令信息
Graph definition 图定义

我们添加了一个简单的路由器 route_message,它通过二元决策来节省内存。

module-6

Research Assistant 研究助理

我们的目标是围绕聊天模型构建一个轻量级、多智能体系统,以定制研究过程。

Source Selection

  • 用户可为研究自行选择任意输入源。

Planning

  • 用户提供主题后,系统生成一组 AI 分析师,每位分析师聚焦一个子主题。
  • 在研究开始前,采用 人机协同 方式对子主题进行精调。

LLM Utilization

  • 每位分析师基于所选源,与专家 AI 开展深度访谈。
  • 访谈采用多轮对话形式,以 STORM 论文所示方式提取详尽洞见。
  • 访谈过程将以“子图”形式记录,并保存各自内部状态。

Research Process

  • 专家 AI 并行收集信息,实时回答分析师提问。
  • 所有访谈通过 map-reduce 架构同时展开。

Screenshot 2024-08-26 at 7.26.33 PM.png

Generate Analysts: Human-In-The-Loop 生成分析师

创建分析师并使用人工循环(human-in-the-loop)来审查他们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from typing import List
from typing_extensions import TypedDict
from pydantic import BaseModel, Field

class Analyst(BaseModel):
"""
分析师模型类
用于定义单个分析师的基本信息和属性
"""
affiliation: str = Field(
description="分析师的主要隶属机构。",
)
name: str = Field(
description="分析师的姓名"
)
role: str = Field(
description="分析师在该主题背景下的角色。",
)
description: str = Field(
description="分析师的关注点、担忧和动机的描述。",
)

@property#@property 装饰器的作用是将一个方法转换为只读属性 ,让方法可以像访问属性一样使用,而不需要加括号调用。
def persona(self) -> str:
"""
生成分析师的人设信息
返回格式化的字符串包含分析师的所有关键信息
"""
return f"姓名: {self.name}\n角色: {self.role}\n隶属机构: {self.affiliation}\n描述: {self.description}\n"

class Perspectives(BaseModel):
"""
视角模型类
用于管理多个分析师的集合
"""
analysts: List[Analyst] = Field(
description="包含角色和隶属机构的分析师综合列表。",
)

class GenerateAnalystsState(TypedDict):
"""
生成分析师状态类型定义
用于类型提示,定义在生成分析师过程中需要的状态信息
"""
topic: str # 研究主题 - 用户输入的研究话题
max_analysts: int # 分析师数量 - 需要生成的分析师最大数量
human_analyst_feedback: str # 人类反馈 - 来自用户的反馈信息,用于调整分析师生成
analysts: List[Analyst] # 提出问题的分析师 - 已生成的分析师列表

Field 是 Pydantic 提供的一个函数,主要用于为模型字段添加额外的元数据和配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from IPython.display import Image, display
from langgraph.graph import START, END, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

# 分析师指令模板
analyst_instructions="""您需要创建一组AI分析师角色。请仔细遵循以下指令:

1. 首先,查看研究主题:
{topic}

2. 检查任何可选的编辑反馈,这些反馈用于指导分析师的创建:

{human_analyst_feedback}

3. 根据上述文档和/或反馈确定最有趣的主题。

4. 选择前 {max_analysts} 个主题。

5. 为每个主题分配一个分析师。

6. 重要:请以JSON格式返回您的响应,结构如下:
{{
"analysts": [
{{
"name": "分析师姓名",
"affiliation": "所属机构",
"role": "角色描述",
"description": "详细描述"
}}
]
}}

7. 请确保响应中包含'json'这个词,这是必需的。"""

def create_analysts(state: GenerateAnalystsState):
""" 创建分析师 """

topic=state['topic']
max_analysts=state['max_analysts']
human_analyst_feedback=state.get('human_analyst_feedback', '')#防止为空

# 强制结构化输出
structured_llm = llm.with_structured_output(Perspectives)

# 系统消息
system_message = analyst_instructions.format(topic=topic,human_analyst_feedback=human_analyst_feedback, max_analysts=max_analysts)

# 生成分析师
analysts = structured_llm.invoke([SystemMessage(content=system_message)]+[HumanMessage(content="生成分析师集合。")])

# 将分析师列表写入状态
return {"analysts": analysts.analysts}

def human_feedback(state: GenerateAnalystsState):
""" 无操作节点,应该在此处中断 """
pass

def should_continue(state: GenerateAnalystsState):
""" 返回下一个要执行的节点 """

# 检查是否有用户反馈
human_analyst_feedback=state.get('human_analyst_feedback', None)
if human_analyst_feedback:
return "create_analysts"

# 否则结束
return END

# 添加节点和边
builder = StateGraph(GenerateAnalystsState)
builder.add_node("create_analysts", create_analysts) # 添加创建分析师节点
builder.add_node("human_feedback", human_feedback) # 添加用户反馈节点
builder.add_edge(START, "create_analysts") # 从开始到创建分析师
builder.add_edge("create_analysts", "human_feedback") # 从创建分析师到用户反馈
builder.add_conditional_edges("human_feedback", should_continue, ["create_analysts", END]) # 条件边

# 编译图
memory = MemorySaver()
graph = builder.compile(interrupt_before=['human_feedback'], checkpointer=memory) # 在用户反馈前中断

# 显示图
display(Image(graph.get_graph(xray=1).draw_mermaid_png())) # 显示流程图

image-20250723171742889

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 输入
max_analysts = 3 # 最大分析师数量
topic = "采用LangGraph作为代理框架的好处" # 研究主题
thread = {"configurable": {"thread_id": "1"}} # 线程配置,用于会话跟踪

# 运行图直到第一次中断
for event in graph.stream({"topic":topic,"max_analysts":max_analysts,}, thread, stream_mode="values"):
# 审查结果
analysts = event.get('analysts', '') # 获取分析师列表
if analysts:
for analyst in analysts:
print(f"姓名: {analyst.name}") # 打印分析师姓名
print(f"隶属机构: {analyst.affiliation}") # 打印隶属机构
print(f"角色: {analyst.role}") # 打印角色
print(f"描述: {analyst.description}") # 打印描述
print("-" * 50) # 打印分隔线
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
姓名: 艾琳·史密斯
隶属机构: 人工智能与代理系统研究所
角色: LangGraph架构专家
描述: 专注于研究基于图结构的AI代理框架,尤其是LangGraph在复杂决策流程中的模块化与可扩展性优势。
--------------------------------------------------
姓名: 拉胡尔·梅赫塔
隶属机构: 分布式系统与AI实验室
角色: 代理框架性能分析师
描述: 研究LangGraph在多代理协作中的效率提升,以及其在异步通信、状态管理和错误恢复方面的优势。
--------------------------------------------------
姓名: 艾米丽·陈
隶属机构: 人机交互与智能系统中心
角色: 代理框架用户体验研究员
描述: 分析LangGraph如何支持开发者构建更具交互性和可解释性的AI代理系统,特别是在可视化流程设计和调试方面的优势。
--------------------------------------------------
1
2
3
# 我们现在像 human_feedback 节点一样更新状态
graph.update_state(thread, {"human_analyst_feedback":
"添加一个来自初创公司的人,以增加企业家视角"}, as_node="human_feedback")
1
2
3
4
5
6
7
8
9
10
11
# 继续图的执行
for event in graph.stream(None, thread, stream_mode="values"):
# 审查结果
analysts = event.get('analysts', '') # 获取分析师列表
if analysts:
for analyst in analysts:
print(f"姓名: {analyst.name}") # 打印分析师姓名
print(f"隶属机构: {analyst.affiliation}") # 打印隶属机构
print(f"角色: {analyst.role}") # 打印角色
print(f"描述: {analyst.description}") # 打印描述
print("-" * 50) # 打印分隔线
1
2
3
4
# 如果我们满意,那么就不提供任何反馈
further_feedback = None #如果满意就返回none,如果不满意就进行反馈
graph.update_state(thread, {"human_analyst_feedback":
further_feedback}, as_node="human_feedback")
Conduct Interview 进行面试

生成问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import operator
from typing import Annotated
from langgraph.graph import MessagesState

class InterviewState(MessagesState):
"""
访谈状态类
继承自MessagesState,用于管理访谈过程中的各种状态信息
"""
max_num_turns: int # 对话轮数上限
context: Annotated[list, operator.add] # 源文档,使用operator.add进行合并
analyst: Analyst # 提问的分析师
interview: str # 访谈记录(文字记录)
sections: list # 最终章节,我们在外部状态中重复此字段以用于Send() API

class SearchQuery(BaseModel):
"""
搜索查询模型
用于定义搜索查询的结构
"""
search_query: str = Field(None, description="用于检索的搜索查询。")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 问题生成指令
question_instructions = """你是一名分析师,任务是采访专家以了解特定主题。

你的目标是提炼出与你主题相关的有趣且具体的见解。

1. 有趣的:人们会感到惊讶或不明显的见解。

2. 具体的:避免泛泛而谈的见解,包含来自专家的具体例子。

这是你的关注主题和目标集合:{goals}

首先使用符合你人设的名字介绍自己,然后提出你的问题。

继续提问以深入挖掘和细化你对该主题的理解。

当你对理解满意时,用"非常感谢您的帮助!"来结束采访。

记住在整个回复中保持角色特征,体现提供给你的人设和目标。"""

def generate_question(state: InterviewState):
""" 生成问题的节点 """

# 获取状态
analyst = state["analyst"] # 当前分析师
messages = state["messages"] # 对话历史

# 生成问题
system_message = question_instructions.format(goals=analyst.persona) # 格式化系统消息
question = llm.invoke([SystemMessage(content=system_message)]+messages) # 调用LLM生成问题

# 将消息写入状态
return {"messages": [question]} # 返回新生成的问题

生成问题的并行处理

专家将并行从多个来源收集信息以回答问题。

  • 特定网站:例如通过 WebBaseLoader
  • 已索引文档:例如通过 RAG
  • 网页搜索
  • 维基百科搜索

你可以尝试不同的网络搜索工具,比如 Tavily

1
2
3
4
5
6
7
8
# Web search tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_tavily import TavilySearch
#tavily_search = TavilySearchResults(max_results=3)
tavily_search=TavilySearch(max_result=3)

# Wikipedia search tool
from langchain_community.document_loaders import WikipediaLoader

现在,我们创建节点以搜索网络和维基百科。

我们还将创建一个节点来回答分析师的问题。最后,我们将创建节点以保存完整的采访内容,并撰写采访的摘要(“部分”)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
from langchain_core.messages import get_buffer_string

# 搜索查询编写
search_instructions = SystemMessage(content=f"""你将获得分析师和专家之间的对话。

你的目标是生成一个结构化的 JSON 对象,该对象包含一个用于网络搜索的查询字符串。

请严格按照以下步骤操作:

1. 仔细分析整个对话。
2. 特别关注分析师提出的最后一个问题。
3. 根据该问题,生成一个清晰、简洁、有效的搜索查询字符串。
4. **你的最终输出必须是一个严格的 JSON 对象,格式如下,不要包含任何其他文字或解释:**
{{"search_query": "你的查询字符串放在这里"}}

**示例:**
如果最后一个问题涉及 "LangGraph 与其他代理框架(如 AutoGen)相比的优势",
你的输出必须严格是:
{{"search_query": "LangGraph vs AutoGen agent framework advantages"}}

分析师的最后一个问题才是关键,请基于它生成查询。

**你的输出:**""")

def search_web(state: InterviewState):

""" 从网络搜索中检索文档 """

# 搜索查询
structured_llm = llm.with_structured_output(SearchQuery)
search_query = structured_llm.invoke([search_instructions]+state['messages'])

# 搜索
search_docs = tavily_search.invoke(search_query.search_query)

# 格式化
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document href="{doc["url"]}"/>\n{doc["content"]}\n</Document>'
for doc in search_docs
]
)

return {"context": [formatted_search_docs]}

def search_wikipedia(state: InterviewState):

""" 从维基百科检索文档 """

# # 搜索查询
# structured_llm = llm.with_structured_output(SearchQuery)
# search_query = structured_llm.invoke([search_instructions]+state['messages'])

# # 搜索
# search_docs = WikipediaLoader(query=search_query.search_query,
# load_max_docs=2).load()

# # 格式化
# formatted_search_docs = "\n\n---\n\n".join(
# [
# f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
# for doc in search_docs
# ]
# )

# return {"context": [formatted_search_docs]}
return {"context": [""]}

answer_instructions = """你是一位正在接受分析师采访的专家。

这是分析师的关注领域:{goals}。

你的目标是回答面试官提出的问题。

要回答问题,请使用此上下文:

{context}

回答问题时,请遵循以下准则:

1. 仅使用上下文中提供的信息。

2. 不要引入外部信息或在上下文中明确说明之外进行假设。

3. 上下文包含每个独立文档主题的来源。

4. 在任何相关陈述旁边包含这些来源。例如,对于来源 # 1 使用 [1]。

5. 在答案底部按顺序列出你的来源。 [1] 来源 1,[2] 来源 2,等等

6. 如果来源是:<Document source="assistant/docs/llama3_1.pdf" page="7"/>' 那么只需列出:

[1] assistant/docs/llama3_1.pdf, page 7

并跳过括号的添加以及引用中的 Document source 前言。"""

def generate_answer(state: InterviewState):

""" 回答问题的节点 """

# 获取状态
analyst = state["analyst"]
messages = state["messages"]
context = state["context"]

# 回答问题
system_message = answer_instructions.format(goals=analyst.persona, context=context)
answer = llm.invoke([SystemMessage(content=system_message)]+messages)

# 将消息命名为来自专家
answer.name = "expert"

# 将其附加到状态
return {"messages": [answer]}

def save_interview(state: InterviewState):

""" 保存采访 """

# 获取消息
messages = state["messages"]

# 将采访转换为字符串
interview = get_buffer_string(messages)

# 保存到 interviews 键
return {"interview": interview}

def route_messages(state: InterviewState,
name: str = "expert"):

""" 在问题和答案之间路由 """

# 获取消息
messages = state["messages"]
max_num_turns = state.get('max_num_turns',2)

# 检查专家答案的数量
#isinstance(m, AIMessage) :检查当前消息 m 是否是 AIMessage 类的实例。这通常用于识别由 AI 模型生成的消息。
num_responses = len(
[m for m in messages if isinstance(m, AIMessage) and m.name == name]
)

# 如果专家回答的次数超过最大轮数,则结束
if num_responses >= max_num_turns:
return 'save_interview'

# 获取提出的最后一个问题,以检查它是否表示讨论结束
#messages[-2] :分析师提出的 最后一个问题 。
#messages[-1] :专家对这个问题的 最新回答 。
last_question = messages[-2]

if "非常感谢你的帮助" in last_question.content:
return 'save_interview'
return "ask_question"

section_writer_instructions = """你是一位专业的科技作家。

你的任务是根据一组源文档创建一份简短、易于理解的报告部分。

1. 分析源文档的内容:
- 每个源文档的名称都在文档开头,带有 <Document 标签。

2. 使用 markdown 格式创建报告结构:
- 使用 ## 作为章节标题
- 使用 ### 作为小节标题

3. 按照此结构编写报告:
a. 标题 (## header)
b. 摘要 (### header)
c. 来源 (### header)

4. 根据分析师的关注领域,使你的标题引人入胜:
{focus}

5. 对于摘要部分:
- 设置与分析师关注领域相关的通用背景/上下文的摘要
- 强调从采访中收集到的新颖、有趣或令人惊讶的见解
- 创建一个使用过的源文档的编号列表
- 不要提及面试官或专家的姓名
- 目标是最多约 400 字
- 根据源文档中的信息,在报告中使用编号来源(例如,[1]、[2])

6. 在来源部分:
- 包含报告中使用的所有来源
- 提供相关网站或特定文档路径的完整链接
- 每个来源用换行符分隔。在每行末尾使用两个空格以在 Markdown 中创建换行符。
- 它看起来像:

### 来源
[1] 链接或文档名称
[2] 链接或文档名称

7. 务必合并来源。例如,这不正确:

[3] https://ai.meta.com/blog/meta-llama-3-1/
[4] https://ai.meta.com/blog/meta-llama-3-1/

不应有冗余来源。它应该只是:

[3] https://ai.meta.com/blog/meta-llama-3-1/

8. 最终审查:
- 确保报告遵循所需的结构
- 在报告标题之前不包含任何前言
- 检查所有准则是否已遵循"""

def write_section(state: InterviewState):

""" 编写章节的节点 """

# 获取状态
interview = state["interview"]
context = state["context"]
analyst = state["analyst"]

# 使用从采访(上下文)或采访本身(interview)收集的源文档编写章节
system_message = section_writer_instructions.format(focus=analyst.description)
section = llm.invoke([SystemMessage(content=system_message)]+[HumanMessage(content=f"Use this source to write your section: {context}")])

# 将其附加到状态
return {"sections": [section.content]}

# 添加节点和边
interview_builder = StateGraph(InterviewState)
interview_builder.add_node("ask_question", generate_question)
interview_builder.add_node("search_web", search_web)
interview_builder.add_node("search_wikipedia", search_wikipedia)
interview_builder.add_node("answer_question", generate_answer)
interview_builder.add_node("save_interview", save_interview)
interview_builder.add_node("write_section", write_section)

# 流程
interview_builder.add_edge(START, "ask_question")
interview_builder.add_edge("ask_question", "search_web")
interview_builder.add_edge("ask_question", "search_wikipedia")
interview_builder.add_edge("search_web", "answer_question")
interview_builder.add_edge("search_wikipedia", "answer_question")
interview_builder.add_conditional_edges("answer_question", route_messages,['ask_question','save_interview'])
interview_builder.add_edge("save_interview", "write_section")
interview_builder.add_edge("write_section", END)

# 采访
memory = MemorySaver()
interview_graph = interview_builder.compile(checkpointer=memory).with_config(run_name="Conduct Interviews")

# 视图
display(Image(interview_graph.get_graph().draw_mermaid_png()))

image-20250725101725158

Map-Reduce(Parallelze interviews: Map-Reduce) 并行化访谈

我们通过 Send() API 并行化处理访谈,这是一个映射步骤。我们将它们在 reduce 步骤中组合成报告正文。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import operator
from typing import List, Annotated
from typing_extensions import TypedDict

class ResearchGraphState(TypedDict):
topic: str # 研究主题 (字符串类型)
max_analysts: int # 分析师数量 (整数类型)
human_analyst_feedback: str # 人类分析师的反馈 (字符串类型)
analysts: List[Analyst] # 提问的分析师列表 (Analyst 对象的列表)
sections: Annotated[list, operator.add] # 报告章节列表 (使用 operator.add 作为 Send() API 的键,意味着列表可以通过相加来合并)
introduction: str # 最终报告的引言部分 (字符串类型)
content: str # 最终报告的内容主体部分 (字符串类型)
conclusion: str # 最终报告的结论部分 (字符串类型)
final_report: str # 最终完整的报告 (字符串类型)

# 从 langgraph.constants 导入 Send 类
from langgraph.constants import Send
def initiate_all_interviews(state: ResearchGraphState):
""" 这是“map”(映射)步骤,我们使用 Send API 并行运行每个采访子图 """

# 检查是否存在人类反馈
human_analyst_feedback = state.get('human_analyst_feedback')
if human_analyst_feedback:
# 如果有反馈,则返回到 "create_analysts" 节点进行调整
return "create_analysts"

# 否则,通过 Send() API 并行启动所有采访
else:
topic = state["topic"]
# 为每个分析师创建一个 Send 对象
# 目标是 "conduct_interview" 节点
# 传递的参数包括该分析师对象和一条初始消息
return [Send("conduct_interview", {"analyst": analyst,
"messages": [HumanMessage(
content=f"所以你说你正在写一篇关于 {topic} 的文章?"
)
]}) for analyst in state["analysts"]]
Finalize 最终确定

我们添加最后一个步骤,为最终报告撰写引言和结论。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import operator
from typing import List, Annotated
from typing_extensions import TypedDict

# 定义用于撰写最终报告的指令模板
report_writer_instructions = """你是一位正在撰写关于以下主题报告的技术作家:

{topic}

你有一个分析师团队。每个分析师做了两件事:

1. 他们就一个特定的子主题与专家进行了采访。
2. 他们将他们的发现写成了一份备忘录。

你的任务:

1. 你将得到一份来自你所有分析师的备忘录集合。
2. 仔细思考每份备忘录中的见解。
3. 将这些见解整合成一个清晰的整体摘要,把所有备忘录中的核心思想联系起来。
4. 将每份备忘录中的要点总结成一个连贯的单一叙述。

报告格式要求:

1. 使用 Markdown 格式。
2. 报告开头不要有前言。
3. 不要使用子标题。
4. 报告开头使用一个一级标题:## Insights (## 见解)
5. 在报告中不要提及任何分析师的名字。
6. 保留备忘录中的所有引用,这些引用会用方括号标注,例如 [1] 或 [2]。
7. 创建一个最终的、合并的来源列表,并添加到以 `## Sources` 为标题的部分。
8. 按顺序列出你的来源,不要重复。

[1] 来源 1
[2] 来源 2

以下是你的分析师提供的备忘录,你需要根据它们来撰写报告:

{context}"""

def write_report(state: ResearchGraphState):
""" 撰写报告主体内容的节点函数 """
# 获取所有章节(备忘录)
sections = state["sections"]
topic = state["topic"]

# 将所有章节(备忘录)连接成一个字符串
formatted_str_sections = "\n\n".join([f"{section}" for section in sections])

# 使用指令模板和备忘录内容,调用 LLM 生成最终报告
system_message = report_writer_instructions.format(topic=topic, context=formatted_str_sections)
report = llm.invoke([SystemMessage(content=system_message)] + [HumanMessage(content="根据这些备忘录写一份报告。")])
# 返回报告内容
return {"content": report.content}

# 定义用于撰写引言和结论的指令模板
intro_conclusion_instructions = """你是一位正在完成关于 {topic} 报告的技术作家。

你将得到报告的所有章节。

你的工作是撰写一个清晰且有说服力的引言或结论部分。

用户会指示你是写引言还是结论。

两个部分都不要有前言。

目标大约 100 个词,简洁地预览(对于引言)或回顾(对于结论)报告的所有章节。

使用 Markdown 格式。

对于你的引言,创建一个引人注目的标题,并使用 # 标题级别。
对于你的引言,使用 ## Introduction (## 引言) 作为部分标题。

对于你的结论,使用 ## Conclusion (## 结论) 作为部分标题。

以下是供你参考以撰写相应部分的章节:{formatted_str_sections}"""

def write_introduction(state: ResearchGraphState):
""" 撰写报告引言的节点函数 """
# 获取所有章节
sections = state["sections"]
topic = state["topic"]

# 将所有章节连接成一个字符串
formatted_str_sections = "\n\n".join([f"{section}" for section in sections])

# 使用指令模板和章节内容,调用 LLM 生成引言
instructions = intro_conclusion_instructions.format(topic=topic, formatted_str_sections=formatted_str_sections)
intro = llm.invoke([instructions] + [HumanMessage(content="写报告的引言")])
# 返回引言内容
return {"introduction": intro.content}

def write_conclusion(state: ResearchGraphState):
""" 撰写报告结论的节点函数 """
# 获取所有章节
sections = state["sections"]
topic = state["topic"]

# 将所有章节连接成一个字符串
formatted_str_sections = "\n\n".join([f"{section}" for section in sections])

# 使用指令模板和章节内容,调用 LLM 生成结论
instructions = intro_conclusion_instructions.format(topic=topic, formatted_str_sections=formatted_str_sections)
conclusion = llm.invoke([instructions] + [HumanMessage(content="写报告的结论")])
# 返回结论内容
return {"conclusion": conclusion.content}

def finalize_report(state: ResearchGraphState):
""" 这是“reduce”(归约)步骤,我们收集所有部分,将它们组合起来,并进行反思以写出引言/结论 """
""" 最终整合报告的节点函数 """
# 获取报告主体内容
content = state["content"]
# 如果内容以 "## Insights" 开头,则移除这个标题
if content.startswith("## Insights"):
content = content.strip("## Insights")
# 尝试分离报告主体和来源部分
if "## Sources" in content:
try:
content, sources = content.split("\n## Sources\n")
except:
sources = None # 如果分离失败,则来源部分为空
else:
sources = None

# 将引言、主体内容和结论连接起来形成最终报告
final_report = state["introduction"] + "\n\n---\n\n" + content + "\n\n---\n\n" + state["conclusion"]
# 如果存在来源部分,则将其附加到最终报告末尾
if sources is not None:
final_report += "\n\n## Sources\n" + sources
# 返回最终报告
return {"final_report": final_report}

# 添加节点和边
builder = StateGraph(ResearchGraphState)
builder.add_node("create_analysts", create_analysts)
builder.add_node("human_feedback", human_feedback)
builder.add_node("conduct_interview", interview_builder.compile()) # 将之前定义的采访图编译后作为一个节点
builder.add_node("write_report", write_report)
builder.add_node("write_introduction", write_introduction)
builder.add_node("write_conclusion", write_conclusion)
builder.add_node("finalize_report", finalize_report)

# 定义工作流逻辑
builder.add_edge(START, "create_analysts") # 从开始到创建分析师
builder.add_edge("create_analysts", "human_feedback") # 创建分析师后进入人类反馈环节
# 条件边:根据 human_feedback 节点的输出,决定是回到创建分析师还是开始采访
builder.add_conditional_edges("human_feedback", initiate_all_interviews, ["create_analysts", "conduct_interview"])
# 采访完成后,并行执行撰写报告、引言和结论
builder.add_edge("conduct_interview", "write_report")
builder.add_edge("conduct_interview", "write_introduction")
builder.add_edge("conduct_interview", "write_conclusion")
# 撰写完报告的三个部分后,汇聚到最终整合步骤
builder.add_edge(["write_conclusion", "write_report", "write_introduction"], "finalize_report")
builder.add_edge("finalize_report", END) # 最终整合后结束

# 编译图
memory = MemorySaver()
# 编译图,并设置在 'human_feedback' 节点前中断,以及使用检查点保存器
graph = builder.compile(interrupt_before=['human_feedback'], checkpointer=memory)
# 显示图的可视化表示
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))

image-20250725102148881

测试

让我们提出一个关于 LangGraph 的开放式问题。

1
2
3
4
5
6
7
8
9
10
max_analysts = 3 
topic = "采用 LangGraph 作为代理框架的好处"
thread = {"configurable": {"thread_id": "1"}}

# 使用 stream 但不执行打印逻辑
for event in graph.stream({
"topic": topic,
"max_analysts": max_analysts
}, thread, stream_mode="values"):
pass # 不执行任何操作
1
2
3
#人工更新节点
graph.update_state(thread, {"human_analyst_feedback":
"请加入这家原生生成式 AI 创业公司的首席执行官。"}, as_node="human_feedback")
1
2
3
# 确认我们已经满意,返回none
graph.update_state(thread, {"human_analyst_feedback":
None}, as_node="human_feedback")
1
2
3
4
5
# 继续执行
for event in graph.stream(None, thread, stream_mode="updates"):
print("--Node--")
node_name = next(iter(event.keys()))
print(node_name)

image-20250725103159942

https://smith.langchain.com/public/6504cafd-d314-48d1-8640-57dc3f472e61/r

Elasticsearch 是当今最流行的 开源分布式搜索与分析引擎,用 Java 开发,基于 Apache Lucene 构建。它把全文检索、实时分析、时序数据、地理空间查询向量检索统一到一个平台,被广泛用于日志、指标、安全、企业搜索以及 AI/RAG 场景。

RAG 系统中,Elasticsearch 不仅是向量数据库,更是语义检索引擎和上下文构建器,更准确地说,是向量数据库 + 全文检索引擎的混合角色。

查找语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
GET /test_full_v1/_search
{
"query": {
"multi_match": {
"query": "高压旁路管道磁粉检测使用的是哪种磁化方法和磁悬液浓度?",
"fields": ["text", "report_name","report_url"]
}
}
}

GET /_cat/indices?v#查看所有的所有

GET test_full_v1/_count#查看文件数

GET test_full_v1/_mapping#查看映射

GET test_full_v1#查看索引内容

GET test_full_v1/_search#查看索引内容
{
"size": 3,
"query": { "match_all": {} }
}

GET /_cat/indices/zxj_test#查看索引是否存在

可视化平台kibana的内网路径:

es支持的几种检索函数

向量搜索

这里的向量搜索也就是稠密向量检索,过程为“把文本/图像等非结构化数据映射成高维向量 → 在向量空间里做近似最近邻(ANN)搜索 → 按相似度排序返回结果”

1
2
3
4
5
6
7
8
9
10
11
12
13
def vector_query(search_query: str):
# 使用与索引时相同的嵌入模型将查询文本转换为向量
vector = embeddings.embed_query(search_query)

# 构建Elasticsearch KNN查询结构
return {
"knn": {
"field": dense_vector_field, # 指定存储向量的字段名
"query_vector": vector, # 查询向量
"k": 5, # 返回最相似的5个结果
"num_candidates": 10, # 在10个候选中选择最优的5个(提高搜索效率和准确性)
}
}
bm25

BM25(Best Matching 25)也就是全文关键词搜索,或者叫传统关键词搜索,他的核心思想为“词频越高、文档越短、词越稀有,则相关性越高”,在 TF-IDF 的基础上引入词频饱和、文档长度归一化两项修正。

TF-IDF(Term Frequency–Inverse Document Frequency,词频-逆文档频率)是一种经典的 文本特征权重计算方法,用于衡量 一个词对一篇文档的重要性

项目中的bm25检索采取多字段匹配的方式,还有几个参数需要了解,如下

  1. ‘type’ :决定了如何把多个字段的 BM25 打分合并成最终得分,常用的参数有以下三种

    | type | 中文含义 | 打分逻辑 |
    | :——————————— | :—————- | :—————————————————————————————- |
    | best_fields(默认) | 最佳字段优先 | 取 得分最高的那个字段 做最终分(可用 tie_breaker 让次佳字段再贡献一点点)。 |
    | most_fields | 最多字段优先 | 把所有命中字段的得分 直接相加(类似 OR 逻辑),字段越多分越高。 |
    | cross_fields | 跨字段合并 | 把多个字段视为 一个虚拟大字段,统一计算 TF 和 IDF,解决“关键词分散在不同字段”问题。 |

  2. 'tie_breaker':当多个字段都匹配时,“最佳字段”得分 + 其余字段得分×tie_breaker 作为最终得分。

  3. 'operator':控制单个字段内的多个词项是“AND”还是“OR”关系。
    • "AND"
      要求同一个字段必须同时包含所有查询词,减少噪音,提高精准度。适合地址、姓名等跨字段严格匹配。
    • "OR"
      只要字段里出现任意一个词就匹配,召回量大,但可能引入不相关结果。

最后,每个字段还可以人工设置权重,如"fields": ["title^3", "content", "tags^2"]

1
2
3
4
5
6
7
8
9
10
11
def bm25_query(search_query: str):
# 使用BM25算法进行传统关键词匹配
return {
"query": {
"multi_match": { # 多字段匹配查询
"query": search_query,
"fields": ["text", "report_name", "report_url"] # 在这些字段中搜索
}
},
"size": 8, # 返回最多8个结果
}
混合检索

混合检索也就是 Reciprocal Rank Fusion(RRF),通过结合向量搜索和 BM25 搜索的结果综合判断。

但由于es中混合检索需要付费使用,后续检索效果评估时不做测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def hybrid_query(search_query: str):
# 使用与索引时相同的嵌入模型将查询文本转换为向量
vector = embeddings.embed_query(search_query)

# 构建混合搜索查询结构
return {
"retriever": {
# 使用RRF (Reciprocal Rank Fusion) 算法融合多个检索器的结果
"rrf": {
# 定义多个检索器列表
"retrievers": [
{
# 第一个检索器:多字段BM25关键词搜索
"standard": {
"query": {
# 使用multi_match在多个字段上进行BM25搜索
"multi_match": {
"query": search_query,
"type": "best_fields", # 选择最佳匹配字段
"fields": ["text", "report_name", "report_url"] # 在这些字段中搜索
}
}
}
},
{
# 第二个检索器:KNN向量近似搜索
"knn": {
"field": dense_vector_field, # 向量字段名(使用定义的变量)
"query_vector": vector, # 查询向量
"k": 5, # 返回5个最相似的向量结果
"num_candidates": 10, # 候选向量数量,用于提高搜索准确性
}
},
]
}
}
}
模糊检索

无论是在网页搜索、文件检索,还是数据库查询中,我们时常会因为拼写错误或信息不完整而无法找到需要的结果。模糊搜索(Fuzzy Search)应运而生,它通过识别与查询相似的词语来帮助我们获得更加灵活的搜索结果。

这里是将bm25与模糊搜索结合起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def fuzzy_query(search_query: str):
"""
创建模糊搜索查询函数,支持拼写错误和近似匹配

Args:
search_query (str): 用户输入的搜索查询文本

Returns:
Dict: Elasticsearch模糊搜索查询体
"""

return {
"query": {
# 使用match查询进行文本匹配
"match": {
# 在指定的文本字段中进行搜索
text_field: {
"query": search_query, # 用户的搜索查询文本
"fuzziness": "AUTO", # 自动模糊匹配设置
# fuzziness参数说明:
# - "AUTO":根据词长度自动调整模糊度
# - 0-2个字符:不允许错误
# - 3-5个字符:允许1个编辑距离错误
# - 5个以上字符:允许2个编辑距离错误
# - 也可以设置具体数值:0, 1, 2
}
},
},
}

  • AUTO 规则:0~2 字符不允许错;3~5 字符最多1错;>5 字符最多2错。
  • text 字段先分词,再对每个 token 做模糊 → 召回 Elasticsearch

elasticsearch的连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain_elasticsearch import ElasticsearchRetriever
from embeddings_model import select_embeddings_model
# 根据模型名称选择嵌入模型
embedding_model_name='bge'
embeddings = select_embeddings_model(embedding_model_name)
dense_vector_field='vector'
text_field='text'
search_func=fuzzy_query
# 创建Elasticsearch检索器
retriever = ElasticsearchRetriever.from_es_params(
index_name="zxj_test", # 指定索引名称
body_func=fuzzy_query, # 查询函数
content_field="text", # 内容字段名
url='http://elasticsearch:9200/'# Elasticsearch服务器地址
)
print("连接成功")

elasticsearch的入库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
document = []
id_list = []

# 遍历文件夹中的所有.md文件
for filename in os.listdir(folder_path):
if filename.endswith(".md"):
file_path = os.path.join(folder_path, filename)

# 从文件名中提取chunk_id(去除.md扩展名)
chunk_id = filename.replace(".md", "")

# 读取MD文件内容
with open(file_path, 'r', encoding='utf-8') as file:
text_content = file.read()

# 查找对应的URL(根据文件名匹配)
report_url = ""
# 从folder_name中提取核心文档名(最后一个^后面的部分)
core_folder_name = folder_name.split('^')[-1] # 提取核心文档名

# 根据core_folder_name查找对应的URL
report_url = find_url_by_name_from_list(docs_data, core_folder_name)

# 构建document结构
doc = Document(
page_content=text_content,
metadata={
"report_name": folder_name,
"report_url": report_url,
"chunk_id": chunk_id
}
)
document.append(doc)
id_list.append(folder_name + "_" + chunk_id)

# 批量添加到向量库
es_vector_store.add_documents(documents=document, ids=id_list)
print(f" 文件夹 {folder_name} 已成功入库,共 {len(document)} 个文档块")

相关资料

Elasticsearch检索器 | 🦜️🔗 LangChain 框架

主流的检索策略

BM25 全文关键词检索

BM25(Best Matching 25)是一种久经考验的排序算法,广泛应用于传统搜索引擎中。它基于“词袋模型”,核心思想是通过关键词匹配程度来衡量文档与查询的相关性。

核心原理概览:

词频 (Term Frequency, TF):一个词在文档中出现的次数越多,通常意味着这篇文档与该词相关性越高。但BM25会进行“饱和度”处理,避免某些超高频词过度影响结果。可以想象成,一篇文章提到“苹果”10次,比提到1次更相关,但提到100次和提到50次,在“苹果”这个主题上的相关性增加可能就没那么显著了。
逆文档频率 (Inverse Document Frequency, IDF):如果一个词在整个文档集合中都很罕见(只在少数文档中出现),那么它对于区分文档主题就更重要,IDF值就高。比如“量子纠缠”这个词远比“的”、“是”这类词更能锁定专业文档。
文档长度归一化:用于平衡长短文档的得分,避免长文档仅仅因为内容多而获得不公平的高分。
工作方式举例:当用户搜索“深度学习入门教程”时,BM25会倾向于找出那些更频繁出现“深度学习”、“入门”、“教程”这些词,且这些词相对不那么常见的文档。

1. 公式整体结构

  • IDF 部分:衡量词项 $ q_i $ 的区分能力(逆文档频率)。
  • TF 部分:衡量词项 $ q_i $ 在文档 $ D $ 中的匹配程度(词频归一化)。
  • 求和:对查询中的所有词项 $ q_i $ 的得分求和,得到最终相关性分数。

2. IDF 部分

  • 意义:IDF 值越高,词项 $ q_i $ 越能区分文档(常见于少数文档中的词)。
  • 平滑处理:分子和分母均加 0.5,避免极端值(如 $ n(q_i) = 0 $ 时 ID 无限大)。
  • 参数
    • $ N $:文档总数。
    • $ n(q_i) $:包含 $ q_i $ 的文档数。

3. 词频归一化项(TF)

  • 非线性饱和:分子和分母均包含 $ f(q_i, D) $,使词频增长带来的增益逐渐减小(避免长文档中重复词项的过度影响)。
  • 文档长度归一化
    • $ |D| $:文档 $ D $ 的长度(词数)。
    • $ \text{avgdl} $:整个文档集合的平均文档长度。
    • $ b $:控制文档长度对得分的影响($ b=1 $ 时完全归一化,$ b=0 $ 时忽略长度)。
  • 参数
    • $ k_1 $:控制词频饱和的系数(通常 $ k_1 \in [1.2, 2.0] $,默认 $ k_1 = 1.5 $)。
    • $ b $:默认 $ b = 0.75 $。

这个公式虽然看起来有些复杂,但它精妙地平衡了词频、词的稀有度以及文档长度这几个核心因素,是BM25算法效果出色的关键。

BM25、全文搜索与倒排索引:它们是如何协同工作的?

这三者是构建搜索系统的关键组件:

  • 全文搜索 (Full-Text Search):这是我们希望达成的目标——在大量文本中找到包含特定信息的文档。

  • 倒排索引 (Inverted Index):这是实现高效全文搜索的数据结构基础。它像一本书末尾的详细“关键词索引”,记录了每个词出现在哪些文档中以及相关位置信息。当用户查询时,系统可以通过倒排索引快速定位到包含查询词的候选文档。

  • BM25:在通过倒排索引找到候选文档后,BM25算法登场,为每个文档计算一个相关性得分,然后按分排序,将最相关的结果呈现给用户。

把它们比作在图书馆找特定主题的书籍:

  • 你告诉图书管理员你要找关于“天体物理学”的书(用户查询)。
  • 管理员查阅一个总卡片索引(倒排索引),迅速告诉你哪些书架(文档ID)上有包含“天体物理学”这个词的书。
  • 你走到这些书架,快速翻阅这些书(BM25评分过程),根据目录、摘要和提及“天体物理学”的频繁程度及重要性,判断哪几本最符合你的需求,并把它们按相关性高低排好。

Dense Vector / kNN 向量语义检索

向量语义检索(Dense Vector / k-Nearest Neighbor,简称 kNN)是一种基于高维稠密向量表示的语义检索技术,与传统关键词倒排索引不同,它通过自然语言的上下文含义而非字面匹配来寻找最相关的文档或实体。

  1. Embedding:使用预训练语言模型(如 BERT、Sentence-Transformers)把文本映射为固定维度的稠密向量(通常 128–1024 维)。

  2. kNN 搜索:给定查询向量,在向量空间中找距离最近的 k 个文档向量。距离度量常见:

    • 余弦相似度(cosine)
    • 内积 / dot-product
    • L2 欧氏距离

Hybrid Retrieval 混合检索

既然不同的检索策略各有千秋(例如,BM25擅长关键词精确匹配,Embedding擅长语义理解),那么将它们结合起来,是不是能达到“1+1>2”的效果呢?答案是肯定的,这就是混合检索的核心思想。

常见做法:同时使用BM25(或其他稀疏检索方法)和一种或多种Embedding检索方法,然后将它们各自的检索结果进行融合排序。

融合策略举例:

RRF (Reciprocal Rank Fusion, 倒数排序融合):一种简单但鲁棒的融合方法。它不关心不同检索系统输出的原始得分,只关心每个文档在各自结果列表中的排名。一个文档 doc 的RRF得分计算如下:

其中:

  • Systems 是所有参与融合的检索系统的集合。
  • rank_s(doc) 是文档 doc 在检索系统 s 给出的结果列表中的排名(例如,第一名是1,第二名是2)。
  • k 是一个小常数(例如,常设置为60),用于平滑得分,避免排名靠后的文档得分过小或排名第一的文档得分占比过高。

Reranker 重排序器

经过上述一种或多种检索策略的“粗筛”(召回阶段),我们通常会得到一个包含较多候选文档的列表(比如几百个)。为了进一步提升最终喂给LLM的文档质量,Reranker(重排序器)应运而生。它相当于一位“精炼师”或“质量品鉴官”,对初步召回的结果进行更细致、更精准的二次排序。

  1. 召回
    倒排 / 向量 / 混合先给 Top-N(N≈100~1 k)。
  2. 拼特征
    (query, doc) 拼成一条输入:
    [CLS] 用户问题 [SEP] 标题+正文 [SEP]
  3. 打分
    扔给 Cross-Encoder 或 ColBERT → 输出一个相关性分数
  4. 重排
    按分数从高到低重新排序,只留 Top-K(K≈10~20)。
  5. 返回
    重排后的短列表交给前端或 LLM,完成一次“精读”。

参考

大模型RAG | 深入了解几种主流的检索策略(BM25、Embedding、混合检索、Reranker)-CSDN博客

项目关于elasticsearch代码阅读记录

阅读elasticsearch代码相关记录:

  1. embedding_model.select_embeddings_model:根据指定的模型名称加载
  2. make_es_vector_store
    1. docs_url = pd.read_excel(‘docs_new.xlsx’),从excel加载url并进行数据处理,保存筛选后的ur
    2. 完成文件加载测试:xizhang/retrival/docfile_test/test.py;
    3. 初始化es,使用elastic_search.load_es_index加载存储索引
    4. 完成测试elasticsearch连接与索引构建(索引名zxj_test):/aisys/repo_dev/xizhang/retrival/elasticsearch_test/test_es_connect.py,/aisys/repo_dev/xizhang/retrival/elasticsearch_test/test_add_es.py
    5. 顺序批量处理文件:共16000多份,每十份为一批进行处理,使用download_pdf.py进行文件下载,使用,使用vector_base.rewrite_file对文件进行处理,这里可以修改代码,增加对mineru处理pdf的markdown文件的处理,返回Document对象列表;
  3. elastic_search

重写了检索策略的函数,包括BM25,KNN,混合搜索

  1. elastic_retriever创建Elasticsearch检索器:根据搜索类型选择对应的查询函数,创建Elasticsearch检索器ElasticsearchRetriever.from_es_params

  2. retrievers

    1. 定义函数select_retriever,根据指定的名称选择并返回相应的检索器,目前只有bm25
0%