微信扫码
添加专属顾问
我要投稿
探索AI编程助手与Agent的新协议MCP,实现智能工具与数据的无缝交互。 核心内容: 1. MCP协议的起源与基础架构 2. MCP客户端与服务端的交互机制 3. 实践案例:MCP Client的实现与源码分享
本文讲述了MCP 原理解析和作者的MCP Client实践,希望能实现一个自己的 agent,让 AI 不仅能与人交流,还能协助工作。文末附源码!
MCP(Model Context Protocol)是由Anthropic于2024年底提出并开源的一种协议,旨在为AI系统(如AI编程助手、Agent等)提供安全、标准化的数据访问方式。它采用客户端-服务器架构,使AI工具(如Claude Desktop、IDE插件等)能够通过MCP客户端与MCP服务端交互,访问本地或远程数据源。
官方文档:MCP Quickstart
https://modelcontextprotocol.io/quickstart/server
基础概念
基础概念讲解总结自官方文档
MCP 是客户端-服务端架构,一个 Host 可以连接多个 MCP Server。
MCP Hosts(宿主程序):如Claude Desktop、IDE等,通过MCP访问数据。
MCP Clients(客户端):与服务器建立1:1连接,处理通信。
MCP Servers(服务端):轻量级程序,提供标准化的数据或工具访问能力。
Local Data Sources(本地数据源):如文件、数据库等,由MCP服务端安全访问。
Remote Services(远程服务):如API、云服务等,MCP服务端可代理访问。
协议层与传输层
负责消息封装(framing)、请求/响应关联、高级通信模式管理。
支持两种通信方式:
1.Stdio传输(标准输入/输出)
适用于本地进程间通信。
2.HTTP + SSE传输
服务端→客户端:Server-Sent Events(SSE)
客户端→服务端:HTTP POST
适用于远程网络通信。
所有传输均采用JSON-RPC 2.0进行消息交换。
消息类型
MCP 拥有多种类型的消息来处理不同的场景
请求(Request)(期望获得响应)
interface Request { method: string; params?: { ... };}
成功响应(Result)
interface Result { [key: string]: unknown;}
错误响应(Error)
interface Error { code: number; message: string; data?: unknown;}
通知(Notification)(单向,无需响应)
interface Notification { method: string; params?: { ... };}
生命周期
类似于三次握手,MCP客户端与MCP服务端初始化建立连接会进行以下步骤:
1.客户端发送initialize
请求(含协议版本、能力集)。
2.服务端返回版本及能力信息。
3.客户端发送initialized
通知确认。
4.进入正常通信阶段。
当初始化完毕,就可以进行通信了,目前支持:
请求-响应模式(Request-Response):双向通信。
通知模式(Notification):单向消息。
有以下几种方式会关闭连接
主动关闭(close()
)。
传输层断开。
错误触发终止。
实践
基础概念介绍完毕,接下来进行实践,我希望能实现一个自己的 agent,让 AI 不仅能和我交流,还能帮我干活。换一句话就是
myAgent is a general AI agent that turns your thoughts into actions. It excels at various tasks in work and life, getting everything done while you rest. [手动doge][手动doge]
先画一个图:
如图,要实现一个这样的效果,实现一个 myAgent,启动时,MCP Client建立与 MCP 服务端的连接,此时 MCP Server 上的能力或者可调用的工具会注册进来, 让 Client 感知到这个MCP服务能够干啥。
当用户与 Agent 进行交互时,Agent 会让 MCP Client 将用户的输入发送给 AI,让 AI 解析用户意图,一并发送的还有注册在 Client 上的能力集合。
我写了一个搜索助手的 MCP Server ,能力集的数据是这样的,可以看到目前只有一个 function,get_offers,可以看到工具里有它的名字,能力描述,需要的字段(包含字段类型,字段描述)。
available_tools is: [{'type': 'function', 'function': {'name': 'get_offers', 'description': 'Get product offers from API', 'parameters': {'type': 'object', 'properties': {'keywords': {'type': 'string', 'description': 'Keywords to search for products', 'default': ''}, 'pageSize': {'type': 'number', 'description': 'Number of items per page', 'minimum': 1, 'maximum': 100, 'default': 10}}}}}]
AI 会理解用户意图,决定是否用自然语言回答用户,或者选择合适的工具,告诉 client,帮我调用它。
当输入 你好
时,传给 AI 的 message 是这样的,这里系统预设了 AI 的一个身份,用于更好的完成特定领域的任务。
[ { "role": "user", "content": "你好" }, { "role": "system", "content": "You're a helpful digital assistant who can answer questions and support users in completing tasks." }]
此时 client 接收到 AI 的消息后,会解析数据,当没有工具要调用时
AI返回的是这样的:
ChatCompletionMessage(content='你好!有什么可以帮助你的吗?', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None)
可以看到,AI 返回是带有角色信息的,然后本次并没有识别到需要调用工具的地方,因此直接返回给用户就好,当然,在工程应用时,可以进行额外的逻辑处理。
让 AI 长出手,AI调用 MCP Server流程揭秘
当输入 帮我找一些手表
时,输入是:
[{'role': 'user', 'content': '帮我找一些手表'}, {'role': 'system', 'content': 'You are a helpful assistant that can answer questions and help with tasks.'}]
AI返回的是
AI response is ChatCompletionMessage(content='', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='0195c8c06aaf3ea050e6d8eed17380ec', function=Function(arguments='{"keywords": "手表", "pageSize": 10}', name='get_offers'), type='function')])
可以看到,AI 识别到了用户的意图,是要寻找一些手表,并自动的选择了一个工具进行调用,根据工具使用说明,决定了选择的工具应该输入什么入参。(这里和模型很相关,是一个重要的节点,识别用户意图并决定要调用工具,有时识别的并不准确,或者返回的结构不是标准可解析的,这时就触发不了工具的调用,还会引入一些辣鸡信息,可能的解决方案是 换效果更好的模型,或者用提示词来约束模型返回,或者系统自己增加鲁棒性,提升成功率)
下面举一个效果不好的例子,大家如果知道有其他解决方法欢迎留言。
AI response is ChatCompletionMessage(content='leton\n{{"name": "get_offers", "arguments": {"keywords": "手表", "pageSize": 10}}}\n</tool_call>', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None)
client 接收到AI 的消息后,发现要调用工具,并且也有了工具所需的参数,就会与通过协议与 MCP Server 进行通信,告诉 MCP Server 该使用 get_offers 能力了,并且期待 MCP Server 将结果返回回来:
result = await self.session.call_tool(tool_name, tool_args)
MCP Server 不负众望,将结果返回了,可以看看返回的格式是什么样的:
meta=None content=[TextContent(type='text', text='some...product...info', annotations=None)] isError=False
MCP Client 拿到数据后,再将数据发送给 AI,输入是这样的:
{ "messages": [ { "role": "user", "content": "帮我找一些手表" }, { "role": "system", "content": "You are a helpful assistant that can answer questions and help with tasks." }, { "role": "assistant", "content": "", "tool_calls": [ { "id": "0195c8c06aaf3ea050e6d8eed17380ec", "type": "function", "function": { "name": "get_offers", "arguments": "{\"keywords\": \"手表\", \"pageSize\": 10}" } } ] }, { "role": "tool", "tool_call_id": "0195c8c06aaf3ea050e6d8eed17380ec", "content": { "type": "text", "text": "some...product...info" } } ]}
最后,AI 将 MCPServer 的结果,进行总结润色,结构化返回:
?AI: [?调用工具 get_offers ?参数是 {'keywords': '手表', 'pageSize': 10}]
根据您的搜索,这里有几款手表供您参考:
1. 款式 ID: ididid2
价格: $0.24
供应商: Guangzhou Huapan Cosmetics Co., Ltd.
评分: **
收评次数: **
供应商年限: **年
推荐指数: ★★
2. 款式 ID: ididid
价格: $3.99
供应商: Shenzhen Top Quality Electronics Technology Co., Ltd.
评分: **
收评次数: **
供应商年限: **年
推荐指数: ★★
这两款手表的评价和销售情况都还不错,您可以根据自己的需求选择合适的款式。如果还有其他问题或需要更多信息,请随时告诉我。
这里给了 它 10 个品 ,但是只总结了两个品,可能适合我之前的输入 帮我找一些手表
有关,看来AI 也会偷懒?。
实际效果
实践后的总结
上面的交互过程,其实可以化简,如果在工程应用上,调用的 MCP Server 是一个预期内的结构化的结果或者触发某个任务时,可以不必进行二次 AI 调用。如上面的例子中,MCP Server 是一个搜索助手,内部发起调用的是搜索的接口,并进行结构化返回。此时在 AI 识别到用户意图并告诉 Client 该调用什么工具时,与 AI 的交互就可以结束了,由系统接管决定应该返回给用户什么,不必再将结果给到 AI 进行润色总结。
给到 AI 进行润色总结的好处是可以根据用户的输入,再结合工具获取的数据,更智能友好的返回给用户信息,这一点可以在工程应用时,进行衡量取舍。
将MCP Server 的结果交给 AI,在需要进行多轮交互场景是有必要的,根据 MCP Server的结果,进行分析及决策,动态调整要使用的工具,可以将一个复杂的任务交给 AI , 它会自己拆解成小任务,然后自动完成。
对于该场景,也进行了一些实践。
例如,让Al agent拆解抽象任务,并自己主动与系统进行多轮交互,完成任务场景。
后续文章我们会详细介绍具体实现方法,讲讲如何让Al在我的电脑上玩起贪吃蛇?
附录
import asyncioimport jsonimport osimport tracebackfrom typing import Optionalfrom contextlib import AsyncExitStackfrom mcp import ClientSession, StdioServerParametersfrom mcp.client.stdio import stdio_clientfrom openai import OpenAIfrom dotenv import load_dotenvload_dotenv() # load environment variables from .envclass MCPClient: def __init__(self): # Initialize session and client objects self.session: Optional[ClientSession] = None self.exit_stack = AsyncExitStack() self.client = OpenAI( api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL") ) self.model = os.getenv("OPENAI_MODEL") self.messages = [ { "role": "system", "content": "You are a versatile assistant capable of answering questions, completing tasks, and intelligently invoking specialized tools to deliver optimal results." } ] self.available_tools = [] @staticmethod def convert_custom_object(obj): """ 将自定义对象转换为字典 """ if hasattr(obj, "__dict__"): # 如果对象有 __dict__ 属性,直接使用 return obj.__dict__ elif isinstance(obj, (list, tuple)): # 如果是列表或元组,递归处理 return [MCPClient.convert_custom_object(item) for item in obj] elif isinstance(obj, dict): # 如果是字典,递归处理值 return {key: MCPClient.convert_custom_object(value) for key, value in obj.items()} else: # 其他类型(如字符串、数字等)直接返回 return obj async def connect_to_server(self, server_script_path: str): """Connect to an MCP server Args: server_script_path: Path to the server script (.py or .js) """ is_python = server_script_path.endswith('.py') is_js = server_script_path.endswith('.js') if not (is_python or is_js): raise ValueError("Server script must be a .py or .js file") command = "python" if is_python else "node" server_params = StdioServerParameters( command=command, args=[server_script_path], env=None ) stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params)) self.stdio, self.write = stdio_transport self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write)) await self.session.initialize() # List available tools response = await self.session.list_tools() tools = response.tools print("\nConnected to server with tools:", [tool.name for tool in tools]) async def process_query(self, query: str) -> str: """Process a query with multi-turn tool calling support""" # Add user query to message history self.messages.append({ "role": "user", "content": query }) # Get available tools if not already set if not self.available_tools: response = await self.session.list_tools() self.available_tools = [{ "type": "function", "function": { "name": tool.name, "description": tool.description, "parameters": tool.inputSchema } } for tool in response.tools] current_response = self.client.chat.completions.create( model=self.model, messages=self.messages, tools=self.available_tools, stream=False ) # Print initial response if exists if current_response.choices[0].message.content: print("\n? AI:", current_response.choices[0].message.content) # 直到下一次交互 AI 没有选择调用工具时退出循环 while current_response.choices[0].message.tool_calls: # AI 一次交互中可能会调用多个工具 for tool_call in current_response.choices[0].message.tool_calls: tool_name = tool_call.function.name try: tool_args = json.loads(tool_call.function.arguments) except json.JSONDecodeError: tool_args = {} print(f"\n? 调用工具 {tool_name}") print(f"? 参数: {tool_args}") # Execute tool call result = await self.session.call_tool(tool_name, tool_args) print(f"\n工具结果: {result}") # Add AI message and tool result to history self.messages.append(current_response.choices[0].message) self.messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(result.content) }) # Get next response current_response = self.client.chat.completions.create( model=self.model, messages=self.messages, tools=self.available_tools, stream=False ) # Add final response to history self.messages.append(current_response.choices[0].message) return current_response.choices[0].message.content or "" async def chat_loop(self): """Run an interactive chat loop""" print("\nMCP Client Started!") print("Type your queries or 'quit' to exit.") while True: try: query = input("\nCommend: ").strip() if query.lower() == 'quit': break response = await self.process_query(query) print("\n?AI: " + response) except Exception as e: print(f"\nError occurs: {e}") traceback.print_exc() async def cleanup(self): """Clean up resources""" await self.exit_stack.aclose()async def main(): if len(sys.argv) < 2: print("Usage: python client.py <path_to_server_script>") sys.exit(1) client = MCPClient() try: await client.connect_to_server(sys.argv[1]) await client.chat_loop() finally: await client.cleanup()if __name__ == "__main__": import sys asyncio.run(main())
/** * MCP客户端实现 * * 提供与MCP服务器的连接、工具调用和聊天交互功能 * * 主要功能: * 1. 连接Python或JavaScript实现的MCP服务器 * 2. 获取服务器提供的工具列表 * 3. 通过OpenAI API处理用户查询 * 4. 自动处理工具调用链 * 5. 提供交互式命令行界面 * * 使用说明: * 1. 确保设置OPENAI_API_KEY环境变量 * 2. 通过命令行参数指定MCP服务器脚本路径 * 3. 启动后输入查询或'quit'退出 * * 依赖: * - @modelcontextprotocol/sdk: MCP协议SDK * - openai: OpenAI API客户端 * - dotenv: 环境变量加载 */import { Client } from "@modelcontextprotocol/sdk/client/index.js";import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";import OpenAI from "openai";import type { ChatCompletionMessageParam } from "openai/resources/chat/completions";import type { Tool } from "@modelcontextprotocol/sdk/types.js";import * as dotenv from "dotenv";import * as readline from 'readline';// 加载环境变量配置dotenv.config();/** * MCP客户端类,封装与MCP服务器的交互逻辑 */class MCPClient { private openai: OpenAI; // OpenAI API客户端实例 private client: Client; // MCP协议客户端实例 private messages: ChatCompletionMessageParam[] = [ { role: "system", content: "You are a versatile assistant capable of answering questions, completing tasks, and intelligently invoking specialized tools to deliver optimal results." }, ]; // 聊天消息历史记录,用于维护对话上下文 private availableTools: any[] = []; // 服务器提供的可用工具列表,格式化为OpenAI工具格式 /** * 构造函数,初始化OpenAI和MCP客户端 * * @throws {Error} 如果OPENAI_API_KEY环境变量未设置 * * 初始化过程: * 1. 检查必要的环境变量 * 2. 创建OpenAI客户端实例 * 3. 创建MCP客户端实例 * 4. 初始化消息历史记录 */ constructor() { if (!process.env.OPENAI_API_KEY) { throw new Error("OPENAI_API_KEY环境变量未设置"); } this.openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY, baseURL: process.env.OPENAI_BASE_URL, }); this.client = new Client( { name: "my-mcp-client", version: "1.0.0", }, ); } /** * 连接到MCP服务器 * * @param {string} serverScriptPath - 服务器脚本路径(.py或.js) * @returns {Promise<void>} 连接成功时解析 * @throws {Error} 如果服务器脚本不是.py或.js文件,或连接失败 * * 连接过程: * 1. 检查脚本文件扩展名 * 2. 根据扩展名决定使用python或node执行 * 3. 通过stdio建立连接 * 4. 获取服务器工具列表并转换为OpenAI工具格式 * * 注意事项: * - 服务器脚本必须具有可执行权限 * - 连接成功后会自动获取工具列表 */ async connectToServer(serverScriptPath: string) { const isPython = serverScriptPath.endsWith('.py'); const isJs = serverScriptPath.endsWith('.js'); if (!isPython && !isJs) { throw new Error("Server script must be a .py or .js file"); } const command = isPython ? "python" : "node"; const transport = new StdioClientTransport({ command, args: [serverScriptPath], }); await this.client.connect(transport); // 获取并转换可用工具列表 const tools = (await this.client.listTools()).tools as unknown as Tool[]; this.availableTools = tools.map(tool => ({ type: "function" as const, function: { name: tool.name as string, description: tool.description as string, parameters: { type: "object", properties: tool.inputSchema.properties as Record<string, unknown>, required: tool.inputSchema.required as string[], }, } })); console.log("\n已连接到服务器,可用工具:", tools.map(tool => tool.name)); } /** * 处理工具调用链 * * @param {OpenAI.Chat.Completions.ChatCompletion} response - 初始OpenAI响应,包含工具调用 * @param {ChatCompletionMessageParam[]} messages - 当前消息历史记录 * @returns {Promise<OpenAI.Chat.Completions.ChatCompletion>} 最终OpenAI响应 * * 处理流程: * 1. 检查响应中是否包含工具调用 * 2. 循环处理所有工具调用 * 3. 解析每个工具调用的参数 * 4. 执行工具调用 * 5. 将工具结果添加到消息历史 * 6. 获取下一个OpenAI响应 * * 错误处理: * - 参数解析失败时使用空对象继续执行 * - 工具调用失败会抛出异常 * * 注意事项: * - 此方法会修改传入的messages数组 * - 可能多次调用OpenAI API */ private async toolCalls(response: OpenAI.Chat.Completions.ChatCompletion, messages: ChatCompletionMessageParam[]) { let currentResponse = response; // 直到下一次交互 AI 没有选择调用工具时退出循环 while (currentResponse.choices[0].message.tool_calls) { if (currentResponse.choices[0].message.content) { console.log("\n? AI: tool_calls", JSON.stringify(currentResponse.choices[0].message)); } // AI 一次交互中可能会调用多个工具 for (const toolCall of currentResponse.choices[0].message.tool_calls) { const toolName = toolCall.function.name; const rawArgs = toolCall.function.arguments; let toolArgs; try { console.log(`rawArgs is ===== ${rawArgs}`) toolArgs = "{}" == JSON.parse(rawArgs) ? {} : JSON.parse(rawArgs); if (typeof toolArgs === "string") { toolArgs = JSON.parse(toolArgs); } } catch (error) { console.error('⚠️ 参数解析失败,使用空对象替代'); toolArgs = {}; } console.log(`\n? 调用工具 ${toolName}`); console.log(`? 参数:`, toolArgs); // 调用工具获取结果 const result = await this.client.callTool({ name: toolName, arguments: toolArgs }); console.log(`\n result is ${JSON.stringify(result)}`); // 添加 AI 的响应和工具调用结果到消息历史 // console.log(`? currentResponse.choices[0].message:`, currentResponse.choices[0].message); messages.push(currentResponse.choices[0].message); messages.push({ role: "tool", tool_call_id: toolCall.id, content: JSON.stringify(result.content), } as ChatCompletionMessageParam); } // console.log(`? messages: `, messages); // 获取下一个响应 currentResponse = await this.openai.chat.completions.create({ model: process.env.OPENAI_MODEL as string, messages: messages, tools: this.availableTools, }); } return currentResponse; } /** * 处理用户查询 * * @param {string} query - 用户输入的查询字符串 * @returns {Promise<string>} AI生成的响应内容 * * 处理流程: * 1. 将用户查询添加到消息历史 * 2. 调用OpenAI API获取初始响应 * 3. 如果有工具调用,处理工具调用链 * 4. 返回最终响应内容 * * 错误处理: * - OpenAI API调用失败会抛出异常 * - 工具调用链中的错误会被捕获并记录 * * 注意事项: * - 此方法会更新内部消息历史 * - 可能触发多个工具调用 */ async processQuery(query: string): Promise<string> { // 添加用户查询到消息历史 this.messages.push({ role: "user", content: query, }); // 初始OpenAI API调用 let response = await this.openai.chat.completions.create({ model: process.env.OPENAI_MODEL as string, messages: this.messages, tools: this.availableTools, }); // 打印初始响应 if (response.choices[0].message.content) { console.log("\n? AI:", response.choices[0].message); } // 处理工具调用链 if (response.choices[0].message.tool_calls) { response = await this.toolCalls(response, this.messages); } // 更新消息历史 this.messages.push(response.choices[0].message); return response.choices[0].message.content || ""; } /** * 启动交互式聊天循环 * * @returns {Promise<void>} 当用户退出时解析 * * 功能: * 1. 持续接收用户输入 * 2. 处理用户查询 * 3. 显示AI响应 * 4. 输入'quit'退出 * * 实现细节: * - 使用readline模块实现交互式输入输出 * - 循环处理直到用户输入退出命令 * - 捕获并显示处理过程中的错误 * * 注意事项: * - 此方法是阻塞调用,会一直运行直到用户退出 * - 确保在调用前已连接服务器 */ async chatLoop() { console.log("\nMCP Client Started!"); console.log("Type your queries or 'quit' to exit."); const rl = readline.createInterface({ input: process.stdin, output: process.stdout, }); while (true) { const query = await new Promise<string>((resolve) => { rl.question("\nQuery: ", resolve); }); if (query.toLowerCase() === 'quit') { break; } try { const response = await this.processQuery(query); console.log("\n" + response); } catch (e) { console.error("\nError:", e instanceof Error ? e.message : String(e)); } } rl.close(); } /** * 清理资源 * * @returns {Promise<void>} 资源清理完成后解析 * * 关闭以下资源: * 1. MCP客户端连接 * 2. 任何打开的句柄 * * 最佳实践: * - 应在程序退出前调用 * - 建议在finally块中调用以确保执行 * * 注意事项: * - 多次调用是安全的 * - 清理后实例不可再用 */ async cleanup() { if (this.client) { await this.client.close(); } }}/** * 主函数 * * 程序入口点,执行流程: * 1. 检查命令行参数 * 2. 创建MCP客户端实例 * 3. 连接到指定服务器脚本 * 4. 启动交互式聊天循环 * 5. 退出时清理资源 * * @throws {Error} 如果缺少命令行参数或连接失败 * * 使用示例: * ```bash * node index.js /path/to/server.js * ``` * * 退出码: * - 0: 正常退出 * - 1: 参数错误或运行时错误 */async function main() { if (process.argv.length < 3) { console.log("Usage: node dist/index.js <path_to_server_script>"); process.exit(1); } const client = new MCPClient(); try { await client.connectToServer(process.argv[2]); await client.chatLoop(); } finally { await client.cleanup(); }}main().catch((error) => { console.error("Error:", error); process.exit(1);});
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ErrorCode,
ListToolsRequestSchema,
McpError,
} from '@modelcontextprotocol/sdk/types.js';
import axios from 'axios';
interface Product {
id: string;
name: string;
supplier: string;
省略...
}
interface Offer {
productId: string;
title: string;
companyName: string;
省略...
}
interface ApiResponse {
ret: string[];
encode: string;
code: number;
traceId: string;
msg: string;
time: number;
data: {
offers: Offer[];
resultCount: string;
totalCount: number;
};
}
class ProductOffersServer {
private server: Server;
private baseUrl = '换成你要调用的 url';
constructor() {
this.server = new Server(
{
name: 'search-assistant-server',
version: '0.1.0',
},
{
capabilities: {
tools: {},
},
}
);
this.setupToolHandlers();
// Error handling
this.server.onerror = (error) => console.error('[MCP Error]', error);
process.on('SIGINT', async () => {
await this.server.close();
process.exit(0);
});
}
private async fetchOffers(keywords?: string, pageSize?: number): Promise<Product[]> {
try {
const params: Record<string, any> = {};
if (keywords) params.keywords = keywords;
if (pageSize) params.pageSize = pageSize;
const response = await axios.get<ApiResponse>(this.baseUrl, { params });
return response.data.data.offers.map(offer => ({
id: offer.productId,
name: offer.title,
supplier: offer.companyName
省略...
}));
} catch (error) {
if (axios.isAxiosError(error)) {
throw new McpError(
ErrorCode.InternalError,
`Failed to fetch offers: ${error.message}`
);
}
throw error;
}
}
private setupToolHandlers() {
this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: 'get_offers',
description: 'Get product offers from API',
inputSchema: {
type: 'object',
properties: {
keywords: {
type: 'string',
description: 'Keywords to search for products',
default: ''
},
pageSize: {
type: 'number',
description: 'Number of items per page',
minimum: 1,
maximum: 100,
default: 10
}
}
}
}
]
}));
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name !== 'get_offers') {
throw new McpError(
ErrorCode.MethodNotFound,
`Unknown tool: ${request.params.name}`
);
}
const args = request.params.arguments as { keywords?: string; pageSize?: number };
try {
const products = await this.fetchOffers(args.keywords, args.pageSize);
return {
content: [
{
type: 'text',
text: JSON.stringify({
products: products,
totalCount: products.length
}, null, 2)
}
]
};
} catch (error) {
if (error instanceof McpError) {
throw error;
}
throw new McpError(
ErrorCode.InternalError,
`Failed to fetch offers: ${error}`
);
}
});
}
async run() {
const transport = new StdioServerTransport();
await this.server.connect(transport);
console.error('Product Offers MCP server running on stdio');
}
}
const server = new ProductOffersServer();
server.run().catch(console.error);
SelectDB 实现日志高效存储与实时分析
企业级日志数据具有数据量巨大、写入和查询速度快、结构多样的特点,本方案基于阿里云云数据库 SelectDB 版构建高性能、低成本、分析能力强大的日志存储与分析解决方案,覆盖运维监控、安全审计、业务分析等场景,并通过智能索引与分级存储实现数据亚秒级检索。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-04-16
A2A协议:打破Agent“孤岛”,开启智能协作新时代
2025-04-16
GitMCP: 让AI助手拥抱开源世界的桥梁
2025-04-16
解放双手!Index:顶尖开源AI浏览器代理,复杂网页任务一键搞定 (Claude 3.7 驱动)
2025-04-16
CortexON:开源通用AI Agent,又一替代Manus产品
2025-04-16
AI涌现人类情感!希腊「乐之神」Orpheus开源,单卡可跑语音流式推理
2025-04-16
Second Me 重磅升级:全平台 Docker 支持,Mac/Windows/Linux 全覆盖!
2025-04-15
Spring AI 1.0.0 M7 发布!很炸裂!!
2025-04-15
OpenManus初体验,整合千问大模型Qwen/QwQ-32B
2025-01-01
2024-07-25
2025-01-21
2024-05-06
2024-09-20
2024-07-20
2024-06-12
2024-07-11
2024-08-13
2024-12-26
2025-04-15
2025-04-13
2025-04-10
2025-04-07
2025-04-03
2025-04-03
2025-04-03
2025-04-01