Python Colaboratory Ⅳ

LSTM 豪ドル/円 予測ファイル

LSTM 豪ドル/円予測ファイル keras-go-lstm.ipynb
の内容を表示します。
RNN 豪ドル/円予測ファイルⅡ keras-go-2.ipynb
との差分は赤字にしています。


import os
import sys
sys.path.append(
  '/content/drive/MyDrive/Colab Notebooks/my-modules'
)
import datetime
from datetime import datetime as dt
#以上①部追加
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from tensorflow import keras
from mplfinance.original_flavor import candlestick_ohlc
def addBusinessDays(from_date, add_days):
  cur_date8 = [] # 21/03/30
  cur_date4 = [] # 3/30
  business_days_to_add = add_days
  current_date = from_date
  while business_days_to_add > 0:
    current_date += datetime.timedelta(days=1)
    weekday = current_date.weekday()
    if weekday >= 5: # sunday = 6
      continue
    #土日は以下はスキップ
    # 年/月/日として文字列にする
    #例 21/03/30      
    e11 = current_date.strftime("%y/%m/%d")
    e22 = e11.replace("/0", "/")
    e33 = e22[3:] # 21/ 削除
    cur_date8.append(e22)
    cur_date4.append(e33)
    business_days_to_add -= 1
  return cur_date8, cur_date4
#以上②部追加
xl_df = pd.read_csv(
 "drive/MyDrive/Colab Notebooks/my_data/colab_mane_chart_go.csv",
  encoding="cp932")
Open = xl_df["始値(売り)"].values
High = xl_df["高値(売り)"].values
Low = xl_df["安値(売り)"].values
Close = xl_df["終値(売り)"].values
Date = xl_df["日付"].values
Idx = xl_df.index
tstr = Date[-1]

tdatetime = dt.strptime(tstr, '%Y/%m/%d %H:%M:%S')
# 文字列をdatetimeに変換するのがstrptime()関数
# datetime.datetime.strptime(文字列, 書式指定文字列)
lastday = addBusinessDays(tdatetime, 5)

xDate = []
xD = []
for i, key in enumerate(Date):
  if(i % 10 == 0):
    e4 = str(key)[4:10]
    e6 = e4.replace("/0", "/")
    e8 = e6.lstrip("/")   
    xDate.append(e8)
    xD.append(i)

xDate.append(lastday[1][4])
xD.append(i + 5)
#5日間の予測日の追加
#以上①②③部追加
raw_data0 = xl_df["終値(売り)"].values
print("raw_data0.shape:", raw_data0.shape)
print("raw_data0")
print(raw_data0)
plt.plot(range(len(raw_data0)), raw_data0)
plt.show()
raw_data = raw_data0.copy()
# 行列の平均、標準偏差を求めます。
mean = np.mean(raw_data)
print("Mean", mean)
raw_data -= mean
std = np.std(raw_data)
print("Std ",std)
# 標準偏差値に変換
raw_data /= std
print("各標準偏差値")
print(raw_data)

# 連続デ-タとする。一つおきは、2
sampling_rate = 1
# 過去20間隔デ-タをひとまとまりとして時系列予測する
sequence_length = 20
delay = sampling_rate * sequence_length
print("delay:", delay)
batch_size = 32  # 適当
# 検証デ-タのスタ-ト値
num_half_samples = int(0.5 * len(raw_data))

train_dataset = keras.utils.timeseries_dataset_from_array(
  raw_data,
  targets=raw_data[delay:],
  sampling_rate=sampling_rate,
  sequence_length=sequence_length,
  batch_size=batch_size,
)
val_dataset = keras.utils.timeseries_dataset_from_array(
  raw_data[:-1],
  targets=raw_data[delay:],
  sampling_rate=sampling_rate,
  sequence_length=sequence_length,
  batch_size=batch_size,
  start_index=num_half_samples,
)
test_dataset = keras.utils.timeseries_dataset_from_array(
  raw_data,
  targets=None,
  sampling_rate=sampling_rate,
  sequence_length=sequence_length,
  batch_size=batch_size,
)
# numpy ndarray 配列に変換して表示
# 訓練デ-タセット表示
itr = 0
for samples, targets in train_dataset:
  samples_n = samples.numpy()
  targets_n = targets.numpy()
  if itr == 0:
    print("Start in-train:", samples_n[0])
    print("Start tar-train:", targets_n[0])
    itr = itr + 1
print("End in-train:", samples_n[-1])
print("End tar-train:", targets_n[-1])
# 検証デ-タセット表示
itv = 0
for samples_v, targets_v in val_dataset:
  samples_vn = samples_v.numpy()
  targets_vn = targets_v.numpy()
  if itv == 0:
    print("Start in-val:", samples_vn[0])
    print("Start tar-val:", targets_vn[0])
    itv = itv + 1
print("End in-val:", samples_vn[-1])
print("End tar-val:", targets_vn[-1])
# テストデ-タセット表示
i = 0
for inputs_t in test_dataset:
  inputs_n = inputs_t.numpy()
  if i == 0:
    print("Start test:", inputs_n[0])
    i = i + 1
