DDPG Code
The following is an example implementation of the DDPG algorithm, including the definitions of the Actor and Critic networks and the main steps of the training loop:
```python
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import namedtuple, deque

# Define the Actor (policy) network
class Actor(nn.Module):
    def __init__(self, state_size, action_size, hidden_size):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)
        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()

    def forward(self, state):
        x = self.relu(self.fc1(state))
        x = self.relu(self.fc2(x))
        # Tanh keeps the action within [-1, 1]
        return self.tanh(self.fc3(x))

# Define the Critic (Q-value) network
class Critic(nn.Module):
    def __init__(self, state_size, action_size, hidden_size):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_size + action_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, 1)
        self.relu = nn.ReLU()

    def forward(self, state, action):
        # The Critic takes the concatenated state-action pair as input
        x = torch.cat([state, action], dim=1)
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        return self.fc3(x)

# Define the experience replay buffer
class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = deque(maxlen=self.buffer_size)
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

    def add(self, state, action, reward, next_state, done):
        e = self.experience(state, action, reward, next_state, done)
        self.buffer.append(e)

    def sample(self, batch_size):
        experiences = random.sample(self.buffer, k=batch_size)
        states = torch.from_numpy(np.vstack([e.state for e in experiences])).float()
        actions = torch.from_numpy(np.vstack([e.action for e in experiences])).float()
        rewards = torch.from_numpy(np.vstack([e.reward for e in experiences])).float()
        next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences])).float()
        dones = torch.from_numpy(np.vstack([e.done for e in experiences]).astype(np.uint8)).float()
        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        return len(self.buffer)

# Define the DDPG agent
class DDPG:
    def __init__(self, state_size, action_size, hidden_size, buffer_size, batch_size, gamma, tau, lr_actor, lr_critic):
        self.state_size = state_size
        self.action_size = action_size
        self.hidden_size = hidden_size
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.gamma = gamma
        self.tau = tau
        self.lr_actor = lr_actor
        self.lr_critic = lr_critic
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        # Local and target networks for both Actor and Critic
        self.actor_local = Actor(state_size, action_size, hidden_size).to(self.device)
        self.actor_target = Actor(state_size, action_size, hidden_size).to(self.device)
        self.actor_optimizer = optim.Adam(self.actor_local.parameters(), lr=self.lr_actor)
        self.critic_local = Critic(state_size, action_size, hidden_size).to(self.device)
        self.critic_target = Critic(state_size, action_size, hidden_size).to(self.device)
        self.critic_optimizer = optim.Adam(self.critic_local.parameters(), lr=self.lr_critic)
        self.memory = ReplayBuffer(buffer_size)
    def act(self, state, noise):
        # Select an action from the current policy and add exploration noise
        state = torch.from_numpy(state).float().to(self.device)
        self.actor_local.eval()
        with torch.no_grad():
            action = self.actor_local(state).cpu().data.numpy()
        self.actor_local.train()
        action += noise.sample()
        return np.clip(action, -1, 1)

    def step(self, state, action, reward, next_state, done):
        # Store the transition and learn once enough samples are available
        self.memory.add(state, action, reward, next_state, done)
        if len(self.memory) > self.batch_size:
            experiences = self.memory.sample(self.batch_size)
            self.learn(experiences)
    def learn(self, experiences):
        # Move the sampled batch to the same device as the networks
        states, actions, rewards, next_states, dones = [t.to(self.device) for t in experiences]
        # Update the Critic network
        with torch.no_grad():
            Q_targets_next = self.critic_target(next_states, self.actor_target(next_states))
            Q_targets = rewards + (self.gamma * Q_targets_next * (1 - dones))
        Q_expected = self.critic_local(states, actions)
        critic_loss = nn.MSELoss()(Q_expected, Q_targets)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        # Update the Actor network
        actions_pred = self.actor_local(states)
        actor_loss = -self.critic_local(states, actions_pred).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        # Soft-update the target networks
        self.soft_update(self.critic_local, self.critic_target, self.tau)
        self.soft_update(self.actor_local, self.actor_target, self.tau)

    def soft_update(self, local_model, target_model, tau):
        # target = tau * local + (1 - tau) * target
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(tau * local_param.data + (1.0 - tau) * target_param.data)
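
# The training loop below uses an OUNoise class that the original snippet never
# defines. This is a minimal Ornstein-Uhlenbeck noise sketch (the exploration
# noise commonly used with DDPG); the mu, theta, and sigma defaults are assumed
# values, not taken from the original source.
class OUNoise:
    def __init__(self, action_size, mu=0.0, theta=0.15, sigma=0.2):
        self.mu = mu * np.ones(action_size)
        self.theta = theta
        self.sigma = sigma
        self.reset()

    def reset(self):
        # Reset the internal noise state to the mean
        self.state = np.copy(self.mu)

    def sample(self):
        # x_{t+1} = x_t + theta * (mu - x_t) + sigma * N(0, 1)
        dx = self.theta * (self.mu - self.state) + self.sigma * np.random.standard_normal(len(self.state))
        self.state = self.state + dx
        return self.state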

# Training loop (assumes the classic gym API: Pendulum-v0, with env.step
# returning (next_state, reward, done, info))
env = gym.make('Pendulum-v0')
state_size = env.observation_space.shape[0]
action_size = env.action_space.shape[0]
hidden_size = 128
buffer_size = 100000
batch_size = 128
gamma = 0.99
tau = 0.001
lr_actor = 0.0001
lr_critic = 0.001
ddpg = DDPG(state_size, action_size, hidden_size, buffer_size, batch_size, gamma, tau, lr_actor, lr_critic)
noise = OUNoise(action_size)
scores = []
for i_episode in range(1000):
    state = env.reset()
    score = 0
    noise.reset()
    while True:
        action = ddpg.act(state, noise)
        next_state, reward, done, _ = env.step(action)
        ddpg.step(state, action, reward, next_state, done)
        score += reward
        state = next_state
        if done:
            break
    scores.append(score)
    print('Episode {} Score: {:.2f}'.format(i_episode, score))
    if np.mean(scores[-100:]) >= -150:
        print('Task solved after {} episodes'.format(i_episode))
        break
env.close()
```
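Note that the training loop assumes the classic gym interface (`Pendulum-v0`, `env.reset()` returning only the observation, and `env.step()` returning four values). With `gymnasium` or gym ≥ 0.26, the environment is named `Pendulum-v1` and `reset()`/`step()` return `(obs, info)` and a five-tuple respectively, so those calls would need minor adaptation.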
The code above is for reference only; the implementation may need to be adjusted for your specific problem.