1、摘要
本文主要讲解:如何使用 Python 构建长短期记忆网络(LSTM)模型,实现公交短时客流预测。
主要思路:
- 整理特征:天气、时间、工作日和非工作日、节假日和非节假日、温度等
- 构建LSTM网络,优化器选择Adam
- reshape训练集和测试集,适配LSTM网络的输入尺寸
- 设置 batch_size和epochs,开始训练
- 评估模型、保存模型、画出模型预测结果的图
2、数据介绍
3、完整代码
#!/usr/bin/env python
# coding: utf-8
import math
import os
from math import sqrt
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from keras.engine.saving import load_model
from keras.layers.core import Dense, Dropout
from keras.layers.recurrent import LSTM
from keras.models import Sequential
from sklearn import preprocessing
from sklearn.metrics import mean_squared_error
# Work from the data directory so the relative workbook names below resolve.
# Every later relative path (e.g. the saved model file) lands here as well.
DATA_DIR = r'E:\项目文件\基于改进的LSTM短时客流预测\数据'
os.chdir(DATA_DIR)

# Spreadsheet columns 1..8 are loaded; column 0 is skipped.
USED_COLUMNS = range(1, 9)
total_data = pd.read_excel(r'上车训练数据.xlsx', usecols=USED_COLUMNS)
test_data = pd.read_excel(r'上车测试数据.xlsx', usecols=USED_COLUMNS)
def plot_card_id(total_data):
    """Plot the boarding count (``card_id`` column) against the row index.

    Prints the first rows of the frame, then shows a red line chart of the
    ``card_id`` column so the raw series can be inspected by eye.

    Args:
        total_data: pandas DataFrame containing a ``card_id`` column.
    """
    print(total_data.head())
    plt.plot(total_data['card_id'], color='red', label='card_id')
    plt.legend(loc='best')
    plt.show()


# Optional: uncomment to look at the raw series before it is normalised.
# plot_card_id(total_data)
# Scaler for the boarding count (``card_id``). The same fitted instance is
# used later to map normalised predictions back to raw counts.
min_max_scaler = preprocessing.MinMaxScaler()
# Learn the min/max range from the training data only.
total_data['card_id'] = min_max_scaler.fit_transform(total_data['card_id'].values.reshape(-1, 1))
# Apply the *training* range to the test data. Re-fitting on the test set would
# give it a different scale than the one the model is trained on, and would
# leave the scaler holding test-set statistics.
test_data['card_id'] = min_max_scaler.transform(test_data['card_id'].values.reshape(-1, 1))
def _sliding_windows(features, targets, ts):
    """Return ``(windows, next_targets)`` for every run of ``ts`` consecutive rows.

    ``windows[i]`` is ``features[i:i + ts]`` and ``next_targets[i]`` is the
    target of the row that immediately follows that window.
    """
    n_samples = features.shape[0] - ts
    if n_samples < 0:
        raise ValueError(
            'need at least %d rows to build one window, got %d'
            % (ts, features.shape[0]))
    # Row i of ``index`` is [i, i + 1, ..., i + ts - 1]; indexing with it
    # gathers all windows in one shot instead of growing an array per step.
    index = np.arange(n_samples)[:, None] + np.arange(ts)[None, :]
    # Results are at least float64 (integer columns are promoted).
    windows = features[index].astype(np.result_type(features.dtype, np.float64))
    next_targets = targets[ts:ts + n_samples].reshape(-1)
    next_targets = next_targets.astype(np.result_type(targets.dtype, np.float64))
    return windows, next_targets


