手写 MCP Server:从零实现 Model Context Protocol,构建 AI Agent 工具调用框架
一、引言
1.1 从 Function Calling 到 MCP
2024 年底,Anthropic 发布了 Model Context Protocol (MCP),迅速成为 AI 工具集成领域最受关注的技术标准。在此之前,AI Agent 与外部工具的交互主要依赖各模型厂商私有的 Function Calling 实现——OpenAI 有 function calling,Google 有 function_declarations,国产模型各自定义了自己的工具调用格式。这种碎片化导致开发者每接入一个模型就要重写一套工具调用逻辑。
MCP 的核心理念很简单:为 AI 应用提供一个标准化的"外设接口"。就像 USB-C 统一了外设连接标准一样,MCP 统一了 AI 模型与外部工具、数据源之间的通信协议。
那么,MCP 到底是什么?
简而言之,MCP 定义了三种核心能力:
- Tools(工具):AI 可以调用的函数/API,有输入参数和输出结果
- Resources(资源):AI 可以读取的外部数据(文件、数据库、API 响应等)
- Prompts(提示模板):预定义的提示模板,用于特定场景的交互
MCP 采用 Client-Server 架构:一个称为 MCP Client 的中间层负责与 AI 模型对话,同时通过 JSON-RPC 2.0 协议与后端的 MCP Server 通信,获取工具定义、执行工具调用、读写资源等。
1.2 为什么选择手写 MCP Server?
市面已有官方 SDK(Python/TypeScript),但我们选择手写的理由很充分:
- 深入理解协议本质:SDK 封装了太多细节,手写能让我们真正理解 JSON-RPC 交互流程
- 轻量级验证:生产环境可能需要定制化实现,手写框架可灵活裁剪
- 无框架依赖:零第三方依赖,核心逻辑仅几百行代码,便于嵌入式/边缘设备部署
- "手写系列"传统:从零构建,吃透每一个字节
本文将带领读者从零实现一个功能完整的 MCP Server,支持工具注册、资源读取、生命周期管理等核心能力,并用一个天气查询 Agent 示例进行端到端验证。全文章节安排如下:
- MCP 协议核心概念与 JSON-RPC 2.0 基础
- MCP Server 架构设计
- 从零开始手写 MCP Server(完整代码实现)
- 编写 MCP Client 进行对接
- 实战:构建天气查询 Agent
- 深入:Transport 层实现
- 性能优化与安全考量
二、MCP 协议的核心设计思路
在深入代码之前,我们先深刻理解 MCP 的设计哲学。这不仅是协议本身的知识,更是理解 AI Agent 系统架构的关键节点。
2.1 MCP 的诞生背景——为什么需要一套新协议?
在 MCP 出现之前,让 AI 模型调用外部工具一般有三种做法,各有明显短板:
做法一:直接写 Prompt。 在系统提示词里用文本描述工具函数的名称、参数含义、返回值格式,然后让模型自己生成一段符合要求的 JSON 字符串来模拟函数调用。这种方法在 2023 年上半年比较常见,因为当时还没有标准化的 Function Calling 接口。它的缺点极其明显——输出格式非常不稳定,模型偶尔会漏掉必填字段、把嵌套结构写成一维 JSON、或者 XML 和 JSON 格式混用。稍微复杂一点的参数结构(比如多级嵌套对象、数组类型的参数)就很容易格式错乱。而且每次添加新工具都要手动更新 Prompt,维护成本极高。
做法二:使用各模型厂商原生的 Function Calling。 2023 年 6 月 OpenAI 率先推出 function calling,随后 Google、Anthropic、DeepSeek、零一万物、阿里通义、百度文心等纷纷跟进。这种方式比手写 Prompt 好得多——API 层面提供了工具定义的结构化输入和模型输出自动解析。但问题是各家的格式完全不兼容:OpenAI 的 tool_choice 参数用 auto/none/force 来控制调用策略,Anthropic 的 tool_use 用 {type: "tool_use", name: ..., input: ...} 的独立 block 格式,Google 的 function_declarations 又换了一套 schema 定义方式。如果想让同一个 Agent 同时支持多个模型家族,就需要为每个模型维护一套独立版本的工具定义。
做法三:第三方框架的工具抽象层。 LangChain 的 Tool 类、LlamaIndex 的 QueryTool 等提供了统一的中层抽象,把不同模型的差异封装在框架内部。这是一大进步,但也引入了耦合问题——要么整个应用依赖某个特定框架,要么自己写适配层;而且框架对这些底层协议进行了多层封装,出了问题很难一层层排查到根因。
MCP 要解决的问题非常明确:定义 AI 模型与外部世界交互的通用通信协议,不绑定任何特定模型厂商、不依赖任何特定框架。像 HTTP 统一了 Web 通信一样,MCP 试图统一 AI 工具调用的通信标准。
2.2 MCP 的三层架构
MCP 协议的架构设计可以拆解为三个层次:
- 传输层(Transport Layer):消息的物理传输方式。MCP 定义了 stdio(本地子进程通信)和 SSE(远程 HTTP 通信)两种传输方式。本层只负责收发字符串形式的消息,不关心消息内容的含义。
- 协议层(Protocol Layer):消息的格式和序列化方式。基于 JSON-RPC 2.0 标准,定义了请求、响应、错误三种消息类型,以及消息 ID 机制(用于匹配请求和响应)、通知机制(无需响应的消息)。
- 应用层(Application Layer):具体的交互方法定义。包括生命周期方法(initialize、shutdown)、工具方法(tools/list、tools/call)、资源方法(resources/list、resources/read)以及提示方法(prompts/list、prompts/get)。
这种层次化设计与计算机网络中的 OSI 七层模型如出一辙——每一层只关注自己范围内的职责,上层只管调用下层提供的接口,不需要关心底层实现细节。
2.3 JSON-RPC 2.0 协议——MCP 的骨架
MCP 的通信层基于 JSON-RPC 2.0。我们先理解这个基础协议。
JSON-RPC 2.0 是一种轻量级远程过程调用协议,每个请求包含三个核心字段:
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}
响应格式:
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"tools": [...]
}
}
错误格式:
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32601,
"message": "Method not found"
}
}
MCP 使用请求-响应模式处理同步调用(如列出工具、调用工具),使用通知模式处理异步事件(如初始化完成通知、日志输出)。
2.2 MCP 核心方法
MCP 协议定义了一组标准方法,分为三大类:
生命周期方法:
initialize— 客户端发送初始化请求,包含协议版本和能力声明initialized— 初始化完成通知(通知,无响应)shutdown— 关闭连接
工具相关方法:
tools/list— 列出服务器提供的所有工具tools/call— 调用指定工具并传入参数
资源相关方法:
resources/list— 列出所有资源resources/read— 读取指定资源内容resources/subscribe— 订阅资源变更resources/unsubscribe— 取消订阅
提示相关方法:
prompts/list— 列出提示模板prompts/get— 获取特定提示模板
2.3 MCP 传输层(Transport)
MCP 支持两种传输方式:
- stdio 传输:通过标准输入输出流通信,适合本地子进程模式
- SSE (Server-Sent Events) 传输:基于 HTTP,支持远程通信,包括 SSE 流和 POST 请求回调
本篇我们重点实现 stdio 传输和简化版 HTTP 传输。
三、MCP Server 架构设计
让我们先勾勒出 Server 的整体架构。
3.1 模块划分
mcp-server/
├── server.py # 主服务器,处理请求路由
├── protocol.py # JSON-RPC 2.0 协议实现
├── transport.py # 传输层抽象(stdio / HTTP)
├── tool_manager.py # 工具注册与管理
├── resource_manager.py # 资源管理
├── examples/
│ ├── weather_tool.py # 天气查询工具示例
│ └── calculator.py # 计算器工具示例
└── client_demo.py # 测试用 MCP Client
3.2 核心数据结构
MCP Server 的核心是一个 MethodRegistry,它维护了工具列表和请求路由表:
# server.py 的核心设计
class MCPServer:
def __init__(self):
self.tools = {} # name -> ToolDefinition
self.resources = {} # uri -> ResourceDefinition
self.session_id = None # 连接会话ID
self.client_capabilities = {} # 客户端能力声明
self.protocol_version = "2025-03-26" # 当前MCP协议版本
每个工具的定义结构如下:
class ToolDefinition:
name: str # 工具名称
description: str # 工具描述
input_schema: dict # JSON Schema 格式的输入参数定义
handler: callable # 实际执行的函数
3.3 请求处理流程
MCP Server 的请求处理流程非常清晰:
- 初始化:客户端发送
initialize→ Server 返回协议版本和能力声明 → 客户端发送initialized通知 - 工具发现:客户端定期或按需调用
tools/list→ Server 返回所有已注册工具 - 工具调用:客户端发送
tools/call→ Server 查找工具、校验参数、执行函数 → 返回结果 - 资源读取:客户端发送
resources/read→ Server 查找资源、读取内容 → 返回结果 - 关闭:任意一方发送关闭信号
四、从零开始手写 MCP Server
### 4.1 为什么选择 Python 作为实现语言?
选择 Python 有几个现实的考量:
- AI 生态的语言:无论是 LLaMA Factory、Transformers、LangChain 还是 Dify,Python 是 AI 领域事实上的标准语言。MCP 的 Python SDK 也是官方主推的。
- 零开销的原型验证:Python 标准库自带 json、urllib、http.server 等模块,不需要安装任何第三方依赖即可完成整个协议的实现。
- 易于理解:相比 TypeScript 的异步回调或 Rust 的所有权系统,Python 的代码最为直观,适合作为学习参考实现。
当然,生产环境的 MCP Server 用 TypeScript/Go/Rust 实现也是常见选择——TypeScript 因为与前端生态的天然亲和力,在 Dify 等应用框架的插件系统中被广泛使用;Go 适合构建高性能网关层;Rust 则用于对安全性要求极高的场景。不过核心协议逻辑是一致的,理解了本文的实现,切换到其他语言只是语法上的翻译工作。
4.2 协议层实现(protocol.py)
首先,我们实现 JSON-RPC 2.0 协议的消息解析和构建。这部分是所有 MCP 通信的基础,相当于 HTTP 协议中 TCP/IP 层之上那层简单的序列化协议:
# protocol.py - JSON-RPC 2.0 协议实现
import json
import uuid
from typing import Any, Dict, Optional, Union
# 标准 JSON-RPC 错误码
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603
# MCP 特定错误码(-32000 ~ -32099 范围)
TOOL_NOT_FOUND = -32000
TOOL_EXECUTION_ERROR = -32001
RESOURCE_NOT_FOUND = -32002
RESOURCE_READ_ERROR = -32003
class JSONRPCRequest:
"""JSON-RPC 2.0 请求对象"""
def __init__(self, method: str, params: Optional[Dict] = None,
request_id: Optional[Union[int, str]] = None):
self.jsonrpc = "2.0"
self.method = method
self.params = params or {}
self.id = request_id if request_id is not None else str(uuid.uuid4())
def to_dict(self) -> Dict:
d = {
"jsonrpc": "2.0",
"method": self.method,
"params": self.params,
"id": self.id
}
return d
def to_json(self) -> str:
return json.dumps(self.to_dict())
class JSONRPCResponse:
"""JSON-RPC 2.0 响应对象"""
def __init__(self, request_id: Optional[Union[int, str]] = None):
self.jsonrpc = "2.0"
self.id = request_id
self.result: Optional[Any] = None
self.error: Optional[Dict] = None
def set_result(self, result: Any):
self.result = result
self.error = None
def set_error(self, code: int, message: str, data: Any = None):
self.error = {"code": code, "message": message}
if data is not None:
self.error["data"] = data
self.result = None
def to_dict(self) -> Dict:
d = {"jsonrpc": "2.0", "id": self.id}
if self.error:
d["error"] = self.error
elif self.result is not None:
d["result"] = self.result
else:
d["result"] = None
return d
def to_json(self) -> str:
return json.dumps(self.to_dict())
@classmethod
def error_response(cls, request_id, code: int, message: str,
data: Any = None) -> "JSONRPCResponse":
resp = cls(request_id)
resp.set_error(code, message, data)
return resp
@classmethod
def success_response(cls, request_id, result: Any) -> "JSONRPCResponse":
resp = cls(request_id)
resp.set_result(result)
return resp
def parse_message(raw: str) -> Optional[JSONRPCRequest]:
"""解析JSON-RPC消息。通知类消息(无id)仍然返回请求对象,id为None"""
try:
data = json.loads(raw)
except json.JSONDecodeError as e:
return None
if not isinstance(data, dict) or data.get("jsonrpc") != "2.0":
return None
method = data.get("method")
if not method:
return None
req = JSONRPCRequest(
method=method,
params=data.get("params", {}),
request_id=data.get("id")
)
return req
def is_notification(req: JSONRPCRequest) -> bool:
"""判断是否为通知(无需响应)"""
return req.id is None
4.3 工具管理器——理解 MCP 的核心抽象(tool_manager.py)
如果说协议层是 MCP 的神经系统,那工具管理器就是 MCP 的心脏。所有 AI Agent 能够调用的能力都通过它来注册和调遣。
工具管理器的设计有几个关键考量:
第一,工具定义与实现分离。工具描述(name、description、inputSchema)是给 AI 模型看的——模型根据这些信息决定何时调用哪个工具以及传入什么参数。而 handler 是实际执行的函数——它接收模型填入的参数,执行真正的业务逻辑。这种分离使得我们可以交换不同模型(甚至同一模型的不同版本)而对工具实现完全无影响。
第二,JSON Schema 作为契约。我们不是让工具自己声明参数格式,而是使用标准的 JSON Schema 来描述。这样做的好处是:任何兼容 MCP 的客户端(包括 AI SDK、Agent 框架等)都能自动理解工具的参数结构,不需要为每个工具定制解析代码。
第三,链式注册 API。register_tool 返回 self 使得可以连续注册多个工具,代码更简洁:
server.register_tool(...).register_tool(...)
下面看看参数的 JSON Schema 校验实现。虽然本实现只做了基础的类型检查,但在生产环境中,你会希望校验更细粒度——比如字段是否在枚举值中、字符串是否符合正则模式、数字是否在指定范围内。完整的 jsonschema 库支持这些,但在我们的场景中,手写基础校验更有助于理解原理。
# tool_manager.py - 工具注册与管理
from typing import Any, Callable, Dict, List, Optional
import json
class ToolDefinition:
"""工具定义"""
def __init__(
self,
name: str,
description: str,
input_schema: Dict,
handler: Callable[..., Any]
):
self.name = name
self.description = description
self.input_schema = input_schema # JSON Schema
self.handler = handler
def to_dict(self) -> Dict:
return {
"name": self.name,
"description": self.description,
"inputSchema": self.input_schema
}
def execute(self, arguments: Dict) -> Any:
"""执行工具,返回结果"""
# 参数校验(精简版)
self._validate_args(arguments)
return self.handler(**arguments)
def _validate_args(self, arguments: Dict):
"""JSON Schema 基础校验"""
schema = self.input_schema
if "required" not in schema:
return
for field in schema["required"]:
if field not in arguments:
raise ValueError(f"缺少必填参数: {field}")
# 类型校验
if "properties" in schema:
for field, value in arguments.items():
if field in schema["properties"]:
field_schema = schema["properties"][field]
if "type" in field_schema:
expected = field_schema["type"]
if expected == "string" and not isinstance(value, str):
raise TypeError(
f"参数 '{field}' 应为 {expected},"
f"实际为 {type(value).__name__}"
)
elif expected == "number" and not isinstance(
value, (int, float)
):
raise TypeError(
f"参数 '{field}' 应为 {expected},"
f"实际为 {type(value).__name__}"
)
elif expected == "integer" and not isinstance(
value, int
):
raise TypeError(
f"参数 '{field}' 应为 {expected},"
f"实际为 {type(value).__name__}"
)
elif expected == "boolean" and not isinstance(
value, bool
):
raise TypeError(
f"参数 '{field}' 应为 {expected},"
f"实际为 {type(value).__name__}"
)
elif expected == "array" and not isinstance(
value, list
):
raise TypeError(
f"参数 '{field}' 应为 {expected},"
f"实际为 {type(value).__name__}"
)
class ToolManager:
"""工具管理器"""
def __init__(self):
self._tools: Dict[str, ToolDefinition] = {}
def register_tool(
self,
name: str,
description: str,
input_schema: Dict,
handler: Callable
) -> "ToolManager":
"""注册一个工具"""
tool = ToolDefinition(name, description, input_schema, handler)
self._tools[name] = tool
return self # 支持链式调用
def unregister_tool(self, name: str) -> bool:
"""注销工具"""
if name in self._tools:
del self._tools[name]
return True
return False
def list_tools(self) -> List[Dict]:
"""列出所有工具定义"""
return [tool.to_dict() for tool in self._tools.values()]
def get_tool(self, name: str) -> Optional[ToolDefinition]:
"""获取指定工具"""
return self._tools.get(name)
def call_tool(self, name: str, arguments: Dict) -> Any:
"""调用工具"""
tool = self.get_tool(name)
if tool is None:
raise ValueError(f"工具 '{name}' 未找到")
return tool.execute(arguments)
4.4 资源管理器——让 AI 读取外部数据(resource_manager.py)
工具(Tools)让 AI 可以
# resource_manager.py - 资源管理
from typing import Any, Callable, Dict, List, Optional
class ResourceDefinition:
"""资源定义"""
def __init__(
self,
uri: str,
name: str,
description: str,
mime_type: str = "text/plain",
reader: Optional[Callable[[], Any]] = None
):
self.uri = uri
self.name = name
self.description = description
self.mime_type = mime_type # "text/plain", "application/json" 等
self.reader = reader
def to_dict(self) -> Dict:
return {
"uri": self.uri,
"name": self.name,
"description": self.description,
"mimeType": self.mime_type
}
def read(self) -> Any:
"""读取资源内容"""
if self.reader:
return self.reader()
return f"Resource: {self.uri}"
class ResourceManager:
"""资源管理器"""
def __init__(self):
self._resources: Dict[str, ResourceDefinition] = {}
self._subscriptions: Dict[str, List[str]] = {} # uri -> client_ids
def register_resource(
self,
uri: str,
name: str,
description: str,
mime_type: str = "text/plain",
reader: Optional[Callable] = None
) -> "ResourceManager":
resource = ResourceDefinition(
uri, name, description, mime_type, reader
)
self._resources[uri] = resource
return self
def list_resources(self) -> List[Dict]:
return [r.to_dict() for r in self._resources.values()]
def read_resource(self, uri: str) -> Dict:
resource = self._resources.get(uri)
if resource is None:
raise ValueError(f"资源 '{uri}' 未找到")
content = resource.read()
return {
"contents": [
{
"uri": uri,
"mimeType": resource.mime_type,
"text": str(content)
}
]
}
def subscribe(self, uri: str, client_id: str):
"""订阅资源变更"""
if uri not in self._subscriptions:
self._subscriptions[uri] = []
self._subscriptions[uri].append(client_id)
def unsubscribe(self, uri: str, client_id: str):
"""取消订阅"""
if uri in self._subscriptions:
self._subscriptions[uri] = [
cid for cid in self._subscriptions[uri]
if cid != client_id
]
4.4 传输层抽象(transport.py)
传输层支持 stdio 和 HTTP 两种模式:
# transport.py - 传输层抽象
import json
import sys
from abc import ABC, abstractmethod
from typing import Callable, Optional
class MessageTransport(ABC):
"""消息传输层抽象基类"""
@abstractmethod
def send(self, message: str):
"""发送消息"""
pass
@abstractmethod
def receive(self) -> Optional[str]:
"""接收消息(阻塞),返回None表示连接关闭"""
pass
@abstractmethod
def close(self):
"""关闭传输"""
pass
class StdioTransport(MessageTransport):
"""基于标准输入输出的传输层"""
def __init__(self):
self._running = True
def send(self, message: str):
"""通过 stdout 发送消息(带换行符分割)"""
sys.stdout.write(message + "\n")
sys.stdout.flush()
def receive(self) -> Optional[str]:
"""通过 stdin 读取消息"""
try:
line = sys.stdin.readline()
if not line:
self._running = False
return None
# 处理 Content-Length 头(MCP 标准中的 HTTP 风格消息帧)
# 我们这里使用简化的换行分隔协议
line = line.strip()
if not line:
return self.receive() # 跳过空行
return line
except EOFError:
self._running = False
return None
def close(self):
self._running = False
class SimpleHttpTransport(MessageTransport):
"""
基于 HTTP 的简化传输层。
使用 SSE (Server-Sent Events) 发送消息到客户端,
通过 POST 回调接收客户端请求。
注意:这是一个简化实现,适合学习和测试。
"""
def __init__(
self,
host: str = "127.0.0.1",
port: int = 8080,
on_message: Optional[Callable] = None
):
self.host = host
self.port = port
self._running = False
self._message_queue = []
self.on_message = on_message
def send(self, message: str):
"""实际由HTTP服务器在SSE连接上发送"""
self._message_queue.append(message)
def receive(self) -> Optional[str]:
"""阻塞等待消息"""
import time
while self._running:
if self._message_queue:
return self._message_queue.pop(0)
time.sleep(0.01)
# 非阻塞模式下复用
if self._message_queue:
return self._message_queue.pop(0)
return None
def close(self):
self._running = False
def start(self):
"""启动 HTTP 服务器(简化版,使用内置 http.server)"""
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import urllib.parse
transport = self
class MCPHTTPHandler(BaseHTTPRequestHandler):
def do_GET(self):
"""SSE 端点"""
if self.path == "/sse":
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
transport._running = True
# 发送端点信息
self.wfile.write(
f"event: endpoint\ndata: /messages\n\n".encode()
)
self.wfile.flush()
# 循环发送消息
try:
while transport._running:
if transport._message_queue:
msg = transport._message_queue.pop(0)
self.wfile.write(
f"event: message\ndata: {msg}\n\n"
.encode()
)
self.wfile.flush()
import time
time.sleep(0.01)
except (BrokenPipeError, ConnectionResetError):
transport._running = False
else:
self.send_response(404)
self.end_headers()
def do_POST(self):
"""消息接收端点"""
if self.path == "/messages":
content_length = int(
self.headers.get("Content-Length", 0)
)
body = self.rfile.read(content_length).decode()
transport._message_queue.append(body)
self.send_response(202)
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
else:
self.send_response(404)
self.end_headers()
def do_OPTIONS(self):
"""CORS 预检请求"""
self.send_response(200)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header(
"Access-Control-Allow-Methods",
"GET, POST, OPTIONS"
)
self.send_header(
"Access-Control-Allow-Headers",
"Content-Type"
)
self.end_headers()
def log_message(self, format, *args):
pass # 静默日志
server = HTTPServer((self.host, self.port), MCPHTTPHandler)
thread = threading.Thread(
target=server.serve_forever, daemon=True
)
thread.start()
print(
f"[MCP Server] HTTP 传输已启动: "
f"http://{self.host}:{self.port}/sse"
)
return server
4.5 主服务器(server.py)
现在把所有组件整合到主服务器中:
# server.py - MCP Server 主实现
import json
import signal
import sys
from typing import Any, Dict, Optional
from protocol import (
JSONRPCRequest, JSONRPCResponse,
parse_message, is_notification,
PARSE_ERROR, INVALID_REQUEST, METHOD_NOT_FOUND,
INTERNAL_ERROR, TOOL_NOT_FOUND, TOOL_EXECUTION_ERROR,
RESOURCE_NOT_FOUND, RESOURCE_READ_ERROR
)
from tool_manager import ToolManager
from resource_manager import ResourceManager
from transport import MessageTransport, StdioTransport, SimpleHttpTransport
class MCPServer:
"""MCP Server 主类"""
def __init__(self, transport: Optional[MessageTransport] = None):
self.tool_manager = ToolManager()
self.resource_manager = ResourceManager()
self.transport = transport or StdioTransport()
self.session_id = None
self.client_capabilities = {}
self.server_capabilities = {
"protocolVersion": "2025-03-26",
"capabilities": {
"tools": {
"listChanged": True
},
"resources": {
"subscribe": True,
"listChanged": True
},
"prompts": {
"listChanged": True
}
}
}
self._running = False
self._method_handlers = self._build_handlers()
def _build_handlers(self) -> Dict:
"""构建方法处理路由表"""
return {
# 生命周期
"initialize": self._handle_initialize,
"shutdown": self._handle_shutdown,
# 工具
"tools/list": self._handle_tools_list,
"tools/call": self._handle_tools_call,
# 资源
"resources/list": self._handle_resources_list,
"resources/read": self._handle_resources_read,
"resources/subscribe": self._handle_resources_subscribe,
"resources/unsubscribe": self._handle_resources_unsubscribe,
# 提示
"prompts/list": self._handle_prompts_list,
"prompts/get": self._handle_prompts_get,
}
# ========== 请求处理器 ==========
def _handle_initialize(self, params: Dict,
req_id) -> JSONRPCResponse:
"""处理初始化请求"""
self.session_id = params.get("sessionId")
self.client_capabilities = params.get("capabilities", {})
result = {
"protocolVersion": self.server_capabilities["protocolVersion"],
"capabilities": self.server_capabilities["capabilities"],
"serverInfo": {
"name": "mcp-server-from-scratch",
"version": "1.0.0"
}
}
return JSONRPCResponse.success_response(req_id, result)
def _handle_shutdown(self, params: Dict,
req_id) -> JSONRPCResponse:
"""处理关闭请求"""
self._running = False
return JSONRPCResponse.success_response(req_id, None)
def _handle_tools_list(self, params: Dict,
req_id) -> JSONRPCResponse:
"""列出所有工具"""
tools = self.tool_manager.list_tools()
return JSONRPCResponse.success_response(req_id, {"tools": tools})
def _handle_tools_call(self, params: Dict,
req_id) -> JSONRPCResponse:
"""调用工具"""
name = params.get("name", "")
arguments = params.get("arguments", {})
try:
result = self.tool_manager.call_tool(name, arguments)
return JSONRPCResponse.success_response(
req_id, {
"content": [
{
"type": "text",
"text": str(result)
if not isinstance(result, str)
else result
}
],
"isError": False
}
)
except ValueError as e:
code = TOOL_NOT_FOUND if "未找到" in str(e) else TOOL_EXECUTION_ERROR
return JSONRPCResponse.error_response(
req_id, code, str(e)
)
except Exception as e:
return JSONRPCResponse.error_response(
req_id, TOOL_EXECUTION_ERROR,
f"工具执行失败: {str(e)}"
)
def _handle_resources_list(self, params: Dict,
req_id) -> JSONRPCResponse:
"""列出资源"""
resources = self.resource_manager.list_resources()
return JSONRPCResponse.success_response(
req_id, {"resources": resources}
)
def _handle_resources_read(self, params: Dict,
req_id) -> JSONRPCResponse:
"""读取资源"""
uri = params.get("uri", "")
try:
result = self.resource_manager.read_resource(uri)
return JSONRPCResponse.success_response(req_id, result)
except ValueError:
return JSONRPCResponse.error_response(
req_id, RESOURCE_NOT_FOUND, f"资源 '{uri}' 未找到"
)
except Exception as e:
return JSONRPCResponse.error_response(
req_id, RESOURCE_READ_ERROR,
f"读取资源失败: {str(e)}"
)
def _handle_resources_subscribe(self, params: Dict,
req_id) -> JSONRPCResponse:
"""订阅资源"""
uri = params.get("uri", "")
try:
self.resource_manager.subscribe(
uri, self.session_id or "unknown"
)
return JSONRPCResponse.success_response(req_id, None)
except Exception as e:
return JSONRPCResponse.error_response(
req_id, INTERNAL_ERROR, str(e)
)
def _handle_resources_unsubscribe(self, params: Dict,
req_id) -> JSONRPCResponse:
"""取消订阅"""
uri = params.get("uri", "")
try:
self.resource_manager.unsubscribe(
uri, self.session_id or "unknown"
)
return JSONRPCResponse.success_response(req_id, None)
except Exception as e:
return JSONRPCResponse.error_response(
req_id, INTERNAL_ERROR, str(e)
)
def _handle_prompts_list(self, params: Dict,
req_id) -> JSONRPCResponse:
"""列出提示模板"""
return JSONRPCResponse.success_response(
req_id, {"prompts": []}
)
def _handle_prompts_get(self, params: Dict,
req_id) -> JSONRPCResponse:
"""获取提示模板"""
name = params.get("name", "")
return JSONRPCResponse.success_response(
req_id, {
"prompt": {
"name": name,
"description": f"Prompt: {name}",
"messages": []
}
}
)
# ========== 消息处理主循环 ==========
def handle_message(self, raw: str) -> Optional[str]:
"""处理一条原始消息,返回响应(通知返回None)"""
request = parse_message(raw)
if request is None:
# 解析失败,返回解析错误
resp = JSONRPCResponse.error_response(
None, PARSE_ERROR, "无效的JSON-RPC消息"
)
return resp.to_json()
handler = self._method_handlers.get(request.method)
if handler is None:
resp = JSONRPCResponse.error_response(
request.id, METHOD_NOT_FOUND,
f"未知方法: {request.method}"
)
return resp.to_json()
try:
response = handler(request.params, request.id)
except Exception as e:
response = JSONRPCResponse.error_response(
request.id, INTERNAL_ERROR, f"内部错误: {str(e)}"
)
# 通知不需要响应
if is_notification(request):
return None
return response.to_json()
def run(self):
"""启动 MCP Server 主循环"""
self._running = True
print("[MCP Server] 已启动, 等待消息...", file=sys.stderr)
try:
while self._running:
raw = self.transport.receive()
if raw is None:
break
response = self.handle_message(raw.strip())
if response:
self.transport.send(response)
except KeyboardInterrupt:
print("\n[MCP Server] 收到中断信号", file=sys.stderr)
finally:
self.cleanup()
def cleanup(self):
"""资源清理"""
self._running = False
self.transport.close()
print("[MCP Server] 已关闭", file=sys.stderr)
# ========== 便捷方法 ==========
def register_tool(self, name: str, description: str,
input_schema: Dict, handler: callable):
"""注册工具"""
self.tool_manager.register_tool(
name, description, input_schema, handler
)
def register_resource(self, uri: str, name: str,
description: str,
mime_type: str = "text/plain",
reader=None):
"""注册资源"""
self.resource_manager.register_resource(
uri, name, description, mime_type, reader
)
五、实战:构建天气查询 Agent
现在我们用实现的 MCP Server 来构建一个实用的工具——天气查询 Agent。
5.1 天气查询工具(examples/weather_tool.py)
# examples/weather_tool.py - 天气查询工具
import json
import urllib.request
import urllib.parse
# 模拟天气数据(实际使用时应接入真实API)
MOCK_WEATHER_DATA = {
"北京": {"temp": 28, "humidity": 45, "wind": "3级",
"condition": "晴", "aqi": 72},
"上海": {"temp": 31, "humidity": 72, "wind": "2级",
"condition": "多云", "aqi": 85},
"广州": {"temp": 33, "humidity": 80, "wind": "3级",
"condition": "阵雨", "aqi": 62},
"深圳": {"temp": 32, "humidity": 78, "wind": "3级",
"condition": "多云", "aqi": 55},
"成都": {"temp": 27, "humidity": 65, "wind": "1级",
"condition": "阴", "aqi": 90},
"杭州": {"temp": 30, "humidity": 70, "wind": "2级",
"condition": "晴", "aqi": 68},
"武汉": {"temp": 34, "humidity": 68, "wind": "2级",
"condition": "晴", "aqi": 95},
"西安": {"temp": 29, "humidity": 55, "wind": "2级",
"condition": "晴", "aqi": 78},
}
def get_weather(city: str) -> str:
"""
查询指定城市的当前天气
参数:
city: 城市名称(中文)
返回:
格式化的天气信息字符串
"""
city = city.strip()
# 尝试从模拟数据中获取
if city in MOCK_WEATHER_DATA:
data = MOCK_WEATHER_DATA[city]
return json.dumps({
"city": city,
"temperature": f"{data['temp']}°C",
"humidity": f"{data['humidity']}%",
"wind": data['wind'],
"condition": data['condition'],
"aqi": data['aqi'],
"advice": "适合户外活动" if data['aqi'] < 80
else "敏感人群建议减少户外活动",
"update_time": "2026-07-01 08:00:00"
}, ensure_ascii=False, indent=2)
else:
# 城市不在预设列表中
return json.dumps({
"city": city,
"error": f"暂未收录 '{city}' 的天气数据",
"available_cities": list(MOCK_WEATHER_DATA.keys())
}, ensure_ascii=False, indent=2)
# 工具注册配置
weather_tool_config = {
"name": "get_weather",
"description": "查询指定城市的当前天气信息,包括温度、湿度、风速、空气质量等",
"input_schema": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称,例如:北京、上海、广州"
}
},
"required": ["city"]
},
"handler": get_weather
}
def get_weather_by_coordinates(latitude: float,
longitude: float) -> str:
"""
通过经纬度查询天气
参数:
latitude: 纬度
longitude: 经度
返回:
天气信息
"""
# 简化实现:根据坐标匹配最近城市
cities_coords = {
"北京": (39.90, 116.40),
"上海": (31.23, 121.47),
"广州": (23.13, 113.26),
"深圳": (22.54, 114.06),
}
# 简单的最近距离匹配
min_dist = float("inf")
nearest_city = None
for city, (lat, lon) in cities_coords.items():
dist = (latitude - lat) ** 2 + (longitude - lon) ** 2
if dist < min_dist:
min_dist = dist
nearest_city = city
if nearest_city:
return get_weather(nearest_city)
return json.dumps({"error": "无法定位到已知城市"},
ensure_ascii=False)
weather_by_coords_config = {
"name": "get_weather_by_coordinates",
"description": "通过经纬度查询天气",
"input_schema": {
"type": "object",
"properties": {
"latitude": {
"type": "number",
"description": "纬度坐标"
},
"longitude": {
"type": "number",
"description": "经度坐标"
}
},
"required": ["latitude", "longitude"]
},
"handler": get_weather_by_coordinates
}
5.2 启动服务器(run_server.py)
# run_server.py - 启动 MCP Server
import sys
import os
# 将项目根目录加入路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from server import MCPServer
from transport import StdioTransport, SimpleHttpTransport
from examples.weather_tool import (
weather_tool_config, weather_by_coords_config
)
def main():
# 创建服务器
transport_mode = sys.argv[1] if len(sys.argv) > 1 else "stdio"
if transport_mode == "http":
port = int(sys.argv[2]) if len(sys.argv) > 2 else 8080
transport = SimpleHttpTransport(port=port)
server = MCPServer(transport=transport)
transport.start()
else:
server = MCPServer()
# 注册天气查询工具
server.register_tool(
name=weather_tool_config["name"],
description=weather_tool_config["description"],
input_schema=weather_tool_config["input_schema"],
handler=weather_tool_config["handler"]
)
# 注册坐标查询工具
server.register_tool(
name=weather_by_coords_config["name"],
description=weather_by_coords_config["description"],
input_schema=weather_by_coords_config["input_schema"],
handler=weather_by_coords_config["handler"]
)
# 注册一个计算器工具(另一个示例)
def calculator(expression: str) -> str:
"""安全执行数学表达式"""
import ast
allowed_ops = {
ast.Add: "+", ast.Sub: "-",
ast.Mult: "*", ast.Div: "/",
ast.Pow: "**", ast.USub: "-",
ast.BinOp: "op", ast.UnaryOp: "op"
}
try:
tree = ast.parse(expression.strip(), mode="eval")
# 安全检查:只允许数字和基础运算符
for node in ast.walk(tree):
if isinstance(node, ast.Expression):
continue
if isinstance(node, ast.Constant):
if not isinstance(node.value, (int, float)):
raise ValueError("仅支持数字运算")
elif type(node) not in [
ast.Constant, ast.BinOp, ast.UnaryOp,
ast.Add, ast.Sub, ast.Mult, ast.Div,
ast.Pow, ast.USub, ast.Mod
]:
raise ValueError(f"不允许的操作: {type(node)}")
result = eval(compile(tree, "", "eval"),
{"__builtins__": {}}, {})
return str(result)
except Exception as e:
return f"计算错误: {str(e)}"
server.register_tool(
name="calculator",
description="执行数学计算,支持 + - * / ** %% 等运算符",
input_schema={
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "数学表达式,例如: (3+5)*2"
}
},
"required": ["expression"]
},
handler=calculator
)
# 注册一个系统时钟资源
import datetime
def get_system_time():
return datetime.datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
)
server.register_resource(
uri="system://time",
name="System Time",
description="当前系统时间",
mime_type="text/plain",
reader=get_system_time
)
print(
f"MCP Server 已启动 (transport: {transport_mode})",
file=sys.stderr
)
print(f"已注册工具: ", file=sys.stderr)
for tool in server.tool_manager.list_tools():
print(f" - {tool['name']}: {tool['description']}",
file=sys.stderr)
print(f"已注册资源: ", file=sys.stderr)
for res in server.resource_manager.list_resources():
print(f" - {res['uri']}: {res['description']}",
file=sys.stderr)
server.run()
if __name__ == "__main__":
main()
5.3 客户端演示(client_demo.py)
# client_demo.py - MCP Client 演示
import json
import sys
import subprocess
import time
from typing import Any, Dict, List, Optional
class MCPClient:
"""简化的 MCP Client,通过 stdio 与 Server 通信"""
def __init__(self, server_command: List[str]):
self.process = subprocess.Popen(
server_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
self.request_id = 0
self._initialized = False
def _send_request(self, method: str,
params: Dict = None) -> Dict:
"""发送请求并等待响应"""
self.request_id += 1
request = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
"id": self.request_id
}
request_str = json.dumps(request)
print(f"[发送] {method} (id={self.request_id})",
file=sys.stderr)
self.process.stdin.write(request_str + "\n")
self.process.stdin.flush()
# 读取响应
response_line = self.process.stdout.readline()
if not response_line:
raise ConnectionError("服务器连接已断开")
response = json.loads(response_line.strip())
print(f"[接收] 状态: {'成功' if 'result' in response else '失败'}",
file=sys.stderr)
return response
def initialize(self):
"""完成初始化握手"""
# 发送 initialize 请求
resp = self._send_request("initialize", {
"protocolVersion": "2025-03-26",
"capabilities": {},
"clientInfo": {
"name": "mcp-client-demo",
"version": "1.0.0"
}
})
if "error" in resp:
raise RuntimeError(
f"初始化失败: {resp['error']['message']}"
)
print(
f"[初始化] Server: "
f"{resp['result']['serverInfo']['name']} "
f"v{resp['result']['serverInfo']['version']}",
file=sys.stderr
)
print(
f"[初始化] 能力: "
f"{list(resp['result']['capabilities'].keys())}",
file=sys.stderr
)
self._initialized = True
return resp["result"]
def list_tools(self) -> List[Dict]:
"""列出所有工具"""
resp = self._send_request("tools/list")
if "error" in resp:
raise RuntimeError(
f"列出工具失败: {resp['error']['message']}"
)
return resp["result"]["tools"]
def call_tool(self, name: str, arguments: Dict) -> Any:
"""调用工具"""
resp = self._send_request("tools/call", {
"name": name,
"arguments": arguments
})
if "error" in resp:
raise RuntimeError(
f"调用工具失败: {resp['error']['message']}"
)
return resp["result"]
def list_resources(self) -> List[Dict]:
"""列出所有资源"""
resp = self._send_request("resources/list")
if "error" in resp:
raise RuntimeError(
f"列出资源失败: {resp['error']['message']}"
)
return resp["result"]["resources"]
def read_resource(self, uri: str) -> Any:
"""读取资源"""
resp = self._send_request("resources/read", {"uri": uri})
if "error" in resp:
raise RuntimeError(
f"读取资源失败: {resp['error']['message']}"
)
return resp["result"]
def close(self):
"""关闭连接"""
if self.process:
self.process.terminate()
self.process.wait(timeout=3)
def demo_session():
"""演示交互"""
client = MCPClient(["python3", "run_server.py"])
try:
# 第一步:初始化
print("\n=== MCP Server 初始化 ===", file=sys.stderr)
server_info = client.initialize()
print(f"协议版本: {server_info['protocolVersion']}",
file=sys.stderr)
# 第二步:列出工具
print("\n=== 列出已注册工具 ===", file=sys.stderr)
tools = client.list_tools()
for t in tools:
print(f" 📌 {t['name']}: {t['description']}",
file=sys.stderr)
props = t.get("inputSchema", {}).get("properties", {})
for pname, pdef in props.items():
print(f" 参数 '{pname}': {pdef.get('description', '')}",
file=sys.stderr)
# 第三步:调用工具
print("\n=== 调用工具 ===", file=sys.stderr)
# 查询北京天气
print("\n--- 查询北京天气 ---", file=sys.stderr)
result = client.call_tool("get_weather", {"city": "北京"})
content = result["content"][0]["text"]
weather_data = json.loads(content)
print(f"🌤 {weather_data['city']}天气:", file=sys.stderr)
print(f" 温度: {weather_data['temperature']}",
file=sys.stderr)
print(f" 湿度: {weather_data['humidity']}",
file=sys.stderr)
print(f" 风力: {weather_data['wind']}",
file=sys.stderr)
print(f" 状况: {weather_data['condition']}",
file=sys.stderr)
print(f" 空气质量: AQI {weather_data['aqi']}",
file=sys.stderr)
print(f" 建议: {weather_data['advice']}",
file=sys.stderr)
# 计算功能
print("\n--- 计算器 ---", file=sys.stderr)
expressions = ["(3+5)*2", "2**10", "100/3"]
for expr in expressions:
result = client.call_tool("calculator",
{"expression": expr})
calc_result = result["content"][0]["text"]
print(f" {expr} = {calc_result}", file=sys.stderr)
# 第四步:列出资源
print("\n=== 列出资源 ===", file=sys.stderr)
resources = client.list_resources()
for r in resources:
print(f" 📄 {r['uri']}: {r['description']}",
file=sys.stderr)
# 第五步:读取资源
print("\n=== 读取系统时间 ===", file=sys.stderr)
time_result = client.read_resource("system://time")
print(f" 🕐 {time_result['contents'][0]['text']}",
file=sys.stderr)
print("\n✅ 所有演示完成!", file=sys.stderr)
finally:
client.close()
if __name__ == "__main__":
demo_session()
5.4 运行验证
# 运行客户端 demo(自动启动 Server 子进程)
python3 client_demo.py
# 或者以 HTTP 模式启动 Server
python3 run_server.py http 8080
六、深入探讨实现细节
6.1 JSON-RPC 消息帧
在我们手写的 stdio 传输中,消息以换行符分隔。MCP 标准实际上允许 Content-Length 头来支持跨行 JSON 消息:
Content-Length: 123
{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}
在生产环境中,应当实现这种帧格式。这里的关键问题是:如果 JSON 长度超过一行怎么办?标准做法是:
- 读取一行获取
Content-Length值 - 跳过空行(headers 与 body 之间的分隔)
- 读取
Content-Length指定字节数的 body
6.2 参数校验的扩展
我们当前的参数校验实现了基础的 JSON Schema 校验。实际生产环境中,可以引入 jsonschema 库进行完整校验,支持:
- 嵌套对象校验
- 枚举值校验
- 正则表达式格式校验
- 最小/最大值约束
6.3 错误处理策略
MCP Server 的错误处理分为三个层次:
- 协议层错误:JSON 解析失败、请求格式错误等
- 业务层错误:工具不存在、参数无效、资源不存在等
- 执行层错误:工具函数内部抛出异常
对于业务层错误,错误码使用 -32000 ~ -32099 范围的 MCP 自定义码;对于执行层错误,isError 字段在工具调用响应中为 true。
# 扩展的工具错误处理
"isError": True,
"content": [
{
"type": "text",
"text": "工具执行遇到错误: [错误描述]"
}
]
6.4 并发安全
当前实现是单线程顺序处理。如果工具执行耗时较长(如网络请求),会阻塞后续请求。优化方案:
- 异步事件循环:使用
asyncio和aiofiles实现异步传输层 - 请求队列:多线程处理,每个请求在独立线程中执行
- 超时机制:为每个工具调用设置超时,防止死锁
# 超时包装器示例
import concurrent.futures
import functools
def with_timeout(timeout: int = 30):
"""工具超时装饰器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
with concurrent.futures.ThreadPoolExecutor() as pool:
future = pool.submit(func, *args, **kwargs)
try:
return future.result(timeout=timeout)
except concurrent.futures.TimeoutError:
raise TimeoutError(
f"工具执行超时 ({timeout}s)"
)
return wrapper
return decorator
# 使用
@with_timeout(timeout=10)
def get_weather(city: str):
# 网络请求等...
pass
6.5 日志与调试支持
MCP 协议支持 $/log 通知,Server 可以通过发送日志通知帮助客户端调试:
# 日志通知构建
def build_log_notification(level: str, message: str, data=None):
notification = {
"jsonrpc": "2.0",
"method": "$/log",
"params": {
"level": level, # debug, info, warning, error
"message": message,
}
}
if data:
notification["params"]["data"] = data
return json.dumps(notification)
七、安全与生产化建议
7.1 输入校验
这是 MCP Server 最重要的安全防线:
# 输入消毒处理
import re
def sanitize_tool_name(name: str) -> bool:
"""验证工具名称是否合法"""
return bool(re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name))
def validate_uri(uri: str) -> bool:
"""验证资源URI"""
allowed_schemes = {"file", "system", "db", "api"}
scheme = uri.split("://")[0] if "://" in uri else ""
return scheme in allowed_schemes
7.2 工具权限控制
对于敏感操作,应当实现细粒度的权限控制:
class PermissionManager:
"""权限管理器"""
def __init__(self):
self._tool_permissions = {} # tool_name -> set of allowed roles
self._resource_permissions = {} # uri_pattern -> set of allowed roles
def require_tool_role(self, tool_name: str, *roles: str):
self._tool_permissions[tool_name] = set(roles)
def check_tool_access(self, tool_name: str, role: str) -> bool:
if tool_name not in self._tool_permissions:
return True # 默认允许
return role in self._tool_permissions[tool_name]
7.3 速率限制
防止恶意调用消耗资源:
import time
from collections import defaultdict
class RateLimiter:
"""速率限制器"""
def __init__(self, max_calls: int = 60, window: int = 60):
self.max_calls = max_calls # 窗口内最大调用次数
self.window = window # 时间窗口(秒)
self._calls: Dict[str, list] = defaultdict(list)
def allow(self, client_id: str) -> bool:
now = time.time()
# 清理过期记录
self._calls[client_id] = [
t for t in self._calls[client_id]
if now - t < self.window
]
# 检查是否超限
if len(self._calls[client_id]) >= self.max_calls:
return False
self._calls[client_id].append(now)
return True
7.4 与 LLM 集成
MCP Server 的真正价值在于与 AI 模型集成。以 OpenAI/DeepSeek API 为例:
def run_mcp_agent(user_query: str):
"""使用 MCP Server 作为工具提供者运行 Agent"""
import openai
# 1. 启动 MCP Server 并获取工具列表
client = MCPClient(["python3", "run_server.py"])
client.initialize()
tools = client.list_tools()
# 2. 将 MCP 工具定义转换为 OpenAI 工具格式
openai_tools = []
for tool in tools:
openai_tools.append({
"type": "function",
"function": {
"name": tool["name"],
"description": tool["description"],
"parameters": tool["inputSchema"]
}
})
# 3. 调用 LLM
messages = [{"role": "user", "content": user_query}]
response = openai.chat.completions.create(
model="deepseek-chat", # 或 "gpt-4o"
messages=messages,
tools=openai_tools,
tool_choice="auto"
)
# 4. 处理工具调用
choice = response.choices[0]
if choice.finish_reason == "tool_calls":
for tc in choice.message.tool_calls:
args = json.loads(tc.function.arguments)
result = client.call_tool(
tc.function.name, args
)
print(f"工具 '{tc.function.name}' 返回: {result}")
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": str(result)
})
# 将结果送回 LLM 继续处理
final_response = openai.chat.completions.create(
model="deepseek-chat",
messages=messages
)
return final_response.choices[0].message.content
return choice.message.content
八、总结与拓展
8.1 实现回顾
本文从零实现了一个完整的 MCP Server,涵盖:
- ✅ JSON-RPC 2.0 协议实现(protocol.py)
- ✅ 工具注册与管理(tool_manager.py)
- ✅ 资源注册与管理(resource_manager.py)
- ✅ 双传输层支持——stdio 和 HTTP(transport.py)
- ✅ 生命周期管理——初始化、运行、关闭
- ✅ 天气查询和计算器工具示例
- ✅ 完整的 Client-Server 通信演示
整站代码量约 600 行,纯 Python 标准库实现,零外部依赖。
8.2 可扩展方向
- 工具分组:按领域划分工具集,支持懒加载
- 动态工具发现:支持热加载新工具,不用重启 Server
- API Gateway 集成:将 MCP Server 注册到统一网关,实现工具共享
- 流式输出:工具长时间运行时支持流式返回部分结果
- 工具组合:定义 worklow,让多个工具协同完成复杂任务
- 持久化状态:保存工具调用历史,支持回放和审计
8.3 MCP 的生态现状
截至 2026 年中,MCP 生态已经相当丰富:
- 官方 SDK:Python、TypeScript、Java、Kotlin 四种语言官方支持,覆盖了主流的后端开发语言栈
- 预制 Server:文件系统、Git、Slack、GitHub、PostgreSQL、SQLite、Redis、Docker 等数十种开箱即用的服务端实现,涵盖了开发者在日常工作中最常接触的基础设施
- 集成框架:LangChain、LlamaIndex、Dify、AutoGPT 等主流 AI 应用框架均已原生支持 MCP,这意味着你手写的 MCP Server 可以无缝接入这些生态
- 主流模型:Claude(Anthropic 是 MCP 的发起方)、GPT-4o(OpenAI 于 2025 年 Q2 宣布支持)、DeepSeek V4(通过兼容层实现)、Qwen、GLM 等国内外主流模型均已支持 MCP 协议
了解这些背景后,本文的实践会更有价值——你手写的 MCP Server 可以无缝接入上述任一生态系统。这就像当年 HTTP 协议统一了 Web 通信标准一样——一旦你理解了 MCP 的核心机制,就掌握了一种通用的 AI 工具集成能力。
8.4 常见问题与调试技巧
在实际开发 MCP Server 时,可能会遇到一些典型问题:
Q1:Client 连接后工具列表为空?
最常见的原因是工具注册发生在 Server 启动之后。确保在调用 server.run() 之前完成所有工具注册。我们代码中用链式调用模式实现了这一点,但如果使用了异步初始化,要注意时序问题。
Q2:JSON-RPC 请求无响应?
检查消息帧格式是否正确。如果是 stdio 模式,每条消息末尾必须有换行符。如果是 HTTP 模式,检查 POST 回调地址是否正确。可以在 Server 的 stderr 输出中查看收到的原始消息。
Q3:工具调用出现参数校验错误?
JSON Schema 的 properties 中声明的字段名必须与 handler 函数的参数名一致。例如工具定义中声明了参数 city,handler 函数也必须接收 city 参数——大小写、下划线都必须完全匹配。
Q4:怎样调试传输层问题?
建议在开发阶段开启详细日志:
# 在 server.py 中添加日志
import logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger("MCP")
📚 延伸阅读
如果你对 AI Agent 的系统设计感兴趣,推荐阅读我的另一篇文章:
👉 DeepSeek 实战指南:提示词工程、API 集成与效率提升全攻略
这篇文章系统地拆解了 DeepSeek 的提示词工程技巧、API 封装方法以及日常效率提升场景,全文代码可直接运行,适合已经上手 DeepSeek 但希望更高效使用的开发者。
本文是"手写 AI 系统"系列文章之一。该系列从零实现 AI 系统中的关键组件,涵盖 RAG、Agent、Function Calling、MCP 等核心技术,帮助你深入理解底层原理,构建属于自己的 AI 工具。
更多推荐

所有评论(0)