### How to Use the GPU to Accelerate DQN (Deep Q-Network) Training in PyTorch
#### Loading the Required Libraries and Modules
To take advantage of the GPU for fast matrix operations, import PyTorch and the related components before writing the rest of the code.
```python
import torch
import torch.nn as nn
import torch.optim as optim
from collections import namedtuple, deque
import random
```
#### Initializing the Device
Check whether a CUDA environment is available, choose the device accordingly, and instantiate the models on it. This step is essential for everything that follows, because all tensors involved in a computation must live on the same device.
```python
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # prefer the GPU when one is available [^2]

class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )

    def forward(self, x):
        return self.fc(x)

# n_observations and n_actions are the state and action dimensions provided by the environment.
policy_net = DQN(n_observations, n_actions).to(device)  # create the policy network and move it to the chosen device [^1]
target_net = DQN(n_observations, n_actions).to(device)  # create the target network on the same device
target_net.load_state_dict(policy_net.state_dict())     # start the target network with the same weights
```
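As a quick sanity check (not part of the original snippet), you can verify that the model parameters actually live on the chosen device and that a forward pass works with an input created on that same device; mixing CPU and GPU tensors here would raise a RuntimeError.
```python
# Where do the model parameters live? Prints e.g. "cuda:0" on a GPU machine.
print(next(policy_net.parameters()).device)

# Dummy forward pass with an input created directly on the same device as the model.
dummy_state = torch.zeros(1, n_observations, device=device)
with torch.no_grad():
    q_values = policy_net(dummy_state)  # shape: (1, n_actions)
print(q_values.shape)
```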
#### Data Preprocessing and Transfer
Once the data to be used is ready, convert it into `torch.Tensor` form so it can be fed to the neural network, then move the tensors onto the chosen device (by passing `device=` at creation or calling `.to(device)`) before storing them in the replay buffer.
```python
state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)  # shape (1, n_observations), created directly on the chosen device
action_tensor = ...   # e.g. a long tensor of shape (1,) holding the chosen action index, on the same device
reward_tensor = ...   # e.g. a float tensor of shape (1,) holding the reward, on the same device
# next_state_tensor follows the same pattern as state_tensor; use None for terminal states
# (optimize_model below relies on None to build its non-final mask).
transition = (state_tensor, action_tensor, next_state_tensor, reward_tensor)
memory.push(*transition)  # store the experience tuple in the replay buffer
```
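The snippet above assumes a `Transition` namedtuple and a replay buffer object named `memory`, neither of which is defined in the original code. A minimal sketch, following the common pattern from the official PyTorch DQN tutorial (the capacity of 10000 is an arbitrary choice here):
```python
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)  # oldest transitions are dropped once full

    def push(self, *args):
        self.memory.append(Transition(*args))     # store one transition

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

memory = ReplayMemory(10000)
```
Because the transitions pushed above were already created on `device`, sampling them later returns GPU tensors directly.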
#### Performing a Batched Update
Sample a random batch of experience from the replay memory and use it in the backward pass that adjusts the network weights. This step again involves heavy interaction between tensors, so make sure they all live on the same device to keep the update efficient.
```python
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    # Mask of transitions whose next state is non-terminal (next_state is None for terminal states).
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)),
                                  device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

    state_batch = torch.cat(batch.state).to(device)    # move the state batch to the GPU (a no-op if already there)
    action_batch = torch.cat(batch.action).to(device)  # move the action batch to the GPU
    reward_batch = torch.cat(batch.reward).to(device)  # move the reward batch to the GPU

    # Compute Q(s_t, a): the model computes Q(s_t, ·), then we select the columns of the actions taken.
    current_q_values = policy_net(state_batch).gather(1, action_batch.unsqueeze(-1)).squeeze(1)

    # Compute V(s_{t+1}) for all next states with the target network; terminal states keep value 0.
    with torch.no_grad():
        next_state_values = torch.zeros(BATCH_SIZE, device=device)
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(dim=1)[0]

    # Huber loss between Q(s_t, a) and the TD target r + GAMMA * V(s_{t+1}).
    criterion = nn.SmoothL1Loss()
    loss = criterion(current_q_values, reward_batch + GAMMA * next_state_values)

    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
```
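The update function references `BATCH_SIZE`, `GAMMA`, and `optimizer`, none of which the original snippet defines, and the target network also needs to be refreshed from the policy network from time to time. A minimal sketch with commonly used (but arbitrary) values:
```python
BATCH_SIZE = 128   # transitions per update (tunable)
GAMMA = 0.99       # discount factor (tunable)

# Any optimizer works here; Adam with a small learning rate is a common, arbitrary choice.
optimizer = optim.Adam(policy_net.parameters(), lr=1e-4)

def update_target_net():
    # Periodically copy the policy network's weights into the target network,
    # e.g. call this every few episodes from the training loop.
    target_net.load_state_dict(policy_net.state_dict())
```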