AI工具集:服务器端基于云端AI模型使用SSE封装自定义MCP服务
本文介绍了如何基于宝塔面板搭建云端AI模型MCP服务,实现多用户跨设备调用。主要内容包括:1)购买讯飞星辰Coding Plan套餐(首月3.9元不限量);2)在宝塔面板部署Node.js服务,使用SSE协议封装MCP服务;3)提供核心代码实现,包括API调用、服务注册和会话管理。该方案支持OpenAI和Anthropic兼容接口,可满足代码生成、技术评审等开发需求,解决多场地协同使用问题。
AI工具集:服务器端基于云端AI模型使用SSE封装自定义MCP服务
背景
购买 Coding Plan 除了能替代 AI编辑器 内置 Agent 处理代码问题,还可以封装自定义 MCP 服务。在 技术方案 / 代码修改 / 单元测试 / Code Review时候进行审核互补,毕竟单一模型在不同能力的侧重上有所差异。搭建服务器常驻 MCP服务 便于 公司 / 家 / 朋友 调用的方案变得很有意义
本文将一步步带你配置服务器版本 MCP 服务解决多用户设备/多场地同时的使用问题,也可以学习自建MCP服务的过程和注意事项
资源应用介绍
- Agent:讯飞星辰 MaaS · Astron Coding Plan
- 编辑器:Trae(其他编辑器类同)
- 服务器:宝塔面板Node搭建SSE通信(Linux面板11.7.0)
1. Agent套餐
Coding Plan是专为开发者打造的高性能 AI 算力订阅服务,可一站式调用顶流大模型,全面提效代码生产。
1.1 购买套餐
最最最主要是便宜,首月3.9元不限量 https://maas.xfyun.cn/packageSubscription

1.2 可用模型

2. 搭建MCP服务
2.1 上传文件
新建一个目录,上传【package.json】【sse.js】

