This article explores how MCP clients are built and how they work, offering a hands-on guide for readers who are not using Cursor. Key topics: 1. How an MCP client works 2. How to build an MCP client 3. A detailed look at the functions that connect to an MCP server
In this chapter we will build an MCP client by hand; I hope you find it useful. The core of the client consists of two parts: connecting to an MCP server and processing queries with an LLM. The full code follows:
import asyncio
import json
import sys
import time
from typing import Optional
from contextlib import AsyncExitStack
from mcp.client.sse import sse_client
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI
class MCPClient:
def __init__(self):
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.client = AsyncOpenAI(
            # This example uses Alibaba's qwen-plus model via DashScope's OpenAI-compatible endpoint
api_key="your api key",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
MCP servers support two transport protocols, stdio and SSE, so we create two connection functions:
async def connect_to_server(self, server_script_path: str):
"""Connect to an MCP server
Args:
server_script_path: Path to the server script (.py or .js)
"""
is_python = server_script_path.endswith(".py")
is_js = server_script_path.endswith(".js")
if not (is_python or is_js):
raise ValueError("Server script must be a .py or .js file")
command = "python"if is_python else"node"
server_params = StdioServerParameters(
command=command, args=[server_script_path], env=None
)
stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.stdio, self.write)
)
await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
    async def connect_to_sse_server(self, server_url: str):
        """Connect to an MCP server over SSE

        Args:
            server_url: URL of the server's SSE endpoint (e.g. http://host:port/sse)
        """
self._streams_context = sse_client(url=server_url)
streams = await self._streams_context.__aenter__()
self._session_context = ClientSession(*streams)
self.session = await self._session_context.__aenter__()
await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
async def process_query(self, query: str) -> str:
"""使用 LLM 和 MCP 服务器提供的工具处理查询"""
messages = [
{
"role": "user",
"content": query
}
]
response = await self.session.list_tools()
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
} for tool in response.tools]
        # Initial LLM API call
response = await self.client.chat.completions.create(
model="qwen-plus",
messages=messages,
            tools=available_tools  # pass the tool list to the LLM
)
final_text = []
message = response.choices[0].message
print(response.choices[0])
final_text.append(message.content or "")
        # Process the response and handle any tool calls
        if message.tool_calls:
            # Handle each tool call
for tool_call in message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
                # Execute the tool call
start_time = time.time()
result = await self.session.call_tool(tool_name, tool_args)
end_time = time.time()
print(f"Tool {tool_name} took {end_time - start_time} seconds to execute")
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
                # Append the tool call and its result to the message history
messages.append({
"role": "assistant",
"tool_calls": [
{
"id": tool_call.id,
"type": "function",
"function": {
"name": tool_name,
"arguments": json.dumps(tool_args)
}
}
]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result.content)
})
                # Feed the tool result back to the LLM for a final answer
response = await self.client.chat.completions.create(
model="qwen-plus",
messages=messages,
tools=available_tools
)
message = response.choices[0].message
if message.content:
final_text.append(message.content)
return"\n".join(final_text)
async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print("\n" + response)
except Exception as e:
print(f"\nError: {str(e)}")
async def cleanup(self):
"""Clean up resources"""
await self.exit_stack.aclose()
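One caveat: connect_to_sse_server enters its context managers by hand instead of registering them on the exit stack, so cleanup as written only releases the stdio resources. A more thorough sketch, assuming the _streams_context and _session_context attributes set above:

    async def cleanup(self):
        """Clean up resources, including manually entered SSE contexts"""
        # Exit the SSE session and stream contexts if they were opened
        if getattr(self, "_session_context", None):
            await self._session_context.__aexit__(None, None, None)
        if getattr(self, "_streams_context", None):
            await self._streams_context.__aexit__(None, None, None)
        await self.exit_stack.aclose()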
async def main():
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script | sse_url>")
        sys.exit(1)
    client = MCPClient()
    try:
        # Pick the connection method from the MCP server's transport:
        # an http(s) URL means SSE, a script path means stdio
        if sys.argv[1].startswith("http"):
            await client.connect_to_sse_server(sys.argv[1])
        else:
            await client.connect_to_server(sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()
if __name__ == "__main__":
# asyncio.run(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# If the MCP server uses the stdio transport, start the client with:
uv run client.py "<full path to the server script>"
# If the MCP server uses the SSE transport, start the client with:
uv run client.py http://127.0.0.1:8000/sse
# Adjust the SSE URL above to match your own server
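If you do not have an SSE server handy, the demo server sketched earlier can be switched to SSE with a one-line change; assuming FastMCP's default settings (port 8000, SSE path /sse), it should then be reachable at http://127.0.0.1:8000/sse:

if __name__ == "__main__":
    # Assumes FastMCP's defaults: port 8000, SSE endpoint at /sse
    mcp.run(transport="sse")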
This article has walked through how to build an MCP client in code. By following the construction step by step, you should now understand how an MCP client works. From here, try building a personalized MCP client of your own, pair it with an MCP server, and let your imagination run free.