AI Agent动态环境适应能力:如何让智能体应对未知场景
AI Agent动态环境适应能力:如何让智能体应对未知场景
1. 标题 (Title)
以下是5个包含核心关键词的吸引人标题选项:
AI Agent动态环境适应:从理论到实战,让智能体在未知场景中乘风破浪突破静态局限:如何构建具有"随机应变"能力的AI Agent?未知场景不再慌:AI Agent环境适应的核心算法与实现指南从强化学习到元学习:打造能快速适应新环境的AI AgentAI Agent进阶之路:动态环境适应的关键技术、实战与最佳实践
2. 引言 (Introduction)
痛点引入 (Hook)
你是否见过这样的场景:一个在实验室训练好的机器人,在干净的模拟环境中能精准地抓取物体,但一放到真实的家居环境里——地板有点滑、桌子高度和训练时不一样、物体形状也变了——它就突然"不知所措";又或者一个自动驾驶模型,在熟悉的城市路段表现完美,但遇到陌生的暴雨天气、临时改道的道路,就容易出现决策失误;甚至是推荐系统,面对新用户的冷启动问题,或者老用户突然变化的兴趣偏好,也常常推荐得"牛头不对马嘴"。
这些问题的核心都指向同一个挑战:我们训练的大多数AI Agent,往往只适用于与训练数据高度相似的"静态环境",一旦环境发生未知的变化,它们的性能就会急剧下降。 但真实世界永远是动态的、充满未知的——天气会变、用户会变、任务会变、甚至物理规则都可能在不同场景下有不同表现。如何让AI Agent像人类一样,具备在未知场景中快速学习、调整策略的能力,已经成为当前AI领域最受关注的研究方向之一。
文章内容概述 (What)
本文将带你系统性地探索AI Agent动态环境适应能力的全貌:从核心概念的定义,到关键技术的解析(包括在线强化学习、元学习、记忆增强网络等),再到手把手的实战实现——我们将用PyTorch和Gymnasium构建一个动态变化的实验环境,分别实现基于在线强化学习、元学习MAML和记忆增强LSTM的适应型Agent,并对比它们的效果。
读者收益 (Why)
读完本文,你将:
- 彻底理解动态环境与AI Agent适应能力的核心定义、分类和评估维度;
- 掌握当前主流的动态适应技术(在线强化学习、元学习、迁移学习、记忆增强网络)的原理、数学模型和适用场景;
- 能够动手实现一个简单的动态环境,并在上面搭建具备适应能力的AI Agent;
- 了解这些技术在真实行业中的应用案例,以及未来的发展趋势。
3. 准备工作 (Prerequisites)
在开始之前,请确保你已经具备以下知识和环境:
技术栈/知识
- 机器学习基础:熟悉监督学习、无监督学习的基本概念,理解损失函数、梯度下降、过拟合/欠拟合等核心概念;
- 深度学习基础:了解神经网络的基本结构(全连接层、卷积层、循环层),掌握PyTorch或TensorFlow的基本使用(本文使用PyTorch);
- 强化学习基础:理解强化学习的核心要素——状态(State)、动作(Action)、奖励(Reward)、策略(Policy)、价值函数(Value Function),了解DQN、PPO等基础强化学习算法的原理。
环境/工具
- Python 3.8+:确保你的Python版本满足要求(建议使用3.9或3.10);
- PyTorch:安装与你的CUDA版本(如有GPU)匹配的PyTorch,可参考PyTorch官网;
- Gymnasium:OpenAI Gym的继任者,用于构建和测试强化学习环境,安装命令:
pip install gymnasium[classic-control]; - 其他依赖库:NumPy(数值计算)、Matplotlib(可视化)、TensorBoard(可选,用于训练日志可视化),安装命令:
pip install numpy matplotlib tensorboard。
4. 核心内容:从概念到实战的完整指南
步骤一:理解动态环境与适应能力的核心概念
在我们开始实现算法之前,首先要搞清楚两个最基本的问题:什么是动态环境? 以及AI Agent的适应能力到底指什么?
4.1.1 核心概念定义
(1)环境的分类:静态 vs 动态
在强化学习和AI Agent的研究中,我们通常根据环境的变化特性,将其分为静态环境和动态环境:
- 静态环境:环境的规则(状态转移函数 P ( s ′ ∣ s , a ) P(s'|s,a) P(s′∣s,a)、奖励函数 R ( s , a , s ′ ) R(s,a,s') R(s,a,s′))在整个Agent的生命周期中保持不变。例如,经典的CartPole环境中,杆子的长度、质量、摩擦力都是固定的——只要Agent学会了在这个固定规则下平衡杆子,它的性能就不会随时间变化。
- 动态环境:环境的规则会随着时间、Agent的行为或其他未知因素发生变化。这种变化可能是离散的(比如突然切换到一个新的任务),也可能是连续的(比如摩擦力随时间逐渐增大);可能是可观察的(比如环境明确告诉Agent"杆子长度变了"),也可能是部分可观察的(Agent只能通过状态变化间接推断规则改变)。
为了更清晰地对比静态环境和动态环境,我们可以用下面的表格展示:
| 维度 | 静态环境 | 动态环境 |
|---|---|---|
| 规则稳定性 | 状态转移函数 P P P 和奖励函数 R R R 固定不变 | P P P 和/或 R R R 随时间/行为变化 |
| 变化类型 | 无 | 离散变化/连续变化 |
| 可观察性 | 通常完全可观察 | 可能完全可观察/部分可观察 |
| Agent需求 | 只需学习一个固定策略 | 需要学习"如何学习",或在线调整策略 |
| 典型例子 | 固定规则的棋盘游戏、静态图像分类 | 自动驾驶(天气/道路变化)、机器人(新环境操作)、推荐系统(用户兴趣变化) |
(2)AI Agent的动态适应能力
AI Agent的动态环境适应能力,简单来说就是:当环境发生未知变化时,Agent能够通过感知环境的变化,快速调整自身的策略或知识,从而保持或恢复良好性能的能力。
为了更系统地描述适应能力,我们可以将其分解为以下几个核心要素:
- 状态感知与变化检测:Agent需要能够从当前的观察中,感知到环境是否发生了变化,以及发生了什么样的变化(如果是部分可观察的,还需要推断变化);
- 知识/策略的调整机制:Agent需要有一套机制来更新自己的策略或知识——可能是在线学习新的策略,可能是从记忆中检索类似的场景,也可能是利用元学习快速适应;
- 记忆与经验复用:对于重复出现的环境变化,Agent需要能够记住之前的适应经验,避免重新学习,从而提高适应速度;
- 探索与利用的平衡:在适应过程中,Agent需要平衡"探索新策略以找到更好的适应方式"和"利用已知策略以保持当前性能"——这比静态环境中的探索利用平衡更复杂,因为环境本身也在变。
4.1.2 概念之间的关系:ER实体关系图
为了更直观地展示动态环境、适应能力、以及相关核心概念之间的关系,我们可以用下面的Mermaid ER图来描述:
这个ER图告诉我们:
- 一个AI Agent具有动态适应能力;
- 适应能力由状态感知、策略调整、记忆复用、探索利用平衡四个核心部分组成;
- AI Agent与动态环境进行交互,其适应能力的作用就是应对动态环境的变化。
4.1.3 适应能力的评估维度
要评估一个Agent的动态适应能力,我们不能只看"最终性能"——因为适应过程本身也很重要。通常我们会从以下几个维度来评估:
- 适应速度:环境变化后,Agent需要多少时间(或多少步交互)才能恢复到可接受的性能水平?适应速度越快越好;
- 适应性能上限:适应完成后,Agent的最终性能能达到多高?这反映了Agent的学习潜力;
- 性能下降幅度:环境刚变化时,Agent的性能会下降多少?下降幅度越小,说明Agent的鲁棒性越好;
- 泛化能力:Agent能否适应从未见过的环境变化类型?泛化能力越强,适应能力越通用;
- 数据效率:适应过程中,Agent需要多少与环境的交互数据才能完成适应?数据效率越高,越适合真实场景(因为真实环境中交互成本可能很高)。
这五个维度之间往往存在权衡——比如,有些Agent适应速度很快,但最终性能上限不高;有些Agent最终性能很好,但需要大量的交互数据。在实际应用中,我们需要根据具体场景选择合适的权衡。
步骤二:核心技术栈解析:让Agent适应动态环境的"武器库"
现在我们已经理解了核心概念,接下来要看看:有哪些技术可以用来构建具备动态适应能力的AI Agent? 这些技术各有优缺点,适用于不同的场景。下面我们将逐一解析。
4.2.1 在线强化学习(Online Reinforcement Learning)
(1)问题背景
在静态环境中,我们通常用离线强化学习(先收集大量数据,再训练策略)或者在线强化学习的固定模式(训练完成后就冻结策略)。但在动态环境中,策略冻结后就无法应对变化——因此我们需要让Agent在与环境交互的过程中持续更新策略,这就是在线强化学习的核心思想。
(2)核心概念
在线强化学习的核心是:Agent在每一步或每几步交互后,就利用新收集到的数据更新自己的策略或价值函数。这样,当环境发生变化时,新的数据会反映环境的变化,Agent的策略也会随之调整。
常见的在线强化学习算法包括:
- 在线DQN(Deep Q-Network):在每一步后更新Q网络,或者使用经验回放池(Experience Replay)持续从池中采样更新;
- 在线PPO(Proximal Policy Optimization):每收集一定数量的轨迹(Trajectory)后,就用这些轨迹更新策略网络;
- 在线策略梯度(Policy Gradient):每一步后直接用梯度上升更新策略。
(3)数学模型:在线DQN的更新规则
我们以在线DQN为例,看看它的数学模型。DQN的核心是学习一个Q函数 Q ( s , a ; θ ) Q(s,a;\theta) Q(s,a;θ),其中 θ \theta θ 是网络的参数, Q ( s , a ; θ ) Q(s,a;\theta) Q(s,a;θ) 表示在状态 s s s 下采取动作 a a a 后能获得的累积奖励的期望。
在静态环境中,DQN的目标是最小化时序差分误差(TD Error):
L ( θ ) = E ( s , a , r , s ′ ) ∼ D [ ( r + γ max a ′ Q ( s ′ , a ′ ; θ − ) − Q ( s , a ; θ ) ) 2 ] L(\theta) = \mathbb{E}_{(s,a,r,s') \sim D} \left[ \left( r + \gamma \max_{a'} Q(s',a';\theta^-) - Q(s,a;\theta) \right)^2 \right] L(θ)=E(s,a,r,s′)∼D[(r+γa′maxQ(s′,a′;θ−)−Q(s,a;θ))2]
其中 D D D 是经验回放池, θ − \theta^- θ− 是目标网络的参数(定期从 θ \theta θ 复制), γ \gamma γ 是折扣因子。
而在在线强化学习中,我们不需要等收集大量数据后再训练——我们可以持续将新的 ( s , a , r , s ′ ) (s,a,r,s') (s,a,r,s′) 加入回放池,并持续从池中采样更新。甚至,如果环境变化很快,我们可以减小回放池的大小,让池子里的数据更多是最近的,从而更快地反映环境的变化。
此外,我们还可以调整学习率:在环境刚变化时,增大学习率以快速适应;在环境稳定后,减小学习率以避免策略波动。
(4)适用场景与局限性
- 适用场景:环境变化比较缓慢,或者Agent与环境的交互成本很低(可以大量收集数据);
- 局限性:适应速度较慢——因为需要收集足够多的新数据才能更新策略;如果环境变化很快,Agent可能还没适应过来,环境又变了;此外,在线学习容易导致"灾难性遗忘"(Catastrophic Forgetting)——Agent适应新环境后,可能会忘记之前在旧环境中学到的知识。
4.2.2 元学习(Meta-Learning):学会"如何学习"
(1)问题背景
在线强化学习的问题是适应速度慢——因为它是从"零"开始学习新环境的。但人类在适应新场景时,往往不需要从零开始:比如你会骑自行车,那么学骑电动车就会很快;你会用Windows,那么学用Mac也只需要很短的时间。这是因为人类掌握了"如何学习"的能力——我们可以利用之前的经验,快速适应新任务。
元学习的核心思想就是:让Agent先在大量类似的任务上进行"元训练"(Meta-Training),学会一套"通用的初始参数"或"通用的学习规则";当遇到新的未知任务(环境变化)时,只需要用少量的数据进行"微调"(Fine-Tuning),就能快速适应。
(2)核心概念:MAML(Model-Agnostic Meta-Learning)
MAML是元学习中最经典、最常用的算法之一——它的名字里的"Model-Agnostic"(模型无关)意味着它可以应用于任何可微分的模型(比如神经网络),无论是用于监督学习、强化学习还是其他任务。
MAML的核心目标是:找到一组初始参数 θ \theta θ,使得对于任何新的任务 T i T_i Ti,只需要用少量的梯度步(比如1步或5步)就能将 θ \theta θ 微调为适合 T i T_i Ti 的参数 θ i \theta_i θi。
(3)数学模型:MAML的目标函数与更新规则
我们以强化学习场景下的MAML为例,看看它的数学推导。
首先,我们定义:
- 元训练任务分布: p ( T ) p(T) p(T)——我们会从这个分布中采样大量的任务用于元训练;
- 对于任务 T T T,它的损失函数是 L T ( θ ) L_T(\theta) LT(θ)——在强化学习中,通常是负的累积奖励(因为我们要最小化损失,也就是最大化奖励);
- 对于任务 T T T,用初始参数 θ \theta θ 进行 k k k 步梯度更新后的参数是:
θ T k = θ − α ∇ θ L T ( θ T k − 1 ) \theta_T^k = \theta - \alpha \nabla_\theta L_T(\theta_T^{k-1}) θTk=θ−α∇θLT(θTk−1)
其中 α \alpha α 是微调阶段的学习率, θ T 0 = θ \theta_T^0 = \theta θT0=θ。
MAML的目标是找到初始参数 θ \theta θ,使得微调后的参数 θ T k \theta_T^k θTk 在任务 T T T 上的损失最小——也就是最小化所有元训练任务上的微调后损失的期望:
min θ E T ∼ p ( T ) [ L T ( θ T k ) ] \min_\theta \mathbb{E}_{T \sim p(T)} \left[ L_T(\theta_T^k) \right] θminET∼p(T)[LT(θTk)]
为了优化这个目标函数,我们需要计算元梯度(Meta-Gradient)——也就是损失对初始参数 θ \theta θ 的梯度,然后用梯度下降更新 θ \theta θ:
θ ← θ − β ∇ θ E T ∼ p ( T ) [ L T ( θ T k ) ] \theta \leftarrow \theta - \beta \nabla_\theta \mathbb{E}_{T \sim p(T)} \left[ L_T(\theta_T^k) \right] θ←θ−β∇θET∼p(T)[LT(θTk)]
其中 β \beta β 是元训练阶段的学习率。
这里的关键是:元梯度需要通过微调的梯度步进行反向传播——也就是说,我们要计算"损失对微调后参数的梯度",再乘以"微调后参数对初始参数的梯度",才能得到元梯度。这就是MAML的"二阶导数"的来源(不过在实际应用中,我们常常可以用一阶近似来简化计算,也就是忽略"微调后参数对初始参数的梯度"中的二阶项,这样计算速度会更快)。
(4)算法流程图:MAML的元训练与测试流程
为了更直观地理解MAML的流程,我们可以用下面的Mermaid流程图来描述:
这个流程图清楚地展示了MAML的两个阶段:
- 元训练阶段:在大量任务上学习通用初始参数 θ ∗ \theta^* θ∗;
- 测试/适应阶段:遇到新任务时,用少量数据微调 θ ∗ \theta^* θ∗,快速得到适应新任务的参数。
(5)适用场景与局限性
- 适用场景:我们可以提前收集大量类似的任务用于元训练;新任务与元训练任务来自同一个分布;需要快速适应(只需要少量数据);
- 局限性:元训练阶段需要大量的任务和计算资源;如果新任务与元训练任务分布差异很大,MAML的效果会下降;对于连续变化的环境(而不是离散的任务切换),MAML的应用相对困难(不过可以结合在线学习)。
4.2.3 记忆增强网络(Memory-Augmented Networks)
(1)问题背景
在线强化学习容易"灾难性遗忘",MAML虽然能快速适应新任务,但如果旧任务再次出现,它可能还需要重新微调——而人类却能记住之前的经验,当旧场景再次出现时,直接复用之前的策略即可。
记忆增强网络的核心思想就是:给Agent增加一个"记忆库"(Memory Bank),让Agent能够记住之前遇到的环境变化和对应的适应策略;当遇到新的环境变化时,先从记忆库中检索类似的场景,如果找到就复用对应的策略,如果没找到就学习新的策略并存入记忆库。
(2)核心概念:常见的记忆增强结构
常见的记忆增强网络包括:
- 循环神经网络(RNN/LSTM/GRU):这是最简单的"记忆"结构——网络内部的隐藏状态(Hidden State)可以记住过去的观察和动作历史。对于短期的、连续的环境变化,LSTM往往能取得不错的效果;
- 神经图灵机(NTM, Neural Turing Machine):NTM有一个外部的可读写的记忆矩阵,网络可以通过"读头"和"写头"来访问记忆矩阵——这比LSTM的隐藏状态更适合存储长期的、大量的经验;
- 记忆库(Memory Bank)+ 检索机制:比如,我们可以将之前遇到的环境的"特征表示"和对应的策略参数存入记忆库;当遇到新环境时,先提取新环境的特征,然后用相似度度量(比如余弦相似度)从记忆库中检索最相似的几个场景,再复用或融合它们的策略。
(3)数学模型:LSTM的记忆机制
我们以LSTM为例,看看它是如何实现记忆的。LSTM的核心是细胞状态(Cell State)——它就像一条"传送带",可以让信息在时间步之间流动,只有少量的线性交互,因此信息很容易保持不变。
LSTM有三个门来控制细胞状态的信息流动:
- 遗忘门(Forget Gate):决定从细胞状态中丢弃什么信息——它读取当前的输入 x t x_t xt 和上一个隐藏状态 h t − 1 h_{t-1} ht−1,输出一个0到1之间的向量 f t f_t ft,1表示"完全保留",0表示"完全丢弃":
f t = σ ( W f ⋅ [ h t − 1 , x t ] + b f ) f_t = \sigma(W_f \cdot [h_{t-1}, x_t] + b_f) ft=σ(Wf⋅[ht−1,xt]+bf) - 输入门(Input Gate):决定将什么新信息存入细胞状态——它包括两部分:一是用sigmoid层决定哪些值需要更新,二是用tanh层创建新的候选值 C ~ t \tilde{C}_t C~t:
i t = σ ( W i ⋅ [ h t − 1 , x t ] + b i ) i_t = \sigma(W_i \cdot [h_{t-1}, x_t] + b_i) it=σ(Wi⋅[ht−1,xt]+bi)
C ~ t = tanh ( W C ⋅ [ h t − 1 , x t ] + b C ) \tilde{C}_t = \tanh(W_C \cdot [h_{t-1}, x_t] + b_C) C~t=tanh(WC⋅[ht−1,xt]+bC)
然后,我们更新细胞状态:
C t = f t ∗ C t − 1 + i t ∗ C ~ t C_t = f_t \ast C_{t-1} + i_t \ast \tilde{C}_t Ct=ft∗Ct−1+it∗C~t
其中 ∗ \ast ∗ 是逐元素乘法。 - 输出门(Output Gate):决定输出什么隐藏状态——首先用sigmoid层决定细胞状态的哪些部分需要输出,然后将细胞状态通过tanh层(得到-1到1之间的值),再乘以输出门的结果:
o t = σ ( W o ⋅ [ h t − 1 , x t ] + b o ) o_t = \sigma(W_o \cdot [h_{t-1}, x_t] + b_o) ot=σ(Wo⋅[ht−1,xt]+bo)
h t = o t ∗ tanh ( C t ) h_t = o_t \ast \tanh(C_t) ht=ot∗tanh(Ct)
在动态环境适应中,我们可以将LSTM的隐藏状态作为策略网络的输入——这样策略网络就能利用过去的观察和动作历史,推断环境的变化,从而调整策略。
(4)适用场景与局限性
- 适用场景:环境变化有一定的重复性(旧场景会再次出现);环境变化是连续的,需要利用过去的历史信息来推断当前状态;
- 局限性:LSTM的隐藏状态容量有限,不适合存储长期的、大量的经验;NTM和记忆库的实现更复杂,计算开销更大;检索机制的相似度度量需要精心设计,否则可能检索到不相关的经验。
4.2.4 迁移学习(Transfer Learning):将旧知识迁移到新环境
(1)问题背景
迁移学习的核心思想是:将在源域(Source Domain)/源任务(Source Task)上学到的知识,迁移到目标域(Target Domain)/目标任务(Target Task)上,从而减少目标任务所需的训练数据和时间。在动态环境适应中,源域可以是旧环境,目标域可以是变化后的新环境。
(2)核心概念:常见的迁移学习方法
在动态环境适应中,常用的迁移学习方法包括:
- 参数迁移:将在旧环境上学到的网络参数,作为新环境训练的初始参数——这其实和MAML的微调阶段有点像,但区别是MAML是在元训练阶段学习初始参数,而参数迁移是直接用旧任务的参数;
- 特征迁移:学习一个共享的特征提取器(Feature Extractor),将源域和目标域的数据映射到同一个特征空间——这样策略网络就可以在这个共享的特征空间上学习,从而适应不同的环境;
- 域适应(Domain Adaptation):如果源域和目标域的特征分布不同,我们可以用域适应的方法(比如对抗训练)来对齐两个域的特征分布——这样在源域上学到的策略也能在目标域上工作。
(3)适用场景与局限性
- 适用场景:旧环境和新环境有一定的相似性(知识可以迁移);
- 局限性:如果旧环境和新环境差异太大,迁移学习可能会导致"负迁移"(Negative Transfer)——也就是旧知识反而会干扰新环境的学习,导致性能下降。
4.2.5 技术对比:什么时候用什么技术?
为了帮助你根据具体场景选择合适的技术,我们用下面的表格对比了上述四种技术的核心特点、适用场景和局限性:
| 技术 | 核心思想 | 适应速度 | 数据效率 | 鲁棒性 | 适用场景 | 局限性 |
|---|---|---|---|---|---|---|
| 在线强化学习 | 持续用新数据更新策略 | 慢 | 低 | 中 | 环境变化慢、交互成本低 | 适应慢、灾难性遗忘 |
| 元学习(MAML) | 学会通用初始参数,微调快速适应 | 快 | 高 | 中 | 有大量元训练任务、新任务与元训练任务同分布 | 元训练成本高、分布外任务效果差 |
| 记忆增强网络 | 存储经验,检索复用 | 快(旧场景) | 高(旧场景) | 高 | 环境变化有重复性、需要历史信息 | LSTM容量有限、复杂记忆结构实现难 |
| 迁移学习 | 将旧知识迁移到新环境 | 中 | 中 | 中 | 新旧环境相似 | 可能负迁移 |
步骤三:搭建实验环境:一个动态变化的CartPole
讲了这么多理论,现在我们要动手实战了!首先,我们需要搭建一个动态变化的实验环境——这样才能测试我们的适应型Agent。
我们选择经典的CartPole环境作为基础,然后对它进行修改,让它变成动态环境。CartPole的任务很简单:在一个小车上固定一根杆子,小车可以左右移动,Agent需要通过控制小车的左右移动,让杆子保持直立不倒。
4.3.1 动态CartPole的设计
我们将经典CartPole修改为动态CartPole-v1,具体变化如下:
- 离散的任务切换:每隔一定的步数(比如500步),环境就会切换到一个新的"任务"——每个任务的杆子长度、杆子质量、小车质量、摩擦力系数都不同;
- 部分可观察的变化:环境不会直接告诉Agent"任务切换了",也不会告诉Agent新的参数——Agent只能通过观察小车的位置、速度、杆子的角度、角速度来推断环境的变化;
- 任务分布:我们定义一个任务分布,元训练阶段从这个分布中采样任务,测试阶段也从这个分布中采样(但测试任务不会在元训练中出现)。
具体来说,每个任务的参数采样范围如下:
- 杆子长度(length):0.3 ~ 0.8(经典CartPole是0.5)
- 杆子质量(masspole):0.05 ~ 0.2(经典是0.1)
- 小车质量(masscart):0.8 ~ 1.5(经典是1.0)
- 摩擦力系数(friction):0.0 ~ 0.1(经典是0.0)
4.3.2 实现动态CartPole环境
Gymnasium提供了非常方便的自定义环境接口——我们只需要继承gymnasium.Env类,实现__init__、reset、step、render(可选)这几个方法即可。
下面是动态CartPole环境的完整代码(我们将其保存为dynamic_cartpole.py):
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from typing import Optional, Tuple
class DynamicCartPoleEnv(gym.Env):
"""
动态变化的CartPole环境:每隔fixed_episode_steps步切换一次任务(环境参数)
任务参数包括:杆子长度、杆子质量、小车质量、摩擦力系数
"""
metadata = {
"render_modes": ["human", "rgb_array"],
"render_fps": 50,
}
def __init__(
self,
render_mode: Optional[str] = None,
fixed_episode_steps: int = 500, # 每隔多少步切换一次任务
# 任务参数的采样范围
length_range: Tuple[float, float] = (0.3, 0.8),
masspole_range: Tuple[float, float] = (0.05, 0.2),
masscart_range: Tuple[float, float] = (0.8, 1.5),
friction_range: Tuple[float, float] = (0.0, 0.1),
):
super().__init__()
# 保存参数
self.render_mode = render_mode
self.fixed_episode_steps = fixed_episode_steps
self.length_range = length_range
self.masspole_range = masspole_range
self.masscart_range = masscart_range
self.friction_range = friction_range
# 经典CartPole的物理常数
self.gravity = 9.8
self.force_mag = 10.0
self.tau = 0.02 # 时间步长
# 动作空间:向左(0)或向右(1)
self.action_space = spaces.Discrete(2)
# 观察空间:[小车位置, 小车速度, 杆子角度, 杆子角速度]
# 范围与经典CartPole一致
high = np.array(
[
4.8,
np.finfo(np.float32).max,
12.0 * np.pi / 180.0,
np.finfo(np.float32).max,
],
dtype=np.float32,
)
self.observation_space = spaces.Box(-high, high, dtype=np.float32)
# 初始化环境参数
self.current_step = 0
self.length = None
self.masspole = None
self.masscart = None
self.friction = None
self.total_mass = None
self.polemass_length = None
# 渲染相关
self.screen = None
self.clock = None
self.isopen = True
self.state = None
def _sample_new_task(self) -> None:
"""采样新的任务参数"""
self.length = np.random.uniform(*self.length_range)
self.masspole = np.random.uniform(*self.masspole_range)
self.masscart = np.random.uniform(*self.masscart_range)
self.friction = np.random.uniform(*self.friction_range)
# 计算派生参数
self.total_mass = self.masspole + self.masscart
self.polemass_length = self.masspole * self.length
print(f"[DynamicCartPole] 任务切换:length={self.length:.2f}, masspole={self.masspole:.2f}, masscart={self.masscart:.2f}, friction={self.friction:.2f}")
def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict] = None,
) -> Tuple[np.ndarray, dict]:
"""重置环境:同时重置任务参数和状态"""
super().reset(seed=seed)
# 重置步数
self.current_step = 0
# 采样初始任务参数
self._sample_new_task()
# 重置状态:小车位置和杆子角度都在小范围内随机
self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(4,))
# 渲染
if self.render_mode == "human":
self.render()
return np.array(self.state, dtype=np.float32), {}
def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, dict]:
"""执行一步动作"""
# 检查任务是否需要切换
if self.current_step % self.fixed_episode_steps == 0 and self.current_step != 0:
self._sample_new_task()
# 获取当前状态
x, x_dot, theta, theta_dot = self.state
force = self.force_mag if action == 1 else -self.force_mag
# 物理计算(加入了摩擦力项,与经典CartPole略有不同)
costheta = np.cos(theta)
sintheta = np.sin(theta)
# 计算摩擦力
friction_cart = self.friction * x_dot
friction_pole = self.friction * theta_dot
# 计算中间量
temp = (force + self.polemass_length * theta_dot**2 * sintheta - friction_cart) / self.total_mass
thetaacc = (self.gravity * sintheta - costheta * temp - friction_pole / self.polemass_length) / (
self.length * (4.0 / 3.0 - self.masspole * costheta**2 / self.total_mass)
)
xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass
# 更新状态(欧拉法)
x = x + self.tau * x_dot
x_dot = x_dot + self.tau * xacc
theta = theta + self.tau * theta_dot
theta_dot = theta_dot + self.tau * thetaacc
self.state = (x, x_dot, theta, theta_dot)
# 检查是否终止
terminated = bool(
x < -4.8
or x > 4.8
or theta < -12 * np.pi / 180
or theta > 12 * np.pi / 180
)
# 奖励:每一步都给1分,终止后不给
reward = 1.0 if not terminated else 0.0
# 更新步数
self.current_step += 1
# 渲染
if self.render_mode == "human":
self.render()
return np.array(self.state, dtype=np.float32), reward, terminated, False, {}
def render(self) -> Optional[np.ndarray]:
"""渲染环境(直接复用经典CartPole的渲染代码)"""
try:
import pygame
from pygame import gfxdraw
except ImportError:
raise DependencyNotInstalled(
"pygame is not installed, run `pip install gymnasium[classic-control]`"
)
if self.screen is None:
pygame.init()
if self.render_mode == "human":
pygame.display.init()
self.screen = pygame.display.set_mode((600, 400))
else: # rgb_array
self.screen = pygame.Surface((600, 400))
if self.clock is None:
self.clock = pygame.time.Clock()
world_width = 4.8 * 2
scale = 600 / world_width
polewidth = 10.0
polelen = scale * self.length # 使用动态的杆子长度
cartwidth = 50.0
cartheight = 30.0
if self.state is None:
return None
x = self.state
self.surf = pygame.Surface((600, 400))
self.surf.fill((255, 255, 255))
l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
axleoffset = cartheight / 4.0
cartx = x[0] * scale + 600 / 2.0 # MIDDLE OF CART
carty = 100 # TOP OF CART
cart_coords = [(l, b), (l, t), (r, t), (r, b)]
cart_coords = [(c[0] + cartx, c[1] + carty) for c in cart_coords]
gfxdraw.aapolygon(self.surf, cart_coords, (0, 0, 0))
gfxdraw.filled_polygon(self.surf, cart_coords, (0, 0, 0))
l, r, t, b = (
-polewidth / 2,
polewidth / 2,
polelen - polewidth / 2,
-polewidth / 2,
)
pole_coords = []
for coord in [(l, b), (l, t), (r, t), (r, b)]:
coord = pygame.math.Vector2(coord).rotate_rad(-x[2])
coord = (coord[0] + cartx, coord[1] + carty + axleoffset)
pole_coords.append(coord)
gfxdraw.aapolygon(self.surf, pole_coords, (202, 152, 101))
gfxdraw.filled_polygon(self.surf, pole_coords, (202, 152, 101))
gfxdraw.aacircle(
self.surf,
int(cartx),
int(carty + axleoffset),
int(polewidth / 2),
(129, 132, 203),
)
gfxdraw.filled_circle(
self.surf,
int(cartx),
int(carty + axleoffset),
int(polewidth / 2),
(129, 132, 203),
)
gfxdraw.hline(self.surf, 0, 600, carty, (0, 0, 0))
self.surf = pygame.transform.flip(self.surf, False, True)
self.screen.blit(self.surf, (0, 0))
if self.render_mode == "human":
pygame.event.pump()
self.clock.tick(self.metadata["render_fps"])
pygame.display.flip()
elif self.render_mode == "rgb_array":
return np.transpose(
np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2)
)
def close(self) -> None:
"""关闭环境"""
if self.screen is not None:
import pygame
pygame.display.quit()
pygame.quit()
self.isopen = False
# 注册环境(可选,方便直接用gym.make调用)
gym.register(
id="DynamicCartPole-v1",
entry_point="dynamic_cartpole:DynamicCartPoleEnv",
max_episode_steps=5000, # 最大步数设大一点,方便测试任务切换
)
4.3.3 测试动态CartPole环境
现在我们来测试一下这个动态环境是否正常工作——我们可以用一个随机策略来和环境交互,看看任务是否会切换:
import gymnasium as gym
import dynamic_cartpole # 导入我们的自定义环境
# 创建环境
env = gym.make("DynamicCartPole-v1", render_mode="human", fixed_episode_steps=200)
# 重置环境
observation, info = env.reset()
# 用随机策略交互
for _ in range(1000):
# 随机采样动作
action = env.action_space.sample()
# 执行动作
observation, reward, terminated, truncated, info = env.step(action)
# 如果终止,重置环境
if terminated or truncated:
observation, info = env.reset()
# 关闭环境
env.close()
运行这段代码,你应该会看到一个CartPole的窗口,每隔200步就会打印出"任务切换"的信息,杆子的长度也会发生变化——这说明我们的动态环境已经搭建成功了!
步骤四:实现基础适应型Agent:在线PPO
现在我们有了动态环境,接下来要实现第一个适应型Agent——在线PPO。PPO是目前最常用的强化学习算法之一,它的性能稳定,实现简单,非常适合作为我们的基础线。
在线PPO和静态环境中的PPO的区别不大——唯一的区别是,我们不会冻结策略网络,而是持续收集数据并更新,并且为了更快地适应环境变化,我们可以减小经验回放池(或者在PPO中是轨迹缓冲区)的大小,让我们的更新更多地使用最近的数据。
4.4.1 在线PPO的实现
下面是在线PPO的完整代码(我们将其保存为online_ppo_agent.py):
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple
# 定义策略网络和价值网络(共享一部分层)
class ActorCritic(nn.Module):
def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 64):
super(ActorCritic, self).__init__()
# 共享层
self.shared_layers = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
)
# 策略头(输出动作概率)
self.actor = nn.Linear(hidden_dim, action_dim)
# 价值头(输出状态价值)
self.critic = nn.Linear(hidden_dim, 1)
def get_action(self, state: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""
根据状态获取动作、动作的对数概率、状态价值
"""
shared_out = self.shared_layers(state)
# 策略头:输出动作概率
action_logits = self.actor(shared_out)
action_dist = Categorical(logits=action_logits)
action = action_dist.sample()
log_prob = action_dist.log_prob(action)
# 价值头:输出状态价值
value = self.critic(shared_out).squeeze(-1)
return action, log_prob, value
def evaluate(self, states: torch.Tensor, actions: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""
评估一批状态和动作:输出对数概率、状态价值、熵
"""
shared_out = self.shared_layers(states)
# 策略头
action_logits = self.actor(shared_out)
action_dist = Categorical(logits=action_logits)
log_probs = action_dist.log_prob(actions)
entropy = action_dist.entropy()
#
更多推荐
所有评论(0)