万物皆映射:AI Agent 时代的系统思维与认知工程

第3章 控制映射:控制论、反馈与 Agent 自调节

核心命题:一切智能行为的本质是控制——感知环境、比较目标、计算偏差、执行校正。控制论不是工程的分支,而是理解所有自适应系统的统一科学。AI Agent 的根本架构,就是一个高度精密的控制系统:它的智能不在于映射的精确性,而在于映射的自调节能力。

核心公式

Agent Autonomy = Requisite Variety × Control Bandwidth × Adaptation Rate

其中:
- Requisite Variety = 控制器内部状态空间的复杂度(Ashby 定律)
- Control Bandwidth = 感知-决策-执行的循环频率
- Adaptation Rate = 控制器自身参数的更新速率

本章在全书中的角色:第1章建立了映射论的统一视角,第2章引入了系统思维的网络视角。本章将映射论与系统思维落地为工程方法——控制论。如果映射是字母,系统是语法,那么控制论就是"修辞学"——它教你如何写出优雅、稳定、自适应的映射。


3.1 控制论的诞生:从防空炮到宇宙万物

3.1.1 诺伯特·维纳的洞察

1943年,二战正酣。数学家诺伯特·维纳(Norbert Wiener)正在研究一个看似简单的工程问题:如何让防空炮自动瞄准高速飞行的敌机?

这个问题的核心挑战不在于火炮的机械设计,而在于预测。飞机不是静止的目标,它会转向、加速、规避。炮手需要预测飞机未来的位置,提前开火。但炮手自己也在根据飞机的实时运动不断调整——这是一个循环因果的过程:

飞机运动 → 炮手观察 → 炮手预测 → 炮手瞄准 → 开火
    ↑                                              ↓
    ← 飞机根据炮火调整运动 ←──────────────────────┘

维纳的突破性洞察是:这个"观察-预测-行动-反馈"的循环,不仅存在于防空炮系统中,它是一切有目的行为的共同结构。从人体的体温调节到恒温器的工作,从猫的追捕到经济市场的波动——所有这些看似不同的系统,共享同一个控制结构。

1948年,维纳出版了《控制论:关于在动物和机器中控制和通信的科学》(Cybernetics: Or Control and Communication in the Animal and the Machine)。“Cybernetics"一词来自希腊语"kubernetes”(舵手),暗示了控制论的核心隐喻:像舵手一样,不断根据目标偏差调整航向

3.1.2 控制论的核心结构

所有控制系统共享同一个基本结构:

                    ┌─────────────┐
  目标值 ──→ ⊕ ──→ │   控制器    │ ──→ 执行动作 ──→ 系统输出
              ↑     └─────────────┘                    │
              │                                        ↓
              │         ┌──────────┐            ┌──────────┐
              └──────── │  传感器  │ ←──────────│  环境/被控 │
                        └──────────┘            └──────────┘

这个结构包含六个核心要素:

要素 功能 Agent 对应
目标值(Setpoint) 系统期望达到的状态 用户的意图 / 任务目标
传感器(Sensor) 测量系统当前状态 输入解析 / 状态监控
比较器(Comparator) 计算目标与现状的偏差 错误分析 / 差异检测
控制器(Controller) 根据偏差计算校正动作 LLM 推理 / 决策引擎
执行器(Actuator) 执行控制器的决策 工具调用 / 代码执行
被控系统(Plant) 被控制的对象/环境 外部系统 / 文件系统

3.1.3 用映射论重新理解控制

从映射论的视角,控制系统是一个自指的映射(self-referential mapping):

标准映射:   f: X → Y        // 输入空间映射到输出空间

控制映射:   f: (X × E × G) → A
             其中:
             X = 系统当前状态(传感器读数)
             E = 历史误差序列(过去的偏差)
             G = 目标状态(期望值)
             A = 控制动作

且 f 的输出 A 会影响下一时刻的 X:
             X(t+1) = Plant(X(t), A(t), disturbance(t))

这就是控制映射的自指性:映射的输出改变了映射的输入。这不是一个静态的 f(x) = y,而是一个动态的、自我修正的映射循环。

3.1.4 控制论的三次浪潮

浪潮 时期 核心概念 代表人物 Agent 应用
经典控制 1940s-1960s 负反馈、PID控制 Wiener, Nyquist Agent 的基本纠错循环
现代控制 1960s-1980s 状态空间、最优控制 Kalman, Pontryagin Agent 的多目标优化
自适应控制 1980s-至今 自调节、自学习 Åström, Wittenmark Agent 的自我进化

AI Agent 的设计需要融合三次浪潮的所有工具。仅靠经典控制(简单的反馈纠错)无法应对复杂任务;仅靠现代控制(最优规划)无法应对环境变化;必须有自适应控制(自我更新)的能力。


3.2 负反馈:稳定的基石

3.2.1 负反馈的数学本质

负反馈(Negative Feedback)是控制系统最基础的机制:当输出偏离目标时,系统产生一个反向的修正力,将输出拉回目标。

误差 e = 目标值 r - 当前输出 y
控制动作 u = K × e    // K 是增益
系统响应 y_new = f(u)

关键:如果 y > r,则 e < 0,则 u < 0,则 y 减小 → 回归目标
      如果 y < r,则 e > 0,则 u > 0,则 y 增大 → 回归目标

负反馈的数学本质是压缩映射(Contraction Mapping):

定义:映射 T 是压缩映射,如果存在 0 ≤ k < 1,使得
      d(T(x), T(y)) ≤ k × d(x, y)  对所有 x, y 成立

性质:压缩映射存在唯一不动点 x*,且迭代 xₙ₊₁ = T(xₙ) 收敛到 x*

负反馈系统就是一个压缩映射:每次迭代都让系统状态更接近目标。这是巴拿赫不动点定理(Banach Fixed Point Theorem)在工程中的应用。

3.2.2 PID 控制器:工业控制的标准方案

PID 控制器(Proportional-Integral-Derivative)是工业控制中最广泛使用的控制器,它结合了三种校正策略:

u(t) = Kp × e(t) + Ki × ∫e(τ)dτ + Kd × de(t)/dt

其中:
- Kp × e(t)  → 比例项:当前误差越大,修正越强
- Ki × ∫e(τ)dτ → 积分项:消除持续存在的稳态误差
- Kd × de(t)/dt → 微分项:预测误差趋势,提前干预

三项的直觉理解

直觉 类比 Agent 应用
P(比例) “现在偏了多少?” 看到车偏了就打方向盘 发现输出错误,立即纠正
I(积分) “一直偏着吗?” 感觉一直在偏右,加大左转 发现系统性偏差,做根本修正
D(微分) “偏移的速度在加快吗?” 看到要冲出弯道了,提前减速 预测错误趋势,提前防范

3.2.3 Agent 中的 PID 控制实现

/**
 * PID 控制器在 AI Agent 中的应用
 * 
 * 将经典的 PID 控制原理应用于 Agent 的行为调节
 */

class PIDController {
  private Kp: number;  // 比例增益
  private Ki: number;  // 积分增益
  private Kd: number;  // 微分增益

  private integral: number = 0;
  private previousError: number = 0;
  private integralLimit: number;  // 积分饱和限制

  constructor(params: {
    Kp: number;
    Ki: number;
    Kd: number;
    integralLimit?: number;
  }) {
    this.Kp = params.Kp;
    this.Ki = params.Ki;
    this.Kd = params.Kd;
    this.integralLimit = params.integralLimit || 100;
  }

  /**
   * 计算控制输出
   */
  compute(error: number, dt: number = 1): number {
    // 比例项:当前误差
    const proportional = this.Kp * error;

    // 积分项:累积误差(带反饱和)
    this.integral += error * dt;
    this.integral = Math.max(-this.integralLimit,
      Math.min(this.integralLimit, this.integral));
    const integral = this.Ki * this.integral;

    // 微分项:误差变化率
    const derivative = this.Kd * (error - this.previousError) / dt;
    this.previousError = error;

    return proportional + integral + derivative;
  }

  /**
   * 重置控制器状态
   */
  reset(): void {
    this.integral = 0;
    this.previousError = 0;
  }
}

/**
 * Agent 质量控制器
 * 
 * 使用 PID 控制来维持 Agent 输出质量
 */
class AgentQualityController {
  private pid: PIDController;
  private targetAccuracy: number;
  private recentScores: number[] = [];
  private adjustmentHistory: { time: number; adjustment: number; reason: string }[] = [];

  constructor(targetAccuracy: number = 0.95) {
    this.targetAccuracy = targetAccuracy;
    this.pid = new PIDController({
      Kp: 0.5,   // 对当前偏差适度响应
      Ki: 0.1,   // 缓慢消除系统性偏差
      Kd: 0.2,   // 预测趋势,避免过度调整
      integralLimit: 10,
    });
  }

  /**
   * 每次 Agent 输出后调用,评估并决定调整策略
   */
  evaluate(score: number): QualityAdjustment {
    this.recentScores.push(score);
    if (this.recentScores.length > 50) this.recentScores.shift();

    // 计算误差:实际表现与目标的差距
    const error = this.targetAccuracy - score;

    // PID 计算调整量
    const adjustment = this.pid.compute(error);

    // 将调整量映射到具体的 Agent 参数
    return this.mapToAgentParams(adjustment, error);
  }

