Write MADDPG code for a scenario with two agents that have different reward functions and discrete action spaces
Below is a sample MADDPG implementation for two agents with different reward functions and discrete action spaces:
```python
import numpy as np
import torch
import torch.nn.functional as F
from torch.optim import Adam
from collections import deque
import random
# Actor network: maps one agent's observation to a probability distribution over its discrete actions
class ActorNet(torch.nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim):
        super(ActorNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return F.softmax(self.fc3(x), dim=1)

# Critic network: maps the joint observation and the joint one-hot action to a scalar Q value
class CriticNet(torch.nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim):
        super(CriticNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

# MADDPG container: two agents, each with its own actor, centralized critic and reward signal
class MADDPG:
    def __init__(self, state_dim, action_dim, hidden_dim, lr):
        self.n_agents = 2
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim
        self.lr = lr
        # Actor and centralized critic networks, plus their target copies
        self.actor_nets = [ActorNet(state_dim, action_dim, hidden_dim) for _ in range(self.n_agents)]
        self.critic_nets = [CriticNet(state_dim * self.n_agents, action_dim * self.n_agents, hidden_dim)
                            for _ in range(self.n_agents)]
        self.target_actor_nets = [ActorNet(state_dim, action_dim, hidden_dim) for _ in range(self.n_agents)]
        self.target_critic_nets = [CriticNet(state_dim * self.n_agents, action_dim * self.n_agents, hidden_dim)
                                   for _ in range(self.n_agents)]
        # Start the target networks from the same weights as the learned networks
        for i in range(self.n_agents):
            self.target_actor_nets[i].load_state_dict(self.actor_nets[i].state_dict())
            self.target_critic_nets[i].load_state_dict(self.critic_nets[i].state_dict())
        # Optimizers
        self.actor_optimizers = [Adam(net.parameters(), lr=self.lr) for net in self.actor_nets]
        self.critic_optimizers = [Adam(net.parameters(), lr=self.lr) for net in self.critic_nets]
        # Replay buffer
        self.memory = deque(maxlen=10000)
        # Hyperparameters
        self.batch_size = 128
        self.gamma = 0.95
        self.tau = 0.01

    # Sample a discrete action for one agent from its policy distribution
    def select_action(self, state, agent_id):
        state = torch.FloatTensor(state).unsqueeze(0)                  # (1, state_dim)
        with torch.no_grad():
            action_probs = self.actor_nets[agent_id](state)
        probs = action_probs.numpy()[0].astype(np.float64)
        probs /= probs.sum()                                           # guard against float32 round-off
        return np.random.choice(self.action_dim, p=probs)

    # Store one transition: state/next_state have shape (n_agents, state_dim),
    # action holds one integer per agent, reward holds one scalar per agent
    def add_experience(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    # Sample a mini-batch and update both critics, both actors and the target networks
    def update(self):
        if len(self.memory) < self.batch_size:
            return
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        states = torch.FloatTensor(np.array(states))                   # (B, n_agents, state_dim)
        actions = torch.LongTensor(np.array(actions))                  # (B, n_agents)
        rewards = torch.FloatTensor(np.array(rewards))                 # (B, n_agents)
        next_states = torch.FloatTensor(np.array(next_states))         # (B, n_agents, state_dim)
        dones = torch.FloatTensor(np.array(dones, dtype=np.float32)).view(self.batch_size, 1)
        # Flattened joint observations and one-hot joint actions for the centralized critics
        joint_states = states.view(self.batch_size, -1)
        joint_next_states = next_states.view(self.batch_size, -1)
        joint_actions = torch.cat(
            [F.one_hot(actions[:, i], self.action_dim).float() for i in range(self.n_agents)], dim=1)
        # Greedy one-hot actions from the target actors, used in the TD target
        target_actions = torch.cat(
            [F.one_hot(torch.argmax(self.target_actor_nets[i](next_states[:, i, :]), dim=1),
                       self.action_dim).float() for i in range(self.n_agents)], dim=1)
        for i in range(self.n_agents):
            # Critic update: each agent's critic regresses onto a target built from its own reward
            target_q = self.target_critic_nets[i](joint_next_states, target_actions)
            target = rewards[:, i:i + 1] + self.gamma * target_q * (1 - dones)
            q = self.critic_nets[i](joint_states, joint_actions)
            critic_loss = F.mse_loss(q, target.detach())
            self.critic_optimizers[i].zero_grad()
            critic_loss.backward()
            self.critic_optimizers[i].step()
            # Actor update: replace agent i's action with its differentiable policy output,
            # keep the other agent's replayed action fixed, and maximize the critic's value
            new_actions = [F.one_hot(actions[:, j], self.action_dim).float() for j in range(self.n_agents)]
            new_actions[i] = self.actor_nets[i](states[:, i, :])
            actor_loss = -self.critic_nets[i](joint_states, torch.cat(new_actions, dim=1)).mean()
            self.actor_optimizers[i].zero_grad()
            actor_loss.backward()
            self.actor_optimizers[i].step()
        # Soft-update the target networks
        for i in range(self.n_agents):
            for target_param, param in zip(self.target_actor_nets[i].parameters(), self.actor_nets[i].parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
            for target_param, param in zip(self.target_critic_nets[i].parameters(), self.critic_nets[i].parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
```
In this example each agent has its own Actor and its own Critic, so there are two Actor networks and two Critic networks in total. Each Actor takes only that agent's observation and outputs a probability distribution over the discrete actions, which is used to select the agent's action; each Critic is centralized, taking the joint observation and the joint one-hot action of both agents and outputting an estimated Q value. The two different reward functions enter through the critic targets: agent i's critic is trained with a mean-squared-error loss against a TD target built from agent i's own reward. Each Actor is updated by feeding its own (differentiable) action probabilities, together with the other agent's replayed action, into that agent's critic and minimizing the negative mean Q value; both actors and critics are optimized with Adam. Experience is stored in a replay buffer and training batches are sampled from it at random. To stabilize training, each agent also keeps a target Actor and a target Critic, whose parameters are soft-updated toward the learned networks after every update step.
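To show how the pieces fit together, here is a minimal, hypothetical training-loop sketch. The `DummyTwoAgentEnv` class, its reward rules, and the dimensions and episode counts below are illustrative assumptions made up for this example, not part of the answer above; any two-agent environment that returns per-agent observations and per-agent rewards can be plugged in instead.
```python
# Hypothetical toy environment: two agents, per-agent observations and per-agent rewards (assumption)
class DummyTwoAgentEnv:
    def __init__(self, state_dim, action_dim, horizon=25):
        self.state_dim, self.action_dim, self.horizon = state_dim, action_dim, horizon

    def reset(self):
        self.t = 0
        return np.random.randn(2, self.state_dim).astype(np.float32)   # one observation per agent

    def step(self, actions):
        self.t += 1
        next_obs = np.random.randn(2, self.state_dim).astype(np.float32)
        # Two *different* reward functions: agent 0 prefers small action indices, agent 1 large ones
        rewards = np.array([-float(actions[0]), float(actions[1])], dtype=np.float32)
        return next_obs, rewards, self.t >= self.horizon, {}

state_dim, action_dim = 8, 5                        # assumed dimensions
env = DummyTwoAgentEnv(state_dim, action_dim)
maddpg = MADDPG(state_dim, action_dim, hidden_dim=64, lr=1e-3)

for episode in range(50):                           # small episode count, just to exercise the loop
    obs = env.reset()
    done = False
    while not done:
        # Each agent selects its own discrete action from its own observation
        actions = [maddpg.select_action(obs[i], agent_id=i) for i in range(2)]
        next_obs, rewards, done, _ = env.step(actions)
        maddpg.add_experience(obs, actions, rewards, next_obs, done)
        maddpg.update()                             # updates both critics (each on its own reward) and both actors
        obs = next_obs
```
Because the action space is discrete, the actor update above passes the policy's action probabilities straight into the critic; a Gumbel-Softmax relaxation of the sampled actions is a common alternative if you want the critic to see (near) one-hot actions during the actor update.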