支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


AI Agent的利器——MCP的java server实现原理

发布日期:2025-03-21 05:49:41 浏览次数: 2391 作者:海燕技术栈
推荐语

深入解析MCP技术在Java服务器中的应用

核心内容:
1. McpServer整体设计及其两种模式
2. 异步与同步规范的构建器模式实现
3. McpServer的主要功能:服务器信息配置

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家

让我分析一下 McpServer 的实现

1. 整体设计

McpServer 是一个工厂接口,用于创建 MCP 服务器实例,支持两种模式:

public interface McpServer {
    // 1. 同步服务器构建器
    static SyncSpec sync(ServerMcpTransport transport)
    
    // 2. 异步服务器构建器
    static AsyncSpec async(ServerMcpTransport transport)
}

2. 构建器模式实现

AsyncSpec (异步规范)

class AsyncSpec {
    private final ServerMcpTransport transport;
    private McpSchema.Implementation serverInfo;
    private McpSchema.ServerCapabilities serverCapabilities;
    
    // 功能注册容器
    private final List<AsyncToolRegistration> tools;
    private final Map<String, AsyncResourceRegistration> resources;
    private final List<ResourceTemplate> resourceTemplates;
    private final Map<String, AsyncPromptRegistration> prompts;
}

SyncSpec (同步规范)

class SyncSpec {
    private final ServerMcpTransport transport;
    private McpSchema.Implementation serverInfo;
    private McpSchema.ServerCapabilities serverCapabilities;
    
    // 功能注册容器
    private final List<SyncToolRegistration> tools;
    private final Map<String, SyncResourceRegistration> resources;
    private final List<ResourceTemplate> resourceTemplates;
    private final Map<String, SyncPromptRegistration> prompts;
}

3. 主要功能

1. 服务器信息配置

// 异步配置
AsyncSpec serverInfo(String name, String version) {
    this.serverInfo = new McpSchema.Implementation(name, version);
    return this;
}

// 同步配置
SyncSpec serverInfo(String name, String version) {
    this.serverInfo = new McpSchema.Implementation(name, version);
    return this;
}

2. 工具注册

// 异步工具
AsyncSpec tool(McpSchema.Tool tool, 
    Function<Map<String, Object>, Mono<CallToolResult>> handler)
 
{
    this.tools.add(new AsyncToolRegistration(tool, handler));
    return this;
}

// 同步工具
SyncSpec tool(McpSchema.Tool tool, 
    Function<Map<String, Object>, CallToolResult> handler)
 
{
    this.tools.add(new SyncToolRegistration(tool, handler));
    return this;
}

3. 资源管理

// 异步资源
AsyncSpec resources(Map<String, AsyncResourceRegistration> resources) {
    this.resources.putAll(resources);
    return this;
}

// 同步资源
SyncSpec resources(Map<String, SyncResourceRegistration> resources) {
    this.resources.putAll(resources);
    return this;
}

4. 提示模板

// 异步提示
AsyncSpec prompts(AsyncPromptRegistration... prompts) {
    for (AsyncPromptRegistration prompt : prompts) {
        this.prompts.put(prompt.prompt().name(), prompt);
    }
    return this;
}

// 同步提示
SyncSpec prompts(SyncPromptRegistration... prompts) {
    for (SyncPromptRegistration prompt : prompts) {
        this.prompts.put(prompt.prompt().name(), prompt);
    }
    return this;
}

4. 使用示例

异步服务器

McpServer.async(transport)
    .serverInfo("async-server""1.0.0")
    .capabilities(new ServerCapabilities(...))
    // 注册工具
    .tool(new Tool("calculator""计算器", schema),
          args -> Mono.just(new CallToolResult(calculate(args))))
    // 注册资源
    .resources(Map.of(
        "file"new AsyncResourceRegistration(fileResource, 
            req -> Mono.just(new ReadResourceResult(readFile(req))))
    ))
    // 注册提示
    .prompts(new AsyncPromptRegistration(prompt, 
        req -> Mono.just(new GetPromptResult(getPrompt(req)))))
    .build();

同步服务器

