理解MCP中的 根资源(Roots)和通讯机制
根资源(Roots) 是一种 URI(统一资源标识符),用于客户端向服务器建议需要关注的资源范围。当客户端连接到服务器时,会声明服务器应该关注哪些根资源(Roots)。虽然根资源(Roots)主要用于文件系统路径,但它们也可以是任何有效的 URI,包括 HTTP URL。 传输层(Transports)是 MCP(Model Context Protocol)中实现客户端与服务器通信的基础。传输层负责处理消息的发送和接收机制。

什么是 根资源(Roots)?
根资源(Roots) 是一种 URI(统一资源标识符),用于客户端向服务器建议需要关注的资源范围。当客户端连接到服务器时,会声明服务器应该关注哪些根资源(Roots)。虽然根资源(Roots)主要用于文件系统路径,但它们也可以是任何有效的 URI,包括 HTTP URL。
例如,根资源(Roots) 可以是:
file:///home/user/projects/myapp
https://api.example.com/v1
为什么使用根资源(Roots)?
根资源(Roots) 有以下重要作用:
- 指引性:为服务器提供相关资源和位置的指引。
- 清晰性:明确哪些资源属于工作空间的一部分。
- 组织性:通过多个 Roots 同时管理不同的资源。
根资源(Roots)的工作原理
当客户端支持根资源(Roots) 时:
- 在连接过程中声明
roots
能力。 - 向服务器提供建议的 Roots 列表。
- (若支持)当 Roots 发生变化时通知服务器。
虽然根资源(Roots)只是提供信息,并不强制执行,但服务器应当:
- 尊重客户端提供的根资源(Roots)。
- 使用根资源(Roots)的 URI 定位和访问资源。
- 优先在根资源(Roots)边界内执行操作。
常见用例
根资源(Roots) 通常用于定义:
- 项目目录
- 代码库位置
- API 端点
- 配置文件位置
- 资源边界
最佳实践
在使用根资源(Roots) 时:
- 仅建议必要的资源。
- 为根资源(Roots)使用清晰、描述性的名称。
- 监控根资源的可访问性。
- 优雅地处理根资源的变化。
示例
以下是一个典型的 MCP 客户端如何暴露根资源(Roots)的示例:
{
"roots": [
{
"uri": "file:///home/user/projects/frontend",
"name": "前端代码库"
},
{
"uri": "https://api.example.com/v1",
"name": "API 端点"
}
]
}
该配置建议服务器同时关注本地代码库和一个 API 端点,并保持它们在逻辑上的独立性。
MCP 的通信机制
传输层(Transports)是 MCP(Model Context Protocol)中实现客户端与服务器通信的基础。传输层负责处理消息的发送和接收机制。
消息格式
MCP 使用 JSON-RPC 2.0 作为其传输格式。传输层负责将 MCP 协议消息转换为 JSON-RPC 格式进行传输,并将接收到的 JSON-RPC 消息转换回 MCP 协议消息。
JSON-RPC 消息分为以下三种类型:
请求(Requests)
{
jsonrpc: "2.0",
id: number | string,
method: string,
params?: object
}
响应(Responses)
{
jsonrpc: "2.0",
id: number | string,
result?: object,
error?: {
code: number,
message: string,
data?: unknown
}
}
通知(Notifications)
{
jsonrpc: "2.0",
method: string,
params?: object
}
内置传输类型
MCP 提供了两种标准传输实现:
标准输入/输出(stdio)
stdio
传输通过标准输入和输出流实现通信。这种方式特别适合本地集成和命令行工具。
适用场景:
- 构建命令行工具
- 实现本地集成
- 简化进程通信
- 与 shell 脚本结合使用
TypeScript 示例(服务器)
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioServerTransport();
await server.connect(transport);
TypeScript 示例(客户端)
const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioClientTransport({
command: "./server",
args: ["--option", "value"]
});
await client.connect(transport);
Python 示例(服务器)
app = Server("example-server")
async with stdio_server() as streams:
await app.run(
streams[0],
streams[1],
app.create_initialization_options()
)
Python 示例(客户端)
params = StdioServerParameters(
command="./server",
args=["--option", "value"]
)
async with stdio_client(params) as streams:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()
服务器发送事件(SSE)
SSE 传输通过 HTTP POST 请求实现客户端到服务器的通信,并支持服务器到客户端的流式更新。
适用场景:
- 仅需要服务器到客户端流式更新
- 需要适应受限的网络环境
- 实现简单更新机制
TypeScript 示例(服务器)
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEServerTransport("/message", response);
await server.connect(transport);
TypeScript 示例(客户端)
const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEClientTransport(
new URL("http://localhost:3000/sse")
);
await client.connect(transport);
Python 示例(服务器)
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route
app = Server("example-server")
sse = SseServerTransport("/messages")
async def handle_sse(scope, receive, send):
async with sse.connect_sse(scope, receive, send) as streams:
await app.run(streams[0], streams[1], app.create_initialization_options())
async def handle_messages(scope, receive, send):
await sse.handle_post_message(scope, receive, send)
starlette_app = Starlette(
routes=[
Route("/sse", endpoint=handle_sse),
Route("/messages", endpoint=handle_messages, methods=["POST"]),
]
)
Python 示例(客户端)
async with sse_client("http://localhost:8000/sse") as streams:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()
自定义传输
MCP 支持为特定需求实现自定义传输。任何传输实现都需要遵循 Transport
接口:
适用场景:
- 定制网络协议
- 专用通信通道
- 与现有系统集成
- 优化性能
TypeScript 示例
interface Transport {
start(): Promise<void>; // 开始处理消息
send(message: JSONRPCMessage): Promise<void>; // 发送 JSON-RPC 消息
close(): Promise<void>; // 关闭连接
// 回调
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
}
Python 示例
建议使用 anyio
兼容性更强的库来实现传输层:
@contextmanager
async def create_transport(
read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
write_stream: MemoryObjectSendStream[JSONRPCMessage]
):
async with anyio.create_task_group() as tg:
try:
tg.start_soon(lambda: process_messages(read_stream))
async with write_stream:
yield write_stream
finally:
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
错误处理
传输实现需要处理各种错误场景:
- 连接错误
- 消息解析错误
- 协议错误
- 网络超时
- 资源清理
示例
TypeScript
class ExampleTransport implements Transport {
async start() {
try {
// 连接逻辑
} catch (error) {
this.onerror?.(new Error(`连接失败: ${error}`));
throw error;
}
}
async send(message: JSONRPCMessage) {
try {
// 消息发送逻辑
} catch (error) {
this.onerror?.(new Error(`消息发送失败: ${error}`));
throw error;
}
}
}
Python
async def example_transport(scope, receive, send):
try:
# 创建流
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
async def message_handler():
async with read_stream_writer:
pass
async with anyio.create_task_group() as tg:
tg.start_soon(message_handler)
yield read_stream, write_stream
except Exception as exc:
logger.error(f"传输错误: {exc}")
raise exc
最佳实践
- 正确处理连接生命周期
- 实现全面的错误处理
- 在连接关闭时清理资源
- 使用合适的超时机制
- 在发送前验证消息
- 记录传输事件以便调试
- 实现重连逻辑(如适用)
- 在消息队列中处理背压问题
- 监控连接状态
- 实现必要的安全措施
安全注意事项
身份验证与授权
- 实现身份验证机制
- 验证客户端凭据
- 安全地处理令牌
- 执行授权检查
数据安全
- 使用 TLS 加密传输
- 加密敏感数据
- 验证消息完整性
- 实施消息大小限制
- 清理输入数据
网络安全
- 实现速率限制
- 使用适当的超时机制
- 处理拒绝服务攻击场景
- 监控异常通信模式
- 配置合适的防火墙规则
调试传输问题
- 启用调试日志
- 监控消息流
- 检查连接状态
- 验证消息格式
- 测试错误场景
- 使用网络分析工具
- 实现健康检查
- 监控资源使用
- 测试边界条件
- 使用适当的错误跟踪工具