maddpg算法pytorch实例讲解
时间: 2024-01-17 12:03:04 浏览: 156
MADDPG是一种多智能体强化学习算法,它是DDPG的扩展版本。DDPG是一种深度强化学习算法,用于解决连续动作空间的问题。在MADDPG中,每个智能体都具有自己的actor和critic网络。这些网络被用来学习不同的策略,并且可以共享经验池中的经验。在此过程中,每个智能体都能够观察到其他智能体的状态,并且可以考虑其他智能体的行为。
下面是一个使用Pytorch实现MADDPG算法的示例代码:
```python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
# 神经网络定义
class Actor(nn.Module):
def __init__(self, state_dim, action_dim, max_action):
super(Actor, self).__init__()
self.layer_1 = nn.Linear(state_dim, 256)
self.layer_2 = nn.Linear(256, 256)
self.layer_3 = nn.Linear(256, action_dim)
self.max_action = max_action
def forward(self, x):
x = torch.relu(self.layer_1(x))
x = torch.relu(self.layer_2(x))
x = self.max_action * torch.tanh(self.layer_3(x))
return x
class Critic(nn.Module):
def __init__(self, state_dim, action_dim):
super(Critic, self).__init__()
self.layer_1 = nn.Linear(state_dim + action_dim, 256)
self.layer_2 = nn.Linear(256, 256)
self.layer_3 = nn.Linear(256, 1)
def forward(self, x, u):
xu = torch.cat([x, u], 1)
xu = torch.relu(self.layer_1(xu))
xu = torch.relu(self.layer_2(xu))
xu = self.layer_3(xu)
return xu
# 经验回放缓冲区
class ReplayBuffer:
def __init__(self, max_size):
self.buffer = deque(maxlen=max_size)
def add(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
state, action, reward, next_state, done = zip(*np.random.choice(self.buffer, batch_size, replace=False))
return np.concatenate(state), np.concatenate(action), np.array(reward, dtype=np.float32), np.concatenate(next_state), np.array(done, dtype=np.uint8)
def __len__(self):
return len(self.buffer)
# MADDPG算法
class MADDPG:
def __init__(self, state_dim, action_dim, max_action, discount=0.99, tau=0.01):
self.discount = discount
self.tau = tau
self.memory = ReplayBuffer(1000000)
self.actor = Actor(state_dim, action_dim, max_action)
self.actor_target = Actor(state_dim, action_dim, max_action)
self.critic = Critic(state_dim, action_dim)
self.critic_target = Critic(state_dim, action_dim)
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=0.001)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=0.001)
def get_action(self, state):
state = torch.FloatTensor(state.reshape(1, -1))
return self.actor(state).detach().numpy()[0]
def update(self, batch_size):
state, action, reward, next_state, done = self.memory.sample(batch_size)
state = torch.FloatTensor(state)
action = torch.FloatTensor(action)
reward = torch.FloatTensor(reward).unsqueeze(1)
next_state = torch.FloatTensor(next_state)
done = torch.FloatTensor(done).unsqueeze(1)
# 更新critic网络
next_actions = []
for i in range(action.shape[0]):
next_actions.append(self.actor_target(torch.FloatTensor(next_state[i]).unsqueeze(0)))
next_actions = torch.cat(next_actions, dim=0)
target_Q = self.critic_target(next_state, next_actions).detach()
y = reward + (1 - done) * self.discount * target_Q
current_Q = self.critic(state, action)
critic_loss = nn.MSELoss()(current_Q, y)
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# 更新actor网络
actions = []
for i in range(action.shape[0]):
actions.append(self.actor(torch.FloatTensor(state[i]).unsqueeze(0)))
actions = torch.cat(actions, dim=0)
actor_loss = -self.critic(state, actions).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
# 更新target网络
for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
# 训练
def train(env, agent, episodes, batch_size):
for episode in range(episodes):
state = env.reset()
done = False
while not done:
action = agent.get_action(state)
next_state, reward, done, _ = env.step(action)
agent.memory.add(state, action, reward, next_state, done)
state = next_state
if len(agent.memory) > batch_size:
agent.update(batch_size)
env = gym.make('MultiAgentPendulum-v0')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
agent = MADDPG(state_dim, action_dim, max_action)
train(env, agent, 10000, 256)
```
这段代码中,我们首先定义了Actor和Critic神经网络,然后定义了经验回放缓冲区。接着,我们实现了MADDPG算法,并在训练函数中使用该算法。在每个episode中,我们首先将环境重置,并在每个时间步中获取智能体的动作。然后,我们将经验添加到回放缓冲区中,并在缓冲区中积累一定的经验后,使用MADDPG算法更新每个智能体的actor和critic网络。
阅读全文