图卷积神经网络交通流量预测代码
时间: 2023-06-30 19:13:29 浏览: 136
以下是一个使用PyTorch实现的基于图卷积神经网络(GCN)的交通流量预测代码,可以用于预测城市中的道路网络上的交通流量情况:
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset
import numpy as np
import pandas as pd
import argparse
from datetime import datetime
from tqdm import tqdm
# Command-line options. One row per flag: (flag, type, default, help text).
parser = argparse.ArgumentParser(description='GCN traffic flow prediction')
_CLI_OPTIONS = (
    ('--data_path', str, 'data/METR-LA', '数据集路径'),
    ('--device', str, 'cpu', 'cpu或gpu'),
    ('--epochs', int, 100, '训练epochs'),
    ('--lr', float, 1e-3, '学习率'),
    ('--weight_decay', float, 5e-4, '权重衰减'),
    ('--batch_size', int, 64, 'batch 大小'),
    ('--num_layers', int, 1, 'GCN层数'),
    ('--hidden_size', int, 64, 'GCN隐藏层大小'),
    ('--dropout', float, 0.5, 'GCN中的dropout率'),
    ('--print_every', int, 10, '每隔几个epoch打印一次'),
)
for _flag, _type, _default, _help in _CLI_OPTIONS:
    parser.add_argument(_flag, type=_type, default=_default, help=_help)
# Parsed once at import time; the result is kept in the module-level `args`.
args = parser.parse_args()
# Dataset loading
class TrafficDataset(Dataset):
    """Dataset backed by three ``.npy`` files stored under ``data_path``.

    The files read are ``adj_mx.npy``, ``features.npy`` and ``labels.npy``.
    Sample ``i`` is the tuple ``(adj_mx, features[i], labels[i])``; the same
    adjacency matrix is returned with every sample.

    All three arrays are cast to float32 when loaded. ``np.load`` commonly
    yields float64 arrays, while PyTorch parameters default to float32, and
    mixing the two makes the matrix products in the model raise a dtype
    error.

    Args:
        data_path: Directory that contains the three ``.npy`` files.
    """

    def __init__(self, data_path):
        super().__init__()
        self.data_path = data_path
        self.adj_mx = self._load_float32(data_path, 'adj_mx')
        self.features = self._load_float32(data_path, 'features')
        self.labels = self._load_float32(data_path, 'labels')

    @staticmethod
    def _load_float32(data_path, name):
        """Load ``<data_path>/<name>.npy`` and return it as a float32 array."""
        array = np.load('{}/{}.npy'.format(data_path, name))
        # copy=False avoids duplicating arrays that are already float32.
        return array.astype(np.float32, copy=False)

    def __getitem__(self, index):
        """Return ``(adj_mx, features[index], labels[index])``."""
        return self.adj_mx, self.features[index], self.labels[index]

    def __len__(self):
        """Return the number of samples (first dimension of ``features``)."""
        return self.features.shape[0]
# GCN model definition
class GCN(nn.Module):
    """Stack of ``GCNLayer`` modules, each followed by ReLU and dropout.

    Args:
        num_layers: Number of graph-convolution layers to stack.
        hidden_size: Output width of every layer.
        input_size: Input width of the first layer.
        dropout: Dropout probability applied after every layer's ReLU.
    """

    def __init__(self, num_layers, hidden_size, input_size, dropout):
        super().__init__()
        self.num_layers = num_layers
        self.dropout = dropout
        # Only the first layer consumes `input_size`; every later layer maps
        # hidden_size -> hidden_size.
        stacked = []
        fan_in = input_size
        for _ in range(num_layers):
            stacked.append(GCNLayer(fan_in, hidden_size))
            fan_in = hidden_size
        self.gcn_layers = nn.ModuleList(stacked)

    def forward(self, adj_mx, x):
        """Pass ``x`` through every layer, reusing ``adj_mx`` for each one.

        Dropout is active only while the module is in training mode.
        """
        hidden = x
        for layer in self.gcn_layers:
            hidden = F.relu(layer(adj_mx, hidden))
            hidden = F.dropout(hidden, p=self.dropout, training=self.training)
        return hidden
