用Python实战拆解马尔科夫决策过程:从OpenAI Gym到贝尔曼方程

当第一次翻开强化学习教材时,那些充满数学符号的MDP五元组定义总让人望而生畏。S、A、P、R、γ这些字母背后,到底隐藏着怎样的智能决策奥秘?与其在概率公式中迷失,不如打开Python笔记本,让CartPole的平衡杆为我们演绎最生动的马尔科夫决策课。

1. 从游戏厅到代码实验室:MDP的具象化理解

2004年,剑桥大学的研究人员在《Nature》发表了一项有趣发现:人类学习新技能时,大脑中基底神经节的激活模式与马尔科夫决策模型的预测高度吻合。这解释了为什么我们教孩子骑自行车时,不会讲解微分方程,而是让他们直接感受平衡——实践中的反馈循环,正是MDP最自然的表达形式。

OpenAI Gym中的经典控制问题CartPole,完美复现了这种学习场景。想象一根倒立摆杆通过活动关节连接在小车上,你的任务是左右移动小车保持杆子竖直。这个看似简单的游戏,包含了MDP所有核心要素:

import gym
env = gym.make('CartPole-v1')
observation = env.reset()

# 典型观察值示例
print(observation)  # 输出类似:[cart位置, cart速度, 杆角度, 杆角速度]

在这个4维状态空间中,每个数字都对应着物理世界的某个可观测特征。当杆子向右倾斜时(状态s),我们推车向右(动作a),环境会返回新状态s'和即时奖励r。Gym环境已经帮我们封装好了状态转移概率P——虽然看不见具体的概率矩阵,但每次env.step(action)的调用,都是在按这个隐藏规则演化。

MDP五元组在CartPole中的实际对应

理论概念 代码实现 可视化线索
状态(S) observation数组 屏幕像素或数值显示
动作(A) env.action_space.sample() 小车左右移动
转移(P) env.step()的内部逻辑 杆子运动轨迹的连续性
奖励(R) 每步+1,直到回合结束 界面顶端的分数累计
折扣(γ) 自定义(通常0.95-0.99) 长期策略的稳定性体现

提示:在Jupyter中运行 env.action_space env.observation_space 可以查看动作和状态的合法范围,这是构建智能体的重要约束条件。

2. 解剖Gym环境:状态转移的微观视角

让我们深入Gym环境的源码层面,看看MDP如何被翻译成代码逻辑。以CartPole为例,其动力学模型实际上由以下几个物理方程控制:

杆子中心x坐标 = cart_position + pole_length * sin(angle)
水平受力 = force * cos(angle)
角加速度 = (重力 * sin(angle) + cos(angle) * (-force - pole_mass * length * angular_velocity² * sin(angle))) / (length * (4/3 - pole_mass * cos²(angle) / total_mass))

这些方程被离散化后,形成了我们调用的 step() 函数。虽然看不到显式的概率转移矩阵,但随机性通过初始状态分布和物理模拟的浮点运算误差自然引入。要验证马尔科夫性质,可以记录连续两帧的状态变化:

def check_markov_property(env):
    state1 = env.reset()
    action = env.action_space.sample()
    state2, _, _, _ = env.step(action)
    print(f"原始状态: {state1}")
    print(f"执行动作{action}后状态: {state2}")
    
    # 从相同state1重复执行相同action
    env.reset()
    env.state = state1  # 强制设置相同初始状态
    new_state2, _, _, _ = env.step(action)
    print(f"重新执行后的状态: {new_state2}")
    print(f"状态差异: {np.sum(np.abs(state2 - new_state2))}")

