多 Agent 协作:MCP 跨语言工具共享 (Java Server + Go Client)
·
一、为什么需要跨语言 MCP?
在一个真实的 AI 应用中,工具可能由不同团队、不同语言开发:
- Java 技能引擎:管理搜索引擎、数据库查询等企业级工具
- Go Agent:作为 MCP Client,调用 Java Server 暴露的工具
- Python 数据管道:提供数据分析工具
MCP 的 stdio 传输天然支持跨语言通信。本篇展示:Java 提供 MCP Server,Go 编写 Agent Client,通过 stdio 管道完成工具共享。
┌─────────────────────┐ stdio pipe ┌──────────────────────┐
│ Go Agent Client │ ◄──────────────────────► │ Java MCP Server │
│ │ JSON-RPC 2.0 over stdin │ │
│ - 用户意图解析 │ /stdout │ - 技能引擎 │
│ - LLM 调用编排 │ │ - Google 搜索 │
│ - 工具调用请求 │ │ - SQL 查询 │
│ - 结果归并 │ │ - 数据导出 │
└─────────────────────┘ └──────────────────────┘
二、Java 端:MCP Server + 技能引擎
2.1 适配技能引擎到 MCP 协议
在 第 1 篇 的 MCP Server 基础上,将 MessageHandler 接入 SkillEngine:
// com/example/mcp/SkillMcpServer.java
package com.example.mcp;
import com.example.skillengine.SkillEngine;
import com.example.skillengine.tool.ToolResult;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.nio.file.Path;
/**
* 集成技能引擎的 MCP Server。
* 继承第 1 篇的 McpServer,替换 MessageHandler。
*/
public class SkillMcpServer {
private static final ObjectMapper MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
// 初始化技能引擎,加载 skills/ 目录下所有配置
Path skillsDir = Path.of(args.length > 0 ? args[0] : "./skills");
SkillEngine skillEngine = new SkillEngine(skillsDir);
skillEngine.loadAll();
skillEngine.enableHotReload(); // 修改 YAML 自动生效
// 启动 MCP Server(复用第 1 篇的 StdioTransport)
StdioTransport transport = new StdioTransport();
transport.start();
System.err.println("[SkillMcpServer] Ready with tools: " +
skillEngine.listTools().size());
boolean initialized = false;
while (true) {
JsonRpcMessage request = transport.receive();
String method = request.getMethod();
String id = request.getId();
// initialize 生命周期
if ("initialize".equals(method)) {
ObjectNode result = MAPPER.createObjectNode();
result.put("protocolVersion", "2024-11-05");
ObjectNode caps = result.putObject("capabilities");
caps.putObject("tools").put("listChanged", true);
ObjectNode info = result.putObject("serverInfo");
info.put("name", "java-skill-mcp-server");
info.put("version", "2.0.0");
transport.send(JsonRpcMessage.success(id, result));
} else if ("notifications/initialized".equals(method)) {
initialized = true;
System.err.println("[SkillMcpServer] Client initialized");
} else if ("tools/list".equals(method)) {
// 从 SkillEngine 获取工具列表
ArrayNode tools = MAPPER.valueToTree(skillEngine.listTools());
ObjectNode result = MAPPER.createObjectNode();
result.set("tools", tools);
transport.send(JsonRpcMessage.success(id, result));
} else if ("tools/call".equals(method)) {
String toolName = request.getParams().path("name").asText();
JsonNode arguments = request.getParams().path("arguments");
try {
// 委托给 SkillEngine 执行(带超时和重试)
ToolResult toolResult = skillEngine.execute(toolName, arguments);
ObjectNode result = MAPPER.createObjectNode();
ArrayNode content = result.putArray("content");
ObjectNode textNode = content.addObject();
textNode.put("type", "text");
textNode.put("text", toolResult.getText());
result.put("isError", toolResult.isError());
transport.send(JsonRpcMessage.success(id, result));
} catch (Exception e) {
ObjectNode result = MAPPER.createObjectNode();
ArrayNode content = result.putArray("content");
content.addObject()
.put("type", "text")
.put("text", "Tool execution failed: " + e.getMessage());
result.put("isError", true);
transport.send(JsonRpcMessage.success(id, result));
}
}
}
}
}
2.2 编译与运行
# 编译 Java MCP Server
mvn clean package -DskipTests
# 启动(stdio 模式,等待 Go Client 连接)
java -jar target/skill-mcp-server-2.0.0.jar ./skills
三、Go 端:MCP Client + Agent 编排
3.1 MCP Client 实现
// internal/client/mcp_client.go
package client
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os/exec"
"sync"
"time"
)
// MCPClient 连接一个 MCP Server(通过 stdio 子进程)
type MCPClient struct {
cmd *exec.Cmd
stdin io.WriteCloser
reader *bufio.Reader
mu sync.Mutex
nextID int
pending map[int]chan json.RawMessage // ID → 响应 channel
pendingMu sync.Mutex
tools []ToolInfo
}
type ToolInfo struct {
Name string `json:"name"`
Description string `json:"description"`
Skill string `json:"skill,omitempty"`
InputSchema InputSchema `json:"inputSchema"`
}
type InputSchema struct {
Type string `json:"type"`
Properties map[string]Property `json:"properties"`
Required []string `json:"required"`
}
type Property struct {
Type string `json:"type"`
Description string `json:"description"`
}
// NewMCPClient 启动一个 MCP Server 子进程并建立 stdio 连接
func NewMCPClient(ctx context.Context, command string, args ...string) (*MCPClient, error) {
cmd := exec.CommandContext(ctx, command, args...)
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("stdin pipe: %w", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("stdout pipe: %w", err)
}
// 日志走 stderr,不干扰协议
cmd.Stderr = os.Stderr // 需要 import "os"
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("start server: %w", err)
}
c := &MCPClient{
cmd: cmd,
stdin: stdin,
reader: bufio.NewReader(stdout),
pending: make(map[int]chan json.RawMessage),
}
// 启动响应读取 goroutine
go c.readResponses()
// MCP 初始化握手
if err := c.initialize(ctx); err != nil {
c.Close()
return nil, fmt.Errorf("initialize: %w", err)
}
// 拉取工具列表
if err := c.fetchTools(ctx); err != nil {
c.Close()
return nil, fmt.Errorf("fetch tools: %w", err)
}
return c, nil
}
func (c *MCPClient) initialize(ctx context.Context) error {
params := map[string]interface{}{
"protocolVersion": "2024-11-05",
"clientInfo": map[string]string{
"name": "go-mcp-client",
"version": "1.0.0",
},
}
result, err := c.call(ctx, "initialize", params)
if err != nil {
return err
}
// 发送 initialized 通知
_ = c.send(ctx, "notifications/initialized", nil)
var resp struct {
ServerInfo struct {
Name string `json:"name"`
Version string `json:"version"`
} `json:"serverInfo"`
}
json.Unmarshal(result, &resp)
fmt.Fprintf(os.Stderr, "[client] Connected to %s v%s\n", resp.ServerInfo.Name, resp.ServerInfo.Version)
return nil
}
func (c *MCPClient) fetchTools(ctx context.Context) error {
result, err := c.call(ctx, "tools/list", nil)
if err != nil {
return err
}
var resp struct {
Tools []ToolInfo `json:"tools"`
}
if err := json.Unmarshal(result, &resp); err != nil {
return fmt.Errorf("parse tools list: %w", err)
}
c.tools = resp.Tools
fmt.Fprintf(os.Stderr, "[client] Loaded %d tools from server\n", len(c.tools))
return nil
}
// CallTool 调用远程工具
func (c *MCPClient) CallTool(ctx context.Context, name string, args map[string]interface{}) (string, error) {
params := map[string]interface{}{
"name": name,
"arguments": args,
}
result, err := c.call(ctx, "tools/call", params)
if err != nil {
return "", fmt.Errorf("call tool %s: %w", name, err)
}
var resp struct {
Content []struct {
Type string `json:"type"`
Text string `json:"text"`
} `json:"content"`
IsError bool `json:"isError"`
}
if err := json.Unmarshal(result, &resp); err != nil {
return "", fmt.Errorf("parse tool result: %w", err)
}
if len(resp.Content) == 0 {
return "", fmt.Errorf("empty tool result")
}
if resp.IsError {
return "", fmt.Errorf("tool error: %s", resp.Content[0].Text)
}
return resp.Content[0].Text, nil
}
// GetTools 返回已加载的工具列表
func (c *MCPClient) GetTools() []ToolInfo {
return c.tools
}
// ─── 内部方法 ───
func (c *MCPClient) call(ctx context.Context, method string, params interface{}) (json.RawMessage, error) {
c.mu.Lock()
id := c.nextID
c.nextID++
c.mu.Unlock()
// 创建响应 channel
respCh := make(chan json.RawMessage, 1)
c.pendingMu.Lock()
c.pending[id] = respCh
c.pendingMu.Unlock()
defer func() {
c.pendingMu.Lock()
delete(c.pending, id)
c.pendingMu.Unlock()
}()
// 发送请求
req := map[string]interface{}{
"jsonrpc": "2.0",
"id": id,
"method": method,
}
if params != nil {
req["params"] = params
}
if err := c.send(ctx, method, params); err != nil {
return nil, err
}
// 等待响应
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-respCh:
// 检查是否包含 error 字段
var errResp struct {
Error *struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error"`
}
json.Unmarshal(resp, &errResp)
if errResp.Error != nil {
return nil, fmt.Errorf("rpc error %d: %s", errResp.Error.Code, errResp.Error.Message)
}
return resp, nil
}
}
func (c *MCPClient) send(ctx context.Context, method string, params interface{}) error {
data, err := json.Marshal(struct {
JSONRPC string `json:"jsonrpc"`
ID int `json:"id"`
Method string `json:"method"`
Params interface{} `json:"params,omitempty"`
}{
JSONRPC: "2.0",
ID: c.nextID,
Method: method,
Params: params,
})
if err != nil {
return fmt.Errorf("marshal request: %w", err)
}
c.mu.Lock()
defer c.mu.Unlock()
// 写请求 + 换行分隔符
if _, err := c.stdin.Write(append(data, '\n')); err != nil {
return fmt.Errorf("write request: %w", err)
}
return nil
}
func (c *MCPClient) readResponses() {
for {
line, err := c.reader.ReadBytes('\n')
if err != nil {
if err != io.EOF {
fmt.Fprintf(os.Stderr, "[client] read error: %v\n", err)
}
return
}
var resp struct {
JSONRPC string `json:"jsonrpc"`
ID int `json:"id"`
Result json.RawMessage `json:"result"`
Error json.RawMessage `json:"error"`
}
if err := json.Unmarshal(line, &resp); err != nil {
fmt.Fprintf(os.Stderr, "[client] parse error: %v\n", err)
continue
}
c.pendingMu.Lock()
ch, ok := c.pending[resp.ID]
c.pendingMu.Unlock()
if ok {
// 返回包含 result 的完整消息,让调用方自己解析
var fullResp struct {
Result json.RawMessage `json:"result"`
Error json.RawMessage `json:"error"`
}
json.Unmarshal(line, &fullResp)
ch <- fullResp.Result
}
}
}
func (c *MCPClient) Close() error {
c.stdin.Close()
return c.cmd.Wait()
}
3.2 Go Agent:编排 LLM + MCP 工具调用
// cmd/agent/main.go
package main
import (
"context"
"fmt"
"os"
"strings"
"github.com/example/mcp-client-go/internal/client"
)
// 注意:实际需导入 os 包
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// 1. 启动 Java MCP Server 子进程,建立 stdio 连接
fmt.Fprintln(os.Stderr, "[agent] Starting Java MCP Server...")
mcpClient, err := client.NewMCPClient(ctx,
"java", "-jar", "../java-server/target/skill-mcp-server-2.0.0.jar", "../skills")
if err != nil {
fmt.Fprintf(os.Stderr, "[agent] Failed to connect: %v\n", err)
os.Exit(1)
}
defer mcpClient.Close()
// 2. 展示可用工具
fmt.Println("=== Available Tools ===")
for _, tool := range mcpClient.GetTools() {
fmt.Printf(" - %s: %s [skill=%s]\n", tool.Name, tool.Description, tool.Skill)
}
// 3. 模拟 Agent 编排:LLM 决定调用 google_search
userQuery := "最新的 AI Agent 框架有哪些?"
fmt.Printf("\n=== User Query ===\n%s\n\n", userQuery)
// 3a. 构建 LLM 提示词(简化版,生产环境应调用 OpenAI/Claude API)
toolsDesc := buildToolsDescription(mcpClient.GetTools())
prompt := fmt.Sprintf(`你是一个 AI 助手,可以使用以下工具:
%s
用户问题:%s
请决定使用哪个工具以及参数(仅返回工具名和参数 JSON)。
格式:TOOL: tool_name
ARGS: {"key": "value"}`, toolsDesc, userQuery)
// 3b. 调用 LLM(这里用模拟的 LLM 决策代替)
fmt.Fprintln(os.Stderr, "[agent] Asking LLM to decide tool...")
llmDecision := analyzeWithLLM(prompt) // 生产环境:实际 LLM API 调用
// 3c. 解析 LLM 返回的工具调用
toolName, toolArgs := parseToolCall(llmDecision)
fmt.Printf("[agent] LLM decided: %s(%v)\n", toolName, toolArgs)
// 3d. 通过 MCP Client 调用远程工具
fmt.Fprintln(os.Stderr, "[agent] Calling tool via MCP...")
result, err := mcpClient.CallTool(ctx, toolName, toolArgs)
if err != nil {
fmt.Printf("[agent] Tool call failed: %v\n", err)
os.Exit(1)
}
// 4. 展示结果
fmt.Println("=== Tool Result ===")
fmt.Println(truncate(result, 500))
// 5. 将工具结果送回 LLM 生成最终回答(略)
fmt.Println("\n=== Final Answer ===")
fmt.Println("根据搜索结果,当前主流的 AI Agent 框架包括 LangGraph、CrewAI、AutoGen 等...(演示回答)")
}
func buildToolsDescription(tools []client.ToolInfo) string {
var b strings.Builder
for _, t := range tools {
b.WriteString(fmt.Sprintf("- %s: %s\n", t.Name, t.Description))
}
return b.String()
}
// 模拟 LLM 工具选择(生产环境替换为 OpenAI API 调用)
func analyzeWithLLM(prompt string) string {
return `TOOL: google_search
ARGS: {"query": "latest AI Agent frameworks 2025", "numResults": 5}`
}
func parseToolCall(decision string) (string, map[string]interface{}) {
lines := strings.Split(strings.TrimSpace(decision), "\n")
toolName := ""
args := make(map[string]interface{})
for _, line := range lines {
if strings.HasPrefix(line, "TOOL:") {
toolName = strings.TrimSpace(strings.TrimPrefix(line, "TOOL:"))
}
if strings.HasPrefix(line, "ARGS:") {
jsonStr := strings.TrimSpace(strings.TrimPrefix(line, "ARGS:"))
_ = json.Unmarshal([]byte(jsonStr), &args)
}
}
return toolName, args
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen] + "..."
}
// 需要 import: "time", "encoding/json"
四、完整调用链路
用户: "最新的 AI Agent 框架有哪些?"
Go Agent:
├─ 解析用户意图
├─ 构建 prompt(含 MCP 工具列表)
├─ LLM 决策 → google_search(query="AI Agent frameworks")
│
├─ mcpClient.CallTool("google_search", args)
│ │
│ ▼ JSON-RPC over stdio
│ ┌──────────────────────────────────┐
│ │ Java MCP Server │
│ │ ├─ tools/call 路由 │
│ │ ├─ SkillEngine.execute(...) │
│ │ │ ├─ 超时控制 (15s) │
│ │ │ ├─ 重试 (3次) │
│ │ │ └─ GoogleSearchTool.execute()
│ │ └─ 返回 JSON-RPC response │
│ └──────────────────────────────────┘
│
├─ 接收工具返回 → 搜索结果
├─ 二次 LLM 调用 → 生成最终回答
└─ 返回给用户
五、生产化部署考虑
5.1 连接池化
当多个 Agent 需要调用工具时,stdio 子进程模式有上限:
// 连接池 —— 预创建多个 MCP Server 实例
type MCPClientPool struct {
clients chan *MCPClient
cmd string
args []string
}
func NewPool(ctx context.Context, size int, cmd string, args ...string) (*MCPClientPool, error) {
pool := &MCPClientPool{
clients: make(chan *MCPClient, size),
cmd: cmd,
args: args,
}
for i := 0; i < size; i++ {
c, err := NewMCPClient(ctx, cmd, args...)
if err != nil {
return nil, err
}
pool.clients <- c
}
return pool, nil
}
func (p *MCPClientPool) Acquire(ctx context.Context) (*MCPClient, error) {
select {
case c := <-p.clients:
return c, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (p *MCPClientPool) Release(c *MCPClient) {
p.clients <- c
}
5.2 生产环境替代方案
| 方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| stdio 子进程 | 单机部署,轻度使用 | 简单,零网络开销 | 进程数有限 |
| HTTP SSE 传输 | 多服务分布式 | 水平扩展,负载均衡 | 网络延迟 |
| K8s Sidecar | 大规模生产 | 自动扩缩,健康检查 | 运维复杂度 |
5.3 健康检查
func (c *MCPClient) HealthCheck(ctx context.Context) error {
_, err := c.call(ctx, "tools/list", nil)
if err != nil {
// 重连逻辑
return fmt.Errorf("health check failed: %w", err)
}
return nil
}
六、系列总结
五篇文章覆盖了 MCP 工程化的完整链路:
| 篇 | 主题 | 语言 | 核心能力 |
|---|---|---|---|
| 1 | MCP 协议核心 | Java | JSON-RPC、stdio 传输、工具注册 |
| 2 | 生产级 MCP Server | Go | goroutine 池、超时、优雅关闭 |
| 3 | 工具调用中间件 | Go | 截断、超时、熔断、可观测性 |
| 4 | 技能系统设计 | Java | 配置驱动、热加载、重试机制 |
| 5 | 跨语言协作 | Java+Go | stdio IPC、连接池、多 Agent 编排 |
核心思想:AI Agent 的工具调用不是"写个函数"就能搞定的。从协议层(MCP)、基础设施层(并发/超时)、可靠性层(中间件)、编排层(技能系统)到集成层(跨语言),每一层都需要工程化思维。
更多推荐
所有评论(0)