Write DDPG code with the ICM algorithm in PyTorch and add line-by-line comments
Sure. Below is DDPG code with the ICM (Intrinsic Curiosity Module) algorithm written in PyTorch, with comments explaining each step:
```python
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Use the GPU if one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the Actor network (deterministic policy)
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()
        # Two hidden layers followed by an output layer of size action_dim
        self.layer_1 = nn.Linear(state_dim, 400)
        self.layer_2 = nn.Linear(400, 300)
        self.layer_3 = nn.Linear(300, action_dim)
        # Maximum absolute value of each action component
        self.max_action = max_action

    def forward(self, x):
        x = F.relu(self.layer_1(x))
        x = F.relu(self.layer_2(x))
        # tanh bounds the output to [-1, 1]; scale it to the action range
        x = self.max_action * torch.tanh(self.layer_3(x))
        return x

# Define the Critic network (Q-value estimator)
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        # First hidden layer takes the concatenated state and action as input
        self.layer_1 = nn.Linear(state_dim + action_dim, 400)
        # Second hidden layer
        self.layer_2 = nn.Linear(400, 300)
        # Output layer produces a single Q-value
        self.layer_3 = nn.Linear(300, 1)

    def forward(self, x, u):
        # Concatenate the state and the action
        xu = torch.cat([x, u], 1)
        # Pass through the two hidden layers
        x = F.relu(self.layer_1(xu))
        x = F.relu(self.layer_2(x))
        # Output the Q-value
        x = self.layer_3(x)
        return x

# Define the ICM (Intrinsic Curiosity Module) network
class ICM(nn.Module):
    def __init__(self, state_dim, action_dim, feature_dim=256, eta=0.01):
        super(ICM, self).__init__()
        # Feature encoder: maps a raw state to a feature vector phi(s)
        self.encoder_layer_1 = nn.Linear(state_dim, 256)
        self.encoder_layer_2 = nn.Linear(256, feature_dim)
        # Inverse model: predicts the action from phi(s) and phi(s')
        self.inverse_model_layer_1 = nn.Linear(feature_dim * 2, 256)
        self.inverse_model_layer_2 = nn.Linear(256, action_dim)
        # Forward model: predicts phi(s') from phi(s) and the action
        self.forward_model_layer_1 = nn.Linear(feature_dim + action_dim, 256)
        self.forward_model_layer_2 = nn.Linear(256, feature_dim)
        # Scaling factor for the intrinsic reward
        self.eta = eta

    def forward(self, state, next_state, action):
        # Encode the current and next states into feature vectors
        phi1 = F.relu(self.encoder_layer_1(state))
        phi1 = F.relu(self.encoder_layer_2(phi1))
        phi2 = F.relu(self.encoder_layer_1(next_state))
        phi2 = F.relu(self.encoder_layer_2(phi2))
        # Inverse model: predict the action taken between the two states
        inverse_input = torch.cat([phi1, phi2], 1)
        pred_action = F.relu(self.inverse_model_layer_1(inverse_input))
        pred_action = self.inverse_model_layer_2(pred_action)
        # Forward model: predict the next-state feature from phi(s) and the action
        forward_input = torch.cat([phi1, action], 1)
        pred_phi2 = F.relu(self.forward_model_layer_1(forward_input))
        pred_phi2 = self.forward_model_layer_2(pred_phi2)
        # Intrinsic reward: scaled prediction error of the forward model
        intrinsic_reward = self.eta * (pred_phi2 - phi2).pow(2).sum(1) / 2
        return intrinsic_reward, pred_action, pred_phi2, phi2

# Define the DDPG agent with an ICM module
class DDPG(object):
    def __init__(self, state_dim, action_dim, max_action):
        # Actor network, its target copy, and its optimizer
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)
        # Critic network, its target copy, and its optimizer
        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)
        # ICM network and its optimizer
        self.icm = ICM(state_dim, action_dim).to(device)
        self.icm_optimizer = optim.Adam(self.icm.parameters(), lr=1e-3)
        # Other parameters
        self.max_action = max_action

    def select_action(self, state):
        # Convert the state to a tensor with a batch dimension of 1
        state = torch.FloatTensor(state.reshape(1, -1)).to(device)
        # Run the actor network and return the action as a NumPy array
        action = self.actor(state).cpu().data.numpy().flatten()
        return action

    def train(self, replay_buffer, batch_size=100, gamma=0.99):
        # Sample a batch of transitions from the replay buffer
        # (reward and not_done are assumed to be column vectors of shape (batch, 1))
        state, action, next_state, reward, not_done = replay_buffer.sample(batch_size)
        state = torch.FloatTensor(state).to(device)
        action = torch.FloatTensor(action).to(device)
        next_state = torch.FloatTensor(next_state).to(device)
        reward = torch.FloatTensor(reward).to(device)
        not_done = torch.FloatTensor(not_done).to(device)
        # Run the ICM to get the intrinsic reward and its predictions
        intrinsic_reward, pred_action, pred_phi2, phi2 = self.icm(state, next_state, action)
        # Critic update: the Bellman target adds the intrinsic (curiosity) reward
        # to the extrinsic reward so that the ICM actually drives exploration
        with torch.no_grad():
            target_q = self.critic_target(next_state, self.actor_target(next_state))
            target_q = reward + intrinsic_reward.unsqueeze(1) + gamma * not_done * target_q
        critic_loss = F.mse_loss(self.critic(state, action), target_q)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        # Actor update: maximize the critic's value of the current policy's action
        actor_loss = -self.critic(state, self.actor(state)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        # ICM update: inverse-model loss plus forward-model loss
        icm_loss = F.mse_loss(pred_action, action) + F.mse_loss(pred_phi2, phi2.detach())
        self.icm_optimizer.zero_grad()
        icm_loss.backward()
        self.icm_optimizer.step()
        # Soft-update the target networks (Polyak averaging with tau = 0.005)
        for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
            target_param.data.copy_(0.995 * target_param.data + 0.005 * param.data)
        for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
            target_param.data.copy_(0.995 * target_param.data + 0.005 * param.data)
        # Return the per-sample intrinsic reward and the three losses for logging
        return intrinsic_reward.detach(), critic_loss.item(), actor_loss.item(), icm_loss.item()
```
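The code above calls `replay_buffer.sample(batch_size)` but never defines the buffer, and it also needs a driver loop that interacts with an environment. Below is a minimal sketch of both, assuming a Gymnasium-style environment API; the `Pendulum-v1` environment, the buffer capacity, the exploration-noise scale, and the step counts are illustrative assumptions rather than part of the original answer. The buffer returns `not_done` masks in the shape expected by `DDPG.train()`.
```python
import numpy as np
import gymnasium as gym

# A minimal replay buffer matching the (state, action, next_state, reward, not_done)
# interface assumed by DDPG.train(); sizes and dtypes here are illustrative choices.
class ReplayBuffer:
    def __init__(self, state_dim, action_dim, max_size=int(1e6)):
        self.max_size = max_size
        self.ptr = 0
        self.size = 0
        self.state = np.zeros((max_size, state_dim), dtype=np.float32)
        self.action = np.zeros((max_size, action_dim), dtype=np.float32)
        self.next_state = np.zeros((max_size, state_dim), dtype=np.float32)
        self.reward = np.zeros((max_size, 1), dtype=np.float32)
        self.not_done = np.zeros((max_size, 1), dtype=np.float32)

    def add(self, state, action, next_state, reward, done):
        # Store one transition; not_done = 1.0 while the episode is still running
        self.state[self.ptr] = state
        self.action[self.ptr] = action
        self.next_state[self.ptr] = next_state
        self.reward[self.ptr] = reward
        self.not_done[self.ptr] = 1.0 - float(done)
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size):
        idx = np.random.randint(0, self.size, size=batch_size)
        return (self.state[idx], self.action[idx], self.next_state[idx],
                self.reward[idx], self.not_done[idx])

# Hypothetical training loop (the environment and hyperparameters are assumptions)
env = gym.make("Pendulum-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])

agent = DDPG(state_dim, action_dim, max_action)
buffer = ReplayBuffer(state_dim, action_dim)

state, _ = env.reset()
for step in range(10000):
    # Add Gaussian exploration noise and clip to the valid action range
    action = agent.select_action(np.array(state))
    action = np.clip(action + np.random.normal(0, 0.1 * max_action, size=action_dim),
                     -max_action, max_action)
    next_state, reward, terminated, truncated, _ = env.step(action)
    buffer.add(state, action, next_state, reward, terminated)
    state = next_state if not (terminated or truncated) else env.reset()[0]
    # Train once per environment step after the buffer has enough samples
    if buffer.size >= 1000:
        intrinsic_reward, critic_loss, actor_loss, icm_loss = agent.train(buffer, batch_size=100)
```
Whether the intrinsic reward is folded into the critic target inside `train()` (as in the code above) or added to the stored environment reward at collection time is a design choice; both are common ways to combine curiosity with the extrinsic reward.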
I hope this code helps you better understand the DDPG and ICM algorithms.