class GCNLayer(nn.Module):
    """Single graph-convolution layer computing ``adj_mx @ (x @ W) + b``.

    ``W`` has shape ``(input_size, hidden_size)`` and ``b`` has shape
    ``(hidden_size,)``.

    Args:
        input_size: Size of the last dimension of the input features.
        hidden_size: Size of the last dimension of the output.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        # torch.empty allocates without initialising; reset_parameters()
        # fills both tensors immediately afterwards.
        self.weight = nn.Parameter(torch.empty(input_size, hidden_size))
        self.bias = nn.Parameter(torch.empty(hidden_size))
        self.reset_parameters()

    def reset_parameters(self):
        """Draw weight and bias uniformly from ``[-1/sqrt(hidden_size), 1/sqrt(hidden_size)]``."""
        bound = float(1.0 / np.sqrt(self.hidden_size))
        nn.init.uniform_(self.weight, -bound, bound)
        nn.init.uniform_(self.bias, -bound, bound)

    def forward(self, adj_mx, x):
        """Apply the layer.

        Args:
            adj_mx: Adjacency matrix, either ``(N, N)`` or batched
                ``(B, N, N)``. A sparse 2-D matrix is also accepted.
            x: Features of shape ``(N, input_size)`` or
                ``(B, N, input_size)``.

        Returns:
            Tensor shaped like ``x`` with the last dimension replaced by
            ``hidden_size``.
        """
        # matmul broadcasts over leading batch dimensions, so the same code
        # serves a single graph and a DataLoader-collated batch (the previous
        # mm/spmm pair accepted 2-D operands only).
        support = torch.matmul(x, self.weight)
        if adj_mx.is_sparse:
            output = torch.sparse.mm(adj_mx, support)
        else:
            output = torch.matmul(adj_mx, support)
        return output + self.bias
# Training function
def train(model, device, train_loader, optimizer, epoch, print_every=None):
    """Run one training epoch and return the mean per-batch MSE loss.

    Args:
        model: Module called as ``model(adj_mx, features)``.
        device: Device every batch tensor is moved to before the forward pass.
        train_loader: Iterable yielding ``(adj_mx, features, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        epoch: Current epoch number; used only to decide whether to print.
        print_every: Print the loss when ``epoch`` is a multiple of this
            value. Defaults to the ``--print_every`` command-line option.

    Returns:
        The mean of the per-batch losses as a float, or NaN when
        ``train_loader`` yields no batches.
    """
    if print_every is None:
        print_every = args.print_every
    model.train()
    total_loss = 0.0
    num_batches = 0
    for adj_mx, features, labels in train_loader:
        adj_mx = adj_mx.to(device)
        features = features.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        output = model(adj_mx, features)
        loss = F.mse_loss(output, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1
    # An explicit counter keeps an empty loader from raising on an unbound
    # loop index; the mean is undefined in that case, hence NaN.
    mean_loss = total_loss / num_batches if num_batches else float('nan')
    if epoch % print_every == 0:
        print('Epoch: {}, Train Loss: {:.4f}'.format(epoch, mean_loss))
    return mean_loss
# Evaluation function
def test(model, device, test_loader):
    """Evaluate ``model`` and return the mean per-batch MSE loss.

    The model is switched to eval mode and gradients are disabled for the
    whole pass. The resulting loss is always printed.

    Args:
        model: Module called as ``model(adj_mx, features)``.
        device: Device every batch tensor is moved to before the forward pass.
        test_loader: Iterable yielding ``(adj_mx, features, labels)`` batches.

    Returns:
        The mean of the per-batch losses as a float, or NaN when
        ``test_loader`` yields no batches.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for adj_mx, features, labels in test_loader:
            adj_mx = adj_mx.to(device)
            features = features.to(device)
            labels = labels.to(device)
            output = model(adj_mx, features)
            total_loss += F.mse_loss(output, labels).item()
            num_batches += 1
    # An explicit counter keeps an empty loader from raising on an unbound
    # loop index; the mean is undefined in that case, hence NaN.
    mean_loss = total_loss / num_batches if num_batches else float('nan')
    print('Test Loss: {:.4f}'.format(mean_loss))
    return mean_loss


# The name matches pytest's default "test*" collection pattern; opt out so
# importing this module into a test file does not collect it as a test case.
test.__test__ = False
# Entry point
def main():
    """Train and evaluate the GCN using the parsed command-line options.

    Loads the dataset from ``args.data_path``, splits it 80/20 at random into
    training and test subsets, then runs ``args.epochs`` epochs of training,
    evaluating after each one. The learning rate is halved every 20 epochs.
    """
    # The --device help text advertises "gpu", but PyTorch names that backend
    # "cuda" and rejects the string "gpu"; translate it before use.
    device = torch.device('cuda' if args.device == 'gpu' else args.device)

    # Load the data
    dataset = TrafficDataset(args.data_path)
    train_size = int(len(dataset) * 0.8)
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(
        dataset, [train_size, test_size])
    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
    # Evaluation gains nothing from shuffling; a fixed order keeps the batch
    # composition, and so the reported loss, the same on every call.
    test_loader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False)

    # Build the model; the input width is read from the feature array.
    input_size = dataset.features.shape[1]
    model = GCN(args.num_layers, args.hidden_size, input_size, args.dropout).to(device)

    # Optimizer and learning-rate schedule
    optimizer = Adam(model.parameters(), lr=args.lr, weight_decay=args.weight_decay)
    scheduler = StepLR(optimizer, step_size=20, gamma=0.5)

    # Train and evaluate
    for epoch in range(1, args.epochs + 1):
        train(model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()


if __name__ == '__main__':
    main()
```
需要注意的是,该代码默认从 METR-LA 数据集目录(`data/METR-LA`)中读取 `adj_mx.npy`、`features.npy` 和 `labels.npy` 三个文件;如果需要使用其他数据集进行预测,可以通过 `--data_path` 参数指定数据目录,或修改数据集加载的部分。此外,该代码使用均方误差(MSE)作为损失函数,如果需要使用其他损失函数,则需要修改训练函数和测试函数中的相应部分。
阅读全文