微信扫码
添加专属顾问
我要投稿
100行代码揭示MCP原理,深入了解AI应用集成方案。 核心内容: 1. MCP核心原理及代码实现 2. MCP通信方式:STDIO与SSE 3. MCP的SSE通信流程及实现
本文通过100行代码看到MCP的核心原理并不复杂,但它的设计巧妙深入理解使我们能够超越简单的SDK使用,创建更强大、更灵活的AI应用集成方案。
当我开始研究 Model Context Protocol (MCP)接入的时候,发现一个问题,绝大多数的文档都是以 @mcp.tool
这样注解的方式注入。但如果当前有很多异步的业务流程,接入会非常麻烦,它并没有一个代码实体的存在可以加注解。难道需要为一个个流程编写同步函数吗?
好奇心驱使我进一步分析MCP的通信原理,看看是不是有什么办法能更方便地接入MCP,理解MCP的原理。
MCP的通信方式
MCP提供了STDIO和SSE两种传输协议,当前很多实验性的工具都是使用STDIO传输。不过如果提供服务的话,基本就是SSE(Server-Sent Events)。所以本文重点分析讨论SSE的MCP接入模式。
在搜索SSE的时候,看到了阮一峰老师在2017年对于SSE的特点归纳:
SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。
总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。
这个特点让我更加好奇了,stdio中可以使用stdin来进行输入,使用stdout来进行输出。但是SSE是单向通道,MCP要如何实现双向通信呢?是建立两根SSE通道吗?带着这个疑问,我开始了动手实践。
MCP的SSE通信流程
利用MCP官方提供的工具npx @modelcontextprotocol/inspector
可以比较方便地拉起一个验证MCP的管理页。针对这个管理页抓包就能发现一些SSE的通信端倪。
1./sse这个URL只负责推送信息,并不能发送信息,发送信息需要另外的URL。
2.Client连接上/sse
这个地址的第一个Event就是告诉Client发送信息需要去哪个URL发,这个URL通常会带上唯一的会话ID。
观察这个抓包情况,我们前面的双向通信疑问基本可以有答案了:
1.只有一根SSE长连接,用来Server向Client推送数据,另外一个Client向Server发送请求的通道是使用普通的HTTP POST请求。
2.Client向Server发送的HTTP POST请求中只使用2xx反馈是否收到指令,所有的数据返回是通过一开始的SSE长连接来推送。
为了验证这个猜想,我还特地做一个实验,使用curl模拟了POST/messsage?sessionId=***
发送一个请求包,果不其然在SSE的事件流中多了一条事件。
MCP的SSE通信实现
通过上一个章节的抓包,我们基本摸清了MCP的SSE通信流程:
1./sse URL建立SSE长链之后先返回一个endpoint
(常见为/message
),数据格式为纯文本的同域名URL字符串。
2.client使用POST向endpoint(/message)
发送调用请求,POST中的body满足JSON-RPC规范,包含字段 jsonrpc
、method
、params
、id
。
3.在 /sse
长连接中返回的event满足JSON-RPC规范,包含字段jsonrpc
、result
、id
、error(执行错误时)
。
看起好像并不复杂,我们尝试用Python来实现一下(不使用MCP Python SDK)。
from fastapi import FastAPI, Request
import uuid
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
import json
app = FastAPI()
mcpHub = {}
class McpRequest(BaseModel):
id: Optional[int] = None
jsonrpc: str
method: str
params: Optional[dict] = None
class MCPServer:
def __init__(self):
self.queue = asyncio.Queue()
async def reader(self):
while True:
event = await self.queue.get()
yield event
async def request(self, payload: McpRequest):
if payload.method == "initialize":
await self.queue.put({"event": "message", "data": ..})
elif payload.method == "tools/list":
...
async def sse():
client_id = str(uuid.uuid4())
mcp = MCPServer()
mcpHub[client_id] = mcp
await mcp.queue.put({"event": "endpoint", "data": f"/message?client_id={client_id}"})
return EventSourceResponse(mcp.reader())
async def message(request: Request, payload: McpRequest):
client_id = request.query_params.get("client_id")
if client_id not in mcpHub:
return "no client"
await mcpHub[client_id].request(payload)
return "ok"
在这段代码中,我们引入了这样几个设计:
1.我们使用了asyncio.Queue()
来解耦业务流和MCP服务流。这个消息队列联动EventSourceResponse
的数据流。每往这个消息队列打一个消息,就会自动通过EventSourceResponse
的数据流向Client推送一条消息。这样Client在Server侧看起来就是一个标准的订阅MQ的消费者。
2.在内存中维护一个client_id
映射消息队列的字典,这样一旦有消息进入就可以知晓使用的是哪个MQ,然后往对应的MQ里面投递消息。在分布式系统中,这个client_id
可以是消息队列的全局唯一标识,这样无论打到哪台机器上,都能够找到正确的队列。
3.服务侧在处理之后,将消息投递回消息队列之后,Client就能感知。MCPServer和MCPClient保持长链之后,后方的业务系统侧理论上可以进行无限时长执行(如果Client侧不主动超时退出),一切均以消息投递回来为准。
我们可以参考文档来看看有哪些method
需要被支持:
MCP的订阅模式扩展思考
在MCP的resource的method中,有个不起眼的resources/subcribe
引起了我的注意。首先。我们来看看什么是resource
,官方给出的定义是:
Resources represent any kind of data that an MCP server wants to make available to clients. This can include:File contents、Database records、API responses、Live system data、Screenshots and images、Log files、And more
所以,如果我们使用resources/subcribe
订阅一个Database
,那么这个数据库的所有变动就会源源不断地推送过来,这就非常近似流计算的常见使用形态了。
因为SSE已经让Server建立向Client的单向数据流,所以如果Client发起一个订阅,我们就创建一个Flink流计算任务向MQ打消息,就非常原生地实现了资源的订阅。我们可以扩展一下上面的这个拓扑结构。
1.从大模型视角看流计算:基于MCP协议,大模型实际上能够非常优雅地接入流计算的能力,来完成复杂业务逻辑构建。
2.从流计算视角看大模型:使用MCP协议之后,大模型似乎就变成了一个标准流计算处理节点,能够接收流式消息,也能给向另外的MQ投递消息。
不得不说,这个确实就是MCP设计上的一个优势。感觉MCP有点像RPC,又有点像MQ,那么这到底是什么呢?我们不妨从编程模型的角度来思考一下。
MCP的编程模型思考
MCP从编程模型的角度来看,本质上是一种有状态的双向RPC(远程过程调用)模型,结合了事件驱动和请求-响应的特性。这种混合模式使其在AI应用与外部系统集成方面具有独特优势。
MCP的核心特征包括:
1.有状态会话:与传统无状态REST API不同,MCP维护会话状态,客户端和服务器之间建立长期连接,会话具有明确的生命周期。
2.双向通信:不仅客户端可以调用服务器(传统RPC模式),服务器也可以调用客户端(反向RPC)。例如,服务器可以请求客户端执行AI采样。
3.基于能力的协商:初始化阶段进行能力协商,动态发现可用功能,适应不同实现和版本。
4.事件通知机制:支持单向通知,资源变更订阅模式,异步事件处理。
5.标准化接口:定义了一组标准操作,使用JSON Schema定义参数和返回值,促进互操作性。
为了更好地理解MCP的定位,我们可以将其与其他常见的编程模型进行比较:
MCP vs REST API
MCP vs 消息队列(MQ)
MCP vs WebSocket
MCP在各种编程模型中占据了一个独特的位置:
比REST API更有状态和双向,但比消息队列更直接和轻量。
比WebSocket更结构化和标准化,但比gRPC更灵活和易于理解。
比GraphQL更专注于工具调用,但比RPC更关注资源和上下文。
同时,正因为MCP这样的一个独特的功能位,不要因为当前的一些能力局限性,就放弃了MCP的原生化的适配。异步任务、事件驱动等架构本身就应该能够原生对接MCP。
MCP服务的简单实现
既然MCP的整个运行原理并不复杂,我们就尝试自己实现一次,致敬一下这个优秀的设计。
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import json
import uuid
from pydantic import BaseModel
from typing import Optional
import uvicorn
import inspect
app = FastAPI()
mcpHub = {}
class McpRequest(BaseModel):
id: Optional[int] = None
jsonrpc: str
method: str
params: Optional[dict] = None
class MCPServer:
def __init__(self, name, message_path, tools):
self.queue = asyncio.Queue()
self.client_id = str(uuid.uuid4())
self.message_path = message_path
self.info = {
"protocolVersion": "2024-11-05",
"capabilities": {
"experimental": {},
"tools": {
"listChanged": False
}
},
"serverInfo": {
"name": name,
"version": "1.6.0"
}
}
self.tools = tools
def list_tool(self):
result = []
for tool in self.tools:
toolInfo = {
"name": tool.__name__,
"description": tool.__doc__,
"inputSchema": {"type": "object","properties":{}},
}
for name, param in inspect.signature(tool).parameters.items():
toolInfo["inputSchema"]["properties"][name] = {
"title": name,
"type": "string",
}
result.append(toolInfo)
return result
async def reader(self):
while True:
event = await self.queue.get()
yield event
@staticmethod
def response(result, id):
message = {
"jsonrpc": "2.0",
"result": result,
}
if id is not None:
message["id"] = id
return json.dumps(message)
async def request(self, req: McpRequest):
if req.method == "initialize":
await self.queue.put({"event": "message", "data": self.response(self.info, req.id)})
elif req.method == "tools/list":
await self.queue.put({"event": "message", "data": self.response({"tools": self.list_tool()}, req.id)})
elif req.method == "tools/call":
for tool in self.tools:
if tool.__name__ == req.params.get("name"):
result = await tool(**req.params["arguments"])
await self.queue.put({"event": "message", "data": self.response({"content": result, "isError": False}, req.id)})
break
async def test(state=None):
"""
description
"""
result = f"hi {state}"
await asyncio.sleep(1)
result += "!"
return result
async def receive_test():
mcp = MCPServer(name="mcp-test",message_path="/send_test", tools=[test])
mcpHub[mcp.client_id] = mcp
await mcp.queue.put({"event": "endpoint", "data": f"{mcp.message_path}?client_id={mcp.client_id}"})
return EventSourceResponse(mcp.reader())
async def send_test(request: Request, payload: McpRequest):
client_id = request.query_params.get("client_id")
if client_id not in mcpHub:
return "no client"
await mcpHub[client_id].request(payload)
return "ok"
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8001)
如上大概100行左右的代码,我们实现了一个简易版本的MCP服务,较官方的MCP Python SDK,我们获得了几个重要的特性优化:
1.tool注册不再依赖@mcp.tool
这样的注解,完全可以动态传入,针对不同的场景,提供不同MCP URL,上面提供不同的Tool。
2.编程模型为MQ驱动的服务,对接异步系统、事件驱动的系统或平台较为友好。参考该Python实现,转化成其他语言的版本也较为方便。
3.不依赖 /sse /message 这些默认路由地址,也能正常运行,证明MCP的URL可以完全自定义。
总结:理解MCP的本质
本文深入探讨MCP的原理、通信机制和编程模型本质之后,我们看到MCP不仅仅是一个简单的API或SDK,而是一个精心设计的协议,它:
1.采用client-host-server架构,支持多种服务器连接;
2.实现了有状态的双向RPC模型,结合了事件驱动特性;
3.提供了标准化的工具调用和资源访问机制;
4.支持动态能力协商和功能发现;
5.相比较MQ、API、WS,占据了独特的功能位置,专为AI应用与外部系统集成而优化;
正如我们通过100行代码看到的,MCP的核心原理并不复杂,但它的设计巧妙,这种深入理解将使我们能够超越简单的SDK使用,创建更强大、更灵活的AI应用集成方案。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-04-19
Exa:给 AI Agent 的 “Bing API”
2025-04-19
Chrome过时了,AI Agent需要自己的浏览器。
2025-04-19
n8n + mcp王炸组合:5个节点轻松搭建一个AI工作流
2025-04-18
火山引擎OS Agent解决方案、豆包1.5·UI-TARS模型发布
2025-04-18
前沿导读 | 基于大模型智能体的出行行为模拟
2025-04-18
大概念模型(Large Concept Models)会取代提示工程吗?
2025-04-18
AI 重要概念科普与热门技术解析
2025-04-18
我对于AI领域商业模式的思考
2024-08-13
2024-06-13
2024-08-21
2024-09-23
2024-07-31
2024-05-28
2024-08-04
2024-04-26
2024-07-09
2024-09-17
2025-04-18
2025-04-16
2025-04-13
2025-04-13
2025-04-13
2025-04-12
2025-04-12
2025-04-11