AI Agent 编排引擎实战:从单次调用到多步推理的调度架构
AI Agent 编排引擎实战:从单次调用到多步推理的调度架构
一、引言痛点:Agent 编排的工程化挑战
单个 LLM 调用谁都会写,但把多个 Agent 串成一条能跑的生产链路,那是另一回事。现实中的 Agent 编排面临一堆工程问题:步骤间上下文怎么传递、中间结果挂了怎么重试、长链路超时怎么处理、多 Agent 并行怎么聚合、Token 消耗怎么控制。很多人拿 LangChain 拼个 demo 就以为搞定了 Agent 编排,结果一上生产就各种翻车——链路一长就超时、重试逻辑缺失导致状态不一致、Token 爆炸直接把预算烧光。
Agent 编排的本质是一个分布式调度问题,跟微服务编排是同一类挑战。本文直接上方案,从编排引擎设计、状态管理、错误处理三个维度,讲清楚怎么构建一个能在生产环境跑的 Agent 编排系统。
二、编排引擎架构设计
2.1 DAG 驱动的编排模型
flowchart TD
A[用户输入] --> B[意图识别 Agent]
B --> C[信息抽取 Agent]
B --> D[上下文检索 Agent]
C --> E[推理 Agent]
D --> E
E --> F{结果校验}
F -->|通过| G[输出格式化 Agent]
F -->|不通过| E
G --> H[最终输出]
style F fill:#f9f,stroke:#333
style E fill:#bbf,stroke:#333
2.2 编排引擎核心实现
// AI Agent 编排引擎
// 设计原则:DAG 定义流程,状态机驱动执行,失败可重试
package agent
import (
"context"
"fmt"
"sync"
"time"
)
// Step 定义编排中的一个步骤
type Step struct {
Name string // 步骤名称
Agent Agent // 执行该步骤的 Agent
DependsOn []string // 依赖的步骤名
Timeout time.Duration // 超时时间
MaxRetries int // 最大重试次数
RetryDelay time.Duration // 重试间隔
}
// Agent 接口定义
type Agent interface {
Name() string
Execute(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error)
}
// StepResult 步骤执行结果
type StepResult struct {
StepName string
Output map[string]interface{}
Duration time.Duration
Retries int
TokenUsed int
Error error
}
// Pipeline 编排管线
type Pipeline struct {
steps map[string]*Step
order []string // 拓扑排序后的执行顺序
resultStore *ResultStore // 中间结果存储
}
// NewPipeline 创建编排管线
func NewPipeline(steps ...*Step) (*Pipeline, error) {
p := &Pipeline{
steps: make(map[string]*Step),
resultStore: NewResultStore(),
}
for _, s := range steps {
p.steps[s.Name] = s
}
// 拓扑排序
order, err := p.topologicalSort()
if err != nil {
return nil, fmt.Errorf("拓扑排序失败: %w", err)
}
p.order = order
return p, nil
}
// Execute 执行编排管线
func (p *Pipeline) Execute(ctx context.Context, initialInput map[string]interface{}) (*PipelineResult, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
results := make(map[string]*StepResult)
var mu sync.Mutex
// 按拓扑序执行
for _, stepName := range p.order {
step := p.steps[stepName]
// 收集上游依赖的输出
input := p.collectInputs(step, results)
// 合并初始输入
for k, v := range initialInput {
if _, exists := input[k]; !exists {
input[k] = v
}
}
// 执行步骤(带重试)
result := p.executeStepWithRetry(ctx, step, input)
mu.Lock()
results[stepName] = result
mu.Unlock()
// 存储中间结果
if result.Error == nil {
p.resultStore.Store(stepName, result.Output)
} else {
// 关键步骤失败,终止管线
return &PipelineResult{
StepResults: results,
Error: fmt.Errorf("步骤 %s 执行失败: %w", stepName, result.Error),
}, result.Error
}
}
return &PipelineResult{StepResults: results}, nil
}
// executeStepWithRetry 带重试的步骤执行
func (p *Pipeline) executeStepWithRetry(ctx context.Context, step *Step, input map[string]interface{}) *StepResult {
var result *StepResult
for attempt := 0; attempt <= step.MaxRetries; attempt++ {
if attempt > 0 {
select {
case <-ctx.Done():
return &StepResult{StepName: step.Name, Error: ctx.Err()}
case <-time.After(step.RetryDelay):
}
}
start := time.Now()
// 带超时的执行
stepCtx, cancel := context.WithTimeout(ctx, step.Timeout)
output, err := step.Agent.Execute(stepCtx, input)
cancel()
duration := time.Since(start)
result = &StepResult{
StepName: step.Name,
Output: output,
Duration: duration,
Retries: attempt,
Error: err,
}
if err == nil {
return result
}
// 不可重试的错误直接返回
if !isRetryableError(err) {
return result
}
}
return result
}
// collectInputs 收集上游步骤的输出作为当前步骤的输入
func (p *Pipeline) collectInputs(step *Step, results map[string]*StepResult) map[string]interface{} {
input := make(map[string]interface{})
for _, dep := range step.DependsOn {
if r, ok := results[dep]; ok && r.Error == nil {
for k, v := range r.Output {
input[fmt.Sprintf("%s.%s", dep, k)] = v
}
}
}
return input
}
// topologicalSort 拓扑排序
func (p *Pipeline) topologicalSort() ([]string, error) {
inDegree := make(map[string]int)
for name := range p.steps {
inDegree[name] = 0
}
for _, step := range p.steps {
for _, dep := range step.DependsOn {
inDegree[step.Name]++
}
}
var queue []string
for name, degree := range inDegree {
if degree == 0 {
queue = append(queue, name)
}
}
var order []string
for len(queue) > 0 {
name := queue[0]
queue = queue[1:]
order = append(order, name)
for _, step := range p.steps {
for _, dep := range step.DependsOn {
if dep == name {
inDegree[step.Name]--
if inDegree[step.Name] == 0 {
queue = append(queue, step.Name)
}
}
}
}
}
if len(order) != len(p.steps) {
return nil, fmt.Errorf("存在循环依赖")
}
return order, nil
}
func isRetryableError(err error) bool {
// 超时、限流、网络错误可重试
return true
}
// PipelineResult 管线执行结果
type PipelineResult struct {
StepResults map[string]*StepResult
Error error
}
// ResultStore 中间结果存储
type ResultStore struct {
mu sync.RWMutex
store map[string]map[string]interface{}
}
func NewResultStore() *ResultStore {
return &ResultStore{
store: make(map[string]map[string]interface{}),
}
}
func (rs *ResultStore) Store(stepName string, output map[string]interface{}) {
rs.mu.Lock()
defer rs.mu.Unlock()
rs.store[stepName] = output
}
func (rs *ResultStore) Get(stepName string) (map[string]interface{}, bool) {
rs.mu.RLock()
defer rs.mu.RUnlock()
out, ok := rs.store[stepName]
return out, ok
}
三、上下文管理与 Token 控制
3.1 上下文压缩策略
flowchart LR
A[原始上下文<br/>~8000 tokens] --> B[关键信息提取<br/>保留决策相关内容]
B --> C[摘要压缩<br/>~2000 tokens]
C --> D[滑动窗口<br/>保留最近 N 轮]
D --> E[压缩后上下文<br/>~3000 tokens]
A --> F[Token 预算分配]
F --> F1[系统提示: 500]
F --> F2[上下文: 3000]
F --> F3[当前输入: 1000]
F --> F4[输出预留: 1500]
3.2 Token 预算管理器
// Token 预算管理器
// 核心思路:给每一步分配 Token 预算,超了就压缩
package agent
import "fmt"
type TokenBudget struct {
Total int
System int
Context int
Input int
Output int
Used int
}
func NewTokenBudget(total int) *TokenBudget {
return &TokenBudget{
Total: total,
System: int(float64(total) * 0.1), // 10% 给系统提示
Context: int(float64(total) * 0.4), // 40% 给上下文
Input: int(float64(total) * 0.2), // 20% 给当前输入
Output: int(float64(total) * 0.3), // 30% 给输出
}
}
func (tb *TokenBudget) Allocate(stepName string, estimatedTokens int) (int, error) {
remaining := tb.Total - tb.Used
if estimatedTokens > remaining {
// 压缩上下文释放空间
released := tb.compressContext(estimatedTokens - remaining)
remaining += released
}
if estimatedTokens > remaining {
return 0, fmt.Errorf("Token 预算不足: 需要 %d, 剩余 %d", estimatedTokens, remaining)
}
tb.Used += estimatedTokens
return estimatedTokens, nil
}
func (tb *TokenBudget) compressContext(needed int) int {
// 实际实现:摘要压缩、滑动窗口、关键信息提取
// 这里返回模拟值
released := needed
if released > tb.Context/2 {
released = tb.Context / 2
}
tb.Context -= released
return released
}
func (tb *TokenBudget) Report() string {
return fmt.Sprintf(
"Token 预算: 总计=%d, 已用=%d(%0.1f%%), 剩余=%d",
tb.Total, tb.Used, float64(tb.Used)/float64(tb.Total)*100, tb.Total-tb.Used,
)
}
四、错误处理与可观测性
4.1 错误分类与处理策略
| 错误类型 | 示例 | 处理策略 |
|---|---|---|
| 可重试 | API 限流、网络超时 | 指数退避重试 |
| 可降级 | 某步骤输出质量低 | 跳过或用默认值替代 |
| 不可恢复 | API Key 失效、模型不存在 | 立即终止,返回错误 |
| 部分失败 | 并行步骤中部分失败 | 聚合成功结果,标记失败步骤 |
4.2 链路追踪集成
// Agent 编排的链路追踪
// 接入 OpenTelemetry,每一步都是一次 Span
package agent
import (
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
func (p *Pipeline) executeStepWithTracing(ctx context.Context, step *Step, input map[string]interface{}) (*StepResult, error) {
tracer := otel.Tracer("agent-pipeline")
ctx, span := tracer.Start(ctx, fmt.Sprintf("step.%s", step.Name),
trace.WithAttributes(
attribute.String("agent.name", step.Agent.Name()),
attribute.Int("step.max_retries", step.MaxRetries),
attribute.String("step.timeout", step.Timeout.String()),
),
)
defer span.End()
result := p.executeStepWithRetry(ctx, step, input)
// 记录结果到 Span
span.SetAttributes(
attribute.Int("step.retries", result.Retries),
attribute.Int("step.tokens_used", result.TokenUsed),
attribute.String("step.duration", result.Duration.String()),
)
if result.Error != nil {
span.RecordError(result.Error)
span.SetAttributes(attribute.Bool("step.success", false))
} else {
span.SetAttributes(attribute.Bool("step.success", true))
}
return result, nil
}
五、边界分析与架构权衡
5.1 编排模式对比
| 模式 | 适用场景 | 复杂度 | 灵活性 |
|---|---|---|---|
| 线性链 | 简单的 A→B→C 流程 | 低 | 低 |
| DAG | 有依赖的多步流程 | 中 | 高 |
| 状态机 | 需要条件分支和循环 | 高 | 最高 |
| ReAct 循环 | Agent 自主决策 | 高 | 最高但不可控 |
生产环境推荐 DAG + 状态机混合:主流程用 DAG 定义,条件分支用状态机处理。ReAct 循环在可控场景下可用,但需要设置最大步数和 Token 上限。
5.2 关键设计决策
同步 vs 异步执行。 步骤间有依赖时必须同步,无依赖时可并行。并行执行的步骤需要聚合机制,推荐用 WaitGroup + 超时控制。
上下文传递方式。 全量传递简单但 Token 浪费严重,按需传递高效但实现复杂。推荐按需传递 + 压缩策略:每一步只接收依赖步骤的输出,长上下文自动摘要。
失败策略。 全部回滚太重,部分降级更实用。推荐策略:关键步骤失败终止管线,非关键步骤失败降级继续。
六、总结
Agent 编排的核心不是调 API,是工程化。三个要点:
第一,DAG 驱动,拓扑排序确定执行顺序。依赖关系必须显式声明,不能靠调用顺序隐式保证。拓扑排序 + 依赖输入收集,这是编排引擎的骨架。
第二,Token 预算必须管。不管 Token 预算的 Agent 编排,跟不管内存的程序一样,迟早爆。每一步分配预算,超了就压缩上下文,这是生产环境的基本操作。
第三,可观测性不是锦上添花,是必需品。链路追踪、Token 统计、步骤耗时,这些数据是排查问题的基础。没有可观测性的编排系统,出了问题就是黑盒。
别整虚的,把编排引擎当基础设施来建设,Agent 才能真正跑起来。
五、总结
围绕“AI Agent 编排引擎实战:从单次调用到多步推理的调度架构”,更稳妥的落地方式不是一次性追求完整平台,而是先确定核心路径,再把复杂能力逐步收敛到可验证的模块。第一步,明确输入、输出和失败边界,避免把不稳定因素藏在默认配置里。第二步,优先实现最小闭环,用真实数据验证性能、稳定性和维护成本。第三步,把监控、告警和回滚策略前置到设计阶段,而不是上线后再补。
后续迭代可以从三个方向推进:补齐自动化测试,覆盖正常路径、边界路径和异常路径;建立基准数据,持续比较版本变化带来的收益和副作用;沉淀操作手册,把排障步骤、指标含义和禁用场景写清楚。只要这些基础工作到位,方案就不会停留在概念层,而能成为团队可以长期维护的工程资产。
更多推荐
所有评论(0)