一、引言

1.1 从 Function Calling 到 MCP

2024 年底,Anthropic 发布了 Model Context Protocol (MCP),迅速成为 AI 工具集成领域最受关注的技术标准。在此之前,AI Agent 与外部工具的交互主要依赖各模型厂商私有的 Function Calling 实现——OpenAI 有 function calling,Google 有 function_declarations,国产模型各自定义了自己的工具调用格式。这种碎片化导致开发者每接入一个模型就要重写一套工具调用逻辑。

MCP 的核心理念很简单:为 AI 应用提供一个标准化的"外设接口"。就像 USB-C 统一了外设连接标准一样,MCP 统一了 AI 模型与外部工具、数据源之间的通信协议。

那么,MCP 到底是什么?

简而言之,MCP 定义了三种核心能力:

  • Tools(工具):AI 可以调用的函数/API,有输入参数和输出结果
  • Resources(资源):AI 可以读取的外部数据(文件、数据库、API 响应等)
  • Prompts(提示模板):预定义的提示模板,用于特定场景的交互

MCP 采用 Client-Server 架构:一个称为 MCP Client 的中间层负责与 AI 模型对话,同时通过 JSON-RPC 2.0 协议与后端的 MCP Server 通信,获取工具定义、执行工具调用、读写资源等。

1.2 为什么选择手写 MCP Server?

市面已有官方 SDK(Python/TypeScript),但我们选择手写的理由很充分:

  1. 深入理解协议本质:SDK 封装了太多细节,手写能让我们真正理解 JSON-RPC 交互流程
  2. 轻量级验证:生产环境可能需要定制化实现,手写框架可灵活裁剪
  3. 无框架依赖:零第三方依赖,核心逻辑仅几百行代码,便于嵌入式/边缘设备部署
  4. "手写系列"传统:从零构建,吃透每一个字节

本文将带领读者从零实现一个功能完整的 MCP Server,支持工具注册、资源读取、生命周期管理等核心能力,并用一个天气查询 Agent 示例进行端到端验证。全文章节安排如下:

  • MCP 协议核心概念与 JSON-RPC 2.0 基础
  • MCP Server 架构设计
  • 从零开始手写 MCP Server(完整代码实现)
  • 编写 MCP Client 进行对接
  • 实战:构建天气查询 Agent
  • 深入:Transport 层实现
  • 性能优化与安全考量

二、MCP 协议的核心设计思路

在深入代码之前,我们先深刻理解 MCP 的设计哲学。这不仅是协议本身的知识,更是理解 AI Agent 系统架构的关键节点。

2.1 MCP 的诞生背景——为什么需要一套新协议?

在 MCP 出现之前,让 AI 模型调用外部工具一般有三种做法,各有明显短板:

做法一:直接写 Prompt。 在系统提示词里用文本描述工具函数的名称、参数含义、返回值格式,然后让模型自己生成一段符合要求的 JSON 字符串来模拟函数调用。这种方法在 2023 年上半年比较常见,因为当时还没有标准化的 Function Calling 接口。它的缺点极其明显——输出格式非常不稳定,模型偶尔会漏掉必填字段、把嵌套结构写成一维 JSON、或者 XML 和 JSON 格式混用。稍微复杂一点的参数结构(比如多级嵌套对象、数组类型的参数)就很容易格式错乱。而且每次添加新工具都要手动更新 Prompt,维护成本极高。

做法二:使用各模型厂商原生的 Function Calling。 2023 年 6 月 OpenAI 率先推出 function calling,随后 Google、Anthropic、DeepSeek、零一万物、阿里通义、百度文心等纷纷跟进。这种方式比手写 Prompt 好得多——API 层面提供了工具定义的结构化输入和模型输出自动解析。但问题是各家的格式完全不兼容:OpenAI 的 tool_choice 参数用 auto/none/force 来控制调用策略,Anthropic 的 tool_use 用 {type: "tool_use", name: ..., input: ...} 的独立 block 格式,Google 的 function_declarations 又换了一套 schema 定义方式。如果想让同一个 Agent 同时支持多个模型家族,就需要为每个模型维护一套独立版本的工具定义。

做法三:第三方框架的工具抽象层。 LangChain 的 Tool 类、LlamaIndex 的 QueryTool 等提供了统一的中层抽象,把不同模型的差异封装在框架内部。这是一大进步,但也引入了耦合问题——要么整个应用依赖某个特定框架,要么自己写适配层;而且框架对这些底层协议进行了多层封装,出了问题很难一层层排查到根因。

MCP 要解决的问题非常明确:定义 AI 模型与外部世界交互的通用通信协议,不绑定任何特定模型厂商、不依赖任何特定框架。像 HTTP 统一了 Web 通信一样,MCP 试图统一 AI 工具调用的通信标准。

2.2 MCP 的三层架构

MCP 协议的架构设计可以拆解为三个层次:

  1. 传输层(Transport Layer):消息的物理传输方式。MCP 定义了 stdio(本地子进程通信)和 SSE(远程 HTTP 通信)两种传输方式。本层只负责收发字符串形式的消息,不关心消息内容的含义。
  2. 协议层(Protocol Layer):消息的格式和序列化方式。基于 JSON-RPC 2.0 标准,定义了请求、响应、错误三种消息类型,以及消息 ID 机制(用于匹配请求和响应)、通知机制(无需响应的消息)。
  3. 应用层(Application Layer):具体的交互方法定义。包括生命周期方法(initialize、shutdown)、工具方法(tools/list、tools/call)、资源方法(resources/list、resources/read)以及提示方法(prompts/list、prompts/get)。

这种层次化设计与计算机网络中的 OSI 七层模型如出一辙——每一层只关注自己范围内的职责,上层只管调用下层提供的接口,不需要关心底层实现细节。

2.3 JSON-RPC 2.0 协议——MCP 的骨架

MCP 的通信层基于 JSON-RPC 2.0。我们先理解这个基础协议。

JSON-RPC 2.0 是一种轻量级远程过程调用协议,每个请求包含三个核心字段:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/list",
  "params": {}
}

响应格式:

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "tools": [...]
  }
}

错误格式:

{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32601,
    "message": "Method not found"
  }
}

MCP 使用请求-响应模式处理同步调用(如列出工具、调用工具),使用通知模式处理异步事件(如初始化完成通知、日志输出)。

2.2 MCP 核心方法

MCP 协议定义了一组标准方法,分为三大类:

生命周期方法:

  • initialize — 客户端发送初始化请求,包含协议版本和能力声明
  • initialized — 初始化完成通知(通知,无响应)
  • shutdown — 关闭连接

工具相关方法:

  • tools/list — 列出服务器提供的所有工具
  • tools/call — 调用指定工具并传入参数

资源相关方法:

  • resources/list — 列出所有资源
  • resources/read — 读取指定资源内容
  • resources/subscribe — 订阅资源变更
  • resources/unsubscribe — 取消订阅

提示相关方法:

  • prompts/list — 列出提示模板
  • prompts/get — 获取特定提示模板

2.3 MCP 传输层(Transport)