  /**
   * 将 PID 输出映射到 Agent 的可调参数
   */
  private mapToAgentParams(adjustment: number, error: number): QualityAdjustment {
    const actions: string[] = [];

    if (Math.abs(adjustment) < 0.05) {
      return { adjust: false, actions: [], confidence: 'high' };
    }

    if (adjustment > 0) {
      // 质量低于目标,需要提升
      if (error > 0.1) {
        actions.push('启用思维链(Chain-of-Thought)推理');
        actions.push('增加自我验证步骤');
      }
      if (error > 0.2) {
        actions.push('降低生成温度(更确定性)');
        actions.push('启用外部知识检索');
      }
      actions.push('增加输出审查环节');
    } else {
      // 质量高于目标,可以放松以提升效率
      actions.push('适当提高生成温度(增加创造性)');
      actions.push('减少不必要的验证步骤以提升速度');
    }

    this.adjustmentHistory.push({
      time: Date.now(),
      adjustment,
      reason: actions.join(', '),
    });

    return {
      adjust: true,
      actions,
      magnitude: adjustment,
      confidence: Math.abs(adjustment) > 0.1 ? 'high' : 'medium',
    };
  }

  /**
   * 获取控制器状态报告
   */
  getStatus(): QualityControllerStatus {
    const avgScore = this.recentScores.length > 0
      ? this.recentScores.reduce((a, b) => a + b, 0) / this.recentScores.length
      : 0;

    return {
      currentAccuracy: avgScore,
      targetAccuracy: this.targetAccuracy,
      error: this.targetAccuracy - avgScore,
      integral: (this.pid as any).integral,
      recentTrend: this.calculateTrend(),
      adjustmentsApplied: this.adjustmentHistory.length,
    };
  }

  private calculateTrend(): 'improving' | 'declining' | 'stable' {
    if (this.recentScores.length < 10) return 'stable';
    const first5 = this.recentScores.slice(-10, -5);
    const last5 = this.recentScores.slice(-5);
    const firstAvg = first5.reduce((a, b) => a + b, 0) / 5;
    const lastAvg = last5.reduce((a, b) => a + b, 0) / 5;

    if (lastAvg > firstAvg + 0.02) return 'improving';
    if (lastAvg < firstAvg - 0.02) return 'declining';
    return 'stable';
  }
}

interface QualityAdjustment {
  adjust: boolean;
  actions: string[];
  magnitude?: number;
  confidence: 'low' | 'medium' | 'high';
}

interface QualityControllerStatus {
  currentAccuracy: number;
  targetAccuracy: number;
  error: number;
  integral: number;
  recentTrend: 'improving' | 'declining' | 'stable';
  adjustmentsApplied: number;
}

3.2.4 负反馈在 Agent 中的五种应用模式

模式一:输出质量反馈

用户问题 → Agent 生成回答 → 质量评估 → 偏差检测 → 参数调整
                                                  ↓
                              温度降低 / 增加验证步骤 / 启用 RAG

模式二:对话节奏反馈

用户消息 → 响应时间测量 → 节奏偏差 → 策略调整
                                     ↓
                回复过长则精简 / 回复过短则详细 / 延迟过高则简化

模式三:任务复杂度反馈

任务执行 → 成功率跟踪 → 复杂度评估 → 难度调节
                                     ↓
             成功率过高则提升难度 / 成功率过低则降低难度

模式四:资源使用反馈

API 调用 → 成本跟踪 → 预算偏差 → 策略调整
                                 ↓
           超预算则降级模型 / 低于预算则升级模型 / 优化提示词

模式五:用户满意度反馈

Agent 回复 → 用户反馈(显式/隐式)→ 满意度评估 → 行为调整
                                                   ↓
            满意度低则增加确认步骤 / 满意度高则减少确认步骤

3.3 正反馈:增长与崩溃的引擎

3.3.1 正反馈的数学本质

正反馈(Positive Feedback):当输出偏离初始状态时,系统产生一个同向的作用力,使偏离进一步增大。

正反馈:输出增大 → 检测到增大 → 产生增大信号 → 输出进一步增大

数学模型:
  dx/dt = k × x    (k > 0)
  解:x(t) = x₀ × e^(kt)    // 指数增长

正反馈是发散映射:每次迭代都让系统状态更远离初始状态。它没有不动点(除零外),行为是指数增长或指数崩溃。

3.3.2 正反馈的建设性与破坏性

建设性正反馈(需要设计):

场景 机制 Agent 应用
知识积累 知道越多 → 学习越快 → 知道更多 知识图谱的自我丰富
技能进化 技能越多 → 能做的任务越多 → 学到更多技能 技能库的自动提取和积累
信任建立 表现好 → 更多委托 → 更多数据 → 表现更好 用户信任的渐进式建立
创新涌现 想法组合 → 新想法 → 更多组合 Agent 的创造性思维

破坏性正反馈(需要抑制):

场景 机制 Agent 风险
错误放大 小错误 → 基于错误的推理 → 更大错误 幻觉的自我加强
过度自信 偶尔正确 → 减少验证 → 更多错误 校准失调
复杂度螺旋 新功能 → 代码复杂 → 更多 bug → 更多修复代码 技术债务失控
资源耗尽 更多用户 → 更多请求 → 更多资源使用 → 系统变慢 服务降级

3.3.3 Agent 正反馈管理系统

/**
 * Agent 正反馈管理器
 * 
 * 设计原则:
 * 1. 强化建设性正反馈(飞轮效应)
 * 2. 抑制破坏性正反馈(失控风险)
 * 3. 为正反馈设置安全边界
 */

class PositiveFeedbackManager {
  private reinforcingLoops: Map<string, ReinforcingLoop> = new Map();
  private safetyBounds: Map<string, SafetyBound> = new Map();

  /**
   * 注册一个增强回路
   */
  registerLoop(loop: ReinforcingLoop): void {
    this.reinforcingLoops.set(loop.id, loop);

    // 为每个增强回路设置安全边界
    if (loop.safetyBound) {
      this.safetyBounds.set(loop.id, loop.safetyBound);
    }
  }

  /**
   * 执行一轮增强回路更新
   */
  async update(): Promise<FeedbackUpdateResult[]> {
    const results: FeedbackUpdateResult[] = [];

    for (const [id, loop] of this.reinforcingLoops) {
      const currentValue = await loop.measureValue();
      const bound = this.safetyBounds.get(id);

      // 检查安全边界
      if (bound && currentValue >= bound.maxValue) {
        // 触发安全限制
        results.push({
          loopId: id,
          status: 'bounded',
          message: `增强回路 "${loop.name}" 已触发安全上限 ${bound.maxValue}`,
          action: 'throttle',
        });
        await loop.throttle();
        continue;
      }

      // 执行增强
      if (loop.isActive) {
        const growth = await loop.executeAmplification();
        results.push({
          loopId: id,
          status: 'amplifying',
          value: currentValue,
          growth,
          message: `增强回路 "${loop.name}" 正在加速: +${growth.toFixed(2)}`,
        });
      }
    }

    return results;
  }

  /**
   * 检测是否有增强回路正在失控
   */
  detectRunaway(): RunawayDetection {
    const runawayLoops: string[] = [];

    for (const [id, loop] of this.reinforcingLoops) {
      const history = loop.getHistory();
      if (history.length >= 5) {
        // 检查是否呈指数增长
        const growthRates: number[] = [];
        for (let i = 1; i < history.length; i++) {
          if (history[i - 1] > 0) {
            growthRates.push(history[i] / history[i - 1]);
          }
        }

        const avgGrowth = growthRates.reduce((a, b) => a + b, 0) / growthRates.length;
        const isAccelerating = growthRates.every((r, i) =>
          i === 0 || r >= growthRates[i - 1] * 0.9
        );

        if (avgGrowth > 1.1 && isAccelerating) {
          runawayLoops.push(id);
        }
      }
    }

    return {
      hasRunaway: runawayLoops.length > 0,
      runawayLoops,
      recommendation: runawayLoops.length > 0
        ? `以下增强回路可能失控: ${runawayLoops.join(', ')}。建议引入平衡回路。`
        : '所有增强回路在安全范围内',
    };
  }
}

interface ReinforcingLoop {
  id: string;
  name: string;
  type: 'constructive' | 'destructive';
  isActive: boolean;
  safetyBound?: SafetyBound;
  measureValue: () => Promise<number>;
  executeAmplification: () => Promise<number>;
  throttle: () => Promise<void>;
  getHistory: () => number[];
}

interface SafetyBound {
  maxValue: number;
  action: 'throttle' | 'stop' | 'alert';
}

interface FeedbackUpdateResult {
  loopId: string;
  status: 'amplifying' | 'bounded' | 'inactive';
  value?: number;
  growth?: number;
  message: string;
  action?: string;
}

interface RunawayDetection {
  hasRunaway: boolean;
  runawayLoops: string[];
  recommendation: string;
}

/**
 * 预定义的 Agent 增强回路
 */
