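Series index: this is part 2 of the "AI Application Development: Advanced Practice" series. The previous article built a production-grade RAG system; this one focuses on a core capability of AI agents, tool calling, and takes a close look at the MCP protocol.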


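I. Why MCP Is Infrastructure for AI Agents

1.1 Pain Points of Agent Tool Calling

For an AI agent to do real work, it must be able to call external tools: read files, call APIs, query databases, send email, and so on. Traditionally, each LLM platform defines its own function-calling format, which leads to:

  • Fragmentation: OpenAI Functions, Anthropic Tool Use, and Gemini Function Calling each use their own format
  • Tight coupling: tool implementations are entangled with agent code, so switching models means rewriting tool definitions
  • No reuse: a file-reading tool written for one application cannot be shared across projects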
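
To make the fragmentation concrete, the sketch below renders one tool definition in two vendor formats. The field layouts follow the publicly documented OpenAI Chat Completions and Anthropic Messages tool formats; the read_file tool and the helper names to_openai and to_anthropic are illustrative assumptions, not part of either SDK.

```python
import json

# A vendor-neutral description of one tool (hypothetical example).
TOOL = {
    "name": "read_file",
    "description": "Read contents of a file",
    "schema": {
        "type": "object",
        "properties": {"path": {"type": "string"}},
        "required": ["path"],
    },
}


def to_openai(tool: dict) -> dict:
    """OpenAI Chat Completions nests the definition under "function"."""
    return {
        "type": "function",
        "function": {
            "name": tool["name"],
            "description": tool["description"],
            "parameters": tool["schema"],
        },
    }


def to_anthropic(tool: dict) -> dict:
    """Anthropic Messages uses a flat object with an "input_schema" key."""
    return {
        "name": tool["name"],
        "description": tool["description"],
        "input_schema": tool["schema"],
    }


print(json.dumps(to_openai(TOOL), indent=2))
print(json.dumps(to_anthropic(TOOL), indent=2))
```

The JSON Schema for the arguments is identical in both cases; only the envelope differs, and that envelope is exactly the part every agent has to re-implement per vendor.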

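1.2 How MCP Solves It

MCP (Model Context Protocol) is an open protocol introduced by Anthropic. It defines a standard way for AI applications to communicate with external tools and data sources.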

Traditional approach: every agent ships its own tools
┌─────────┐    ┌─────────┐    ┌─────────┐
│ Agent A │    │ Agent B │    │ Agent C │
│ ┌─────┐ │    │ ┌─────┐ │    │ ┌─────┐ │
│ │Tools│ │    │ │Tools│ │    │ │Tools│ │
│ └─────┘ │    │ └─────┘ │    │ └─────┘ │
└─────────┘    └─────────┘    └─────────┘
     ❌ Each agent implements its own tools; nothing is shared

MCP approach: standardized tool servers
┌─────────┐    ┌─────────┐    ┌─────────┐
│ Agent A │    │ Agent B │    │ Agent C │
└────┬────┘    └────┬────┘    └────┬────┘
     │              │              │
     └──────────────┼──────────────┘
                    │ MCP protocol (JSON-RPC)
     ┌──────────────┼──────────────┐
     │              │              │
┌────┴────┐   ┌────┴────┐   ┌────┴────┐
│  Files  │   │   API   │   │   DB    │
│ MCP Srv │   │ MCP Srv │   │ MCP Srv │
└─────────┘   └─────────┘   └─────────┘
     ✅ Implement once, share across all agents

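II. Core Concepts of the MCP Protocol

2.1 The Three Primitives

# The MCP protocol defines three core server capabilities:

1. Resources
   # Expose data to the model, similar to a GET endpoint
   # Examples: file://documents/report.pdf, postgres://users/table

2. Tools
   # Functions the model can invoke, similar to a POST endpoint
   # Examples: create_file(), send_email(), search_database()

3. Prompts
   # Predefined interaction templates
   # Examples: a "code review" template, a "weekly report" template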
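
Each primitive is reached through a pair of JSON-RPC methods named in the MCP specification: one to list what the server offers and one to use a single item. The sketch below builds such requests; the helper build_request, the "list"/"use" action labels, and the code_review prompt name are assumptions made for illustration.

```python
import itertools
from typing import Optional

# Method names defined by the MCP specification for each primitive.
PRIMITIVE_METHODS = {
    "resources": {"list": "resources/list", "use": "resources/read"},
    "tools": {"list": "tools/list", "use": "tools/call"},
    "prompts": {"list": "prompts/list", "use": "prompts/get"},
}

_request_ids = itertools.count(1)


def build_request(primitive: str, action: str, params: Optional[dict] = None) -> dict:
    """Build a JSON-RPC 2.0 request for one primitive ("list" or "use")."""
    method = PRIMITIVE_METHODS[primitive][action]
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),  # every request carries a fresh id
        "method": method,
        "params": params or {},
    }


list_tools = build_request("tools", "list")
read_report = build_request("resources", "use", {"uri": "file://documents/report.pdf"})
get_prompt = build_request(
    "prompts", "use", {"name": "code_review", "arguments": {"language": "python"}}
)
```

The rest of this article implements only the tools pair; resources and prompts follow the same request-response pattern.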

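2.2 Wire Protocol: JSON-RPC 2.0

MCP is built on JSON-RPC 2.0. Every message is JSON, and a request is matched to its response by the id field:

// Request: list all available tools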
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/list",
  "params": {}
}

// Response: the list of tools
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "tools": [
      {
        "name": "read_file",
        "description": "Read contents of a file",
        "inputSchema": {
          "type": "object",
          "properties": {
            "path": {"type": "string", "description": "File path"}
          },
          "required": ["path"]
        }
      }
    ]
  }
}

// Request: call a tool
{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "tools/call",
  "params": {
    "name": "read_file",
    "arguments": {"path": "/home/user/notes.txt"}
  }
}
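
Besides requests and responses, JSON-RPC 2.0 also has notifications: messages with a method but no id, which never get a reply. MCP clients send one, notifications/initialized, after the initialize handshake. The sketch below tells the message shapes apart; the function name classify and the sample payloads are illustrative assumptions.

```python
import json


def classify(message: dict) -> str:
    """Return 'request', 'notification', 'response' or 'error' for a JSON-RPC 2.0 message."""
    if message.get("jsonrpc") != "2.0":
        raise ValueError("not a JSON-RPC 2.0 message")
    if "method" in message:
        # A method with an id expects a reply; without an id it is fire-and-forget.
        return "request" if "id" in message else "notification"
    if "error" in message:
        return "error"
    if "result" in message:
        return "response"
    raise ValueError("message has neither method, result nor error")


samples = [
    {"jsonrpc": "2.0", "id": 2, "method": "tools/call",
     "params": {"name": "read_file", "arguments": {"path": "/home/user/notes.txt"}}},
    {"jsonrpc": "2.0", "method": "notifications/initialized"},
    {"jsonrpc": "2.0", "id": 2,
     "result": {"content": [{"type": "text", "text": "hello"}]}},
    {"jsonrpc": "2.0", "id": 3,
     "error": {"code": -32601, "message": "Method not found: foo"}},
]

for msg in samples:
    print(classify(msg), json.dumps(msg)[:60])
```

A server that answers a notification desynchronizes any client that reads exactly one line per request, so the distinction matters in practice.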

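2.3 Two Transports

Transport    Mechanism              Pros                          Cons                          Typical use
stdio        Standard input/output  Simple, no network required   Local only                    Local agents, CLI tools
SSE (HTTP)   Server-Sent Events     Remote access, shareable      Requires network deployment   Team-shared servers, cloud agents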
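
With the stdio transport, messages are newline-delimited: one JSON document per line, with no raw newline inside a message. The sketch below shows that framing with an in-memory stream standing in for the pipe; encode_frame and decode_frames are hypothetical helper names.

```python
import io
import json
from typing import Iterator, TextIO


def encode_frame(message: dict) -> str:
    """Serialize one message as a single line.

    json.dumps without indent never emits a raw newline: newlines inside
    string values are escaped as the two characters backslash and n.
    """
    return json.dumps(message, ensure_ascii=False) + "\n"


def decode_frames(stream: TextIO) -> Iterator[dict]:
    """Yield one parsed message per non-empty line of the stream."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


# Round trip through an in-memory stream standing in for a pipe.
pipe = io.StringIO()
pipe.write(encode_frame({"jsonrpc": "2.0", "id": 1, "method": "tools/list"}))
pipe.write(encode_frame({
    "jsonrpc": "2.0", "id": 2, "method": "tools/call",
    "params": {"name": "write_file",
               "arguments": {"path": "a.txt", "content": "line1\nline2"}},
}))
pipe.seek(0)
messages = list(decode_frames(pipe))
```

Pretty-printing with indent=2 is fine for logs but would break this framing, because the reader would see each line of the pretty-printed object as a separate, invalid message.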

III. Building Your First MCP Server

3.1 Project Layout

my-mcp-server/
├── server.py          # MCP server main logic
├── tools/
│   ├── __init__.py
│   ├── file_tools.py  # file operation tools
│   └── web_tools.py   # HTTP request tools
├── requirements.txt
└── README.md

3.2 Core Server Implementation

# server.py
import json
import sys
import asyncio
from typing import Any, Dict
from tools.file_tools import FileTools
from tools.web_tools import WebTools


class MCPServer:
    """MCP protocol server using the stdio transport."""
    
    def __init__(self):
        self.tools: Dict[str, Any] = {}
        self._register_tools()
    
    def _register_tools(self):
        """Register all tools."""
        file_tools = FileTools()
        web_tools = WebTools()
        
        # Register file tools
        self.register_tool("read_file", file_tools.read_file, {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute or relative file path"
                }
            },
            "required": ["path"]
        })
        
        self.register_tool("write_file", file_tools.write_file, {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "File path to write"},
                "content": {"type": "string", "description": "Content to write"}
            },
            "required": ["path", "content"]
        })
        
        self.register_tool("list_directory", file_tools.list_directory, {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "Directory path"},
                "pattern": {"type": "string", "description": "Optional glob pattern"}
            },
            "required": ["path"]
        })
        
        # Register HTTP tools
        self.register_tool("http_get", web_tools.http_get, {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "Request URL"},
                "headers": {"type": "object", "description": "Optional headers"}
            },
            "required": ["url"]
        })
        
        self.register_tool("http_post", web_tools.http_post, {
            "type": "object",
            "properties": {
                "url": {"type": "string"},
                "body": {"type": "object"},
                "headers": {"type": "object"}
            },
            "required": ["url", "body"]
        })
    
    def register_tool(self, name: str, handler, schema: dict):
        """Register a single tool."""
        self.tools[name] = {
            "handler": handler,
            "schema": schema
        }
    
    def handle_request(self, request: dict) -> dict:
        """Handle one JSON-RPC request and return the response object."""
        method = request.get("method", "")
        req_id = request.get("id")
        
        try:
            if method == "tools/list":
                result = self._handle_list_tools()
            elif method == "tools/call":
                result = self._handle_call_tool(request.get("params", {}))
            elif method == "initialize":
                result = self._handle_initialize(request.get("params", {}))
            else:
                return self._error(req_id, -32601, f"Method not found: {method}")
            
            return {
                "jsonrpc": "2.0",
                "id": req_id,
                "result": result
            }
            
        except Exception as e:
            return self._error(req_id, -32000, str(e))
    
    def _handle_initialize(self, params: dict) -> dict:
        """Initialization handshake."""
        return {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {}
            },
            "serverInfo": {
                "name": "my-mcp-server",
                "version": "1.0.0"
            }
        }
    
    def _handle_list_tools(self) -> dict:
        """Return all available tools; each description comes from the handler's docstring."""
        tools = []
        for name, tool_info in self.tools.items():
            tools.append({
                "name": name,
                "description": tool_info["handler"].__doc__ or "",
                "inputSchema": tool_info["schema"]
            })
        return {"tools": tools}
    
    def _handle_call_tool(self, params: dict) -> dict:
        """Call the named tool."""
        tool_name = params.get("name")
        arguments = params.get("arguments", {})
        
        if tool_name not in self.tools:
            raise ValueError(f"Unknown tool: {tool_name}")
        
        handler = self.tools[tool_name]["handler"]
        
        # Validate arguments
        schema = self.tools[tool_name]["schema"]
        self._validate_args(arguments, schema)
        
        # Invoke the tool
        result = handler(**arguments)
        
        # Wrap the return value in a uniform content envelope
        return {
            "content": [{
                "type": "text",
                "text": json.dumps(result, ensure_ascii=False, indent=2)
            }]
        }
    
    def _validate_args(self, args: dict, schema: dict):
        """Minimal validation: only checks that required parameters are present."""
        for required in schema.get("required", []):
            if required not in args:
                raise ValueError(f"Missing required parameter: {required}")
    
    def _error(self, req_id, code: int, message: str) -> dict:
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "error": {"code": code, "message": message}
        }
    
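    async def run_stdio(self):
        """Run over stdio: read requests from stdin, write responses to stdout."""
        loop = asyncio.get_running_loop()
        
        while True:
            # sys.stdin.readline blocks, so run it in a worker thread to keep
            # the event loop free; an empty string means stdin was closed.
            line = await loop.run_in_executor(None, sys.stdin.readline)
            if not line:
                break
            line = line.strip()
            if not line:
                continue
            
            try:
                request = json.loads(line)
            except json.JSONDecodeError as e:
                response = self._error(None, -32700, f"Parse error: {e}")
            else:
                if not isinstance(request, dict):
                    response = self._error(None, -32600, "Invalid Request")
                elif "id" not in request:
                    # JSON-RPC notifications (e.g. notifications/initialized)
                    # must not receive a response.
                    continue
                else:
                    response = self.handle_request(request)
            
            # One JSON message per line on stdout
            sys.stdout.write(json.dumps(response) + "\n")
            sys.stdout.flush()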


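# Tool implementations. In the project layout from section 3.1, FileTools lives in
# tools/file_tools.py and WebTools in tools/web_tools.py; both are listed here for reference.
class FileTools:
    """File system tools."""
    
    def read_file(self, path: str) -> dict:
        """Read the contents of a file."""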
        import os
        if not os.path.exists(path):
            return {"error": f"File not found: {path}"}
        
        with open(path, "r", encoding="utf-8") as f:
            content = f.read()
        
        return {
            "path": path,
            "size": len(content),  # length in characters, not bytes
            "lines": len(content.splitlines()),  # 0 for an empty file
            "content": content[:10000]  # return at most 10,000 characters
        }
    
    def write_file(self, path: str, content: str) -> dict:
        """Write content to a file."""
        import os
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        
        return {
            "path": path,
            "written": len(content),
            "success": True
        }
    
    def list_directory(self, path: str, pattern: str = "*") -> dict:
        """List the contents of a directory."""
        import os, fnmatch
        if not os.path.isdir(path):
            return {"error": f"Not a directory: {path}"}
        
        items = []
        for name in os.listdir(path):
            if fnmatch.fnmatch(name, pattern):
                full_path = os.path.join(path, name)
                items.append({
                    "name": name,
                    "type": "directory" if os.path.isdir(full_path) else "file",
                    "size": os.path.getsize(full_path) if os.path.isfile(full_path) else 0
                })
        
        return {"path": path, "items": items, "count": len(items)}


class WebTools:
    """HTTP request tools."""
    
    def http_get(self, url: str, headers: dict = None) -> dict:
        """Send an HTTP GET request."""
        import urllib.request
        
        req = urllib.request.Request(url)
        # Set the default User-Agent first so caller-supplied headers can override it
        req.add_header("User-Agent", "MCP-Server/1.0")
        if headers:
            for k, v in headers.items():
                req.add_header(k, v)
        
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                body = resp.read().decode("utf-8")
                return {
                    "status": resp.status,
                    "headers": dict(resp.headers),
                    "body": body[:5000]  # truncate large responses
                }
        except Exception as e:
            return {"error": str(e)}
    
    def http_post(self, url: str, body: dict, headers: dict = None) -> dict:
        """Send an HTTP POST request."""
        import urllib.request, json as _json
        
        data = _json.dumps(body).encode("utf-8")
        req = urllib.request.Request(url, data=data, method="POST")
        req.add_header("Content-Type", "application/json")
        if headers:
            for k, v in headers.items():
                req.add_header(k, v)
        
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                resp_body = resp.read().decode("utf-8")
                return {
                    "status": resp.status,
                    "body": resp_body[:5000]
                }
        except Exception as e:
            return {"error": str(e)}


if __name__ == "__main__":
    server = MCPServer()
    asyncio.run(server.run_stdio())
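
The _validate_args method above only checks that required keys are present. A slightly stricter sketch, still far from full JSON Schema validation, also rejects unknown parameters and checks the declared top-level type. The function name validate_args is hypothetical; for complete schema support a dedicated library such as jsonschema is the usual route.

```python
# Maps JSON Schema type names to the Python types json.loads produces.
_JSON_TYPES = {
    "string": str,
    "object": dict,
    "array": list,
    "boolean": bool,
    "integer": int,
    "number": (int, float),
}


def validate_args(args: dict, schema: dict) -> None:
    """Check required keys, unknown keys and top-level types only."""
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in args:
            raise ValueError(f"Missing required parameter: {name}")
    for name, value in args.items():
        if name not in properties:
            # Stricter than JSON Schema's default, but the handler would
            # fail on an unexpected keyword argument anyway.
            raise ValueError(f"Unknown parameter: {name}")
        expected = properties[name].get("type")
        py_type = _JSON_TYPES.get(expected)
        if py_type is None:
            continue  # no type declared, or one this sketch does not cover
        # bool is a subclass of int in Python, so reject it explicitly for numbers.
        if isinstance(value, bool) and expected in ("integer", "number"):
            raise ValueError(f"Parameter {name} must be of type {expected}")
        if not isinstance(value, py_type):
            raise ValueError(f"Parameter {name} must be of type {expected}")


READ_FILE_SCHEMA = {
    "type": "object",
    "properties": {"path": {"type": "string"}},
    "required": ["path"],
}
validate_args({"path": "notes.txt"}, READ_FILE_SCHEMA)  # passes silently
```

Rejecting bad input before the handler runs turns a confusing TypeError from inside the tool into a clear message the model can act on.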

3.3 Testing the MCP Server

# test_server.py
import json
import os
import subprocess
import sys
import tempfile

def test_mcp_server():
    """Exercise the MCP server the way an agent client would."""
    
    # Start the server as a child process
    proc = subprocess.Popen(
        [sys.executable, "server.py"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True
    )
    next_id = 0
    
    def send_request(method: str, params: dict = None) -> dict:
        """Send a JSON-RPC request and read the response line."""
        nonlocal next_id
        next_id += 1  # each request gets its own id
        request = {
            "jsonrpc": "2.0",
            "id": next_id,
            "method": method,
            "params": params or {}
        }
        proc.stdin.write(json.dumps(request) + "\n")
        proc.stdin.flush()
        response = json.loads(proc.stdout.readline())
        assert response.get("id") == next_id, f"unexpected response: {response}"
        return response
    
    try:
        # 1. Initialize
        init_resp = send_request("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {}
        })
        assert "result" in init_resp
        print("Initialize:", json.dumps(init_resp, indent=2, ensure_ascii=False))
        
        # 2. List tools
        tools_resp = send_request("tools/list")
        print(f"\nAvailable tools: {len(tools_resp['result']['tools'])}")
        for tool in tools_resp['result']['tools']:
            print(f"  - {tool['name']}: {tool['description']}")
        
        # 3. Call read_file
        read_resp = send_request("tools/call", {
            "name": "read_file",
            "arguments": {"path": os.path.abspath(__file__)}  # read this script
        })
        print(f"\nRead file: {read_resp['result']['content'][0]['text'][:200]}...")
        
        # 4. Call write_file
        target = os.path.join(tempfile.gettempdir(), "test_mcp.txt")
        write_resp = send_request("tools/call", {
            "name": "write_file",
            "arguments": {
                "path": target,
                "content": "Hello from MCP Server!"
            }
        })
        written = json.loads(write_resp["result"]["content"][0]["text"])
        assert written["success"] is True
        print(f"\nWrite file: {write_resp['result']}")
    finally:
        # Always stop the child process, even if an assertion fails
        proc.terminate()
        proc.wait()
    
    print("\nAll tests passed!")


if __name__ == "__main__":
    test_mcp_server()

# Example output (abbreviated):
# Initialize: {"jsonrpc": "2.0", "id": 1, "result": {"protocolVersion": "2024-11-05", ...}}
# 
# Available tools: 5
#   - read_file: Read the contents of a file.
#   - write_file: Write content to a file.
#   - list_directory: List the contents of a directory.
#   - http_get: Send an HTTP GET request.
#   - http_post: Send an HTTP POST request.
# 
# Read file: {"path": "...", "size": 1234, "content": "..."}...
# Write file: ...
# All tests passed!

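IV. Going Further: A Remote MCP Server over SSE

The stdio transport only works when the client and server run on the same machine. For a deployed service, use SSE (Server-Sent Events) to build an MCP server that is reachable over HTTP.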

# server_sse.py
import json
import asyncio
from aiohttp import web
from server import MCPServer


class MCPSSEServer:
    """MCP Server with SSE transport"""
    
    def __init__(self, host="0.0.0.0", port=8080):
        self.mcp = MCPServer()
        self.host = host
        self.port = port
        self.sessions: dict = {}  # session_id -> queue
    
    async def handle_sse(self, request):
        """SSE endpoint: one long-lived stream per client session."""
        import uuid
        # Generate the session id on the server so that two clients can never
        # share (and overwrite) the same queue.
        session_id = uuid.uuid4().hex
        
        response = web.StreamResponse()
        response.headers["Content-Type"] = "text/event-stream"
        response.headers["Cache-Control"] = "no-cache"
        response.headers["Connection"] = "keep-alive"
        await response.prepare(request)
        
        queue = asyncio.Queue()
        self.sessions[session_id] = queue
        
        # Tell the client where to POST its JSON-RPC messages
        endpoint = f"/message?session={session_id}"
        await response.write(
            f"event: endpoint\ndata: {endpoint}\n\n".encode()
        )
        
        try:
            while True:
                message = await queue.get()
                await response.write(
                    f"data: {json.dumps(message)}\n\n".encode()
                )
        except asyncio.CancelledError:
            pass
        finally:
            self.sessions.pop(session_id, None)
        
        return response
    
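    async def handle_message(self, request):
        """JSON-RPC message endpoint."""
        session_id = request.query.get("session", "default")
        queue = self.sessions.get(session_id)
        if queue is None:
            # No open SSE stream for this session, so the reply could not be delivered
            return web.Response(status=404, text="Unknown session")
        body = await request.json()
        
        # Tool handlers block on file and network I/O, so run the request in a
        # worker thread instead of stalling the event loop (Python 3.9+).
        response = await asyncio.to_thread(self.mcp.handle_request, body)
        
        # Push the response over the SSE stream; notifications (no id) get none
        if "id" in body:
            await queue.put(response)
        
        return web.Response(status=202)  # Accepted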
    
    async def start(self):
        """Start the HTTP server."""
        app = web.Application()
        app.router.add_get("/sse", self.handle_sse)
        app.router.add_post("/message", self.handle_message)
        
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, self.host, self.port)
        await site.start()
        
        print(f"MCP SSE Server running at http://{self.host}:{self.port}")
        print(f"SSE endpoint: http://{self.host}:{self.port}/sse")
        
        # Keep running
        await asyncio.Event().wait()


if __name__ == "__main__":
    server = MCPSSEServer(port=8080)
    asyncio.run(server.start())
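
On the wire, the SSE endpoint above emits plain-text events separated by blank lines: first an endpoint event carrying the URL to POST to, then one unnamed event per JSON-RPC response. A client has to split that stream back into events. The sketch below parses only the subset this server produces (event and data fields); parse_sse and the sample text are illustrative assumptions, and a real client would normally rely on an SSE library.

```python
import json
from typing import Iterator, Tuple


def parse_sse(text: str) -> Iterator[Tuple[str, str]]:
    """Yield (event_name, data) pairs from a block of SSE text.

    Handles only the event: and data: fields. An event without an explicit
    name defaults to "message", as in the SSE specification.
    """
    event, data = "message", []
    for line in text.split("\n"):
        if line == "":  # a blank line terminates the current event
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The specification strips exactly one leading space after the colon.
            data.append(value[1:] if value.startswith(" ") else value)


wire = (
    "event: endpoint\ndata: /message?session=abc\n\n"
    'data: {"jsonrpc": "2.0", "id": 1, "result": {"tools": []}}\n\n'
)
events = list(parse_sse(wire))
post_url = events[0][1]            # where the client sends its requests
reply = json.loads(events[1][1])   # a JSON-RPC response pushed by the server
```

Note the asymmetry of this transport: requests travel as HTTP POSTs, while every response comes back on the long-lived event stream.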

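V. Security Best Practices

An MCP server gives an agent the ability to act on the operating system, so security comes first.

import json

from server import MCPServer


class SecureMCPServer(MCPServer):
    """MCP server with security restrictions."""
    
    def __init__(self, allowed_paths: list = None):
        super().__init__()
        self.allowed_paths = allowed_paths or ["/home/user/workspace"]
        # Not enforced anywhere in this example: none of the registered tools
        # runs shell commands. Kept as a placeholder for such a tool.
        self.blocked_commands = ["rm -rf", "format", "shutdown"]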
    
    def _handle_call_tool(self, params: dict) -> dict:
        """Run the security checks, then delegate to the base implementation."""
        tool_name = params.get("name")
        arguments = params.get("arguments", {})
        
        # 1. File path allowlist
        if "path" in arguments:
            if not self._is_path_allowed(arguments["path"]):
                raise ValueError(
                    f"Access denied: {arguments['path']} is outside allowed paths"
                )
        
        # 2. URL allowlist (optional)
        if "url" in arguments:
            if not self._is_url_allowed(arguments["url"]):
                raise ValueError(
                    f"Access denied: {arguments['url']} is not in allowed domains"
                )
        
        # 3. Audit log
        self._audit_log(tool_name, arguments)
        
        # 4. Rate limit (10 calls per minute per tool)
        if not self._check_rate_limit(tool_name):
            raise ValueError(f"Rate limit exceeded for tool: {tool_name}")
        
        return super()._handle_call_tool(params)
    
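    def _is_path_allowed(self, path: str) -> bool:
        import os
        real_path = os.path.realpath(os.path.expanduser(path))
        for allowed in self.allowed_paths:
            root = os.path.realpath(allowed)
            # Compare whole path components; a plain startswith() would also
            # accept siblings such as /home/user/workspace_evil.
            try:
                if os.path.commonpath([real_path, root]) == root:
                    return True
            except ValueError:  # e.g. different drives on Windows
                continue
        return False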
    
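    def _is_url_allowed(self, url: str) -> bool:
        from urllib.parse import urlparse
        allowed_domains = ["api.example.com", "docs.internal.company.com"]
        parsed = urlparse(url)
        if parsed.scheme not in ("http", "https"):
            return False  # urllib would otherwise also open file:// URLs
        hostname = parsed.hostname or ""
        # Match the domain itself or a true subdomain; a bare endswith() would
        # also accept look-alikes such as evilapi.example.com.
        return any(hostname == d or hostname.endswith("." + d) for d in allowed_domains)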
    
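    def _audit_log(self, tool: str, args: dict):
        import logging, time
        # Needs logging.basicConfig(level=logging.INFO) at startup; at the default
        # WARNING level these records are dropped. Logging writes to stderr by
        # default, so it does not corrupt the stdio transport on stdout.
        logging.info(
            f"[AUDIT] {time.time()} | tool={tool} | args={json.dumps(args)}"
        )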
    
    def _check_rate_limit(self, tool: str) -> bool:
        # Simple in-memory sliding-window rate limit
        import time
        now = time.time()
        key = f"rate:{tool}"
        
        if not hasattr(self, "_rate_limits"):
            self._rate_limits = {}
        
        if key not in self._rate_limits:
            self._rate_limits[key] = []
        
        # Drop timestamps older than 60 seconds
        self._rate_limits[key] = [
            t for t in self._rate_limits[key] 
            if now - t < 60
        ]
        
        if len(self._rate_limits[key]) >= 10:
            return False
        
        self._rate_limits[key].append(now)
        return True
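
Path allowlists are easy to get subtly wrong. A raw string-prefix comparison treats a sibling directory such as workspace_evil as being inside workspace, because the two strings share a prefix. The sketch below contrasts that naive check with a component-wise comparison based on os.path.commonpath; the function names and the temporary directory layout are illustrative assumptions.

```python
import os
import tempfile


def is_inside_prefix(path: str, root: str) -> bool:
    """Naive check: compares raw strings, so sibling directories slip through."""
    return os.path.realpath(path).startswith(os.path.realpath(root))


def is_inside(path: str, root: str) -> bool:
    """Component-wise check: the common ancestor of both paths must be root itself."""
    real_path = os.path.realpath(path)
    real_root = os.path.realpath(root)
    try:
        return os.path.commonpath([real_path, real_root]) == real_root
    except ValueError:  # e.g. paths on different drives on Windows
        return False


base = tempfile.mkdtemp()
root = os.path.join(base, "workspace")
inside = os.path.join(root, "notes", "todo.txt")
sibling = os.path.join(base, "workspace_evil", "secret.txt")
escape = os.path.join(root, "..", "workspace_evil", "secret.txt")

print("prefix check, sibling:", is_inside_prefix(sibling, root))
print("component check, sibling:", is_inside(sibling, root))
print("component check, inside:", is_inside(inside, root))
print("component check, ../ escape:", is_inside(escape, root))
```

Resolving with realpath before comparing also neutralizes ../ sequences and symlinks that point outside the allowed root.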

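Security Checklist

□ Path allowlist: restrict which files can be accessed
□ Domain allowlist: restrict where network requests can go
□ Command filtering: block dangerous operations (rm -rf and the like) if a tool can run shell commands
□ Audit logging: record every tool call
□ Rate limiting: prevent abuse (10 calls per minute per tool)
□ Argument validation: strictly validate all input
□ Timeouts: cap each tool call at 30 s
□ User confirmation: require extra confirmation for destructive operations (delete, overwrite)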
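
The timeout item can be approximated by running each handler in a worker thread and giving up on the result after a deadline. This is a minimal sketch under one important caveat: Python cannot kill a running thread, so the handler keeps running in the background and only the caller is released. The name call_with_timeout and the slow_tool stand-in are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

_pool = ThreadPoolExecutor(max_workers=4)


def call_with_timeout(handler, arguments: dict, timeout_s: float = 30.0) -> dict:
    """Run handler(**arguments) but stop waiting after timeout_s seconds."""
    future = _pool.submit(handler, **arguments)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        # cancel() only succeeds if the task has not started yet; a running
        # handler continues in its worker thread until it finishes.
        future.cancel()
        return {"error": f"Tool call exceeded {timeout_s}s"}


def slow_tool(seconds: float) -> dict:
    """Stand-in for a tool handler that takes a while."""
    time.sleep(seconds)
    return {"ok": True}


fast_result = call_with_timeout(slow_tool, {"seconds": 0.01}, timeout_s=1.0)
slow_result = call_with_timeout(slow_tool, {"seconds": 0.5}, timeout_s=0.05)
```

For a hard guarantee, such as a handler stuck in an endless loop, the tool would have to run in a separate process that can be terminated.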

VI. Using an MCP Server from an Agent

# agent_client.py
import json
import subprocess
import sys
from typing import List, Dict


class MCPClient:
    """MCP client for an AI agent."""
    
    def __init__(self, server_command: List[str]):
        """Start the MCP server as a child process."""
        self.proc = subprocess.Popen(
            server_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            text=True,
            bufsize=1
        )
        self.tools: Dict = {}
        self._initialize()
        self._discover_tools()
    
    def _send(self, method: str, params: dict = None) -> dict:
        """Send a JSON-RPC request and read one response line."""
        request = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": method,
            "params": params or {}
        }
        self.proc.stdin.write(json.dumps(request) + "\n")
        self.proc.stdin.flush()
        return json.loads(self.proc.stdout.readline())
    
    def _next_id(self):
        if not hasattr(self, "_id_counter"):
            self._id_counter = 0
        self._id_counter += 1
        return self._id_counter
    
    def _initialize(self):
        """Initialization handshake."""
        resp = self._send("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {}
        })
        print(f"Connected to MCP Server: {resp['result']['serverInfo']}")
    
    def _discover_tools(self):
        """Discover the tools the server offers."""
        resp = self._send("tools/list")
        self.tools = {
            tool["name"]: tool 
            for tool in resp["result"]["tools"]
        }
        print(f"Discovered {len(self.tools)} tools")
    
    def call_tool(self, name: str, **kwargs) -> dict:
        """Call a tool by name."""
        resp = self._send("tools/call", {
            "name": name,
            "arguments": kwargs
        })
        
        if "error" in resp:
            raise RuntimeError(resp["error"]["message"])
        
        # Parse the returned content (the server wraps results as JSON text)
        content = resp["result"]["content"]
        if content and content[0]["type"] == "text":
            return json.loads(content[0]["text"])
        return content
    
    def get_tool_schemas_for_llm(self) -> list:
        """Convert MCP tool definitions to the OpenAI function-calling format."""
        schemas = []
        for name, tool in self.tools.items():
            schemas.append({
                "type": "function",
                "function": {
                    "name": name,
                    "description": tool.get("description", ""),
                    "parameters": tool.get("inputSchema", {})
                }
            })
        return schemas
    
    def close(self):
        self.proc.terminate()
        self.proc.wait()  # reap the child process


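# === Using the MCP client ===
if __name__ == "__main__":
    from openai import OpenAI
    
    # Reads the API key from the OPENAI_API_KEY environment variable
    # instead of hard-coding it in source
    client = OpenAI()
    mcp = MCPClient([sys.executable, "server.py"])
    
    # Fetch the tool definitions and pass them to the LLM
    tools = mcp.get_tool_schemas_for_llm()
    
    messages = [
        {"role": "system", "content": "You can use tools to work with files and send network requests."},
        {"role": "user", "content": "Read /tmp/test.txt, replace the domain names in it with example.com, and write it back"}
    ]
    
    # The LLM decides which tools to call
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        tools=tools
    )
    
    # Execute the requested tool calls. This is a single round: results are
    # printed, not sent back to the model, so a multi-step task such as
    # read-then-write needs a loop that appends tool results to messages.
    msg = response.choices[0].message
    if msg.tool_calls:
        for tc in msg.tool_calls:
            args = json.loads(tc.function.arguments)
            result = mcp.call_tool(tc.function.name, **args)
            print(f"Tool: {tc.function.name}({args})")
            print(f"Result: {json.dumps(result, ensure_ascii=False, indent=2)}")
    
    mcp.close()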
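
The script above executes one round of tool calls and prints the results without returning them to the model, so a read-modify-write task cannot finish. A full agent alternates between model and tools until the model answers in plain text. The sketch below shows that loop with the model injected as a callable, so it runs without an API key. The message dictionaries follow the Chat Completions shape (assistant tool_calls, then role "tool" replies); run_agent_loop, the scripted replies, and the in-memory file store are illustrative assumptions.

```python
import json
from typing import Callable, List


def run_agent_loop(complete: Callable[[list, list], dict],
                   call_tool: Callable[..., dict],
                   messages: List[dict],
                   tools: List[dict],
                   max_rounds: int = 5) -> str:
    """Alternate between the model and the tools until the model stops asking."""
    for _ in range(max_rounds):
        reply = complete(messages, tools)
        messages.append(reply)
        tool_calls = reply.get("tool_calls") or []
        if not tool_calls:
            return reply.get("content") or ""
        for tc in tool_calls:
            args = json.loads(tc["function"]["arguments"])
            try:
                result = call_tool(tc["function"]["name"], **args)
            except Exception as exc:  # report failures to the model instead of crashing
                result = {"error": str(exc)}
            messages.append({
                "role": "tool",
                "tool_call_id": tc["id"],
                "content": json.dumps(result, ensure_ascii=False),
            })
    raise RuntimeError(f"No final answer after {max_rounds} rounds")


# Demo with a scripted stand-in for the model and an in-memory "file system".
files = {"/tmp/test.txt": "docs at foo.org"}


def fake_call_tool(name: str, **kwargs) -> dict:
    if name == "read_file":
        return {"content": files[kwargs["path"]]}
    files[kwargs["path"]] = kwargs["content"]
    return {"success": True}


def _call(call_id: str, name: str, args: dict) -> dict:
    return {"id": call_id, "type": "function",
            "function": {"name": name, "arguments": json.dumps(args)}}


script = iter([
    {"role": "assistant", "content": None,
     "tool_calls": [_call("c1", "read_file", {"path": "/tmp/test.txt"})]},
    {"role": "assistant", "content": None,
     "tool_calls": [_call("c2", "write_file",
                          {"path": "/tmp/test.txt", "content": "docs at example.com"})]},
    {"role": "assistant", "content": "Done."},
])

history = [{"role": "user", "content": "Replace the domain in /tmp/test.txt with example.com"}]
answer = run_agent_loop(lambda msgs, tls: next(script), fake_call_tool, history, tools=[])
```

With the OpenAI SDK, complete would wrap client.chat.completions.create and convert the returned message object to a dictionary, and call_tool would be MCPClient.call_tool. The max_rounds cap keeps a confused model from looping forever.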

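VII. Summary and What's Next

MCP addresses three core problems of tool calling for AI agents:

1. Standardization → JSON-RPC 2.0 plus a uniform tool description format
2. Decoupling      → tool implementations are separate from agent code; switching models does not touch the tools
3. Reuse           → write an MCP server once and share it across all agents

Key Code Recap

  • MCPServer: handles JSON-RPC requests, registers and invokes tools
  • SecureMCPServer: path allowlist, rate limiting, audit logging
  • MCPClient: starts the server as a child process, discovers tools, converts them to the LLM's format

Next article: Graph RAG, combining knowledge graphs with large models for reasoning. When documents contain complex entity relationships, pure vector retrieval falls short, so we will build a graph-augmented RAG system with Neo4j + LangChain.


The complete code for this article is open source. Next up: Graph RAG (coming soon)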
