GRU模型预测交通量具体实例及代码
时间: 2023-06-17 09:07:42 浏览: 127
以下是使用GRU模型进行交通流量预测的具体实例和代码:
1. 数据准备
首先需要准备交通流量的历史数据。在这个实例中,我们使用了Los Angeles的交通数据集。数据集包含了2015年1月到2016年3月期间的交通流量数据,时间间隔为5分钟。数据集可以在这里下载:https://archive.ics.uci.edu/ml/datasets/Metro+Interstate+Traffic+Volume
2. 数据预处理
在预测之前,我们需要对数据进行预处理。具体来说,我们需要将数据集分成训练集和测试集,并对数据进行归一化处理。
```python
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
# 读取数据集
data = pd.read_csv('Metro_Interstate_Traffic_Volume.csv')
# 将时间列转换为datetime类型
data['date_time'] = pd.to_datetime(data['date_time'])
# 创建新的时间列
data['year'] = data['date_time'].dt.year
data['month'] = data['date_time'].dt.month
data['day'] = data['date_time'].dt.day
data['hour'] = data['date_time'].dt.hour
data['minute'] = data['date_time'].dt.minute
# 将时间列设置为索引
data.set_index('date_time', inplace=True)
# 取出2015年1月到2016年2月的数据作为训练集
train_data = data.loc['2015-01-01':'2016-02-29']
# 取出2016年3月的数据作为测试集
test_data = data.loc['2016-03-01':]
# 将训练集和测试集的数据归一化
scaler = MinMaxScaler()
train_data = scaler.fit_transform(train_data)
test_data = scaler.transform(test_data)
```
3. 创建GRU模型
我们使用Keras库创建GRU模型。在这个实例中,我们使用了一个包含3个GRU层的模型。
```python
from keras.models import Sequential
from keras.layers import Dense, GRU
# 创建GRU模型
model = Sequential()
model.add(GRU(units=64, input_shape=(train_data.shape[1], 1), return_sequences=True))
model.add(GRU(units=64, return_sequences=True))
model.add(GRU(units=64))
model.add(Dense(units=1))
# 编译模型
model.compile(optimizer='adam', loss='mean_squared_error')
```
4. 训练模型
训练模型时,我们使用了训练集中的历史数据作为输入,预测下一个时间步的交通流量。
```python
import numpy as np
# 准备训练数据
X_train = []
y_train = []
for i in range(60, len(train_data)):
X_train.append(train_data[i-60:i])
y_train.append(train_data[i])
X_train = np.array(X_train)
y_train = np.array(y_train)
# 训练模型
model.fit(X_train, y_train, epochs=50, batch_size=32)
```
5. 测试模型
测试模型时,我们使用了测试集中的历史数据作为输入,预测下一个时间步的交通流量,并将预测结果与实际值进行比较。
```python
# 准备测试数据
X_test = []
y_test = []
for i in range(60, len(test_data)):
X_test.append(test_data[i-60:i])
y_test.append(test_data[i])
X_test = np.array(X_test)
y_test = np.array(y_test)
# 预测测试集
y_pred = model.predict(X_test)
# 将预测值和实际值进行反归一化
y_pred = scaler.inverse_transform(y_pred)
y_test = scaler.inverse_transform(y_test)
# 计算均方根误差
from sklearn.metrics import mean_squared_error
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print('RMSE:', rmse)
```
完整代码如下:
```python
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, GRU
from sklearn.metrics import mean_squared_error
# 读取数据集
data = pd.read_csv('Metro_Interstate_Traffic_Volume.csv')
# 将时间列转换为datetime类型
data['date_time'] = pd.to_datetime(data['date_time'])
# 创建新的时间列
data['year'] = data['date_time'].dt.year
data['month'] = data['date_time'].dt.month
data['day'] = data['date_time'].dt.day
data['hour'] = data['date_time'].dt.hour
data['minute'] = data['date_time'].dt.minute
# 将时间列设置为索引
data.set_index('date_time', inplace=True)
# 取出2015年1月到2016年2月的数据作为训练集
train_data = data.loc['2015-01-01':'2016-02-29']
# 取出2016年3月的数据作为测试集
test_data = data.loc['2016-03-01':]
# 将训练集和测试集的数据归一化
scaler = MinMaxScaler()
train_data = scaler.fit_transform(train_data)
test_data = scaler.transform(test_data)
# 创建GRU模型
model = Sequential()
model.add(GRU(units=64, input_shape=(train_data.shape[1], 1), return_sequences=True))
model.add(GRU(units=64, return_sequences=True))
model.add(GRU(units=64))
model.add(Dense(units=1))
# 编译模型
model.compile(optimizer='adam', loss='mean_squared_error')
# 准备训练数据
X_train = []
y_train = []
for i in range(60, len(train_data)):
X_train.append(train_data[i-60:i])
y_train.append(train_data[i])
X_train = np.array(X_train)
y_train = np.array(y_train)
# 训练模型
model.fit(X_train, y_train, epochs=50, batch_size=32)
# 准备测试数据
X_test = []
y_test = []
for i in range(60, len(test_data)):
X_test.append(test_data[i-60:i])
y_test.append(test_data[i])
X_test = np.array(X_test)
y_test = np.array(y_test)
# 预测测试集
y_pred = model.predict(X_test)
# 将预测值和实际值进行反归一化
y_pred = scaler.inverse_transform(y_pred)
y_test = scaler.inverse_transform(y_test)
# 计算均方根误差
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print('RMSE:', rmse)
```
阅读全文