class AgentReinforcingLoops {
  /**
   * 知识积累飞轮(建设性)
   */
  static knowledgeAccumulation(agent: any): ReinforcingLoop {
    const history: number[] = [];

    return {
      id: 'knowledge_flywheel',
      name: '知识积累飞轮',
      type: 'constructive',
      isActive: true,
      safetyBound: { maxValue: 10000, action: 'throttle' },

      async measureValue(): Promise<number> {
        const knowledgeCount = agent.knowledgeBase?.size || 0;
        history.push(knowledgeCount);
        return knowledgeCount;
      },

      async executeAmplification(): Promise<number> {
        // 从最近的交互中提取新知识
        const newKnowledge = await agent.extractKnowledgeFromRecentInteractions();
        return newKnowledge;
      },

      async throttle(): Promise<void> {
        // 降低知识提取频率
        agent.setKnowledgeExtractionRate(0.5);
      },

      getHistory: () => [...history],
    };
  }

  /**
   * 错误放大螺旋(破坏性,需要抑制)
   */
  static errorAmplification(agent: any): ReinforcingLoop {
    const history: number[] = [];

    return {
      id: 'error_amplification',
      name: '错误放大螺旋',
      type: 'destructive',
      isActive: false,  // 默认不激活,只在检测到错误链时激活
      safetyBound: { maxValue: 5, action: 'stop' },

      async measureValue(): Promise<number> {
        const errorChain = agent.getConsecutiveErrors();
        history.push(errorChain);
        return errorChain;
      },

      async executeAmplification(): Promise<number> {
        // 这不是真正执行放大,而是检测放大是否正在发生
        return agent.getConsecutiveErrors();
      },

      async throttle(): Promise<void> {
        // 打破错误链:强制人工介入
        agent.requestHumanIntervention('检测到错误放大螺旋,需要人工确认');
        agent.resetErrorChain();
      },

      getHistory: () => [...history],
    };
  }
}

3.4 前馈控制:预判的艺术

3.4.1 反馈控制的根本局限

反馈控制有一个固有的、不可消除的局限:它只能在误差出现之后才进行修正

反馈控制的时间线:
  扰动发生 → 系统偏离 → 传感器检测 → 控制器计算 → 执行修正 → 系统恢复
  ↑                                                    ↑
  误差产生                                        误差消除

这段时间差就是反馈控制的"盲区"——在盲区期间,系统处于非理想状态。

对于慢变系统(如恒温器),这个盲区可以忽略。但对于高速系统(如自动驾驶、实时交易)或高精度系统(如精密制造),盲区可能导致严重后果。

对于 AI Agent 来说,反馈控制的盲区意味着:

  • Agent 在犯错之后才知道自己犯了错 → 已经给用户造成了影响
  • Agent 在任务失败之后才知道任务会失败 → 已经浪费了时间和资源
  • Agent 在用户不满意之后才知道用户不满意 → 信任已经受损

3.4.2 前馈控制的原理

前馈控制(Feedforward Control)的核心思想是:在扰动影响系统之前就进行预防性调整

前馈控制的时间线:
  扰动信号 → 前馈控制器预测影响 → 提前调整 → 扰动到达时系统已准备好
  
  误差从未产生!这就是前馈的优势。

前馈控制的数学模型

标准反馈:  u = K × e(t)        // 等到误差出现再修正
前馈控制:  u = K_ff × d(t)     // 根据扰动信号提前修正
组合控制:  u = K × e(t) + K_ff × d(t)   // 反馈 + 前馈

其中 d(t) 是可测量的扰动信号
K_ff 是前馈增益(需要精确的系统模型来计算)

前馈控制需要一个关键前提:对系统的精确模型。你必须知道扰动 d 会如何影响输出 y,才能计算出正确的预防性调整。

3.4.3 Agent 的前馈控制架构

AI Agent 的前馈控制体现为预测性决策——根据上下文和历史信息,预判可能的问题并提前处理。

/**
 * Agent 前馈控制器
 * 
 * 核心能力:在问题发生之前预测并预防
 */

class FeedforwardController {
  private disturbanceModels: Map<string, DisturbanceModel> = new Map();
  private preventionActions: Map<string, PreventionAction[]> = new Map();
  private predictionHistory: PredictionRecord[] = [];

  /**
   * 注册扰动模型
   */
  registerDisturbance(model: DisturbanceModel): void {
    this.disturbanceModels.set(model.id, model);
  }

  /**
   * 在每次任务执行前进行前馈分析
   */
  async analyze(input: AgentInput): Promise<FeedforwardAnalysis> {
    const detectedDisturbances: DisturbanceDetection[] = [];
    const recommendedPreventions: PreventionAction[] = [];

    // 扫描所有已注册的扰动模型
    for (const [id, model] of this.disturbanceModels) {
      const detection = await model.detect(input);
      if (detection.isPresent) {
        detectedDisturbances.push({ ...detection, modelId: id });

        // 获取预防措施
        const preventions = model.getPreventionActions(detection.severity);
        recommendedPreventions.push(...preventions);
      }
    }

    // 记录预测
    const prediction: PredictionRecord = {
      timestamp: Date.now(),
      input: input.task,
      disturbances: detectedDisturbances,
      preventions: recommendedPreventions,
      verified: false,
    };
    this.predictionHistory.push(prediction);

    return {
      hasDisturbances: detectedDisturbances.length > 0,
      disturbances: detectedDisturbances,
      preventions: recommendedPreventions,
      confidence: this.calculateConfidence(detectedDisturbances),
    };
  }

  /**
   * 验证预测是否准确(在任务完成后调用)
   */
  async verify(predictionIndex: number, actualOutcome: TaskOutcome): Promise<void> {
    const prediction = this.predictionHistory[predictionIndex];
    if (!prediction) return;

    prediction.verified = true;
    prediction.actualOutcome = actualOutcome;

    // 更新扰动模型的准确度
    for (const disturbance of prediction.disturbances) {
      const model = this.disturbanceModels.get(disturbance.modelId);
      if (model) {
        const wasAccurate = actualOutcome.problems?.some(p =>
          p.type === disturbance.modelId
        );
        model.updateAccuracy(wasAccurate);
      }
    }
  }

  private calculateConfidence(disturbances: DisturbanceDetection[]): number {
    if (disturbances.length === 0) return 1.0;
    const avgConfidence = disturbances.reduce(
      (sum, d) => sum + d.confidence, 0
    ) / disturbances.length;
    return avgConfidence;
  }
}

interface DisturbanceModel {
  id: string;
  name: string;
  detect: (input: AgentInput) => Promise<DisturbanceDetection>;
  getPreventionActions: (severity: number) => PreventionAction[];
  updateAccuracy: (wasAccurate: boolean) => void;
}

interface DisturbanceDetection {
  modelId?: string;
  isPresent: boolean;
  severity: number;       // 0-1
  confidence: number;     // 0-1
  description: string;
}

interface PreventionAction {
  type: 'parameter_adjustment' | 'additional_step' | 'alternative_strategy' | 'escalation';
  description: string;
  priority: number;
}

interface PredictionRecord {
  timestamp: number;
  input: string;
  disturbances: DisturbanceDetection[];
  preventions: PreventionAction[];
  verified: boolean;
  actualOutcome?: TaskOutcome;
}

interface TaskOutcome {
  success: boolean;
  problems?: { type: string; description: string }[];
  userFeedback?: string;
}

/**
 * 预定义的 Agent 扰动模型
 */
class AgentDisturbanceModels {
  /**
   * 歧义输入扰动
   * 当用户输入模糊或歧义时,提前采取措施
   */
  static ambiguousInput(): DisturbanceModel {
    let accuracyHistory: boolean[] = [];

    return {
      id: 'ambiguous_input',
      name: '歧义输入检测',

      async detect(input: AgentInput): Promise<DisturbanceDetection> {
        // 歧义信号检测
        const ambiguitySignals = [
          input.task.length < 10,                          // 输入过短
          /\?/.test(input.task) && input.task.length < 20, // 短问题
          input.constraints.length === 0,                  // 没有约束条件
          /可能|也许|大概|差不多/.test(input.task),         // 模糊词汇
        ];

        const signalCount = ambiguitySignals.filter(Boolean).length;
        const severity = signalCount / ambiguitySignals.length;

        return {
          isPresent: severity > 0.5,
          severity,
          confidence: 0.7,
          description: severity > 0.5
            ? `检测到 ${signalCount} 个歧义信号,建议先确认需求`
            : '输入清晰',
        };
      },

      getPreventionActions(severity: number): PreventionAction[] {
        if (severity < 0.5) return [];
        return [
          {
            type: 'additional_step',
            description: '在回答前先向用户确认需求理解是否正确',
            priority: 1,
          },
          {
            type: 'additional_step',
            description: '列出可能的理解方式,让用户选择',
            priority: 2,
          },
        ];
      },

      updateAccuracy(wasAccurate: boolean): void {
        accuracyHistory.push(wasAccurate);
        if (accuracyHistory.length > 100) accuracyHistory.shift();
      },
    };
  }

