前言

过去一周(本文最初写于25年6.21,但Update by 25年9月),我司「七月在线」长沙分部的具身团队在机械臂和人形上并行发力

  • 关于机械臂
    一方面,在IL和VLA的路线下,先后采集了抓杯子、桌面收纳、插入耳机孔的数据,然后云端训-本地5090推理
    二方面,在RL的路线下,通过复现UC伯克利的HIL-SERL先后在仿真、真机上抓方块
  • 关于人形
    一者,对于manipulation,本周暂先实现了通过VR遥操宇树G1采集数据
    二者,对于locomotion,我司长沙具身团队的其中一个小组准备复现ASAP

而为了更好的复现ASAP,本文先来解读下ASAP的代码

根据ASAP的GitHub代码仓库可知,ASAP构建在HumanoidVerse多模拟器框架之上——之后,发现很多运控模型都基于该框架之上,比如本博客中解读过的KungfuBot、FALCON、SoFTA,其

  1. 支持多种物理模拟器:IsaacGym、IsaacSim、Genesis
  2. 实现人形机器人的运动追踪和全身控制
  3. 提供从仿真到现实的转移能力
  4. 模块化设计,支持算法、环境、模拟器的分离
┌─────────────────────────────────────────────────┐
│                配置系统 (config/)                │
│  base.yaml → algo/ → env/ → robot/ → rewards/   │
└─────────────┬───────────────────────────────────┘
              │ Hydra instantiate()
              ▼
┌─────────────────────────────────────────────────┐
│            环境层 (envs/)                       │
│  BaseTask ← locomotion/motion_tracking          │
└─────────────┬───────────────────────────────────┘
              │ composition
              ▼
┌─────────────────────────────────────────────────┐
│          模拟器层 (simulator/)                   │
│  BaseSimulator ← IsaacGym/IsaacSim/Genesis      │
└─────────────┬───────────────────────────────────┘
              │ used by
              ▼
┌─────────────────────────────────────────────────┐
│            算法层 (agents/)                     │
│  BaseAlgo ← PPO/DAGGER/DeltaA/ForceControl     │
└─────────────────────────────────────────────────┘

运行时调用顺序如下

1. train_agent.py::main()
2. ├── config加载 (Hydra)
3. ├── 模拟器选择 (IsaacGym/IsaacSim/Genesis)
4. ├── env实例化 (BaseTask子类)
5. │   ├── simulator实例化 (BaseSimulator子类)
6. │   ├── robot配置加载
7. │   └── terrain/obs/rewards设置
8. ├── algo实例化 (BaseAlgo子类)
9. │   ├── actor/critic网络创建
10. │   └── 数据缓冲区初始化
11. └── algo.learn() 训练循环

第一部分 humanoidverse/agents:模块组件、回调系统、PPO实现

包含

  • train_agent.py: 训练智能体的主入口
    train_agent.py:
    ├── hydra → 配置加载
    ├── utils.config_utils → 配置预处理
    ├── BaseTask → 环境创建
    ├── BaseAlgo → 算法创建
    └── wandb → 日志记录
  • eval_agent.py: 评估智能体的主入口
    提供实时键盘控制接口
    支持力控制调试功能
    可视化评估结果

agents模块的完整目录如下

agents/
├── base_algo/              # 算法基类定义
│   └── base_algo.py       # BaseAlgo抽象基类
├── modules/                # 神经网络核心组件
│   ├── ppo_modules.py     # PPO Actor/Critic网络
│   ├── modules.py         # 通用神经网络模块
│   ├── encoder_modules.py # 编码器模块
│   ├── world_models.py    # 世界模型实现
│   └── data_utils.py      # 数据存储和GAE计算
├── callbacks/              # 实时分析和可视化系统
│   ├── base_callback.py   # 回调基类
│   ├── analysis_plot_*.py # 各类分析绘图回调
│   └── analysis_plot_template.html # 实时可视化模板
├── ppo/                   # 标准PPO算法
├── dagger/                # 模仿学习算法
├── decouple/              # 解耦合控制算法
├── delta_a/               # ASAP核心创新:增量学习
├── delta_dynamics/        # Delta动力学模型
├── force_control/         # 力感知控制算法
├── mppi/                  # 模型预测路径积分控制
└── ppo_locomanip.py       # 运动操作专用PPO

1.1 模块组件agents/modules/

1.1.1 ppo_modules.py: PPO算法的Actor-Critic网络

这段代码实现了PPO(Proximal Policy Optimization)算法中的核心模块,包括策略网络(Actor)、价值网络(Critic)以及一个特殊的Actor变体

  • PPOActor(策略网络):根据观测生成动作分布,用于采样动作
    主要成员变量:
    `self.actor_module`:底层神经网络(由BaseModule构建)
    `self.std`:动作噪声的可学习参数(标准差)
    `self.distribution`:当前动作的概率分布(Normal分布)

    具体实现了以下主要方法:
    update_distribution(actor_obs):用当前观测生成动作分布(均值为网络输出,方差为std)
        # 根据观测更新动作分布
        def update_distribution(self, actor_obs):  
            # 通过网络获得动作均值
            mean = self.actor(actor_obs)  
    
            # 构造正态分布
            self.distribution = Normal(mean, mean*0. + self.std)  
    act(actor_obs, **kwargs):采样动作(先更新分布,再采样)
        # 根据观测采样动作
        def act(self, actor_obs, **kwargs):  
            # 更新分布
            self.update_distribution(actor_obs)  
    
            # 从分布中采样动作
            return self.distribution.sample()  
    get_actions_log_prob(actions):计算给定动作的对数概率
        # 计算给定动作的对数概率
        def get_actions_log_prob(self, actions):  
            return self.distribution.log_prob(actions).sum(dim=-1)
    act_inference(actor_obs):推理时直接输出均值(确定性动作)action_mean`/`action_std`/`entropy:分别返回分布的均值、标准差、熵
        # 推理时直接输出动作均值
        def act_inference(self, actor_obs):  
            actions_mean = self.actor(actor_obs)
            return actions_mean
    to_cpu():将模型和参数转到CPU
  • PPOCritic(价值网络):根据观测输出状态价值V(s)
    主要成员变量:`self.critic_module`:底层神经网络(由BaseModule构建)

    实现的主要方法有:
    `evaluate(critic_obs, **kwargs)`:输入观测,输出状态价值
        # 输入观测,输出状态价值
        def evaluate(self, critic_obs, **kwargs):  
            value = self.critic(critic_obs)
            return value
    `critic`属性:返回底层网络

1.1.2 modules.py:通用神经网络模块

1.1.3 encoder_modules.py:编码器模块

1.1.4 world_models.py:世界模型实现

1.1.5 data_utils.py: 数据处理工具,包含经验回放缓冲区

// 待更

1.2 回调系统agents/callbacks/

涉及一系列分析和可视化——支持力估计、运动分析、开环跟踪等多种分析

  • analysis_plot_.py: 各种分析图表生成
  • base_callback.py: 回调基类

1.3 模仿学习agents/dagger:DAgger 算法实现dagger.py

本文件实现了 DAgger(Dataset Aggregation)模仿学习算法,是 ASAP 框架中用于“模仿学习/专家演示学习”的核心类。它让智能体在自己的策略下探索环境,并在这些状态下查询专家动作,逐步聚合数据,提升策略泛化能力

