MADDPG Code for Discrete Action Spaces
Below is a skeleton of MADDPG code adapted to a discrete action space:
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
from collections import deque
from itertools import count
# Actor network: maps an observation to a probability distribution over discrete actions
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        # softmax turns the logits into a valid probability distribution over actions
        x = F.softmax(self.fc3(x), dim=-1)
        return x


# Critic network: estimates Q(s, a); the action is expected as an action_dim-sized
# vector (one-hot or action probabilities) so it can be concatenated with the state
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=-1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


# MADDPG agent: local/target actor-critic pairs with soft target updates
class MADDPG:
    def __init__(self, state_dim, action_dim, hidden_dim, gamma, tau, lr, device):
        self.actor_local = Actor(state_dim, action_dim, hidden_dim).to(device)
        self.actor_target = Actor(state_dim, action_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor_local.parameters(), lr=lr)
        self.critic_local = Critic(state_dim, action_dim, hidden_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim, hidden_dim).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic_local.parameters(), lr=lr)
        self.action_dim = action_dim
        self.gamma = gamma
        self.tau = tau
        self.device = device

    def act(self, state):
        # state: per-agent observations, shape (n_agents, state_dim)
        state = torch.FloatTensor(state).to(self.device)
        self.actor_local.eval()
        with torch.no_grad():
            action_probs = self.actor_local(state)
        self.actor_local.train()
        # sample a discrete action for each agent from its probability distribution
        actions = [np.random.choice(np.arange(len(prob)), p=prob.cpu().numpy())
                   for prob in action_probs]
        return actions

    def update(self, experiences):
        states, actions, rewards, next_states, dones = experiences
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(-1).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(-1).to(self.device)
        # one-hot encode the stored discrete actions so they match the critic's input size
        actions_onehot = F.one_hot(actions, num_classes=self.action_dim).float()
        # ---- update the critic ----
        Q_targets_next = self.critic_target(next_states, self.actor_target(next_states))
        Q_targets = rewards + (self.gamma * Q_targets_next * (1 - dones))
        Q_expected = self.critic_local(states, actions_onehot)
        critic_loss = F.mse_loss(Q_expected, Q_targets.detach())
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        # ---- update the actor ----
        # feed the actor's action probabilities to the critic and ascend its Q estimate
        actions_pred = self.actor_local(states)
        actor_loss = -self.critic_local(states, actions_pred).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        # ---- soft-update the target networks ----
        self.soft_update(self.critic_local, self.critic_target)
        self.soft_update(self.actor_local, self.actor_target)

    def soft_update(self, local_model, target_model):
        # target <- tau * local + (1 - tau) * target
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(self.tau * local_param.data + (1.0 - self.tau) * target_param.data)
```
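As a usage sketch (the hyperparameter values below are illustrative placeholders, not taken from the original), the agent class above could be instantiated like this:
```python
import torch

# illustrative hyperparameters -- tune them for your own environment
state_dim = 24      # observation size per agent (placeholder)
action_dim = 5      # number of discrete actions (placeholder)
hidden_dim = 128
gamma = 0.99        # discount factor
tau = 0.01          # soft-update rate for the target networks
lr = 1e-3           # learning rate shared by actor and critic
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

maddpg = MADDPG(state_dim, action_dim, hidden_dim, gamma, tau, lr, device)
```
The `maddpg` object created here is the one referenced in the training loop below.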
During training you also need a replay buffer to store experience; at each time step a random mini-batch is sampled from the buffer and used for an update. A possible implementation looks like this:
```python
import random
import numpy as np
from collections import deque
from itertools import count

replay_buffer = deque(maxlen=10000)  # FIFO buffer; the oldest experiences are dropped automatically

for i_episode in range(1000):
    state = env.reset()
    for t in count():
        # select actions with the current policy and step the environment
        action = maddpg.act(state)
        next_state, reward, done, _ = env.step(action)
        # store the transition in the replay buffer
        replay_buffer.append((state, action, reward, next_state, done))
        state = next_state
        # once enough experience has been collected, sample a random mini-batch and train
        if len(replay_buffer) >= batch_size:
            batch = random.sample(replay_buffer, batch_size)
            # transpose the list of transitions into the batched arrays expected by update()
            states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
            maddpg.update((states, actions, rewards, next_states, dones))
        if done:
            break
```
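Optionally, the deque handling above can be wrapped in a small helper class so the training loop only deals with `push` and `sample`; this is a refactoring sketch, not part of the original snippet:
```python
import random
import numpy as np
from collections import deque

class ReplayBuffer:
    """Fixed-size buffer that stores transitions and samples random mini-batches."""

    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        # transpose into the (states, actions, rewards, next_states, dones)
        # tuple that MADDPG.update() expects
        return tuple(map(np.array, zip(*batch)))

    def __len__(self):
        return len(self.buffer)
```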
Here, `env` is the environment object and `batch_size` is the number of experiences sampled from the replay buffer at each training step. Note that for a discrete action space the actor's output must be normalized with a softmax so that it forms a valid probability distribution over actions, and the sampled discrete actions are one-hot encoded before being fed to the critic.
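To make that last point concrete, here is a small sketch (assuming an actor built as above; `select_action` is a hypothetical helper, not part of the original code) showing the two usual ways of turning the softmax output into a discrete action: sampling from the distribution during training for exploration, and taking the argmax greedily at evaluation time.
```python
import numpy as np
import torch

def select_action(actor, state, device, greedy=False):
    """Pick a discrete action from the actor's softmax output.

    greedy=False samples from the distribution (exploration, as in MADDPG.act above);
    greedy=True takes the most probable action (deterministic evaluation).
    """
    state_t = torch.FloatTensor(state).to(device)
    with torch.no_grad():
        probs = actor(state_t).cpu().numpy()
    if greedy:
        return int(np.argmax(probs))
    return int(np.random.choice(len(probs), p=probs))
```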