print("\rEpisode {}/{}. ".format(i, num_episodes), end="")
时间: 2023-04-05 15:04:10 浏览: 48
这是一个 Python 代码的问题,可以回答。这段代码是用于在控制台中输出当前的“Episode”信息,其中“i”表示当前的集数,“num_episodes”表示总共的集数。使用“\r”可以让输出的信息覆盖之前的内容,达到更新的效果。
相关问题
dqn的pytorch代码
以下是一个基于PyTorch的DQN(Deep Q-Network)的示例代码,用于解决OpenAI Gym中的CartPole问题:
```python
import gym
import random
import math
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
# Q-network definition
class DQN(nn.Module):
    """Three-layer fully connected network mapping a state vector to
    one Q-value per discrete action."""

    def __init__(self, observation_space_size, action_space_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(observation_space_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_space_size)

    def forward(self, x):
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)
# Experience replay buffer
class ReplayBuffer():
    """Fixed-capacity FIFO store of (state, action, reward, next_state, done)
    transitions; oldest entries are evicted once capacity is reached."""

    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition tuple."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Draw a uniform random batch and return it as five parallel tuples."""
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)
# DQN agent: epsilon-greedy policy, experience replay, periodic target-net sync.
class DQNAgent():
    """Deep Q-Network agent for discrete-action environments.

    Args:
        observation_space_size: dimension of the flat state vector.
        action_space_size: number of discrete actions.
        replay_buffer_capacity: max transitions kept for replay.
        batch_size: minibatch size per optimization step.
        gamma: discount factor.
        learning_rate: Adam learning rate for the policy network.
        eps_start, eps_end, eps_decay: exponential epsilon schedule used by
            `select_action` (eps_decay is the decay time constant in steps).
    """

    def __init__(self, observation_space_size, action_space_size, replay_buffer_capacity=10000,
                 batch_size=32, gamma=0.99, learning_rate=1e-3, eps_start=1.0, eps_end=0.01, eps_decay=200):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.observation_space_size = observation_space_size
        self.action_space_size = action_space_size
        self.batch_size = batch_size
        self.gamma = gamma
        self.eps_start = eps_start
        self.eps_end = eps_end
        self.eps_decay = eps_decay
        # (removed unused `eps_decay_rate`: it was computed but never read —
        # select_action uses the exponential schedule below instead)
        self.steps_done = 0
        self.policy_net = DQN(observation_space_size, action_space_size).to(self.device)
        self.target_net = DQN(observation_space_size, action_space_size).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()  # target net is only ever read, never trained
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        self.replay_buffer = ReplayBuffer(replay_buffer_capacity)

    def select_action(self, state):
        """Epsilon-greedy action: random with probability epsilon, else argmax Q."""
        self.steps_done += 1
        # Epsilon decays exponentially from eps_start toward eps_end.
        epsilon = self.eps_end + (self.eps_start - self.eps_end) * math.exp(-1. * self.steps_done / self.eps_decay)
        if random.random() < epsilon:
            return random.randrange(self.action_space_size)
        with torch.no_grad():
            state_tensor = torch.tensor(state, dtype=torch.float32).to(self.device)
            q_values = self.policy_net(state_tensor).cpu().numpy()
        return q_values.argmax()

    def optimize_model(self):
        """Run one gradient step on a sampled minibatch.

        No-op until the buffer holds at least `batch_size` transitions.
        """
        if len(self.replay_buffer) < self.batch_size:
            return
        state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size)
        state_tensor = torch.tensor(state, dtype=torch.float32).to(self.device)
        action_tensor = torch.tensor(action, dtype=torch.long).to(self.device)
        reward_tensor = torch.tensor(reward, dtype=torch.float32).to(self.device)
        next_state_tensor = torch.tensor(next_state, dtype=torch.float32).to(self.device)
        done_tensor = torch.tensor(done, dtype=torch.float32).to(self.device)
        # Q(s, a) for the actions actually taken.
        q_values = self.policy_net(state_tensor).gather(1, action_tensor.unsqueeze(1)).squeeze(1)
        # Bootstrap target from the frozen target network. Fix: wrap in
        # no_grad so no autograd graph is built through the target net
        # (the original silently tracked gradients into it).
        with torch.no_grad():
            next_q_values = self.target_net(next_state_tensor).max(1)[0]
        # Terminal transitions (done=1) contribute only the immediate reward.
        expected_q_values = reward_tensor + (1 - done_tensor) * self.gamma * next_q_values
        loss = F.mse_loss(q_values, expected_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_network(self):
        """Hard-copy the policy-net weights into the target net."""
        self.target_net.load_state_dict(self.policy_net.state_dict())

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer."""
        self.replay_buffer.push(state, action, reward, next_state, done)
# Training loop: interact, store, optimize, periodically sync the target net.
def train_dqn(env, agent, num_episodes):
    """Run `num_episodes` episodes and return each episode's total reward.

    Args:
        env: Gym-style environment (`reset() -> state`,
            `step(a) -> (next_state, reward, done, info)`).
        agent: object exposing select_action / remember / optimize_model /
            update_target_network (e.g. DQNAgent).
        num_episodes: number of episodes to run.

    Returns:
        List with one total-reward entry per episode.
    """
    episode_rewards = []
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        while True:
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            agent.optimize_model()
            state = next_state
            total_reward += reward
            if done:
                episode_rewards.append(total_reward)
                break
        if episode % 20 == 0:
            # Fix: average over however many episodes we actually have; the
            # original divided by a hard-coded 20, under-reporting the first
            # printed averages (e.g. episode 0 showed reward/20).
            recent = episode_rewards[-20:]
            print("Episode: {}, average reward: {:.2f}".format(episode, sum(recent) / len(recent)))
        if episode % 10 == 0:
            agent.update_target_network()
    return episode_rewards
# 运行 DQN 算法
if __name__ == '__main__':
env = gym.make('CartPole-v0')
observation_space_size = env.observation_space.shape[0]
action_space_size = env.action_space.n
agent = DQNAgent(observation_space_size, action_space_size)
episode_lengths = train_dqn(env, agent, num_episodes=200)
env.close()
```
这个代码是一个简单的DQN实现,可以在CartPole环境中得到不错的结果。如果要在其他环境中使用,需要根据环境的状态空间和动作空间进行修改。
maddpg结合优先经验回放和mpe环境 tensorflow代码
以下是使用 TensorFlow 实现 MADDPG 算法并结合优先经验回放的示例代码。注意:代码实际通过 Unity ML-Agents 的 UnityEnvironment 加载环境,并非标准的 PettingZoo MPE 环境,使用时需按实际环境接口调整。代码中包含注释以帮助理解。
```python
import numpy as np
import tensorflow as tf
import random
from mlagents.envs import UnityEnvironment
from mlagents.envs import BrainInfo
# Hyperparameters
BUFFER_SIZE = int(1e6)  # replay buffer capacity (transitions)
BATCH_SIZE = 128  # minibatch size per learning step
GAMMA = 0.99  # discount factor
TAU = 1e-3  # target-network soft-update rate
LR_ACTOR = 1e-3  # actor learning rate
LR_CRITIC = 1e-3  # critic learning rate
UPDATE_EVERY = 2  # environment steps between learning triggers
NUM_UPDATES = 10  # gradient updates per learning trigger
# Neural network models
class Actor(tf.keras.Model):
    """Policy network: maps a state to a tanh-squashed action vector.

    Note: `state_size` is accepted for symmetry with Critic; Keras Dense
    layers infer their input size on first call.
    """

    def __init__(self, state_size, action_size):
        super(Actor, self).__init__()
        self.fc1 = tf.keras.layers.Dense(256, activation='relu')
        self.fc2 = tf.keras.layers.Dense(128, activation='relu')
        self.fc3 = tf.keras.layers.Dense(action_size, activation='tanh')

    def call(self, state):
        return self.fc3(self.fc2(self.fc1(state)))
class Critic(tf.keras.Model):
    """Twin-head Q-network: returns two independent Q estimates for a
    (state, action) pair, each from its own stack of Dense layers."""

    def __init__(self, state_size, action_size):
        super(Critic, self).__init__()
        # Head 1
        self.fc1 = tf.keras.layers.Dense(256, activation='relu')
        self.fc2 = tf.keras.layers.Dense(128, activation='relu')
        self.fc3 = tf.keras.layers.Dense(1, activation=None)
        # Head 2
        self.fc4 = tf.keras.layers.Dense(256, activation='relu')
        self.fc5 = tf.keras.layers.Dense(128, activation='relu')
        self.fc6 = tf.keras.layers.Dense(1, activation=None)

    def call(self, state, action):
        joined = tf.concat([state, action], axis=1)
        q1 = self.fc3(self.fc2(self.fc1(joined)))
        q2 = self.fc6(self.fc5(self.fc4(joined)))
        return q1, q2
# Proportional prioritized experience replay (PER)
class PrioritizedReplay:
    """Ring buffer whose sampling probability is priority**alpha, with
    importance-sampling weights annealed via a growing beta."""

    def __init__(self, buffer_size, batch_size):
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.buffer = []
        self.priorities = np.zeros((buffer_size,), dtype=np.float32)
        self.pos = 0
        self.alpha = 0.5  # how strongly priorities skew sampling
        self.beta = 0.5  # importance-sampling correction strength
        self.beta_increment_per_sampling = 0.001

    def add(self, state, action, reward, next_state, done):
        """Insert a transition with the current max priority so it is
        sampled at least once soon after being stored."""
        priority = np.max(self.priorities) if self.buffer else 1.0
        item = (state, action, reward, next_state, done)
        if len(self.buffer) < self.buffer_size:
            self.buffer.append(item)
        else:
            self.buffer[self.pos] = item
        self.priorities[self.pos] = priority
        self.pos = (self.pos + 1) % self.buffer_size

    def sample(self):
        """Draw `batch_size` transitions by priority.

        Returns (indices, samples, normalized IS weights) and anneals beta
        toward 1 on every call.
        """
        filled = self.priorities if len(self.buffer) == self.buffer_size else self.priorities[:self.pos]
        probs = filled ** self.alpha
        probs /= probs.sum()
        indices = np.random.choice(len(self.buffer), self.batch_size, p=probs)
        samples = [self.buffer[i] for i in indices]
        weights = (len(self.buffer) * probs[indices]) ** (-self.beta)
        weights /= weights.max()  # normalize so the largest weight is 1
        self.beta = np.min([1., self.beta + self.beta_increment_per_sampling])
        return indices, samples, weights

    def update_priorities(self, batch_indices, batch_priorities):
        """Overwrite the priorities of previously sampled transitions."""
        for i, p in zip(batch_indices, batch_priorities):
            self.priorities[i] = p
# MADDPG algorithm: one (actor, critic) pair per agent with target networks
# and a shared prioritized replay buffer.
# NOTE(review): several parts of this class look defective (flagged inline);
# the code is documented as-is rather than rewritten — verify before use.
class MADDPG:
    def __init__(self, state_size, action_size, num_agents):
        self.state_size = state_size
        self.action_size = action_size
        self.num_agents = num_agents
        self.actors = [Actor(state_size, action_size) for _ in range(num_agents)]
        # NOTE(review): Critic's signature is (state_size, action_size) but it
        # is constructed here with the combined size and a literal 1 — confirm
        # the intended dimensions (Dense layers infer input size anyway).
        self.critics = [Critic((state_size+action_size)*num_agents, 1) for _ in range(num_agents)]
        self.target_actors = [Actor(state_size, action_size) for _ in range(num_agents)]
        self.target_critics = [Critic((state_size+action_size)*num_agents, 1) for _ in range(num_agents)]
        for i in range(num_agents):
            # Hard-copy initial weights into the target networks.
            self.target_actors[i].set_weights(self.actors[i].get_weights())
            self.target_critics[i].set_weights(self.critics[i].get_weights())
        self.buffer = PrioritizedReplay(BUFFER_SIZE, BATCH_SIZE)
        self.actor_optimizer = [tf.keras.optimizers.Adam(LR_ACTOR) for _ in range(num_agents)]
        self.critic_optimizer = [tf.keras.optimizers.Adam(LR_CRITIC) for _ in range(num_agents)]
        self.t_step = 0  # step counter modulo UPDATE_EVERY

    def act(self, obs):
        # Return each agent's action for its own observation (no exploration noise).
        obs = np.array(obs)
        actions = []
        for i in range(self.num_agents):
            action = self.actors[i](obs[i][np.newaxis,:], training=False)
            actions.append(action.numpy())
        actions = np.concatenate(actions, axis=0)
        return actions

    def step(self, state, action, reward, next_state, done):
        # Store the transition; every UPDATE_EVERY steps run NUM_UPDATES learn passes.
        self.buffer.add(state, action, reward, next_state, done)
        self.t_step = (self.t_step + 1) % UPDATE_EVERY
        if self.t_step == 0 and len(self.buffer.buffer) > BATCH_SIZE:
            for _ in range(NUM_UPDATES):
                indices, samples, weights = self.buffer.sample()
                self.learn(samples, weights)
                self.update_targets()
                # NOTE(review): priorities are refreshed with the IS weights,
                # not with TD errors — PER normally uses |TD error| here.
                self.buffer.update_priorities(indices, weights)

    def learn(self, samples, weights):
        # Unpack the sampled batch into per-field arrays.
        states = np.array([sample[0] for sample in samples])
        actions = np.array([sample[1] for sample in samples])
        rewards = np.array([sample[2] for sample in samples])
        next_states = np.array([sample[3] for sample in samples])
        dones = np.array([sample[4] for sample in samples])
        for i in range(self.num_agents):
            # Critic update: Q targets from target nets, MSE against current Q.
            with tf.GradientTape(persistent=True) as tape:
                target_actions = [self.target_actors[j](next_states[j][np.newaxis,:], training=False) for j in range(self.num_agents)]
                # NOTE(review): np.concatenate on tf tensors inside a
                # GradientTape converts them to numpy and severs the tape.
                target_actions = np.concatenate(target_actions, axis=0)
                # NOTE(review): Critic.call returns a (q1, q2) tuple and takes
                # (state, action) as two arguments — calling it with a single
                # tuple and then .numpy() on the result will not work as written.
                target_qs = self.target_critics[i]((next_states.reshape(-1, self.state_size*self.num_agents), target_actions))
                target_qs = target_qs.numpy().reshape(-1, self.num_agents)
                # Bellman targets; terminal transitions keep only the reward.
                q_targets = rewards[:,i][:,np.newaxis] + (GAMMA * target_qs * (1 - dones[:,i][:,np.newaxis]))
                critic_qs = self.critics[i]((states.reshape(-1, self.state_size*self.num_agents), actions.reshape(-1, self.action_size*self.num_agents)))
                # Importance-sampling-weighted MSE loss.
                critic_loss = tf.reduce_mean(weights * (q_targets - critic_qs)**2)
            critic_grads = tape.gradient(critic_loss, self.critics[i].trainable_variables)
            self.critic_optimizer[i].apply_gradients(zip(critic_grads, self.critics[i].trainable_variables))
            # Actor update: maximize the critic's value of the actor's actions.
            with tf.GradientTape() as tape:
                actor_actions = [self.actors[j](states[:,j,:], training=False) if j == i else self.actors[j](states[:,j,:], training=True) for j in range(self.num_agents)]
                # NOTE(review): same tape-severing issue — the actor gradients
                # computed below will be None because of this numpy conversion.
                actor_actions = np.concatenate(actor_actions, axis=0)
                actor_loss = -tf.reduce_mean(self.critics[i]((states.reshape(-1, self.state_size*self.num_agents), actor_actions)))
            actor_grads = tape.gradient(actor_loss, self.actors[i].trainable_variables)
            self.actor_optimizer[i].apply_gradients(zip(actor_grads, self.actors[i].trainable_variables))

    def update_targets(self):
        # Soft (Polyak) update of target networks toward the online networks.
        # NOTE(review): np.array over a list of differently-shaped per-layer
        # weight arrays is ragged and fragile (errors on modern NumPy);
        # soft updates are normally applied tensor-by-tensor.
        for i in range(self.num_agents):
            self.target_actors[i].set_weights(TAU*np.array(self.actors[i].get_weights())+(1-TAU)*np.array(self.target_actors[i].get_weights()))
            self.target_critics[i].set_weights(TAU*np.array(self.critics[i].get_weights())+(1-TAU)*np.array(self.target_critics[i].get_weights()))
# Environment setup and training loop (Unity ML-Agents).
# NOTE(review): despite the "MPE" name, this loads a Unity ML-Agents
# UnityEnvironment, not the standard PettingZoo MPE environments.
from collections import deque  # fix: `deque` was used below but never imported

env_name = "MPE/3DBall"
env = UnityEnvironment(file_name=env_name)
brain_name = env.brain_names[0]
brain = env.brains[brain_name]
env_info = env.reset()[brain_name]
state_size = env_info.vector_observations.shape[1]
action_size = brain.vector_action_space_size
num_agents = len(env_info.agents)
maddpg = MADDPG(state_size, action_size, num_agents)
scores = []
scores_window = deque(maxlen=100)  # rolling window of the last 100 episode scores
for i_episode in range(10000):
    env_info = env.reset()[brain_name]
    obs = env_info.vector_observations
    score = np.zeros(num_agents)
    while True:
        actions = maddpg.act(obs)
        env_info = env.step(actions)[brain_name]
        next_obs = env_info.vector_observations
        rewards = env_info.rewards
        dones = env_info.local_done
        maddpg.step(obs, actions, rewards, next_obs, dones)
        obs = next_obs
        score += rewards
        if np.any(dones):  # end the episode when any agent signals done
            break
    # Track the best per-agent score for this episode.
    scores_window.append(np.max(score))
    scores.append(np.max(score))
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")
    if i_episode % 100 == 0:
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))
    if np.mean(scores_window)>=0.5:
        print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))
        break
env.close()
```