MCP 支持两种传输方式:

  1. stdio 传输:通过标准输入输出流通信,适合本地子进程模式
  2. SSE (Server-Sent Events) 传输:基于 HTTP,支持远程通信,包括 SSE 流和 POST 请求回调

本篇我们重点实现 stdio 传输和简化版 HTTP 传输。

三、MCP Server 架构设计

让我们先勾勒出 Server 的整体架构。

3.1 模块划分

mcp-server/
├── server.py          # 主服务器,处理请求路由
├── protocol.py        # JSON-RPC 2.0 协议实现
├── transport.py       # 传输层抽象(stdio / HTTP)
├── tool_manager.py    # 工具注册与管理
├── resource_manager.py # 资源管理
├── examples/
│   ├── weather_tool.py  # 天气查询工具示例
│   └── calculator.py    # 计算器工具示例
└── client_demo.py     # 测试用 MCP Client

3.2 核心数据结构

MCP Server 的核心是一个 MethodRegistry,它维护了工具列表和请求路由表:

# server.py 的核心设计
class MCPServer:
    def __init__(self):
        self.tools = {}           # name -> ToolDefinition
        self.resources = {}       # uri -> ResourceDefinition
        self.session_id = None    # 连接会话ID
        self.client_capabilities = {}  # 客户端能力声明
        self.protocol_version = "2025-03-26"  # 当前MCP协议版本

每个工具的定义结构如下:

class ToolDefinition:
    name: str           # 工具名称
    description: str    # 工具描述
    input_schema: dict  # JSON Schema 格式的输入参数定义
    handler: callable   # 实际执行的函数

3.3 请求处理流程

MCP Server 的请求处理流程非常清晰:

  1. 初始化:客户端发送 initialize → Server 返回协议版本和能力声明 → 客户端发送 initialized 通知
  2. 工具发现:客户端定期或按需调用 tools/list → Server 返回所有已注册工具
  3. 工具调用:客户端发送 tools/call → Server 查找工具、校验参数、执行函数 → 返回结果
  4. 资源读取:客户端发送 resources/read → Server 查找资源、读取内容 → 返回结果
  5. 关闭:任意一方发送关闭信号

四、从零开始手写 MCP Server

### 4.1 为什么选择 Python 作为实现语言?

选择 Python 有几个现实的考量:

  1. AI 生态的语言:无论是 LLaMA Factory、Transformers、LangChain 还是 Dify,Python 是 AI 领域事实上的标准语言。MCP 的 Python SDK 也是官方主推的。
  2. 零开销的原型验证:Python 标准库自带 json、urllib、http.server 等模块,不需要安装任何第三方依赖即可完成整个协议的实现。
  3. 易于理解:相比 TypeScript 的异步回调或 Rust 的所有权系统,Python 的代码最为直观,适合作为学习参考实现。

当然,生产环境的 MCP Server 用 TypeScript/Go/Rust 实现也是常见选择——TypeScript 因为与前端生态的天然亲和力,在 Dify 等应用框架的插件系统中被广泛使用;Go 适合构建高性能网关层;Rust 则用于对安全性要求极高的场景。不过核心协议逻辑是一致的,理解了本文的实现,切换到其他语言只是语法上的翻译工作。

4.2 协议层实现(protocol.py)

首先,我们实现 JSON-RPC 2.0 协议的消息解析和构建。这部分是所有 MCP 通信的基础,相当于 HTTP 协议中 TCP/IP 层之上那层简单的序列化协议:

# protocol.py - JSON-RPC 2.0 协议实现
import json
import uuid
from typing import Any, Dict, Optional, Union

# 标准 JSON-RPC 错误码
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603

# MCP 特定错误码(-32000 ~ -32099 范围)
TOOL_NOT_FOUND = -32000
TOOL_EXECUTION_ERROR = -32001
RESOURCE_NOT_FOUND = -32002
RESOURCE_READ_ERROR = -32003


class JSONRPCRequest:
    """JSON-RPC 2.0 请求对象"""
    def __init__(self, method: str, params: Optional[Dict] = None,
                 request_id: Optional[Union[int, str]] = None):
        self.jsonrpc = "2.0"
        self.method = method
        self.params = params or {}
        self.id = request_id if request_id is not None else str(uuid.uuid4())

    def to_dict(self) -> Dict:
        d = {
            "jsonrpc": "2.0",
            "method": self.method,
            "params": self.params,
            "id": self.id
        }
        return d

    def to_json(self) -> str:
        return json.dumps(self.to_dict())


class JSONRPCResponse:
    """JSON-RPC 2.0 响应对象"""
    def __init__(self, request_id: Optional[Union[int, str]] = None):
        self.jsonrpc = "2.0"
        self.id = request_id
        self.result: Optional[Any] = None
        self.error: Optional[Dict] = None

    def set_result(self, result: Any):
        self.result = result
        self.error = None

    def set_error(self, code: int, message: str, data: Any = None):
        self.error = {"code": code, "message": message}
        if data is not None:
            self.error["data"] = data
        self.result = None

    def to_dict(self) -> Dict:
        d = {"jsonrpc": "2.0", "id": self.id}
        if self.error:
            d["error"] = self.error
        elif self.result is not None:
            d["result"] = self.result
        else:
            d["result"] = None
        return d

    def to_json(self) -> str:
        return json.dumps(self.to_dict())

    @classmethod
    def error_response(cls, request_id, code: int, message: str,
                       data: Any = None) -> "JSONRPCResponse":
        resp = cls(request_id)
        resp.set_error(code, message, data)
        return resp

    @classmethod
    def success_response(cls, request_id, result: Any) -> "JSONRPCResponse":
        resp = cls(request_id)
        resp.set_result(result)
        return resp


def parse_message(raw: str) -> Optional[JSONRPCRequest]:
    """解析JSON-RPC消息。通知类消息(无id)仍然返回请求对象,id为None"""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as e:
        return None

    if not isinstance(data, dict) or data.get("jsonrpc") != "2.0":
        return None

    method = data.get("method")
    if not method:
        return None

    req = JSONRPCRequest(
        method=method,
        params=data.get("params", {}),
        request_id=data.get("id")
    )
    return req


def is_notification(req: JSONRPCRequest) -> bool:
    """判断是否为通知(无需响应)"""
    return req.id is None

4.3 工具管理器——理解 MCP 的核心抽象(tool_manager.py)

如果说协议层是 MCP 的神经系统,那工具管理器就是 MCP 的心脏。所有 AI Agent 能够调用的能力都通过它来注册和调遣。

工具管理器的设计有几个关键考量:

第一,工具定义与实现分离。工具描述(name、description、inputSchema)是给 AI 模型看的——模型根据这些信息决定何时调用哪个工具以及传入什么参数。而 handler 是实际执行的函数——它接收模型填入的参数,执行真正的业务逻辑。这种分离使得我们可以交换不同模型(甚至同一模型的不同版本)而对工具实现完全无影响。

第二,JSON Schema 作为契约。我们不是让工具自己声明参数格式,而是使用标准的 JSON Schema 来描述。这样做的好处是:任何兼容 MCP 的客户端(包括 AI SDK、Agent 框架等)都能自动理解工具的参数结构,不需要为每个工具定制解析代码。

