PPO FrozenLake
### Applying the PPO Algorithm to the FrozenLake Environment
#### Environment Overview
FrozenLake is a classic reinforcement learning environment: the goal is to reach the goal tile (the "treasure") without falling into any of the holes in the ice. The environment's slippery dynamics make the agent's moves stochastic, which increases the difficulty of the task.
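For reference, a minimal snippet (using the same gymnasium API as the code further below) shows how the environment is created and what its observation and action spaces look like:
```python
import gymnasium as gym

# 4x4 FrozenLake with slippery dynamics: 16 discrete states and 4 discrete actions
# (0=left, 1=down, 2=right, 3=up). The reward is 1 only when the goal is reached.
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True)
print(env.observation_space)  # Discrete(16)
print(env.action_space)       # Discrete(4)

state, info = env.reset(seed=0)
next_state, reward, terminated, truncated, info = env.step(env.action_space.sample())
env.close()
```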
#### Overview of the PPO Algorithm
Proximal Policy Optimization (PPO) is an efficient Actor-Critic method that simplifies training while retaining strong performance[^1]. PPO clips the probability ratio between the new and old policies to prevent overly large update steps, which improves stability and convergence speed.
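Concretely, the standard clipped surrogate objective that PPO maximizes can be written as:

$$
r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\text{old}}}(a_t \mid s_t)}, \qquad
L^{\text{CLIP}}(\theta) = \mathbb{E}_t\left[\min\!\left(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}\!\left(r_t(\theta),\,1-\epsilon,\,1+\epsilon\right)\hat{A}_t\right)\right]
$$

where $\hat{A}_t$ is the advantage estimate and $\epsilon$ is the clipping range (the `eps_clip` hyperparameter below); the `surr1`/`surr2` terms in `optimize_policy` implement exactly this `min`.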
#### Implementation Details
The following code implements the PPO algorithm for the FrozenLake problem using Python and the PyTorch framework:
```python
import gymnasium as gym
import torch
import torch.nn as nn
import numpy as np
class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(ActorCritic, self).__init__()
        # Actor head: outputs a probability distribution over the discrete actions
        self.actor = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.Tanh(),
            nn.Linear(64, 64),
            nn.Tanh(),
            nn.Linear(64, action_dim),
            nn.Softmax(dim=-1)
        )
        # Critic head: outputs a scalar state-value estimate V(s)
        self.critic = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.Tanh(),
            nn.Linear(64, 64),
            nn.Tanh(),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        value = self.critic(x)
        probs = self.actor(x)
        dist = torch.distributions.Categorical(probs=probs)
        return dist, value.flatten()
def one_hot(state, state_dim):
    # FrozenLake observations are discrete indices (0..n-1); one-hot encode them
    # so they can be fed to the linear layers of the network.
    vec = torch.zeros(state_dim, dtype=torch.float32)
    vec[state] = 1.0
    return vec

def train(env_name='FrozenLake-v1', max_episodes=5000, update_timestep=2000,
          K_epochs=80, eps_clip=0.2, gamma=0.99, lr=0.0007):
    env = gym.make(env_name, render_mode="ansi", map_name="4x4", is_slippery=True)
    state_dim = env.observation_space.n
    action_dim = env.action_space.n
    policy = ActorCritic(state_dim, action_dim).float()
    optimizer = torch.optim.Adam(policy.parameters(), lr=lr)
    memory_states = []
    memory_actions = []
    memory_rewards = []
    time_step = 0
    for episode in range(max_episodes):
        state = env.reset()[0]
        while True:
            time_step += 1
            # Select action according to the probability distribution pi(a|s; theta)
            state_tensor = one_hot(state, state_dim).unsqueeze(0)
            with torch.no_grad():
                dist, _ = policy(state_tensor)
            action = dist.sample().item()
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            # Save the experience tuple into the rollout buffer
            memory_states.append(state_tensor.squeeze(0))
            memory_actions.append(action)
            memory_rewards.append(reward)
            if time_step % update_timestep == 0 or done:
                optimize_policy(memory_states, memory_actions, memory_rewards,
                                policy, optimizer, K_epochs, eps_clip, gamma)
                # Clear old experiences after updating
                del memory_states[:]
                del memory_actions[:]
                del memory_rewards[:]
            state = next_state
            if done:
                break
        if episode % 100 == 0:
            print(f'Episode [{episode}/{max_episodes}]')
    env.close()
def optimize_policy(states, actions, rewards, policy, optimizer, K_epochs, eps_clip, gamma):
    discounted_reward = calculate_discounted_rewards(rewards, gamma)
    states = torch.stack(states)  # states were already one-hot encoded in the rollout loop
    actions = torch.tensor(actions, dtype=torch.int64)
    discounted_reward = torch.tensor(discounted_reward, dtype=torch.float32)
    advantages = compute_advantages(policy, states, discounted_reward)
    old_log_probs = evaluate_old_action_probabilities(policy, states, actions)
    for _ in range(K_epochs):
        new_dist, values = policy(states)
        log_probs = new_dist.log_prob(actions)
        # Probability ratio r_t(theta) = pi_theta(a|s) / pi_theta_old(a|s)
        ratios = torch.exp(log_probs - old_log_probs.detach())
        # Clipped surrogate objective
        surr1 = ratios * advantages
        surr2 = torch.clamp(ratios, 1 - eps_clip, 1 + eps_clip) * advantages
        # Policy loss + value loss + entropy bonus
        loss = -torch.min(surr1, surr2) + 0.5 * (discounted_reward - values) ** 2 - 0.01 * new_dist.entropy()
        optimizer.zero_grad()
        loss.mean().backward()
        optimizer.step()
def calculate_discounted_rewards(rewards, gamma):
    # Discounted returns G_t computed backwards through the trajectory
    discounted_rewards = []
    running_add = 0
    for r in reversed(rewards):
        running_add = r + gamma * running_add
        discounted_rewards.insert(0, running_add)
    discounted_rewards = np.array(discounted_rewards)
    # Normalize the returns for more stable training
    return (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-10)

def compute_advantages(policy, states, returns):
    # Advantage estimate A = G_t - V(s_t), treated as a constant during the policy update
    with torch.no_grad():
        _, values = policy(states)
    advantages = returns - values
    return (advantages - advantages.mean()) / (advantages.std() + 1e-10)

def evaluate_old_action_probabilities(policy, states, actions):
    # Log-probabilities of the taken actions under the old (pre-update) policy
    with torch.no_grad():
        dist, _ = policy(states)
        log_probs = dist.log_prob(actions)
    return log_probs
if __name__ == "__main__":
    train()
```
This code implements the complete PPO pipeline, including (but not limited to):
- defining the Actor-Critic network architecture;
- recording states, actions, and rewards;
- computing discounted cumulative returns and normalizing them;
- updating the policy parameters to maximize expected return (see the evaluation sketch below).
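As a usage example, here is a minimal evaluation sketch. It is an assumption on my part rather than part of the original code: it supposes `train` is adapted to return the trained `policy` object, and it reuses the `one_hot` state encoding from the training loop, choosing actions greedily:
```python
def evaluate(policy, episodes=100):
    # Hypothetical helper: roll out the trained policy greedily and report the success rate.
    env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True)
    state_dim = env.observation_space.n
    successes = 0
    for _ in range(episodes):
        state, _ = env.reset()
        done = False
        while not done:
            with torch.no_grad():
                dist, _ = policy(one_hot(state, state_dim).unsqueeze(0))
            action = dist.probs.argmax().item()  # greedy action instead of sampling
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
        successes += reward  # reward is 1 only when the goal is reached
    env.close()
    print(f"Success rate: {successes / episodes:.2%}")
```
Because the slippery dynamics are stochastic, even a well-trained policy will not reach the goal on every episode; the success rate over many episodes is the more informative metric.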