整个算法的流程为

  1. 初始化:加载学生和专家网络,准备数据存储
  2. 训练循环:
    用学生策略探索环境,记录每一步的状态和专家动作
    用专家动作作为标签,训练学生策略模仿专家
    记录和可视化训练过程
  3. 评估:支持回调式评估和推理

DAgger(BaseAlgo)继承自 BaseAlgo,拥有统一的 setup、learn、evaluate_policy 等接口,其关键成员变量包括

  • self.actor:待训练的策略网络
  • self.gt_actor:专家(教师)策略网络
  • self.optimizer:优化器
  • self.storage:数据存储(经验回放)
  • self.writer:Tensorboard 日志
  • self.eval_callbacks:评估回调
  • self.episode_env_tensors:环境统计

1.3.1 setup:模仿学习中"学生-教师"双网络架构的实现

该方法主要

  1. 网络初始化:创建学生网络(需要训练的actor)和教师网络(提供专家示范的gt_actor)
  2. 优化器设置:为学生网络配置Adam优化器
  3. 存储初始化:设置用于存储轨迹数据的RolloutStorage,注册各种数据类型的存储键
  4. 统计跟踪:初始化用于跟踪训练过程中各种统计信息的变量和缓冲区
    即初始化 RolloutStorage,注册观测、动作、专家动作等数据字段

具体而言

  1. 方法开始时通过日志记录初始化过程,然后创建两个关键的神经网络组件
    第一个是学习者网络 (`self.actor`),它是需要被训练的策略网络,使用当前配置中的网络架构参数进行初始化
    def setup(self):
        logger.info("Setting up Dagger")  # 记录开始设置DAgger算法的日志信息
        
        # 从配置中提取演员网络的配置字典
        actor_network_dict = dict(actor=self.config.network_dict['actor'])
    第二个是专家网络 (`self.gt_actor`),它代表"ground truth"或教师策略,不仅使用专门的网络配置,还通过 `teacher_actor_network_load_dict` 加载预训练的权重
        # 从配置中提取教师演员网络的配置字典
        teacher_actor_network_dict = dict(actor=self.config.network_dict['teacher_actor'])
    
        # 从配置中提取教师演员网络的加载配置字典
        teacher_actor_network_load_dict = dict(actor=self.config.network_load_dict['teacher_actor'])
    最终,分别创建学生actor、教师gt_actor
       # 创建学生演员网络(PPO演员,固定标准差)
        self.actor = PPOActorFixSigma(self.algo_obs_dim_dict, actor_network_dict, {}, self.num_act)
    
        # 创建教师演员网络(ground truth演员,用于提供专家示范)
        self.gt_actor = PPOActorFixSigma(self.algo_obs_dim_dict, teacher_actor_network_dict, teacher_actor_network_load_dict, self.num_act)
    这种设计使得专家网络能够在训练过程中为学习者提供高质量的示范动作
  2. 网络初始化完成后,代码将两个网络都移动到指定的计算设备上,确保训练过程中的计算效率
        # 将学生演员网络移动到指定设备(CPU或GPU)
        self.actor.to(self.device)
    
        # 将教师演员网络移动到指定设备(CPU或GPU)
        self.gt_actor.to(self.device)
  3. 接下来,为学习者网络创建 Adam 优化器,这是训练过程中唯一需要更新参数的网络,而专家网络的参数保持固定
        # 创建Adam优化器来训练学生演员网络
        self.optimizer = optim.Adam(self.actor.parameters(), lr=self.learning_rate)
        logger.info(f"Setting up Storage")  # 记录开始设置存储的日志信息
  4. 然后,方法初始化了一个专门的存储系统 `RolloutStorage`,用于管理训练数据的收集和批处理
    存储系统的设计特别体现了 DAgger 算法的数据需求

    首先,它为所有观测变量分配存储空间
        # 创建回合存储对象,用于存储轨迹数据
        self.storage = RolloutStorage(self.env.num_envs, self.num_steps_per_env)
    
        ## Register obs keys  # 注册观测键
        # 遍历算法观测维度字典,为每个观测类型注册存储键
        for obs_key, obs_dim in self.algo_obs_dim_dict.items():
            self.storage.register_key(obs_key, shape=(obs_dim,), dtype=torch.float)
    然后注册两套完整的动作相关数据:
    \rightarrow  一套用于学习者的动作、动作概率、动作均值和方差
        # 注册学生演员的动作存储键
        self.storage.register_key('actions', shape=(self.num_act,), dtype=torch.float)
        # 注册学生演员的动作对数概率存储键
        self.storage.register_key('actions_log_prob', shape=(1,), dtype=torch.float)
    
        # 注册学生演员的动作均值存储键
        self.storage.register_key('action_mean', shape=(self.num_act,), dtype=torch.float)
        # 注册学生演员的动作标准差存储键
        self.storage.register_key('action_sigma', shape=(self.num_act,), dtype=torch.float)
    \rightarrow  另一套用于专家的对应数据(以 `gt_` 前缀标识)
        # 注册教师演员的动作存储键(ground truth actions)
        self.storage.register_key('gt_actions', shape=(self.num_act,), dtype=torch.float)
        # 注册教师演员的动作对数概率存储键
        self.storage.register_key('gt_actions_log_prob', shape=(1,), dtype=torch.float)
    
        # 注册教师演员的动作均值存储键
        self.storage.register_key('gt_action_mean', shape=(self.num_act,), dtype=torch.float)
        # 注册教师演员的动作标准差存储键
        self.storage.register_key('gt_action_sigma', shape=(self.num_act,), dtype=torch.float)
    这种并行存储机制使得算法能够在每个时间步同时记录学习者的行为和专家在相同状态下的示范行为,为后续的监督学习提供配对的训练数据
  5. 最后,方法初始化了训练过程中的统计和监控组件
    这包括用于记录回合信息的列表,以及使用双端队列实现的滑动窗口统计,可以跟踪最近100个回合的奖励和长度
        # 初始化回合信息列表,用于存储每个回合的统计信息
        self.ep_infos = []
    
        # 创建奖励缓冲区,最多存储100个回合的奖励(用于计算平均奖励)
        self.rewbuffer = deque(maxlen=100)
    
        # 创建长度缓冲区,最多存储100个回合的长度(用于计算平均长度)
        self.lenbuffer = deque(maxlen=100)
    同时,为每个并行环境分配了当前奖励总和和回合长度的张量,支持多环境并行训练的实时统计
        # 初始化当前奖励累计张量,为每个环境跟踪当前回合的奖励总和
        self.cur_reward_sum = torch.zeros(self.env.num_envs, dtype=torch.float, device=self.device)
    
        # 初始化当前回合长度张量,为每个环境跟踪当前回合的步数
        self.cur_episode_length = torch.zeros(self.env.num_envs, dtype=torch.float, device=self.device)

1.3.2 learn:训练主循环,分为数据采集和训练两个阶段

每次迭代:

  1. _rollout_step:用当前策略与环境交互,收集状态、动作、专家动作等数据
  2. _training_step:用行为克隆损失(BC Loss)训练 actor,使其输出尽量接近专家动作
  3. 日志记录与模型保存

