支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


MCP协议之MCP Client一文入门

发布日期:2025-03-19 13:27:17 浏览次数: 2638 作者:AI真香笔记
推荐语

探索MCP协议客户端的构建与原理,为非Cursor用户带来实操指南。

核心内容:
1. MCP客户端的工作原理解析
2. 如何构建MCP客户端
3. 创建连接到MCP服务端的函数详解

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家

本章将自己动手开发一个MCP客户端,希望大家喜欢。

一、工作原理

核心部分由两部分构成:

  • Session: MCP客户端与MCP服务端的会话,连接后可以获得MCP Server中的工具(tools)、资源(resource)以及提示(prompts)以及远程调用tools的能力。
  • LLM: MCP客户端的大脑,基于用户提出的需求自动判断是否需要调用工具以及调用哪个工具,所以此LLM必须是要能支持Tool Call才可以正常使用。

二、MCP Client构建

1、创建McpClient类

import asyncio
import json
import sys
import time
from typing import Optional
from contextlib import AsyncExitStack
from mcp.client.sse import sse_client
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI

class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.client = AsyncOpenAI(
            # 此处使用阿里的qwen-plus大模型
            api_key="your api key",
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )

2、创建连接到MCP服务端函数

MCP服务端传输协议支持stdio和sse,所以此处创建2个函数

async def connect_to_server(self, server_script_path: str):
    """Connect to an MCP server

    Args:
        server_script_path: Path to the server script (.py or .js)
    "
""
    is_python = server_script_path.endswith(".py")
    is_js = server_script_path.endswith(".js")
    if not (is_python or is_js):
        raise ValueError("Server script must be a .py or .js file")

    command = "python"if is_python else"node"
    server_params = StdioServerParameters(
        command=command, args=[server_script_path], env=None
    )

    stdio_transport = await self.exit_stack.enter_async_context(
        stdio_client(server_params)
    )
    self.stdio, self.write = stdio_transport
    self.session = await self.exit_stack.enter_async_context(
        ClientSession(self.stdio, self.write)
    )

    await self.session.initialize()

    # List available tools
    response = await self.session.list_tools()
    tools = response.tools
    print("\nConnected to server with tools:", [tool.name for tool in tools])

async def connect_to_sse_server(self, server_url: str):
    """Connect to an MCP server

    Args:
        server_script_path: Path to the server script (.py or .js)
    "
""
    self._streams_context = sse_client(url=server_url)

    streams = await self._streams_context.__aenter__()
    self._session_context = ClientSession(*streams)
    self.session = await self._session_context.__aenter__()

    await self.session.initialize()
    # List available tools
    response = await self.session.list_tools()
    tools = response.tools
    print("\nConnected to server with tools:", [tool.name for tool in tools])

3、处理用户需求

async def process_query(self, query: str) -> str:
    """使用 LLM 和 MCP 服务器提供的工具处理查询"""
    messages = [
        {
            "role""user",
            "content": query
        }
    ]

    response = await self.session.list_tools()
    available_tools = [{
        "type""function",
        "function": {
            "name": tool.name,
            "description": tool.description,
            "parameters": tool.inputSchema
        }
    } for tool in response.tools]

    # 初始化 LLM API 调用
    response = await self.client.chat.completions.create(
        model="qwen-plus",
        messages=messages,
        tools=available_tools # 将工具列表传递给 LLM
    )


    final_text = []
    message = response.choices[0].message
    print(response.choices[0])
    final_text.append(message.content or "")

    # 处理响应并处理工具调用
    if message.tool_calls:
        # 处理每个工具调用
        for tool_call in message.tool_calls:
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)

            # 执行工具调用
            start_time = time.time()
            result = await self.session.call_tool(tool_name, tool_args)
            end_time = time.time()
            print(f"Tool {tool_name} took {end_time - start_time} seconds to execute")
            final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

            # 将工具调用和结果添加到消息历史
            messages.append({
                "role""assistant",
                "tool_calls": [
                    {
                        "id": tool_call.id,
                        "type""function",
                        "function": {
                            "name": tool_name,
                            "arguments": json.dumps(tool_args)
                        }
                    }
                ]
            })
            messages.append({
                "role""tool",
                "tool_call_id": tool_call.id,
                "content": str(result.content)
            })

        # 将工具调用的结果交给 LLM
        response = await self.client.chat.completions.create(
            model="qwen-plus",
            messages=messages,
            tools=available_tools
        )

        message = response.choices[0].message
        if message.content:
            final_text.append(message.content)

    return"\n".join(final_text)