2.1.1 package.json
{
"name": "agentmcp",
"version": "1.0.0",
"description": "MCP 服务,调用云端 AI 模型",
"main": "index.js",
"scripts": {
"start:sse": "node sse.js"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.29.0",
"express": "^4.18.2",
"zod": "^4.4.3"
}
}
2.1.2 sse.js
sse.js 是 MCP (Model Context Protocol) 服务的远程部署版本 ,使用 SSE (Server-Sent Events) 协议实现 HTTP 远程访问。本地使用 stdio 模式,和本文不一致
替换自己的 API_KEY,若是其他平台更换 url地址 和模型ID
#!/usr/bin/env node
const { McpServer } = require('@modelcontextprotocol/sdk/server/mcp.js');
const { SSEServerTransport } = require('@modelcontextprotocol/sdk/server/sse.js');
const express = require('express');
const { z } = require('zod');
/**
* 调用云端 AI 模型进行对话或代码生成
* @param {Array<{role: 'user'|'assistant', content: string}>} messages - 对话消息数组
* @param {Object} [options={}] - 配置选项
* @param {string} [options.model='astron-code-latest'] - 模型名称
* @param {'openai'|'anthropic'} [options.provider='openai'] - API 提供商类型
* @returns {Promise<{content: Array<{type: string, text: string}>}>} AI 响应结果
* @throws {Error} 当 API 请求失败或返回非 JSON 格式时抛出错误
*/
async function callAI(messages, options = {}) {
const maxTokens = 131072; // 128k - 模型限制
const OPENAI_URL = 'https://maas-coding-api.cn-huabei-1.xf-yun.com/v2/chat/completions';
const ANTHROPIC_URL = 'https://maas-coding-api.cn-huabei-1.xf-yun.com/anthropic/v1/messages';
const MODEL_ID = 'astron-code-latest';
const API_KEY = 'xxxxxxxx'; // 替换自己的key
const { model = MODEL_ID, provider = 'openai' } = options;
const apiUrl = provider === 'anthropic' ? ANTHROPIC_URL : OPENAI_URL;
const isAnthropic = provider === 'anthropic';
const body = isAnthropic
? { model, max_tokens: maxTokens, messages: messages.map(m => ({ role: m.role, content: m.content })) }
: { model, messages: messages.map(m => ({ role: m.role, content: m.content })), max_tokens: maxTokens };
const res = await fetch(apiUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${API_KEY}`, 'X-Api-Key': API_KEY, 'X-Language': 'zh-CN', ...(isAnthropic ? { 'anthropic-version': '2023-06-01' } : {}) },
body: JSON.stringify(body)
});
if (!res.ok) {
const errText = await res.text();
throw new Error(`API ${res.status}: ${errText.slice(0, 500)}`);
}
const contentType = res.headers.get('content-type') || '';
if (!contentType.includes('application/json')) {
const errText = await res.text();
throw new Error(`API返回非JSON格式(${contentType}): ${errText.slice(0, 200)}`);
}
const data = await res.json();
if (isAnthropic) return { content: [{ type: 'text', text: data.content?.[0]?.text || '' }] };
return { content: [{ type: 'text', text: data.choices?.[0]?.message?.content || data.choices?.[0]?.message?.reasoning_content || JSON.stringify(data) }] };
}
/**
* 创建 MCP 服务器实例
* @description 定义服务名称和版本号,用于与客户端建立通信
*/
const server = new McpServer({
name: 'agentmcp',
version: '1.0.0'
});
/**
* 注册 'chat' 工具
* @description 提供调用云端 AI 模型进行对话或代码生成的能力
* 支持 OpenAI 和 Anthropic 兼容接口
* @param {messages} 对话消息数组,支持 user/assistant 角色
* @param {model} [可选] 指定使用的模型名称(默认 astron-code-latest)
* @param {provider} [可选] API 提供商类型(openai 或 anthropic)
*/
server.tool('chat', '调用云端 AI 模型进行对话或代码生成,支持 OpenAI 和 Anthropic 兼容接口', {
messages: z.array(z.object({
role: z.enum(['user', 'assistant']),
content: z.string()
})),
model: z.string().optional().describe('模型名称,默认 astron-code-latest'),
provider: z.enum(['openai', 'anthropic']).optional().describe('API 提供商,默认 openai')
}, async ({ messages, model, provider }) => {
try {
return await callAI(messages, { model, provider });
} catch (err) {
return { content: [{ type: 'text', text: `[ERROR] ${err.message}` }], isError: true };
}
});
/**
* Transport 会话存储
* @description 使用 Map 存储多个 SSE 连接的 transport 实例
* 每个连接有唯一的 sessionId,支持多个客户端同时连接
* 当连接关闭时自动清理对应的 transport,防止内存泄漏
*/
const transports = new Map();
/**
* 初始化 Express 应用并配置路由
* @description 创建 HTTP 服务,配置以下端点:
* - GET /:健康检查端点,返回服务状态信息
* - GET /sse:建立 SSE 长连接,用于服务端向客户端推送数据
* - POST /message?sessionId=<id>:接收客户端发送的消息请求
* @param {number} PORT - 监听端口,默认从环境变量读取,fallback 到 5067
* @returns {void}
*/
async function main() {
const app = express();
const PORT = process.env.PORT || 5067;
/**
* 配置 Express 中间件
* @description 启用 JSON 请求体解析,限制请求大小防止内存溢出
* POST /message 端点依赖此中间件来解析 MCP 协议消息
*/
app.use(express.json({ limit: '10mb' }));
/**
* 健康检查端点
* @route GET /
* @description 返回服务基本信息和运行状态
* 用于监控服务是否正常运行,以及确认部署成功
*/
app.get('/', (req, res) => {
res.json({
status: 'ok',
service: 'agentmcp',
version: '1.0.0',
mode: 'sse',
activeConnections: transports.size,
endpoints: {
sse: '/sse',
message: '/message?sessionId=<id>'
},
timestamp: new Date().toISOString()
});
});
/**
* SSE 端点 - 建立长连接
* @route GET /sse
* @description 客户端通过此端点建立 Server-Sent Events 连接
*
* 工作流程:
* 1. 接收客户端的 GET 请求
* 2. 创建 SSEServerTransport 实例,绑定到当前响应对象
* 3. 将 transport 存储到 transports Map(key=sessionId)
* 4. 设置连接关闭时的清理回调
* 5. 将 transport 连接到 MCP server,开始监听事件
* 6. 保持长连接开放,等待服务端推送数据
*
* 支持反向代理部署(如 Nginx),需确保代理配置了正确的 SSE 头
*/
app.get('/sse', async (req, res) => {
console.log('[SSE] 新客户端连接:', req.ip, req.headers['user-agent']);
try {
/**
* 创建 SSE Server Transport 实例
* @param {string} '/message' - POST 消息端点路径
* @param {Response} res - Express 响应对象,用于建立 SSE 流
*/
const transport = new SSEServerTransport('/message', res);
/**
* 存储 transport 到 Map
* @description 使用 sessionId 作为 key,支持多客户端并发连接
* 每个客户端有独立的 session,互不干扰
*/
transports.set(transport.sessionId, transport);
console.log(`[SSE] Transport 已创建,sessionId: ${transport.sessionId}`);
/**
* 注册连接关闭清理回调
* @description 当客户端断开连接时:
* 1. 从 transports Map 中移除对应 transport
* 2. 释放相关资源,防止内存泄漏
* 3. 输出日志便于监控和调试
*/
res.on('close', () => {
console.log(`[SSE] 客户端断开连接,sessionId: ${transport.sessionId}`);
transports.delete(transport.sessionId);
});
/**
* 连接到 MCP Server
* @description 将 transport 连接到 server 后,
* server 可以通过此 transport 向客户端推送事件和响应
*/
await server.connect(transport);
console.log('[SSE] 客户端连接成功');
} catch (err) {
console.error('[SSE] 连接失败:', err.message);
if (!res.headersSent) {
res.status(500).json({ error: 'SSE connection failed', message: err.message });
}
}
});
/**
* 消息处理端点 - 接收客户端请求
* @route POST /message?sessionId=<id>
* @description 客户端通过此端点发送工具调用等请求
*
* 工作流程:
* 1. 从查询参数获取 sessionId(必需)
* 2. 从 transports Map 中查找对应的 transport
* 3. 验证 transport 类型是否为 SSEServerTransport
* 4. 将消息转发给 transport 处理
* 5. Transport 将消息传递给 MCP server 执行对应工具
* 6. 执行结果通过之前建立的 SSE 连接异步返回给客户端
*
* 注意:此端点必须与 /sse 端点配对使用
* 客户端需要先建立 SSE 连接(GET /sse),获得 sessionId 后才能发送消息
*
* @query {string} sessionId - 从 GET /sse 响应中获得的会话 ID
*/
app.post('/message', async (req, res) => {
console.log('[Message] 收到请求');
try {
/**
* 获取 sessionId
* @description 从 URL 查询参数中提取 sessionId
* 这是 MCP 协议规定的标准方式,用于关联 SSE 连接和消息通道
*/
const sessionId = req.query.sessionId;
if (!sessionId) {
throw new Error('Missing required query parameter: sessionId');
}
/**
* 查找对应的 transport
* @description 从 transports Map 中根据 sessionId 获取 transport 实例
* 如果找不到,说明客户端未先建立 SSE 连接或连接已过期
*/
const transport = transports.get(sessionId);
if (!transport) {
throw new Error(`No active SSE connection for sessionId: ${sessionId}. Active sessions: ${Array.from(transports.keys()).join(', ')}`);
}
/**
* 验证 transport 类型
* @description 确保 transport 是 SSEServerTransport 实例
* 防止类型混淆导致的安全问题或错误行为
*/
if (!(transport instanceof SSEServerTransport)) {
throw new Error('Invalid transport type for this session');
}
/**
* 处理消息
* @description 将请求转发给 SSEServerTransport 处理
* transport 内部会解析 MCP 协议消息并路由到正确的处理器
* 处理结果会自动通过 SSE 连接返回给客户端
*/
await transport.handlePostMessage(req, res, req.body);
console.log('[Message] 消息处理成功');
} catch (err) {
console.error('[Message] 处理失败:', err.message);
if (!res.headersSent) {
res.status(500).json({
error: 'Message handling failed',
message: err.message,
hint: '请确保已先建立 SSE 连接(GET /sse)并获得有效的 sessionId'
});
}
}
});
/**
* 全局错误处理中间件
* @description 捕获所有未处理的异常,返回统一格式的错误响应
* 防止服务因未捕获的异常而崩溃
*/
app.use((err, req, res, next) => {
console.error('[Error]', err.stack);
res.status(500).json({
error: 'Internal Server Error',
message: err.message
});
});
/**
* 启动 HTTP 服务器监听
* @description 在指定端口启动 Express 服务,输出运行状态信息
* 启动后可通过 http://host:port/sse 访问 MCP 服务
*/
app.listen(PORT, () => {
console.log(`========================================`);
console.log(`MCP Server running in SSE mode`);
console.log(`Local URL: http://localhost:${PORT}`);
console.log(`SSE endpoint: http://localhost:${PORT}/sse`);
console.log(`Message endpoint: http://localhost:${PORT}/message`);
console.log(`Health check: http://localhost:${PORT}/`);
console.log(`========================================`);
});
}
/**
* 启动 MCP SSE 服务器
* @description 调用 main 函数启动服务,捕获并输出异常信息到控制台
* 异常通常包括端口占用、网络配置等问题
*/
main().catch(console.error);
/**
* 优雅关闭处理
* @description 监听 SIGINT 信号(Ctrl+C),确保:
* 1. 关闭所有活跃的 SSE 连接
* 2. 清空 transports Map
* 3. 释放 MCP 服务器资源
* 4. 安全退出进程,避免资源泄漏
*/
process.on('SIGINT', async () => {
console.log('\n正在关闭 MCP 服务器...');
console.log(`[Shutdown] 当前活跃连接数: ${transports.size}`);
/**
* 关闭所有活跃的 transport
* @description 遍历 transports Map,逐个关闭每个 SSE 连接
* 确保所有客户端都能收到连接关闭通知
*/
for (const [sessionId, transport] of transports) {
try {
console.log(`[Shutdown] 正在关闭连接: ${sessionId}`);
await transport.close();
transports.delete(sessionId);
} catch (err) {
console.error(`[Shutdown] 关闭连接 ${sessionId} 失败:`, err.message);
}
}
await server.close();
console.log('[Shutdown] MCP 服务器已安全关闭');
process.exit(0);
});
/**
* 进程异常处理
* @description 捕获未处理的 Promise 拒绝,防止进程意外崩溃
* 输出详细的错误堆栈信息便于排查问题
*/
process.on('unhandledRejection', (reason, promise) => {
console.error('未处理的 Promise 拒绝:', reason);
});
2.2 配置服务器端
2.2.1 创建node服务

