MCP 客户端实战

配置环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 创建项目目录
uv init mcp-client
cd mcp-client

# 创建虚拟环境
uv venv

# 激活虚拟环境
# 在 Windows 上:
.venv\Scripts\activate
# 在 Unix 或 MacOS 上:
source .venv/bin/activate

# 安装所需包
uv add mcp anthropic python-dotenv
#使用镜像源安装
uv add mcp anthropic python-dotenv --index-url https://pypi.tuna.tsinghua.edu.cn/simple/

# 删除样板文件
# 在 Windows 上:
del main.py
# 在 Unix 或 MacOS 上:
rm main.py

# 创建我们的主文件
touch client.py

设置 API 密钥

创建一个 .env 文件来存储它:

1
2
# Create .env file
touch .env

将您的密钥添加到 .env 文件:

1
ANTHROPIC_API_KEY=<your key here>

.env 添加到您的 .gitignore

1
echo ".env" >> .gitignore

.env 文件名添加到 .gitignore 文件中,这样 Git 就会忽略 .env 文件,不会将其纳入版本控制。

创建客户端

基本客户端结构

首先,让我们设置我们的导入并创建基本的客户端类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv() # 从 .env 文件加载环境变量

class MCPClient:
def __init__(self):
# 初始化会话和客户端对象
self.session: Optional[ClientSession] = None # MCP客户端会话
self.exit_stack = AsyncExitStack() # 异步上下文管理器堆栈,用于资源清理
self.anthropic = Anthropic() # Anthropic AI 客户端

# 后续方法将在这里定义

服务器连接管理

接下来,我们将实现连接到 MCP 服务器的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
async def connect_to_server(self, server_script_path: str):
"""连接到MCP服务器

Args:
server_script_path: 服务器脚本路径 (.py 或 .js 文件)
"""
# 检查是否为Python文件
is_python = server_script_path.endswith('.py')
# 检查是否为JavaScript文件
is_js = server_script_path.endswith('.js')

# 如果不是Python或JavaScript文件,则抛出错误
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")

# 根据文件类型确定执行命令
command = "python" if is_python else "node"

# 创建服务器参数对象
server_params = StdioServerParameters(
command=command, # 执行命令
args=[server_script_path], # 脚本路径作为参数
env=None # 环境变量(使用默认环境)
)

# 建立stdio客户端连接并将其添加到异步上下文管理器中
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport

# 创建客户端会话并将其添加到异步上下文管理器中
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

# 初始化会话
await self.session.initialize()

# 列出可用的工具
response = await self.session.list_tools()
tools = response.tools

# 打印连接的服务器提供的工具列表
print("\n已连接到服务器,可用工具:", [tool.name for tool in tools])

查询处理逻辑

现在让我们添加处理查询和调用工具的核心功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
async def process_query(self, query: str) -> str:
"""使用Claude和可用工具处理查询"""
# 构建消息列表
messages = [
{
"role": "user", # 用户角色
"content": query # 用户查询内容
}
]

# 获取可用工具列表
response = await self.session.list_tools()
available_tools = [{
"name": tool.name, # 工具名称
"description": tool.description, # 工具描述
"input_schema": tool.inputSchema # 工具输入模式
} for tool in response.tools]

# 初始Claude API调用
response = self.anthropic.messages.create(
model="qwen3-235b-a22b", # 使用的模型
max_tokens=1000, # 最大返回令牌数
messages=messages, # 消息历史
tools=available_tools # 可用工具
)

# 处理响应并处理工具调用
final_text = [] # 存储最终文本结果

assistant_message_content = [] # 存储助手消息内容
for content in response.content: # 遍历响应内容
if content.type == 'text': # 如果是文本内容
final_text.append(content.text) # 添加到最终结果
assistant_message_content.append(content) # 添加到助手消息
elif content.type == 'tool_use': # 如果是工具调用
tool_name = content.name # 工具名称
tool_args = content.input # 工具参数

# 执行工具调用
result = await self.session.call_tool(tool_name, tool_args)
final_text.append(f"[调用工具 {tool_name},参数 {tool_args}]")

assistant_message_content.append(content)
# 添加助手消息到历史
messages.append({
"role": "assistant",
"content": assistant_message_content
})
# 添加工具执行结果到历史
messages.append({
"role": "user",
"content": [
{
"type": "tool_result", # 工具结果类型
"tool_use_id": content.id, # 工具使用ID
"content": result.content # 工具执行结果
}
]
})

