Python Colaboratory Ⅳ
LSTM 豪ドル/円 予測ファイル
LSTM 豪ドル/円予測ファイル keras-go-lstm.ipynb の内容を表示します。
RNN 豪ドル/円予測ファイルⅡ keras-go-2.ipynb との差分は赤字にしています。
import os
import sys
sys.path.append(
'/content/drive/MyDrive/Colab Notebooks/my-modules'
)
import datetime
from datetime import datetime as dt
#以上①部追加
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from tensorflow import keras
from mplfinance.original_flavor import candlestick_ohlc
def addBusinessDays(from_date, add_days):
    """Return labels for the next ``add_days`` business days after ``from_date``.

    Saturdays and Sundays are skipped. Public holidays are not taken into
    account.

    Args:
        from_date: Starting ``datetime.date`` or ``datetime.datetime``. The
            start day itself is never part of the result.
        add_days: Number of business days to collect. Zero or a negative
            value yields two empty lists.

    Returns:
        A tuple ``(cur_date8, cur_date4)`` of two equally long lists of
        strings. ``cur_date8`` holds ``yy/m/d`` labels (e.g. ``"21/3/30"``)
        and ``cur_date4`` holds ``m/d`` labels (e.g. ``"3/30"``). Month and
        day are written without zero padding.
    """
    # Imported locally so the function does not depend on whether the
    # global name ``datetime`` is bound to the module or to the class.
    from datetime import timedelta

    one_day = timedelta(days=1)
    cur_date8 = []  # e.g. "21/3/30"
    cur_date4 = []  # e.g. "3/30"
    current_date = from_date
    while len(cur_date8) < add_days:
        current_date += one_day
        if current_date.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
            continue
        month_day = f"{current_date.month}/{current_date.day}"
        cur_date8.append(f"{current_date:%y}/{month_day}")
        cur_date4.append(month_day)
    return cur_date8, cur_date4
# End of the section added as part 2.
# Load the chart CSV (cp932-encoded) and pull out the columns used below.
xl_df = pd.read_csv(
    "drive/MyDrive/Colab Notebooks/my_data/colab_mane_chart_go.csv",
    encoding="cp932")
Open = xl_df["始値(売り)"].values
High = xl_df["高値(売り)"].values
Low = xl_df["安値(売り)"].values
Close = xl_df["終値(売り)"].values
Date = xl_df["日付"].values
Idx = xl_df.index

# Parse the last date in the file (strptime turns the string into a
# datetime) and derive the labels of the 5 business days that follow it.
tstr = Date[-1]
tdatetime = dt.strptime(tstr, '%Y/%m/%d %H:%M:%S')
lastday = addBusinessDays(tdatetime, 5)

# X-axis ticks: one label for every 10th row, written as month/day.
xDate = []
xD = []
for row in range(0, len(Date), 10):
    # [4:10] is the "/mm/dd" part of "YYYY/mm/dd ..."; drop the zero
    # padding and then the leading slash.
    label = str(Date[row])[4:10].replace("/0", "/").lstrip("/")
    xDate.append(label)
    xD.append(row)

# One extra tick for the last of the 5 forecast days, placed 5 positions
# after the final row.
xDate.append(lastday[1][4])
xD.append(len(Date) - 1 + 5)
# End of the sections added as parts 1, 2 and 3.
# Closing prices are the series the model is trained on.
raw_data0 = xl_df["終値(売り)"].values
print("raw_data0.shape:", raw_data0.shape)
print("raw_data0")
print(raw_data0)
plt.plot(range(len(raw_data0)), raw_data0)
plt.show()

# Standardise a float copy; raw_data0 keeps the original prices for the
# later charts. astype() always copies here, and forcing float64 keeps the
# in-place arithmetic below valid even if the CSV column was parsed as
# integers.
raw_data = raw_data0.astype(np.float64)
mean = np.mean(raw_data)
print("Mean", mean)
raw_data -= mean
# Taken after centring; the standard deviation is unaffected by the shift.
std = np.std(raw_data)
print("Std ",std)
raw_data /= std
print("各標準偏差値")
print(raw_data)