def make_train_test(ts=61):
    """Turn the module-level ``total_data`` / ``test_data`` frames into LSTM inputs.

    Every column except ``card_id`` is a feature; ``card_id`` is the target.
    Each sample is a window of ``ts`` consecutive feature rows and its label is
    the ``card_id`` of the row right after the window.

    Args:
        ts: window length (number of time steps per sample). Defaults to 61.

    Returns:
        Tuple ``(train_x, train_y, test_x, test_y)`` where the ``*_x`` arrays
        have shape ``(samples, ts, n_features)`` and the ``*_y`` arrays are
        one-dimensional with one value per sample.

    Raises:
        ValueError: if a frame (after padding) has fewer than ``ts`` rows.
    """
    # Windowing consumes the first ``ts`` rows as look-back only, so a copy of
    # the leading ``ts`` rows is prepended to let every original row be a label.
    train_all = pd.concat([total_data[:ts], total_data])
    test_all = pd.concat([test_data[:ts], test_data])

    train_x = train_all.drop(columns=['card_id']).values
    train_y = train_all[['card_id']].values
    test_x = test_all.drop(columns=['card_id']).values
    test_y = test_all[['card_id']].values

    # Build the training windows.
    print('训练数据的原始shape:', train_x.shape)
    ts_train_x, ts_train_y = _sliding_windows(train_x, train_y, ts)

    # Build the test windows.
    print('预测数据的原始shape:', test_x.shape)
    ts_test_x, ts_test_y = _sliding_windows(test_x, test_y, ts)

    return ts_train_x, ts_train_y, ts_test_x, ts_test_y
# Build the windowed arrays once at import time; the four results are, in
# order, the training inputs, training labels, test inputs and test labels.
X_train, y_train, X_test, y_test = make_train_test()
def build_model(timesteps=61, n_features=7):
    """Build and compile the two-layer LSTM regression network.

    Architecture: LSTM(32, returns sequences) -> Dropout -> LSTM(32) ->
    Dropout -> Dense(8, relu) -> Dense(1, linear), trained with MSE loss and
    the Adam optimizer. The model summary is printed as a side effect.

    Args:
        timesteps: number of time steps per input window. Defaults to 61
            (the original author notes 61 records per day).
        n_features: number of feature columns per time step. Defaults to 7.

    Returns:
        The compiled ``Sequential`` model.
    """
    d = 0.0727  # dropout rate applied after each LSTM layer
    neurons = [32, 32, 8, 1]
    model_lstm = Sequential()
    # Only the first layer needs an explicit input shape.
    model_lstm.add(LSTM(neurons[0], input_shape=(timesteps, n_features), return_sequences=True))
    model_lstm.add(Dropout(d))
    # Later layers infer their input shape from the previous layer's output.
    model_lstm.add(LSTM(neurons[1], return_sequences=False))
    model_lstm.add(Dropout(d))
    model_lstm.add(Dense(neurons[2], kernel_initializer="uniform", activation='relu'))
    model_lstm.add(Dense(neurons[3], kernel_initializer="uniform", activation='linear'))
    # Classification accuracy is meaningless for a continuous target, so a
    # regression metric is tracked instead. Keeping one metric also keeps
    # ``evaluate`` returning a list whose first element is the loss.
    model_lstm.compile(loss='mse', optimizer='adam', metrics=['mae'])
    model_lstm.summary()
    return model_lstm
model = build_model()

# Training configuration; the test split doubles as the validation set.
fit_options = dict(
    batch_size=34,
    epochs=18,
    validation_data=(X_test, y_test),
    verbose=1,
)
history = model.fit(X_train, y_train, **fit_options)

# Learning curves: training loss against validation loss per epoch.
for history_key, curve_label in (('loss', 'train'), ('val_loss', 'test')):
    plt.plot(history.history[history_key], label=curve_label)
plt.legend()
plt.show()

# Persist the trained network (relative path, so it goes to the current
# working directory).
model.save('LSTM_bus_1219.h5')
# After a first run the saved network can be reloaded instead of retrained:
# model = load_model('LSTM_bus_1219.h5')
def _loss_from_evaluate(score):
    """Extract the loss from ``evaluate`` output (bare scalar or ``[loss, ...]``)."""
    return float(np.atleast_1d(score)[0])


def model_score(model, X_train, y_train, X_test, y_test):
    """Print and return the MSE of ``model`` on the training and test sets.

    Args:
        model: object with a Keras-style ``evaluate(x, y, verbose=0)`` method
            whose result is the loss, or a sequence starting with the loss.
        X_train, y_train: training inputs and labels.
        X_test, y_test: test inputs and labels.

    Returns:
        Tuple ``(train_mse, test_mse)`` as Python floats.
    """
    # ``evaluate`` yields a plain scalar when the model was compiled without
    # metrics and a list otherwise; the helper accepts both forms.
    train_mse = _loss_from_evaluate(model.evaluate(X_train, y_train, verbose=0))
    print('Train Score: %.5f MSE (%.2f RMSE)' % (train_mse, math.sqrt(train_mse)))
    test_mse = _loss_from_evaluate(model.evaluate(X_test, y_test, verbose=0))
    print('Test Score: %.5f MSE (%.2f RMSE)' % (test_mse, math.sqrt(test_mse)))
    return train_mse, test_mse