# 获取Claude的下一个响应
response = self.anthropic.messages.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools
)

# 添加响应文本到最终结果
final_text.append(response.content[0].text)

# 返回连接后的最终文本结果
return "\n".join(final_text)

交互式聊天界面

现在我们将添加聊天循环和清理功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP客户端已启动!")
print("输入您的问题或输入'quit'退出。")

while True: # 无限循环,持续接收用户输入
try:
query = input("\n问题: ").strip() # 获取用户输入并去除首尾空格

if query.lower() == 'quit': # 如果用户输入'quit'(不区分大小写)
break # 退出循环

# 处理用户查询并获取响应
response = await self.process_query(query)
print("\n" + response) # 打印AI响应结果

except Exception as e: # 捕获所有异常
print(f"\n错误: {str(e)}") # 打印错误信息

async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose() # 异步关闭所有在exit_stack中管理的资源

主入口点

最后,我们将添加主要的执行逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def main():
# 检查命令行参数数量,如果少于2个则显示使用说明
if len(sys.argv) < 2:
print("用法: python client.py <服务器脚本路径>")
sys.exit(1) # 退出程序,返回错误码1

# 创建MCP客户端实例
client = MCPClient()
try:
# 连接到服务器,sys.argv[1]是第一个命令行参数(服务器脚本路径)
await client.connect_to_server(sys.argv[1])
# 启动交互式聊天循环
await client.chat_loop()
finally:
# 确保程序结束时清理资源
await client.cleanup()

# 程序入口点
if __name__ == "__main__":
import sys # 导入sys模块用于处理命令行参数
# 运行异步主函数
asyncio.run(main())
QQ20250801-164738

运行客户端

要使您的客户端与任何 MCP 服务器运行:

1
2
uv run client.py path/to/server.py # python server
uv run client.py path/to/build/index.js # node server

客户端将:

  1. 连接到指定服务器
  2. 列出可用工具
  3. 开始一个交互式聊天会话,您可以在其中:
    • 输入查询
    • 查看工具执行情况
    • 从 Claude 获取响应

运作流程

当你提交查询时:

  1. 客户端从服务器获取可用工具列表
  2. 你的查询连同工具描述一起发送给 Claude
  3. Claude 决定使用哪些工具(如果有的话)
  4. 客户端通过服务器执行任何请求的工具调用
  5. 结果会发送回 Claude
  6. Claude 提供自然语言响应
  7. 响应显示给您

参考资料

Build an MCP Client - Model Context Protocol

适配openai版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import asyncio
import json
import sys
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from openai import OpenAI
from dotenv import load_dotenv
import os

load_dotenv() # 从 .env 文件加载环境变量

class MCPClient:
def __init__(self):
# 初始化会话和客户端对象
self.session: Optional[ClientSession] = None # MCP客户端会话
self.exit_stack = AsyncExitStack() # 异步上下文管理器堆栈,用于资源清理
self.anthropic = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
) # 使用OpenAI兼容模式连接通义千问

async def connect_to_server(self, server_script_path: str):
"""连接到MCP服务器

Args:
server_script_path (str): 服务器脚本路径,支持.py或.js文件

Raises:
ValueError: 当脚本文件不是.py或.js格式时抛出
"""
# 检查是否为Python文件
is_python = server_script_path.endswith('.py')
# 检查是否为JavaScript文件
is_js = server_script_path.endswith('.js')

# 如果不是Python或JavaScript文件,则抛出错误
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")

# 根据文件类型确定执行命令
command = "python" if is_python else "node"

# 创建服务器参数对象
server_params = StdioServerParameters(
command=command, # 执行命令
args=[server_script_path], # 脚本路径作为参数
env=None # 环境变量(使用默认环境)
)

# 建立stdio客户端连接并将其添加到异步上下文管理器中
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport

# 创建客户端会话并将其添加到异步上下文管理器中
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

# 初始化会话
await self.session.initialize()

# 列出可用的工具
response = await self.session.list_tools()
tools = response.tools

# 打印连接的服务器提供的工具列表
print("\n已连接到服务器,可用工具:", [tool.name for tool in tools])

async def process_query(self, query: str) -> str:
"""使用Qwen和可用工具处理查询"""
# 构建消息列表
messages = [
{
"role": "user",
"content": query
}
]