  /**
   * 复杂度超载扰动
   * 当任务复杂度超出 Agent 能力时,提前预警
   */
  static complexityOverload(): DisturbanceModel {
    return {
      id: 'complexity_overload',
      name: '复杂度超载检测',

      async detect(input: AgentInput): Promise<DisturbanceDetection> {
        // 复杂度信号
        const complexitySignals = [
          input.constraints.length > 10,                  // 约束过多
          input.task.split('和').length > 3,              // 多任务混合
          input.context.length > 5000,                    // 上下文过长
          /同时|并且|还要|另外/.test(input.task),          // 并行需求词汇
        ];

        const signalCount = complexitySignals.filter(Boolean).length;
        const severity = signalCount / complexitySignals.length;

        return {
          isPresent: severity > 0.5,
          severity,
          confidence: 0.65,
          description: severity > 0.5
            ? `任务复杂度较高(${signalCount}/4 信号),建议分步处理`
            : '复杂度适中',
        };
      },

      getPreventionActions(severity: number): PreventionAction[] {
        if (severity < 0.5) return [];
        return [
          {
            type: 'alternative_strategy',
            description: '将任务分解为子任务,分步执行',
            priority: 1,
          },
          {
            type: 'additional_step',
            description: '为每个子任务设置检查点',
            priority: 2,
          },
          {
            type: 'escalation',
            description: '如果子任务仍然复杂,考虑升级为多Agent协作',
            priority: 3,
          },
        ];
      },

      updateAccuracy(_wasAccurate: boolean): void {},
    };
  }

  /**
   * 幻觉风险扰动
   * 当任务涉及 Agent 可能编造信息的领域时,提前防范
   */
  static hallucinationRisk(): DisturbanceModel {
    return {
      id: 'hallucination_risk',
      name: '幻觉风险检测',

      async detect(input: AgentInput): Promise<DisturbanceDetection> {
        const hallucinationSignals = [
          /最新|最近|今天/.test(input.task),              // 时效性信息
          /具体数字|统计数据|百分比/.test(input.task),     // 精确数据
          /法律|医疗|金融/.test(input.task),              // 高风险领域
          !input.constraints.some(c => c.includes('source')),  // 无来源约束
        ];

        const signalCount = hallucinationSignals.filter(Boolean).length;
        const severity = signalCount / hallucinationSignals.length;

        return {
          isPresent: severity > 0.5,
          severity,
          confidence: 0.75,
          description: severity > 0.5
            ? `幻觉风险较高(${signalCount}/4 信号),建议启用事实验证`
            : '幻觉风险较低',
        };
      },

      getPreventionActions(severity: number): PreventionAction[] {
        if (severity < 0.5) return [];
        return [
          {
            type: 'additional_step',
            description: '启用 RAG 检索增强,引用具体来源',
            priority: 1,
          },
          {
            type: 'parameter_adjustment',
            description: '降低温度参数,增加确定性',
            priority: 2,
          },
          {
            type: 'additional_step',
            description: '在回答中标注置信度和信息来源',
            priority: 3,
          },
        ];
      },

      updateAccuracy(_wasAccurate: boolean): void {},
    };
  }
}

3.4.4 反馈与前馈的组合:最佳控制策略

最优的 Agent 控制策略是反馈与前馈的组合

理想控制 = 前馈(预防已知问题) + 反馈(修正未预见的偏差)

前馈:基于模型预测,提前防范
  → 优点:零延迟,误差从未产生
  → 缺点:需要精确模型,无法处理未知扰动

反馈:基于误差修正,事后纠正
  → 优点:不依赖模型,能处理任何扰动
  → 缺点:有延迟,误差已经产生

组合:两者的优势互补
  → 前馈处理已知的、可预测的问题
  → 反馈处理未知的、不可预测的问题

3.5 Ashby 必要多样性定律:控制能力的终极限制

3.5.1 定律的陈述

1956年,W. Ross Ashby 提出了控制论中最重要的定理之一——必要多样性定律(Law of Requisite Variety):

只有多样性才能吸收多样性。控制器的状态空间必须大于或等于被控系统的状态空间,控制才可能实现。

用数学语言表达:

H(扰动) ≤ H(控制器) + H(环境)

其中 H 表示信息熵(多样性度量)

简化版:
  Variety(控制器) ≥ Variety(被控系统) / Variety(环境缓冲)

这个定律的直觉理解:

  • 如果你的系统有100种可能的状态
  • 而你的控制器只能区分10种状态
  • 那么你最多只能将系统控制到10种目标状态
  • 剩下的90种状态你无法区分,更无法控制

3.5.2 Ashby 定律在 AI Agent 中的深层含义

Ashby 定律揭示了 AI Agent 能力的根本限制

含义一:Agent 的能力受限于其内部状态的多样性

Agent 的内部状态空间:
  = LLM 的参数空间 × 上下文窗口 × 记忆系统 × 工具链
  
如果 Agent 的内部状态不够多样,它就无法处理足够多样的输入。

含义二:工具链是扩展必要多样性的关键杠杆

无工具的 Agent:
  Variety = LLM 的参数多样性(固定)

有工具的 Agent:
  Variety = LLM 多样性 × 工具组合空间
  
  10 个工具,每个有 5 种用法 → 5^10 ≈ 1000万种组合
  → 必要多样性指数级增长

含义三:多 Agent 系统是突破单一 Agent 多样性限制的途径

单 Agent:
  Variety = V₁

N Agent 协作:
  Variety = V₁ × V₂ × ... × Vₙ   (理论上)
  Variety = V₁ + V₂ + ... + Vₙ   (实际上,受通信带宽限制)

3.5.3 Agent 必要多样性分析器

/**
 * Agent 必要多样性分析器
 * 
 * 基于 Ashby 定律,分析 Agent 是否有足够的内部控制多样性
 * 来应对环境的复杂性
 */

class RequisiteVarietyAnalyzer {
  /**
   * 分析 Agent 的内部多样性
   */
  analyzeAgentVariety(agent: AgentDescription): VarietyAnalysis {
    // 1. LLM 的参数多样性
    const llmVariety = this.estimateLLMVariety(agent.llm);

    // 2. 上下文窗口的信息容量
    const contextVariety = this.estimateContextVariety(agent.contextWindow);

    // 3. 记忆系统的状态空间
    const memoryVariety = this.estimateMemoryVariety(agent.memory);

    // 4. 工具链的组合空间
    const toolVariety = this.estimateToolVariety(agent.tools);

    // 5. 策略库的行为空间
    const strategyVariety = this.estimateStrategyVariety(agent.strategies);

    // 总内部多样性(log空间加法,因为组合是乘法)
    const totalVariety = llmVariety + contextVariety + memoryVariety +
      toolVariety + strategyVariety;

    return {
      components: {
        llm: { bits: llmVariety, description: 'LLM 参数空间多样性' },
        context: { bits: contextVariety, description: '上下文窗口信息容量' },
        memory: { bits: memoryVariety, description: '记忆系统状态空间' },
        tools: { bits: toolVariety, description: '工具链组合空间' },
        strategies: { bits: strategyVariety, description: '策略库行为空间' },
      },
      totalBits: totalVariety,
      totalStates: Math.pow(2, totalVariety),
      assessment: this.assess(totalVariety),
    };
  }

  /**
   * 分析环境的多样性
   */
  analyzeEnvironmentVariety(environment: EnvironmentDescription): VarietyAnalysis {
    // 1. 输入空间的多样性
    const inputVariety = this.estimateInputVariety(environment.inputs);

    // 2. 任务类型的多样性
    const taskVariety = this.estimateTaskVariety(environment.tasks);

    // 3. 干扰/噪声的多样性
    const noiseVariety = this.estimateNoiseVariety(environment.disturbances);

    const totalVariety = inputVariety + taskVariety + noiseVariety;

    return {
      components: {
        inputs: { bits: inputVariety, description: '输入空间多样性' },
        tasks: { bits: taskVariety, description: '任务类型多样性' },
        noise: { bits: noiseVariety, description: '干扰/噪声多样性' },
      },
      totalBits: totalVariety,
      totalStates: Math.pow(2, totalVariety),
      assessment: this.assess(totalVariety),
    };
  }

  /**
   * Ashby 定律检验:Agent 是否有足够的控制能力?
   */
  ashbyCheck(agent: AgentDescription, environment: EnvironmentDescription): AshbyCheckResult {
    const agentVariety = this.analyzeAgentVariety(agent);
    const envVariety = this.analyzeEnvironmentVariety(environment);

    const ratio = agentVariety.totalBits / envVariety.totalBits;
    const deficit = Math.max(0, envVariety.totalBits - agentVariety.totalBits);

    return {
      agentVariety: agentVariety.totalBits,
      environmentVariety: envVariety.totalBits,
      ratio,
      isSufficient: ratio >= 1.0,
      deficitBits: deficit,
      recommendation: this.generateRecommendation(agentVariety, envVariety, deficit),
      gapAnalysis: this.identifyGaps(agentVariety, envVariety),
    };
  }

  private estimateLLMVariet(llm: { parameterCount: number; vocabSize: number }): number {
    // 粗略估计:log2(参数数量) 作为多样性上界
    return Math.log2(llm.parameterCount || 1e11);  // ~37 bits for 100B params
  }

  private estimateContextVariety(contextWindow: number): number {
    // 上下文窗口可以容纳的信息量
    return contextWindow * Math.log2(50000);  // 每个 token ~16 bits
  }