2.2.2 添加域名(非必须)
不配置可以使用ip调用也行

2.2.3 配置nginx

# ========== 核心修复:MCP SSE 服务代理配置 ==========
location / {
proxy_pass http://127.0.0.1:5067;
# 基础 Header 设置(只设置一次,避免重复)
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 【关键】请求体大小限制 - MCP 消息可能很大
client_max_body_size 50m;
# 【关键】SSE 必须配置:禁用所有缓冲
proxy_buffering off;
proxy_cache off;
proxy_request_buffering off;
# 超时设置(SSE 需要长连接)
proxy_connect_timeout 60s;
proxy_read_timeout 86400s;
proxy_send_timeout 86400s;
# HTTP/1.1 长连接支持
proxy_http_version 1.1;
proxy_set_header Connection '';
# WebSocket 升级支持(如果需要)
proxy_set_header Upgrade $http_upgrade;
# ⚠️ 删除这行:不要强制设置 Content-Type!
# proxy_set_header Content-Type 'application/json';
# 支持分块传输编码
chunked_transfer_encoding on;
# 禁用对响应的压缩(SSE 不需要)
gzip off;
proxy_set_header Accept-Encoding '';
}
3. 编辑器调用MCP

JSON 数据
{
"mcpServers": {
"agentmcp": {
"url": "http://xxxx.com/sse"
}
}
}
4. 测试功能
发送一个对话,测试mcp服务功能。恭喜搭建完毕,可以在公司 和 在家 调用。