第三,链式注册 APIregister_tool 返回 self 使得可以连续注册多个工具,代码更简洁:

server.register_tool(...).register_tool(...)

下面看看参数的 JSON Schema 校验实现。虽然本实现只做了基础的类型检查,但在生产环境中,你会希望校验更细粒度——比如字段是否在枚举值中、字符串是否符合正则模式、数字是否在指定范围内。完整的 jsonschema 库支持这些,但在我们的场景中,手写基础校验更有助于理解原理。

# tool_manager.py - 工具注册与管理
from typing import Any, Callable, Dict, List, Optional
import json


class ToolDefinition:
    """工具定义"""
    def __init__(
        self,
        name: str,
        description: str,
        input_schema: Dict,
        handler: Callable[..., Any]
    ):
        self.name = name
        self.description = description
        self.input_schema = input_schema  # JSON Schema
        self.handler = handler

    def to_dict(self) -> Dict:
        return {
            "name": self.name,
            "description": self.description,
            "inputSchema": self.input_schema
        }

    def execute(self, arguments: Dict) -> Any:
        """执行工具,返回结果"""
        # 参数校验(精简版)
        self._validate_args(arguments)
        return self.handler(**arguments)

    def _validate_args(self, arguments: Dict):
        """JSON Schema 基础校验"""
        schema = self.input_schema
        if "required" not in schema:
            return
        for field in schema["required"]:
            if field not in arguments:
                raise ValueError(f"缺少必填参数: {field}")
        # 类型校验
        if "properties" in schema:
            for field, value in arguments.items():
                if field in schema["properties"]:
                    field_schema = schema["properties"][field]
                    if "type" in field_schema:
                        expected = field_schema["type"]
                        if expected == "string" and not isinstance(value, str):
                            raise TypeError(
                                f"参数 '{field}' 应为 {expected},"
                                f"实际为 {type(value).__name__}"
                            )
                        elif expected == "number" and not isinstance(
                            value, (int, float)
                        ):
                            raise TypeError(
                                f"参数 '{field}' 应为 {expected},"
                                f"实际为 {type(value).__name__}"
                            )
                        elif expected == "integer" and not isinstance(
                            value, int
                        ):
                            raise TypeError(
                                f"参数 '{field}' 应为 {expected},"
                                f"实际为 {type(value).__name__}"
                            )
                        elif expected == "boolean" and not isinstance(
                            value, bool
                        ):
                            raise TypeError(
                                f"参数 '{field}' 应为 {expected},"
                                f"实际为 {type(value).__name__}"
                            )
                        elif expected == "array" and not isinstance(
                            value, list
                        ):
                            raise TypeError(
                                f"参数 '{field}' 应为 {expected},"
                                f"实际为 {type(value).__name__}"
                            )


class ToolManager:
    """工具管理器"""
    def __init__(self):
        self._tools: Dict[str, ToolDefinition] = {}

    def register_tool(
        self,
        name: str,
        description: str,
        input_schema: Dict,
        handler: Callable
    ) -> "ToolManager":
        """注册一个工具"""
        tool = ToolDefinition(name, description, input_schema, handler)
        self._tools[name] = tool
        return self  # 支持链式调用

    def unregister_tool(self, name: str) -> bool:
        """注销工具"""
        if name in self._tools:
            del self._tools[name]
            return True
        return False

    def list_tools(self) -> List[Dict]:
        """列出所有工具定义"""
        return [tool.to_dict() for tool in self._tools.values()]

    def get_tool(self, name: str) -> Optional[ToolDefinition]:
        """获取指定工具"""
        return self._tools.get(name)

    def call_tool(self, name: str, arguments: Dict) -> Any:
        """调用工具"""
        tool = self.get_tool(name)
        if tool is None:
            raise ValueError(f"工具 '{name}' 未找到")
        return tool.execute(arguments)

4.4 资源管理器——让 AI 读取外部数据(resource_manager.py)

工具(Tools)让 AI 可以

# resource_manager.py - 资源管理
from typing import Any, Callable, Dict, List, Optional


class ResourceDefinition:
    """资源定义"""
    def __init__(
        self,
        uri: str,
        name: str,
        description: str,
        mime_type: str = "text/plain",
        reader: Optional[Callable[[], Any]] = None
    ):
        self.uri = uri
        self.name = name
        self.description = description
        self.mime_type = mime_type  # "text/plain", "application/json" 等
        self.reader = reader

    def to_dict(self) -> Dict:
        return {
            "uri": self.uri,
            "name": self.name,
            "description": self.description,
            "mimeType": self.mime_type
        }

    def read(self) -> Any:
        """读取资源内容"""
        if self.reader:
            return self.reader()
        return f"Resource: {self.uri}"


class ResourceManager:
    """资源管理器"""
    def __init__(self):
        self._resources: Dict[str, ResourceDefinition] = {}
        self._subscriptions: Dict[str, List[str]] = {}  # uri -> client_ids

    def register_resource(
        self,
        uri: str,
        name: str,
        description: str,
        mime_type: str = "text/plain",
        reader: Optional[Callable] = None
    ) -> "ResourceManager":
        resource = ResourceDefinition(
            uri, name, description, mime_type, reader
        )
        self._resources[uri] = resource
        return self

    def list_resources(self) -> List[Dict]:
        return [r.to_dict() for r in self._resources.values()]

    def read_resource(self, uri: str) -> Dict:
        resource = self._resources.get(uri)
        if resource is None:
            raise ValueError(f"资源 '{uri}' 未找到")
        content = resource.read()
        return {
            "contents": [
                {
                    "uri": uri,
                    "mimeType": resource.mime_type,
                    "text": str(content)
                }
            ]
        }

    def subscribe(self, uri: str, client_id: str):
        """订阅资源变更"""
        if uri not in self._subscriptions:
            self._subscriptions[uri] = []
        self._subscriptions[uri].append(client_id)

    def unsubscribe(self, uri: str, client_id: str):
        """取消订阅"""
        if uri in self._subscriptions:
            self._subscriptions[uri] = [
                cid for cid in self._subscriptions[uri]
                if cid != client_id
            ]

4.4 传输层抽象(transport.py)

传输层支持 stdio 和 HTTP 两种模式:

# transport.py - 传输层抽象
import json
import sys
from abc import ABC, abstractmethod
from typing import Callable, Optional


class MessageTransport(ABC):
    """消息传输层抽象基类"""

    @abstractmethod
    def send(self, message: str):
        """发送消息"""
        pass

    @abstractmethod
    def receive(self) -> Optional[str]:
        """接收消息(阻塞),返回None表示连接关闭"""
        pass

    @abstractmethod
    def close(self):
        """关闭传输"""
        pass


class StdioTransport(MessageTransport):
    """基于标准输入输出的传输层"""
    def __init__(self):
        self._running = True

    def send(self, message: str):
        """通过 stdout 发送消息(带换行符分割)"""
        sys.stdout.write(message + "\n")
        sys.stdout.flush()

    def receive(self) -> Optional[str]:
        """通过 stdin 读取消息"""
        try:
            line = sys.stdin.readline()
            if not line:
                self._running = False
                return None
            # 处理 Content-Length 头(MCP 标准中的 HTTP 风格消息帧)
            # 我们这里使用简化的换行分隔协议
            line = line.strip()
            if not line:
                return self.receive()  # 跳过空行
            return line
        except EOFError:
            self._running = False
            return None

    def close(self):
        self._running = False