运行这个函数会发现,即使从完全相同的state1出发,执行相同action得到的state2也可能有微小差异——这正是环境随机性的体现。但在浮点精度范围内,我们可以认为P(s'|s,a)是确定的。

3. 价值函数的代码诠释

贝尔曼方程常常是理论学习的绊脚石,但当我们用NumPy数组来实现时,抽象概念突然变得触手可及。假设我们已有一个随机策略π,可以这样估算状态价值V(s):

def estimate_state_value(env, policy, state, gamma=0.99, n_simulations=100):
    total_return = 0
    for _ in range(n_simulations):
        env.reset()
        env.state = state  # 设置特定初始状态
        episode_return = 0
        discount = 1
        done = False
        
        while not done:
            action = policy(state)
            state, reward, done, _ = env.step(action)
            episode_return += reward * discount
            discount *= gamma
            
        total_return += episode_return
    
    return total_return / n_simulations

# 随机策略示例
def random_policy(observation):
    return env.action_space.sample()

# 评估特定状态价值
sample_state = np.array([0.1, 0, 0.05, 0])
print(f"状态价值估算: {estimate_state_value(env, random_policy, sample_state)}")

这个蒙特卡洛评估过程虽然简单,却直观展示了V(s)的本质——从该状态出发,按照策略π能获得的期望累计回报。当我们在CartPole的不同状态调用这个函数时,会发现靠近中心位置的状态价值明显更高,这符合物理直觉。

更高效的是动态规划方法,我们可以用表格表示离散化后的价值函数:

# 状态离散化(简化示例)
def discretize_state(obs):
    cart_pos, cart_v, pole_ang, pole_v = obs
    pos_bin = np.digitize(cart_pos, bins=np.linspace(-2.4, 2.4, 10))
    ang_bin = np.digitize(pole_ang, bins=np.linspace(-0.2, 0.2, 10))
    return (pos_bin, ang_bin)  # 简化版离散状态

# 初始化价值表
state_space_size = (10, 10)
V_table = np.zeros(state_space_size)

# 策略评估迭代
for _ in range(100):
    new_V = np.zeros_like(V_table)
    for i in range(state_space_size[0]):
        for j in range(state_space_size[1]):
            # 模拟所有可能动作(这里简化处理)
            total = 0
            for action in [0, 1]:
                env.reset()
                # 设置到离散状态对应的典型连续状态
                env.state = [
                    np.linspace(-2.4, 2.4, 10)[i],
                    0,  # 速度设为0简化计算
                    np.linspace(-0.2, 0.2, 10)[j],
                    0
                ]
                next_state, reward, done, _ = env.step(action)
                next_discrete = discretize_state(next_state)
                total += 0.5 * (reward + gamma * V_table[next_discrete])
            new_V[i,j] = total
    V_table = new_V

虽然这个离散化方法非常粗糙,但已经能显示出价值函数的整体轮廓。在完整实现中,我们会使用神经网络作为函数逼近器来处理连续状态空间。

4. 策略迭代:从价值到行动的完整闭环

理解了价值评估后,策略改进就水到渠成。Greedy策略改进的代码实现出奇简单:

def policy_improvement(env, V, gamma=0.99):
    """根据价值函数生成改进后的策略"""
    policy = {}
    # 遍历离散状态空间
    for i in range(state_space_size[0]):
        for j in range(state_space_size[1]):
            state_value = -float('inf')
            best_action = None
            # 测试所有可能动作
            for action in [0, 1]:
                env.reset()
                env.state = [
                    np.linspace(-2.4, 2.4, 10)[i],
                    0,
                    np.linspace(-0.2, 0.2, 10)[j],
                    0
                ]
                next_state, reward, done, _ = env.step(action)
                next_discrete = discretize_state(next_state)
                current_value = reward + gamma * V[next_discrete]
                if current_value > state_value:
                    state_value = current_value
                    best_action = action
            policy[(i,j)] = best_action
    return policy

将策略评估和改进交替进行,就形成了完整的策略迭代算法。虽然CartPole问题简单到可以用表格法解决,但同样的逻辑适用于Atari游戏等复杂环境——只是把离散状态换成神经网络提取的特征,把表格V函数换成深度Q网络。

在实现过程中,有几个常见陷阱需要注意:

  1. 折扣因子选择 :γ接近1时学习缓慢但策略长远,γ较小时快速收敛但可能短视
  2. 探索-利用平衡 :完全贪婪策略容易陷入局部最优,需要ε-greedy等机制
  3. 状态表示 :原始观测可能包含冗余信息,适当的特征工程能加速学习
# 完整策略迭代框架
V = np.zeros(state_space_size)
policy = lambda s: env.action_space.sample()  # 初始随机策略

for iteration in range(100):
    # 策略评估
    V = evaluate_policy(env, policy, V, gamma)
    # 策略改进
    policy = policy_improvement(env, V, gamma)
    
    # 测试当前策略性能
    total_reward = test_policy(env, policy)
    print(f"Iter {iteration}, 平均奖励: {total_reward}")
    if total_reward >= 195:  # CartPole的解决标准
        print("问题已解决!")
        break

当看到自己编写的策略从随机摇摆到稳稳立住杆子时,那些抽象的贝尔曼方程突然有了生命。这种通过编码获得的理解,远比死记硬背五元组定义来得深刻。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