5. 整体通信流程图
| 设计决策 | 原因 | 优势 |
|---|---|---|
| SSE 双通道设计 | HTTP 单向限制 | 服务端可主动推送,无需 WebSocket Map |
| 存储会话 | 多客户端并发 | 支持多用户同时连接,互不干扰 |
| Zod 参数校验 | 类型安全 | 早失败、自动文档、防注入 |
| 三层错误防护 | 防御性编程 | sessionId → session有效性 → 类型验证 |
| 优雅关闭 | 资源管理 | 无泄漏、数据完整、日志可追踪 |
| 分离关注点 | 可维护性 | AI 调用 / MCP 协议 / HTTP 各司其职 |
6. 方案锁定原因
为什么需要 MCP Server?
MCP 协议要求:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │ ←→ │ Transport │ ←→ │ Server │
│ (Trae IDE) │ │ (SSE/stdio)│ │ (McpServer) │
└─────────────┘ └─────────────┘ └─────────────┘
McpServer 的职责:
1. 定义服务能力(工具、资源、提示词)
2. 处理 JSON-RPC 协议消息
3. 管理会话状态
4. 路由工具调用到对应的 handler
为什么需要 SSE 而不是普通 HTTP?
❌ 普通 HTTP 请求-响应模式:
Client ──request──> Server ──response──> Client (结束)
问题: 服务器无法主动向客户端推送数据!
✅ SSE (Server-Sent Events):
Client ──GET /sse──> Server ──保持连接── Client
│
├── push event 1 ──> Client
├── push event 2 ──> Client
├── push event 3 ──> Client
└── ...
优势:
- 服务器可以主动推送(工具执行结果、通知等)
- 单向通信,简单可靠
- 自动重连(浏览器原生支持)
- 基于 HTTP,无需 WebSocket 的复杂性
为什么需要 ?sessionId= 参数?
MCP SSE 协议的双通道设计:
通道 1: SSE 长连接 (GET /sse)
- 方向: Server → Client
- 用途: 推送工具执行结果、通知
- 特点: 长时间保持
通道 2: HTTP 请求 (POST /message?sessionId=xxx)
- 方向: Client → Server
- 用途: 发送工具调用请求
- 特点: 短连接,按需发起
问题: 如何关联这两个通道?
答案: sessionId!
为什么需要全局错误处理?
没有全局错误处理的后果:
场景: 某个路由抛出未捕获异常
结果:
❌ Express 默认行为 -> 返回 HTML 错误页(对 API 不友好)
❌ 客户端收到非 JSON 响应 -> 解析失败
❌ 可能泄露堆栈信息(安全隐患)
❌ 进程可能崩溃(unhandled rejection)
有全局错误处理:
✅ 统一返回 JSON 格式错误
✅ 记录详细日志便于排查
✅ 保护敏感信息(不暴露堆栈给客户端)
✅ 保持进程稳定运行
为什么需要优雅关闭?
❌ 强制关闭 (kill -9):
问题:
- SSE 连接突然断开,客户端收到错误
- 正在处理的请求丢失
- 文件句柄、网络 socket 未释放
- 可能导致资源泄漏
✅ 优雅关闭 (Ctrl+C / SIGINT):
流程:
1. 停止接受新连接
2. 等待进行中的请求完成
3. 通知所有客户端即将关闭
4. 关闭所有 SSE 连接
5. 清理所有资源
6. 安全退出
优势:
- 客户端体验好(收到关闭通知)
- 数据完整性(请求不会丢失)
- 资源无泄漏
- 日志完整(可追踪关闭原因)
7. 调用建议
在编辑器定义【个人规则】或 【项目规则】,约定自动触发时机 和 内容,避免每次还要在对话框内说明调用,比较麻烦
优先级权重:对话框 > 项目规则 > 个人规则
更多推荐


所有评论(0)