理解MCP中的 根资源(Roots)和通讯机制

根资源(Roots) 是一种 URI(统一资源标识符),用于客户端向服务器建议需要关注的资源范围。当客户端连接到服务器时,会声明服务器应该关注哪些根资源(Roots)。虽然根资源(Roots)主要用于文件系统路径,但它们也可以是任何有效的 URI,包括 HTTP URL。 传输层(Transports)是 MCP(Model Context Protocol)中实现客户端与服务器通信的基础。传输层负责处理消息的发送和接收机制。

理解MCP中的 根资源(Roots)和通讯机制

什么是 根资源(Roots)?

根资源(Roots) 是一种 URI(统一资源标识符),用于客户端向服务器建议需要关注的资源范围。当客户端连接到服务器时,会声明服务器应该关注哪些根资源(Roots)。虽然根资源(Roots)主要用于文件系统路径,但它们也可以是任何有效的 URI,包括 HTTP URL。

例如,根资源(Roots) 可以是:

file:///home/user/projects/myapp
https://api.example.com/v1

为什么使用根资源(Roots)?

根资源(Roots) 有以下重要作用:

  1. 指引性:为服务器提供相关资源和位置的指引。
  2. 清晰性:明确哪些资源属于工作空间的一部分。
  3. 组织性:通过多个 Roots 同时管理不同的资源。

根资源(Roots)的工作原理

当客户端支持根资源(Roots) 时:

  1. 在连接过程中声明 roots 能力。
  2. 向服务器提供建议的 Roots 列表。
  3. (若支持)当 Roots 发生变化时通知服务器。

虽然根资源(Roots)只是提供信息,并不强制执行,但服务器应当:

  1. 尊重客户端提供的根资源(Roots)。
  2. 使用根资源(Roots)的 URI 定位和访问资源。
  3. 优先在根资源(Roots)边界内执行操作。

常见用例

根资源(Roots) 通常用于定义:

  • 项目目录
  • 代码库位置
  • API 端点
  • 配置文件位置
  • 资源边界

最佳实践

在使用根资源(Roots) 时:

  1. 仅建议必要的资源。
  2. 为根资源(Roots)使用清晰、描述性的名称。
  3. 监控根资源的可访问性。
  4. 优雅地处理根资源的变化。

示例

以下是一个典型的 MCP 客户端如何暴露根资源(Roots)的示例:

{
  "roots": [
    {
      "uri": "file:///home/user/projects/frontend",
      "name": "前端代码库"
    },
    {
      "uri": "https://api.example.com/v1",
      "name": "API 端点"
    }
  ]
}

该配置建议服务器同时关注本地代码库和一个 API 端点,并保持它们在逻辑上的独立性。

MCP 的通信机制

传输层(Transports)是 MCP(Model Context Protocol)中实现客户端与服务器通信的基础。传输层负责处理消息的发送和接收机制。

消息格式

MCP 使用 JSON-RPC 2.0 作为其传输格式。传输层负责将 MCP 协议消息转换为 JSON-RPC 格式进行传输,并将接收到的 JSON-RPC 消息转换回 MCP 协议消息。

JSON-RPC 消息分为以下三种类型:

请求(Requests)

{
  jsonrpc: "2.0",
  id: number | string,
  method: string,
  params?: object
}

响应(Responses)

{
  jsonrpc: "2.0",
  id: number | string,
  result?: object,
  error?: {
    code: number,
    message: string,
    data?: unknown
  }
}

通知(Notifications)

{
  jsonrpc: "2.0",
  method: string,
  params?: object
}

内置传输类型

MCP 提供了两种标准传输实现:

标准输入/输出(stdio)

stdio 传输通过标准输入和输出流实现通信。这种方式特别适合本地集成和命令行工具。

适用场景:

  • 构建命令行工具
  • 实现本地集成
  • 简化进程通信
  • 与 shell 脚本结合使用

TypeScript 示例(服务器)