# Use consecutive samples (2 would take every other one).
sampling_rate = 1
# Each input window holds the previous 20 samples.
sequence_length = 20
# Offset from the start of a window to the value it should predict.
delay = sampling_rate * sequence_length
print("delay:", delay)
batch_size = 32  # arbitrary choice
# Index at which the validation data starts.
num_half_samples = int(0.5 * len(raw_data))
# Windowing options shared by the three datasets.
window_options = dict(
    sampling_rate=sampling_rate,
    sequence_length=sequence_length,
    batch_size=batch_size,
)

# Training windows; the target of a window starting at index k is
# raw_data[k + delay].
# NOTE(review): no end_index is given, so the training windows appear to
# cover the whole series, including the range the validation set starts
# from - confirm this overlap is intended.
train_dataset = keras.utils.timeseries_dataset_from_array(
    raw_data,
    targets=raw_data[delay:],
    **window_options,
)

# Validation windows start at num_half_samples.
val_dataset = keras.utils.timeseries_dataset_from_array(
    raw_data[:-1],
    targets=raw_data[delay:],
    start_index=num_half_samples,
    **window_options,
)

# Inference windows over the whole series, without targets.
test_dataset = keras.utils.timeseries_dataset_from_array(
    raw_data,
    targets=None,
    **window_options,
)
# Convert each batch to a numpy ndarray and print the first and last
# window of every dataset.

# Training dataset.
itr = 0
for samples, targets in train_dataset:
    samples_n = samples.numpy()
    targets_n = targets.numpy()
    if itr == 0:
        print("Start in-train:", samples_n[0])
        print("Start tar-train:", targets_n[0])
    itr += 1
# After the loop the *_n arrays hold the final batch.
print("End in-train:", samples_n[-1])
print("End tar-train:", targets_n[-1])

# Validation dataset.
itv = 0
for samples_v, targets_v in val_dataset:
    samples_vn = samples_v.numpy()
    targets_vn = targets_v.numpy()
    if itv == 0:
        print("Start in-val:", samples_vn[0])
        print("Start tar-val:", targets_vn[0])
    itv += 1
print("End in-val:", samples_vn[-1])
print("End tar-val:", targets_vn[-1])

# Test dataset (inputs only).
i = 0
for inputs_t in test_dataset:
    inputs_n = inputs_t.numpy()
    if i == 0:
        print("Start test:", inputs_n[0])
    i += 1
print("End test:", inputs_n[-1])
#------ up to here: keras-test-51.ipynb
# `keras` itself is already imported from tensorflow at the top of the file.
from keras import layers
from keras import initializers

# The earlier version of this model flattened a (sequence_length,) input
# and fed it to Dense(20, activation="tanh", kernel_initializer='zeros').
# Those 8 lines were removed in favour of the LSTM layer below.

# One feature per time step, sequence_length time steps per window.
inputs = keras.Input(shape=(sequence_length, 1))
x = layers.LSTM(20, kernel_initializer='zeros')(inputs)
outputs = layers.Dense(1)(x)
model = keras.Model(inputs, outputs)
print("モデルア-キテクチャ")
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line.
model.summary()
#--- up to here: keras-test-52.ipynb
# Stop once val_loss has not improved for 2 epochs.
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=2,
)
# Keep only the best model seen so far on disk.
checkpoint = keras.callbacks.ModelCheckpoint(
    "drive/MyDrive/Colab Notebooks/my_data/jena_dense_go.keras",
    save_best_only=True,
)
callbacks_list = [early_stopping, checkpoint]

model.compile(optimizer="rmsprop", loss="mse", metrics=["mae"])
history = model.fit(
    train_dataset,
    epochs=40,
    validation_data=val_dataset,
    callbacks=callbacks_list,
)

# Despite the variable names, these hold the mean absolute error (MAE)
# per epoch, not the MSE loss.
loss = history.history["mae"]
val_loss = history.history["val_mae"]
epochs = range(1, len(loss) + 1)