class SimpleHttpTransport(MessageTransport):
    """
    基于 HTTP 的简化传输层。
    使用 SSE (Server-Sent Events) 发送消息到客户端,
    通过 POST 回调接收客户端请求。
    注意:这是一个简化实现,适合学习和测试。
    """
    def __init__(
        self,
        host: str = "127.0.0.1",
        port: int = 8080,
        on_message: Optional[Callable] = None
    ):
        self.host = host
        self.port = port
        self._running = False
        self._message_queue = []
        self.on_message = on_message

    def send(self, message: str):
        """实际由HTTP服务器在SSE连接上发送"""
        self._message_queue.append(message)

    def receive(self) -> Optional[str]:
        """阻塞等待消息"""
        import time
        while self._running:
            if self._message_queue:
                return self._message_queue.pop(0)
            time.sleep(0.01)

        # 非阻塞模式下复用
        if self._message_queue:
            return self._message_queue.pop(0)
        return None

    def close(self):
        self._running = False

    def start(self):
        """启动 HTTP 服务器(简化版,使用内置 http.server)"""
        from http.server import HTTPServer, BaseHTTPRequestHandler
        import threading
        import urllib.parse

        transport = self

        class MCPHTTPHandler(BaseHTTPRequestHandler):
            def do_GET(self):
                """SSE 端点"""
                if self.path == "/sse":
                    self.send_response(200)
                    self.send_header("Content-Type", "text/event-stream")
                    self.send_header("Cache-Control", "no-cache")
                    self.send_header("Connection", "keep-alive")
                    self.send_header("Access-Control-Allow-Origin", "*")
                    self.end_headers()

                    transport._running = True
                    # 发送端点信息
                    self.wfile.write(
                        f"event: endpoint\ndata: /messages\n\n".encode()
                    )
                    self.wfile.flush()

                    # 循环发送消息
                    try:
                        while transport._running:
                            if transport._message_queue:
                                msg = transport._message_queue.pop(0)
                                self.wfile.write(
                                    f"event: message\ndata: {msg}\n\n"
                                    .encode()
                                )
                                self.wfile.flush()
                            import time
                            time.sleep(0.01)
                    except (BrokenPipeError, ConnectionResetError):
                        transport._running = False
                else:
                    self.send_response(404)
                    self.end_headers()

            def do_POST(self):
                """消息接收端点"""
                if self.path == "/messages":
                    content_length = int(
                        self.headers.get("Content-Length", 0)
                    )
                    body = self.rfile.read(content_length).decode()
                    transport._message_queue.append(body)
                    self.send_response(202)
                    self.send_header("Access-Control-Allow-Origin", "*")
                    self.end_headers()
                else:
                    self.send_response(404)
                    self.end_headers()

            def do_OPTIONS(self):
                """CORS 预检请求"""
                self.send_response(200)
                self.send_header("Access-Control-Allow-Origin", "*")
                self.send_header(
                    "Access-Control-Allow-Methods",
                    "GET, POST, OPTIONS"
                )
                self.send_header(
                    "Access-Control-Allow-Headers",
                    "Content-Type"
                )
                self.end_headers()

            def log_message(self, format, *args):
                pass  # 静默日志

        server = HTTPServer((self.host, self.port), MCPHTTPHandler)
        thread = threading.Thread(
            target=server.serve_forever, daemon=True
        )
        thread.start()
        print(
            f"[MCP Server] HTTP 传输已启动: "
            f"http://{self.host}:{self.port}/sse"
        )
        return server

4.5 主服务器(server.py)

现在把所有组件整合到主服务器中:

# server.py - MCP Server 主实现
import json
import signal
import sys
from typing import Any, Dict, Optional

from protocol import (
    JSONRPCRequest, JSONRPCResponse,
    parse_message, is_notification,
    PARSE_ERROR, INVALID_REQUEST, METHOD_NOT_FOUND,
    INTERNAL_ERROR, TOOL_NOT_FOUND, TOOL_EXECUTION_ERROR,
    RESOURCE_NOT_FOUND, RESOURCE_READ_ERROR
)
from tool_manager import ToolManager
from resource_manager import ResourceManager
from transport import MessageTransport, StdioTransport, SimpleHttpTransport