  private estimateMemoryVariety(memory: { type: string; capacity: number }): number {
    if (memory.type === 'unlimited') return 100;  // 理论无限
    return Math.log2(memory.capacity) * 10;
  }

  private estimateToolVariety(tools: { count: number; avgOptions: number }): number {
    // 工具的组合空间
    return tools.count * Math.log2(tools.avgOptions);
  }

  private estimateStrategyVariety(strategies: string[]): number {
    return Math.log2(Math.max(1, strategies.length));
  }

  private estimateInputVariety(inputs: { types: number; avgComplexity: number }): number {
    return Math.log2(inputs.types) + Math.log2(inputs.avgComplexity);
  }

  private estimateTaskVariety(tasks: { categories: number; variations: number }): number {
    return Math.log2(tasks.categories) + Math.log2(tasks.variations);
  }

  private estimateNoiseVariety(disturbances: { types: number; severity: number }): number {
    return Math.log2(disturbances.types) + Math.log2(disturbances.severity * 10);
  }

  private assess(bits: number): string {
    if (bits > 1000) return '极高多样性';
    if (bits > 100) return '高多样性';
    if (bits > 50) return '中等多样性';
    if (bits > 20) return '有限多样性';
    return '极低多样性';
  }

  private generateRecommendation(
    agent: VarietyAnalysis,
    env: VarietyAnalysis,
    deficit: number
  ): string {
    if (deficit <= 0) {
      return 'Agent 的控制多样性充足,可以应对环境复杂性';
    }

    const suggestions: string[] = [];
    suggestions.push(`多样性缺口: ${deficit.toFixed(0)} bits`);

    if (agent.components.tools.bits < 20) {
      suggestions.push('建议增加工具种类以扩展组合空间');
    }
    if (agent.components.memory.bits < 50) {
      suggestions.push('建议增强记忆系统以增加状态空间');
    }
    if (agent.components.strategies.bits < 10) {
      suggestions.push('建议丰富策略库以增加行为多样性');
    }

    return suggestions.join('。');
  }

  private identifyGaps(agent: VarietyAnalysis, env: VarietyAnalysis): string[] {
    const gaps: string[] = [];
    const agentComponents = Object.entries(agent.components);
    const envComponents = Object.entries(env.components);

    for (const [envKey, envComp] of envComponents) {
      // 找到对应的Agent组件
      const agentComp = agentComponents.find(([k]) =>
        k === envKey || k === envKey.replace('inputs', 'context')
      );
      if (agentComp && agentComp[1].bits < (envComp as any).bits) {
        gaps.push(`${envKey}: Agent(${agentComp[1].bits}bits) < 环境(${(envComp as any).bits}bits)`);
      }
    }

    return gaps;
  }
}

interface VarietyAnalysis {
  components: Record<string, { bits: number; description: string }>;
  totalBits: number;
  totalStates: number;
  assessment: string;
}

interface AshbyCheckResult {
  agentVariety: number;
  environmentVariety: number;
  ratio: number;
  isSufficient: boolean;
  deficitBits: number;
  recommendation: string;
  gapAnalysis: string[];
}

interface AgentDescription {
  llm: { parameterCount: number; vocabSize: number };
  contextWindow: number;
  memory: { type: string; capacity: number };
  tools: { count: number; avgOptions: number };
  strategies: string[];
}

interface EnvironmentDescription {
  inputs: { types: number; avgComplexity: number };
  tasks: { categories: number; variations: number };
  disturbances: { types: number; severity: number };
}

3.6 自适应控制:Agent 的自我进化引擎

3.6.1 从固定控制到自适应控制

传统控制器的参数是固定的——设计时确定,运行时不变。但 AI Agent 面对的环境是不断变化的:

  • 用户的偏好会改变
  • 任务的分布会漂移
  • 系统的性能会波动
  • 新的工具和信息源不断出现

自适应控制(Adaptive Control)的核心思想是:控制器的参数本身也是可调的,它能根据环境变化自动更新自己的控制策略

传统控制:
  控制器参数 θ 固定
  u(t) = f(e(t), θ)    // θ 不变

自适应控制:
  控制器参数 θ(t) 随时间更新
  u(t) = f(e(t), θ(t))  // θ(t) 根据性能反馈更新
  θ(t+1) = g(θ(t), performance_history)  // 参数更新规则

自适应控制有三个层次:

层次 机制 Agent 对应
增益调度 根据工作点切换参数 不同任务类型用不同提示词
模型参考自适应 参照理想模型调整参数 对照"完美回答"调整生成策略
自校正调节器 在线估计系统模型并更新控制律 Agent 实时学习环境特征并优化策略

3.6.2 Agent 自适应控制架构

/**
 * Agent 自适应控制器
 * 
 * 三个核心组件:
 * 1. 系统辨识器:实时估计环境和自身模型
 * 2. 策略优化器:根据模型更新控制策略
 * 3. 安全约束器:确保适应过程不违反安全边界
 */

class AdaptiveController {
  private systemIdentifier: SystemIdentifier;
  private strategyOptimizer: StrategyOptimizer;
  private safetyConstrainer: SafetyConstrainer;
  private adaptationHistory: AdaptationRecord[] = [];

  constructor(config: AdaptiveControllerConfig) {
    this.systemIdentifier = new SystemIdentifier(config.identification);
    this.strategyOptimizer = new StrategyOptimizer(config.optimization);
    this.safetyConstrainer = new SafetyConstrainer(config.safety);
  }

  /**
   * 主适应循环:在每次任务完成后执行
   */
  async adapt(taskResult: TaskResult): Promise<AdaptationOutcome> {
    // 1. 系统辨识:更新环境和自身模型
    const modelUpdate = await this.systemIdentifier.identify(taskResult);

    // 2. 策略优化:根据新模型计算策略更新
    const strategyUpdate = await this.strategyOptimizer.optimize(
      modelUpdate,
      taskResult
    );

    // 3. 安全约束:检查策略更新是否安全
    const constrained = this.safetyConstrainer.constrain(strategyUpdate);

    // 4. 应用更新
    if (constrained.isSafe) {
      await this.applyStrategyUpdate(constrained.update);
      this.adaptationHistory.push({
        timestamp: Date.now(),
        modelUpdate: modelUpdate.description,
        strategyUpdate: constrained.update.description,
        taskResult: taskResult.success,
      });
    }

    return {
      adapted: constrained.isSafe,
      reason: constrained.isSafe ? '策略已更新' : constrained.reason,
      modelUpdate: modelUpdate.description,
    };
  }

  private async applyStrategyUpdate(update: StrategyUpdate): Promise<void> {
    // 更新提示词模板
    if (update.promptAdjustments) {
      await this.updatePromptTemplates(update.promptAdjustments);
    }

    // 更新工具选择策略
    if (update.toolSelectionStrategy) {
      await this.updateToolSelection(update.toolSelectionStrategy);
    }

    // 更新质量阈值
    if (update.qualityThresholds) {
      await this.updateQualityThresholds(update.qualityThresholds);
    }
  }

  private async updatePromptTemplates(adj: any): Promise<void> { /* ... */ }
  private async updateToolSelection(strategy: any): Promise<void> { /* ... */ }
  private async updateQualityThresholds(thresholds: any): Promise<void> { /* ... */ }
}

/**
 * 系统辨识器:实时学习环境模型
 */
class SystemIdentifier {
  private environmentModel: EnvironmentModel;
  private selfModel: AgentSelfModel;
  private observationBuffer: Observation[] = [];

  constructor(config: any) {
    this.environmentModel = new EnvironmentModel();
    this.selfModel = new AgentSelfModel();
  }

  async identify(taskResult: TaskResult): Promise<ModelUpdate> {
    this.observationBuffer.push({
      timestamp: Date.now(),
      taskType: taskResult.taskType,
      complexity: taskResult.complexity,
      success: taskResult.success,
      duration: taskResult.duration,
      qualityScore: taskResult.qualityScore,
    });

    // 保留最近100个观察
    if (this.observationBuffer.length > 100) {
      this.observationBuffer.shift();
    }

    // 环境模型更新
    const envUpdate = this.environmentModel.update(this.observationBuffer);

    // 自我模型更新
    const selfUpdate = this.selfModel.update(this.observationBuffer);

    return {
      description: `环境模型: ${envUpdate.summary}; 自我模型: ${selfUpdate.summary}`,
      environmentModel: envUpdate,
      selfModel: selfUpdate,
    };
  }
}

/**
 * 策略优化器:根据模型更新计算最优策略
 */
class StrategyOptimizer {
  private currentStrategy: AgentStrategy;

  constructor(config: any) {
    this.currentStrategy = this.getDefaultStrategy();
  }