plt.plot(epochs, loss, "bo", label="Training MAE")
plt.plot(epochs, val_loss, "b", label="Validation MAE")
plt.title("Training and Validation MAE")
plt.legend()
plt.show()
#--- up to here: keras-test-53.ipynb
# Reload the model file written by the checkpoint callback during training.
model = keras.models.load_model(
    "drive/MyDrive/Colab Notebooks/my_data/jena_dense_go.keras")

# One-step-ahead prediction for every window of the series.
pre = model.predict(test_dataset)
pre1 = np.reshape(pre, (-1))
print("Test 予測値:")
print(pre1)

# Seed the roll-forward with the most recent window of standardised data.
# It is taken straight from raw_data, so this section does not depend on a
# loop variable left over from the dataset preview code.
future_steps = 5
future_test = np.asarray(raw_data[-sequence_length:], dtype=np.float64)
print("future_test 最初の配列値")
print(future_test)

# Predict one step, slide the window forward by one sample with the
# prediction appended, and repeat.
predicted = []
for _ in range(future_steps):
    test_data_f = future_test.reshape(1, sequence_length, 1)
    batch_predict = model.predict(test_data_f)
    next_value = float(np.ravel(batch_predict)[0])
    future_test = np.append(future_test[1:], next_value)
    predicted.append(next_value)
# Later code scales this array in place, so hand it over as an ndarray.
future_result = np.array(predicted, dtype=np.float64)
print("future_result :")
print(future_result)
# up to here: summary of colab-21
def _denormalize(values):
    """Return a copy of ``values`` scaled back with ``std`` and ``mean``."""
    restored = values.copy()
    # In-place operations keep the dtype of the copied array.
    restored *= std
    restored += mean
    return restored


# X positions: each window's prediction sits one step after the window
# ends, and the 5 future values follow the end of the series.
len_raw_data = len(raw_data)
xx1 = np.arange(sequence_length, len_raw_data + 1)
xx3 = np.arange(len_raw_data, len_raw_data + 5)

# Predictions in standardised units.
plt.plot(xx1, pre)
plt.plot(xx3, future_result)
plt.show()
# up to here: summary of colab-22 part 1

# The same predictions converted back to price units.
pre_chg = _denormalize(pre)
pre_chg1 = np.reshape(pre_chg, (-1))
print("pre_chg1:")
print(pre_chg1)
f_result = _denormalize(future_result)
print("f_result:", f_result)

# Original closing prices with both prediction series on top; this figure
# is deliberately not shown with plt.show() here.
plt.plot(range(len(raw_data0)), raw_data0)
plt.plot(xx1, pre_chg)
plt.plot(xx3, f_result)
# up to here: summary of colab-22 part 2
# (index, open, high, low, close) tuples for the candlestick drawer.
ohlc = zip(Idx, Open, High, Low, Close)

# Note: matplotlib figure sizes differ between a plain Python script and
# Jupyter.
fig = plt.figure(figsize=(8.34, 5.56))
ax = fig.add_subplot(1, 1, 1)
ax.grid()

# Analysis results: fitted predictions as a line, the 5 future values as
# blue dots.
ax.plot(xx1, pre_chg)
ax.plot(xx3, f_result, 'bo')

candlestick_ohlc(
    ax, ohlc, width=0.5, alpha=1,
    colorup='r', colordown='g')

plt.xticks(xD, xDate)
ax.set_title('AUS$ / JPY chart')
ax.set_xlabel('Date')
ax.set_ylabel('Yen')

# The chart is written to a file instead of being shown.
plt.savefig(
    'colab_mane_chart_go_keras.png')
# End of the section added as part 1.
print("予測値=最終日+1~+5日")

# Forecast prices rounded to 3 decimal places.
valhe = np.round(f_result, decimals=3)
valhe_pd = pd.DataFrame(valhe)
lastday_pd = pd.DataFrame(lastday[0])

# Put the date labels and the values side by side. The axis argument is
# required to concatenate along columns.
df_concat = pd.concat([lastday_pd, valhe_pd], axis=1)
print(df_concat)

# Write the table without header or index.
df_concat.to_csv(
    'colab_mane_chart_go_lstm.csv',
    header=False,
    index=False,
)
# End of the section added as part 4.