MADDPG code example for a discrete action space where agents have different reward functions
Below is an example MADDPG implementation for a discrete action space in which each agent has its own reward function:
```python
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# Define the Actor and Critic networks
class Actor(nn.Module):
    def __init__(self, obs_dim, n_actions, hidden_size):
        super().__init__()
        self.fc1 = nn.Linear(obs_dim, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, n_actions)

    def forward(self, obs):
        x = F.relu(self.fc1(obs))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return F.softmax(x, dim=-1)
class Critic(nn.Module):
    def __init__(self, obs_dim, n_actions, hidden_size):
        super().__init__()
        # Centralized critic: obs_dim and n_actions here are the
        # concatenated dimensions over all agents
        self.fc1 = nn.Linear(obs_dim + n_actions, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, 1)

    def forward(self, obs, actions):
        x = torch.cat([obs, actions], dim=-1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
# Define the MADDPG algorithm
class MADDPG:
    def __init__(self, n_agents, obs_dim, n_actions, hidden_size, lr=1e-3, gamma=0.99, tau=0.01):
        self.n_agents = n_agents
        self.obs_dim = obs_dim
        self.n_actions = n_actions
        self.hidden_size = hidden_size
        self.lr = lr
        self.gamma = gamma
        self.tau = tau
        self.actors = [Actor(obs_dim, n_actions, hidden_size) for _ in range(n_agents)]
        self.critics = [Critic(obs_dim * n_agents, n_actions * n_agents, hidden_size) for _ in range(n_agents)]
        # Target networks, initialized as copies of the online networks
        self.target_actors = [Actor(obs_dim, n_actions, hidden_size) for _ in range(n_agents)]
        self.target_critics = [Critic(obs_dim * n_agents, n_actions * n_agents, hidden_size) for _ in range(n_agents)]
        for actor, target_actor in zip(self.actors, self.target_actors):
            target_actor.load_state_dict(actor.state_dict())
        for critic, target_critic in zip(self.critics, self.target_critics):
            target_critic.load_state_dict(critic.state_dict())
        self.actor_optimizers = [optim.Adam(actor.parameters(), lr=lr) for actor in self.actors]
        self.critic_optimizers = [optim.Adam(critic.parameters(), lr=lr) for critic in self.critics]
    def get_actions(self, obs, eps=0.1):
        # Discrete actions: sample from the actor's softmax policy, with
        # epsilon-greedy random actions for extra exploration
        actions = []
        for i in range(self.n_agents):
            if np.random.rand() < eps:
                action = np.random.randint(self.n_actions)
            else:
                obs_tensor = torch.FloatTensor(obs[i]).unsqueeze(0)
                action_probs = self.actors[i](obs_tensor)
                action = torch.multinomial(action_probs, 1).item()
            actions.append(action)
        return actions
    def update(self, transitions, reward_fn):
        obs, actions, next_obs, rewards, done = zip(*transitions)
        obs = torch.FloatTensor(np.array(obs))            # (batch, n_agents, obs_dim)
        actions = torch.LongTensor(np.array(actions))     # (batch, n_agents) discrete indices
        next_obs = torch.FloatTensor(np.array(next_obs))  # (batch, n_agents, obs_dim)
        rewards = torch.FloatTensor(np.array(rewards))    # (batch, n_agents)
        done = torch.FloatTensor(np.array(done))          # (batch,)
        batch_size = obs.shape[0]
        # Flatten joint observations and one-hot encode joint actions for the centralized critics
        obs_full = obs.view(batch_size, -1)
        next_obs_full = next_obs.view(batch_size, -1)
        actions_onehot = F.one_hot(actions, self.n_actions).float().view(batch_size, -1)
        # Greedy joint actions from the target actors
        with torch.no_grad():
            next_actions = torch.cat(
                [F.one_hot(self.target_actors[j](next_obs[:, j, :]).argmax(dim=-1), self.n_actions).float()
                 for j in range(self.n_agents)], dim=-1)
        for i in range(self.n_agents):
            # Per-agent reward from the agent-specific reward function, expected shape (batch, 1)
            reward_i = reward_fn(i, obs, actions, next_obs, rewards, done)
            # Critic update: regress each agent's Q value towards its TD target
            with torch.no_grad():
                target_q_i = reward_i + self.gamma * (1 - done).view(-1, 1) * \
                    self.target_critics[i](next_obs_full, next_actions)
            q_i = self.critics[i](obs_full, actions_onehot)
            critic_loss = F.mse_loss(q_i, target_q_i)
            self.critic_optimizers[i].zero_grad()
            critic_loss.backward()
            self.critic_optimizers[i].step()
            # Actor update: agent i's own action comes from its current policy
            # (differentiable); the other agents' actions stay as recorded
            joint = [actions_onehot[:, j * self.n_actions:(j + 1) * self.n_actions]
                     for j in range(self.n_agents)]
            joint[i] = self.actors[i](obs[:, i, :])
            actor_loss = -self.critics[i](obs_full, torch.cat(joint, dim=-1)).mean()
            self.actor_optimizers[i].zero_grad()
            actor_loss.backward()
            self.actor_optimizers[i].step()
        # Soft-update the target Actor and Critic networks
        for i in range(self.n_agents):
            for param, target_param in zip(self.actors[i].parameters(), self.target_actors[i].parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
            for param, target_param in zip(self.critics[i].parameters(), self.target_critics[i].parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
```
Here, `n_agents` is the number of agents, `obs_dim` is each agent's observation dimension, `n_actions` is the number of discrete actions available to each agent, `hidden_size` is the hidden-layer width of the networks, `lr` is the learning rate, `gamma` is the discount factor, and `tau` is the soft-update rate of the target networks. The `get_actions` method picks a discrete action for each agent from its current observation (sampling from the actor's softmax policy, with epsilon-greedy exploration), and the `update` method updates the model parameters from a batch of transitions. `reward_fn` is a function that computes each agent's own reward, which is how agents with different reward functions are supported. Inside `update`, each agent's Q value and target Q value are computed with the centralized critics, the Critic and Actor networks are updated in turn, and finally the target Actor and Critic networks are soft-updated.
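As a rough illustration of how the pieces fit together, here is a minimal usage sketch. The sizes, the randomly generated "transitions", and the example `reward_fn` (which simply reads agent i's column from the stored rewards) are assumptions made for this sketch, not part of the code above; in practice the transitions would come from interacting with a multi-agent environment, typically via a replay buffer.
```python
import numpy as np

# Hypothetical sizes, chosen only for this example
n_agents, obs_dim, n_actions, batch_size = 2, 4, 3, 32
maddpg = MADDPG(n_agents, obs_dim, n_actions, hidden_size=64)

def reward_fn(i, obs, actions, next_obs, rewards, done):
    # Example only: agent i reads its own column of the stored rewards;
    # a real reward_fn could compute a completely different reward per agent.
    return rewards[:, i].view(-1, 1)

# Fake transitions in the layout update() expects:
# each entry is (obs, actions, next_obs, rewards, done) for one time step
transitions = []
obs = [np.random.randn(obs_dim).astype(np.float32) for _ in range(n_agents)]
for _ in range(batch_size):
    actions = maddpg.get_actions(obs, eps=0.1)
    next_obs = [np.random.randn(obs_dim).astype(np.float32) for _ in range(n_agents)]
    rewards = np.random.randn(n_agents).astype(np.float32)
    transitions.append((obs, actions, next_obs, rewards, 0.0))
    obs = next_obs

maddpg.update(transitions, reward_fn)
```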