MADDPG with Prioritized Experience Replay: TensorFlow Code
Below is example code that implements the MADDPG algorithm combined with prioritized experience replay in TensorFlow:
``` python
import numpy as np
import tensorflow as tf
import random
# Actor network (one per agent)
class ActorNetwork:
    def __init__(self, sess, state_dim, action_dim, action_bound, learning_rate, tau):
        self.sess = sess
        self.s_dim = state_dim
        self.a_dim = action_dim
        self.action_bound = action_bound
        self.learning_rate = learning_rate
        self.tau = tau
        # Only collect the variables created by this instance, so that several
        # agents can build their own actor networks in the same graph
        prev_vars = len(tf.trainable_variables())
        # Online actor network
        self.inputs, self.out, self.scaled_out = self.create_actor_network()
        self.network_params = tf.trainable_variables()[prev_vars:]
        # Target actor network
        self.target_inputs, self.target_out, self.target_scaled_out = self.create_actor_network()
        self.target_network_params = tf.trainable_variables()[prev_vars + len(self.network_params):]
        # Soft update of the target network: theta' <- tau * theta + (1 - tau) * theta'
        self.update_target_network_params = \
            [self.target_network_params[i].assign(tf.multiply(self.network_params[i], self.tau) +
                                                  tf.multiply(self.target_network_params[i], 1. - self.tau))
             for i in range(len(self.target_network_params))]
        # Gradient of the critic's Q-value with respect to this agent's action, fed in from outside
        self.action_gradient = tf.placeholder(tf.float32, [None, self.a_dim])
        # Batch size is fed at training time so the policy gradient can be averaged over the batch
        self.batch_size = tf.placeholder(tf.float32, None)
        # Deterministic policy gradient with respect to the actor parameters
        self.unnormalized_actor_gradients = tf.gradients(self.scaled_out, self.network_params, -self.action_gradient)
        self.actor_gradients = list(map(lambda x: tf.div(x, self.batch_size), self.unnormalized_actor_gradients))
        # Optimizer that applies the policy gradient to the online network parameters
        self.optimizer = tf.train.AdamOptimizer(self.learning_rate).\
            apply_gradients(zip(self.actor_gradients, self.network_params))
        # Initialize the variables
        self.sess.run(tf.global_variables_initializer())

    # Build a three-layer fully connected actor network
    def create_actor_network(self):
        inputs = tf.placeholder(tf.float32, [None, self.s_dim])
        w1 = tf.Variable(tf.random_normal([self.s_dim, 64]))
        b1 = tf.Variable(tf.random_normal([64]))
        net = tf.nn.relu(tf.matmul(inputs, w1) + b1)
        w2 = tf.Variable(tf.random_normal([64, 32]))
        b2 = tf.Variable(tf.random_normal([32]))
        net = tf.nn.relu(tf.matmul(net, w2) + b2)
        w3 = tf.Variable(tf.random_normal([32, self.a_dim]))
        b3 = tf.Variable(tf.random_normal([self.a_dim]))
        out = tf.matmul(net, w3) + b3
        # Scale the output to the action range
        scaled_out = tf.multiply(out, self.action_bound)
        return inputs, out, scaled_out

    # Train the online actor network with the sampled policy gradient
    def actor_gradient(self, inputs, action_gradients, batch_size):
        self.sess.run(self.optimizer, feed_dict={
            self.inputs: inputs,
            self.action_gradient: action_gradients,
            self.batch_size: batch_size
        })

    # Predict actions with the online network
    def predict(self, inputs):
        return self.sess.run(self.scaled_out, feed_dict={
            self.inputs: inputs
        })

    # Predict actions with the target network
    def predict_target(self, inputs):
        return self.sess.run(self.target_scaled_out, feed_dict={
            self.target_inputs: inputs
        })

    # Soft-update the target network
    def update_target_network(self):
        self.sess.run(self.update_target_network_params)

    # Fetch the target network parameters
    def get_target_network_params(self):
        return self.sess.run(self.target_network_params)

    # Save the actor network parameters
    def save_network(self, save_path):
        saver = tf.train.Saver()
        saver.save(self.sess, save_path)

    # Load the actor network parameters
    def load_network(self, load_path):
        saver = tf.train.Saver()
        saver.restore(self.sess, load_path)
# Critic network (one per agent); in MADDPG it is conditioned on the joint state and joint action
class CriticNetwork:
    def __init__(self, sess, state_dim, action_dim, learning_rate, tau, gamma, num_agents):
        self.sess = sess
        self.s_dim = state_dim
        self.a_dim = action_dim
        self.learning_rate = learning_rate
        self.tau = tau
        self.gamma = gamma
        self.num_agents = num_agents
        # Only collect the variables created by this instance
        prev_vars = len(tf.trainable_variables())
        # Online critic network
        self.inputs, self.action, self.out = self.create_critic_network()
        self.network_params = tf.trainable_variables()[prev_vars:]
        # Target critic network
        self.target_inputs, self.target_action, self.target_out = self.create_critic_network()
        self.target_network_params = tf.trainable_variables()[prev_vars + len(self.network_params):]
        # Soft update of the target network
        self.update_target_network_params = \
            [self.target_network_params[i].assign(tf.multiply(self.network_params[i], self.tau) +
                                                  tf.multiply(self.target_network_params[i], 1. - self.tau))
             for i in range(len(self.target_network_params))]
        # TD target fed in from outside; minimize the squared TD error
        self.predicted_q_value = tf.placeholder(tf.float32, [None, 1])
        self.loss = tf.reduce_mean(tf.square(self.predicted_q_value - self.out))
        self.optimizer = tf.train.AdamOptimizer(self.learning_rate).minimize(self.loss, var_list=self.network_params)
        # Gradient of Q with respect to the action, used to train the actor
        self.action_gradients = tf.gradients(self.out, self.action)
        # Initialize the variables
        self.sess.run(tf.global_variables_initializer())

    # Build the critic network: a state branch and an action branch merged into a single Q-value head
    def create_critic_network(self):
        inputs = tf.placeholder(tf.float32, [None, self.s_dim])
        action = tf.placeholder(tf.float32, [None, self.a_dim])
        # State branch
        w1 = tf.Variable(tf.random_normal([self.s_dim, 64]))
        b1 = tf.Variable(tf.random_normal([64]))
        net = tf.nn.relu(tf.matmul(inputs, w1) + b1)
        # Action branch (maps from the action dimension, not from 64)
        w2 = tf.Variable(tf.random_normal([self.a_dim, 32]))
        b2 = tf.Variable(tf.random_normal([32]))
        action_net = tf.nn.relu(tf.matmul(action, w2) + b2)
        # Merge the two branches with separate weight matrices
        w2_s = tf.Variable(tf.random_normal([64, 32]))
        w2_a = tf.Variable(tf.random_normal([32, 32]))
        b2_ = tf.Variable(tf.random_normal([32]))
        net_ = tf.nn.relu(tf.matmul(net, w2_s) + tf.matmul(action_net, w2_a) + b2_)
        # Q-value head
        w3 = tf.Variable(tf.random_normal([32, 1]))
        b3 = tf.Variable(tf.random_normal([1]))
        out = tf.matmul(net_, w3) + b3
        return inputs, action, out

    # Gradient of Q with respect to the action
    def critic_gradient(self, inputs, action):
        return self.sess.run(self.action_gradients, feed_dict={
            self.inputs: inputs,
            self.action: action
        })[0]

    # Train the online critic network towards the TD target
    def train(self, inputs, action, predicted_q_value):
        self.sess.run(self.optimizer, feed_dict={
            self.inputs: inputs,
            self.action: action,
            self.predicted_q_value: predicted_q_value
        })

    # Predict Q-values with the online network
    def predict(self, inputs, action):
        return self.sess.run(self.out, feed_dict={
            self.inputs: inputs,
            self.action: action
        })

    # Predict Q-values with the target network
    def predict_target(self, inputs, action):
        return self.sess.run(self.target_out, feed_dict={
            self.target_inputs: inputs,
            self.target_action: action
        })

    # Soft-update the target network
    def update_target_network(self):
        self.sess.run(self.update_target_network_params)

    # Fetch the target network parameters
    def get_target_network_params(self):
        return self.sess.run(self.target_network_params)

    # Save the critic network parameters
    def save_network(self, save_path):
        saver = tf.train.Saver()
        saver.save(self.sess, save_path)

    # Load the critic network parameters
    def load_network(self, load_path):
        saver = tf.train.Saver()
        saver.restore(self.sess, load_path)
# Prioritized experience replay buffer
class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.buffer = []
        self.priorities = np.zeros((buffer_size,), dtype=np.float32)
        self.pos = 0

    def __len__(self):
        return len(self.buffer)

    # Add a transition; new transitions get the current maximum priority
    def add(self, state, action, reward, next_state, done):
        max_prio = self.priorities.max() if self.buffer else 1.0
        if len(self.buffer) < self.buffer_size:
            self.buffer.append((state, action, reward, next_state, done))
        else:
            self.buffer[self.pos] = (state, action, reward, next_state, done)
        self.priorities[self.pos] = max_prio
        self.pos = (self.pos + 1) % self.buffer_size

    # Importance-sampling weights: w_i = (N * P(i))^(-beta), normalized by the maximum weight
    def _get_weights(self, prob, beta):
        weights = (len(self.buffer) * prob) ** (-beta)
        return weights / weights.max()

    # Sample a batch of transitions in proportion to their stored priorities
    def sample(self, beta):
        if len(self.buffer) == self.buffer_size:
            prios = self.priorities
        else:
            prios = self.priorities[:self.pos]
        prob = prios / prios.sum()
        indices = np.random.choice(len(self.buffer), self.batch_size, p=prob)
        samples = [self.buffer[idx] for idx in indices]
        weights = self._get_weights(prob[indices], beta)
        states, actions, rewards, next_states, dones = zip(*samples)
        return states, actions, rewards, next_states, dones, indices, weights

    # Update the priorities of the sampled transitions with their new TD errors
    def update_priorities(self, indices, td_errors):
        for idx, td_error in zip(indices, td_errors):
            self.priorities[idx] = abs(td_error) + 1e-6
# MADDPG algorithm with optional prioritized experience replay
class MADDPG:
    def __init__(self, sess, state_dim, action_dim, action_bound, learning_rate_actor, learning_rate_critic,
                 tau, gamma, memory_size, batch_size, num_agents, prioritized_replay=False):
        self.sess = sess
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.action_bound = action_bound
        self.learning_rate_actor = learning_rate_actor
        self.learning_rate_critic = learning_rate_critic
        self.tau = tau
        self.gamma = gamma
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.num_agents = num_agents
        self.prioritized_replay = prioritized_replay
        # One actor per agent, acting on that agent's own observation
        self.actors = []
        for i in range(num_agents):
            actor = ActorNetwork(sess, state_dim, action_dim, action_bound, learning_rate_actor, tau)
            self.actors.append(actor)
        # One centralized critic per agent, conditioned on all agents' observations and actions
        self.critics = []
        for i in range(num_agents):
            critic = CriticNetwork(sess, state_dim * num_agents, action_dim * num_agents,
                                   learning_rate_critic, tau, gamma, num_agents)
            self.critics.append(critic)
        # Replay memory: prioritized buffer or a plain FIFO list
        if prioritized_replay:
            self.memory = ReplayBuffer(memory_size, batch_size)
        else:
            self.memory = []
        # Initialize all network parameters
        self.sess.run(tf.global_variables_initializer())

    # Predict one action per agent from the agents' own observations
    def predict(self, inputs):
        actions = []
        for i in range(self.num_agents):
            action = self.actors[i].predict(inputs[i])
            actions.append(action)
        return actions

    # One training step for all actors and critics
    def update(self):
        if len(self.memory) < self.batch_size:
            return
        # Sample a batch of experience
        if self.prioritized_replay:
            states, actions, rewards, next_states, dones, indices, weights = self.memory.sample(1.0)
        else:
            samples = random.sample(self.memory, self.batch_size)
            states, actions, rewards, next_states, dones = zip(*samples)
        # Each stored transition holds per-agent observations, actions and rewards
        states = np.asarray(states, dtype=np.float32)            # (batch, num_agents, state_dim)
        actions = np.asarray(actions, dtype=np.float32)          # (batch, num_agents, action_dim)
        rewards = np.asarray(rewards, dtype=np.float32)          # (batch, num_agents)
        next_states = np.asarray(next_states, dtype=np.float32)  # (batch, num_agents, state_dim)
        dones = np.asarray(dones, dtype=np.float32).reshape(-1, 1)
        full_states = states.reshape(self.batch_size, -1)
        full_actions = actions.reshape(self.batch_size, -1)
        full_next_states = next_states.reshape(self.batch_size, -1)
        # Joint next action from all target actors
        target_next_actions = []
        for i in range(self.num_agents):
            target_next_actions.append(self.actors[i].predict_target(next_states[:, i, :]))
        target_next_actions = np.concatenate(target_next_actions, axis=1)
        # Update each critic towards its TD target and accumulate the TD errors
        td_errors = np.zeros((self.batch_size, 1))
        for i in range(self.num_agents):
            next_q = self.critics[i].predict_target(full_next_states, target_next_actions)
            td_target = rewards[:, i:i + 1] + self.gamma * next_q * (1. - dones)
            q_value = self.critics[i].predict(full_states, full_actions)
            td_errors += np.abs(td_target - q_value)
            # Note: the importance-sampling weights are not folded into the critic loss here
            self.critics[i].train(full_states, full_actions, td_target)
        if self.prioritized_replay:
            # Use the mean absolute TD error over agents as the new priority
            self.memory.update_priorities(indices, (td_errors / self.num_agents).flatten())
        # Update each actor with the sampled policy gradient
        current_actions = []
        for i in range(self.num_agents):
            current_actions.append(self.actors[i].predict(states[:, i, :]))
        current_actions = np.concatenate(current_actions, axis=1)
        for i in range(self.num_agents):
            grads = self.critics[i].critic_gradient(full_states, current_actions)
            grads_i = grads[:, i * self.action_dim:(i + 1) * self.action_dim]
            self.actors[i].actor_gradient(states[:, i, :], grads_i, self.batch_size)
        # Soft-update all target networks
        for i in range(self.num_agents):
            self.actors[i].update_target_network()
            self.critics[i].update_target_network()

    # Store a transition in the replay memory
    def add_experience(self, state, action, reward, next_state, done):
        if self.prioritized_replay:
            self.memory.add(state, action, reward, next_state, done)
        else:
            if len(self.memory) >= self.memory_size:
                self.memory.pop(0)
            self.memory.append((state, action, reward, next_state, done))

    # Save all actor and critic network parameters
    def save_network(self, save_path):
        for i in range(self.num_agents):
            self.actors[i].save_network(save_path + '_actor' + str(i))
            self.critics[i].save_network(save_path + '_critic' + str(i))

    # Load all actor and critic network parameters
    def load_network(self, load_path):
        for i in range(self.num_agents):
            self.actors[i].load_network(load_path + '_actor' + str(i))
            self.critics[i].load_network(load_path + '_critic' + str(i))
```
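To see the prioritized sampling rule in isolation, the short standalone snippet below (NumPy only, not part of the original code) mirrors what `ReplayBuffer.sample` and `_get_weights` do: indices are drawn in proportion to their priorities, and each sampled transition receives a normalized importance-sampling weight of (N * P(i))^(-beta).
``` python
import numpy as np

# Standalone illustration of the sampling rule used by ReplayBuffer above:
# transition i is drawn with probability P(i) = p_i / sum_k p_k, and its
# importance-sampling weight is w_i = (N * P(i))^(-beta), normalized so the
# largest sampled weight equals 1.
priorities = np.array([0.5, 2.0, 0.1, 1.0], dtype=np.float32)
beta = 1.0

prob = priorities / priorities.sum()
indices = np.random.choice(len(priorities), size=2, p=prob)
weights = (len(priorities) * prob[indices]) ** (-beta)
weights /= weights.max()
print("sampled indices:", indices, "IS weights:", weights)
```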
Here, ActorNetwork and CriticNetwork are the classes for the actor and critic networks, ReplayBuffer is the prioritized experience replay buffer, and MADDPG is the class that ties them together into the MADDPG algorithm. The code uses the TensorFlow 1.x graph API as the deep learning framework and can be adapted to your own needs.
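As a minimal usage sketch, the training loop below shows one way to wire the classes together, assuming they are defined in the same script as above. It is only an illustration: `DummyEnv`, the dimensions, and all hyperparameter values are made-up placeholders, and a real multi-agent environment (for example an MPE scenario) would take the place of `DummyEnv`.
``` python
import numpy as np
import tensorflow as tf

NUM_AGENTS, STATE_DIM, ACTION_DIM = 2, 8, 2

class DummyEnv:
    # Stand-in environment used only to make the sketch runnable
    def reset(self):
        return [np.random.randn(STATE_DIM) for _ in range(NUM_AGENTS)]

    def step(self, actions):
        next_obs = [np.random.randn(STATE_DIM) for _ in range(NUM_AGENTS)]
        rewards = [float(-np.sum(a ** 2)) for a in actions]
        return next_obs, rewards, False

env = DummyEnv()
with tf.Session() as sess:
    maddpg = MADDPG(sess, STATE_DIM, ACTION_DIM, action_bound=1.0,
                    learning_rate_actor=1e-4, learning_rate_critic=1e-3,
                    tau=0.01, gamma=0.95, memory_size=100000, batch_size=64,
                    num_agents=NUM_AGENTS, prioritized_replay=True)
    for episode in range(10):
        obs = env.reset()
        for t in range(100):
            # Each actor expects a (1, state_dim) batch
            actions = maddpg.predict([np.reshape(o, (1, STATE_DIM)) for o in obs])
            # Add exploration noise and clip to the action range
            actions = [np.clip(a[0] + 0.1 * np.random.randn(ACTION_DIM), -1.0, 1.0)
                       for a in actions]
            next_obs, rewards, done = env.step(actions)
            maddpg.add_experience(obs, actions, rewards, next_obs, done)
            maddpg.update()
            obs = next_obs
            if done:
                break
```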