具体而言

  1. 首先,如果设置了 `init_at_random_ep_len`,会将环境的 episode 长度缓冲区随机初始化,以增加训练多样性
    def learn(self):
        # 如果需要在随机的episode长度初始化,则对环境的episode长度缓冲区进行随机赋值
        if self.init_at_random_ep_len:
            self.env.episode_length_buf = torch.randint_like(self.env.episode_length_buf, high=int(self.env.max_episode_length))
    接着,环境被重置,获得初始观测,并将所有观测张量移动到指定设备(如 GPU),确保后续计算高效
        # 重置所有环境,获取初始观测
        obs_dict = self.env.reset_all()
    
        # 将所有观测数据转移到指定设备(如GPU或CPU)
        for obs_key in obs_dict.keys():
            obs_dict[obs_key] = obs_dict[obs_key].to(self.device)
  2. 随后,模型被切换到训练模式
        # 设置模型为训练模式
        self._train_mode()
    主循环根据 `num_learning_iterations` 控制迭代次数,每次迭代都记录起始时间
        # 获取本次学习的迭代次数
        num_learning_iterations = self.num_learning_iterations
    
        # 计算总的迭代次数
        tot_iter = self.current_learning_iteration + num_learning_iterations
        
        # 遍历每一次学习迭代
        # for it in track(range(self.current_learning_iteration, tot_iter), description="Learning Iterations"):
        for it in range(self.current_learning_iteration, tot_iter):
            # 记录本次迭代的起始时间
            self.start_time = time.time()
    每一轮,首先通过 `_rollout_step` 与环境交互,采集一批新数据,并返回最新的观测
            # 进行一次rollout,返回新的观测(必须更新,否则一直用初始观测)
            obs_dict = self._rollout_step(obs_dict)
    然后调用 `_training_step`,对采集到的数据进行行为克隆(BC)训练,得到平均损失
            # 执行一次训练步骤,返回平均BC损失
            mean_bc_loss = self._training_step()
    完成训练后,记录本轮训练所用时间
            # 记录本次迭代的结束时间
            self.stop_time = time.time()
    
            # 计算本次学习所用时间
            self.learn_time = self.stop_time - self.start_time
  3. 每轮结束后,会整理日志信息(如当前迭代数、采集和训练时间、平均损失、奖励等)
    并调用 `_post_epoch_logging` 进行可视化和记录
    如果当前迭代数满足保存间隔条件,还会保存模型检查点
    每轮结束后,episode 相关信息会被清空,为下次迭代做准备
  4. 循环结束后,更新当前的学习迭代计数,并再次保存最终模型

整体来看,该方法实现了 DAgger 算法的标准训练流程,包括数据采集、模型训练、日志记录和模型保存等关键步骤

1.3.3 _rollout_step:actor 采样动作与环境交互,gt_actor计算对应的专家动作

  1. 用 actor 采样动作与环境交互,同时用 gt_actor 计算同一状态下的专家动作
  2. 将观测、动作、专家动作等信息存入 storage
  3. 统计奖励、episode 长度等信息

具体而言

  1. 它首先在 `torch.inference_mode()` 下运行,确保在采样阶段不会计算梯度,提高效率
    def _rollout_step(self, obs_dict):
        # 在推理模式下,不计算梯度,提高效率
        with torch.inference_mode():
    每次循环代表一个环境步,循环次数由 `self.num_steps_per_env` 决定
            # 按每个环境步数进行循环
            for i in range(self.num_steps_per_env):
  2. 在每一步中,方法会通过 `self.actor`(当前策略)和 `self.gt_actor`(专家策略或教师策略)分别计算动作及其均值
                # 用当前actor网络根据观测计算动作,并分离计算图
                actions = self.actor.act(obs_dict).detach()
    
                # 获取actor输出的动作均值,并分离计算图
                action_mean = self.actor.action_mean.detach()
                
                # 用教师actor(专家策略)计算动作,并分离计算图
                gt_actions = self.gt_actor.act(obs_dict).detach()
    
                # 获取教师actor输出的动作均值,并分离计算图
                gt_action_mean = self.gt_actor.action_mean.detach()
    并断言这些张量的形状符合预期

    随后,这些动作及相关统计信息会被打包进 `policy_state_dict`,并与观测数据一起存入 `self.storage`,为后续训练做准备
                # 将当前观测数据存入存储器
                for obs_key in obs_dict.keys():
                    self.storage.update_key(obs_key, obs_dict[obs_key])
    
                # 将策略相关数据存入存储器
                for obs_ in policy_state_dict.keys():
                    self.storage.update_key(obs_, policy_state_dict[obs_])
  3. 接下来,方法使用当前策略的动作与环境交互,获得新的观测、奖励、done 标志和额外信息
                # 从策略字典中取出动作,用于与环境交互
                actions = policy_state_dict["actions"]
    
                # 用动作与环境交互,获得新的观测、奖励、done标志和信息
                obs_dict, rewards, dones, infos = self.env.step(actions)
    所有张量会被转移到指定设备(如 GPU),并将日志信息添加到`self.episode_env_tensors`
    步数计数器递增,并调用 `_process_env_step` 处理环境步(如重置策略状态)
  4. 如果启用了日志目录,还会进行奖励和回合长度的统计与记录。每当环境回合结束(done),就会将累计奖励和长度存入缓冲区,并重置相应计数
  5. 最后,方法记录采样时间,为后续训练阶段做准备,并返回最新的观测字典
        # 返回最新的观测字典
        return obs_dict

1.3.4 _training_step:计算行为克隆损失BC Loss

  1. 从 storage 生成小批量数据
  2. 计算行为克隆损失(BC Loss):均方误差,目标是让 actor 输出接近专家动作
  3. 反向传播并优化 actor 网络

具体而言

  1. 首先,方法初始化 `mean_bc_loss` 用于累计每个 mini-batch 的 BC 损失
    # 定义训练步骤函数
    def _training_step(self):  
        # 初始化平均行为克隆损失为0
        mean_bc_loss = 0.0  
    它通过 `self.storage.mini_batch_generator` 生成多个 mini-batch,每个 batch 包含采集到的观测和动作数据,并支持多轮训练(由 `num_learning_epochs` 控制)
        # 生成小批量数据生成器
        generator = self.storage.mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)  
  2. 在每个 mini-batch 上,方法会将所有数据移动到指定设备(如 GPU),确保后续计算高效
        # 遍历每个小批量数据
        for obs_dict in generator:  
            # 遍历观测字典的每个键
            for obs_key in obs_dict.keys():  
                # 将数据移动到指定设备(如GPU)
                obs_dict[obs_key] = obs_dict[obs_key].to(self.device)  
    然后,从 batch 中取出专家动作 `gt_actions_batch`
            # 获取教师动作批次
            gt_actions_batch = obs_dict['gt_actions']  
    并用当前策略 `self.actor` 对 batch 观测进行前向推理,得到当前策略的动作均值 `mu_batch`
            # 前向传播,计算当前策略的动作均值
            self.actor.act(obs_dict,)  
    
            # 获取当前策略输出的动作均值
            mu_batch = self.actor.action_mean
  3. BC 损失的计算方式是当前策略动作均值与专家动作之间的均方误差(MSE),即 `torch.square(mu_batch - gt_actions_batch).mean()`
            # 计算行为克隆损失(均方误差)
            bc_loss = torch.square(mu_batch - gt_actions_batch).mean()  
    损失乘以配置中的系数后
             # 乘以损失系数,得到最终损失
            loss = self.config.bc_loss_coef * bc_loss 
    进行反向传播和梯度裁剪(防止梯度爆炸),最后更新策略网络参数
            # Gradient step
            # 梯度清零
            self.optimizer.zero_grad()  
    
            # 反向传播计算梯度
            loss.backward()  
            
            # 梯度裁剪,防止梯度爆炸
            nn.utils.clip_grad_norm_(self.actor.parameters(), self.max_grad_norm)  
    
            # 优化器更新参数
            self.optimizer.step()
    每个 batch 的 BC 损失会累加到 `mean_bc_loss`
            # 累加当前批次的bc损失
            mean_bc_loss += bc_loss.item()
  4. 所有 mini-batch 训练完成后,方法会对累计损失取平均,并清空存储器,为下一个训练周期做准备
         # 计算总的更新次数
        num_updates = self.num_learning_epochs * self.num_mini_batches 
    
        # 计算平均bc损失
        mean_bc_loss /= num_updates  
    
        # 清空存储器
        self.storage.clear()
  5. 最终返回平均 BC 损失,便于日志记录和监控训练过程
        # 返回平均bc损失
        return mean_bc_loss  