class MCPServer:
    """MCP Server 主类"""

    def __init__(self, transport: Optional[MessageTransport] = None):
        self.tool_manager = ToolManager()
        self.resource_manager = ResourceManager()
        self.transport = transport or StdioTransport()
        self.session_id = None
        self.client_capabilities = {}
        self.server_capabilities = {
            "protocolVersion": "2025-03-26",
            "capabilities": {
                "tools": {
                    "listChanged": True
                },
                "resources": {
                    "subscribe": True,
                    "listChanged": True
                },
                "prompts": {
                    "listChanged": True
                }
            }
        }
        self._running = False
        self._method_handlers = self._build_handlers()

    def _build_handlers(self) -> Dict:
        """构建方法处理路由表"""
        return {
            # 生命周期
            "initialize": self._handle_initialize,
            "shutdown": self._handle_shutdown,
            # 工具
            "tools/list": self._handle_tools_list,
            "tools/call": self._handle_tools_call,
            # 资源
            "resources/list": self._handle_resources_list,
            "resources/read": self._handle_resources_read,
            "resources/subscribe": self._handle_resources_subscribe,
            "resources/unsubscribe": self._handle_resources_unsubscribe,
            # 提示
            "prompts/list": self._handle_prompts_list,
            "prompts/get": self._handle_prompts_get,
        }

    # ========== 请求处理器 ==========

    def _handle_initialize(self, params: Dict,
                           req_id) -> JSONRPCResponse:
        """处理初始化请求"""
        self.session_id = params.get("sessionId")
        self.client_capabilities = params.get("capabilities", {})
        result = {
            "protocolVersion": self.server_capabilities["protocolVersion"],
            "capabilities": self.server_capabilities["capabilities"],
            "serverInfo": {
                "name": "mcp-server-from-scratch",
                "version": "1.0.0"
            }
        }
        return JSONRPCResponse.success_response(req_id, result)

    def _handle_shutdown(self, params: Dict,
                         req_id) -> JSONRPCResponse:
        """处理关闭请求"""
        self._running = False
        return JSONRPCResponse.success_response(req_id, None)

    def _handle_tools_list(self, params: Dict,
                           req_id) -> JSONRPCResponse:
        """列出所有工具"""
        tools = self.tool_manager.list_tools()
        return JSONRPCResponse.success_response(req_id, {"tools": tools})

    def _handle_tools_call(self, params: Dict,
                           req_id) -> JSONRPCResponse:
        """调用工具"""
        name = params.get("name", "")
        arguments = params.get("arguments", {})
        try:
            result = self.tool_manager.call_tool(name, arguments)
            return JSONRPCResponse.success_response(
                req_id, {
                    "content": [
                        {
                            "type": "text",
                            "text": str(result)
                                if not isinstance(result, str)
                                else result
                        }
                    ],
                    "isError": False
                }
            )
        except ValueError as e:
            code = TOOL_NOT_FOUND if "未找到" in str(e) else TOOL_EXECUTION_ERROR
            return JSONRPCResponse.error_response(
                req_id, code, str(e)
            )
        except Exception as e:
            return JSONRPCResponse.error_response(
                req_id, TOOL_EXECUTION_ERROR,
                f"工具执行失败: {str(e)}"
            )

    def _handle_resources_list(self, params: Dict,
                               req_id) -> JSONRPCResponse:
        """列出资源"""
        resources = self.resource_manager.list_resources()
        return JSONRPCResponse.success_response(
            req_id, {"resources": resources}
        )

    def _handle_resources_read(self, params: Dict,
                               req_id) -> JSONRPCResponse:
        """读取资源"""
        uri = params.get("uri", "")
        try:
            result = self.resource_manager.read_resource(uri)
            return JSONRPCResponse.success_response(req_id, result)
        except ValueError:
            return JSONRPCResponse.error_response(
                req_id, RESOURCE_NOT_FOUND, f"资源 '{uri}' 未找到"
            )
        except Exception as e:
            return JSONRPCResponse.error_response(
                req_id, RESOURCE_READ_ERROR,
                f"读取资源失败: {str(e)}"
            )

    def _handle_resources_subscribe(self, params: Dict,
                                    req_id) -> JSONRPCResponse:
        """订阅资源"""
        uri = params.get("uri", "")
        try:
            self.resource_manager.subscribe(
                uri, self.session_id or "unknown"
            )
            return JSONRPCResponse.success_response(req_id, None)
        except Exception as e:
            return JSONRPCResponse.error_response(
                req_id, INTERNAL_ERROR, str(e)
            )

    def _handle_resources_unsubscribe(self, params: Dict,
                                      req_id) -> JSONRPCResponse:
        """取消订阅"""
        uri = params.get("uri", "")
        try:
            self.resource_manager.unsubscribe(
                uri, self.session_id or "unknown"
            )
            return JSONRPCResponse.success_response(req_id, None)
        except Exception as e:
            return JSONRPCResponse.error_response(
                req_id, INTERNAL_ERROR, str(e)
            )

    def _handle_prompts_list(self, params: Dict,
                             req_id) -> JSONRPCResponse:
        """列出提示模板"""
        return JSONRPCResponse.success_response(
            req_id, {"prompts": []}
        )

    def _handle_prompts_get(self, params: Dict,
                            req_id) -> JSONRPCResponse:
        """获取提示模板"""
        name = params.get("name", "")
        return JSONRPCResponse.success_response(
            req_id, {
                "prompt": {
                    "name": name,
                    "description": f"Prompt: {name}",
                    "messages": []
                }
            }
        )

    # ========== 消息处理主循环 ==========

    def handle_message(self, raw: str) -> Optional[str]:
        """处理一条原始消息,返回响应(通知返回None)"""
        request = parse_message(raw)
        if request is None:
            # 解析失败,返回解析错误
            resp = JSONRPCResponse.error_response(
                None, PARSE_ERROR, "无效的JSON-RPC消息"
            )
            return resp.to_json()

        handler = self._method_handlers.get(request.method)
        if handler is None:
            resp = JSONRPCResponse.error_response(
                request.id, METHOD_NOT_FOUND,
                f"未知方法: {request.method}"
            )
            return resp.to_json()

        try:
            response = handler(request.params, request.id)
        except Exception as e:
            response = JSONRPCResponse.error_response(
                request.id, INTERNAL_ERROR, f"内部错误: {str(e)}"
            )

        # 通知不需要响应
        if is_notification(request):
            return None

        return response.to_json()

    def run(self):
        """启动 MCP Server 主循环"""
        self._running = True
        print("[MCP Server] 已启动, 等待消息...", file=sys.stderr)

        try:
            while self._running:
                raw = self.transport.receive()
                if raw is None:
                    break
                response = self.handle_message(raw.strip())
                if response:
                    self.transport.send(response)
        except KeyboardInterrupt:
            print("\n[MCP Server] 收到中断信号", file=sys.stderr)
        finally:
            self.cleanup()

    def cleanup(self):
        """资源清理"""
        self._running = False
        self.transport.close()
        print("[MCP Server] 已关闭", file=sys.stderr)

    # ========== 便捷方法 ==========

    def register_tool(self, name: str, description: str,
                      input_schema: Dict, handler: callable):
        """注册工具"""
        self.tool_manager.register_tool(
            name, description, input_schema, handler
        )

    def register_resource(self, uri: str, name: str,
                          description: str,
                          mime_type: str = "text/plain",
                          reader=None):
        """注册资源"""
        self.resource_manager.register_resource(
            uri, name, description, mime_type, reader
        )

五、实战:构建天气查询 Agent

现在我们用实现的 MCP Server 来构建一个实用的工具——天气查询 Agent。

5.1 天气查询工具(examples/weather_tool.py)

# examples/weather_tool.py - 天气查询工具
import json
import urllib.request
import urllib.parse


# 模拟天气数据(实际使用时应接入真实API)
MOCK_WEATHER_DATA = {
    "北京": {"temp": 28, "humidity": 45, "wind": "3级",
             "condition": "晴", "aqi": 72},
    "上海": {"temp": 31, "humidity": 72, "wind": "2级",
             "condition": "多云", "aqi": 85},
    "广州": {"temp": 33, "humidity": 80, "wind": "3级",
             "condition": "阵雨", "aqi": 62},
    "深圳": {"temp": 32, "humidity": 78, "wind": "3级",
             "condition": "多云", "aqi": 55},
    "成都": {"temp": 27, "humidity": 65, "wind": "1级",
             "condition": "阴", "aqi": 90},
    "杭州": {"temp": 30, "humidity": 70, "wind": "2级",
             "condition": "晴", "aqi": 68},
    "武汉": {"temp": 34, "humidity": 68, "wind": "2级",
             "condition": "晴", "aqi": 95},
    "西安": {"temp": 29, "humidity": 55, "wind": "2级",
             "condition": "晴", "aqi": 78},
}


def get_weather(city: str) -> str:
    """
    查询指定城市的当前天气

    参数:
        city: 城市名称(中文)

    返回:
        格式化的天气信息字符串
    """
    city = city.strip()

    # 尝试从模拟数据中获取
    if city in MOCK_WEATHER_DATA:
        data = MOCK_WEATHER_DATA[city]
        return json.dumps({
            "city": city,
            "temperature": f"{data['temp']}°C",
            "humidity": f"{data['humidity']}%",
            "wind": data['wind'],
            "condition": data['condition'],
            "aqi": data['aqi'],
            "advice": "适合户外活动" if data['aqi'] < 80
                      else "敏感人群建议减少户外活动",
            "update_time": "2026-07-01 08:00:00"
        }, ensure_ascii=False, indent=2)
    else:
        # 城市不在预设列表中
        return json.dumps({
            "city": city,
            "error": f"暂未收录 '{city}' 的天气数据",
            "available_cities": list(MOCK_WEATHER_DATA.keys())
        }, ensure_ascii=False, indent=2)