  async optimize(
    modelUpdate: ModelUpdate,
    taskResult: TaskResult
  ): Promise<StrategyUpdate> {
    const updates: StrategyUpdate = {
      description: '',
      promptAdjustments: null,
      toolSelectionStrategy: null,
      qualityThresholds: null,
    };

    // 根据任务结果调整策略
    if (!taskResult.success) {
      // 失败任务分析
      const failureReason = taskResult.failureReason;

      if (failureReason === 'insufficient_context') {
        updates.promptAdjustments = {
          addContextRequest: true,
          increaseDetailLevel: true,
        };
        updates.description += '增加上下文请求; ';
      }

      if (failureReason === 'wrong_tool_selection') {
        updates.toolSelectionStrategy = {
          increaseExploration: true,  // 增加工具探索
          penalizeFailedTools: true,  // 惩罚失败的工具选择
        };
        updates.description += '调整工具选择策略; ';
      }

      if (failureReason === 'quality_below_threshold') {
        updates.qualityThresholds = {
          increaseVerificationSteps: true,
          enableChainOfThought: true,
        };
        updates.description += '提高质量门槛; ';
      }
    } else if (taskResult.qualityScore > 0.95) {
      // 高质量完成:可以适度放松以提升效率
      updates.qualityThresholds = {
        reduceVerificationSteps: true,
        increaseEfficiency: true,
      };
      updates.description += '适度放松以提升效率; ';
    }

    return updates;
  }

  private getDefaultStrategy(): AgentStrategy {
    return {
      promptTemplate: 'default',
      toolSelectionPolicy: 'greedy',
      qualityThreshold: 0.85,
      maxRetries: 3,
    };
  }
}

/**
 * 安全约束器:确保适应过程安全
 */
class SafetyConstrainer {
  private bounds: SafetyBounds;

  constructor(bounds: SafetyBounds) {
    this.bounds = bounds;
  }

  constrain(update: StrategyUpdate): ConstrainedUpdate {
    // 检查是否在安全边界内
    const violations: string[] = [];

    if (update.qualityThresholds) {
      if (update.qualityThresholds.reduceVerificationSteps) {
        // 不允许降低到安全阈值以下
        if (this.bounds.minVerificationSteps > 0) {
          violations.push('验证步骤不能减少到安全阈值以下');
        }
      }
    }

    if (violations.length > 0) {
      return {
        isSafe: false,
        reason: violations.join('; '),
        update: { description: '被安全约束拒绝' },
      };
    }

    return {
      isSafe: true,
      update,
    };
  }
}

// 类型定义
interface AdaptiveControllerConfig {
  identification: any;
  optimization: any;
  safety: SafetyBounds;
}

interface SafetyBounds {
  minVerificationSteps: number;
  maxTemperature: number;
  minQualityThreshold: number;
}

interface TaskResult {
  taskType: string;
  complexity: number;
  success: boolean;
  duration: number;
  qualityScore: number;
  failureReason?: string;
}

interface ModelUpdate {
  description: string;
  environmentModel: any;
  selfModel: any;
}

interface StrategyUpdate {
  description: string;
  promptAdjustments: any;
  toolSelectionStrategy: any;
  qualityThresholds: any;
}

interface ConstrainedUpdate {
  isSafe: boolean;
  reason?: string;
  update: StrategyUpdate;
}

interface AdaptationOutcome {
  adapted: boolean;
  reason: string;
  modelUpdate: string;
}

interface AdaptationRecord {
  timestamp: number;
  modelUpdate: string;
  strategyUpdate: string;
  taskResult: boolean;
}

interface Observation {
  timestamp: number;
  taskType: string;
  complexity: number;
  success: boolean;
  duration: number;
  qualityScore: number;
}

interface EnvironmentModel {
  update(observations: Observation[]): any;
}

interface AgentSelfModel {
  update(observations: Observation[]): any;
}

interface AgentStrategy {
  promptTemplate: string;
  toolSelectionPolicy: string;
  qualityThreshold: number;
  maxRetries: number;
}

3.7 控制映射的完整框架:从理论到 Agent 架构

3.7.1 Agent 控制系统的五层架构

将控制论的所有概念整合为一个完整的 Agent 控制系统架构:

┌─────────────────────────────────────────────────────┐
│  L5: 元控制层(Meta-Control)                         │
│  监控和优化控制系统本身                               │
│  → Ashby 多样性分析、控制器选择、适应速率调节          │
├─────────────────────────────────────────────────────┤
│  L4: 自适应层(Adaptive Control)                     │
│  更新控制器参数                                      │
│  → 系统辨识、策略优化、模型更新                       │
├─────────────────────────────────────────────────────┤
│  L3: 前馈层(Feedforward Control)                    │
│  预测性干预                                          │
│  → 扰动检测、风险预判、预防性调整                     │
├─────────────────────────────────────────────────────┤
│  L2: 反馈层(Feedback Control)                       │
│  误差修正                                            │
│  → PID 控制、质量监控、行为调节                       │
├─────────────────────────────────────────────────────┤
│  L1: 执行层(Execution)                              │
│  基本的感知-行动循环                                  │
│  → 输入解析、LLM 推理、工具调用、输出生成             │
└─────────────────────────────────────────────────────┘

3.7.2 完整实现:自控制 Agent

/**
 * 自控制 Agent(Self-Controlling Agent)
 * 
 * 整合控制论所有层次的完整 Agent 架构
 */

class SelfControllingAgent {
  // 五层控制系统
  private executionLayer: ExecutionLayer;
  private feedbackLayer: FeedbackControlLayer;
  private feedforwardLayer: FeedforwardControlLayer;
  private adaptiveLayer: AdaptiveControlLayer;
  private metaControlLayer: MetaControlLayer;

  constructor(config: AgentConfig) {
    this.executionLayer = new ExecutionLayer(config.execution);
    this.feedbackLayer = new FeedbackControlLayer(config.feedback);
    this.feedforwardLayer = new FeedforwardControlLayer(config.feedforward);
    this.adaptiveLayer = new AdaptiveControlLayer(config.adaptive);
    this.metaControlLayer = new MetaControlLayer(config.meta);
  }

  /**
   * 主处理循环
   */
  async process(input: AgentInput): Promise<AgentOutput> {
    // L5: 元控制 - 检查控制系统本身的健康状态
    const metaStatus = this.metaControlLayer.checkHealth();
    if (metaStatus.needsAttention) {
      await this.metaControlLayer.selfRepair();
    }

    // L3: 前馈 - 预测可能的问题并预防
    const feedforwardAnalysis = await this.feedforwardLayer.analyze(input);
    const preventions = feedforwardAnalysis.preventions;

    // 应用前馈预防措施
    const adjustedInput = this.applyPreventions(input, preventions);

    // L1: 执行 - 处理任务
    let output = await this.executionLayer.execute(adjustedInput);

    // L2: 反馈 - 检查输出质量并修正
    const qualityCheck = this.feedbackLayer.evaluate(output, input);
    if (qualityCheck.needsCorrection) {
      output = await this.feedbackLayer.correct(output, qualityCheck);
    }

    // L4: 自适应 - 从本次经验中学习
    const taskResult = this.buildTaskResult(input, output, qualityCheck);
    await this.adaptiveLayer.adapt(taskResult);

    // L5: 元控制 - 更新控制系统统计
    this.metaControlLayer.recordCycle(taskResult);

    return output;
  }

  private applyPreventions(input: AgentInput, preventions: PreventionAction[]): AgentInput {
    let adjusted = { ...input };
    for (const prevention of preventions) {
      switch (prevention.type) {
        case 'additional_step':
          adjusted.additionalSteps = [...(adjusted.additionalSteps || []), prevention.description];
          break;
        case 'parameter_adjustment':
          adjusted.parameterOverrides = {
            ...adjusted.parameterOverrides,
            ...prevention,
          };
          break;
      }
    }
    return adjusted;
  }

  private buildTaskResult(
    input: AgentInput,
    output: AgentOutput,
    quality: QualityEvaluation
  ): TaskResult {
    return {
      taskType: input.taskType,
      complexity: input.complexity || 0.5,
      success: quality.passes,
      duration: output.duration || 0,
      qualityScore: quality.score,
      failureReason: quality.failureReason,
    };
  }
}

/**
 * L1: 执行层
 */
class ExecutionLayer {
  async execute(input: AgentInput): Promise<AgentOutput> {
    const startTime = Date.now();
    // 实际的 LLM 推理和工具调用
    // ... 这里省略具体实现
    return {
      result: '',
      duration: Date.now() - startTime,
      tokensUsed: 0,
    };
  }
}

/**
 * L2: 反馈控制层
 */
class FeedbackControlLayer {
  private qualityController: AgentQualityController;

  constructor(config: any) {
    this.qualityController = new AgentQualityController(config.targetQuality || 0.9);
  }

  evaluate(output: AgentOutput, input: AgentInput): QualityEvaluation {
    // 多维质量评估
    const scores = {
      relevance: this.evaluateRelevance(output, input),
      completeness: this.evaluateCompleteness(output, input),
      accuracy: this.evaluateAccuracy(output),
      coherence: this.evaluateCoherence(output),
    };

    const totalScore = (scores.relevance + scores.completeness +
      scores.accuracy + scores.coherence) / 4;

    return {
      score: totalScore,
      passes: totalScore >= 0.8,
      needsCorrection: totalScore < 0.8,
      details: scores,
      failureReason: totalScore < 0.8 ? this.identifyFailureReason(scores) : undefined,
    };
  }