1.3.5 日志与评估

  1. _post_epoch_logging:详细记录训练过程中的损失、奖励、性能等信息,支持 Tensorboard 和 Rich 控制台美化
  2. evaluate_policy:支持评估模式,结合回调机制灵活扩展评估逻辑

1.4 PPO模块agents/ppo:标准实现与运动操作专用PPO的实现

1.4.1 PPO的标准实现:ppo/ppo.py

这段代码实现了 PPO(Proximal Policy Optimization)强化学习算法的主类。它负责整个算法的生命周期管理,包括初始化、模型和优化器的构建、数据存储、训练循环、模型保存与加载、评估流程等

1.4.1.1 __init__
  • 在 `__init__` 构造函数中,PPO 类会初始化环境、配置、日志记录器(用于 TensorBoard)、以及一些用于统计和追踪训练过程的变量。它还会初始化奖励和回合长度的缓冲区,用于后续的统计分析
  • `_init_config` 方法会从配置对象中提取所有关键超参数,包括环境数量、观测和动作维度、学习率、损失系数、折扣因子等,确保算法的灵活性和可配置性
1.4.1.2 setup方法:调用 `_setup_models_and_optimizer` 和 `_setup_storage`

`setup` 方法会调用 `_setup_models_and_optimizer` 和 `_setup_storage`,分别初始化 actor/critic 网络及其优化器,以及 rollout 数据存储结构

  • 模型采用 PPOActor 和 PPOCritic 两个神经网络,优化器使用 Adam
  • 数据存储结构会注册所有需要追踪的张量,包括观测、动作、奖励、优势等
1.4.1.3 训练主循环在 `learn` 方法

简言之,训练主循环在 `learn` 方法中实现。每次迭代会先进行 rollout(与环境交互收集数据),然后进行训练步骤(mini-batch SGD),并记录训练过程中的各种统计信息。训练过程中会定期保存模型和优化器状态,便于断点恢复

具体而言

  1. 首先,如果配置了 `init_at_random_ep_len`,会将环境的 episode 长度缓冲区随机初始化,这有助于增加训练的多样性
    def learn(self):
        # 如果需要在随机的episode长度初始化,则对环境的episode长度缓冲区进行随机赋值
        if self.init_at_random_ep_len:
            self.env.episode_length_buf = torch.randint_like(self.env.episode_length_buf, high=int(self.env.max_episode_length))
  2. 接着,环境会被重置,获得初始观测
        # 重置所有环境,获取初始观测
        obs_dict = self.env.reset_all()
    并将所有观测张量移动到指定的设备(如 GPU 或 CPU),以确保后续计算的高效性
        # 将每个观测都转移到指定的设备上(如GPU或CPU)
        for obs_key in obs_dict.keys():
            obs_dict[obs_key] = obs_dict[obs_key].to(self.device)
  3. 随后,模型被切换到训练模式(`self._train_mode()`),以启用如 dropout、BN 等训练相关的行为
        # 设置模型为训练模式
        self._train_mode()
    主循环根据当前迭代次数和总训练迭代数进行
        # 获取本次学习的迭代次数
        num_learning_iterations = self.num_learning_iterations
    
        # 计算总的迭代次数(当前迭代数 + 本次要迭代的次数)
        tot_iter = self.current_learning_iteration + num_learning_iterations
    每次迭代包括以下步骤:
    1. 记录当前时间,便于后续统计训练耗时
        # 不使用track进度条,因为会和motion loading bar冲突
        # for it in track(range(self.current_learning_iteration, tot_iter), description="Learning Iterations"):
        for it in range(self.current_learning_iteration, tot_iter):
            # 记录本次迭代的起始时间
            self.start_time = time.time()
    2. 调用 `_rollout_step` 与环境交互,收集一批数据(观测、动作、奖励等),并返回最新的观测
            # 进行一次rollout,返回新的观测(必须更新,否则一直用初始观测)
            obs_dict = self._rollout_step(obs_dict)
    3. 调用 `_training_step`,对采集到的数据进行策略和价值网络的训练
            # 进行一次训练步骤,返回损失字典
            loss_dict = self._training_step()
    4. 记录本次迭代的耗时,并将相关统计信息(如损失、采集时间、奖励等)打包到 `log_dict`,用于日志记录和可视化
            # 记录本次迭代的结束时间
            self.stop_time = time.time()
    
            # 计算本次学习所用时间
            self.learn_time = self.stop_time - self.start_time
    
            # 日志信息字典,包含当前迭代、损失、采集和学习时间、奖励等
            log_dict = {
                'it': it,
                'loss_dict': loss_dict,
                'collection_time': self.collection_time,
                'learn_time': self.learn_time,
                'ep_infos': self.ep_infos,
                'rewbuffer': self.rewbuffer,
                'lenbuffer': self.lenbuffer,
                'num_learning_iterations': num_learning_iterations
            }
    
            # 执行日志记录
            self._post_epoch_logging(log_dict)
    5. 每隔 `save_interval` 次迭代保存一次模型和优化器状态,便于断点恢复
            # 每隔save_interval步保存一次模型
            if it % self.save_interval == 0:
                self.save(os.path.join(self.log_dir, 'model_{}.pt'.format(it)))
    6. 清空本轮的 episode 信息缓存,为下轮训练做准备
            # 清空本轮收集的episode信息
            self.ep_infos.clear()
  4. 循环结束后,更新当前的训练迭代计数,并再次保存一次模型,确保所有训练进度都被持久化
        # 累加当前学习迭代次数
        self.current_learning_iteration += num_learning_iterations
    
        # 保存最终模型
        self.save(os.path.join(self.log_dir, 'model_{}.pt'.format(self.current_learning_iteration)))
1.4.1.4 rollout (采样和数据收集)的实现:_rollout_step与_actor_rollout_step

rollout 过程由 `_rollout_step` 实现——_rollout_step又会调用_actor_rollout_step,负责与环境交互、收集数据、处理奖励和终止信息,并将数据写入存储

