A3C code for continuous action spaces, implemented in PyTorch
The code is shown below. Two caveats before reading it: the sample is a single-process actor-critic with a replay buffer rather than a full A3C (A3C proper runs several asynchronous workers updating a shared global network), and it handles the continuous action space with a Gaussian policy, i.e. the actor outputs the mean of a Normal distribution together with a learnable log standard deviation. It assumes the classic pre-0.26 gym API (`env.reset()` returns only the observation and `env.step()` returns four values) and the `MountainCarContinuous-v0` environment:
```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np
import gym
import random
# Hyperparameters
learning_rate = 0.0001
gamma = 0.98
n_episode = 3000
max_step = 500
epsilon = 0.1  # probability of taking a uniform random action (exploration)
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Replay buffer
class ReplayBuffer():
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = []

    def push(self, transition):
        # Drop the oldest transition once the buffer is full
        if len(self.buffer) == self.buffer_size:
            self.buffer.pop(0)
        self.buffer.append(transition)

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)
# Actor-Critic network with a Gaussian policy head for continuous actions
class ActorCritic(nn.Module):
    def __init__(self, n_state, n_action):
        super(ActorCritic, self).__init__()
        self.fc1 = nn.Linear(n_state, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.mu = nn.Linear(64, n_action)                    # mean of the action distribution
        self.log_std = nn.Parameter(torch.zeros(n_action))  # learnable, state-independent log std
        self.critic = nn.Linear(64, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        mu = torch.tanh(self.mu(x))  # squash the mean into [-1, 1]
        std = self.log_std.exp().expand_as(mu)
        value = self.critic(x)
        return mu, std, value
# Actor-critic agent (single-worker A3C-style learner)
class A3C():
    def __init__(self, n_state, n_action, buffer_size):
        self.n_state = n_state
        self.n_action = n_action
        self.buffer = ReplayBuffer(buffer_size)
        self.actor_critic = ActorCritic(n_state, n_action).to(device)
        self.optimizer = optim.Adam(self.actor_critic.parameters(), lr=learning_rate)

    def select_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        with torch.no_grad():
            mu, std, _ = self.actor_critic(state)
        # Sample a continuous action from the Gaussian policy
        action = Normal(mu, std).sample()
        # Clip to the environment's valid action range [-1, 1]
        return action.clamp(-1.0, 1.0).cpu().numpy()[0]
    def train(self):
        transitions = self.buffer.sample(batch_size)
        batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(*transitions)
        batch_state = torch.FloatTensor(np.array(batch_state)).to(device)
        batch_action = torch.FloatTensor(np.array(batch_action)).to(device)
        batch_reward = torch.FloatTensor(batch_reward).unsqueeze(1).to(device)
        batch_next_state = torch.FloatTensor(np.array(batch_next_state)).to(device)
        batch_done = torch.FloatTensor(batch_done).unsqueeze(1).to(device)

        mu, std, batch_value = self.actor_critic(batch_state)
        with torch.no_grad():
            _, _, batch_next_value = self.actor_critic(batch_next_state)

        # One-step TD target; the bootstrap term is masked out at episode ends
        td_target = batch_reward + gamma * batch_next_value * (1 - batch_done)
        td_error = td_target - batch_value

        # Actor loss: policy gradient with the TD error as the advantage estimate
        log_prob = Normal(mu, std).log_prob(batch_action).sum(dim=1, keepdim=True)
        actor_loss = (-log_prob * td_error.detach()).mean()

        # Critic loss: move the value estimate toward the TD target
        critic_loss = F.smooth_l1_loss(batch_value, td_target.detach())

        # Total loss
        loss = actor_loss + critic_loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
    def run(self):
        env = gym.make('MountainCarContinuous-v0')
        for episode in range(n_episode):
            state = env.reset().astype(np.float32)  # pre-0.26 gym API
            episode_reward = 0
            for step in range(max_step):
                # Occasionally explore with a uniform random action
                if np.random.rand() < epsilon:
                    action = np.random.uniform(-1, 1, (self.n_action,)).astype(np.float32)
                else:
                    action = self.select_action(state)
                next_state, reward, done, _ = env.step(action)
                next_state = next_state.astype(np.float32)
                self.buffer.push((state, action, reward, next_state, done))
                episode_reward += reward
                state = next_state
                if len(self.buffer) > batch_size:
                    self.train()
                if done:
                    break
            print('Episode: {}, Reward: {:.2f}'.format(episode, episode_reward))
        env.close()
# Main
if __name__ == '__main__':
    env = gym.make('MountainCarContinuous-v0')
    n_state = env.observation_space.shape[0]
    n_action = env.action_space.shape[0]
    env.close()
    buffer_size = 10000
    batch_size = 128
    agent = A3C(n_state, n_action, buffer_size)
    agent.run()
```
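
To sanity-check the learned policy without exploration noise, a small evaluation loop can be appended. The following is a minimal sketch relying on the definitions above and the same pre-0.26 gym API; the `evaluate` helper is illustrative, not part of the original post.

```python
# Hypothetical evaluation helper: roll out episodes acting on the policy mean.
def evaluate(agent, n_eval_episodes=5):
    env = gym.make('MountainCarContinuous-v0')
    for ep in range(n_eval_episodes):
        state = env.reset().astype(np.float32)
        total, done = 0.0, False
        while not done:
            with torch.no_grad():
                mu, _, _ = agent.actor_critic(
                    torch.FloatTensor(state).unsqueeze(0).to(device))
            action = mu.cpu().numpy()[0]  # deterministic: act on the mean
            state, reward, done, _ = env.step(action)
            state = state.astype(np.float32)
            total += reward
        print('Eval episode {}: return {:.2f}'.format(ep, total))
    env.close()
```

Calling `evaluate(agent)` after `agent.run()` prints the undiscounted return of a few deterministic rollouts, which is a more stable progress signal than the noisy training rewards.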