Example Code Combining DDPG and GCN
Combining DDPG (Deep Deterministic Policy Gradient) with a GCN (Graph Convolutional Network) can be used to tackle reinforcement learning problems that involve a graph-structured state and a continuous action space. Below is a simple example:
```python
import copy
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gym
import dgl
from dgl.nn.pytorch import GraphConv
# DDPG actor network: maps an observation to a raw (unsquashed) action
class Actor(nn.Module):
    def __init__(self, obs_dim, act_dim):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(obs_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, act_dim)

    def forward(self, obs):
        x = F.relu(self.fc1(obs))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
# DDPG critic network: scores a (state embedding, action) pair
class Critic(nn.Module):
    def __init__(self, obs_dim, act_dim):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(obs_dim + act_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, 1)

    def forward(self, obs, act):
        x = torch.cat([obs, act], dim=-1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
# Two-layer GCN used as a state-feature extractor over the graph
class GCN(nn.Module):
    def __init__(self, in_feats, out_feats):
        super(GCN, self).__init__()
        self.conv1 = GraphConv(in_feats, 64)
        self.conv2 = GraphConv(64, out_feats)

    def forward(self, g, inputs):
        h = self.conv1(g, inputs)
        h = F.relu(h)
        h = self.conv2(g, h)
        return h
# Combined model: the actor acts on the raw observation, the GCN turns the graph
# (with the observation broadcast to every node) into a state embedding, and the
# critic scores that embedding together with an action.
class DDPG_GCN(nn.Module):
    def __init__(self, obs_dim, act_dim, in_feats, out_feats, num_nodes):
        super(DDPG_GCN, self).__init__()
        self.num_nodes = num_nodes
        self.actor = Actor(obs_dim, act_dim)
        self.gcn = GCN(in_feats, out_feats)
        self.critic = Critic(out_feats, act_dim)  # the critic sees the GCN embedding, not the raw obs

    def act(self, obs):
        # deterministic policy, squashed into [-1, 1]
        return torch.tanh(self.actor(obs))

    def embed(self, g, obs):
        # broadcast each observation to the nodes of its graph, run the GCN,
        # then mean-pool the node embeddings into one vector per graph
        if obs.dim() == 1:
            obs = obs.unsqueeze(0)
        batch_size = obs.shape[0]
        feats = obs.repeat_interleave(self.num_nodes, dim=0)        # (B * num_nodes, obs_dim)
        h = self.gcn(g, feats)                                      # (B * num_nodes, out_feats)
        return h.view(batch_size, self.num_nodes, -1).mean(dim=1)   # (B, out_feats)

    def forward(self, obs, g):
        # Q-value of the actor's own action in the given state
        act = self.act(obs)
        if act.dim() == 1:
            act = act.unsqueeze(0)
        return self.critic(self.embed(g, obs), act)
# Initialize the environment and the model
env = gym.make('Pendulum-v0')
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]
in_feats = obs_dim
out_feats = 64
num_nodes = 3

def build_graph():
    # toy chain graph 0 -> 1 -> 2; self-loops avoid GraphConv errors on zero-in-degree nodes
    return dgl.add_self_loop(dgl.graph(([0, 1], [1, 2]), num_nodes=num_nodes))

model = DDPG_GCN(obs_dim, act_dim, in_feats, out_feats, num_nodes)
target_model = copy.deepcopy(model)  # target networks for the DDPG bootstrap targets
actor_optimizer = optim.Adam(model.actor.parameters(), lr=1e-3)
critic_optimizer = optim.Adam(list(model.critic.parameters()) + list(model.gcn.parameters()), lr=1e-3)
# Training
max_episodes = 1000
max_steps = 200
batch_size = 64
gamma = 0.99
tau = 0.005
buffer_size = 100000
replay_buffer = []
for i in range(max_episodes):
    obs = env.reset()
    ep_reward = 0
    for t in range(max_steps):
        # select an action with exploration noise
        with torch.no_grad():
            action = model.act(torch.FloatTensor(obs)).cpu().numpy()
        noise = np.random.normal(scale=0.1, size=act_dim)
        action = np.clip(action + noise, -1, 1)

        # step the environment
        next_obs, reward, done, _ = env.step(action)
        ep_reward += reward

        # store the transition in the replay buffer
        replay_buffer.append((obs, action, reward, next_obs, float(done)))
        if len(replay_buffer) > buffer_size:
            replay_buffer.pop(0)
        obs = next_obs

        # update the model
        if len(replay_buffer) > batch_size:
            batch = random.sample(replay_buffer, batch_size)
            obs_b, act_b, rew_b, next_obs_b, done_b = map(np.array, zip(*batch))
            obs_batch = torch.FloatTensor(obs_b)
            act_batch = torch.FloatTensor(act_b)
            rew_batch = torch.FloatTensor(rew_b).unsqueeze(1)
            next_obs_batch = torch.FloatTensor(next_obs_b)
            done_batch = torch.FloatTensor(done_b).unsqueeze(1)
            # one toy graph per transition, merged into a single batched graph
            g_batch = dgl.batch([build_graph() for _ in range(batch_size)])

            # critic update: TD target computed with the target networks
            with torch.no_grad():
                next_action = target_model.act(next_obs_batch)
                next_q = target_model.critic(target_model.embed(g_batch, next_obs_batch), next_action)
                target_q = rew_batch + gamma * (1 - done_batch) * next_q
            q = model.critic(model.embed(g_batch, obs_batch), act_batch)
            critic_loss = F.mse_loss(q, target_q)
            critic_optimizer.zero_grad()
            critic_loss.backward()
            critic_optimizer.step()

            # actor update: maximize the critic's estimate of the actor's action
            actor_loss = -model.critic(model.embed(g_batch, obs_batch).detach(), model.act(obs_batch)).mean()
            actor_optimizer.zero_grad()
            actor_loss.backward()
            actor_optimizer.step()

            # soft-update the target networks
            for target_param, param in zip(target_model.parameters(), model.parameters()):
                target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

        if done:
            break
    print('Episode: {}, Reward: {}'.format(i + 1, ep_reward))
```
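After training, the learned policy can be rolled out greedily, i.e. without exploration noise. A minimal sketch, assuming the `model`, `env`, and `max_steps` objects from the script above are still in scope:
```python
# Greedy evaluation rollout (hypothetical: reuses model, env and max_steps from above)
eval_episodes = 5
for ep in range(eval_episodes):
    obs = env.reset()
    ep_reward = 0.0
    for _ in range(max_steps):
        with torch.no_grad():
            action = model.act(torch.FloatTensor(obs)).numpy()  # deterministic action, no noise
        obs, reward, done, _ = env.step(action)
        ep_reward += reward
        if done:
            break
    print('Eval episode {}: reward {:.1f}'.format(ep + 1, ep_reward))
```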
Note that this is only a simple example; a real application would need more careful adjustment and tuning for the specific problem (graph construction, network sizes, exploration, and so on).
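For instance, in a task whose state really is a graph, each node would normally carry its own feature vector, and the GCN would consume those features directly rather than a broadcast copy of a flat observation. A minimal sketch under hypothetical node counts, topology, and feature sizes:
```python
# Hypothetical graph-structured observation: per-node features fed straight into the GCN
n_nodes = 4                                  # hypothetical problem size
node_feat_dim = 8                            # hypothetical per-node feature dimension
src, dst = [0, 1, 2, 3], [1, 2, 3, 0]        # hypothetical ring topology
g = dgl.add_self_loop(dgl.graph((src, dst), num_nodes=n_nodes))

node_obs = torch.randn(n_nodes, node_feat_dim)      # stand-in for real node observations

gcn = GCN(node_feat_dim, 64)                        # reuses the GCN class defined above
state_embedding = gcn(g, node_obs).mean(dim=0)      # graph-level state vector for the critic
```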