# 工具注册配置
weather_tool_config = {
    "name": "get_weather",
    "description": "查询指定城市的当前天气信息,包括温度、湿度、风速、空气质量等",
    "input_schema": {
        "type": "object",
        "properties": {
            "city": {
                "type": "string",
                "description": "城市名称,例如:北京、上海、广州"
            }
        },
        "required": ["city"]
    },
    "handler": get_weather
}


def get_weather_by_coordinates(latitude: float,
                                longitude: float) -> str:
    """
    通过经纬度查询天气

    参数:
        latitude: 纬度
        longitude: 经度

    返回:
        天气信息
    """
    # 简化实现:根据坐标匹配最近城市
    cities_coords = {
        "北京": (39.90, 116.40),
        "上海": (31.23, 121.47),
        "广州": (23.13, 113.26),
        "深圳": (22.54, 114.06),
    }

    # 简单的最近距离匹配
    min_dist = float("inf")
    nearest_city = None
    for city, (lat, lon) in cities_coords.items():
        dist = (latitude - lat) ** 2 + (longitude - lon) ** 2
        if dist < min_dist:
            min_dist = dist
            nearest_city = city

    if nearest_city:
        return get_weather(nearest_city)
    return json.dumps({"error": "无法定位到已知城市"},
                       ensure_ascii=False)


weather_by_coords_config = {
    "name": "get_weather_by_coordinates",
    "description": "通过经纬度查询天气",
    "input_schema": {
        "type": "object",
        "properties": {
            "latitude": {
                "type": "number",
                "description": "纬度坐标"
            },
            "longitude": {
                "type": "number",
                "description": "经度坐标"
            }
        },
        "required": ["latitude", "longitude"]
    },
    "handler": get_weather_by_coordinates
}

5.2 启动服务器(run_server.py)

# run_server.py - 启动 MCP Server
import sys
import os

# 将项目根目录加入路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from server import MCPServer
from transport import StdioTransport, SimpleHttpTransport
from examples.weather_tool import (
    weather_tool_config, weather_by_coords_config
)


def main():
    # 创建服务器
    transport_mode = sys.argv[1] if len(sys.argv) > 1 else "stdio"

    if transport_mode == "http":
        port = int(sys.argv[2]) if len(sys.argv) > 2 else 8080
        transport = SimpleHttpTransport(port=port)
        server = MCPServer(transport=transport)
        transport.start()
    else:
        server = MCPServer()

    # 注册天气查询工具
    server.register_tool(
        name=weather_tool_config["name"],
        description=weather_tool_config["description"],
        input_schema=weather_tool_config["input_schema"],
        handler=weather_tool_config["handler"]
    )

    # 注册坐标查询工具
    server.register_tool(
        name=weather_by_coords_config["name"],
        description=weather_by_coords_config["description"],
        input_schema=weather_by_coords_config["input_schema"],
        handler=weather_by_coords_config["handler"]
    )

    # 注册一个计算器工具(另一个示例)
    def calculator(expression: str) -> str:
        """安全执行数学表达式"""
        import ast
        allowed_ops = {
            ast.Add: "+", ast.Sub: "-",
            ast.Mult: "*", ast.Div: "/",
            ast.Pow: "**", ast.USub: "-",
            ast.BinOp: "op", ast.UnaryOp: "op"
        }
        try:
            tree = ast.parse(expression.strip(), mode="eval")
            # 安全检查:只允许数字和基础运算符
            for node in ast.walk(tree):
                if isinstance(node, ast.Expression):
                    continue
                if isinstance(node, ast.Constant):
                    if not isinstance(node.value, (int, float)):
                        raise ValueError("仅支持数字运算")
                elif type(node) not in [
                    ast.Constant, ast.BinOp, ast.UnaryOp,
                    ast.Add, ast.Sub, ast.Mult, ast.Div,
                    ast.Pow, ast.USub, ast.Mod
                ]:
                    raise ValueError(f"不允许的操作: {type(node)}")
            result = eval(compile(tree, "", "eval"),
                          {"__builtins__": {}}, {})
            return str(result)
        except Exception as e:
            return f"计算错误: {str(e)}"

    server.register_tool(
        name="calculator",
        description="执行数学计算,支持 + - * / ** %% 等运算符",
        input_schema={
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "数学表达式,例如: (3+5)*2"
                }
            },
            "required": ["expression"]
        },
        handler=calculator
    )

    # 注册一个系统时钟资源
    import datetime

    def get_system_time():
        return datetime.datetime.now().strftime(
            "%Y-%m-%d %H:%M:%S"
        )

    server.register_resource(
        uri="system://time",
        name="System Time",
        description="当前系统时间",
        mime_type="text/plain",
        reader=get_system_time
    )

    print(
        f"MCP Server 已启动 (transport: {transport_mode})",
        file=sys.stderr
    )
    print(f"已注册工具: ", file=sys.stderr)
    for tool in server.tool_manager.list_tools():
        print(f"  - {tool['name']}: {tool['description']}",
              file=sys.stderr)
    print(f"已注册资源: ", file=sys.stderr)
    for res in server.resource_manager.list_resources():
        print(f"  - {res['uri']}: {res['description']}",
              file=sys.stderr)

    server.run()


if __name__ == "__main__":
    main()

5.3 客户端演示(client_demo.py)

# client_demo.py - MCP Client 演示
import json
import sys
import subprocess
import time
from typing import Any, Dict, List, Optional


class MCPClient:
    """简化的 MCP Client,通过 stdio 与 Server 通信"""

    def __init__(self, server_command: List[str]):
        self.process = subprocess.Popen(
            server_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1
        )
        self.request_id = 0
        self._initialized = False

    def _send_request(self, method: str,
                      params: Dict = None) -> Dict:
        """发送请求并等待响应"""
        self.request_id += 1
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": self.request_id
        }
        request_str = json.dumps(request)
        print(f"[发送] {method} (id={self.request_id})",
              file=sys.stderr)
        self.process.stdin.write(request_str + "\n")
        self.process.stdin.flush()

        # 读取响应
        response_line = self.process.stdout.readline()
        if not response_line:
            raise ConnectionError("服务器连接已断开")
        response = json.loads(response_line.strip())
        print(f"[接收] 状态: {'成功' if 'result' in response else '失败'}",
              file=sys.stderr)
        return response

    def initialize(self):
        """完成初始化握手"""
        # 发送 initialize 请求
        resp = self._send_request("initialize", {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {
                "name": "mcp-client-demo",
                "version": "1.0.0"
            }
        })
        if "error" in resp:
            raise RuntimeError(
                f"初始化失败: {resp['error']['message']}"
            )
        print(
            f"[初始化] Server: "
            f"{resp['result']['serverInfo']['name']} "
            f"v{resp['result']['serverInfo']['version']}",
            file=sys.stderr
        )
        print(
            f"[初始化] 能力: "
            f"{list(resp['result']['capabilities'].keys())}",
            file=sys.stderr
        )
        self._initialized = True
        return resp["result"]

    def list_tools(self) -> List[Dict]:
        """列出所有工具"""
        resp = self._send_request("tools/list")
        if "error" in resp:
            raise RuntimeError(
                f"列出工具失败: {resp['error']['message']}"
            )
        return resp["result"]["tools"]

    def call_tool(self, name: str, arguments: Dict) -> Any:
        """调用工具"""
        resp = self._send_request("tools/call", {
            "name": name,
            "arguments": arguments
        })
        if "error" in resp:
            raise RuntimeError(
                f"调用工具失败: {resp['error']['message']}"
            )
        return resp["result"]

    def list_resources(self) -> List[Dict]:
        """列出所有资源"""
        resp = self._send_request("resources/list")
        if "error" in resp:
            raise RuntimeError(
                f"列出资源失败: {resp['error']['message']}"
            )
        return resp["result"]["resources"]

    def read_resource(self, uri: str) -> Any:
        """读取资源"""
        resp = self._send_request("resources/read", {"uri": uri})
        if "error" in resp:
            raise RuntimeError(
                f"读取资源失败: {resp['error']['message']}"
            )
        return resp["result"]

    def close(self):
        """关闭连接"""
        if self.process:
            self.process.terminate()
            self.process.wait(timeout=3)