# Report train/test MSE and RMSE (on the normalised scale) for the fitted model.
model_score(model, X_train, y_train, X_test, y_test)
def model_test_score(model, X_test, y_test):
    """Compute, print and return RMSE and MSE of the predictions on the test set.

    Args:
        model: object with a ``predict(X)`` method returning one value per sample.
        X_test: test inputs passed straight to ``model.predict``.
        y_test: true target values, one per sample.

    Returns:
        Tuple ``(rmse, mse)``.
    """
    # Flatten both sides so an (n, 1) prediction array lines up with the labels.
    y_hat = np.asarray(model.predict(X_test)).reshape(-1)
    y_true = np.asarray(y_test).reshape(-1)
    temp_mse = mean_squared_error(y_true, y_hat)
    # Derive RMSE from the MSE already computed instead of scoring twice.
    temp_rmse = sqrt(temp_mse)
    print('TEMP RMSE: %.3f' % temp_rmse)
    print('TEMP MSE: %.3f' % temp_mse)
    return temp_rmse, temp_mse
# Print RMSE / MSE of the predictions on the test windows (return value unused).
model_test_score(model, X_test, y_test)
def percentage_difference(model, X_test, y_test, return_diff=False):
    """Predict on ``X_test`` and optionally compute per-sample percentage errors.

    The percentage error of a sample is ``(prediction - actual) / prediction
    * 100``; it is ``nan`` where the prediction is exactly zero.

    Args:
        model: object with a ``predict(X)`` method returning an ``(n, 1)`` array.
        X_test: inputs passed straight to ``model.predict``.
        y_test: true target values, one per sample.
        return_diff: when True, also return the list of percentage errors.

    Returns:
        The prediction array by default (what existing callers rely on), or
        ``(predictions, percentage_diff)`` when ``return_diff`` is True.
    """
    p = model.predict(X_test)
    if not return_diff:
        return p

    actual_values = np.asarray(y_test).reshape(-1)
    percentage_diff = []
    for predicted_row, actual in zip(p, actual_values):
        pr = float(predicted_row[0])
        if pr == 0:
            # The relative error is undefined for a zero prediction.
            percentage_diff.append(float('nan'))
        else:
            # Parenthesised so the difference, not just ``actual``, is divided.
            percentage_diff.append((pr - float(actual)) / pr * 100)
    return p, percentage_diff
# ``p`` holds the model's predictions for the test windows (still normalised).
p = percentage_difference(model, X_test, y_test)
def denormalize(normalized_value):
    """Map normalised values back to the original scale.

    Uses the module-level ``min_max_scaler``, so the result is only
    meaningful for values that were scaled by that same fitted instance.

    Args:
        normalized_value: array-like of normalised values, any shape.

    Returns:
        Array of shape ``(n, 1)`` with the inverse-transformed values.
    """
    # The scaler expects a 2-D column, so flatten whatever shape comes in.
    column = np.asarray(normalized_value).reshape(-1, 1)
    return min_max_scaler.inverse_transform(column)
def plot_result(normalized_value_p, normalized_value_y_test):
    """Plot de-normalised predictions against de-normalised actual values.

    Both inputs are passed through ``denormalize`` and then drawn on one
    chart: predictions in red, actual values in blue.

    Args:
        normalized_value_p: normalised model predictions.
        normalized_value_y_test: normalised true values.
    """
    newp = denormalize(normalized_value_p)
    newy_test = denormalize(normalized_value_y_test)
    plt.plot(newp, color='red', label='Prediction')
    plt.plot(newy_test, color='blue', label='Actual')
    plt.legend(loc='best')
    plt.title('The test result for lstm')
    plt.xlabel('Days')
    plt.ylabel('flow')
    plt.show()
# Show predicted vs. actual values for the test set on the original scale.
plot_result(p, y_test)
转载:https://blog.csdn.net/qq_30803353/article/details/114790139
查看评论