McpServer.sync(transport)
    .serverInfo("sync-server""1.0.0")
    .capabilities(new ServerCapabilities(...))
    // 注册工具
    .tool(new Tool("calculator""计算器", schema),
          args -> new CallToolResult(calculate(args)))
    // 注册资源
    .resources(Map.of(
        "file"new SyncResourceRegistration(fileResource, 
            req -> new ReadResourceResult(readFile(req)))
    ))
    // 注册提示
    .prompts(new SyncPromptRegistration(prompt, 
        req -> new GetPromptResult(getPrompt(req))))
    .build();

5. 主要特点

  1. 双模式支持:
  • 同步模式:适合简单场景
  • 异步模式:适合高并发场景
  • 流式构建器:
    • 链式调用
    • 参数验证
    • 类型安全
  • 功能模块化:
    • 工具注册
    • 资源管理
    • 提示模板
    • 根目录变更通知
  • 扩展性设计:
    • 支持自定义传输层
    • 可扩展的功能注册
    • 灵活的配置选项
  • 错误处理:
    • 参数验证
    • 异常传播
    • 资源管理

    这个设计提供了一个灵活且强大的 MCP 服务器构建框架,可以根据不同需求选择同步或异步模式,并通过构建器模式提供清晰的配置接口。

    McpAsyncServer

    让我详细分析 McpAsyncServer 的实现原理和用法:

    1. 基本架构

    public class McpAsyncServer {
        private final DefaultMcpSession mcpSession;        // MCP 会话管理
        private final ServerMcpTransport transport;        // 传输层
        private final ServerCapabilities serverCapabilities; // 服务器能力
        
        // 线程安全的功能注册容器
        private final CopyOnWriteArrayList<AsyncToolRegistration> tools;
        private final ConcurrentHashMap<String, AsyncResourceRegistration> resources;
        private final ConcurrentHashMap<String, AsyncPromptRegistration> prompts;
    }

    2. 主要功能模块

    2.1 初始化处理

    private DefaultMcpSession.RequestHandler<InitializeResult> asyncInitializeRequestHandler() {
        return params -> {
            // 1. 解析初始化请求
            InitializeRequest request = transport.unmarshalFrom(params, 
                new TypeReference<InitializeRequest>() {});
                
            // 2. 保存客户端信息
            this.clientCapabilities = request.capabilities();
            this.clientInfo = request.clientInfo();
            
            // 3. 协议版本协商
            String serverVersion = negotiateProtocolVersion(request.protocolVersion());
            
            // 4. 返回服务器信息
            return Mono.just(new InitializeResult(
                serverVersion,
                this.serverCapabilities,
                this.serverInfo,
                null
            ));
        };
    }

    2.2 工具管理

    // 添加工具
    public Mono<Void> addTool(AsyncToolRegistration toolRegistration) {
        return Mono.defer(() -> {
            // 检查重复
            if (toolExists(toolRegistration.tool().name())) {
                return Mono.error(new McpError("Tool already exists"));
            }
            
            // 添加工具
            tools.add(toolRegistration);
            
            // 通知客户端
            return notifyToolsListChanged();
        });
    }

    // 工具调用处理
    private RequestHandler<CallToolResult> toolsCallRequestHandler() {
        return params -> {
            CallToolRequest request = unmarshalRequest(params);
            return findTool(request.name())
                .map(tool -> tool.call().apply(request.arguments()))
                .orElse(Mono.error(new McpError("Tool not found")));
        };
    }

    2.3 资源管理

    // 添加资源
    public Mono<Void> addResource(AsyncResourceRegistration resource) {
        return Mono.defer(() -> {
            resources.putIfAbsent(resource.resource().uri(), resource);
            return notifyResourcesListChanged();
        });
    }

    // 资源读取处理
    private RequestHandler<ReadResourceResult> resourcesReadRequestHandler() {
        return params -> {
            ReadResourceRequest request = unmarshalRequest(params);
            return Optional.ofNullable(resources.get(request.uri()))
                .map(r -> r.readHandler().apply(request))
                .orElse(Mono.error(new McpError("Resource not found")));
        };
    }

    3. 使用示例

    3.1 创建服务器

    // 1. 创建传输层
    ServerMcpTransport transport = new StdioServerTransport();

    // 2. 配置服务器
    McpAsyncServer server = McpServer.async(transport)
        .serverInfo("my-server""1.0.0")
        .capabilities(new ServerCapabilities(...))
        // 注册工具
        .tool(new Tool("calculator""计算器", schema),
              args -> Mono.just(new CallToolResult(calculate(args))))
        // 注册资源
        .resources(new AsyncResourceRegistration(
            new Resource("file://data"),
            req -> Mono.just(new ReadResourceResult(readFile(req)))
        ))
        .build();

    // 3. 启动服务器
    server.initialize().subscribe();

    3.2 动态管理功能

    // 添加新工具
    server.addTool(new AsyncToolRegistration(
        new Tool("newTool""新工具", schema),
        args -> Mono.just(new CallToolResult(process(args)))
    )).subscribe();

    // 移除工具
    server.removeTool("oldTool").subscribe();

    // 添加资源
    server.addResource(new AsyncResourceRegistration(
        new Resource("db://users"),
        req -> Mono.just(new ReadResourceResult(queryDb(req)))
    )).subscribe();

    3.3 日志管理

    // 发送日志通知
    server.loggingNotification(new LoggingMessageNotification(
        LoggingLevel.INFO,
        "操作完成"
    )).subscribe();

    4. 核心特点

    1. 异步响应式设计:
    • 基于 Project Reactor
    • 非阻塞操作
    • 响应式流处理
  • 线程安全:
    • 使用 CopyOnWriteArrayList
    • 使用 ConcurrentHashMap
    • 原子操作支持
  • 可扩展性:
    • 动态工具注册
    • 动态资源管理
    • 灵活的提示模板
  • 事件通知:
    • 工具列表变更
    • 资源列表变更
    • 日志消息通知
  • 错误处理:
    • 统一错误模型
    • 异常传播
    • 优雅降级

    5. 最佳实践

    1. 初始化顺序:
    // 1. 配置服务器
    McpAsyncServer server = configureServer();

    // 2. 注册核心功能
    registerCoreFeatures(server);

    // 3. 启动服务器
    server.initialize()
        .doOnSuccess(() -> log.info("Server started"))
        .doOnError(e -> log.error("Start failed", e))
        .subscribe();
    1. 错误处理:
    server.addTool(toolRegistration)
        .doOnSuccess(() -> log.info("Tool added"))
        .doOnError(McpError.classe -> log.warn("Tool error", e))
        .doOnError(Exception.classe -> log.error("System error", e))
        .onErrorResume(e -> Mono.empty())
        .subscribe()
    ;
    1. 资源管理:
    // 使用 try-with-resources
    try (McpAsyncServer server = createServer()) {
        server.initialize()
            .then(server.addTools())
            .then(server.addResources())
            .block();
    }

    McpAsyncServer 提供了一个功能完整的异步 MCP 服务器实现,适合构建高性能、可扩展的 AI 工具和资源服务。

    McpSyncServer

    让我分析 McpSyncServer 的实现原理和用法:

    1. 基本设计

    public class McpSyncServer {
        private final McpAsyncServer asyncServer;  // 委托异步服务器
    }

    这是一个典型的装饰器模式实现,将异步服务器包装成同步接口。

    2. 核心功能实现

    2.1 工具管理

    // 添加工具
    public void addTool(SyncToolRegistration toolHandler) {
        // 转换为异步并阻塞等待
        this.asyncServer.addTool(
            AsyncToolRegistration.fromSync(toolHandler)
        ).block();
    }

    // 移除工具
    public void removeTool(String toolName) {
        this.asyncServer.removeTool(toolName).block();
    }

    2.2 资源管理

    // 添加资源
    public void addResource(SyncResourceRegistration resourceHandler) {
        this.asyncServer.addResource(
            AsyncResourceRegistration.fromSync(resourceHandler)
        ).block();
    }

    // 移除资源
    public void removeResource(String resourceUri) {
        this.asyncServer.removeResource(resourceUri).block();
    }

    2.3 提示管理

    // 添加提示
    public void addPrompt(SyncPromptRegistration promptRegistration) {
        this.asyncServer.addPrompt(
            AsyncPromptRegistration.fromSync(promptRegistration)
        ).block();
    }

    // 移除提示
    public void removePrompt(String promptName) {
        this.asyncServer.removePrompt(promptName).block();
    }

    3. 使用示例

    3.1 创建服务器

    // 1. 创建同步服务器
    McpSyncServer server = McpServer.sync(transport)
        .serverInfo("my-server""1.0.0")
        .capabilities(new ServerCapabilities(...))
        // 注册工具
        .tool(new Tool("calculator""计算器", schema),
              args -> new CallToolResult(calculate(args)))
        // 注册资源
        .resource(new Resource("file://data"),
                  req -> new ReadResourceResult(readFile(req)))
        .build();

    3.2 功能管理

    // 添加工具
    server.addTool(new SyncToolRegistration(
        new Tool("newTool""新工具", schema),
        args -> new CallToolResult(process(args))
    ));

    // 添加资源
    server.addResource(new SyncResourceRegistration(
        new Resource("db://users"),
        req -> new ReadResourceResult(queryDb(req))
    ));

    // 发送通知
    server.notifyToolsListChanged();

    3.3 消息创建

    // 创建消息
    CreateMessageResult result = server.createMessage(
        new CreateMessageRequest(
            "prompt",
            Map.of("key""value")
        )
    );

    4. 主要特点

    1. 同步包装:
    // 将异步操作转换为同步
    public void someOperation(Parameters params) {
        asyncServer.someOperation(params).block();
    }
    1. 错误处理:
    try {
        server.addTool(toolRegistration);
    catch (McpError e) {
        // 处理 MCP 错误
    catch (Exception e) {
        // 处理其他错误
    }
    1. 资源管理:
    // 优雅关闭
    public void closeGracefully() {
        this.asyncServer.closeGracefully().block();
    }

    // 立即关闭
    public void close() {
        this.asyncServer.close();
    }
    1. 状态访问:
    // 获取服务器信息
    ServerCapabilities capabilities = server.getServerCapabilities();
    Implementation serverInfo = server.getServerInfo();

    // 获取客户端信息
    ClientCapabilities clientCaps = server.getClientCapabilities();
    Implementation clientInfo = server.getClientInfo();

    5. 最佳实践

    1. 初始化流程:
    // 创建服务器
    McpSyncServer server = createServer();

    try {
        // 添加核心功能
        server.addTool(calculatorTool);
        server.addResource(fileResource);
        
        // 通知变更
        server.notifyToolsListChanged();
        server.notifyResourcesListChanged();
    catch (Exception e) {
        server.close();
        throw e;
    }
    1. 资源清理:
    try (AutoCloseable ignored = () -> server.closeGracefully()) {
        // 使用服务器
        server.addTool(tool);
        server.addResource(resource);
    // 自动关闭
    1. 错误恢复:
    try {
        server.addTool(tool);
    catch (McpError e) {
        // 记录错误
        log.error("Tool registration failed", e);
        // 尝试恢复
        server.notifyToolsListChanged();
    }

    6. 与异步版本的区别

    1. API 风格:
    • 同步版本:直接返回结果
    • 异步版本:返回 Mono/Flux
  • 错误处理:
    • 同步版本:try-catch
    • 异步版本:onError 操作符
  • 资源管理:
    • 同步版本:直接阻塞
    • 异步版本:响应式流
  • 使用场景:
    • 同步版本:简单应用、测试
    • 异步版本:高并发、响应式应用

    McpSyncServer 通过包装 McpAsyncServer,为不需要异步编程的场景提供了更简单的同步 API,同时保持了底层实现的一致性。

    53AI,企业落地大模型首选服务商

    产品:场景落地咨询+大模型应用平台+行业解决方案

    承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

    联系我们

    售前咨询
    186 6662 7370
    预约演示
    185 8882 0121

    微信扫码

    添加专属顾问

    回到顶部

    加载中...

    扫码咨询