def demo_session():
    """演示交互"""
    client = MCPClient(["python3", "run_server.py"])

    try:
        # 第一步:初始化
        print("\n=== MCP Server 初始化 ===", file=sys.stderr)
        server_info = client.initialize()
        print(f"协议版本: {server_info['protocolVersion']}",
              file=sys.stderr)

        # 第二步:列出工具
        print("\n=== 列出已注册工具 ===", file=sys.stderr)
        tools = client.list_tools()
        for t in tools:
            print(f"  📌 {t['name']}: {t['description']}",
                  file=sys.stderr)
            props = t.get("inputSchema", {}).get("properties", {})
            for pname, pdef in props.items():
                print(f"     参数 '{pname}': {pdef.get('description', '')}",
                      file=sys.stderr)

        # 第三步:调用工具
        print("\n=== 调用工具 ===", file=sys.stderr)

        # 查询北京天气
        print("\n--- 查询北京天气 ---", file=sys.stderr)
        result = client.call_tool("get_weather", {"city": "北京"})
        content = result["content"][0]["text"]
        weather_data = json.loads(content)
        print(f"🌤  {weather_data['city']}天气:", file=sys.stderr)
        print(f"   温度: {weather_data['temperature']}",
              file=sys.stderr)
        print(f"   湿度: {weather_data['humidity']}",
              file=sys.stderr)
        print(f"   风力: {weather_data['wind']}",
              file=sys.stderr)
        print(f"   状况: {weather_data['condition']}",
              file=sys.stderr)
        print(f"   空气质量: AQI {weather_data['aqi']}",
              file=sys.stderr)
        print(f"   建议: {weather_data['advice']}",
              file=sys.stderr)

        # 计算功能
        print("\n--- 计算器 ---", file=sys.stderr)
        expressions = ["(3+5)*2", "2**10", "100/3"]
        for expr in expressions:
            result = client.call_tool("calculator",
                                       {"expression": expr})
            calc_result = result["content"][0]["text"]
            print(f"   {expr} = {calc_result}", file=sys.stderr)

        # 第四步:列出资源
        print("\n=== 列出资源 ===", file=sys.stderr)
        resources = client.list_resources()
        for r in resources:
            print(f"  📄 {r['uri']}: {r['description']}",
                  file=sys.stderr)

        # 第五步:读取资源
        print("\n=== 读取系统时间 ===", file=sys.stderr)
        time_result = client.read_resource("system://time")
        print(f"  🕐 {time_result['contents'][0]['text']}",
              file=sys.stderr)

        print("\n✅ 所有演示完成!", file=sys.stderr)

    finally:
        client.close()


if __name__ == "__main__":
    demo_session()

5.4 运行验证

# 运行客户端 demo(自动启动 Server 子进程)
python3 client_demo.py

# 或者以 HTTP 模式启动 Server
python3 run_server.py http 8080

六、深入探讨实现细节

6.1 JSON-RPC 消息帧

在我们手写的 stdio 传输中,消息以换行符分隔。MCP 标准实际上允许 Content-Length 头来支持跨行 JSON 消息:

Content-Length: 123

{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}

在生产环境中,应当实现这种帧格式。这里的关键问题是:如果 JSON 长度超过一行怎么办?标准做法是:

  1. 读取一行获取 Content-Length
  2. 跳过空行(headers 与 body 之间的分隔)
  3. 读取 Content-Length 指定字节数的 body

6.2 参数校验的扩展

我们当前的参数校验实现了基础的 JSON Schema 校验。实际生产环境中,可以引入 jsonschema 库进行完整校验,支持:

  • 嵌套对象校验
  • 枚举值校验
  • 正则表达式格式校验
  • 最小/最大值约束

6.3 错误处理策略

MCP Server 的错误处理分为三个层次:

  1. 协议层错误:JSON 解析失败、请求格式错误等
  2. 业务层错误:工具不存在、参数无效、资源不存在等
  3. 执行层错误:工具函数内部抛出异常

对于业务层错误,错误码使用 -32000 ~ -32099 范围的 MCP 自定义码;对于执行层错误,isError 字段在工具调用响应中为 true

# 扩展的工具错误处理
"isError": True,
"content": [
    {
        "type": "text",
        "text": "工具执行遇到错误: [错误描述]"
    }
]

6.4 并发安全

当前实现是单线程顺序处理。如果工具执行耗时较长(如网络请求),会阻塞后续请求。优化方案:

  1. 异步事件循环:使用 asyncioaiofiles 实现异步传输层
  2. 请求队列:多线程处理,每个请求在独立线程中执行
  3. 超时机制:为每个工具调用设置超时,防止死锁
# 超时包装器示例
import concurrent.futures
import functools

