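Predicting stock prices has long fascinated investors and researchers. Investors are always guessing whether a stock's price will rise, but the many complex financial indicators involved can only be understood by investors and people with solid financial knowledge, so stock market trends are very hard for the general public to follow.
For non-experts, machine learning offers a good opportunity: it may help them produce usable predictions, and it can help experts identify the most informative indicators and make better predictions.
The goal of this tutorial is to build a neural network in TensorFlow 2 and Keras that predicts stock market prices. More specifically, we will build a recurrent neural network with LSTM cells, a widely used technique for time series forecasting.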
1. Setting Up the Environment
pip3 install tensorflow pandas numpy matplotlib yahoo_fin scikit-learn

Once everything is set up, open a new Python file (or bfwstuio) and import the following libraries:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from yahoo_fin import stock_info as si
from collections import deque

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
import os
import random

# set seed, so we can get the same results after rerunning several times
np.random.seed(314)
tf.random.set_seed(314)
random.seed(314)
2. Preparing the Dataset
def load_data(ticker, n_steps=50, scale=True, shuffle=True, lookup_step=1,
              test_size=0.2, feature_columns=['adjclose', 'volume', 'open', 'high', 'low']):
    """
    Loads data from Yahoo Finance source, as well as scaling, shuffling, normalizing and splitting.
    Params:
        ticker (str/pd.DataFrame): the ticker you want to load, examples include AAPL, TSLA, etc.
        n_steps (int): the historical sequence length (i.e window size) used to predict, default is 50
        scale (bool): whether to scale prices from 0 to 1, default is True
        shuffle (bool): whether to shuffle the data, default is True
        lookup_step (int): the future lookup step to predict, default is 1 (e.g next day)
        test_size (float): ratio for test data, default is 0.2 (20% testing data)
        feature_columns (list): the list of features to use to feed into the model, default is everything grabbed from yahoo_fin
    """
    # see if ticker is already a loaded stock from yahoo finance
    if isinstance(ticker, str):
        # load it from yahoo_fin library
        df = si.get_data(ticker)
    elif isinstance(ticker, pd.DataFrame):
        # already loaded, use it directly
        df = ticker
    else:
        raise TypeError("ticker can be either a str or a `pd.DataFrame` instance")
    # this will contain all the elements we want to return from this function
    result = {}
    # we will also return the original dataframe itself
    result['df'] = df.copy()
    # make sure that the passed feature_columns exist in the dataframe
    for col in feature_columns:
        assert col in df.columns, f"'{col}' does not exist in the dataframe."
    if scale:
        column_scaler = {}
        # scale the data (prices) from 0 to 1
        for column in feature_columns:
            scaler = preprocessing.MinMaxScaler()
            df[column] = scaler.fit_transform(np.expand_dims(df[column].values, axis=1))
            column_scaler[column] = scaler
        # add the MinMaxScaler instances to the result returned
        result["column_scaler"] = column_scaler
    # add the target column (label) by shifting by `lookup_step`
    df['future'] = df['adjclose'].shift(-lookup_step)
    # the last `lookup_step` rows contain NaN in the future column,
    # so get them before dropping NaNs
    last_sequence = np.array(df[feature_columns].tail(lookup_step))
    # drop NaNs
    df.dropna(inplace=True)
    sequence_data = []
    sequences = deque(maxlen=n_steps)
    for entry, target in zip(df[feature_columns].values, df['future'].values):
        sequences.append(entry)
        if len(sequences) == n_steps:
            sequence_data.append([np.array(sequences), target])
    # get the last sequence by appending the last `n_steps` sequence with the `lookup_step` sequence
    # for instance, if n_steps=50 and lookup_step=10, last_sequence should be of 60 (that is 50+10) length
    # this last_sequence will be used to predict future stock prices not available in the dataset
    last_sequence = list(sequences) + list(last_sequence)
    last_sequence = np.array(last_sequence)
    # add to result
    result['last_sequence'] = last_sequence
    # construct the X's and y's
    X, y = [], []
    for seq, target in sequence_data:
        X.append(seq)
        y.append(target)
    # convert to numpy arrays
    X = np.array(X)
    y = np.array(y)
    # reshape X to fit the neural network: (samples, n_features, n_steps)
    # note: reshape reinterprets the values in row-major order, it does not swap axes
    X = X.reshape((X.shape[0], X.shape[2], X.shape[1]))
    # split the dataset
    result["X_train"], result["X_test"], result["y_train"], result["y_test"] = train_test_split(
        X, y, test_size=test_size, shuffle=shuffle)
    # return the result
    return result
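The windowing loop inside load_data is easier to follow on a tiny array. The sketch below is a simplified illustration, not the tutorial's function: `make_windows` and the toy prices are hypothetical, and it only shows how a `deque` with `maxlen=n_steps` pairs each window with the close `lookup_step` rows ahead.

```python
from collections import deque

import numpy as np


def make_windows(features, closes, n_steps, lookup_step):
    """Pair each window of `n_steps` rows with the close `lookup_step` rows ahead."""
    features = np.asarray(features, dtype=float)
    closes = np.asarray(closes, dtype=float)
    window = deque(maxlen=n_steps)  # the oldest row drops out automatically
    X, y = [], []
    # only rows that still have a value `lookup_step` ahead can be labelled
    for i in range(len(features) - lookup_step):
        window.append(features[i])
        if len(window) == n_steps:
            X.append(np.array(window))
            y.append(closes[i + lookup_step])
    return np.array(X), np.array(y)


# toy data (hypothetical): 6 days, one feature (the close itself)
closes = np.array([10.0, 11.0, 12.0, 13.0, 14.0, 15.0])
X, y = make_windows(closes.reshape(-1, 1), closes, n_steps=3, lookup_step=1)
print(X.shape)             # (3, 3, 1)
print(X[0].ravel(), y[0])  # first window is days 0..2, its label is day 3
```

Each sample holds `n_steps` consecutive rows, and the label is the close that follows the window by `lookup_step` days, which is what the `shift(-lookup_step)` column provides in load_data.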
3. Building the Model
def create_model(sequence_length, units=256, cell=LSTM, n_layers=2, dropout=0.3,
                 loss="mean_absolute_error", optimizer="rmsprop", bidirectional=False):
    model = Sequential()
    for i in range(n_layers):
        if i == 0:
            # first layer
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=True), input_shape=(None, sequence_length)))
            else:
                model.add(cell(units, return_sequences=True, input_shape=(None, sequence_length)))
        elif i == n_layers - 1:
            # last layer
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=False)))
            else:
                model.add(cell(units, return_sequences=False))
        else:
            # hidden layers
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=True)))
            else:
                model.add(cell(units, return_sequences=True))
        # add dropout after each layer
        model.add(Dropout(dropout))
    model.add(Dense(1, activation="linear"))
    model.compile(loss=loss, metrics=["mean_absolute_error"], optimizer=optimizer)
    return model
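To get a feel for the size of the stack that create_model builds, the parameter counts can be worked out by hand. This is a rough sketch under stated assumptions: the helper names are hypothetical, the stack is unidirectional with at least two layers, biases are enabled (the Keras default), and the size of the input's last axis equals `sequence_length`, as `input_shape=(None, sequence_length)` implies.

```python
def lstm_params(input_dim, units):
    """Trainable parameters of one LSTM layer: 4 gates, each with
    input weights, recurrent weights, and a bias vector."""
    return 4 * ((input_dim + units) * units + units)


def stack_summary(input_dim, units=256, n_layers=2):
    """List (name, return_sequences, params) for a unidirectional stack
    shaped like the one create_model builds (assumes n_layers >= 2)."""
    rows = []
    for i in range(n_layers):
        # every layer except the last hands its full sequence to the next layer
        return_sequences = i != n_layers - 1
        in_dim = input_dim if i == 0 else units
        rows.append((f"lstm_{i}", return_sequences, lstm_params(in_dim, units)))
    rows.append(("dense", None, units + 1))  # weights plus bias of Dense(1)
    return rows


# values matching a 3-layer, 256-unit model fed 70 values per step
for row in stack_summary(input_dim=70, units=256, n_layers=3):
    print(row)
print("total:", sum(params for _, _, params in stack_summary(70, 256, 3)))
```

Dropout layers add no trainable parameters, so they are left out of the summary.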
4. Training the Model
# Window size or the sequence length
N_STEPS = 70
# Lookup step, 1 is the next day
LOOKUP_STEP = 1
# test ratio size, 0.2 is 20%
TEST_SIZE = 0.2
# features to use
FEATURE_COLUMNS = ["adjclose", "volume", "open", "high", "low"]
# date now
date_now = time.strftime("%Y-%m-%d")

### model parameters
N_LAYERS = 3
# LSTM cell
CELL = LSTM
# 256 LSTM neurons
UNITS = 256
# 40% dropout
DROPOUT = 0.4
# whether to use bidirectional RNNs
BIDIRECTIONAL = False

### training parameters
# mean absolute error loss
# LOSS = "mae"
# huber loss
LOSS = "huber_loss"
OPTIMIZER = "adam"
BATCH_SIZE = 64
EPOCHS = 400

# Tesla stock market
ticker = "TSLA"
ticker_data_filename = os.path.join("data", f"{ticker}_{date_now}.csv")
# model name to save, making it as unique as possible based on parameters
model_name = f"{date_now}_{ticker}-{LOSS}-{OPTIMIZER}-{CELL.__name__}-seq-{N_STEPS}-step-{LOOKUP_STEP}-layers-{N_LAYERS}-units-{UNITS}"
if BIDIRECTIONAL:
    model_name += "-b"
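The settings above select the Huber loss. As a rough illustration of what that loss computes, here is a NumPy sketch; the function name is hypothetical and `delta=1.0` is assumed, which is the Keras default.

```python
import numpy as np


def huber(y_true, y_pred, delta=1.0):
    """Mean Huber loss: quadratic for small errors, linear for large ones."""
    error = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
    abs_error = np.abs(error)
    quadratic = 0.5 * error ** 2                 # used where |error| <= delta
    linear = delta * (abs_error - 0.5 * delta)   # used where |error| > delta
    return float(np.mean(np.where(abs_error <= delta, quadratic, linear)))


# one small error (0.5) and one large error (3.0)
print(huber([0.0, 0.0], [0.5, 3.0]))  # 0.125 and 2.5 average to 1.3125
```

Because the targets here are scaled to the range 0 to 1, errors rarely exceed `delta`, so in practice the loss behaves mostly like half the squared error while staying less sensitive to the occasional outlier.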
# create these folders if they do not exist
if not os.path.isdir("results"):
    os.mkdir("results")
if not os.path.isdir("logs"):
    os.mkdir("logs")
if not os.path.isdir("data"):
    os.mkdir("data")
# load the data
data = load_data(ticker, N_STEPS, lookup_step=LOOKUP_STEP, test_size=TEST_SIZE, feature_columns=FEATURE_COLUMNS)
# save the dataframe
data["df"].to_csv(ticker_data_filename)
# construct the model
model = create_model(N_STEPS, loss=LOSS, units=UNITS, cell=CELL, n_layers=N_LAYERS,
                     dropout=DROPOUT, optimizer=OPTIMIZER, bidirectional=BIDIRECTIONAL)
# some tensorflow callbacks
checkpointer = ModelCheckpoint(os.path.join("results", model_name + ".h5"),
                               save_weights_only=True, save_best_only=True, verbose=1)
tensorboard = TensorBoard(log_dir=os.path.join("logs", model_name))
history = model.fit(data["X_train"], data["y_train"],
                    batch_size=BATCH_SIZE,
                    epochs=EPOCHS,
                    validation_data=(data["X_test"], data["y_test"]),
                    callbacks=[checkpointer, tensorboard],
                    verbose=1)
# save the final model (same path as the checkpoint, so it replaces the best weights saved above)
model.save(os.path.join("results", model_name) + ".h5")
Epoch 1/300
3510/3510 [==============================] - 21s 6ms/sample - loss: 0.0117 - mean_absolute_error: 0.0515 - val_loss: 0.0065 - val_mean_absolute_error: 0.0487
Epoch 2/300
3264/3510 [==========================>...] - ETA: 0s - loss: 0.0049 - mean_absolute_error: 0.0352
Epoch 00002: val_loss did not improve from 0.00650
3510/3510 [==============================] - 1s 309us/sample - loss: 0.0051 - mean_absolute_error: 0.0357 - val_loss: 0.0082 - val_mean_absolute_error: 0.0494
Epoch 3/300
3456/3510 [============================>.] - ETA: 0s - loss: 0.0039 - mean_absolute_error: 0.0329
Epoch 00003: val_loss improved from 0.00650 to 0.00095, saving model to results\2020-01-08_NFLX-mse-LSTM-seq-50-step-1-layers-3-units-256
3510/3510 [==============================] - 14s 4ms/sample - loss: 0.0039 - mean_absolute_error: 0.0328 - val_loss: 9.5337e-04 - val_mean_absolute_error: 0.0150
Epoch 4/300
3264/3510 [==========================>...] - ETA: 0s - loss: 0.0034 - mean_absolute_error: 0.0304
Epoch 00004: val_loss did not improve from 0.00095
3510/3510 [==============================] - 1s 222us/sample - loss: 0.0037 - mean_absolute_error: 0.0316 - val_loss: 0.0034 - val_mean_absolute_error: 0.0300
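The "improved" and "did not improve" messages in the log come from the checkpoint callback with save_best_only=True, which writes the weights only when the monitored validation loss reaches a new minimum. A minimal pure-Python sketch of that rule follows; `best_epochs` is a hypothetical helper for illustration, not part of Keras.

```python
def best_epochs(val_losses):
    """Return the 1-based epochs at which a save-best-only checkpoint would be written."""
    best = float("inf")
    saved = []
    for epoch, val_loss in enumerate(val_losses, start=1):
        if val_loss < best:  # strictly better than anything seen so far
            best = val_loss
            saved.append(epoch)
    return saved


# val_loss values copied from the four epochs in the log above
print(best_epochs([0.0065, 0.0082, 9.5337e-04, 0.0034]))  # [1, 3]
```

This matches the log: epochs 2 and 4 report no improvement, while epoch 3 triggers a save.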
5. Testing the Model
data = load_data(ticker, N_STEPS, lookup_step=LOOKUP_STEP, test_size=TEST_SIZE,
                 feature_columns=FEATURE_COLUMNS, shuffle=False)
# construct the model
model = create_model(N_STEPS, loss=LOSS, units=UNITS, cell=CELL, n_layers=N_LAYERS,
                     dropout=DROPOUT, optimizer=OPTIMIZER, bidirectional=BIDIRECTIONAL)
model_path = os.path.join("results", model_name) + ".h5"
model.load_weights(model_path)
# evaluate the model; evaluate() returns the loss first, then the compiled metric
loss, mae = model.evaluate(data["X_test"], data["y_test"], verbose=0)
# calculate the mean absolute error (inverse scaling)
mean_absolute_error = data["column_scaler"]["adjclose"].inverse_transform([[mae]])[0][0]
print("Mean Absolute Error:", mean_absolute_error)
Mean Absolute Error: 6.516846878481972
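One detail worth keeping in mind when reading this number: for a scaler fitted to the range 0 to 1, the inverse transform computes x * (max - min) + min, so it adds the minimum price as an offset. An error is a difference between two prices, so multiplying by the range alone is the usual way to express it in price units. The sketch below uses hand-written scaling and hypothetical prices purely to show the difference; it is not the tutorial's code and does not reproduce its reported value.

```python
import numpy as np

prices = np.array([40.0, 120.0, 200.0])  # hypothetical adjusted closes
lo, hi = prices.min(), prices.max()


def inverse(x):
    """Undo min-max scaling to [0, 1]: the same arithmetic as an inverse transform."""
    return x * (hi - lo) + lo


scaled_mae = 0.0625               # hypothetical error measured on the scaled data
print(inverse(scaled_mae))        # 50.0: includes the minimum price as an offset
print(scaled_mae * (hi - lo))     # 10.0: the error expressed in price units
```

When the minimum price in the data is close to zero, the two values are nearly the same; the gap grows with the minimum.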
def predict(model, data):
    # retrieve the last sequence from data
    last_sequence = data["last_sequence"][-N_STEPS:]
    # retrieve the column scalers
    column_scaler = data["column_scaler"]
    # reshape the last sequence
    last_sequence = last_sequence.reshape((last_sequence.shape[1], last_sequence.shape[0]))
    # expand dimension
    last_sequence = np.expand_dims(last_sequence, axis=0)
    # get the prediction (scaled from 0 to 1)
    prediction = model.predict(last_sequence)
    # get the price (by inverting the scaling)
    predicted_price = column_scaler["adjclose"].inverse_transform(prediction)[0][0]
    return predicted_price

# predict the future price
future_price = predict(model, data)
print(f"Future price after {LOOKUP_STEP} days is {future_price:.2f}$")
Future price after 1 days is 404.78$
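Both load_data and predict call reshape to turn a window of shape (n_steps, n_features) into (n_features, n_steps). It may help to see what that does to the values: reshape keeps the row-major order of the elements and only changes where the rows break, whereas a transpose actually swaps the axes. The toy array below is hypothetical; the source does not say which layout was intended, and since training and prediction use the same call, the pipeline is at least consistent with itself.

```python
import numpy as np

# toy window (hypothetical): 3 time steps (rows) x 2 features (columns)
window = np.array([[1, 10],
                   [2, 20],
                   [3, 30]])

reshaped = window.reshape((window.shape[1], window.shape[0]))
transposed = window.T

print(reshaped)    # rows are [1, 10, 2] and [20, 3, 30]: each row mixes both features
print(transposed)  # rows are [1, 2, 3] and [10, 20, 30]: one row per feature
```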
def plot_graph(model, data):
    y_test = data["y_test"]
    X_test = data["X_test"]
    y_pred = model.predict(X_test)
    y_test = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(np.expand_dims(y_test, axis=0)))
    y_pred = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(y_pred))
    # last 200 days, feel free to edit that
    plt.plot(y_test[-200:], c='b')
    plt.plot(y_pred[-200:], c='r')
    plt.xlabel("Days")
    plt.ylabel("Price")
    plt.legend(["Actual Price", "Predicted Price"])
    plt.show()
plot_graph(model, data)
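As you can see, the blue curve is the actual test set and the red curve is the predicted price. Great! Note that the stock price has recently been falling, as we predicted.
This still works with a larger LOOKUP_STEP, but the red line will then be drawn from older data (older by LOOKUP_STEP days).
So far we have only predicted the next day. I also tried building other models with different lookup_steps; here is an interesting result in TensorBoard:
Interestingly, the blue curve is the model used in this tutorial, which takes the stock price at the next time step as its label, while the green and orange curves use lookup steps of 10 and 30, respectively. In this example, the orange model predicts the stock price 30 days ahead, which makes it a good fit for long-term investing (which is usually the case).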
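Now you may be wondering: what if we only want to predict whether the price will rise or fall, rather than the actual price as we did here? You can do that in one of two ways: compare the predicted price with the current price to make the decision, or build a separate model whose final output uses a sigmoid activation, changing the loss and metrics accordingly (for example, binary cross-entropy and accuracy).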
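For the second option, the regression targets would be replaced by up/down labels, and the sigmoid output would be scored with a classification loss. The sketch below shows only that arithmetic in plain Python, assuming binary cross-entropy as the loss; the helper names, the toy prices, and the raw model outputs are all hypothetical.

```python
import math


def direction_labels(prices, lookup_step=1):
    """1 if the price `lookup_step` days later is higher than today's, else 0."""
    return [int(later > now)
            for now, later in zip(prices[:-lookup_step], prices[lookup_step:])]


def sigmoid(z):
    """Squash a raw model output into a probability between 0 and 1."""
    return 1.0 / (1.0 + math.exp(-z))


def binary_cross_entropy(labels, probs, eps=1e-7):
    """Mean binary cross-entropy; probabilities are clipped to avoid log(0)."""
    total = 0.0
    for y, p in zip(labels, probs):
        p = min(max(p, eps), 1.0 - eps)
        total += -(y * math.log(p) + (1 - y) * math.log(1.0 - p))
    return total / len(labels)


prices = [10.0, 11.0, 10.5, 12.0]                # hypothetical closes
labels = direction_labels(prices)                # up, down, up
probs = [sigmoid(z) for z in (2.0, -1.0, 0.0)]   # hypothetical raw model outputs
print(labels, binary_cross_entropy(labels, probs))
```

A confident correct output (the first two samples) contributes little to the loss, while the undecided third output of 0.5 contributes the most.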
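The following function computes an accuracy score by converting the predicted prices to 0 or 1 (0 means the price went down, 1 means it went up):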
def get_accuracy(model, data):
    y_test = data["y_test"]
    X_test = data["X_test"]
    y_pred = model.predict(X_test)
    y_test = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(np.expand_dims(y_test, axis=0)))
    y_pred = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(y_pred))
    y_pred = list(map(lambda current, future: int(float(future) > float(current)), y_test[:-LOOKUP_STEP], y_pred[LOOKUP_STEP:]))
    y_test = list(map(lambda current, future: int(float(future) > float(current)), y_test[:-LOOKUP_STEP], y_test[LOOKUP_STEP:]))
    return accuracy_score(y_test, y_pred)
print(str(LOOKUP_STEP) + ":", "Accuracy Score:", get_accuracy(model, data))
1: Accuracy Score: 0.5642570281124498
10: Accuracy Score: 0.7192622950819673
30: Accuracy Score: 0.8318965517241379
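The comparison inside get_accuracy can be traced on a handful of numbers. The sketch below mirrors its logic in plain Python, without the model or the scaler; the function name and the toy series are hypothetical.

```python
def directional_accuracy(actual, predicted, lookup_step=1):
    """Share of days on which the predicted direction matches the actual one.

    Mirrors get_accuracy: predicted[i] is taken to be the forecast for the same
    day as actual[i], and both are compared with the actual price
    `lookup_step` entries earlier.
    """
    current = actual[:-lookup_step]
    pred_up = [int(p > c) for c, p in zip(current, predicted[lookup_step:])]
    true_up = [int(a > c) for c, a in zip(current, actual[lookup_step:])]
    hits = sum(int(p == t) for p, t in zip(pred_up, true_up))
    return hits / len(true_up)


actual = [100.0, 102.0, 101.0, 105.0, 104.0]      # hypothetical actual prices
predicted = [99.0, 101.0, 103.0, 104.0, 106.0]    # hypothetical model outputs
print(directional_accuracy(actual, predicted))    # 0.5
```

Here the predictions always point up, while the actual series rises on two of the four compared days, so half of the directions match.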
6. Conclusion