4、循环聊天函数

async def chat_loop(self):
    """Run an interactive chat loop"""
    print("\nMCP Client Started!")
    print("Type your queries or 'quit' to exit.")

    while True:
        try:
            query = input("\nQuery: ").strip()

            if query.lower() == 'quit':
                break

            response = await self.process_query(query)
            print("\n" + response)

        except Exception as e:
            print(f"\nError: {str(e)}")

5、会话清理

async def cleanup(self):
    """Clean up resources"""
    await self.exit_stack.aclose()

6、入口函数

async def main():
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    client = MCPClient()
    try:
        # 根据MCP Server传输协议进行选择
        await client.connect_to_sse_server(sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()


if __name__ == "__main__":
    # asyncio.run(main())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

7、程序执行

# MCP Server传输协议为stdio,启动命令为:
uv run client.py "完整server端脚本路径"

# MCP Server传输协议为sse,启动命令为:
uv run client.py http://127.0.0.1:8000/sse
# 上面的sse地址根据自己的进行调整

8、运行截图

9、完整源码

import asyncio
import json
import sys
import time
from typing import Optional
from contextlib import AsyncExitStack
from mcp.client.sse import sse_client
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI


class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.client = AsyncOpenAI(
            # 如果没有配置环境变量,请用百炼API Key替换:api_key="sk-xxx"
            api_key="sk-e6fdadbd5f774ee0be35f6ecc1c52fe7",
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )

    async def connect_to_server(self, server_script_path: str):
        """Connect to an MCP server

        Args:
            server_script_path: Path to the server script (.py or .js)
        "
""
        is_python = server_script_path.endswith(".py")
        is_js = server_script_path.endswith(".js")
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")

        command = "python"if is_python else"node"
        server_params = StdioServerParameters(
            command=command, args=[server_script_path], env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )

        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def connect_to_sse_server(self, server_url: str):
        """Connect to an MCP server

        Args:
            server_script_path: Path to the server script (.py or .js)
        "
""
        self._streams_context = sse_client(url=server_url)

        streams = await self._streams_context.__aenter__()
        self._session_context = ClientSession(*streams)
        self.session = await self._session_context.__aenter__()

        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])
        
    async def process_query(self, query: str) -> str:
        """使用 LLM 和 MCP 服务器提供的工具处理查询"""
        messages = [
            {
                "role""user",
                "content": query
            }
        ]

        response = await self.session.list_tools()
        available_tools = [{
            "type""function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        } for tool in response.tools]

        # 初始化 LLM API 调用
        response = await self.client.chat.completions.create(
            model="qwen-plus",
            messages=messages,
            tools=available_tools # 将工具列表传递给 LLM
        )

    
        final_text = []
        message = response.choices[0].message
        print(response.choices[0])
        final_text.append(message.content or "")

        # 处理响应并处理工具调用
        if message.tool_calls:
            # 处理每个工具调用
            for tool_call in message.tool_calls:
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)
                
                # 执行工具调用
                start_time = time.time()
                result = await self.session.call_tool(tool_name, tool_args)
                end_time = time.time()
                print(f"Tool {tool_name} took {end_time - start_time} seconds to execute")
                final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

                # 将工具调用和结果添加到消息历史
                messages.append({
                    "role""assistant",
                    "tool_calls": [
                        {
                            "id": tool_call.id,
                            "type""function",
                            "function": {
                                "name": tool_name,
                                "arguments": json.dumps(tool_args)
                            }
                        }
                    ]
                })
                messages.append({
                    "role""tool",
                    "tool_call_id": tool_call.id,
                    "content": str(result.content)
                })

            # 将工具调用的结果交给 LLM
            response = await self.client.chat.completions.create(
                model="qwen-plus",
                messages=messages,
                tools=available_tools
            )
            
            message = response.choices[0].message
            if message.content:
                final_text.append(message.content)

        return"\n".join(final_text)
    
    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break

                response = await self.process_query(query)
                print("\n" + response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()


async def main():
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    client = MCPClient()
    try:
        # 根据MCP Server传输协议进行选择
        await client.connect_to_sse_server(sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()


if __name__ == "__main__":
    # asyncio.run(main())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

结语

本文主要讲述了如何通过代码来构建MCP Client,通过对整体构建步骤的分析,相信每个人都能理解MCP Client的工作机制。大家后续可以尝试自己构建一个个性化的MCP Client,配合MCP Server,尽情的发挥你的想象。

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询