具体而言,

  1. 它首先在 `torch.inference_mode()` 上下文中运行,这样可以避免梯度计算,提高推理效率。方法内部通过循环 `self.num_steps_per_env` 次,每次循环代表智能体在环境中的一步
    def _rollout_step(self, obs_dict):
        # 在推理模式下,不计算梯度,提升效率
        with torch.inference_mode():
            # 遍历每个环境步数
            for i in range(self.num_steps_per_env):
                # 初始化策略状态字典
                policy_state_dict = {}
  2. 在每一步中
    首先通过 `_actor_rollout_step` 计算当前观测下的动作及相关策略信息
                # 通过actor获取动作及相关策略信息,存入policy_state_dict
                policy_state_dict = self._actor_rollout_step(obs_dict, policy_state_dict)
    并通过 `_critic_eval_step` 评估当前状态的价值
                # 通过critic评估当前状态的价值
                values = self._critic_eval_step(obs_dict).detach()
                # 将价值信息加入策略状态字典
                policy_state_dict["values"] = values
    所有与观测和策略相关的数据都会被存储到 `self.storage`,以便后续训练使用
                # 将观测信息存入rollout存储
                for obs_key in obs_dict.keys():
                    self.storage.update_key(obs_key, obs_dict[obs_key])
    
                # 将策略相关信息存入rollout存储
                for obs_ in policy_state_dict.keys():
                    self.storage.update_key(obs_, policy_state_dict[obs_])
    随后,智能体根据动作与环境交互,获得新的观测、奖励、终止信号和额外信息
                # 获取当前动作
                actions = policy_state_dict["actions"]
    
                # 构造actor_state字典
                actor_state = {}
                actor_state["actions"] = actions
    
                # 与环境交互,获得新的观测、奖励、done标志和额外信息
                obs_dict, rewards, dones, infos = self.env.step(actor_state)
    并将这些数据转移到设备上(如 GPU)
                # 将新的观测转移到指定设备
                for obs_key in obs_dict.keys():
                    obs_dict[obs_key] = obs_dict[obs_key].to(self.device)
    
                # 奖励和done也转移到设备
                rewards, dones = rewards.to(self.device), dones.to(self.device)
  3. 奖励会根据是否有超时(`time_outs`)进行修正,确保奖励的准确性
    每一步的数据(奖励、终止信号等)都会被存储,并通过 `self._process_env_step` 处理智能体和价值网络的重置
    若启用了日志记录,还会统计每个 episode 的奖励和长度,并在 episode 结束时重置相关统计量
  4. 循环结束后,方法会记录采集数据所用的时间,并调用 `_compute_returns` 计算每一步的回报(returns)和优势(advantages)
            # 为训练准备数据,计算returns和advantages
            returns, advantages = self._compute_returns(
                last_obs_dict=obs_dict,
                policy_state_dict=dict(
                    values=self.storage.query_key('values'), 
                    dones=self.storage.query_key('dones'), 
                    rewards=self.storage.query_key('rewards'))
            )
    这些数据随后批量存储到 `self.storage`,为后续的策略优化做准备
  5. 最终,方法返回最新的观测字典
        # 返回最新的观测字典
        return obs_dict
1.4.1.5  GAE(广义优势估计):在 `_compute_returns` 方法中实现

优势和回报的计算采用 GAE(广义优势估计),在 `_compute_returns` 方法中实现,能够有效降低策略梯度的方差

  1. 首先,方法会用当前环境的最后一个观测(`last_obs_dict`)通过价值网络(critic)计算最后状态的价值 `last_values`
    def _compute_returns(self, last_obs_dict, policy_state_dict):
        """
        计算每一步的回报(returns)和优势(advantages),用于PPO训练
        使用广义优势估计(GAE)来降低策略梯度的方差。
        """
        # 用critic网络评估最后一个观测的状态价值,并去除梯度
        last_values = self.critic.evaluate(last_obs_dict["critic_obs"]).detach()
        # 初始化优势为0
        advantage = 0
    
        # 从策略状态字典中获取values、dones和rewards
        values = policy_state_dict['values']
        dones = policy_state_dict['dones']
        rewards = policy_state_dict['rewards']
    并将所有相关张量(values、dones、rewards、last_values)移动到指定设备(如 GPU)
        # 将所有张量转移到指定设备(如GPU)
        last_values = last_values.to(self.device)
        values = values.to(self.device)
        dones = dones.to(self.device)
        rewards = rewards.to(self.device)
  2. 随后,初始化一个与 values 形状相同的 returns 张量,用于存储每一步的回报
        # 初始化returns张量,形状与values相同
        returns = torch.zeros_like(values)
  3. 为方便大家更好的理解,我直接引用此文《ChatGPT技术原理解析:从RL之PPO算法、RLHF到GPT4、instructGPT》的中的「3.3 针对「对话序列/奖励值序列/values/DSC优势函数/returns」的一个完整示例」内容 给大家解释说明下
    首先,GAE的计算公式为 

    且考虑到有

    故,实际计算的时候,为减少计算量,可以先计算A_7,再分别计算A_6、A_5、A_4、A_3、A_2、A_1

    其次,再回顾一下TD误差的含义:δ_1 = r1 + γV_old(2) - V_old(1)
    比如对于上式:实际获得的即时奖励 r1 加上折扣后的未来奖励预测 γV_old(2),再减去我们原先预测的当前时间步的奖励 V_old(1),这就是后者的预测与前者实际经验之间的差距


    SO,核心计算在一个反向循环中完成(从最后一步到第一步)。对于每一步,方法会判断下一个状态是否为终止状态(`next_is_not_terminal`),并据此计算 TD 残差(delta)

        # 获取步数
        num_steps = returns.shape[0]
    
        # 反向遍历每一步,计算GAE优势和回报
        for step in reversed(range(num_steps)):
            # 最后一步的next_values用last_values,其余用下一步的values
            if step == num_steps - 1:
                next_values = last_values
            else:
                next_values = values[step + 1]
    
            # 判断下一步是否为终止状态
            next_is_not_terminal = 1.0 - dones[step].float()
    
            # 计算TD残差
            delta = rewards[step] + next_is_not_terminal * self.gamma * next_values - values[step]
    优势(advantage)通过递归方式累加,结合了当前步的 TD 残差和未来步的优势,体现了 GAE 的思想
            # 递归计算GAE优势
            advantage = delta + next_is_not_terminal * self.gamma * self.lam * advantage
    每一步的回报则等于当前优势加上当前状态的价值,即,故有
            # 当前步的回报等于优势加上当前价值
            returns[step] = advantage + values[step]
  4. 最后,方法计算优势(returns - values)
        # 计算优势(returns - values),并进行标准化
        advantages = returns - values
    并进行标准化处理(减去均值,除以标准差),以便后续训练时数值更稳定
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        # 返回回报和标准化后的优势
  5. 最终返回每一步的回报和标准化后的优势。这个过程确保了 PPO 算法在更新策略时能够利用高质量、低方差的优势估计
        return returns, advantages
1.4.1.6 _training_step

训练步骤 `_training_step` 会从存储中生成 mini-batch,依次进行策略和价值网络的更新

1.4.1.7 _update_ppo

PPO 的核心损失计算在 `_update_ppo` 方法中实现,包括代理损失、价值损失和熵损失,并支持自适应 KL 散度调整学习率

1.4.1.8 其他

此外,类还实现了模型的保存与加载、评估流程(包括回调机制)、以及详细的日志记录和可视化

1.4.2 运动操作专用PPO(不训练 仅评估):agents/ppo_locomanip.py

