MADDPG with prioritized experience replay and the MPE environment (TensorFlow code)
The following code implements the MADDPG algorithm in TensorFlow, combined with prioritized experience replay, for a multi-agent environment; comments are included to aid understanding. Note that the environment interaction is written against the Unity ML-Agents Python API, while a short sketch of hooking up the original OpenAI MPE package follows the code.
```python
import numpy as np
import tensorflow as tf
from collections import deque
from mlagents.envs import UnityEnvironment
# Hyperparameters
BUFFER_SIZE = int(1e6)  # replay buffer capacity
BATCH_SIZE = 128        # minibatch size
GAMMA = 0.99            # discount factor
TAU = 1e-3              # soft-update rate for the target networks
LR_ACTOR = 1e-3         # actor learning rate
LR_CRITIC = 1e-3        # critic learning rate
UPDATE_EVERY = 2        # environment steps between learning phases
NUM_UPDATES = 10        # gradient updates per learning phase
# Neural-network models
class Actor(tf.keras.Model):
    """Per-agent policy: maps an agent's own observation to a tanh-bounded action."""

    def __init__(self, state_size, action_size):
        super(Actor, self).__init__()
        self.fc1 = tf.keras.layers.Dense(256, activation='relu')
        self.fc2 = tf.keras.layers.Dense(128, activation='relu')
        self.fc3 = tf.keras.layers.Dense(action_size, activation='tanh')

    def call(self, state):
        x = self.fc1(state)
        x = self.fc2(x)
        return self.fc3(x)
class Critic(tf.keras.Model):
    """Centralized critic: scores the joint (all-agent) state and action.

    Two independent Q heads are kept (TD3-style twin critic) so the learning
    step can use the minimum of the two target estimates.
    """

    def __init__(self, state_size, action_size):
        super(Critic, self).__init__()
        self.fc1 = tf.keras.layers.Dense(256, activation='relu')
        self.fc2 = tf.keras.layers.Dense(128, activation='relu')
        self.fc3 = tf.keras.layers.Dense(1, activation=None)
        self.fc4 = tf.keras.layers.Dense(256, activation='relu')
        self.fc5 = tf.keras.layers.Dense(128, activation='relu')
        self.fc6 = tf.keras.layers.Dense(1, activation=None)

    def call(self, inputs):
        # Accepts a (state, action) tuple, matching how the critic is called below.
        state, action = inputs
        xs = tf.concat([state, action], axis=1)
        x1 = self.fc3(self.fc2(self.fc1(xs)))
        x2 = self.fc6(self.fc5(self.fc4(xs)))
        return x1, x2
# Prioritized experience replay
class PrioritizedReplay:
    """Proportional prioritized replay with importance-sampling (IS) weights."""

    def __init__(self, buffer_size, batch_size):
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.buffer = []
        self.priorities = np.zeros((buffer_size,), dtype=np.float32)
        self.pos = 0
        self.alpha = 0.5   # how strongly priorities shape the sampling distribution
        self.beta = 0.5    # initial IS-correction strength, annealed towards 1
        self.beta_increment_per_sampling = 0.001

    def add(self, state, action, reward, next_state, done):
        # New transitions get the current maximum priority so they are sampled at least once.
        max_priority = np.max(self.priorities) if self.buffer else 1.0
        experience = (state, action, reward, next_state, done)
        if len(self.buffer) < self.buffer_size:
            self.buffer.append(experience)
        else:
            self.buffer[self.pos] = experience
        self.priorities[self.pos] = max_priority
        self.pos = (self.pos + 1) % self.buffer_size

    def sample(self):
        if len(self.buffer) == self.buffer_size:
            priorities = self.priorities
        else:
            priorities = self.priorities[:self.pos]
        probs = priorities ** self.alpha
        probs /= probs.sum()
        indices = np.random.choice(len(self.buffer), self.batch_size, p=probs)
        samples = [self.buffer[idx] for idx in indices]
        # Importance-sampling weights correct the bias of non-uniform sampling.
        total = len(self.buffer)
        weights = (total * probs[indices]) ** (-self.beta)
        weights /= weights.max()
        self.beta = np.min([1., self.beta + self.beta_increment_per_sampling])
        return indices, samples, weights

    def update_priorities(self, batch_indices, batch_priorities):
        for idx, priority in zip(batch_indices, batch_priorities):
            self.priorities[idx] = priority
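# Illustrative usage of the replay buffer on its own (not part of training;
# `obs`, `acts`, `rews`, `next_obs`, `dones`, `td_errors` are placeholders):
#
#   buffer = PrioritizedReplay(buffer_size=1000, batch_size=32)
#   for _ in range(200):
#       buffer.add(obs, acts, rews, next_obs, dones)
#   indices, samples, weights = buffer.sample()                  # prioritized batch + IS weights
#   buffer.update_priorities(indices, np.abs(td_errors) + 1e-6)  # new priorities = |TD error|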
# MADDPG agent
class MADDPG:
    def __init__(self, state_size, action_size, num_agents):
        self.state_size = state_size
        self.action_size = action_size
        self.num_agents = num_agents
        # One decentralized actor and one centralized critic per agent.
        self.actors = [Actor(state_size, action_size) for _ in range(num_agents)]
        self.critics = [Critic(state_size * num_agents, action_size * num_agents) for _ in range(num_agents)]
        self.target_actors = [Actor(state_size, action_size) for _ in range(num_agents)]
        self.target_critics = [Critic(state_size * num_agents, action_size * num_agents) for _ in range(num_agents)]
        # Build every network once with dummy inputs so that weights exist
        # before they are copied into the target networks.
        dummy_obs = tf.zeros((1, state_size))
        dummy_joint_state = tf.zeros((1, state_size * num_agents))
        dummy_joint_action = tf.zeros((1, action_size * num_agents))
        for i in range(num_agents):
            self.actors[i](dummy_obs)
            self.target_actors[i](dummy_obs)
            self.critics[i]((dummy_joint_state, dummy_joint_action))
            self.target_critics[i]((dummy_joint_state, dummy_joint_action))
            self.target_actors[i].set_weights(self.actors[i].get_weights())
            self.target_critics[i].set_weights(self.critics[i].get_weights())
        self.buffer = PrioritizedReplay(BUFFER_SIZE, BATCH_SIZE)
        self.actor_optimizer = [tf.keras.optimizers.Adam(LR_ACTOR) for _ in range(num_agents)]
        self.critic_optimizer = [tf.keras.optimizers.Adam(LR_CRITIC) for _ in range(num_agents)]
        self.t_step = 0
    def act(self, obs):
        # obs has shape (num_agents, state_size); each actor sees only its own observation.
        obs = np.array(obs, dtype=np.float32)
        actions = []
        for i in range(self.num_agents):
            action = self.actors[i](obs[i][np.newaxis, :], training=False)
            actions.append(action.numpy())
        return np.concatenate(actions, axis=0)

    def step(self, state, action, reward, next_state, done):
        # Store the joint transition, then learn every UPDATE_EVERY steps.
        self.buffer.add(state, action, reward, next_state, done)
        self.t_step = (self.t_step + 1) % UPDATE_EVERY
        if self.t_step == 0 and len(self.buffer.buffer) > BATCH_SIZE:
            for _ in range(NUM_UPDATES):
                indices, samples, weights = self.buffer.sample()
                td_errors = self.learn(samples, weights)
                self.update_targets()
                # Re-prioritize the sampled transitions by their absolute TD error.
                self.buffer.update_priorities(indices, np.abs(td_errors) + 1e-6)
    def learn(self, samples, weights):
        # Unpack the batch: states/actions have shape (batch, num_agents, dim),
        # rewards/dones have shape (batch, num_agents).
        states = np.array([sample[0] for sample in samples], dtype=np.float32)
        actions = np.array([sample[1] for sample in samples], dtype=np.float32)
        rewards = np.array([sample[2] for sample in samples], dtype=np.float32)
        next_states = np.array([sample[3] for sample in samples], dtype=np.float32)
        dones = np.array([sample[4] for sample in samples], dtype=np.float32)
        is_weights = tf.convert_to_tensor(weights[:, np.newaxis], dtype=tf.float32)

        batch_size = states.shape[0]
        joint_states = states.reshape(batch_size, -1)
        joint_next_states = next_states.reshape(batch_size, -1)
        joint_actions = actions.reshape(batch_size, -1)

        # Target joint action: every agent's target actor acts on its own next observation.
        target_actions = tf.concat(
            [self.target_actors[j](next_states[:, j, :]) for j in range(self.num_agents)], axis=1)

        abs_td = np.zeros((batch_size,), dtype=np.float32)
        for i in range(self.num_agents):
            # ----- Critic update: weighted TD regression on both Q heads -----
            t_q1, t_q2 = self.target_critics[i]((joint_next_states, target_actions))
            target_q = tf.minimum(t_q1, t_q2)  # clipped double-Q target
            q_targets = rewards[:, i:i + 1] + GAMMA * target_q * (1 - dones[:, i:i + 1])
            with tf.GradientTape() as tape:
                q1, q2 = self.critics[i]((joint_states, joint_actions))
                td = q_targets - q1
                critic_loss = tf.reduce_mean(is_weights * (tf.square(td) + tf.square(q_targets - q2)))
            critic_grads = tape.gradient(critic_loss, self.critics[i].trainable_variables)
            self.critic_optimizer[i].apply_gradients(zip(critic_grads, self.critics[i].trainable_variables))
            abs_td += np.abs(tf.squeeze(td, axis=1).numpy())

            # ----- Actor update: only agent i's action keeps its gradient -----
            with tf.GradientTape() as tape:
                joint_policy_actions = tf.concat(
                    [self.actors[j](states[:, j, :]) if j == i
                     else tf.stop_gradient(self.actors[j](states[:, j, :]))
                     for j in range(self.num_agents)], axis=1)
                q1, _ = self.critics[i]((joint_states, joint_policy_actions))
                actor_loss = -tf.reduce_mean(q1)
            actor_grads = tape.gradient(actor_loss, self.actors[i].trainable_variables)
            self.actor_optimizer[i].apply_gradients(zip(actor_grads, self.actors[i].trainable_variables))

        # Average absolute TD error across agents, used to refresh the shared buffer's priorities.
        return abs_td / self.num_agents
    def update_targets(self):
        # Polyak (soft) update: target <- TAU * online + (1 - TAU) * target.
        for i in range(self.num_agents):
            self.target_actors[i].set_weights(
                [TAU * w + (1 - TAU) * tw
                 for w, tw in zip(self.actors[i].get_weights(), self.target_actors[i].get_weights())])
            self.target_critics[i].set_weights(
                [TAU * w + (1 - TAU) * tw
                 for w, tw in zip(self.critics[i].get_weights(), self.target_critics[i].get_weights())])
# Environment setup (through the Unity ML-Agents Python interface)
env_name = "MPE/3DBall"  # path/name of the environment binary
env = UnityEnvironment(file_name=env_name)
brain_name = env.brain_names[0]
brain = env.brains[brain_name]
env_info = env.reset()[brain_name]
state_size = env_info.vector_observations.shape[1]
action_size = brain.vector_action_space_size
num_agents = len(env_info.agents)
maddpg = MADDPG(state_size, action_size, num_agents)
scores = []
scores_window = deque(maxlen=100)
for i_episode in range(10000):
    env_info = env.reset()[brain_name]
    obs = env_info.vector_observations
    score = np.zeros(num_agents)
    while True:
        actions = maddpg.act(obs)
        env_info = env.step(actions)[brain_name]
        next_obs = env_info.vector_observations
        rewards = env_info.rewards
        dones = env_info.local_done
        maddpg.step(obs, actions, rewards, next_obs, dones)
        obs = next_obs
        score += rewards
        if np.any(dones):
            break
    scores_window.append(np.max(score))
    scores.append(np.max(score))
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")
    if i_episode % 100 == 0:
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))
    if np.mean(scores_window) >= 0.5:
        print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))
        break
env.close()
```
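The code above talks to the environment through the Unity ML-Agents `BrainInfo` interface rather than the original MPE package. For reference, below is a minimal sketch of creating a scenario with OpenAI's multiagent-particle-envs (`multiagent`) package; the scenario name `simple_spread.py` and the space queries are illustrative assumptions, and the training loop would then consume the per-agent observation/reward/done lists returned by `env.step` instead of a `BrainInfo` object.
```python
# Minimal MPE setup sketch (assumes the multiagent-particle-envs package is installed).
from multiagent.environment import MultiAgentEnv
import multiagent.scenarios as scenarios

scenario = scenarios.load("simple_spread.py").Scenario()  # illustrative scenario choice
world = scenario.make_world()
env = MultiAgentEnv(world, scenario.reset_world, scenario.reward, scenario.observation)

obs_n = env.reset()                              # list with one observation per agent
num_agents = env.n
state_size = env.observation_space[0].shape[0]
action_size = env.action_space[0].n              # MPE defaults to a discrete action space

# One step: per-agent actions in, per-agent observations/rewards/dones out.
# next_obs_n, reward_n, done_n, info_n = env.step(action_n)
```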