Python实现LSTM多变量多步滚动预测以及可视化和误差分析代码
时间: 2024-01-09 13:03:46 浏览: 238
Python中利用LSTM模型进行时间序列预测分析的实现
5星 · 资源好评率100%
以下是Python实现LSTM多变量多步滚动预测的代码,包括可视化和误差分析:
首先,我们需要导入所需的库和数据:
```python
import numpy as np
import pandas as pd
from keras.models import Sequential
from keras.layers import LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
# Load the multivariate time series; the 'date' column is parsed as
# datetimes and promoted to the index so downstream plots get a time axis.
# NOTE(review): assumes data.csv has a 'date' column plus only numeric
# feature columns, with the prediction target in the first column — confirm.
df = pd.read_csv('data.csv', parse_dates=['date'])
df.set_index('date', inplace=True)
```
我们需要将数据进行归一化（MinMaxScaler 将各特征缩放到 [0, 1] 区间，并非标准化）：
```python
# Min-max scale every feature column to the [0, 1] range; the fitted
# scaler is retained so forecasts can be mapped back to original units.
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(df)
```
接下来,我们需要定义函数来将数据转换为LSTM模型的输入格式。我们将使用多个时间步作为输入来预测多个时间步。
```python
def create_dataset(data, look_back=1, look_forward=1):
    """Slice a 2-D array into rolling supervised-learning samples.

    Args:
        data: array of shape (timesteps, n_features); column 0 is the target.
        look_back: number of past steps used as model input.
        look_forward: number of future target steps to predict.

    Returns:
        X of shape (samples, look_back, n_features) and
        y of shape (samples, look_forward), where sample ``s`` pairs the
        window data[s:s+look_back] with targets data[s+look_back:s+look_back+look_forward, 0].
    """
    n_samples = len(data) - look_back - look_forward + 1
    windows = [data[start:start + look_back, :] for start in range(n_samples)]
    targets = [
        data[start + look_back:start + look_back + look_forward, 0]
        for start in range(n_samples)
    ]
    return np.array(windows), np.array(targets)
```
现在,我们可以将数据转换为LSTM模型的输入格式:
```python
# Rolling-window configuration: each sample feeds the previous 12 time
# steps of all features to the model to predict the next 6 steps of the
# target (first feature column).
look_back = 12
look_forward = 6
# X: (samples, look_back, n_features); y: (samples, look_forward).
X, y = create_dataset(scaled_data, look_back, look_forward)
```
我们需要将数据分为训练集和测试集:
```python
# Chronological 80/20 split — samples are not shuffled, so everything in
# the test set lies strictly later in time than the training data.
train_size = int(len(X) * 0.8)
test_size = len(X) - train_size
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
```
我们需要定义LSTM模型:
```python
# Single-layer LSTM: 50 units consume each (look_back, n_features)
# window; a dense head emits one value per forecast step (look_forward),
# trained with MSE and the Adam optimizer.
model = Sequential()
model.add(LSTM(50, input_shape=(X_train.shape[1], X_train.shape[2])))
model.add(Dense(look_forward))
model.compile(loss='mean_squared_error', optimizer='adam')
```
现在,我们可以训练模型:
```python
# Fit for 100 epochs. NOTE(review): the test split doubles as validation
# data here, so the reported val_loss is not an unbiased estimate of
# out-of-sample error — consider a separate validation split.
model.fit(X_train, y_train, epochs=100, batch_size=32, validation_data=(X_test, y_test))
```
接下来,我们可以使用模型来进行预测:
```python
# Predict the scaled targets; each output row holds look_forward
# forecast steps in [0, 1] scale.
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)
```
我们需要将预测结果反归一化（映射回原始量纲）：
```python
# De-normalize back to original units. MinMaxScaler is fitted per
# column, so only the target feature's column (0) of a full-width matrix
# is meaningful; as in the original, only the FIRST forecast step is
# kept. The original zero-concatenation crashed whenever look_forward
# exceeded the number of feature columns (negative pad width) and pushed
# later forecast steps through the wrong per-feature scalers; writing the
# first step directly into column 0 avoids both problems and yields the
# identical column-0 result.
_n_features = scaled_data.shape[1]

def _denorm_first_step(arr):
    """Inverse-scale column 0 of `arr` (the first forecast step) to original units."""
    padded = np.zeros((arr.shape[0], _n_features))
    padded[:, 0] = arr[:, 0]
    return scaler.inverse_transform(padded)[:, 0]

train_predict = _denorm_first_step(train_predict)
y_train = _denorm_first_step(y_train)
test_predict = _denorm_first_step(test_predict)
y_test = _denorm_first_step(y_test)
```
我们可以绘制训练集和测试集的预测结果:
```python
# 绘制训练集和测试集的预测结果
# Plot actual vs. predicted first-step values on the original time axis.
# Sample i forecasts the value at df.index[i + look_back], so the train
# curve spans [look_back, look_back + train_size) and the test curve
# starts immediately after it. (The original sliced
# df.index[look_back:train_size] / df.index[train_size+look_back:], whose
# lengths disagree with the data arrays whenever look_back > 0 or
# look_forward > 1, raising a matplotlib shape error.)
train_index = df.index[look_back:look_back + train_size]
test_index = df.index[look_back + train_size:look_back + train_size + len(y_test)]
plt.plot(train_index, y_train, label='Train Data')
plt.plot(train_index, train_predict, label='Train Predict')
plt.plot(test_index, y_test, label='Test Data')
plt.plot(test_index, test_predict, label='Test Predict')
plt.legend()
plt.show()
```
最后,我们可以计算模型的误差:
```python
# Report the root-mean-squared error of the first-step forecasts, in the
# original units of the target variable.
train_residual = train_predict - y_train
test_residual = test_predict - y_test
train_rmse = np.sqrt(np.mean(train_residual ** 2))
test_rmse = np.sqrt(np.mean(test_residual ** 2))
print('Train RMSE: %.3f' % train_rmse)
print('Test RMSE: %.3f' % test_rmse)
```
完整的代码如下:
```python
import numpy as np
import pandas as pd
from keras.models import Sequential
from keras.layers import LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

# Load the multivariate series; 'date' becomes a datetime index.
# NOTE(review): assumes data.csv has a 'date' column plus numeric
# features, with the prediction target in the first column — confirm.
df = pd.read_csv('data.csv', parse_dates=['date'])
df.set_index('date', inplace=True)

# Min-max scale every feature to [0, 1]; keep the scaler so forecasts
# can be mapped back to original units.
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df)

def create_dataset(data, look_back=1, look_forward=1):
    """Slice `data` into rolling supervised-learning samples.

    Returns X of shape (samples, look_back, n_features) and y of shape
    (samples, look_forward), where y holds future values of column 0.
    """
    X, y = [], []
    for i in range(len(data) - look_back - look_forward + 1):
        X.append(data[i:i + look_back, :])
        y.append(data[i + look_back:i + look_back + look_forward, 0])
    return np.array(X), np.array(y)

# Window sizes: 12 past steps in, 6 future target steps out.
look_back = 12
look_forward = 6
X, y = create_dataset(scaled_data, look_back, look_forward)

# Chronological 80/20 split (no shuffling).
train_size = int(len(X) * 0.8)
test_size = len(X) - train_size
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# Single LSTM layer + dense head emitting look_forward values per sample.
model = Sequential()
model.add(LSTM(50, input_shape=(X_train.shape[1], X_train.shape[2])))
model.add(Dense(look_forward))
model.compile(loss='mean_squared_error', optimizer='adam')

# NOTE(review): the test split doubles as validation data, so val_loss
# is not an unbiased estimate of out-of-sample error.
model.fit(X_train, y_train, epochs=100, batch_size=32, validation_data=(X_test, y_test))

# Predict the scaled targets.
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)

# De-normalize the FIRST forecast step only. Writing it directly into
# column 0 of a full-width matrix (instead of concatenating zeros) stays
# correct when look_forward exceeds the number of feature columns and
# never routes later steps through the wrong per-feature scalers.
n_features = scaled_data.shape[1]

def denorm_first_step(arr):
    """Inverse-scale column 0 of `arr` back to original units."""
    padded = np.zeros((arr.shape[0], n_features))
    padded[:, 0] = arr[:, 0]
    return scaler.inverse_transform(padded)[:, 0]

train_predict = denorm_first_step(train_predict)
y_train = denorm_first_step(y_train)
test_predict = denorm_first_step(test_predict)
y_test = denorm_first_step(y_test)

# Sample i predicts the value at df.index[i + look_back]; slice the time
# axis so index lengths match the data arrays exactly.
train_index = df.index[look_back:look_back + train_size]
test_index = df.index[look_back + train_size:look_back + train_size + len(y_test)]
plt.plot(train_index, y_train, label='Train Data')
plt.plot(train_index, train_predict, label='Train Predict')
plt.plot(test_index, y_test, label='Test Data')
plt.plot(test_index, test_predict, label='Test Predict')
plt.legend()
plt.show()

# RMSE of first-step forecasts in original units.
train_rmse = np.sqrt(np.mean(np.square(train_predict - y_train)))
test_rmse = np.sqrt(np.mean(np.square(test_predict - y_test)))
print('Train RMSE: %.3f' % train_rmse)
print('Test RMSE: %.3f' % test_rmse)
```
阅读全文