DDPG and DQN code examples
DDPG (Deep Deterministic Policy Gradient) implementation:
```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque


# Experience replay buffer: stores transitions and samples random mini-batches.
class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = deque(maxlen=buffer_size)

    def push(self, state, action, reward, next_state, done):
        experience = (state, action, np.array([reward]), next_state, done)
        self.buffer.append(experience)

    def sample(self, batch_size):
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = [], [], [], [], []
        batch = random.sample(self.buffer, batch_size)
        for state, action, reward, next_state, done in batch:
            state_batch.append(state)
            action_batch.append(action)
            reward_batch.append(reward)
            next_state_batch.append(next_state)
            done_batch.append(done)
        return state_batch, action_batch, reward_batch, next_state_batch, done_batch

    def __len__(self):
        return len(self.buffer)


# Actor: maps a state to a deterministic action scaled to [-max_action, max_action].
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, action_dim)
        self.max_action = max_action

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.max_action * torch.tanh(self.fc3(x))


# Critic: estimates Q(s, a) from the concatenated state and action.
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


class DDPG:
    def __init__(self, state_dim, action_dim, max_action, buffer_size):
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-3)
        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)
        self.buffer = ReplayBuffer(buffer_size)
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.max_action = max_action

    def select_action(self, state):
        state = torch.FloatTensor(state.reshape(1, -1)).to(device)
        return self.actor(state).cpu().data.numpy().flatten()

    def train(self, batch_size, gamma, tau):
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.buffer.sample(batch_size)
        state_batch = torch.FloatTensor(np.array(state_batch)).to(device)
        action_batch = torch.FloatTensor(np.array(action_batch)).to(device)
        reward_batch = torch.FloatTensor(np.array(reward_batch)).to(device)  # shape (batch, 1)
        next_state_batch = torch.FloatTensor(np.array(next_state_batch)).to(device)
        done_batch = torch.FloatTensor(np.array(done_batch, dtype=np.float32)).unsqueeze(1).to(device)  # shape (batch, 1)

        # Critic update: fit Q(s, a) to the bootstrapped target from the target networks.
        next_action_batch = self.actor_target(next_state_batch)
        next_q_value = self.critic_target(next_state_batch, next_action_batch)
        q_value = self.critic(state_batch, action_batch)
        target_q_value = reward_batch + (1 - done_batch) * gamma * next_q_value
        critic_loss = F.mse_loss(q_value, target_q_value.detach())
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Actor update: maximize Q(s, actor(s)) by minimizing its negative.
        actor_loss = -self.critic(state_batch, self.actor(state_batch)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Polyak (soft) update of both target networks.
        for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
        for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters (defined before the agent so the replay buffer gets a valid size).
batch_size = 128
gamma = 0.99
tau = 0.001
buffer_size = 100000
max_episodes = 1000
max_steps = 1000

ddpg = DDPG(state_dim=4, action_dim=1, max_action=1, buffer_size=buffer_size)

# `env` must be created beforehand: a continuous-action environment whose dimensions
# match state_dim=4 and action_dim=1, using the classic gym API
# (reset() -> obs, step() -> (obs, reward, done, info)).
for episode in range(max_episodes):
    state = env.reset()
    episode_reward = 0
    for step in range(max_steps):
        action = ddpg.select_action(state)
        next_state, reward, done, _ = env.step(action)
        ddpg.buffer.push(state, action, reward, next_state, done)
        if len(ddpg.buffer) > batch_size:
            ddpg.train(batch_size, gamma, tau)
        state = next_state
        episode_reward += reward
        if done:
            break
    print(f"Episode {episode+1} : Episode Reward {episode_reward:.2f}")
```
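The actor above is purely deterministic, so during data collection DDPG normally perturbs the chosen action with exploration noise. A minimal sketch of clipped Gaussian action noise (the helper name and the `noise_std` value are illustrative, not part of the code above):

```python
import numpy as np

def select_action_with_noise(agent, state, noise_std=0.1):
    """Deterministic actor output plus clipped Gaussian exploration noise."""
    action = agent.select_action(state)
    noise = np.random.normal(0.0, noise_std * agent.max_action, size=action.shape)
    return np.clip(action + noise, -agent.max_action, agent.max_action)
```

Inside the training loop, `ddpg.select_action(state)` would then be replaced by `select_action_with_noise(ddpg, state)`, while evaluation keeps the noise-free policy.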
DQN (Deep Q-Network) implementation:
```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque


# Experience replay buffer: stores transitions and samples random mini-batches.
class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = deque(maxlen=buffer_size)

    def push(self, state, action, reward, next_state, done):
        experience = (state, action, np.array([reward]), next_state, done)
        self.buffer.append(experience)

    def sample(self, batch_size):
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = [], [], [], [], []
        batch = random.sample(self.buffer, batch_size)
        for state, action, reward, next_state, done in batch:
            state_batch.append(state)
            action_batch.append(action)
            reward_batch.append(reward)
            next_state_batch.append(next_state)
            done_batch.append(done)
        return state_batch, action_batch, reward_batch, next_state_batch, done_batch

    def __len__(self):
        return len(self.buffer)


# Q-network: maps a state to one Q-value per discrete action.
class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


class DQNAgent:
    def __init__(self, state_dim, action_dim, buffer_size):
        self.q_network = DQN(state_dim, action_dim).to(device)
        self.target_network = DQN(state_dim, action_dim).to(device)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=1e-3)
        self.buffer = ReplayBuffer(buffer_size)
        self.state_dim = state_dim
        self.action_dim = action_dim

    def select_action(self, state, eps):
        # Epsilon-greedy: random discrete action with probability eps, otherwise greedy.
        if random.random() < eps:
            action = random.randrange(self.action_dim)
        else:
            state = torch.FloatTensor(state).unsqueeze(0).to(device)
            q_values = self.q_network(state)
            action = int(q_values.argmax().item())
        return action

    def train(self, batch_size, gamma):
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.buffer.sample(batch_size)
        state_batch = torch.FloatTensor(np.array(state_batch)).to(device)
        action_batch = torch.LongTensor(action_batch).unsqueeze(-1).to(device)  # shape (batch, 1)
        reward_batch = torch.FloatTensor(np.array(reward_batch)).to(device)     # shape (batch, 1)
        next_state_batch = torch.FloatTensor(np.array(next_state_batch)).to(device)
        done_batch = torch.FloatTensor(np.array(done_batch, dtype=np.float32)).unsqueeze(-1).to(device)  # shape (batch, 1)

        # Q-values of the taken actions and bootstrapped targets from the target network.
        q_values = self.q_network(state_batch).gather(-1, action_batch)
        next_q_values = self.target_network(next_state_batch).max(-1, keepdim=True)[0].detach()
        target_q_values = reward_batch + gamma * next_q_values * (1 - done_batch)
        loss = F.mse_loss(q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Hard copy of the online weights into the target network.
        # (In practice this copy is usually done only every few hundred steps,
        # or replaced by a soft update; copying on every step keeps both networks identical.)
        self.target_network.load_state_dict(self.q_network.state_dict())


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters (defined before the agent so the replay buffer gets a valid size).
buffer_size = 10000
batch_size = 128
gamma = 0.99
max_episodes = 1000
max_steps = 1000
eps_start = 1.0
eps_end = 0.01
eps_decay = 0.995

dqn_agent = DQNAgent(state_dim=4, action_dim=2, buffer_size=buffer_size)

# `env` must be created beforehand: a discrete-action environment whose dimensions
# match state_dim=4 and action_dim=2 (e.g. CartPole), using the classic gym API
# (reset() -> obs, step() -> (obs, reward, done, info)).
eps = eps_start
for episode in range(max_episodes):
    state = env.reset()
    episode_reward = 0
    eps = max(eps_end, eps * eps_decay)  # multiplicative epsilon decay per episode
    for step in range(max_steps):
        action = dqn_agent.select_action(state, eps)
        next_state, reward, done, _ = env.step(action)
        dqn_agent.buffer.push(state, action, reward, next_state, done)
        if len(dqn_agent.buffer) > batch_size:
            dqn_agent.train(batch_size, gamma)
        state = next_state
        episode_reward += reward
        if done:
            break
    print(f"Episode {episode+1} : Episode Reward {episode_reward:.2f}")
```
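The DQN agent above hard-copies the online weights into the target network after every gradient step, which keeps the two networks identical and weakens the stabilizing effect of a separate target. A common alternative (not part of the original code) is the same Polyak soft update used in the DDPG implementation; a minimal sketch, assuming a small mixing factor `tau`:

```python
def soft_update(online_net, target_net, tau=0.005):
    """Blend a fraction tau of the online weights into the target weights."""
    for param, target_param in zip(online_net.parameters(), target_net.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
```

Calling `soft_update(dqn_agent.q_network, dqn_agent.target_network)` at the end of `DQNAgent.train`, instead of the hard copy, is one common choice; another is to keep the hard copy but perform it only every few hundred training steps.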
Both of the implementations above are written in PyTorch.
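Neither script creates the `env` object that its training loop iterates over. A sketch of how the environments might be set up with the classic `gym` API (the environment choices are assumptions: `CartPole-v1` happens to match the `state_dim=4, action_dim=2` used by the DQN example, while the DDPG dimensions and `max_action` would need to be adjusted to whatever continuous-control task is chosen):

```python
import gym

# Discrete-action task for the DQN example: CartPole has 4-dim observations and 2 actions.
env = gym.make("CartPole-v1")

# Continuous-action task for the DDPG example, e.g. Pendulum (3-dim observations,
# one action bounded in [-2, 2]); state_dim and max_action passed to DDPG must match.
# env = gym.make("Pendulum-v1")
```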