这段代码定义了一个名为 `PPOLocoManip` 的类,是基于PPO(Proximal Policy Optimization)算法的智能体,专门用于评估阶段——即该类不包含训练逻辑,只用于评估,支持多环境并行评估,主要用于“站立”和“行走操控”两种策略的切换与评估

  1. 类初始化(`__init__`)
    参数:
    `env`: 环境对象,通常是一个多智能体或多任务环境
    `config`: 配置对象,包含网络结构、超参数等
    `log_dir`: 日志目录
    `device`: 运行设备(如'cpu'或'cuda')

    主要成员变量:
    `self.actor_obs`: 两种策略的观测键名(站立/行走)
    `self.actor`: 存储两个策略网络(站立和行走)
    `self.eval_callbacks`: 评估回调列表
    `self.episode_env_tensors`: 用于统计评估指标
    还有一些用于记录奖励、步数等的缓存
  2. 配置初始化(`_init_config`)
    从环境配置中读取环境数、观测维度、动作维度等
  3. 网络与优化器初始化(`_setup_models_and_optimizer`)
    分别初始化站立和行走的PPO Actor网络,并放到指定设备上
    `self.actor` 是一个列表,包含两个策略网络
  4. 评估模式切换(`_eval_mode`)
    将两个策略网络都切换到`eval()`模式,关闭dropout等训练特性
  5. 加载模型权重(`load`)
    分别加载站立和行走策略的模型权重
    返回权重文件中的`infos`信息
  6. 环境步进与策略动作(`env_step`, `_actor_act_step`)
    `env_step`: 将下半身动作和参考上半身动作拼接后送入环境,获得新观测、奖励等_actor_act_step`: 根据当前控制模式,选择对应策略网络进行动作推理
  7. 评估流程相关
    evaluate_loco_manip_policy: 评估主循环,反复与环境交互,调用回调,直到外部打断_create_eval_callbacks: 根据配置实例化评估回调
    _pre_evaluate_policy/_post_evaluate_policy: 评估前后调用回调
    _pre_eval_env_step/_post_eval_env_step: 每步环境交互前后调用回调_get_inference_policy: 获取推理用的策略函数(act_inference)
  8. 其他辅助方法
    get_example_obs: 打印并返回一次环境重置后的观测样本
    inference_model属性:返回策略网络列表

第二部分 环境模块humanoidverse/envs

BaseTask (envs/base_task/base_task.py):
├── BaseSimulator → 物理模拟器接口
├── terrain → 地形生成
├── robot配置 → 机器人参数
├── obs配置 → 观察空间定义
└── rewards配置 → 奖励函数设计

具体任务继承关系:
BaseTask
├── LocomotionTask (locomotion/)
├── MotionTrackingTask (motion_tracking/) 
└── LeggedBaseTask (legged_base_task/)

基础任务架构:

  • BaseTask: 所有RL任务的基类,处理模拟器初始化、环境设置、观察空间定义
  • base_task/: 包含基础任务实现和配置

专门任务类型

  • locomotion/: 运动控制任务,实现机器人行走、跑步等基础运动
  • motion_tracking/: 运动追踪任务,实现对人类动作的模仿和跟踪
  • legged_base_task/: 腿式机器人基础任务
  • env_utils/: 环境工具函数,包含地形生成、物理参数设置等

2.1 base_task

2.2 legged_base_task

2.3 locomotion:运动控制任务——实现机器人行走、跑步等基础运动

2.4 motion_tracking:运动追踪任务,RL中实现对人类动作的模仿和跟踪

这个 `LeggedRobotMotionTracking` 类是一个复杂的机器人运动跟踪系统,主要用于强化学习环境中的人形机器人运动仿真和跟踪

这个类继承自 `LeggedRobotBase`,专门处理机器人的运动跟踪任务,包括:运动数据管理、VR遥操作支持、奖励计算、状态重置和观察

2.4.1 核心初始化流程:__init__

2.4.2 关键组件解析:运动库初始化 (`_init_motion_lib`)

这个 `_init_motion_lib` 方法负责初始化机器人的运动库系统,这是整个运动跟踪环境的核心组件。该方法的主要职责是设置运动数据管理、加载参考运动序列,并为多个并行环境准备运动时间采样

  1. 首先,方法设置时间步长配置,将环境的仿真时间步 `self.dt` 赋值给运动配置的 `step_dt` 参数。这确保了运动库与仿真环境使用相同的时间分辨率,保证时间同步的准确性
    def _init_motion_lib(self):
        # 设置机器人运动配置的时间步长为环境的时间步长
        self.config.robot.motion.step_dt = self.dt  
    接着创建 `MotionLibRobot` 实例,这个类继承自 `MotionLibBase`,专门处理人形机器人的运动数据
        # 创建机器人运动库实例,传入运动配置、环境数量和设备
        self._motion_lib = MotionLibRobot(self.config.robot.motion, num_envs=self.num_envs, device=self.device)  
    构造函数传入运动配置、环境数量和计算设备,为并行处理多个仿真环境做准备
  2. 运动加载策略根据当前模式动态调整
    在评估模式下(`self.is_evaluating` 为 True),系统调用 `load_motions(random_sample=False)`,这意味着运动序列将按照确定性顺序加载,确保评估结果的可重现性和一致性
        # 如果当前处于评估模式
        if self.is_evaluating:  
            # 加载运动数据,不进行随机采样(按顺序加载)
            self._motion_lib.load_motions(random_sample=False)  
    在训练模式下,系统使用 `load_motions(random_sample=True)`,启用随机采样策略来增加训练数据的多样性,防止模型过拟合特定的运动序列
        # 如果当前处于训练模式
        else:  
            # 加载运动数据,进行随机采样
            self._motion_lib.load_motions(random_sample=True)  
    `load_motions` 方法的内部实现展现了复杂的数据处理流程
    该方法根据采样策略选择运动序列:
    随机模式下使用多项式采样 `torch.multinomial` 基于预设的采样概率分布选择运动
    确定性模式下使用取余操作 `torch.remainder` 循环遍历所有可用运动

    加载过程中,系统解析运动文件数据,提取关键信息如帧率、帧数、运动长度等,并将所有运动数据连接成大型张量存储在GPU上,包括全局位置、旋转、速度、角速度等多维运动状态
  3. 方法的最后部分处理运动时间的初始化
    调用 `_resample_motion_times(torch.arange(self.num_envs))` 为所有环境重新采样运动开始时间,这个函数会为每个环境设置运动长度和随机的开始时间点(训练模式下)或零开始时间(评估模式下)
        # 重新采样所有环境的运动时间
        res = self._resample_motion_times(torch.arange(self.num_envs))  
  4. 最后,方法保存重要的运动参数:
    `motion_dt` 存储运动的时间间隔
    `motion_start_idx` 初始化为0用于跟踪当前运动序列的起始索引
    `num_motions` 记录可用的独特运动数量
        # 设置运动数据的时间步长
        self.motion_dt = self._motion_lib._motion_dt  
    
        # 初始化运动开始索引为0
        self.motion_start_idx = 0  
    
        # 获取唯一运动的总数量
        self.num_motions = self._motion_lib._num_unique_motions

这种设计允许系统高效地管理大量运动数据,支持并行仿真环境,并提供灵活的训练与评估模式切换。通过批量预加载和GPU存储,系统能够快速访问运动状态,满足实时仿真的性能要求。

2.4.3. 扩展刚体系统 (`_init_motion_extend`)

这个 `_init_motion_extend` 方法负责初始化机器人运动追踪系统中的扩展身体部件。该方法主要用于处理需要额外追踪的虚拟关节或标记点,这些点通常附着在机器人的现有身体部件上

  1. 首先,方法检查配置文件中是否存在 `extend_config` 配置项。如果存在,它会创建三个临时列表来收集扩展配置数据:父身体部件的索引、相对位置和旋转信息
  2. 在配置解析阶段,代码遍历每个扩展配置项,提取关键信息:
    - 通过 `self.simulator._body_list.index()` 查找父身体部件在模拟器中的索引
    - 收集扩展点相对于父部件的位置和旋转数据
    - 将新的关节名称添加到模拟器的身体列表中
  3. 接下来是张量创建和数据转换阶段。代码将收集的数据转换为 PyTorch 张量,并进行必要的格式调整:
    - `extend_body_parent_ids` 存储父身体索引
    - 位置和旋转数据通过 `.repeat(self.num_envs, 1, 1)` 扩展到所有环境
    - 特别注意旋转数据的四元数格式转换:从 `wxyz` 格式转换为 `xyzw` 格式
  4. 最后,方法初始化用于运动追踪的关键张量。这些张量的尺寸都考虑了原始身体数量加上扩展身体数量:
    - `marker_coords` 用于存储标记点坐标
    - `ref_body_pos_extend` 存储参考身体位置
    - `dif_global_body_pos` 用于计算全局位置差异

2.4.4 观察计算_pre_compute_observations_callback:计算参考运动与当前状态的差异

这个 `_pre_compute_observations_callback` 方法是运动追踪系统的核心观测预处理函数,负责计算机器人当前状态与参考运动之间的差异,并准备用于强化学习的观测数据

  • 参考运动状态获取
    方法首先调用父类的预处理函数,然后计算当前时间步的运动时间戳。通过 `self._motion_lib.get_motion_state()` 从运动库中获取参考运动的状态信息,包括身体位置、速度、旋转、角速度以及关节位置和速度。这些参考数据代表了机器人应该达到的目标状态
  • 扩展刚体状态计算
    接下来的四个部分处理扩展刚体的物理状态计算,这是该方法的技术核心:
    1. 位置计算:使用四元数旋转将扩展身体的相对位置转换到全局坐标系。首先将相对位置在父身体坐标系中旋转,然后应用扩展身体自身的旋转,最后加上父身体的全局位置
    2. 旋转计算:通过四元数乘法组合父身体旋转和扩展身体的相对旋转,得到扩展身体的全局旋转
    3. 角速度计算:直接继承父身体的角速度,这是一个简化假设
    4. 线速度计算:这里使用了刚体运动学的核心公式:`v = v_parent + ω × r`,其中角速度与相对位置的叉积计算出由旋转引起的速度贡献
  • 差异计算
    方法计算参考状态与当前模拟状态之间的差异,这些差异是强化学习奖励函数的基础:
    - 位置差异:直接相减
        ## diff compute - kinematic position  
        # 差异计算 - 运动学位置
        # 计算全局刚体位置差异
        self.dif_global_body_pos = ref_body_pos_extend - self._rigid_body_pos_extend  
    - 旋转差异:使用四元数乘法和共轭计算旋转误差
        ## diff compute - kinematic rotation  
        # 差异计算 - 运动学旋转
        # 计算全局刚体旋转差异
        self.dif_global_body_rot = quat_mul(ref_body_rot_extend, quat_conjugate(self._rigid_body_rot_extend, w_last=True), w_last=True)  
    - 速度和角速度差异:直接相减
        ## diff compute - kinematic velocity  
        # 差异计算 - 运动学速度
        # 计算全局刚体速度差异
        self.dif_global_body_vel = ref_body_vel_extend - self._rigid_body_vel_extend  
        
        # 计算全局刚体角速度差异
        self.dif_global_body_ang_vel = ref_body_ang_vel_extend - self._rigid_body_ang_vel_extend
    - 关节角度和速度差异:参考值与当前DOF状态的差异
        ## diff compute - kinematic joint position  
        # 差异计算 - 运动学关节位置
        # 计算关节角度差异
        self.dif_joint_angles = ref_joint_pos - self.simulator.dof_pos  
    
        ## diff compute - kinematic joint velocity  
        # 差异计算 - 运动学关节速度
        # 计算关节速度差异
        self.dif_joint_velocities = ref_joint_vel - self.simulator.dof_vel
  • 坐标系转换和观测准备
    代码执行关键的坐标系转换,将全局坐标系中的差异转换为机器人本地坐标系。这通过计算机器人朝向的逆四元数并应用旋转实现。本地坐标系的观测数据对强化学习更有意义,因为它们与机器人的朝向无关
  • VR三点追踪处理
    方法处理VR控制模式下的三点追踪(通常是两只手和头部)。根据是否启用远程操作控制,选择使用参考运动数据或实时VR标记坐标
  • 运动相位计算——Deepmimic阶段计算
    最后计算运动相位信息,这对于周期性运动(如步行)的追踪很重要。相位通过当前时间与运动总长度的比值计算,并进行边界检查确保数值在有效范围内
        # 获取参考运动长度
        self._ref_motion_length = self._motion_lib.get_motion_length(self.motion_ids)  
    
        # 计算参考运动阶段(0-1之间的比例)
        self._ref_motion_phase = motion_times / self._ref_motion_length  
    
        # 检查运动阶段是否在有效范围内(硬编码1.05,因为+1会超过1)
        if not (torch.all(self._ref_motion_phase >= 0) and torch.all(self._ref_motion_phase <= 1.05)):  
            # 获取最大阶段值
            max_phase = self._ref_motion_phase.max()  
    
        # 在第二维增加一个维度
        self._ref_motion_phase = self._ref_motion_phase.unsqueeze(1)
    
        # 记录运动跟踪信息
        self._log_motion_tracking_info()  

而上面代码中,倒数第一行的_log_motion_tracking_info 用于计算和记录机器人运动追踪过程中的差异指标。该方法属于`LeggedRobotMotionTracking`类,主要目的是监控机器人模拟动作与参考动作之间的偏差程度

  1. 首先,代码从类的实例变量中提取了四种不同的差异数据:上半身位置差异、下半身位置差异、VR三点位置差异以及关节角度差异
    def _log_motion_tracking_info(self):
        # 计算上半身各关节点在全局坐标系下的位置差异
        upper_body_diff = self.dif_global_body_pos[:, self.upper_body_id, :]
    
        # 计算下半身各关节点在全局坐标系下的位置差异
        lower_body_diff = self.dif_global_body_pos[:, self.lower_body_id, :]
    
        # 计算VR三点跟踪点(通常是头部和双手)在全局坐标系下的位置差异
        vr_3point_diff = self.dif_global_body_pos[:, self.motion_tracking_id, :]
    
        # 获取关节角度差异
        joint_pos_diff = self.dif_joint_angles
    这些差异值存储在张量(tensor)中,格式为多维数组,表示不同环境、不同身体部位和不同坐标轴的差异值
  2. 接着,代码计算了每种差异的范数(norm)
    具体来说,它对每个差异张量的最后一个维度(代表xyz坐标)计算欧几里得范数(L2范数),然后取所有值的平均值。这个操作将多维差异数据压缩成单个标量值,表示整体偏差的大小
  3. 最后,这些计算得到的标量差异值被存储到类的`log_dict`字典中,使用描述性的键名,如"upper_body_diff_norm"(上半身差异范数)等。这样做的目的是为了后续可视化、分析或用于优化算法的反馈

总体而言,此方法是动作追踪系统中的一个重要组成部分,它通过量化模拟动作与目标动作之间的差异,为评估系统性能和调试提供了量化指标。这些指标可能会用于计算奖励函数、确定训练进度或者在可视化界面中展示动作匹配程度

2.4.5. 课程学习支持:支持动态调整终止条件

2.4 6. VR遥操作支持:通过ROS2接收VR设备数据

2.4.7. 运动数据保存_post_physics_step:支持保存训练过程中的运动数据

这个函数用于在物理模拟步骤之后收集和保存机器人的运动数据。它的主要功能是在每次物理模拟后跟踪记录机器人的各种状态,并在收集了足够多的数据后将它们保存到文件中

主要流程如下

  1. 首先调用父类方法:`super()._post_physics_step()`,继承父类的后处理行为
  2. 条件检查:只有当`self.save_motion`为真时才进行数据收集
  3. 时间更新:计算当前的运动时间点,根据已经过的帧数(`episode_length_buf`)、时间步长(`dt`)和起始时间
  4. 数据保存条件判断:
    - 当收集的数据量超过配置的阈值(`self.config.save_total_steps`)时,进行数据处理和保存
    - 处理包括将列表数据转换为张量、转置和转换为NumPy数组
    - 跳过前3帧数据(`v[3:]`),可能是为了稳定性考虑
  5. 数据组织
       - 为每个环境创建单独的`motion{i}`键
       - 将所有收集的数据按环境分组
       - 添加帧率(`fps`)信息
  6. 文件保存
    - 使用`joblib.dump`将数据保存到指定路径 
    - 打印彩色的成功消息
    - 保存后立即退出程序(`sys.exit()`)
  7. 数据收集
       - 如果未达到保存条件,则继续收集机器人的各种状态数据:
         - 根部位置(`root_trans`)和旋转(`root_rot`)
         - 根据不同模拟器(isaacgym/isaacsim/genesis)调整旋转四元数格式
         - 转换旋转为轴角表示(`root_rot_vec`)
         - 关节角度(`dof`)
         - 构建姿势轴角表示(`pose_aa`)
         - 收集动作、观察、终止状态、速度等信息
  8. 数据追加:将所有收集的数据添加到相应的保存列表中
  9. 开始保存标记:设置`self.start_save = True`,表示已开始数据收集过程

这个函数在机器人学习或仿真系统中实现了详细的运动数据收集,支持多种模拟器,并在达到预定数量的步骤后保存并退出,有可能是为了收集训练数据或分析机器人的运动表现

// 待更

2.4.8 多个奖励函数,用于评估机器人跟踪参考运动的表现

  • _reward_teleop_body_position_extend(): 身体位置跟踪奖励
  • _reward_teleop_vr_3point(): VR三点跟踪奖励  
  • _reward_teleop_joint_position(): 关节位置跟踪奖励

// 待更

2.5 humanoidverse/envs/env_utils

2.6 新增humanoidverse/envs/delta_a

// 待更

 第四部分 配置系统humanoidverse/config

采用Hydra配置管理,结构化组织

  1. base.yaml/base_eval.yaml: 基础配置
  2. algo/: 算法特定配置
  3. env/: 环境配置
  4. robot/: 机器人配置 (如G1机器人的29自由度配置)
  5. rewards/: 奖励函数配置
  6. obs/: 观察空间配置
  7. terrain/: 地形配置

根据 Hydra 的加载规则,入口脚本里写的是:

  1. @hydra.main(config_path="config", config_name="base", version_base="1.1")
    根据 Hydra 的加载逻辑:@hydra.main(config_path="config", config_name="base", ...) → 默认加载 config/base.yaml
    但 base.yaml 并没有在 defaults 中指定 algo
  2. 所以必须通过 +exp=train_delta_a_open_loop 这个 override,间接指定 algo

所以,ASAP给出的 train 示例 是:

python humanoidverse/train_agent.py \                                                                                   
  +simulator=isaacgym \
  +exp=train_delta_a_open_loop \
  +domain_rand=NO_domain_rand \
  +rewards=motion_tracking/delta_a/reward_delta_a_openloop \
  +robot=g1/g1_29dof_anneal_23dof \
  +terrain=terrain_locomotion_plane \
  +obs=delta_a/open_loop \
  num_envs=5000 \
  project_name=DeltaA_Training \
  experiment_name=openloopDeltaA_training \
  robot.motion.motion_file="<PATH_TO_YOUR_MOTION_FILE_WITH_ACTION_KEYNAME>" \
  env.config.max_episode_length_s=1.0 \
  rewards.reward_scales.penalty_minimal_action_norm=-0.1 \
  +device=cuda:0 \
  env.config.resample_motion_when_training=True \
  env.config.resample_time_interval_s=10000

第三部分 模拟器模块humanoidverse/simulator

支持多种物理引擎:

  1. IsaacGym: NVIDIA的GPU加速物理模拟器
  2. IsaacSim: 基于Omniverse的仿真平台
  3. Genesis: 新兴的高性能物理模拟器
  4. base_simulator/: 模拟器基类,提供统一接口

第五部分 工具模块humanoidverse/utils

通用工具

  1. common.py: 通用函数
  2. math.py: 数学工具函数
  3. torch_utils.py: PyTorch相关工具
  4. config_utils.py: 配置处理工具
  5. motion_lib/: 动作库,存储和处理人类动作数据

第六部分 数据模块humanoidverse/data

  1. motions/: 存储人类动作数据
  2. robots/: 机器人模型文件和配置

第七部分 训练部署humanoidverse/eval_agent.py

如七月姚老师所说,eval_agent.py 是一个非常工程非常重要的脚本,原因在于后续的 Sim2Sim 或 Sim2Real 部署,这个入口脚本正好提供了统一导出的 ONNX/JIT

这时候有两个重要的问题先解释一下:

  1. 为什么需要“统一导出”来打通 Sim2Sim / Sim2Real?
    训练时的策略模型藏在一堆工程依赖里(Hydra 配置、RSL-RL、环境类、观测构造器、归一化器等)

    导出 ONNX/JIT 后,推理侧只需要满足输入张量字典和输出动作张量这件事就够了:
    输入(obs_dict):以键值对的形式传观测(如 observation.state、observation.images.cam_left_high…),每个键对应固定形状/类型(float32, NCHW 等)
    输出(action):策略产生的动作向量(比如关节期望、末端力/位姿参数等,以训练时定义为准)

    只要新环境(IsaacSim 或真实机器人)能按同样规则准备 obs_dict,再把动作解读成控制命令(位置/速度/力矩/末端力控制),策略就能工作
    这就是“统一导出”的意义:训练侧的依赖被“收敛”为一个跨平台的推理文件 + 明确的 I/O 约定
  2. 导出了什么:ONNX vs JIT 有啥差别?
    脚本里两条导出路径:
    1. ONNX(默认开启):
    example_obs = algo.get_example_obs()
    export_policy_as_onnx(algo.inference_model, exported_dir, exported_name, example_obs)
    优点:跨框(TensorRT、ONNX Runtime、OpenVINO 等),利于嵌入非 Python 环境、GPU 加速部署
    注意:导出时会按 example_obs 的键名和形状固化输入签名;历史堆叠维度/图像尺寸/状态维度都被“写死”为允许的动态/静态轴(取决于导出实现)

    2. TorchScript (JIT)(可选开启):  
    优点:保留 PyTorch 语义(在 Python/C++ LibTorch 里推理很顺),对字典输入/控制流更包容
    缺点:跨框能力不如 ONNX,TensorRT 等需要额外路径

// 待更​

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