# 获取可用工具列表并转换为OpenAI格式
response = await self.session.list_tools()
available_tools = []
for tool in response.tools:
schema = tool.inputSchema
if isinstance(schema, str):
schema = json.loads(schema)
if isinstance(schema, dict) and "properties" in schema:
schema = {"type": "object", **schema}

available_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": schema
}
})

# 第一次调用模型
response = self.anthropic.chat.completions.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools,
extra_body={"enable_thinking": False}
)

final_text = []
message = response.choices[0].message

# 处理文本内容
if message.content:
final_text.append(message.content)

# 处理工具调用
if message.tool_calls:
for tool_call in message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)

# 执行工具调用
result = await self.session.call_tool(tool_name, tool_args)
final_text.append(f"[调用工具 {tool_name},参数 {tool_args}]")

# 处理工具结果
tool_result_content = ""
if result.content:
for item in result.content:
if hasattr(item, 'type') and item.type == 'text':
tool_result_content += item.text
else:
tool_result_content += str(item)

# 添加助手消息到历史
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
})

# 添加工具执行结果到历史
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_result_content
})

# 再次调用模型
response = self.anthropic.chat.completions.create(
model="qwen3-235b-a22b",
max_tokens=1000,
messages=messages,
tools=available_tools,
extra_body={"enable_thinking": False}
)

# 处理最终响应
if response.choices and response.choices[0].message.content:
final_text.append(response.choices[0].message.content)

return "\n".join(final_text)

async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP客户端已启动!")
print("输入您的问题或输入'quit'退出。")

while True: # 无限循环,持续接收用户输入
try:
query = input("\n问题: ").strip() # 获取用户输入并去除首尾空格

if query.lower() == 'quit': # 如果用户输入'quit'(不区分大小写)
break # 退出循环

# 处理用户查询并获取响应
response = await self.process_query(query)
print("\n" + response) # 打印AI响应结果

except Exception as e: # 捕获所有异常
print(f"\n错误: {str(e)}") # 打印错误信息
import traceback
traceback.print_exc() # 打印详细错误信息

async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose() # 异步关闭所有在exit_stack中管理的资源

async def main():
# 检查命令行参数数量,如果少于2个则显示使用说明
if len(sys.argv) < 2:
print("用法: python client.py <服务器脚本路径>")
sys.exit(1) # 退出程序,返回错误码1

# 创建MCP客户端实例
client = MCPClient()
try:
# 连接到服务器,sys.argv[1]是第一个命令行参数(服务器脚本路径)
await client.connect_to_server(sys.argv[1])
# 启动交互式聊天循环
await client.chat_loop()
finally:
# 确保程序结束时清理资源
await client.cleanup()

# 程序入口点
if __name__ == "__main__":
# 运行异步主函数
asyncio.run(main())

openai和claude在工具调用的差异

  1. 工具格式转换修复

问题:MCP工具格式与OpenAI API不兼容 修复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 原错误格式
available_tools = [{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
}]

# 修复后格式(符合OpenAI规范)
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": schema # 正确的JSON Schema格式
}
}]

  1. API响应处理修复

问题:错误访问了OpenAI响应对象的属性 修复

1
2
3
4
5
6
7
8
9
10
# 原错误代码
for content in response.content: # ❌ response没有content属性

# 修复后代码
message = response.choices[0].message # ✅ 正确的访问路径
if message.content:
final_text.append(message.content)
if message.tool_calls:
for tool_call in message.tool_calls:
# 处理工具调用

  1. 工具调用结果处理修复

问题:错误处理MCP工具调用返回的结果结构 修复

1
2
3
4
5
6
7
8
9
10
11
# 原错误代码
"content": result.content # ❌ 可能包含复杂对象

# 修复后代码
tool_result_content = ""
if result.content:
for item in result.content:
if hasattr(item, 'type') and item.type == 'text':
tool_result_content += item.text
else:
tool_result_content += str(item)
  1. 消息历史构建修复

问题:工具调用后消息历史格式不正确 修复

1
2
3
4
5
6
7
8
9
10
11
# 正确的消息历史格式
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_result_content
})

  1. JSON Schema兼容性处理

问题:MCP返回的schema可能缺少必要的根类型定义 修复

1
2
3
4
5
schema = tool.inputSchema
if isinstance(schema, str):
schema = json.loads(schema)
if isinstance(schema, dict) and "properties" in schema:
schema = {"type": "object", **schema} # 确保有根类型