In this chapter we will build an MCP client by hand; I hope you enjoy it.
1. How It Works

The client's core consists of two parts:
- Session: the MCP client's session with the MCP server. Once connected, it exposes the server's tools, resources, and prompts, and lets the client invoke those tools remotely.
- LLM: the brain of the MCP client. Based on the user's request, it decides whether a tool needs to be called and, if so, which one, so the model must support tool calling (Tool Call) to work correctly; the format involved is sketched below.
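In practice, "supports Tool Call" means the model accepts OpenAI-style function definitions. The sketch below shows what a single MCP tool looks like after conversion; the get_weather tool and its schema are hypothetical, but the surrounding structure is exactly what process_query() builds later from session.list_tools():

# One MCP tool converted to the OpenAI tool-call format
available_tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",  # hypothetical tool name
            "description": "Query the current weather for a city",
            "parameters": {  # JSON Schema taken from tool.inputSchema
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
    }
]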
2. Building the MCP Client
1) Create the MCPClient class
import asyncio
import json
import sys
import time
from typing import Optional
from contextlib import AsyncExitStack

from mcp.client.sse import sse_client
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI


class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.client = AsyncOpenAI(
            # This example uses Alibaba's qwen-plus model
            api_key="your api key",
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )
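Hardcoding the key is fine for a quick demo, but a safer variant reads it from the environment. A minimal sketch, assuming the key is exported under the name DASHSCOPE_API_KEY (the variable name is our choice, not required by the SDK):

import os

self.client = AsyncOpenAI(
    # Assumes: export DASHSCOPE_API_KEY=sk-xxx
    api_key=os.environ["DASHSCOPE_API_KEY"],
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)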
2) Create the server-connection functions
MCP servers support two transports, stdio and SSE, so we create one connection function for each.
    async def connect_to_server(self, server_script_path: str):
        """Connect to an MCP server over stdio

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        is_python = server_script_path.endswith(".py")
        is_js = server_script_path.endswith(".js")
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")
        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command, args=[server_script_path], env=None
        )
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])
    async def connect_to_sse_server(self, server_url: str):
        """Connect to an MCP server over SSE

        Args:
            server_url: URL of the server's SSE endpoint
        """
        # Register both contexts on the exit stack so cleanup() closes them too
        streams = await self.exit_stack.enter_async_context(
            sse_client(url=server_url)
        )
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(*streams)
        )
        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])
3) Process user queries
    async def process_query(self, query: str) -> str:
        """Process a query using the LLM and the tools exposed by the MCP server"""
        messages = [
            {
                "role": "user",
                "content": query
            }
        ]
        response = await self.session.list_tools()
        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        } for tool in response.tools]
        # Initial LLM API call, passing the tool list to the model
        response = await self.client.chat.completions.create(
            model="qwen-plus",
            messages=messages,
            tools=available_tools
        )
        final_text = []
        message = response.choices[0].message
        print(response.choices[0])  # Debug: inspect the raw model response
        final_text.append(message.content or "")
        # Handle any tool calls in the response
        if message.tool_calls:
            # Record the assistant turn (with all its tool calls) in the history
            messages.append({
                "role": "assistant",
                "content": message.content or "",
                "tool_calls": [
                    {
                        "id": tc.id,
                        "type": "function",
                        "function": {
                            "name": tc.function.name,
                            "arguments": tc.function.arguments
                        }
                    }
                    for tc in message.tool_calls
                ]
            })
            # Execute each tool call and append its result
            for tool_call in message.tool_calls:
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)
                start_time = time.time()
                result = await self.session.call_tool(tool_name, tool_args)
                end_time = time.time()
                print(f"Tool {tool_name} took {end_time - start_time} seconds to execute")
                final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": str(result.content)
                })
            # Hand the tool results back to the LLM
            response = await self.client.chat.completions.create(
                model="qwen-plus",
                messages=messages,
                tools=available_tools
            )
            message = response.choices[0].message
            if message.content:
                final_text.append(message.content)
        return "\n".join(final_text)
4) The chat loop
    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break
                response = await self.process_query(query)
                print("\n" + response)
            except Exception as e:
                print(f"\nError: {str(e)}")
5) Session cleanup
    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()
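This works because both connect functions registered their contexts on self.exit_stack: aclose() unwinds them in reverse order of entry, so the ClientSession closes before the underlying transport. A tiny self-contained illustration of that ordering (the resource helper is hypothetical):

from contextlib import AsyncExitStack, asynccontextmanager

@asynccontextmanager
async def resource(name: str):
    print("open", name)
    try:
        yield name
    finally:
        print("close", name)

async def demo():
    stack = AsyncExitStack()
    await stack.enter_async_context(resource("transport"))
    await stack.enter_async_context(resource("session"))
    await stack.aclose()  # prints: close session, then close transport

# asyncio.run(demo())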
6) Entry point
async def main():
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)
    client = MCPClient()
    try:
        # Pick the connect call that matches your MCP server's transport
        await client.connect_to_sse_server(sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()


if __name__ == "__main__":
    # asyncio.run(main())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
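main() above hardcodes the SSE connection. A small dispatch (assuming, as a convention of our own, that SSE endpoints are passed as http(s) URLs and stdio servers as file paths) lets the same entry point serve both transports:

# Drop-in replacement for the connect line in main()
target = sys.argv[1]
if target.startswith("http://") or target.startswith("https://"):
    await client.connect_to_sse_server(target)
else:
    await client.connect_to_server(target)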
7) Running the program
# If the MCP server's transport is stdio, start the client with:
uv run client.py "full path to the server script"
# If the MCP server's transport is SSE, start the client with:
uv run client.py http://127.0.0.1:8000/sse
# Adjust the SSE URL above to match your own server
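If you have no server at hand to test against, a minimal one can be written with the FastMCP helper from the official Python MCP SDK. A sketch under those assumptions (the server name and the add tool are ours):

# test_server.py -- minimal MCP server for exercising the client
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("demo")

@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b

if __name__ == "__main__":
    # "stdio" for `uv run client.py test_server.py`;
    # "sse" serves http://127.0.0.1:8000/sse by default
    mcp.run(transport="stdio")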
8) Screenshots
[Screenshot of the client running; image not included.]
9) Complete source code
import asyncio
import json
import sys
import time
from typing import Optional
from contextlib import AsyncExitStack

from mcp.client.sse import sse_client
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI


class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.client = AsyncOpenAI(
            # If no environment variable is configured, replace this with your Bailian API key: api_key="sk-xxx"
            api_key="sk-xxx",
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )

    async def connect_to_server(self, server_script_path: str):
        """Connect to an MCP server over stdio

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        is_python = server_script_path.endswith(".py")
        is_js = server_script_path.endswith(".js")
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")
        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command, args=[server_script_path], env=None
        )
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def connect_to_sse_server(self, server_url: str):
        """Connect to an MCP server over SSE

        Args:
            server_url: URL of the server's SSE endpoint
        """
        # Register both contexts on the exit stack so cleanup() closes them too
        streams = await self.exit_stack.enter_async_context(
            sse_client(url=server_url)
        )
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(*streams)
        )
        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """Process a query using the LLM and the tools exposed by the MCP server"""
        messages = [
            {
                "role": "user",
                "content": query
            }
        ]
        response = await self.session.list_tools()
        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        } for tool in response.tools]
        # Initial LLM API call, passing the tool list to the model
        response = await self.client.chat.completions.create(
            model="qwen-plus",
            messages=messages,
            tools=available_tools
        )
        final_text = []
        message = response.choices[0].message
        print(response.choices[0])  # Debug: inspect the raw model response
        final_text.append(message.content or "")
        # Handle any tool calls in the response
        if message.tool_calls:
            # Record the assistant turn (with all its tool calls) in the history
            messages.append({
                "role": "assistant",
                "content": message.content or "",
                "tool_calls": [
                    {
                        "id": tc.id,
                        "type": "function",
                        "function": {
                            "name": tc.function.name,
                            "arguments": tc.function.arguments
                        }
                    }
                    for tc in message.tool_calls
                ]
            })
            # Execute each tool call and append its result
            for tool_call in message.tool_calls:
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)
                start_time = time.time()
                result = await self.session.call_tool(tool_name, tool_args)
                end_time = time.time()
                print(f"Tool {tool_name} took {end_time - start_time} seconds to execute")
                final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": str(result.content)
                })
            # Hand the tool results back to the LLM
            response = await self.client.chat.completions.create(
                model="qwen-plus",
                messages=messages,
                tools=available_tools
            )
            message = response.choices[0].message
            if message.content:
                final_text.append(message.content)
        return "\n".join(final_text)

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break
                response = await self.process_query(query)
                print("\n" + response)
            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()


async def main():
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)
    client = MCPClient()
    try:
        # Pick the connect call that matches your MCP server's transport
        await client.connect_to_sse_server(sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()


if __name__ == "__main__":
    # asyncio.run(main())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Conclusion
This article walked through building an MCP client in code. Having followed the construction step by step, you should now understand how an MCP client works. Try building a personalized MCP client of your own, pair it with an MCP server, and let your imagination run free.