Help me write example code implementing the DDPG algorithm with PyTorch
Below is an example implementation of the DDPG algorithm in PyTorch:
```python
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the networks: an Actor (policy) and a Critic (Q-function)
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()
        self.layer1 = nn.Linear(state_dim, 256)
        self.layer2 = nn.Linear(256, 256)
        self.layer3 = nn.Linear(256, action_dim)
        self.max_action = max_action

    def forward(self, state):
        x = torch.relu(self.layer1(state))
        x = torch.relu(self.layer2(x))
        x = self.max_action * torch.tanh(self.layer3(x))
        return x

class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.layer1 = nn.Linear(state_dim + action_dim, 256)
        self.layer2 = nn.Linear(256, 256)
        self.layer3 = nn.Linear(256, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], 1)
        x = torch.relu(self.layer1(x))
        x = torch.relu(self.layer2(x))
        x = self.layer3(x)
        return x

# Define the DDPG agent
class DDPG(object):
    def __init__(self, state_dim, action_dim, max_action):
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-3)

        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)

        self.max_action = max_action

    def select_action(self, state):
        state = torch.FloatTensor(state.reshape(1, -1)).to(device)
        return self.actor(state).cpu().data.numpy().flatten()

    def train(self, replay_buffer, batch_size=256, discount=0.99, tau=0.005):
        # Sample a batch of transitions from the replay buffer
        state, action, next_state, reward, not_done = replay_buffer.sample(batch_size)
        state = torch.FloatTensor(state).to(device)
        action = torch.FloatTensor(action).to(device)
        next_state = torch.FloatTensor(next_state).to(device)
        reward = torch.FloatTensor(reward).to(device)
        # The buffer already stores 1 - done, so use it directly
        not_done = torch.FloatTensor(not_done).to(device)

        # Update the critic network
        next_action = self.actor_target(next_state)
        target_Q = self.critic_target(next_state, next_action).detach()
        target_Q = reward + (discount * not_done * target_Q)
        current_Q = self.critic(state, action)
        critic_loss = nn.functional.mse_loss(current_Q, target_Q)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Update the actor network
        pred_action = self.actor(state)
        actor_loss = -self.critic(state, pred_action).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Soft-update the target networks
        for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
        for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

# Define the replay buffer
class ReplayBuffer(object):
    def __init__(self, state_dim, action_dim, max_size=int(1e6)):
        self.max_size = max_size
        self.ptr = 0
        self.size = 0
        self.state = np.zeros((max_size, state_dim))
        self.action = np.zeros((max_size, action_dim))
        self.next_state = np.zeros((max_size, state_dim))
        self.reward = np.zeros((max_size, 1))
        self.not_done = np.zeros((max_size, 1))

    def add(self, state, action, next_state, reward, done):
        self.state[self.ptr] = state
        self.action[self.ptr] = action
        self.next_state[self.ptr] = next_state
        self.reward[self.ptr] = reward
        self.not_done[self.ptr] = 1 - done
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size):
        idx = np.random.randint(0, self.size, size=batch_size)
        return (
            self.state[idx],
            self.action[idx],
            self.next_state[idx],
            self.reward[idx],
            self.not_done[idx]
        )

# Define the training loop
def train(env, agent, replay_buffer, episodes, steps_per_episode, batch_size, discount, tau):
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        for step in range(steps_per_episode):
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            replay_buffer.add(state, action, next_state, reward, done)
            # Only start updating once enough transitions have been collected
            if replay_buffer.size > batch_size:
                agent.train(replay_buffer, batch_size, discount, tau)
            state = next_state
            total_reward += reward
            if done:
                break
        print("Episode: {}, total reward: {:.2f}".format(episode, total_reward))

# Test code
# Note: this uses the classic Gym API (env.reset() returning only the state,
# env.step() returning 4 values) and the Pendulum-v0 id, i.e. an older gym release;
# newer Gym/Gymnasium versions use Pendulum-v1 and a different step/reset API.
env = gym.make("Pendulum-v0")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])

agent = DDPG(state_dim, action_dim, max_action)
replay_buffer = ReplayBuffer(state_dim, action_dim)
train(env, agent, replay_buffer, episodes=100, steps_per_episode=500, batch_size=256, discount=0.99, tau=0.005)
```
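One caveat: the loop above always executes the actor's deterministic output, so there is no exploration. DDPG normally perturbs actions during data collection (the original paper uses Ornstein-Uhlenbeck noise; Gaussian noise is a common, simpler choice). Below is a minimal sketch of a hypothetical helper using Gaussian noise; `noisy_action` and the `expl_noise` scale are assumptions for illustration and are not part of the code above:

```python
import numpy as np

def noisy_action(agent, state, max_action, expl_noise=0.1):
    """Hypothetical helper: perturb the deterministic policy output for exploration."""
    action = agent.select_action(state)
    # Gaussian noise scaled to the action range, then clipped back into bounds
    noise = np.random.normal(0, expl_noise * max_action, size=action.shape)
    return np.clip(action + noise, -max_action, max_action)
```

In the training loop, replacing `agent.select_action(state)` with `noisy_action(agent, state, max_action)` during data collection is enough to add exploration; evaluation would still use the deterministic action.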