
Python LSTM (Long Short-Term Memory) Model for Short-Term Bus Passenger Flow Prediction


1. Abstract

This article walks through using a Python LSTM (long short-term memory) model for short-term bus passenger flow prediction.
Main steps:

  1. Assemble the features: weather, time of day, workday vs. non-workday, holiday vs. non-holiday, temperature, etc. (see the sketch after this list)
  2. Build the LSTM network, using Adam as the optimizer
  3. Reshape the training and test sets to match the input shape the LSTM network expects
  4. Set batch_size and epochs and start training
  5. Evaluate the model, save it, and plot its predictions
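
The training and test workbooks used below are assumed to already contain these engineered features. A minimal sketch of how such flags could be derived from a timestamp column (the column name 'time' and the holiday dates are illustrative, not taken from the original data):

import pandas as pd

# Hypothetical raw table with one timestamp per dispatch interval
df = pd.DataFrame({'time': pd.date_range('2017-04-01 06:00', periods=5, freq='30min')})

df['hour'] = df['time'].dt.hour                               # time-of-day feature
df['is_workday'] = (df['time'].dt.dayofweek < 5).astype(int)  # Monday-Friday -> 1
holidays = pd.to_datetime(['2017-04-02', '2017-04-03', '2017-04-04'])  # example holiday dates
df['is_holiday'] = df['time'].dt.normalize().isin(holidays).astype(int)
# Weather and temperature would be joined in from an external weather table keyed on date
print(df)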

2. Data description

Bus dispatching during the transition between peak and off-peak periods.
Shenzhen bus data from April 2017.
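
The two Excel workbooks read in the code below are assumed to hold one record per dispatch interval: a card_id column with the boarding count plus seven feature columns. A minimal sketch for a first look at the data, using the same file names as the code below:

import pandas as pd

df = pd.read_excel(r'上车训练数据.xlsx', usecols=range(1, 9))
print(df.shape)                   # expected: (rows, 8) -> card_id plus 7 feature columns
print(df.head())                  # first few records
print(df['card_id'].describe())   # distribution of the boarding counts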

3. Complete code

#!/usr/bin/env python
# coding: utf-8

import math
import os
from math import sqrt

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from keras.layers import LSTM, Dense, Dropout
from keras.models import Sequential, load_model
from sklearn import preprocessing
from sklearn.metrics import mean_squared_error

os.chdir(r'E:\项目文件\基于改进的LSTM短时客流预测\数据')

total_data = pd.read_excel(r'上车训练数据.xlsx', usecols=range(1, 9))
test_data = pd.read_excel(r'上车测试数据.xlsx', usecols=range(1, 9))


# Plot how the boarding count (card_id) changes over time
def plot_card_id(total_data):
    print(total_data.head())
    plt.plot(total_data['card_id'], color='red', label='card_id')
    plt.legend(loc='best')
    plt.show()


# plot_card_id(total_data)

# Scaler for min-max normalizing the boarding counts
min_max_scaler = preprocessing.MinMaxScaler()
# Fit the scaler on the training data, then reuse it for the test data
# (re-fitting on the test set would leak its statistics and shift the scale)
total_data['card_id'] = min_max_scaler.fit_transform(total_data['card_id'].values.reshape(-1, 1))
test_data['card_id'] = min_max_scaler.transform(test_data['card_id'].values.reshape(-1, 1))


def make_train_test():
    ts = 61
    # The windowing below drops the first ts rows, so prepend an extra block of ts rows
    train_all = pd.concat([total_data[:61], total_data])
    test_all = pd.concat([test_data[:61], test_data])
    train_x = train_all.drop(columns=['card_id']).values
    train_y = train_all[['card_id']].values

    test_x = test_all.drop(columns=['card_id']).values
    test_y = test_all[['card_id']].values

    ts_train_x = np.array([])
    ts_train_y = np.array([])

    ts_test_x = np.array([])
    ts_test_y = np.array([])

    # Build the training windows (each window is ts consecutive rows of features)
    print('Original training data shape:', train_x.shape)
    for i in range(train_x.shape[0]):
        if i + ts == train_x.shape[0]:
            break

        ts_train_x = np.append(ts_train_x, train_x[i: i + ts, :])

        ts_train_y = np.append(ts_train_y, train_y[i + ts])

    # Build the test windows
    print('Original test data shape:', test_x.shape)
    for i in range(test_x.shape[0]):
        if i + ts == test_x.shape[0]:
            break

        ts_test_x = np.append(ts_test_x, test_x[i: i + ts, :])

        ts_test_y = np.append(ts_test_y, test_y[i + ts])

    return ts_train_x.reshape((train_x.shape[0] - ts, ts, train_x.shape[1])), ts_train_y, \
           ts_test_x.reshape((test_x.shape[0] - ts, ts, test_x.shape[1])), ts_test_y


X_train, y_train, X_test, y_test = make_train_test()
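
# Sanity check (sketch): with ts = 61 and 7 feature columns, X_train is expected to
# have shape (num_training_windows, 61, 7) and y_train shape (num_training_windows,)
print('X_train shape:', X_train.shape, ' y_train shape:', y_train.shape)
print('X_test shape:', X_test.shape, ' y_test shape:', y_test.shape)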


def build_model():
    d = 0.0727  # dropout rate
    neurons = [32, 32, 8, 1]
    model_lstm = Sequential()
    # Each input sample is one day: 61 time steps with 7 features per step
    model_lstm.add(LSTM(neurons[0], input_shape=(61, 7), return_sequences=True))
    model_lstm.add(Dropout(d))
    # input_shape is only needed on the first layer, so it is omitted here
    model_lstm.add(LSTM(neurons[1], return_sequences=False))
    model_lstm.add(Dropout(d))
    model_lstm.add(Dense(neurons[2], kernel_initializer="uniform", activation='relu'))
    model_lstm.add(Dense(neurons[3], kernel_initializer="uniform", activation='linear'))
    # adam = keras.optimizers.Adam(decay=0.2)
    model_lstm.compile(loss='mse', optimizer='adam', metrics=['accuracy'])
    model_lstm.summary()
    return model_lstm


model = build_model()
history = model.fit(
    X_train,
    y_train,
    batch_size=34,
    epochs=18,
    validation_data=(X_test, y_test),
    verbose=1)


plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='test')
plt.legend()
plt.show()

model.save('LSTM_bus_1219.h5')

# After training has finished, the saved model can be loaded directly
# model_lstm = load_model('LSTM_bus_1219.h5')


def model_score(model, X_train, y_train, X_test, y_test):
    trainScore = model.evaluate(X_train, y_train, verbose=0)
    print('Train Score: %.5f MSE (%.2f RMSE)' % (trainScore[0], math.sqrt(trainScore[0])))
    testScore = model.evaluate(X_test, y_test, verbose=0)
    print('Test Score: %.5f MSE (%.2f RMSE)' % (testScore[0], math.sqrt(testScore[0])))


model_score(model, X_train, y_train, X_test, y_test)


def model_test_score(model, X_test, y_test):
    y_hat = model.predict(X_test)
    y_t = y_test.reshape(-1, 1)

    temp = pd.DataFrame({'yhat': y_hat.ravel(), 'y': y_t.ravel()})
    temp_rmse = sqrt(mean_squared_error(temp.y, temp.yhat))
    temp_mse = mean_squared_error(temp.y, temp.yhat)
    print('TEMP RMSE: %.3f' % temp_rmse)
    print('TEMP MSE: %.3f' % temp_mse)
    return temp_rmse, temp_mse


model_test_score(model, X_test, y_test)


def percentage_difference(model, X_test, y_test):
    percentage_diff = []

    p = model.predict(X_test)
    for u in range(len(y_test)):  # for each sample in the test data
        pr = p[u][0]  # pr = prediction for sample u
        # relative error of the prediction, in percent
        percentage_diff.append(((pr - y_test[u]) / pr) * 100)
    return p  # the raw predictions are what gets plotted below


p = percentage_difference(model, X_test, y_test)


def denormalize(normalized_value):
    normalized_value = normalized_value.reshape(-1, 1)
    new = min_max_scaler.inverse_transform(normalized_value)
    return new


def plot_result(normalized_value_p, normalized_value_y_test):
    newp = denormalize(normalized_value_p)
    newy_test = denormalize(normalized_value_y_test)
    plt.plot(newp, color='red', label='Prediction')
    plt.plot(newy_test, color='blue', label='Actual')
    plt.legend(loc='best')
    plt.title('LSTM test-set predictions')
    plt.xlabel('Time interval')
    plt.ylabel('Passenger flow')
    plt.show()


plot_result(p, y_test)
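
Note that the RMSE printed by model_test_score above is computed on the normalized card_id values. A minimal sketch of reporting the test error in the original passenger-count scale, reusing the denormalize helper defined above (the function name below is illustrative, not from the original code):

def denormalized_rmse(normalized_p, normalized_y):
    # Invert the min-max scaling so the error is expressed in boarding counts
    pred_counts = denormalize(normalized_p)
    true_counts = denormalize(normalized_y)
    rmse = sqrt(mean_squared_error(true_counts, pred_counts))
    print('Test RMSE in boarding counts: %.2f' % rmse)
    return rmse


denormalized_rmse(p, y_test)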


Reposted from: https://blog.csdn.net/qq_30803353/article/details/114790139