def with_timeout(timeout: int = 30):
    """工具超时装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with concurrent.futures.ThreadPoolExecutor() as pool:
                future = pool.submit(func, *args, **kwargs)
                try:
                    return future.result(timeout=timeout)
                except concurrent.futures.TimeoutError:
                    raise TimeoutError(
                        f"工具执行超时 ({timeout}s)"
                    )
        return wrapper
    return decorator

# 使用
@with_timeout(timeout=10)
def get_weather(city: str):
    # 网络请求等...
    pass

6.5 日志与调试支持

MCP 协议支持 $/log 通知,Server 可以通过发送日志通知帮助客户端调试:

# 日志通知构建
def build_log_notification(level: str, message: str, data=None):
    notification = {
        "jsonrpc": "2.0",
        "method": "$/log",
        "params": {
            "level": level,  # debug, info, warning, error
            "message": message,
        }
    }
    if data:
        notification["params"]["data"] = data
    return json.dumps(notification)

七、安全与生产化建议

7.1 输入校验

这是 MCP Server 最重要的安全防线:

# 输入消毒处理
import re

def sanitize_tool_name(name: str) -> bool:
    """验证工具名称是否合法"""
    return bool(re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name))

def validate_uri(uri: str) -> bool:
    """验证资源URI"""
    allowed_schemes = {"file", "system", "db", "api"}
    scheme = uri.split("://")[0] if "://" in uri else ""
    return scheme in allowed_schemes

7.2 工具权限控制

对于敏感操作,应当实现细粒度的权限控制:

class PermissionManager:
    """权限管理器"""
    def __init__(self):
        self._tool_permissions = {}  # tool_name -> set of allowed roles
        self._resource_permissions = {}  # uri_pattern -> set of allowed roles

    def require_tool_role(self, tool_name: str, *roles: str):
        self._tool_permissions[tool_name] = set(roles)

    def check_tool_access(self, tool_name: str, role: str) -> bool:
        if tool_name not in self._tool_permissions:
            return True  # 默认允许
        return role in self._tool_permissions[tool_name]

7.3 速率限制

防止恶意调用消耗资源:

import time
from collections import defaultdict

class RateLimiter:
    """速率限制器"""
    def __init__(self, max_calls: int = 60, window: int = 60):
        self.max_calls = max_calls       # 窗口内最大调用次数
        self.window = window             # 时间窗口(秒)
        self._calls: Dict[str, list] = defaultdict(list)

    def allow(self, client_id: str) -> bool:
        now = time.time()
        # 清理过期记录
        self._calls[client_id] = [
            t for t in self._calls[client_id]
            if now - t < self.window
        ]
        # 检查是否超限
        if len(self._calls[client_id]) >= self.max_calls:
            return False
        self._calls[client_id].append(now)
        return True

7.4 与 LLM 集成

MCP Server 的真正价值在于与 AI 模型集成。以 OpenAI/DeepSeek API 为例:

def run_mcp_agent(user_query: str):
    """使用 MCP Server 作为工具提供者运行 Agent"""
    import openai

    # 1. 启动 MCP Server 并获取工具列表
    client = MCPClient(["python3", "run_server.py"])
    client.initialize()
    tools = client.list_tools()

    # 2. 将 MCP 工具定义转换为 OpenAI 工具格式
    openai_tools = []
    for tool in tools:
        openai_tools.append({
            "type": "function",
            "function": {
                "name": tool["name"],
                "description": tool["description"],
                "parameters": tool["inputSchema"]
            }
        })

    # 3. 调用 LLM
    messages = [{"role": "user", "content": user_query}]
    response = openai.chat.completions.create(
        model="deepseek-chat",  # 或 "gpt-4o"
        messages=messages,
        tools=openai_tools,
        tool_choice="auto"
    )

    # 4. 处理工具调用
    choice = response.choices[0]
    if choice.finish_reason == "tool_calls":
        for tc in choice.message.tool_calls:
            args = json.loads(tc.function.arguments)
            result = client.call_tool(
                tc.function.name, args
            )
            print(f"工具 '{tc.function.name}' 返回: {result}")
            messages.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": str(result)
            })
        # 将结果送回 LLM 继续处理
        final_response = openai.chat.completions.create(
            model="deepseek-chat",
            messages=messages
        )
        return final_response.choices[0].message.content

    return choice.message.content

八、总结与拓展

8.1 实现回顾

本文从零实现了一个完整的 MCP Server,涵盖:

  • ✅ JSON-RPC 2.0 协议实现(protocol.py)
  • ✅ 工具注册与管理(tool_manager.py)
  • ✅ 资源注册与管理(resource_manager.py)
  • ✅ 双传输层支持——stdio 和 HTTP(transport.py)
  • ✅ 生命周期管理——初始化、运行、关闭
  • ✅ 天气查询和计算器工具示例
  • ✅ 完整的 Client-Server 通信演示

整站代码量约 600 行,纯 Python 标准库实现,零外部依赖。

8.2 可扩展方向

  1. 工具分组:按领域划分工具集,支持懒加载
  2. 动态工具发现:支持热加载新工具,不用重启 Server
  3. API Gateway 集成:将 MCP Server 注册到统一网关,实现工具共享
  4. 流式输出:工具长时间运行时支持流式返回部分结果
  5. 工具组合:定义 worklow,让多个工具协同完成复杂任务
  6. 持久化状态:保存工具调用历史,支持回放和审计

8.3 MCP 的生态现状

截至 2026 年中,MCP 生态已经相当丰富:

  • 官方 SDK:Python、TypeScript、Java、Kotlin 四种语言官方支持,覆盖了主流的后端开发语言栈
  • 预制 Server:文件系统、Git、Slack、GitHub、PostgreSQL、SQLite、Redis、Docker 等数十种开箱即用的服务端实现,涵盖了开发者在日常工作中最常接触的基础设施
  • 集成框架:LangChain、LlamaIndex、Dify、AutoGPT 等主流 AI 应用框架均已原生支持 MCP,这意味着你手写的 MCP Server 可以无缝接入这些生态
  • 主流模型:Claude(Anthropic 是 MCP 的发起方)、GPT-4o(OpenAI 于 2025 年 Q2 宣布支持)、DeepSeek V4(通过兼容层实现)、Qwen、GLM 等国内外主流模型均已支持 MCP 协议

了解这些背景后,本文的实践会更有价值——你手写的 MCP Server 可以无缝接入上述任一生态系统。这就像当年 HTTP 协议统一了 Web 通信标准一样——一旦你理解了 MCP 的核心机制,就掌握了一种通用的 AI 工具集成能力。

8.4 常见问题与调试技巧

在实际开发 MCP Server 时,可能会遇到一些典型问题:

Q1:Client 连接后工具列表为空?
最常见的原因是工具注册发生在 Server 启动之后。确保在调用 server.run() 之前完成所有工具注册。我们代码中用链式调用模式实现了这一点,但如果使用了异步初始化,要注意时序问题。

Q2:JSON-RPC 请求无响应?
检查消息帧格式是否正确。如果是 stdio 模式,每条消息末尾必须有换行符。如果是 HTTP 模式,检查 POST 回调地址是否正确。可以在 Server 的 stderr 输出中查看收到的原始消息。

Q3:工具调用出现参数校验错误?
JSON Schema 的 properties 中声明的字段名必须与 handler 函数的参数名一致。例如工具定义中声明了参数 city,handler 函数也必须接收 city 参数——大小写、下划线都必须完全匹配。

Q4:怎样调试传输层问题?
建议在开发阶段开启详细日志:

# 在 server.py 中添加日志
import logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger("MCP")

📚 延伸阅读

如果你对 AI Agent 的系统设计感兴趣,推荐阅读我的另一篇文章:

👉 DeepSeek 实战指南:提示词工程、API 集成与效率提升全攻略

这篇文章系统地拆解了 DeepSeek 的提示词工程技巧、API 封装方法以及日常效率提升场景,全文代码可直接运行,适合已经上手 DeepSeek 但希望更高效使用的开发者。


本文是"手写 AI 系统"系列文章之一。该系列从零实现 AI 系统中的关键组件,涵盖 RAG、Agent、Function Calling、MCP 等核心技术,帮助你深入理解底层原理,构建属于自己的 AI 工具。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