const server = new Server({
  name: "example-server",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new StdioServerTransport();
await server.connect(transport);

TypeScript 示例(客户端)

const client = new Client({
  name: "example-client",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new StdioClientTransport({
  command: "./server",
  args: ["--option", "value"]
});
await client.connect(transport);

Python 示例(服务器)

app = Server("example-server")

async with stdio_server() as streams:
    await app.run(
        streams[0],
        streams[1],
        app.create_initialization_options()
    )

Python 示例(客户端)

params = StdioServerParameters(
    command="./server",
    args=["--option", "value"]
)

async with stdio_client(params) as streams:
    async with ClientSession(streams[0], streams[1]) as session:
        await session.initialize()

服务器发送事件(SSE)

SSE 传输通过 HTTP POST 请求实现客户端到服务器的通信,并支持服务器到客户端的流式更新。

适用场景:

  • 仅需要服务器到客户端流式更新
  • 需要适应受限的网络环境
  • 实现简单更新机制

TypeScript 示例(服务器)

const server = new Server({
  name: "example-server",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new SSEServerTransport("/message", response);
await server.connect(transport);

TypeScript 示例(客户端)

const client = new Client({
  name: "example-client",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new SSEClientTransport(
  new URL("http://localhost:3000/sse")
);
await client.connect(transport);

Python 示例(服务器)

from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route

app = Server("example-server")
sse = SseServerTransport("/messages")

async def handle_sse(scope, receive, send):
    async with sse.connect_sse(scope, receive, send) as streams:
        await app.run(streams[0], streams[1], app.create_initialization_options())

async def handle_messages(scope, receive, send):
    await sse.handle_post_message(scope, receive, send)

starlette_app = Starlette(
    routes=[
        Route("/sse", endpoint=handle_sse),
        Route("/messages", endpoint=handle_messages, methods=["POST"]),
    ]
)

Python 示例(客户端)

async with sse_client("http://localhost:8000/sse") as streams:
    async with ClientSession(streams[0], streams[1]) as session:
        await session.initialize()

自定义传输

MCP 支持为特定需求实现自定义传输。任何传输实现都需要遵循 Transport 接口:

适用场景:

  • 定制网络协议
  • 专用通信通道
  • 与现有系统集成
  • 优化性能

TypeScript 示例

interface Transport {
  start(): Promise<void>; // 开始处理消息
  send(message: JSONRPCMessage): Promise<void>; // 发送 JSON-RPC 消息
  close(): Promise<void>; // 关闭连接

  // 回调
  onclose?: () => void;
  onerror?: (error: Error) => void;
  onmessage?: (message: JSONRPCMessage) => void;
}

Python 示例

建议使用 anyio 兼容性更强的库来实现传输层:

@contextmanager
async def create_transport(
    read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
    write_stream: MemoryObjectSendStream[JSONRPCMessage]
):
    async with anyio.create_task_group() as tg:
        try:
            tg.start_soon(lambda: process_messages(read_stream))
            async with write_stream:
                yield write_stream
        finally:
            tg.cancel_scope.cancel()
            await write_stream.aclose()
            await read_stream.aclose()

错误处理

传输实现需要处理各种错误场景:

  1. 连接错误
  2. 消息解析错误
  3. 协议错误
  4. 网络超时
  5. 资源清理

示例

TypeScript

class ExampleTransport implements Transport {
  async start() {
    try {
      // 连接逻辑
    } catch (error) {
      this.onerror?.(new Error(`连接失败: ${error}`));
      throw error;
    }
  }

  async send(message: JSONRPCMessage) {
    try {
      // 消息发送逻辑
    } catch (error) {
      this.onerror?.(new Error(`消息发送失败: ${error}`));
      throw error;
    }
  }
}

Python

async def example_transport(scope, receive, send):
    try:
        # 创建流
        read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
        write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

        async def message_handler():
            async with read_stream_writer:
                pass

        async with anyio.create_task_group() as tg:
            tg.start_soon(message_handler)
            yield read_stream, write_stream
    except Exception as exc:
        logger.error(f"传输错误: {exc}")
        raise exc

最佳实践

  1. 正确处理连接生命周期
  2. 实现全面的错误处理
  3. 在连接关闭时清理资源
  4. 使用合适的超时机制
  5. 在发送前验证消息
  6. 记录传输事件以便调试
  7. 实现重连逻辑(如适用)
  8. 在消息队列中处理背压问题
  9. 监控连接状态
  10. 实现必要的安全措施

安全注意事项

身份验证与授权

  • 实现身份验证机制
  • 验证客户端凭据
  • 安全地处理令牌
  • 执行授权检查

数据安全

  • 使用 TLS 加密传输
  • 加密敏感数据
  • 验证消息完整性
  • 实施消息大小限制
  • 清理输入数据

网络安全

  • 实现速率限制
  • 使用适当的超时机制
  • 处理拒绝服务攻击场景
  • 监控异常通信模式
  • 配置合适的防火墙规则

调试传输问题

  1. 启用调试日志
  2. 监控消息流
  3. 检查连接状态
  4. 验证消息格式
  5. 测试错误场景
  6. 使用网络分析工具
  7. 实现健康检查
  8. 监控资源使用
  9. 测试边界条件
  10. 使用适当的错误跟踪工具