print("End test:", inputs_n[-1])

#------ここまで keras-test-51.ipynb 
# from tensorflow import keras

from keras import layers
from keras import initializers

"""以下8行削除
sequence_length = 20
inputs = keras.Input(shape=(sequence_length,))
x = layers.Flatten()(inputs)
x = layers.Dense(
    20,
    activation="tanh",
    kernel_initializer='zeros'
    )(x)
"""
inputs = keras.Input(shape=(sequence_length, 1))
x = layers.LSTM(20, kernel_initializer='zeros')(inputs)

outputs = layers.Dense(1)(x)
model = keras.Model(inputs, outputs)
print("モデルア-キテクチャ")
print(model.summary())

#---ここまで keras-test-52.ipynb
 
callbacks_list = [
  keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=2,
    ),
  keras.callbacks.ModelCheckpoint(
    "drive/MyDrive/Colab Notebooks/my_data/jena_dense_go.keras",
    save_best_only=True,
    )
]
model.compile(optimizer="rmsprop", loss="mse", metrics=["mae"])
history = model.fit(
  train_dataset,
  epochs=40,
  # verbose=0,
  validation_data=val_dataset,
  callbacks=callbacks_list)
  
loss = history.history["mae"] # 平均絶対誤差(MAE)
val_loss = history.history["val_mae"]
epochs = range(1, len(loss) + 1)

plt.plot(epochs, loss, "bo", label="Training MAE")
plt.plot(epochs, val_loss, "b", label="Validation MAE")
plt.title("Training and Validation MAE")
plt.legend()
plt.show()
#---ここまで keras-test-53.ipynb
model = keras.models.load_model(
  "drive/MyDrive/Colab Notebooks/my_data/jena_dense_go.keras")
pre = model.predict(test_dataset)
pre1 = np.reshape(pre, (-1))
print(f"Test 予測値:")
print(pre1)

future_test = inputs_t[-1:]
print("future_test 最初の配列値")
print(future_test)
future_result = []
for i in range(5):
  test_data_f = np.reshape(future_test, (1, 20, 1))
  batch_predict = model.predict(test_data_f)
  future_test = np.delete(future_test, 0)
  future_test = np.append(future_test, batch_predict)
  future_result = np.append(future_result, batch_predict)
print("future_result :")
print(future_result)
# ここまで colab-21 まとめ

len_raw_data = len(raw_data)
xx1 = np.arange(sequence_length, len_raw_data + 1)
xx3 = np.arange(len_raw_data, len_raw_data + 5)

plt.plot(xx1, pre)
plt.plot(xx3, future_result)
plt.show()
#ここまで colab-22①まとめ

pre_chg = pre.copy()
pre_chg *= std
pre_chg += mean

pre_chg1 = np.reshape(pre_chg, (-1))
print("pre_chg1:")
print(pre_chg1)

f_result = future_result.copy()
f_result *= std
f_result += mean
print("f_result:", f_result)

plt.plot(range(len(raw_data0)), raw_data0)
plt.plot(xx1, pre_chg)
plt.plot(xx3, f_result)

#plt.show()
#ここまで colab-22②まとめ

ohlc = zip(
 Idx, Open, High, Low, Close)
fig = plt.figure(
 figsize=(8.34, 5.56))

# python スクリプトと Jupyter とでは、
# matplotlib の図のサイズが違うので注意
ax = fig.add_subplot(1,1,1)
ax.grid()

# 解析結果表示はここに挿入
plt.plot(xx1, pre_chg)
plt.plot(xx3, f_result, 'bo')
# ----------------
candlestick_ohlc(
 ax, ohlc, width=0.5, alpha = 1,
 colorup='r', colordown='g')
plt.xticks(xD, xDate)
plt.title('AUS$ / JPY chart')
plt.xlabel('Date')
plt.ylabel('Yen')
# plt.show()
plt.savefig(
 'colab_mane_chart_go_keras.png')
#以上①部追加
print("予測値=最終日+1~+5日")
valhe = np.round(f_result, 3)#3桁まで表示
valhe_pd = pd.DataFrame(valhe)
# pandas concat 関数で横(列)方向へ連結する、axis=1 を忘れないこと
lastday_pd = pd.DataFrame(lastday[0])
df_concat = pd.concat([lastday_pd, valhe_pd], axis = 1)
print(df_concat)
df_concat.to_csv(
 'colab_mane_chart_go_lstm.csv',
 header=False, index=False)
#以上④部追加 


  • LSTM 豪ドル/円 予測ファイル動作確認 に進む
  • Python LSTM 豪ドル/円 予測 に戻る
  • LSTM モデルファイル動作結果後半 に戻る
  • LSTM モデルファイル動作結果 に戻る
  • LSTM モデルファイルまとめ に戻る
  • compile メソッド に戻る
  • LSTM(Long Short Term Memory)の実現 に戻る
  • LSTM(Long Short Term Memory)概要 に戻る
  • 70VPS に戻る