  async correct(output: AgentOutput, evaluation: QualityEvaluation): Promise<AgentOutput> {
    // 基于质量评估进行修正
    const adjustment = this.qualityController.evaluate(evaluation.score);

    if (adjustment.adjust) {
      // 应用修正策略并重新生成
      // ... 省略具体实现
    }

    return output;
  }

  private evaluateRelevance(_output: AgentOutput, _input: AgentInput): number { return 0.85; }
  private evaluateCompleteness(_output: AgentOutput, _input: AgentInput): number { return 0.8; }
  private evaluateAccuracy(_output: AgentOutput): number { return 0.9; }
  private evaluateCoherence(_output: AgentOutput): number { return 0.85; }

  private identifyFailureReason(scores: Record<string, number>): string {
    const entries = Object.entries(scores);
    entries.sort((a, b) => a[1] - b[1]);
    return `${entries[0][0]}_below_threshold`;
  }
}

/**
 * L3: 前馈控制层
 */
class FeedforwardControlLayer {
  private controller: FeedforwardController;

  constructor(_config: any) {
    this.controller = new FeedforwardController();
    // 注册默认扰动模型
    this.controller.registerDisturbance(AgentDisturbanceModels.ambiguousInput());
    this.controller.registerDisturbance(AgentDisturbanceModels.complexityOverload());
    this.controller.registerDisturbance(AgentDisturbanceModels.hallucinationRisk());
  }

  async analyze(input: AgentInput): Promise<FeedforwardAnalysis> {
    return this.controller.analyze(input);
  }
}

/**
 * L4: 自适应控制层
 */
class AdaptiveControlLayer {
  private controller: AdaptiveController;

  constructor(config: any) {
    this.controller = new AdaptiveController(config);
  }

  async adapt(taskResult: TaskResult): Promise<AdaptationOutcome> {
    return this.controller.adapt(taskResult);
  }
}

/**
 * L5: 元控制层
 */
class MetaControlLayer {
  private cycleHistory: TaskResult[] = [];
  private lastHealthCheck: number = 0;

  checkHealth(): MetaControlStatus {
    const recentCycles = this.cycleHistory.slice(-50);
    if (recentCycles.length < 10) {
      return { needsAttention: false, health: 'initializing' };
    }

    const successRate = recentCycles.filter(c => c.success).length / recentCycles.length;
    const avgQuality = recentCycles.reduce((sum, c) => sum + c.qualityScore, 0) / recentCycles.length;

    const needsAttention = successRate < 0.7 || avgQuality < 0.7;

    return {
      needsAttention,
      health: needsAttention ? 'degraded' : 'healthy',
      successRate,
      avgQuality,
      cyclesProcessed: this.cycleHistory.length,
    };
  }

  async selfRepair(): Promise<void> {
    // 元控制层的自我修复
    // 例如:重置自适应参数、清理历史数据、重新校准评估模型
  }

  recordCycle(result: TaskResult): void {
    this.cycleHistory.push(result);
    if (this.cycleHistory.length > 1000) {
      this.cycleHistory = this.cycleHistory.slice(-500);
    }
  }
}

// 类型补充
interface AgentConfig {
  execution: any;
  feedback: any;
  feedforward: any;
  adaptive: any;
  meta: any;
}

interface AgentOutput {
  result: string;
  duration?: number;
  tokensUsed?: number;
}

interface QualityEvaluation {
  score: number;
  passes: boolean;
  needsCorrection: boolean;
  details: Record<string, number>;
  failureReason?: string;
}

interface MetaControlStatus {
  needsAttention: boolean;
  health: 'healthy' | 'degraded' | 'initializing';
  successRate?: number;
  avgQuality?: number;
  cyclesProcessed?: number;
}

interface AgentInput {
  task: string;
  taskType?: string;
  context?: string;
  constraints?: string[];
  complexity?: number;
  additionalSteps?: string[];
  parameterOverrides?: any;
}

3.8 实战案例:控制论解构真实 Agent 系统

3.8.1 案例一:Claude Code 的控制架构分析

Claude Code 是 Anthropic 的 AI 编程助手,它的设计完美体现了控制论的所有层次:

L1 执行层:
  → 代码生成、编辑、执行

L2 反馈层:
  → 代码执行结果检查(成功/失败)
  → 类型检查反馈
  → 测试结果反馈
  → 编译错误 → 自动修复循环

L3 前馈层:
  → 读取项目上下文(CLAUDE.md)预判规范
  → 分析文件依赖关系预防破坏性修改
  → 检测权限问题(只读文件)提前规避

L4 自适应层:
  → 从编辑成功/失败中学习文件结构
  → 根据项目特征调整编码风格
  → 根据用户反馈调整行为模式

L5 元控制层:
  → 上下文压缩前的信息归档(PreCompact Hook)
  → 会话持久化和恢复
  → 工具使用效率的自我监控

Claude Code 的控制论亮点

  1. 强反馈循环:每次代码修改后都会运行测试,形成闭环
  2. 前馈预防:通过 CLAUDE.md 预先了解项目规范
  3. 自适应能力:从每次成功/失败中更新策略
  4. 安全约束:破坏性操作需要确认(安全约束器)

3.8.2 案例二:自动驾驶的控制论全景

自动驾驶是控制论的集大成者,融合了所有控制层次:

经典控制(L1-L2):
  → PID 控制方向盘角度
  → 车道保持的负反馈
  → 巡航控制的速度调节

现代控制(L3):
  → 路径规划(最优控制)
  → 障碍预测(前馈控制)
  → 多目标优化(安全 vs 效率 vs 舒适)

自适应控制(L4-L5):
  → 不同天气条件下的参数自适应
  → 不同驾驶风格的风格学习
  → 车辆老化后的模型重辨识

3.8.3 案例三:HappyClaw 的消息路由控制系统

HappyClaw 的消息处理系统是一个精密的控制系统:

控制目标:确保每条消息都被正确处理且不丢失

负反馈机制:
  → 消息轮询(2s 间隔)检测新消息
  → 容器状态监控(运行中/空闲/满载)
  → 并发控制(最大20容器 + 5宿主机进程)
  → 失败重试(指数退避:5s→10s→20s→40s→80s)

前馈机制:
  → 消息去重(LRU 1000条/30min TTL)
  → 优先级调度(任务优先于消息)
  → 容量预判(满载时排队等待)

自适应机制:
  → 空闲超时自动关闭(30min无新消息)
  → 容器超时保护(30min最大运行时间)
  → 优雅关闭(_close sentinel → docker stop → docker kill)

Ashby 多样性:
  → 多通道支持(飞书/Telegram/QQ/钉钉/Web)
  → 多执行模式(容器/宿主机)
  → 多上下文模式(group/isolated)
  → 多通信协议(stdin/stdout/IPC/WebSocket)

3.9 动手实验:构建自适应温度控制器

3.9.1 实验目标

构建一个自适应温度控制器,模拟 AI Agent 的核心控制机制:

  1. 基本 PID 控制:经典反馈控制
  2. 扰动处理:前馈补偿
  3. 参数自适应:根据环境变化自动调参
  4. 安全约束:防止过冲和振荡

3.9.2 完整实现

/**
 * 自适应温度控制器实验
 * 
 * 模拟 AI Agent 的核心控制机制:
 * - PID 反馈控制(基本纠错)
 * - 前馈控制(预判性调整)
 * - 参数自适应(自动调参)
 * - 安全约束(防止过冲)
 */

class AdaptiveTemperatureController {
  // PID 参数
  private Kp: number = 2.0;
  private Ki: number = 0.5;
  private Kd: number = 1.0;

  // 控制器状态
  private integral: number = 0;
  private previousError: number = 0;
  private outputHistory: number[] = [];

  // 自适应参数
  private adaptationRate: number = 0.01;
  private performanceWindow: number[] = [];

  // 安全约束
  private maxOutput: number = 100;
  private minOutput: number = -100;
  private maxOvershoot: number = 2.0;  // 最大允许过冲

  /**
   * PID 控制步骤
   */
  pidStep(
    setpoint: number,
    currentTemp: number,
    dt: number = 1
  ): { output: number; error: number; p: number; i: number; d: number } {
    const error = setpoint - currentTemp;

    // P: 比例项
    const p = this.Kp * error;

    // I: 积分项(带反饱和)
    this.integral += error * dt;
    this.integral = Math.max(-50, Math.min(50, this.integral));
    const i = this.Ki * this.integral;

    // D: 微分项
    const d = this.Kd * (error - this.previousError) / dt;
    this.previousError = error;

    // 总输出(带安全约束)
    let output = p + i + d;
    output = Math.max(this.minOutput, Math.min(this.maxOutput, output));

    return { output, error, p, i, d };
  }

  /**
   * 自适应调参
   */
  adaptParams(performanceMetric: number): { Kp: number; Ki: number; Kd: number } {
    this.performanceWindow.push(performanceMetric);
    if (this.performanceWindow.length > 20) {
      this.performanceWindow.shift();
    }

    if (this.performanceWindow.length < 10) {
      return { Kp: this.Kp, Ki: this.Ki, Kd: this.Kd };
    }

    // 分析性能趋势
    const recent = this.performanceWindow.slice(-10);
    const older = this.performanceWindow.slice(-20, -10);
    const recentAvg = recent.reduce((a, b) => a + b, 0) / recent.length;
    const olderAvg = older.reduce((a, b) => a + b, 0) / older.length;

    // 性能在改善 → 保守调整
    if (recentAvg > olderAvg) {
      // 略微降低增益以提高稳定性
      this.Kp *= (1 - this.adaptationRate * 0.5);
      this.Kd *= (1 - this.adaptationRate * 0.3);
    }
    // 性能在恶化 → 激进调整
    else if (recentAvg < olderAvg) {
      // 检查是否振荡(增加微分)
      const oscillation = this.detectOscillation();
      if (oscillation) {
        this.Kd *= (1 + this.adaptationRate * 2);
        this.Kp *= (1 - this.adaptationRate);
      } else {
        // 响应太慢(增加比例和积分)
        this.Kp *= (1 + this.adaptationRate);
        this.Ki *= (1 + this.adaptationRate * 0.5);
      }
    }

    return { Kp: this.Kp, Ki: this.Ki, Kd: this.Kd };
  }

  private detectOscillation(): boolean {
    if (this.outputHistory.length < 20) return false;

    const recent = this.outputHistory.slice(-20);
    let signChanges = 0;
    for (let i = 1; i < recent.length; i++) {
      if (recent[i] * recent[i - 1] < 0) signChanges++;
    }

    return signChanges > 5;  // 频繁变号 = 振荡
  }

  /**
   * 运行完整模拟
   */
  simulate(params: {
    setpoint: number;
    initialTemp: number;
    steps: number;
    disturbanceAt?: number;
    disturbanceMagnitude?: number;
  }): SimulationResults {
    const { setpoint, initialTemp, steps } = params;
    let currentTemp = initialTemp;
    const results: StepResult[] = [];

    for (let step = 0; step < steps; step++) {
      // 扰动注入
      let disturbance = 0;
      if (params.disturbanceAt && step === params.disturbanceAt) {
        disturbance = params.disturbanceMagnitude || 0;
      }

      // PID 控制
      const control = this.pidStep(setpoint, currentTemp);

      // 环境模型(温度变化 = 控制输出 + 扰动 - 散热)
      const heatLoss = (currentTemp - 20) * 0.02;  // 散热
      currentTemp += (control.output * 0.1 + disturbance - heatLoss);

      // 性能评估
      const performance = 1 / (1 + Math.abs(control.error));

      // 自适应调参
      const newParams = this.adaptParams(performance);

      // 记录
      results.push({
        step,
        setpoint,
        currentTemp,
        error: control.error,
        output: control.output,
        p: control.p,
        i: control.i,
        d: control.d,
        Kp: newParams.Kp,
        Ki: newParams.Ki,
        Kd: newParams.Kd,
        disturbance,
      });

      this.outputHistory.push(control.output);
    }

    return {
      results,
      summary: this.summarizeResults(results, setpoint),
    };
  }

  private summarizeResults(results: StepResult[], setpoint: number): SimulationSummary {
    const temps = results.map(r => r.currentTemp);
    const errors = results.map(r => Math.abs(r.error));

    // 上升时间(首次达到目标的90%)
    const riseTime = results.findIndex(r =>
      Math.abs(r.currentTemp - setpoint) < Math.abs(setpoint - results[0].currentTemp) * 0.1
    );

    // 过冲量
    const maxTemp = Math.max(...temps);
    const overshoot = maxTemp > setpoint ? (maxTemp - setpoint) / setpoint * 100 : 0;

    // 稳态误差(最后10步的平均误差)
    const lastErrors = errors.slice(-10);
    const steadyStateError = lastErrors.reduce((a, b) => a + b, 0) / lastErrors.length;

    // 振荡次数
    let oscillations = 0;
    for (let i = 2; i < temps.length; i++) {
      const d1 = temps[i - 1] - temps[i - 2];
      const d2 = temps[i] - temps[i - 1];
      if (d1 * d2 < 0) oscillations++;
    }

    return {
      riseTime,
      overshootPercent: overshoot,
      steadyStateError,
      oscillations,
      finalTemp: temps[temps.length - 1],
      maxTemp,
      minTemp: Math.min(...temps),
    };
  }
}

interface StepResult {
  step: number;
  setpoint: number;
  currentTemp: number;
  error: number;
  output: number;
  p: number;
  i: number;
  d: number;
  Kp: number;
  Ki: number;
  Kd: number;
  disturbance: number;
}

interface SimulationResults {
  results: StepResult[];
  summary: SimulationSummary;
}

interface SimulationSummary {
  riseTime: number;
  overshootPercent: number;
  steadyStateError: number;
  oscillations: number;
  finalTemp: number;
  maxTemp: number;
  minTemp: number;
}

// 运行实验
const controller = new AdaptiveTemperatureController();

console.log('=== 实验1: 基本 PID 控制 ===');
const result1 = controller.simulate({
  setpoint: 25,
  initialTemp: 20,
  steps: 100,
});
console.log('上升时间:', result1.summary.riseTime);
console.log('过冲:', result1.summary.overshootPercent.toFixed(2) + '%');
console.log('稳态误差:', result1.summary.steadyStateError.toFixed(3));
console.log('振荡次数:', result1.summary.oscillations);

console.log('\n=== 实验2: 带扰动的自适应控制 ===');
const controller2 = new AdaptiveTemperatureController();
const result2 = controller2.simulate({
  setpoint: 25,
  initialTemp: 20,
  steps: 100,
  disturbanceAt: 50,
  disturbanceMagnitude: -10,  // 第50步突然降温
});
console.log('扰动后恢复时间:', result2.summary.riseTime);
console.log('最终温度:', result2.summary.finalTemp.toFixed(2));
console.log('最终 Kp:', result2.results[99].Kp.toFixed(3));
console.log('最终 Ki:', result2.results[99].Ki.toFixed(3));
console.log('最终 Kd:', result2.results[99].Kd.toFixed(3));

3.10 本章小结

3.10.1 核心概念回顾

概念 定义 映射论对应 Agent 应用
控制论 研究控制和通信的科学 自指映射的科学 Agent 的架构基础
负反馈 减小偏差的反馈 压缩映射 质量维持、错误修正
正反馈 放大偏差的反馈 发散映射 增长飞轮、创新涌现
前馈控制 预测性干预 预映射 风险预判、预防性调整
PID 控制 比例+积分+微分控制 三阶映射组合 综合行为调节
Ashby 定律 控制器多样性≥被控对象多样性 映射域覆盖定理 能力上限分析
自适应控制 参数自动更新的控制 元映射 自我进化
元控制 对控制系统的控制 映射的映射 系统健康管理

3.10.2 关键公式

1. Agent 自主性 = 必要多样性 × 控制带宽 × 适应速率

2. PID 控制律:u(t) = Kp×e(t) + Ki×∫e(τ)dτ + Kd×de(t)/dt

3. Ashby 必要多样性定律:
   Variety(控制器) ≥ Variety(被控系统)

4. 压缩映射定理(负反馈收敛性保证):
   d(T(x), T(y)) ≤ k × d(x, y), 0 ≤ k < 1
   → 存在唯一不动点 x*,迭代收敛

5. 控制带宽 = 1 / (感知延迟 + 决策延迟 + 执行延迟)

3.10.3 最佳实践

  1. 设计闭合回路:每个输出都应有反馈通道,永远不要设计开环系统
  2. 组合前馈与反馈:前馈处理已知问题,反馈处理未知问题
  3. 管理正反馈:强化建设性飞轮,抑制破坏性螺旋,设置安全边界
  4. 投资多样性:工具链和策略库是扩展控制能力的最有效杠杆
  5. 自适应要带约束:自我更新是必要的,但必须在安全边界内进行
  6. 五层控制架构:确保你的 Agent 在每个层次都有适当的控制机制

3.10.4 下一章预告

第4章:信息映射——信息论、熵与认知编码工程

控制系统需要信息来运作——传感器提供状态信息,控制器处理信息,执行器将信息转化为行动。但信息的本质是什么?如何量化信息?如何高效编码?信息论(香农理论)将回答这些问题,并揭示 AI Agent 的核心挑战:在有限的上下文窗口中,如何编码最大量的有用信息?这是映射论的"编码学"——如何设计最优的映射表示。


本章参考文献

  1. Wiener, N. (1948). Cybernetics: Or Control and Communication in the Animal and the Machine. MIT Press.
  2. Ashby, W. R. (1956). An Introduction to Cybernetics. Chapman & Hall.
  3. Åström, K. J. & Wittenmark, B. (2008). Adaptive Control (2nd ed.). Dover.
  4. Ogata, K. (2010). Modern Control Engineering (5th ed.). Prentice Hall.
  5. Franklin, G. F., Powell, J. D., & Emami-Naeini, A. (2019). Feedback Control of Dynamic Systems (8th ed.). Pearson.
  6. Beer, S. (1972). Brain of the Firm. Allen Lane.
  7. Conant, R. C. & Ashby, W. R. (1970). “Every good regulator of a system must be a model of that system.” International Journal of Systems Science.
  8. Powers, W. T. (1973). Behavior: The Control of Perception. Aldine.
Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